X-Git-Url: http://dxcluster.net/gitweb/gitweb.cgi?a=blobdiff_plain;f=perl%2FDXMsg.pm;h=55834fc3a6edf0a408c6091df4fe7e0caffcef0e;hb=15a742ea0f1983282fdff272a362555afbdb99ad;hp=7cef3adb4f8f5bb7a8415e8a3cf196d8c20e53a4;hpb=866bd5c7cd0fd6c8167d6e6a0c9acfe5feb0ba65;p=spider.git diff --git a/perl/DXMsg.pm b/perl/DXMsg.pm index 7cef3adb..55834fc3 100644 --- a/perl/DXMsg.pm +++ b/perl/DXMsg.pm @@ -28,12 +28,11 @@ use DXDebug; use DXLog; use IO::File; use Fcntl; -use Carp; use strict; use vars qw(%work @msg $msgdir %valid %busy $maxage $last_clean @badmsg @swop $swopfn $badmsgfn $forwardfn @forward $timeout $waittime - $queueinterval $lastq $importfn); + $queueinterval $lastq $importfn $minchunk $maxchunk); %work = (); # outstanding jobs @msg = (); # messages we have @@ -49,6 +48,8 @@ $waittime = 30*60; # time an aborted outgoing message waits before $queueinterval = 1*60; # run the queue every 1 minute $lastq = 0; +$minchunk = 4800; # minimum chunk size for a split message +$maxchunk = 6000; # maximum chunk size $badmsgfn = "$msgdir/badmsg.pl"; # list of TO address we wont store $forwardfn = "$msgdir/forward.pl"; # the forwarding table @@ -109,6 +110,7 @@ sub alloc $self->{rrreq} = shift; $self->{gotit} = []; $self->{lastt} = $main::systime; + $self->{lines} = []; return $self; } @@ -121,7 +123,6 @@ sub workclean delete $ref->{tonode}; delete $ref->{fromnode}; delete $ref->{stream}; - delete $ref->{lines}; delete $ref->{file}; delete $ref->{count}; delete $ref->{lastt} if exists $ref->{lastt}; @@ -135,7 +136,7 @@ sub process # this is periodic processing if (!$self || !$line) { - if ($main::systime > $lastq + $queueinterval) { + if ($main::systime >= $lastq + $queueinterval) { # wander down the work queue stopping any messages that have timed out for (keys %busy) { @@ -182,6 +183,7 @@ sub process my $t = cltounix($f[5], $f[6]); my $stream = next_transno($f[2]); + $f[13] = $self->call unless $f[13] && $f[13] gt ' '; my $ref = DXMsg->alloc($stream, uc $f[3], $f[4], $t, $f[7], $f[8], $f[13], '0', $f[11]); # fill in various forwarding state variables @@ -192,6 +194,7 @@ sub process $ref->{stream} = $stream; $ref->{count} = 0; # no of lines between PC31s dbg('msg', "new message from $f[4] to $f[3] '$f[8]' stream $stream\n"); + Log('msg', "Incoming message $f[4] to $f[3] '$f[8]'" ); $work{"$f[2]$stream"} = $ref; # store in work $busy{$f[2]} = $ref; # set interlock $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack @@ -209,6 +212,7 @@ sub process if ($pcno == 29) { # incoming text my $ref = $work{"$f[2]$f[3]"}; if ($ref) { + $f[4] =~ s/\%5E/^/g; push @{$ref->{lines}}, $f[4]; $ref->{count}++; if ($ref->{count} >= $ref->{linesreq}) { @@ -234,7 +238,6 @@ sub process $work{"$f[2]$f[3]"} = $ref; # new ref dbg('msg', "incoming subject ack stream $f[3]\n"); $busy{$f[2]} = $ref; # interlock - $ref->{lines} = []; push @{$ref->{lines}}, ($ref->read_msg_body); $ref->send_tranche($self); $ref->{lastt} = $main::systime; @@ -280,8 +283,8 @@ sub process if ($ref->{subject} eq $m->{subject} && $ref->{t} == $m->{t} && $ref->{from} eq $m->{from} && $ref->{to} eq $m->{to}) { $ref->stop_msg($self->call); my $msgno = $m->{msgno}; - dbg('msg', "duplicate message to $msgno\n"); - Log('msg', "duplicate message to $msgno"); + dbg('msg', "duplicate message from $ref->{from} -> $ref->{to} to $msgno"); + Log('msg', "duplicate message from $ref->{from} -> $ref->{to} to $msgno"); return; } } @@ -404,12 +407,7 @@ sub store { my $ref = shift; my $lines = shift; - - # we only proceed if there are actually any lines in the file -# if (!$lines || @{$lines} == 0) { -# return; -# } - + if ($ref->{file}) { # a file dbg('msg', "To be stored in $ref->{to}\n"); @@ -502,17 +500,24 @@ sub read_msg_header my @f; my $size; - $file = new IO::File; - if (!open($file, $fn)) { - print "Error reading $fn $!\n"; + $file = new IO::File "$fn"; + if (!$file) { + dbg('err', "Error reading $fn $!"); + Log('err', "Error reading $fn $!"); return undef; } $size = -s $fn; $line = <$file>; # first line + if ($size == 0 || !$line) { + dbg('err', "Empty $fn $!"); + Log('err', "Empty $fn $!"); + return undef; + } chomp $line; $size -= length $line; if (! $line =~ /^===/o) { - print "corrupt first line in $fn ($line)\n"; + dbg('err', "corrupt first line in $fn ($line)"); + Log('err', "corrupt first line in $fn ($line)"); return undef; } $line =~ s/^=== //o; @@ -523,7 +528,8 @@ sub read_msg_header chomp $line; $size -= length $line; if (! $line =~ /^===/o) { - print "corrupt second line in $fn ($line)\n"; + dbg('err', "corrupt second line in $fn ($line)"); + Log('err', "corrupt second line in $fn ($line)"); return undef; } $line =~ s/^=== //o; @@ -549,10 +555,11 @@ sub read_msg_body $file = new IO::File; if (!open($file, $fn)) { - print "Error reading $fn $!\n"; + dbg('err' ,"Error reading $fn $!"); + Log('err' ,"Error reading $fn $!"); return undef; } - chomp (@out = <$file>); + @out = map {chomp; $_} <$file>; close($file); shift @out if $out[0] =~ /^=== /; @@ -588,12 +595,12 @@ sub queue_msg my $call = shift; my $ref; my $clref; - my @nodelist = DXChannel::get_all_ak1a(); # bat down the message list looking for one that needs to go off site and whose # nearest node is not busy. dbg('msg', "queue msg ($sort)\n"); + my @nodelist = DXChannel::get_all_nodes; foreach $ref (@msg) { # firstly, is it private and unread? if so can I find the recipient # in my cluster node list offsite? @@ -610,11 +617,11 @@ sub queue_msg next if $ref->{'read'}; # if it is read, it is stuck here $clref = DXCluster->get_exact($ref->{to}); unless ($clref) { # otherwise look for a homenode - my $uref = DXUser->get($ref->{to}); + my $uref = DXUser->get_current($ref->{to}); my $hnode = $uref->homenode if $uref; $clref = DXCluster->get_exact($hnode) if $hnode; } - if ($clref && !grep { $clref->{dxchan} == $_ } DXCommandmode::get_all) { + if ($clref && !grep { $clref->{dxchan} == $_ } DXCommandmode::get_all()) { next if $clref->call eq $main::mycall; # i.e. it lives here $noderef = $clref->{dxchan}; $ref->start_msg($noderef) if !get_busy($noderef->call) && $noderef->state eq 'normal'; @@ -738,9 +745,9 @@ sub init my $ref; # load various control files - print "load badmsg: ", (load_badmsg() or "Ok"), "\n"; - print "load forward: ", (load_forward() or "Ok"), "\n"; - print "load swop: ", (load_swop() or "Ok"), "\n"; + dbg('err', "load badmsg: " . (load_badmsg() or "Ok")); + dbg('err', "load forward: " . (load_forward() or "Ok")); + dbg('err', "load swop: " . (load_swop() or "Ok")); # read in the directory opendir($dir, $msgdir) or confess "can't open $msgdir $!"; @@ -749,10 +756,15 @@ sub init @msg = (); for (sort @dir) { - next unless /^m\d+$/o; + next unless /^m\d\d\d\d\d\d$/; $ref = read_msg_header("$msgdir/$_"); - next unless $ref; + unless ($ref) { + dbg('err', "Deleting $_"); + Log('err', "Deleting $_"); + unlink "$msgdir/$_"; + next; + } # delete any messages to 'badmsg.pl' places if (grep $ref->{to} eq $_, @badmsg) { @@ -846,13 +858,14 @@ sub do_send_stuff my $mycall = $main::mycall; $ref = DXMsg->alloc(DXMsg::next_transno('Msgno'), uc $to, - $self->call, + exists $loc->{from} ? $loc->{from} : $self->call, $systime, $loc->{private}, $loc->{subject}, - $mycall, + exists $loc->{origin} ? $loc->{origin} : $mycall, '0', $loc->{rrreq}); + $ref->swop_it($self->call); $ref->store($loc->{lines}); $ref->add_dir(); push @out, $self->msg('m11', $ref->{msgno}, $to); @@ -1048,8 +1061,8 @@ sub import_msgs # are there any to do in this directory? return unless -d $importfn; unless (opendir(DIR, $importfn)) { - dbg('msg', "can't open $importfn $!"); - Log('msg', "can't open $importfn $!"); + dbg('msg', "can\'t open $importfn $!"); + Log('msg', "can\'t open $importfn $!"); return; } @@ -1058,18 +1071,19 @@ sub import_msgs my $name; foreach $name (@names) { next if $name =~ /^\./; + my $splitit = $name =~ /^split/; my $fn = "$importfn/$name"; next unless -f $fn; unless (open(MSG, $fn)) { - dbg('msg', "can't open import file $fn $!"); - Log('msg', "can't open import file $fn $!"); + dbg('msg', "can\'t open import file $fn $!"); + Log('msg', "can\'t open import file $fn $!"); unlink($fn); next; } my @msg = map { chomp; $_ } ; close(MSG); unlink($fn); - my @out = import_one($DXProt::me, \@msg); + my @out = import_one($DXProt::me, \@msg, $splitit); Log('msg', @out); } } @@ -1080,6 +1094,7 @@ sub import_one { my $dxchan = shift; my $ref = shift; + my $splitit = shift; my $private = '1'; my $rr = '0'; my $notincalls = 1; @@ -1091,7 +1106,7 @@ sub import_one # first line; my $line = shift @$ref; my @f = split /\s+/, $line; - unless ($f[0] =~ /^(:?S|SP|SB|SEND)$/ ) { + unless (@f && $f[0] =~ /^(:?S|SP|SB|SEND)$/ ) { my $m = "invalid first line in import '$line'"; dbg('MSG', $m ); return (1, $m); @@ -1109,12 +1124,14 @@ sub import_one $rr = '1'; } elsif ($f eq '@' && @f) { # this is bbs syntax, for origin $origin = uc shift @f; + } elsif ($f eq '<' && @f) { # this is bbs syntax for from call + $from = uc shift @f; } elsif ($f =~ /^\$/) { # this is bbs syntax for a bid next; - } elsif ($f =~ /^msg('m3', $f); } else { - push @to, $f; + push @to, $f; } } } - + # subject is the next line my $subject = shift @$ref; # strip off trailing lines - pop @$ref while (@$ref && ($$ref[-1] eq '' || $$ref[-1] =~ /^\s+$/)); - + pop @$ref while (@$ref && $$ref[-1] =~ /^\s*$/); + # strip off /EX or /ABORT - return ("aborted") if (@$ref && $$ref[-1] =~ m{^/ABORT$}i); + return ("aborted") if @$ref && $$ref[-1] =~ m{^/ABORT$}i; pop @$ref if (@$ref && $$ref[-1] =~ m{^/EX$}i); + # sort out any splitting that needs to be done + my @chunk; + if ($splitit) { + my $lth = 0; + my $lines = []; + for (@$ref) { + if ($lth >= $maxchunk || ($lth > $minchunk && /^\s*$/)) { + push @chunk, $lines; + $lines = []; + $lth = 0; + } + push @$lines, $_; + $lth += length; + } + push @chunk, $lines if @$lines; + } else { + push @chunk, $ref; + } + # write all the messages away - my $to; - foreach $to (@to) { - my $systime = $main::systime; - my $mycall = $main::mycall; - my $mref = DXMsg->alloc(DXMsg::next_transno('Msgno'), - $to, - $from, - $systime, - $private, - $subject, - $origin, - '0', - $rr); - $mref->store($ref); - $mref->add_dir(); - push @out, $dxchan->msg('m11', $mref->{msgno}, $to); - #push @out, "msgno $ref->{msgno} sent to $to"; - my $todxchan = DXChannel->get(uc $to); - if ($todxchan) { - if ($todxchan->is_user()) { - $todxchan->send($todxchan->msg('m9')); + my $i; + for ( $i = 0; $i < @chunk; $i++) { + my $chunk = $chunk[$i]; + my $ch_subject; + if (@chunk > 1) { + my $num = " [" . ($i+1) . "/" . scalar @chunk . "]"; + $ch_subject = substr($subject, 0, 27 - length $num) . $num; + } else { + $ch_subject = $subject; + } + my $to; + foreach $to (@to) { + my $systime = $main::systime; + my $mycall = $main::mycall; + my $mref = DXMsg->alloc(DXMsg::next_transno('Msgno'), + $to, + $from, + $systime, + $private, + $ch_subject, + $origin, + '0', + $rr); + $mref->swop_it($main::mycall); + $mref->store($chunk); + $mref->add_dir(); + push @out, $dxchan->msg('m11', $mref->{msgno}, $to); + #push @out, "msgno $ref->{msgno} sent to $to"; + my $todxchan = DXChannel->get(uc $to); + if ($todxchan) { + if ($todxchan->is_user()) { + $todxchan->send($todxchan->msg('m9')); + } } } } - return @out; } @@ -1194,6 +1241,9 @@ sub AUTOLOAD $name =~ s/.*:://o; confess "Non-existant field '$AUTOLOAD'" if !$valid{$name}; + # this clever line of code creates a subroutine which takes over from autoload + # from OO Perl - Conway + *{$AUTOLOAD} = sub {@_ > 1 ? $_[0]->{$name} = $_[1] : $_[0]->{$name}} ; @_ ? $self->{$name} = shift : $self->{$name} ; }