X-Git-Url: http://dxcluster.net/gitweb/gitweb.cgi?a=blobdiff_plain;f=perl%2FDXMsg.pm;h=2028f8b1da8403498950c746c2c3ab54c2408140;hb=30dbf70f84b53174005810f64f546d2181e1a8c6;hp=e5fa41a8f3bce122e574f0728660d31899875fe9;hpb=c1eb1d4013a7d748c0fc22f778ddb719dc151a1b;p=spider.git diff --git a/perl/DXMsg.pm b/perl/DXMsg.pm index e5fa41a8..2028f8b1 100644 --- a/perl/DXMsg.pm +++ b/perl/DXMsg.pm @@ -32,7 +32,8 @@ use Carp; use strict; use vars qw(%work @msg $msgdir %valid %busy $maxage $last_clean - @badmsg $badmsgfn $forwardfn @forward $timeout $waittime); + @badmsg $badmsgfn $forwardfn @forward $timeout $waittime + $queueinterval $lastq); %work = (); # outstanding jobs @msg = (); # messages we have @@ -43,33 +44,36 @@ $last_clean = 0; # last time we did a clean @forward = (); # msg forward table $timeout = 30*60; # forwarding timeout $waittime = 60*60; # time an aborted outgoing message waits before trying again +$queueinterval = 5*60; # run the queue every 5 minutes +$lastq = 0; + $badmsgfn = "$msgdir/badmsg.pl"; # list of TO address we wont store $forwardfn = "$msgdir/forward.pl"; # the forwarding table %valid = ( - fromnode => '9,From Node', - tonode => '9,To Node', + fromnode => '5,From Node', + tonode => '5,To Node', to => '0,To', from => '0,From', t => '0,Msg Time,cldatetime', - private => '9,Private', + private => '5,Private', subject => '0,Subject', linesreq => '0,Lines per Gob', - rrreq => '9,Read Confirm', + rrreq => '5,Read Confirm', origin => '0,Origin', lines => '5,Data', stream => '9,Stream No', - count => '9,Gob Linecnt', - file => '9,File?,yesno', - gotit => '9,Got it Nodes,parray', - lines => '9,Lines,parray', - 'read' => '9,Times read', + count => '5,Gob Linecnt', + file => '5,File?,yesno', + gotit => '5,Got it Nodes,parray', + lines => '5,Lines,parray', + 'read' => '5,Times read', size => '0,Size', msgno => '0,Msgno', keep => '0,Keep this?,yesno', - lastt => '9,Last processed,cldatetime', - waitt => '9,Wait until,cldatetime', + lastt => '5,Last processed,cldatetime', + waitt => '5,Wait until,cldatetime', ); sub DESTROY @@ -134,10 +138,16 @@ sub process $ref->stop_msg($node); # delay any outgoing messages that fail - $ref->{waitt} = $main::systime + $waittime if $node ne $main::mycall; + $ref->{waitt} = $main::systime + $waittime + rand(120) if $node ne $main::mycall; } } - + + # queue some message if the interval timer has gone off + if ($main::systime > $lastq + $queueinterval) { + queue_msg(0); + $lastq = $main::systime; + } + # clean the message queue clean_old() if $main::systime - $last_clean > 3600 ; return; @@ -173,6 +183,7 @@ sub process $work{"$f[2]$stream"} = $ref; # store in work $busy{$f[2]} = $ref; # set interlock $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack + $ref->{lastt} = $main::systime; last SWITCH; } @@ -264,16 +275,15 @@ sub process $ref->store($ref->{lines}); add_dir($ref); my $dxchan = DXChannel->get($ref->{to}); - $dxchan->send($dxchan->msg('m9')) if $dxchan; + $dxchan->send($dxchan->msg('m9')) if $dxchan && $dxchan->is_user; Log('msg', "Message $ref->{msgno} from $ref->{from} received from $f[2] for $ref->{to}"); } } $ref->stop_msg($self->call); - queue_msg(0); } else { $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream } - queue_msg(0); + # queue_msg(0); last SWITCH; } @@ -292,6 +302,8 @@ sub process } else { $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream } + + # send next one if present queue_msg(0); last SWITCH; } @@ -830,7 +842,6 @@ sub do_send_stuff delete $self->{loc}; $self->func(undef); - DXMsg::queue_msg(0); $self->state('prompt'); } elsif ($line eq "\031" || uc $line eq "/ABORT" || uc $line eq "/QUIT") { #push @out, $self->msg('sendabort');