X-Git-Url: http://dxcluster.net/gitweb/gitweb.cgi?a=blobdiff_plain;f=perl%2FDXMsg.pm;h=a3c0798e73f948cfcebdf74665da8b069dc243fd;hb=638d9efe6fe3d3c4eec08d5e985fce4dd760423b;hp=c1f0ae7a2ac8ef3ef970edda16f733485721f90c;hpb=f0ac8322367c66080b6dbb74da4de72dae126dc3;p=spider.git diff --git a/perl/DXMsg.pm b/perl/DXMsg.pm index c1f0ae7a..a3c0798e 100644 --- a/perl/DXMsg.pm +++ b/perl/DXMsg.pm @@ -32,7 +32,8 @@ use Carp; use strict; use vars qw(%work @msg $msgdir %valid %busy $maxage $last_clean - @badmsg $badmsgfn $forwardfn @forward $timeout $waittime); + @badmsg $badmsgfn $forwardfn @forward $timeout $waittime + $queueinterval $lastq); %work = (); # outstanding jobs @msg = (); # messages we have @@ -42,34 +43,37 @@ $maxage = 30 * 86400; # the maximum age that a message shall live for if not m $last_clean = 0; # last time we did a clean @forward = (); # msg forward table $timeout = 30*60; # forwarding timeout -$waittime = 60*60; # time an aborted outgoing message waits before trying again +$waittime = 30*60; # time an aborted outgoing message waits before trying again +$queueinterval = 1*60; # run the queue every 1 minute +$lastq = 0; + $badmsgfn = "$msgdir/badmsg.pl"; # list of TO address we wont store $forwardfn = "$msgdir/forward.pl"; # the forwarding table %valid = ( - fromnode => '9,From Node', - tonode => '9,To Node', + fromnode => '5,From Node', + tonode => '5,To Node', to => '0,To', from => '0,From', t => '0,Msg Time,cldatetime', - private => '9,Private', + private => '5,Private', subject => '0,Subject', linesreq => '0,Lines per Gob', - rrreq => '9,Read Confirm', + rrreq => '5,Read Confirm', origin => '0,Origin', lines => '5,Data', stream => '9,Stream No', - count => '9,Gob Linecnt', - file => '9,File?,yesno', - gotit => '9,Got it Nodes,parray', - lines => '9,Lines,parray', - 'read' => '9,Times read', + count => '5,Gob Linecnt', + file => '5,File?,yesno', + gotit => '5,Got it Nodes,parray', + lines => '5,Lines,parray', + 'read' => '5,Times read', size => '0,Size', msgno => '0,Msgno', keep => '0,Keep this?,yesno', - lastt => '9,Last processed,cldatetime', - waitt => '9,Wait until,cldatetime', + lastt => '5,Last processed,cldatetime', + waitt => '5,Wait until,cldatetime', ); sub DESTROY @@ -124,20 +128,28 @@ sub process my ($self, $line) = @_; # this is periodic processing - if (undef $self || undef $line) { - - # wander down the work queue stopping any messages that have timed out - for (keys %work) { - my $ref = $work{$_}; - if ($main::systime > $ref->{lastt} + $timeout) { - my $tonode = $ref->{tonode}; - $ref->stop_msg(); - - # delay any outgoing messages that fail - $ref->{waitt} = $main::systime + $waittime if $tonode ne $main::mycall; + if (!$self || !$line) { + + if ($main::systime > $lastq + $queueinterval) { + + # wander down the work queue stopping any messages that have timed out + for (keys %busy) { + my $node = $_; + my $ref = $busy{$_}; + if (exists $ref->{lastt} && $main::systime >= $ref->{lastt} + $timeout) { + dbg('msg', "Timeout, stopping msgno: $ref->{msgno} -> $node"); + $ref->stop_msg($node); + + # delay any outgoing messages that fail + $ref->{waitt} = $main::systime + $waittime + rand(120) if $node ne $main::mycall; + } } + + # queue some message if the interval timer has gone off + queue_msg(0); + $lastq = $main::systime; } - + # clean the message queue clean_old() if $main::systime - $last_clean > 3600 ; return; @@ -155,7 +167,8 @@ sub process if (exists $busy{$f[2]}) { my $ref = $busy{$f[2]}; my $tonode = $ref->{tonode}; - $ref->stop_msg(); + dbg('msg', "Busy, stopping msgno: $ref->{msgno} -> $f[2]"); + $ref->stop_msg($self->call); } my $t = cltounix($f[5], $f[6]); @@ -173,6 +186,14 @@ sub process $work{"$f[2]$stream"} = $ref; # store in work $busy{$f[2]} = $ref; # set interlock $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack + $ref->{lastt} = $main::systime; + + # look to see whether this is a non private message sent to a known callsign + my $uref = DXUser->get_current($ref->{to}); + if (iscallsign($ref->{to}) && !$ref->{private} && $uref && $uref->homenode) { + $ref->{private} = 1; + dbg('msg', "set bull to $ref->{to} to private"); + } last SWITCH; } @@ -187,6 +208,9 @@ sub process $ref->{count} = 0; } $ref->{lastt} = $main::systime; + } else { + dbg('msg', "PC29 from unknown stream $f[3] from $f[2]" ); + $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream } last SWITCH; } @@ -206,6 +230,7 @@ sub process $ref->send_tranche($self); $ref->{lastt} = $main::systime; } else { + dbg('msg', "PC30 from unknown stream $f[3] from $f[2]" ); $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream } last SWITCH; @@ -218,6 +243,7 @@ sub process $ref->send_tranche($self); $ref->{lastt} = $main::systime; } else { + dbg('msg', "PC31 from unknown stream $f[3] from $f[2]" ); $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream } last SWITCH; @@ -242,8 +268,8 @@ sub process # does an identical message already exist? my $m; for $m (@msg) { - if ($ref->{subject} eq $m->{subject} && $ref->{t} == $m->{t} && $ref->{from} eq $m->{from}) { - $ref->stop_msg(); + if ($ref->{subject} eq $m->{subject} && $ref->{t} == $m->{t} && $ref->{from} eq $m->{from} && $ref->{to} eq $m->{to}) { + $ref->stop_msg($self->call); my $msgno = $m->{msgno}; dbg('msg', "duplicate message to $msgno\n"); Log('msg', "duplicate message to $msgno"); @@ -253,7 +279,7 @@ sub process # look for 'bad' to addresses if (grep $ref->{to} eq $_, @badmsg) { - $ref->stop_msg(); + $ref->stop_msg($self->call); dbg('msg', "'Bad' TO address $ref->{to}"); Log('msg', "'Bad' TO address $ref->{to}"); return; @@ -264,16 +290,16 @@ sub process $ref->store($ref->{lines}); add_dir($ref); my $dxchan = DXChannel->get($ref->{to}); - $dxchan->send($dxchan->msg('m9')) if $dxchan; + $dxchan->send($dxchan->msg('m9')) if $dxchan && $dxchan->is_user; Log('msg', "Message $ref->{msgno} from $ref->{from} received from $f[2] for $ref->{to}"); } } - $ref->stop_msg(); - queue_msg(0); + $ref->stop_msg($self->call); } else { + dbg('msg', "PC32 from unknown stream $f[3] from $f[2]" ); $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream } - queue_msg(0); + # queue_msg(0); last SWITCH; } @@ -288,10 +314,13 @@ sub process push @{$ref->{gotit}}, $f[2]; # mark this up as being received $ref->store($ref->{lines}); # re- store the file } - $ref->stop_msg(); + $ref->stop_msg($self->call); } else { + dbg('msg', "PC33 from unknown stream $f[3] from $f[2]" ); $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream } + + # send next one if present queue_msg(0); last SWITCH; } @@ -325,6 +354,7 @@ sub process $ref->{stream} = $stream; $ref->{count} = 0; # no of lines between PC31s $ref->{file} = 1; + $ref->{lastt} = $main::systime; $work{"$f[2]$stream"} = $ref; # store in work $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack @@ -335,18 +365,18 @@ sub process dbg('msg', "stream $f[3]: abort received\n"); my $ref = $work{"$f[2]$f[3]"}; if ($ref) { - $ref->stop_msg(); + $ref->stop_msg($self->call); $ref = undef; } - last SWITCH; } if ($pcno == 49) { # global delete on subject for (@msg) { - if ($_->{subject} eq $f[2]) { + if ($_->{from} eq $f[1] && $_->{subject} eq $f[2]) { $_->del_msg(); - Log('msg', "Message $_->{msgno} fully deleted by $f[1]"); + Log('msg', "Message $_->{msgno} from $_->{from} ($_->{subject}) fully deleted"); + DXProt::broadcast_ak1a($line, $self); } } } @@ -363,9 +393,9 @@ sub store my $lines = shift; # we only proceed if there are actually any lines in the file - if (!$lines || @{$lines} == 0) { - return; - } +# if (!$lines || @{$lines} == 0) { +# return; +# } if ($ref->{file}) { # a file dbg('msg', "To be stored in $ref->{to}\n"); @@ -545,8 +575,7 @@ sub queue_msg my $call = shift; my $ref; my $clref; - my $dxchan; - my @nodelist = DXProt::get_all_ak1a(); + my @nodelist = DXChannel::get_all_ak1a(); # bat down the message list looking for one that needs to go off site and whose # nearest node is not busy. @@ -558,42 +587,41 @@ sub queue_msg # ignore 'delayed' messages until their waiting time has expired if (exists $ref->{waitt}) { - next if $ref->{waitt} < $main::systime; + next if $ref->{waitt} > $main::systime; delete $ref->{waitt}; } - + + # deal with routed private messages + my $noderef; if ($ref->{private}) { - if ($ref->{'read'} == 0) { - $clref = DXCluster->get_exact($ref->{to}); - unless ($clref) { # otherwise look for a homenode - my $uref = DXUser->get($ref->{to}); - my $hnode = $uref->homenode if $uref; - $clref = DXCluster->get_exact($hnode) if $hnode; - } - if ($clref && !grep { $clref->{dxchan} == $_ } DXCommandmode::get_all) { - $dxchan = $clref->{dxchan}; - $ref->start_msg($dxchan) if $dxchan && $clref && !get_busy($dxchan->call) && $dxchan->state eq 'normal'; - } + $clref = DXCluster->get_exact($ref->{to}); + unless ($clref) { # otherwise look for a homenode + my $uref = DXUser->get($ref->{to}); + my $hnode = $uref->homenode if $uref; + $clref = DXCluster->get_exact($hnode) if $hnode; } - } elsif (!$sort) { - # otherwise we are dealing with a bulletin, compare the gotit list with - # the nodelist up above, if there are sites that haven't got it yet - # then start sending it - what happens when we get loops is anyone's - # guess, use (to, from, time, subject) tuple? - my $noderef; - foreach $noderef (@nodelist) { - next if $noderef->call eq $main::mycall; - next if grep { $_ eq $noderef->call } @{$ref->{gotit}}; - next unless $ref->forward_it($noderef->call); # check the forwarding file - # next if $noderef->isolate; # maybe add code for stuff originated here? - # next if DXUser->get( ${$ref->{gotit}}[0] )->isolate; # is the origin isolated? - - # if we are here we have a node that doesn't have this message + if ($clref && !grep { $clref->{dxchan} == $_ } DXCommandmode::get_all) { + next if $clref->call eq $main::mycall; # i.e. it lives here + $noderef = $clref->{dxchan}; $ref->start_msg($noderef) if !get_busy($noderef->call) && $noderef->state eq 'normal'; - last; } } + # otherwise we are dealing with a bulletin or forwarded private message + # compare the gotit list with + # the nodelist up above, if there are sites that haven't got it yet + # then start sending it - what happens when we get loops is anyone's + # guess, use (to, from, time, subject) tuple? + foreach $noderef (@nodelist) { + next if $noderef->call eq $main::mycall; + next if grep { $_ eq $noderef->call } @{$ref->{gotit}}; + next unless $ref->forward_it($noderef->call); # check the forwarding file + + # if we are here we have a node that doesn't have this message + $ref->start_msg($noderef) if !get_busy($noderef->call) && $noderef->state eq 'normal'; + last; + } + # if all the available nodes are busy then stop last if @nodelist == scalar grep { get_busy($_->call) } @nodelist; } @@ -624,8 +652,9 @@ sub start_msg $self->{count} = 0; $self->{tonode} = $dxchan->call; $self->{fromnode} = $main::mycall; - $busy{$dxchan->call} = $self; - $work{"$self->{tonode}"} = $self; + $busy{$self->{tonode}} = $self; + $work{$self->{tonode}} = $self; + $self->{lastt} = $main::systime; $dxchan->send(DXProt::pc28($self->{tonode}, $self->{fromnode}, $self->{to}, $self->{from}, $self->{t}, $self->{private}, $self->{subject}, $self->{origin}, $self->{rrreq})); } @@ -651,8 +680,8 @@ sub get_fwq # stop a message from continuing, clean it out, unlock interlocks etc sub stop_msg { - my ($self, $dxchan) = @_; - my $node = $self->{tonode} + my $self = shift; + my $node = shift; my $stream = $self->{stream} if exists $self->{stream}; @@ -791,11 +820,11 @@ sub do_send_stuff $loc->{lines} = []; $self->state('sendbody'); #push @out, $self->msg('sendbody'); - push @out, $self->msg('m8');) + push @out, $self->msg('m8'); } elsif ($self->state eq 'sendbody') { confess "local var gone missing" if !ref $self->{loc}; my $loc = $self->{loc}; - if ($line eq "\032" || uc $line eq "/EX") { + if ($line eq "\032" || $line eq '%1A' || uc $line eq "/EX") { my $to; if (@{$loc->{lines}} > 0) { @@ -829,7 +858,6 @@ sub do_send_stuff delete $self->{loc}; $self->func(undef); - DXMsg::queue_msg(0); $self->state('prompt'); } elsif ($line eq "\031" || uc $line eq "/ABORT" || uc $line eq "/QUIT") { #push @out, $self->msg('sendabort'); @@ -861,8 +889,11 @@ sub dir sub load_forward { my @out; - do "$forwardfn" if -e "$forwardfn"; - push @out, $@ if $@; + my $s = readfilestr($forwardfn); + if ($s) { + eval $s; + push @out, $@ if $@; + } return @out; } @@ -870,8 +901,11 @@ sub load_forward sub load_badmsg { my @out; - do "$badmsgfn" if -e "$badmsgfn"; - push @out, $@ if $@; + my $s = readfilestr($badmsgfn); + if ($s) { + eval $s; + push @out, $@ if $@; + } return @out; } @@ -891,11 +925,13 @@ sub forward_it my $tested; # are we interested? - last if $ref->{private} && $sort ne 'P'; - last if !$ref->{private} && $sort ne 'B'; + next if $ref->{private} && $sort ne 'P'; + next if !$ref->{private} && $sort ne 'B'; # select field $tested = $ref->{to} if $field eq 'T'; + my $at = $ref->{to} =~ /\@\s*(\S+)/; + $tested = $at if $field eq '\@'; $tested = $ref->{from} if $field eq 'F'; $tested = $ref->{origin} if $field eq 'O'; $tested = $ref->{subject} if $field eq 'S';