X-Git-Url: http://dxcluster.net/gitweb/gitweb.cgi?a=blobdiff_plain;f=perl%2FMsg.pm;h=81c2e40a0090aa09ecdeba30e94dd01ae6cd70a6;hb=refs%2Fheads%2Fnewusers;hp=ad09c85da0f784d98b12634f01ad1a0e421144d5;hpb=d2b28488d70d97c2e467cd7c57077024b7241b45;p=spider.git diff --git a/perl/Msg.pm b/perl/Msg.pm index ad09c85d..81c2e40a 100644 --- a/perl/Msg.pm +++ b/perl/Msg.pm @@ -20,7 +20,7 @@ use Mojo::IOLoop::Stream; use DXDebug; use Timer; -use vars qw($now %conns $noconns $cnum $total_in $total_out $connect_timeout); +use vars qw($now %conns $noconns $cnum $total_in $total_out $connect_timeout $disc_waittime); $total_in = $total_out = 0; @@ -28,6 +28,9 @@ $now = time; $cnum = 0; $connect_timeout = 5; +$disc_waittime = 1.5; + +our %delqueue; # #----------------------------------------------------------------- @@ -143,7 +146,7 @@ sub _on_connect $sock->timeout(0); $sock->start; $conn->{peerhost} = eval { $handle->peerhost; }; - dbg((ref $conn) . " connected $conn->{cnum} to $conn->{peerhost}:$conn->{peerport}") if isdbg('connll'); + dbg((ref $conn) . " connected $conn->{cnum} to $conn->{peerhost}:$conn->{peerport}") if isdbg('conn') || isdbg ('connect'); if ($conn->{on_connect}) { &{$conn->{on_connect}}($conn, $handle); } @@ -174,9 +177,18 @@ sub connect { my $sock; $conn->{sock} = $sock = Mojo::IOLoop::Client->new; - $sock->on(connect => sub {$conn->_on_connect($_[1])} ); - $sock->on(error => sub {&{$conn->{eproc}}($conn, $_[1]) if exists $conn->{eproc}; $conn->disconnect}); - $sock->on(close => sub {$conn->disconnect}); + $sock->on(connect => sub { + $conn->_on_connect($_[1]) + } ); + $sock->on(error => sub { + &{$conn->{eproc}}($conn, $_[1]) if exists $conn->{eproc}; + delete $conn->{sock}; + $conn->disconnect + }); + $sock->on(close => sub { + delete $conn->{sock}; + $conn->disconnect} + ); # copy any args like on_connect, on_disconnect etc while (my ($k, $v) = each %args) { @@ -237,16 +249,22 @@ sub start_program return $pid; } -sub disconnect +sub disconnect { - my $conn = shift; - return if exists $conn->{disconnecting}; - - $conn->{disconnecting} = 1; - my $sock = delete $conn->{sock}; - $conn->{state} = 'E'; - $conn->{timeout}->del if $conn->{timeout}; + my $conn = shift; + my $count = $conn->{disconnecting}++; + my $dbg = isdbg('connll'); + my ($pkg, $fn, $line) = caller if $dbg; + + if ($count >= 2) { + dbgtrace((ref $conn) . "::disconnect on call $conn->{call} attempt $conn->{disconnecting} called from ${pkg}::${fn} line $line FORCING CLOSE ") if $dbg; + _close_it($conn); + return; + } + dbg((ref $conn) . "::disconnect on call $conn->{call} attempt $conn->{disconnecting} called from ${pkg}::${fn} line $line ") if $dbg; + return if $count; + # remove this conn from the active queue # be careful to delete the correct one my $call; if ($call = $conn->{call}) { @@ -254,12 +272,52 @@ sub disconnect delete $conns{$call} if $ref && $ref == $conn; } $call ||= 'unallocated'; - dbg((ref $conn) . " Connection $conn->{cnum} $call disconnected") if isdbg('connll'); + + $delqueue{$conn} = $conn; # save this connection until everything is finished + my $sock = $conn->{sock}; + if ($sock) { + if ($sock->{buffer}) { + my $lth = length $sock->{buffer}; + Mojo::IOLoop->timer($disc_waittime, sub { + dbg("Buffer contained $lth characters, coordinated for $disc_waittime secs, now disconnecting $call") if $dbg; + _close_it($conn); + }); + } else { + dbg("Buffer empty, just close $call") if $dbg; + _close_it($conn); + } + } else { + dbg((ref $conn) . " socket missing on $conn->{call}") if $dbg; + _close_it($conn); + } +} + +sub _close_it +{ + my $conn = shift; + my $sock = delete $conn->{sock}; + $conn->{state} = 'E'; + $conn->{timeout}->del if $conn->{timeout}; + + my $call = $conn->{call}; + + if (isdbg('connll')) { + my ($pkg, $fn, $line) = caller; + dbg((ref $conn) . "::_close_it on call $conn->{call} attempt $conn->{disconnecting} called from ${pkg}::${fn} line $line "); + } + + + dbg((ref $conn) . " Connection $conn->{cnum} $call starting to close") if isdbg('connll'); if ($conn->{on_disconnect}) { &{$conn->{on_disconnect}}($conn); } + if ($sock) { + dbg((ref $conn) . " Connection $conn->{cnum} $call closing gracefully") if isdbg('connll'); + $sock->close_gracefully; + } + # get rid of any references for (keys %$conn) { if (ref($conn->{$_})) { @@ -267,8 +325,7 @@ sub disconnect } } - $sock->close_gracefully if defined $sock && $sock->can('close_gracefully'); - undef $sock; + delete $delqueue{$conn}; # finally remove the $conn unless ($main::is_win) { kill 'TERM', $conn->{pid} if exists $conn->{pid}; @@ -425,9 +482,11 @@ sub new_client { $sock->on(read => sub {$conn->_rcv($_[1])}); $sock->timeout(0); $sock->start; - dbg((ref $conn) . "accept $conn->{cnum} from $conn->{peerhost} $conn->{peerport}") if isdbg('connll'); - - my ($rproc, $eproc) = &{$server_conn->{rproc}} ($conn, $conn->{peerhost} = $handle->peerhost, $conn->{peerport} = $handle->peerport); + $conn->{peerhost} = $handle->peerhost || 'unknown'; + $conn->{peerhost} =~ s|^::ffff:||; # chop off leading pseudo IPV6 stuff on dual stack listeners + $conn->{peerport} = $handle->peerport || 0; + dbg((ref $conn) . " accept $conn->{cnum} from $conn->{peerhost}:$conn->{peerport}") if isdbg('conn') || isdbg('connect'); + my ($rproc, $eproc) = &{$server_conn->{rproc}} ($conn, $conn->{peerhost}, $conn->{peerport}); $conn->{sort} = 'Incoming'; if ($eproc) { $conn->{eproc} = $eproc; @@ -490,12 +549,21 @@ sub sleep sub DESTROY { my $conn = shift; + my $call = $conn->{call} || 'unallocated'; + + if (isdbg('connll')) { + my ($pkg, $fn, $line) = caller; + dbgtrace((ref $conn) . "::DESTROY on call $call called from ${pkg}::${fn} line $line "); + } + my $call = $conn->{call} || 'unallocated'; my $host = $conn->{peerhost} || ''; my $port = $conn->{peerport} || ''; my $sock = $conn->{sock}; - $sock->close_gracefully if defined $sock && $sock->can('close_gracefully'); + if ($sock) { + $sock->close_gracefully; + } $noconns--; dbg((ref $conn) . " Connection $conn->{cnum} $call [$host $port] being destroyed (total $noconns)") if isdbg('connll');