X-Git-Url: http://dxcluster.net/gitweb/gitweb.cgi?a=blobdiff_plain;f=perl%2FMsg.pm;fp=perl%2FMsg.pm;h=e3385d9166585436dad106bd94e8b94aa6b0c411;hb=abbcfa7500858a2eba4135b0af5db9f3fca8d68e;hp=ad09c85da0f784d98b12634f01ad1a0e421144d5;hpb=1eabcdd60799121b40f4d2ddb3b977fa0b2aeb94;p=spider.git diff --git a/perl/Msg.pm b/perl/Msg.pm index ad09c85d..e3385d91 100644 --- a/perl/Msg.pm +++ b/perl/Msg.pm @@ -20,7 +20,7 @@ use Mojo::IOLoop::Stream; use DXDebug; use Timer; -use vars qw($now %conns $noconns $cnum $total_in $total_out $connect_timeout); +use vars qw($now %conns $noconns $cnum $total_in $total_out $connect_timeout $disc_waittime); $total_in = $total_out = 0; @@ -28,6 +28,9 @@ $now = time; $cnum = 0; $connect_timeout = 5; +$disc_waittime = 3; + +our %delqueue; # #----------------------------------------------------------------- @@ -237,16 +240,61 @@ sub start_program return $pid; } -sub disconnect +sub disconnect { - my $conn = shift; - return if exists $conn->{disconnecting}; + my $conn = shift; + my $count = $conn->{disconnecting}++; + if (isdbg('connll')) { + my ($pkg, $fn, $line) = caller; + dbg((ref $conn) . "::disconnect on call $conn->{call} attempt $conn->{disconnecting} called from ${pkg}::${fn} line $line "); + } + return if $count; + + + my $sock = $conn->{sock}; + if ($sock) { + + # remove me from the active list + my $call; + if ($call = $conn->{call}) { + my $ref = $conns{$call}; + delete $conns{$call} if $ref && $ref == $conn; + } + $conn->{delay} = Mojo::IOLoop->delay ( +# Mojo::IOLoop->delay ( + sub { + my $delay = shift; + dbg("before drain $call"); + $sock->on(drain => $delay->begin); + 1; + }, + sub { + my $delay = shift; + _close_it($conn); + 1; + } + ); + $conn->{delay}->wait; + + $delqueue{$conn} = $conn; # save this connection until everything is finished + } else { + dbg((ref $conn) . " socket missing on $conn->{call}") if isdbg('connll'); + _close_it($conn); + } +} - $conn->{disconnecting} = 1; +sub _close_it +{ + my $conn = shift; my $sock = delete $conn->{sock}; $conn->{state} = 'E'; $conn->{timeout}->del if $conn->{timeout}; + if (isdbg('connll')) { + my ($pkg, $fn, $line) = caller; + dbg((ref $conn) . "::_close_it on call $conn->{call} attempt $conn->{disconnecting} called from ${pkg}::${fn} line $line "); + } + # be careful to delete the correct one my $call; if ($call = $conn->{call}) { @@ -254,12 +302,18 @@ sub disconnect delete $conns{$call} if $ref && $ref == $conn; } $call ||= 'unallocated'; - dbg((ref $conn) . " Connection $conn->{cnum} $call disconnected") if isdbg('connll'); + + dbg((ref $conn) . " Connection $conn->{cnum} $call starting to close") if isdbg('connll'); if ($conn->{on_disconnect}) { &{$conn->{on_disconnect}}($conn); } + if ($sock) { + dbg((ref $conn) . " Connection $conn->{cnum} $call closing gracefully") if isdbg('connll'); + $sock->close_gracefully; + } + # get rid of any references for (keys %$conn) { if (ref($conn->{$_})) { @@ -267,8 +321,7 @@ sub disconnect } } - $sock->close_gracefully if defined $sock && $sock->can('close_gracefully'); - undef $sock; + delete $delqueue{$conn}; # finally remove the $conn unless ($main::is_win) { kill 'TERM', $conn->{pid} if exists $conn->{pid}; @@ -490,12 +543,22 @@ sub sleep sub DESTROY { my $conn = shift; + my $call = $conn->{call} || 'unallocated'; + + if (isdbg('connll')) { + my ($pkg, $fn, $line) = caller; + dbg((ref $conn) . "::DESTROY on call $call called from ${pkg}::${fn} line $line "); + + } + my $call = $conn->{call} || 'unallocated'; my $host = $conn->{peerhost} || ''; my $port = $conn->{peerport} || ''; my $sock = $conn->{sock}; - $sock->close_gracefully if defined $sock && $sock->can('close_gracefully'); + if ($sock) { + $sock->close_gracefully; + } $noconns--; dbg((ref $conn) . " Connection $conn->{cnum} $call [$host $port] being destroyed (total $noconns)") if isdbg('connll');