X-Git-Url: http://dxcluster.net/gitweb/gitweb.cgi?a=blobdiff_plain;f=perl%2FDXMsg.pm;h=2cdb6a8028a15bb2d11e4ae27a1755b1e23e15d1;hb=6624dcdf07d628e8d6a16fc6549edf40be25b7b2;hp=1bbe00ec0c8262acddbb285c200c4e81c584e0ec;hpb=ca8e84c32e70ea8eb1f30e716b7dbdc92f7e5083;p=spider.git diff --git a/perl/DXMsg.pm b/perl/DXMsg.pm index 1bbe00ec..2cdb6a80 100644 --- a/perl/DXMsg.pm +++ b/perl/DXMsg.pm @@ -19,7 +19,6 @@ use DXUtil; use DXChannel; use DXUser; use DXM; -use DXCluster; use DXProtVars; use DXProtout; use DXDebug; @@ -169,7 +168,7 @@ sub process if (exists $busy{$fromnode}) { my $ref = $busy{$fromnode}; my $tonode = $ref->{tonode}; - dbg('msg', "Busy, stopping msgno: $ref->{msgno} -> $fromnode"); + dbg("Busy, stopping msgno: $ref->{msgno} -> $fromnode") if isdbg('msg'); $ref->stop_msg($self->call); } @@ -184,7 +183,7 @@ sub process $ref->{linesreq} = $f[10]; $ref->{stream} = $stream; $ref->{count} = 0; # no of lines between PC31s - dbg('msg', "new message from $f[4] to $f[3] '$f[8]' stream $stream\n"); + dbg("new message from $f[4] to $f[3] '$f[8]' stream $fromnode/$stream\n") if isdbg('msg'); Log('msg', "Incoming message $f[4] to $f[3] '$f[8]'" ); $work{"$fromnode$stream"} = $ref; # store in work $busy{$fromnode} = $ref; # set interlock @@ -195,7 +194,7 @@ sub process my $uref = DXUser->get_current($ref->{to}); if (is_callsign($ref->{to}) && !$ref->{private} && $uref && $uref->homenode) { $ref->{private} = 1; - dbg('msg', "set bull to $ref->{to} to private"); + dbg("set bull to $ref->{to} to private") if isdbg('msg'); } last SWITCH; } @@ -208,12 +207,12 @@ sub process $ref->{count}++; if ($ref->{count} >= $ref->{linesreq}) { $self->send(DXProt::pc31($f[2], $f[1], $f[3])); - dbg('msg', "stream $f[3]: $ref->{count} lines received\n"); + dbg("stream $f[3]: $ref->{count} lines received\n") if isdbg('msg'); $ref->{count} = 0; } $ref->{lastt} = $main::systime; } else { - dbg('msg', "PC29 from unknown stream $f[3] from $f[2]" ); + dbg("PC29 from unknown stream $f[3] from $f[2]") if isdbg('msg'); $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream } last SWITCH; @@ -227,13 +226,13 @@ sub process $ref->{count} = 0; $ref->{linesreq} = 5; $work{"$f[2]$f[3]"} = $ref; # new ref - dbg('msg', "incoming subject ack stream $f[3]\n"); + dbg("incoming subject ack stream $f[3]\n") if isdbg('msg'); $busy{$f[2]} = $ref; # interlock push @{$ref->{lines}}, ($ref->read_msg_body); $ref->send_tranche($self); $ref->{lastt} = $main::systime; } else { - dbg('msg', "PC30 from unknown stream $f[3] from $f[2]" ); + dbg("PC30 from unknown stream $f[3] from $f[2]") if isdbg('msg'); $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream } last SWITCH; @@ -242,18 +241,18 @@ sub process if ($pcno == 31) { # acknowledge a tranche of lines my $ref = $work{"$f[2]$f[3]"}; if ($ref) { - dbg('msg', "tranche ack stream $f[3]\n"); + dbg("tranche ack stream $f[3]\n") if isdbg('msg'); $ref->send_tranche($self); $ref->{lastt} = $main::systime; } else { - dbg('msg', "PC31 from unknown stream $f[3] from $f[2]" ); + dbg("PC31 from unknown stream $f[3] from $f[2]") if isdbg('msg'); $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream } last SWITCH; } if ($pcno == 32) { # incoming EOM - dbg('msg', "stream $f[3]: EOM received\n"); + dbg("stream $f[3]: EOM received\n") if isdbg('msg'); my $ref = $work{"$f[2]$f[3]"}; if ($ref) { $self->send(DXProt::pc33($f[2], $f[1], $f[3])); # acknowledge it @@ -274,7 +273,7 @@ sub process if ($ref->{subject} eq $m->{subject} && $ref->{t} == $m->{t} && $ref->{from} eq $m->{from} && $ref->{to} eq $m->{to}) { $ref->stop_msg($self->call); my $msgno = $m->{msgno}; - dbg('msg', "duplicate message from $ref->{from} -> $ref->{to} to $msgno"); + dbg("duplicate message from $ref->{from} -> $ref->{to} to $msgno") if isdbg('msg'); Log('msg', "duplicate message from $ref->{from} -> $ref->{to} to $msgno"); return; } @@ -286,7 +285,7 @@ sub process # look for 'bad' to addresses if ($ref->dump_it) { $ref->stop_msg($self->call); - dbg('msg', "'Bad' message $ref->{to}"); + dbg("'Bad' message $ref->{to}") if isdbg('msg'); Log('msg', "'Bad' message $ref->{to}"); return; } @@ -302,7 +301,7 @@ sub process } $ref->stop_msg($self->call); } else { - dbg('msg', "PC32 from unknown stream $f[3] from $f[2]" ); + dbg("PC32 from unknown stream $f[3] from $f[2]") if isdbg('msg'); $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream } # queue_msg(0); @@ -322,7 +321,7 @@ sub process } $ref->stop_msg($self->call); } else { - dbg('msg', "PC33 from unknown stream $f[3] from $f[2]" ); + dbg("PC33 from unknown stream $f[3] from $f[2]") if isdbg('msg'); $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream } @@ -336,7 +335,7 @@ sub process $f[3] =~ s/\.//og; # remove dots $f[3] =~ s/^\///o; # remove the leading / $f[3] = lc $f[3]; # to lower case; - dbg('msg', "incoming file $f[3]\n"); + dbg("incoming file $f[3]\n") if isdbg('msg'); $f[3] = 'packclus/' . $f[3] unless $f[3] =~ /^packclus\//o; # create any directories @@ -348,7 +347,7 @@ sub process $fn .= "/$part"; next if -e $fn; last SWITCH if !mkdir $fn, 0777; - dbg('msg', "created directory $fn\n"); + dbg("created directory $fn\n") if isdbg('msg'); } my $stream = next_transno($f[2]); my $ref = DXMsg->alloc($stream, "$main::root/$f[3]", $self->call, time, !$f[4], $f[3], ' ', '0', '0'); @@ -368,7 +367,7 @@ sub process } if ($pcno == 42) { # abort transfer - dbg('msg', "stream $f[3]: abort received\n"); + dbg("stream $f[3]: abort received\n") if isdbg('msg'); my $ref = $work{"$f[2]$f[3]"}; if ($ref) { $ref->stop_msg($self->call); @@ -399,7 +398,7 @@ sub store my $lines = shift; if ($ref->{file}) { # a file - dbg('msg', "To be stored in $ref->{to}\n"); + dbg("To be stored in $ref->{to}\n") if isdbg('msg'); my $fh = new IO::File "$ref->{to}", "w"; if (defined $fh) { @@ -408,7 +407,7 @@ sub store print $fh "$line\n"; } $fh->close; - dbg('msg', "file $ref->{to} stored\n"); + dbg("file $ref->{to} stored\n") if isdbg('msg'); Log('msg', "file $ref->{to} from $ref->{from} stored" ); } else { confess "can't open file $ref->{to} $!"; @@ -418,7 +417,7 @@ sub store # attempt to open the message file my $fn = filename($ref->{msgno}); - dbg('msg', "To be stored in $fn\n"); + dbg("To be stored in $fn\n") if isdbg('msg'); # now save the file, overwriting what's there, YES I KNOW OK! (I will change it if it's a problem) my $fh = new IO::File "$fn", "w"; @@ -434,7 +433,7 @@ sub store print $fh "$line\n"; } $fh->close; - dbg('msg', "msg $ref->{msgno} stored\n"); + dbg("msg $ref->{msgno} stored\n") if isdbg('msg'); Log('msg', "msg $ref->{msgno} from $ref->{from} to $ref->{to} stored" ); } else { confess "can't open msg file $fn $!"; @@ -448,13 +447,13 @@ sub del_msg my $self = shift; # remove it from the active message list - dbg('msg', "\@msg = " . scalar @msg . " before delete"); + dbg("\@msg = " . scalar @msg . " before delete") if isdbg('msg'); @msg = grep { $_ != $self } @msg; # remove the file unlink filename($self->{msgno}); - dbg('msg', "deleting $self->{msgno}\n"); - dbg('msg', "\@msg = " . scalar @msg . " after delete"); + dbg("deleting $self->{msgno}\n") if isdbg('msg'); + dbg("\@msg = " . scalar @msg . " after delete") if isdbg('msg'); } # clean out old messages from the message queue @@ -463,18 +462,18 @@ sub clean_old my $ref; # mark old messages for deletion - dbg('msg', "\@msg = " . scalar @msg . " before delete"); + dbg("\@msg = " . scalar @msg . " before delete") if isdbg('msg'); foreach $ref (@msg) { if (ref($ref) && !$ref->{keep} && $ref->{t} < $main::systime - $maxage) { $ref->{deleteme} = 1; unlink filename($ref->{msgno}); - dbg('msg', "deleting old $ref->{msgno}\n"); + dbg("deleting old $ref->{msgno}\n") if isdbg('msg'); } } # remove them all from the active message list @msg = grep { !$_->{deleteme} } @msg; - dbg('msg', "\@msg = " . scalar @msg . " after delete"); + dbg("\@msg = " . scalar @msg . " after delete") if isdbg('msg'); $last_clean = $main::systime; } @@ -490,21 +489,21 @@ sub read_msg_header $file = new IO::File "$fn"; if (!$file) { - dbg('err', "Error reading $fn $!"); + dbg("Error reading $fn $!"); Log('err', "Error reading $fn $!"); return undef; } $size = -s $fn; $line = <$file>; # first line if ($size == 0 || !$line) { - dbg('err', "Empty $fn $!"); + dbg("Empty $fn $!"); Log('err', "Empty $fn $!"); return undef; } chomp $line; $size -= length $line; if (! $line =~ /^===/o) { - dbg('err', "corrupt first line in $fn ($line)"); + dbg("corrupt first line in $fn ($line)"); Log('err', "corrupt first line in $fn ($line)"); return undef; } @@ -516,7 +515,7 @@ sub read_msg_header chomp $line; $size -= length $line; if (! $line =~ /^===/o) { - dbg('err', "corrupt second line in $fn ($line)"); + dbg("corrupt second line in $fn ($line)"); Log('err', "corrupt second line in $fn ($line)"); return undef; } @@ -543,7 +542,7 @@ sub read_msg_body $file = new IO::File; if (!open($file, $fn)) { - dbg('err' ,"Error reading $fn $!"); + dbg("Error reading $fn $!"); Log('err' ,"Error reading $fn $!"); return undef; } @@ -587,7 +586,7 @@ sub queue_msg # bat down the message list looking for one that needs to go off site and whose # nearest node is not busy. - dbg('msg', "queue msg ($sort)\n"); + dbg("queue msg ($sort)\n") if isdbg('msg'); my @nodelist = DXChannel::get_all_nodes; foreach $ref (@msg) { @@ -600,7 +599,7 @@ sub queue_msg # any time outs? if (exists $ref->{lastt} && $main::systime >= $ref->{lastt} + $timeout) { my $node = $ref->{tonode}; - dbg('msg', "Timeout, stopping msgno: $ref->{msgno} -> $node"); + dbg("Timeout, stopping msgno: $ref->{msgno} -> $node") if isdbg('msg'); Log('msg', "Timeout, stopping msgno: $ref->{msgno} -> $node"); $ref->stop_msg($node); @@ -617,16 +616,22 @@ sub queue_msg my $dxchan; if ($ref->{private}) { next if $ref->{'read'}; # if it is read, it is stuck here - $clref = DXCluster->get_exact($ref->{to}); - unless ($clref) { # otherwise look for a homenode - my $uref = DXUser->get_current($ref->{to}); - my $hnode = $uref->homenode if $uref; - $clref = DXCluster->get_exact($hnode) if $hnode; - } - if ($clref && !grep { $clref->dxchan == $_ } DXCommandmode::get_all()) { - next if $clref->call eq $main::mycall; # i.e. it lives here + $clref = Route::get($ref->{to}); +# unless ($clref) { # otherwise look for a homenode +# my $uref = DXUser->get_current($ref->{to}); +# my $hnode = $uref->homenode if $uref; +# $clref = Route::Node::get($hnode) if $hnode; +# } + if ($clref) { $dxchan = $clref->dxchan; - $ref->start_msg($dxchan) if $dxchan && !get_busy($dxchan->call) && $dxchan->state eq 'normal'; + if ($dxchan) { + if ($dxchan->is_node) { + next if $clref->call eq $main::mycall; # i.e. it lives here + $ref->start_msg($dxchan) if !get_busy($dxchan->call) && $dxchan->state eq 'normal'; + } + } else { + dbg("Route: No dxchan for $ref->{to} " . ref($clref) ) if isdbg('msg'); + } } } @@ -672,7 +677,7 @@ sub start_msg { my ($self, $dxchan) = @_; - dbg('msg', "start msg $self->{msgno}\n"); + dbg("start msg $self->{msgno}\n") if isdbg('msg'); $self->{linesreq} = 10; $self->{count} = 0; $self->{tonode} = $dxchan->call; @@ -710,7 +715,7 @@ sub stop_msg my $stream = $self->{stream} if exists $self->{stream}; - dbg('msg', "stop msg $self->{msgno} -> node $node\n"); + dbg("stop msg $self->{msgno} -> node $node\n") if isdbg('msg'); delete $work{$node}; delete $work{"$node$stream"} if $stream; $self->workclean; @@ -728,12 +733,12 @@ sub next_transno my $fh = new IO::File; if (sysopen($fh, $fn, O_RDWR|O_CREAT, 0666)) { $fh->autoflush(1); - $msgno = $fh->getline; + $msgno = $fh->getline || '0'; chomp $msgno; $msgno++; seek $fh, 0, 0; $fh->print("$msgno\n"); - dbg('msg', "msgno $msgno allocated for $name\n"); + dbg("msgno $msgno allocated for $name\n") if isdbg('msg'); $fh->close; } else { confess "can't open $fn $!"; @@ -749,9 +754,9 @@ sub init my $ref; # load various control files - dbg('err', "load badmsg: " . (load_badmsg() or "Ok")); - dbg('err', "load forward: " . (load_forward() or "Ok")); - dbg('err', "load swop: " . (load_swop() or "Ok")); + dbg("load badmsg: " . (load_badmsg() or "Ok")); + dbg("load forward: " . (load_forward() or "Ok")); + dbg("load swop: " . (load_swop() or "Ok")); # read in the directory opendir($dir, $msgdir) or confess "can't open $msgdir $!"; @@ -764,7 +769,7 @@ sub init $ref = read_msg_header("$msgdir/$_"); unless ($ref) { - dbg('err', "Deleting $_"); + dbg("Deleting $_"); Log('err', "Deleting $_"); unlink "$msgdir/$_"; next; @@ -772,7 +777,7 @@ sub init # delete any messages to 'badmsg.pl' places if ($ref->dump_it) { - dbg('msg', "'Bad' TO address $ref->{to}"); + dbg("'Bad' TO address $ref->{to}") if isdbg('msg'); Log('msg', "'Bad' TO address $ref->{to}"); $ref->del_msg; next; @@ -1064,7 +1069,7 @@ sub import_msgs # are there any to do in this directory? return unless -d $importfn; unless (opendir(DIR, $importfn)) { - dbg('msg', "can\'t open $importfn $!"); + dbg("can\'t open $importfn $!") if isdbg('msg'); Log('msg', "can\'t open $importfn $!"); return; } @@ -1078,7 +1083,7 @@ sub import_msgs my $fn = "$importfn/$name"; next unless -f $fn; unless (open(MSG, $fn)) { - dbg('msg', "can\'t open import file $fn $!"); + dbg("can\'t open import file $fn $!") if isdbg('msg'); Log('msg', "can\'t open import file $fn $!"); unlink($fn); next; @@ -1111,7 +1116,7 @@ sub import_one my @f = split /\s+/, $line; unless (@f && $f[0] =~ /^(:?S|SP|SB|SEND)$/ ) { my $m = "invalid first line in import '$line'"; - dbg('MSG', $m ); + dbg($m) if isdbg('msg'); return (1, $m); } while (@f) {