use strict;
use vars qw(%work @msg $msgdir %valid %busy $maxage $last_clean
- @badmsg $badmsgfn $forwardfn @forward $timeout $waittime);
+ @badmsg $badmsgfn $forwardfn @forward $timeout $waittime
+ $queueinterval $lastq);
%work = (); # outstanding jobs
@msg = (); # messages we have
@forward = (); # msg forward table
$timeout = 30*60; # forwarding timeout
$waittime = 60*60; # time an aborted outgoing message waits before trying again
+$queueinterval = 5*60; # run the queue every 5 minutes
+$lastq = 0;
+
$badmsgfn = "$msgdir/badmsg.pl"; # list of TO address we wont store
$forwardfn = "$msgdir/forward.pl"; # the forwarding table
%valid = (
- fromnode => '9,From Node',
- tonode => '9,To Node',
+ fromnode => '5,From Node',
+ tonode => '5,To Node',
to => '0,To',
from => '0,From',
t => '0,Msg Time,cldatetime',
- private => '9,Private',
+ private => '5,Private',
subject => '0,Subject',
linesreq => '0,Lines per Gob',
- rrreq => '9,Read Confirm',
+ rrreq => '5,Read Confirm',
origin => '0,Origin',
lines => '5,Data',
stream => '9,Stream No',
- count => '9,Gob Linecnt',
- file => '9,File?,yesno',
- gotit => '9,Got it Nodes,parray',
- lines => '9,Lines,parray',
- 'read' => '9,Times read',
+ count => '5,Gob Linecnt',
+ file => '5,File?,yesno',
+ gotit => '5,Got it Nodes,parray',
+ lines => '5,Lines,parray',
+ 'read' => '5,Times read',
size => '0,Size',
msgno => '0,Msgno',
keep => '0,Keep this?,yesno',
- lastt => '9,Last processed,cldatetime',
- waitt => '9,Wait until,cldatetime',
+ lastt => '5,Last processed,cldatetime',
+ waitt => '5,Wait until,cldatetime',
);
sub DESTROY
my ($self, $line) = @_;
# this is periodic processing
- if (undef $self || undef $line) {
+ if (!$self || !$line) {
# wander down the work queue stopping any messages that have timed out
- for (keys %work) {
- my $ref = $work{$_};
- if ($main::systime > $ref->{lastt} + $timeout) {
- my $tonode = $ref->{tonode};
- $ref->stop_msg();
+ for (keys %busy) {
+ my $node = $_;
+ my $ref = $busy{$_};
+ if (exists $ref->{lastt} && $main::systime > $ref->{lastt} + $timeout) {
+ $ref->stop_msg($node);
# delay any outgoing messages that fail
- $ref->{waitt} = $main::systime + $waittime if $tonode ne $main::mycall;
+ $ref->{waitt} = $main::systime + $waittime + rand(120) if $node ne $main::mycall;
}
}
-
+
+ # queue some message if the interval timer has gone off
+ if ($main::systime > $lastq + $queueinterval) {
+ queue_msg(0);
+ $lastq = $main::systime;
+ }
+
# clean the message queue
clean_old() if $main::systime - $last_clean > 3600 ;
return;
if (exists $busy{$f[2]}) {
my $ref = $busy{$f[2]};
my $tonode = $ref->{tonode};
- $ref->stop_msg();
+ $ref->stop_msg($self->call);
}
my $t = cltounix($f[5], $f[6]);
$work{"$f[2]$stream"} = $ref; # store in work
$busy{$f[2]} = $ref; # set interlock
$self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack
+ $ref->{lastt} = $main::systime;
last SWITCH;
}
my $m;
for $m (@msg) {
if ($ref->{subject} eq $m->{subject} && $ref->{t} == $m->{t} && $ref->{from} eq $m->{from}) {
- $ref->stop_msg();
+ $ref->stop_msg($self->call);
my $msgno = $m->{msgno};
dbg('msg', "duplicate message to $msgno\n");
Log('msg', "duplicate message to $msgno");
# look for 'bad' to addresses
if (grep $ref->{to} eq $_, @badmsg) {
- $ref->stop_msg();
+ $ref->stop_msg($self->call);
dbg('msg', "'Bad' TO address $ref->{to}");
Log('msg', "'Bad' TO address $ref->{to}");
return;
$ref->store($ref->{lines});
add_dir($ref);
my $dxchan = DXChannel->get($ref->{to});
- $dxchan->send($dxchan->msg('m9')) if $dxchan;
+ $dxchan->send($dxchan->msg('m9')) if $dxchan && $dxchan->is_user;
Log('msg', "Message $ref->{msgno} from $ref->{from} received from $f[2] for $ref->{to}");
}
}
- $ref->stop_msg();
- queue_msg(0);
+ $ref->stop_msg($self->call);
} else {
$self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
}
- queue_msg(0);
+ # queue_msg(0);
last SWITCH;
}
push @{$ref->{gotit}}, $f[2]; # mark this up as being received
$ref->store($ref->{lines}); # re- store the file
}
- $ref->stop_msg();
+ $ref->stop_msg($self->call);
} else {
$self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
}
+
+ # send next one if present
queue_msg(0);
last SWITCH;
}
dbg('msg', "stream $f[3]: abort received\n");
my $ref = $work{"$f[2]$f[3]"};
if ($ref) {
- $ref->stop_msg();
+ $ref->stop_msg($self->call);
$ref = undef;
}
if ($pcno == 49) { # global delete on subject
for (@msg) {
- if ($_->{subject} eq $f[2]) {
+ if ($_->{from} eq $f[1] && $_->{subject} eq $f[2]) {
$_->del_msg();
- Log('msg', "Message $_->{msgno} fully deleted by $f[1]");
+ Log('msg', "Message $_->{msgno} from $_->{from} ($_->{subject}) fully deleted");
+ DXProt::broadcast_ak1a($line, $self);
}
}
}
$self->{count} = 0;
$self->{tonode} = $dxchan->call;
$self->{fromnode} = $main::mycall;
- $busy{$dxchan->call} = $self;
- $work{"$self->{tonode}"} = $self;
+ $busy{$self->{tonode}} = $self;
+ $work{$self->{tonode}} = $self;
$dxchan->send(DXProt::pc28($self->{tonode}, $self->{fromnode}, $self->{to}, $self->{from}, $self->{t}, $self->{private}, $self->{subject}, $self->{origin}, $self->{rrreq}));
}
# stop a message from continuing, clean it out, unlock interlocks etc
sub stop_msg
{
- my ($self, $dxchan) = @_;
- my $node = $self->{tonode}
+ my $self = shift;
+ my $node = shift;
my $stream = $self->{stream} if exists $self->{stream};
$loc->{lines} = [];
$self->state('sendbody');
#push @out, $self->msg('sendbody');
- push @out, $self->msg('m8');)
+ push @out, $self->msg('m8');
} elsif ($self->state eq 'sendbody') {
confess "local var gone missing" if !ref $self->{loc};
my $loc = $self->{loc};
delete $self->{loc};
$self->func(undef);
- DXMsg::queue_msg(0);
$self->state('prompt');
} elsif ($line eq "\031" || uc $line eq "/ABORT" || uc $line eq "/QUIT") {
#push @out, $self->msg('sendabort');