change DXChannel::get_all_ak1a to get_all_nodes
[spider.git] / perl / DXMsg.pm
index 0bfdfefbbe3cfaba1a8ae2594ce5bd0d141635ff..58c548a22ac19e000bdc417d3e9091b3c351ca89 100644 (file)
@@ -136,7 +136,7 @@ sub process
        # this is periodic processing
        if (!$self || !$line) {
 
-               if ($main::systime > $lastq + $queueinterval) {
+               if ($main::systime >= $lastq + $queueinterval) {
 
                        # wander down the work queue stopping any messages that have timed out
                        for (keys %busy) {
@@ -183,6 +183,7 @@ sub process
 
                        my $t = cltounix($f[5], $f[6]);
                        my $stream = next_transno($f[2]);
+                       $f[13] = $self->call unless $f[13] && $f[13] gt ' ';
                        my $ref = DXMsg->alloc($stream, uc $f[3], $f[4], $t, $f[7], $f[8], $f[13], '0', $f[11]);
                        
                        # fill in various forwarding state variables
@@ -193,6 +194,7 @@ sub process
                        $ref->{stream} = $stream;
                        $ref->{count} = 0;      # no of lines between PC31s
                        dbg('msg', "new message from $f[4] to $f[3] '$f[8]' stream $stream\n");
+                       Log('msg', "Incoming message $f[4] to $f[3] '$f[8]'" );
                        $work{"$f[2]$stream"} = $ref; # store in work
                        $busy{$f[2]} = $ref; # set interlock
                        $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack
@@ -281,8 +283,8 @@ sub process
                                                        if ($ref->{subject} eq $m->{subject} && $ref->{t} == $m->{t} && $ref->{from} eq $m->{from} && $ref->{to} eq $m->{to}) {
                                                                $ref->stop_msg($self->call);
                                                                my $msgno = $m->{msgno};
-                                                               dbg('msg', "duplicate message to $msgno\n");
-                                                               Log('msg', "duplicate message to $msgno");
+                                                               dbg('msg', "duplicate message from $ref->{from} -> $ref->{to} to $msgno");
+                                                               Log('msg', "duplicate message from $ref->{from} -> $ref->{to} to $msgno");
                                                                return;
                                                        }
                                                }
@@ -506,14 +508,21 @@ sub read_msg_header
        $file = new IO::File "$fn";
        if (!$file) {
            dbg('err', "Error reading $fn $!");
+           Log('err', "Error reading $fn $!");
                return undef;
        }
        $size = -s $fn;
        $line = <$file>;                        # first line
+       if ($size == 0 || !$line) {
+           dbg('err', "Empty $fn $!");
+           Log('err', "Empty $fn $!");
+               return undef;
+       }
        chomp $line;
        $size -= length $line;
        if (! $line =~ /^===/o) {
                dbg('err', "corrupt first line in $fn ($line)");
+               Log('err', "corrupt first line in $fn ($line)");
                return undef;
        }
        $line =~ s/^=== //o;
@@ -525,6 +534,7 @@ sub read_msg_header
        $size -= length $line;
        if (! $line =~ /^===/o) {
            dbg('err', "corrupt second line in $fn ($line)");
+           Log('err', "corrupt second line in $fn ($line)");
                return undef;
        }
        $line =~ s/^=== //o;
@@ -551,6 +561,7 @@ sub read_msg_body
        $file = new IO::File;
        if (!open($file, $fn)) {
                dbg('err' ,"Error reading $fn $!");
+               Log('err' ,"Error reading $fn $!");
                return undef;
        }
        @out = map {chomp; $_} <$file>;
@@ -589,12 +600,12 @@ sub queue_msg
        my $call = shift;
        my $ref;
        my $clref;
-       my @nodelist = DXChannel::get_all_ak1a();
        
        # bat down the message list looking for one that needs to go off site and whose
        # nearest node is not busy.
 
        dbg('msg', "queue msg ($sort)\n");
+       my @nodelist = DXChannel::get_all_nodes;
        foreach $ref (@msg) {
                # firstly, is it private and unread? if so can I find the recipient
                # in my cluster node list offsite?
@@ -611,7 +622,7 @@ sub queue_msg
                        next if $ref->{'read'};           # if it is read, it is stuck here
                        $clref = DXCluster->get_exact($ref->{to});
                        unless ($clref) {             # otherwise look for a homenode
-                               my $uref = DXUser->get($ref->{to});
+                               my $uref = DXUser->get_current($ref->{to});
                                my $hnode =  $uref->homenode if $uref;
                                $clref = DXCluster->get_exact($hnode) if $hnode;
                        }
@@ -750,10 +761,15 @@ sub init
 
        @msg = ();
        for (sort @dir) {
-               next unless /^m\d+$/o;
+               next unless /^m\d\d\d\d\d\d$/;
                
                $ref = read_msg_header("$msgdir/$_");
-               next unless $ref;
+               unless ($ref) {
+                       dbg('err', "Deleting $_");
+                       Log('err', "Deleting $_");
+                       unlink "$msgdir/$_";
+                       next;
+               }
                
                # delete any messages to 'badmsg.pl' places
                if (grep $ref->{to} eq $_, @badmsg) {
@@ -1230,6 +1246,9 @@ sub AUTOLOAD
        $name =~ s/.*:://o;
        
        confess "Non-existant field '$AUTOLOAD'" if !$valid{$name};
+       # this clever line of code creates a subroutine which takes over from autoload
+       # from OO Perl - Conway
+       *{$AUTOLOAD} = sub {@_ > 1 ? $_[0]->{$name} = $_[1] : $_[0]->{$name}} ;
        @_ ? $self->{$name} = shift : $self->{$name} ;
 }