1. added WWV filtering
[spider.git] / perl / DXMsg.pm
index 13af2cc02eac81e6bb85369add0fa39b3ce4a75e..c1f0ae7a2ac8ef3ef970edda16f733485721f90c 100644 (file)
@@ -32,7 +32,7 @@ use Carp;
 
 use strict;
 use vars qw(%work @msg $msgdir %valid %busy $maxage $last_clean
-                       @badmsg $badmsgfn $forwardfn @forward);
+                       @badmsg $badmsgfn $forwardfn @forward $timeout $waittime);
 
 %work = ();                                            # outstanding jobs
 @msg = ();                                             # messages we have
@@ -41,6 +41,8 @@ $msgdir = "$main::root/msg";  # directory contain the msgs
 $maxage = 30 * 86400;                  # the maximum age that a message shall live for if not marked 
 $last_clean = 0;                               # last time we did a clean
 @forward = ();                  # msg forward table
+$timeout = 30*60;               # forwarding timeout
+$waittime = 60*60;              # time an aborted outgoing message waits before trying again
 
 $badmsgfn = "$msgdir/badmsg.pl";  # list of TO address we wont store
 $forwardfn = "$msgdir/forward.pl";  # the forwarding table
@@ -66,6 +68,8 @@ $forwardfn = "$msgdir/forward.pl";  # the forwarding table
                  size => '0,Size',
                  msgno => '0,Msgno',
                  keep => '0,Keep this?,yesno',
+                 lastt => '9,Last processed,cldatetime',
+                 waitt => '9,Wait until,cldatetime',
                 );
 
 sub DESTROY
@@ -95,6 +99,7 @@ sub alloc
        $self->{'read'} = shift;
        $self->{rrreq} = shift;
        $self->{gotit} = [];
+       $self->{lastt} = $main::systime;
     
        return $self;
 }
@@ -110,16 +115,49 @@ sub workclean
        delete $ref->{lines};
        delete $ref->{file};
        delete $ref->{count};
+       delete $ref->{lastt} if exists $ref->{lastt};
+       delete $ref->{waitt} if exists $ref->{waitt};
 }
 
 sub process
 {
        my ($self, $line) = @_;
+
+       # this is periodic processing
+       if (undef $self || undef $line) {
+
+               # wander down the work queue stopping any messages that have timed out
+               for (keys %work) {
+                       my $ref = $work{$_};
+                       if ($main::systime > $ref->{lastt} + $timeout) {
+                               my $tonode = $ref->{tonode};
+                               $ref->stop_msg();
+
+                               # delay any outgoing messages that fail
+                               $ref->{waitt} = $main::systime + $waittime if $tonode ne $main::mycall;
+                       }
+               }
+               
+               # clean the message queue
+               clean_old() if $main::systime - $last_clean > 3600 ;
+               return;
+       }
+
        my @f = split /\^/, $line;
        my ($pcno) = $f[0] =~ /^PC(\d\d)/; # just get the number
-       
+
  SWITCH: {
                if ($pcno == 28) {              # incoming message
+
+                       # first look for any messages in the busy queue 
+                       # and cancel them this should both resolve timed out incoming messages
+                       # and crossing of message between nodes, incoming messages have priority
+                       if (exists $busy{$f[2]}) {
+                               my $ref = $busy{$f[2]};
+                               my $tonode = $ref->{tonode};
+                               $ref->stop_msg();
+                       }
+
                        my $t = cltounix($f[5], $f[6]);
                        my $stream = next_transno($f[2]);
                        my $ref = DXMsg->alloc($stream, uc $f[3], $f[4], $t, $f[7], $f[8], $f[13], '0', $f[11]);
@@ -148,6 +186,7 @@ sub process
                                        dbg('msg', "stream $f[3]: $ref->{count} lines received\n");
                                        $ref->{count} = 0;
                                }
+                               $ref->{lastt} = $main::systime;
                        }
                        last SWITCH;
                }
@@ -165,6 +204,7 @@ sub process
                                $ref->{lines} = [];
                                push @{$ref->{lines}}, ($ref->read_msg_body);
                                $ref->send_tranche($self);
+                               $ref->{lastt} = $main::systime;
                        } else {
                                $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
                        } 
@@ -176,6 +216,7 @@ sub process
                        if ($ref) {
                                dbg('msg', "tranche ack stream $f[3]\n");
                                $ref->send_tranche($self);
+                               $ref->{lastt} = $main::systime;
                        } else {
                                $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
                        } 
@@ -202,7 +243,7 @@ sub process
                                                my $m;
                                                for $m (@msg) {
                                                        if ($ref->{subject} eq $m->{subject} && $ref->{t} == $m->{t} && $ref->{from} eq $m->{from}) {
-                                                               $ref->stop_msg($self);
+                                                               $ref->stop_msg();
                                                                my $msgno = $m->{msgno};
                                                                dbg('msg', "duplicate message to $msgno\n");
                                                                Log('msg', "duplicate message to $msgno");
@@ -212,7 +253,7 @@ sub process
                                                        
                                                # look for 'bad' to addresses 
                                                if (grep $ref->{to} eq $_, @badmsg) {
-                                                       $ref->stop_msg($self);
+                                                       $ref->stop_msg();
                                                        dbg('msg', "'Bad' TO address $ref->{to}");
                                                        Log('msg', "'Bad' TO address $ref->{to}");
                                                        return;
@@ -223,11 +264,11 @@ sub process
                                                $ref->store($ref->{lines});
                                                add_dir($ref);
                                                my $dxchan = DXChannel->get($ref->{to});
-                                               $dxchan->send($dxchan->msg('msgnew')) if $dxchan;
+                                               $dxchan->send($dxchan->msg('m9')) if $dxchan;
                                                Log('msg', "Message $ref->{msgno} from $ref->{from} received from $f[2] for $ref->{to}");
                                        }
                                }
-                               $ref->stop_msg($self);
+                               $ref->stop_msg();
                                queue_msg(0);
                        } else {
                                $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
@@ -247,7 +288,7 @@ sub process
                                        push @{$ref->{gotit}}, $f[2]; # mark this up as being received
                                        $ref->store($ref->{lines});     # re- store the file
                                }
-                               $ref->stop_msg($self);
+                               $ref->stop_msg();
                        } else {
                                $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
                        } 
@@ -294,7 +335,7 @@ sub process
                        dbg('msg', "stream $f[3]: abort received\n");
                        my $ref = $work{"$f[2]$f[3]"};
                        if ($ref) {
-                               $ref->stop_msg($self);
+                               $ref->stop_msg();
                                $ref = undef;
                        }
                        
@@ -310,8 +351,6 @@ sub process
                        }
                }
        }
-       
-       clean_old() if $main::systime - $last_clean > 3600 ; # clean the message queue
 }
 
 
@@ -516,6 +555,13 @@ sub queue_msg
        foreach $ref (@msg) {
                # firstly, is it private and unread? if so can I find the recipient
                # in my cluster node list offsite?
+
+               # ignore 'delayed' messages until their waiting time has expired
+               if (exists $ref->{waitt}) {
+                       next if $ref->{waitt} < $main::systime;
+                       delete $ref->{waitt};
+               } 
+               
                if ($ref->{private}) {
                        if ($ref->{'read'} == 0) {
                                $clref = DXCluster->get_exact($ref->{to});
@@ -606,11 +652,13 @@ sub get_fwq
 sub stop_msg
 {
        my ($self, $dxchan) = @_;
-       my $node = $dxchan->call;
+       my $node = $self->{tonode}
+       my $stream = $self->{stream} if exists $self->{stream};
        
-       dbg('msg', "stop msg $self->{msgno} stream $self->{stream}\n");
+       
+       dbg('msg', "stop msg $self->{msgno} -> node $node\n");
        delete $work{$node};
-       delete $work{"$node$self->{stream}"};
+       delete $work{"$node$stream"} if $stream;
        $self->workclean;
        delete $busy{$node};
 }
@@ -743,7 +791,7 @@ sub do_send_stuff
                $loc->{lines} = [];
                $self->state('sendbody');
                #push @out, $self->msg('sendbody');
-               push @out, "Enter Message /EX (^Z) to send or /ABORT (^Y) to exit";
+               push @out, $self->msg('m8');)
        } elsif ($self->state eq 'sendbody') {
                confess "local var gone missing" if !ref $self->{loc};
                my $loc = $self->{loc};
@@ -766,12 +814,12 @@ sub do_send_stuff
                                                                                $loc->{rrreq});
                                        $ref->store($loc->{lines});
                                        $ref->add_dir();
-                                       #push @out, $self->msg('sendsent', $to);
-                                       push @out, "msgno $ref->{msgno} sent to $to";
+                                       push @out, $self->msg('m11', $ref->{msgno}, $to);
+                                       #push @out, "msgno $ref->{msgno} sent to $to";
                                        my $dxchan = DXChannel->get(uc $to);
                                        if ($dxchan) {
                                                if ($dxchan->is_user()) {
-                                                       $dxchan->send("New mail has arrived for you");
+                                                       $dxchan->send($dxchan->msg('m9'));
                                                }
                                        }
                                }
@@ -780,11 +828,12 @@ sub do_send_stuff
                        delete $loc->{to};
                        delete $self->{loc};
                        $self->func(undef);
+                       
                        DXMsg::queue_msg(0);
                        $self->state('prompt');
                } elsif ($line eq "\031" || uc $line eq "/ABORT" || uc $line eq "/QUIT") {
                        #push @out, $self->msg('sendabort');
-                       push @out, "aborted";
+                       push @out, $self->msg('m10');
                        delete $loc->{lines};
                        delete $loc->{to};
                        delete $self->{loc};