]> dxcluster.net Git - spider.git/blob - perl/DXMsg.pm
d5f56c330a59945e944ad9b0a608bea0d0d65c7e
[spider.git] / perl / DXMsg.pm
1 #!/usr/bin/perl
2 #
3 # This module implements the message handling for a dx cluster
4 #
5 # Copyright (c) 1998 Dirk Koopman G1TLH
6 #
7 # $Id$
8 #
9 #
10 # Notes for implementors:-
11 #
12 # PC28 field 11 is the RR required flag
13 # PC28 field 12 is a VIA routing (ie it is a node call) 
14 #
15
16 package DXMsg;
17
18 @ISA = qw(DXProt DXChannel);
19
20 use DXUtil;
21 use DXChannel;
22 use DXUser;
23 use DXM;
24 use DXCluster;
25 use DXProtVars;
26 use DXProtout;
27 use DXDebug;
28 use DXLog;
29 use IO::File;
30 use Fcntl;
31 use Carp;
32
33 use strict;
34 use vars qw(%work @msg $msgdir %valid %busy $maxage $last_clean
35                         @badmsg $badmsgfn $forwardfn @forward $timeout $waittime
36                     $queueinterval $lastq);
37
%work = ();                         # outstanding jobs, keyed by node call (plus the stream number once known)
@msg = ();                          # messages we have
%busy = ();                         # station interlocks, keyed by node call
$msgdir = "$main::root/msg";        # directory containing the msgs
$maxage = 30 * 86400;               # the maximum age (in seconds) a message lives for unless it is marked 'keep'
$last_clean = 0;                    # last time we did a clean
@forward = ();                      # msg forward table
$timeout = 30*60;                   # forwarding timeout
$waittime = 60*60;                  # time an aborted outgoing message waits before trying again
$queueinterval = 2*60;              # run the queue every 2 minutes
$lastq = 0;                         # last time the queue was run


$badmsgfn = "$msgdir/badmsg.pl";    # list of TO addresses we won't store
$forwardfn = "$msgdir/forward.pl";  # the forwarding table

# the fields a message object may carry.
# NOTE(review): each value looks like 'privilege level,prompt[,formatter]' -
# confirm against whatever code consumes %valid.
#
# 'lines' used to appear twice in this list ('5,Data' and '5,Lines,parray');
# only the later entry ever took effect, so that is the one that is kept.
%valid = (
    fromnode => '5,From Node',
    tonode => '5,To Node',
    to => '0,To',
    from => '0,From',
    t => '0,Msg Time,cldatetime',
    private => '5,Private',
    subject => '0,Subject',
    linesreq => '0,Lines per Gob',
    rrreq => '5,Read Confirm',
    origin => '0,Origin',
    stream => '9,Stream No',
    count => '5,Gob Linecnt',
    file => '5,File?,yesno',
    gotit => '5,Got it Nodes,parray',
    lines => '5,Lines,parray',
    'read' => '5,Times read',
    size => '0,Size',
    msgno => '0,Msgno',
    keep => '0,Keep this?,yesno',
    lastt => '5,Last processed,cldatetime',
    waitt => '5,Wait until,cldatetime',
);
78
sub DESTROY
{
    # destructor: let go of the two array valued fields of the message
    my ($self) = @_;
    $self->{$_} = undef for qw(lines gotit);
}
85
# allocate a new message object
# called with: msgno, to, from, time, private?, subject, origin, times read, rr required?
sub alloc
{
    my ($pkg, $msgno, $to, $from, $t, $private, $subject, $origin, $read, $rrreq) = @_;

    # something addressed to this node's own call is readdressed to $main::myalias
    #  $to =~ s/-\d+$//o;
    $to = $main::myalias if $to eq $main::mycall;

    # the sender is kept in upper case and without any trailing -<digits>
    $from =~ s/-\d+$//o;

    my $self = bless {
        msgno   => $msgno,
        to      => $to,
        from    => uc $from,
        t       => $t,
        private => $private,
        subject => $subject,
        origin  => $origin,
        'read'  => $read,
        rrreq   => $rrreq,
        gotit   => [],                  # no node has got it yet
        lastt   => $main::systime,      # last time anything happened to it
    }, $pkg;

    return $self;
}
110
# strip the transient forwarding state out of a message once a transfer
# is over; only the fields listed here are removed, everything else stays
sub workclean
{
    my $ref = shift;

    # deleting a key that is not there is harmless, so no exists checks are needed
    delete @{$ref}{qw(lines linesreq tonode fromnode stream file count lastt waitt)};
    return;
}
125
# Handle one message related PC sentence, or do the periodic housekeeping.
#
# Called as process($self, $line): $self is the channel the sentence arrived
# on and $line is the raw '^' separated sentence. PC28 to PC33, PC40, PC42
# and PC49 are acted upon, anything else is ignored.
#
# Called with either argument false it does the periodic processing instead:
# transfers that have timed out are stopped, the outgoing queue is run every
# $queueinterval seconds and old messages are cleaned out once an hour.
sub process
{
    my ($self, $line) = @_;

    # this is periodic processing
    if (!$self || !$line) {

        # wander down the work queue stopping any messages that have timed out
        for (keys %busy) {
            my $node = $_;
            my $ref = $busy{$_};
            if (exists $ref->{lastt} && $main::systime > $ref->{lastt} + $timeout) {
                dbg('msg', "Timeout, stopping msgno: $ref->{msgno} -> $node");
                $ref->stop_msg($node);

                # delay any outgoing messages that fail
                $ref->{waitt} = $main::systime + $waittime + rand(120) if $node ne $main::mycall;
            }
        }

        # queue some message if the interval timer has gone off
        if ($main::systime > $lastq + $queueinterval) {
            queue_msg(0);
            $lastq = $main::systime;
        }

        # clean the message queue
        clean_old() if $main::systime - $last_clean > 3600 ;
        return;
    }

    my @f = split /\^/, $line;
    my ($pcno) = $f[0] =~ /^PC(\d\d)/; # just get the number

    # not a PCnn sentence at all, there is nothing here for us
    return unless defined $pcno;

 SWITCH: {
        if ($pcno == 28) {      # incoming message

            # first look for any messages in the busy queue
            # and cancel them this should both resolve timed out incoming messages
            # and crossing of message between nodes, incoming messages have priority
            if (exists $busy{$f[2]}) {
                my $ref = $busy{$f[2]};
                dbg('msg', "Busy, stopping msgno: $ref->{msgno} -> $f[2]");
                $ref->stop_msg($self->call);
            }

            my $t = cltounix($f[5], $f[6]);
            my $stream = next_transno($f[2]);
            my $ref = DXMsg->alloc($stream, uc $f[3], $f[4], $t, $f[7], $f[8], $f[13], '0', $f[11]);

            # fill in various forwarding state variables
            $ref->{fromnode} = $f[2];
            $ref->{tonode} = $f[1];
            $ref->{rrreq} = $f[11];
            $ref->{linesreq} = $f[10];
            $ref->{stream} = $stream;
            $ref->{count} = 0;  # no of lines between PC31s
            dbg('msg', "new message from $f[4] to $f[3] '$f[8]' stream $stream\n");
            $work{"$f[2]$stream"} = $ref; # store in work
            $busy{$f[2]} = $ref; # set interlock
            $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack
            $ref->{lastt} = $main::systime;

            # look to see whether this is a non private message sent to a known callsign
            my $uref = DXUser->get_current($ref->{to});
            if (iscallsign($ref->{to}) && !$ref->{private} && $uref && $uref->homenode) {
                $ref->{private} = 1;
                dbg('msg', "set bull to $ref->{to} to private");
            }
            last SWITCH;
        }

        if ($pcno == 29) {      # incoming text
            my $ref = $work{"$f[2]$f[3]"};
            if ($ref) {
                push @{$ref->{lines}}, $f[4];
                $ref->{count}++;

                # acknowledge every 'linesreq' lines with a PC31
                if ($ref->{count} >= $ref->{linesreq}) {
                    $self->send(DXProt::pc31($f[2], $f[1], $f[3]));
                    dbg('msg', "stream $f[3]: $ref->{count} lines received\n");
                    $ref->{count} = 0;
                }
                $ref->{lastt} = $main::systime;
            } else {
                dbg('msg', "PC29 from unknown stream $f[3] from $f[2]" );
                $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
            }
            last SWITCH;
        }

        if ($pcno == 30) {      # this is a incoming subject ack
            my $ref = $work{$f[2]}; # note no stream at this stage
            if ($ref) {
                delete $work{$f[2]};
                $ref->{stream} = $f[3];
                $ref->{count} = 0;
                $ref->{linesreq} = 5;
                $work{"$f[2]$f[3]"} = $ref; # new ref
                dbg('msg', "incoming subject ack stream $f[3]\n");
                $busy{$f[2]} = $ref; # interlock
                $ref->{lines} = [];
                push @{$ref->{lines}}, ($ref->read_msg_body);
                $ref->send_tranche($self);
                $ref->{lastt} = $main::systime;
            } else {
                dbg('msg', "PC30 from unknown stream $f[3] from $f[2]" );
                $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
            }
            last SWITCH;
        }

        if ($pcno == 31) {      # acknowledge a tranche of lines
            my $ref = $work{"$f[2]$f[3]"};
            if ($ref) {
                dbg('msg', "tranche ack stream $f[3]\n");
                $ref->send_tranche($self);
                $ref->{lastt} = $main::systime;
            } else {
                dbg('msg', "PC31 from unknown stream $f[3] from $f[2]" );
                $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
            }
            last SWITCH;
        }

        if ($pcno == 32) {      # incoming EOM
            dbg('msg', "stream $f[3]: EOM received\n");
            my $ref = $work{"$f[2]$f[3]"};
            if ($ref) {
                $self->send(DXProt::pc33($f[2], $f[1], $f[3])); # acknowledge it

                # get the next msg no - note that this has NOTHING to do with the stream number in PC protocol
                # store the file or message
                # remove extraneous rubbish from the hash
                # remove it from the work in progress vector
                # stuff it on the msg queue
                if ($ref->{lines} && @{$ref->{lines}} > 0) { # ignore messages with 0 lines
                    if ($ref->{file}) {
                        $ref->store($ref->{lines});
                    } else {

                        # does an identical message already exist?
                        my $m;
                        for $m (@msg) {
                            if ($ref->{subject} eq $m->{subject} && $ref->{t} == $m->{t} && $ref->{from} eq $m->{from} && $ref->{to} eq $m->{to}) {
                                $ref->stop_msg($self->call);
                                my $msgno = $m->{msgno};
                                dbg('msg', "duplicate message to $msgno\n");
                                Log('msg', "duplicate message to $msgno");
                                return;
                            }
                        }

                        # look for 'bad' to addresses
                        if (grep $ref->{to} eq $_, @badmsg) {
                            $ref->stop_msg($self->call);
                            dbg('msg', "'Bad' TO address $ref->{to}");
                            Log('msg', "'Bad' TO address $ref->{to}");
                            return;
                        }

                        $ref->{msgno} = next_transno("Msgno");
                        push @{$ref->{gotit}}, $f[2]; # mark this up as being received
                        $ref->store($ref->{lines});
                        add_dir($ref);
                        my $dxchan = DXChannel->get($ref->{to});
                        $dxchan->send($dxchan->msg('m9')) if $dxchan && $dxchan->is_user;
                        Log('msg', "Message $ref->{msgno} from $ref->{from} received from $f[2] for $ref->{to}");
                    }
                }
                $ref->stop_msg($self->call);
            } else {
                dbg('msg', "PC32 from unknown stream $f[3] from $f[2]" );
                $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
            }
            # queue_msg(0);
            last SWITCH;
        }

        if ($pcno == 33) {      # acknowledge the end of message
            my $ref = $work{"$f[2]$f[3]"};
            if ($ref) {
                if ($ref->{private}) { # remove it if it private and gone off site#
                    Log('msg', "Message $ref->{msgno} from $ref->{from} sent to $f[2] and deleted");
                    $ref->del_msg;
                } else {
                    Log('msg', "Message $ref->{msgno} from $ref->{from} sent to $f[2]");
                    push @{$ref->{gotit}}, $f[2]; # mark this up as being received
                    $ref->store($ref->{lines}); # re- store the file
                }
                $ref->stop_msg($self->call);
            } else {
                dbg('msg', "PC33 from unknown stream $f[3] from $f[2]" );
                $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
            }

            # send next one if present
            queue_msg(0);
            last SWITCH;
        }

        if ($pcno == 40) {      # this is a file request
            $f[3] =~ s/\\/\//og; # change the slashes
            $f[3] =~ s/\.//og;  # remove dots
            $f[3] =~ s/^\///o;   # remove the leading /
            $f[3] = lc $f[3];   # to lower case;
            dbg('msg', "incoming file $f[3]\n");
            $f[3] = 'packclus/' . $f[3] unless $f[3] =~ /^packclus\//o;

            # create any directories
            my @part = split /\//, $f[3];
            my $part;
            my $fn = "$main::root";
            pop @part;          # remove last part
            foreach $part (@part) {
                $fn .= "/$part";
                next if -e $fn;
                last SWITCH if !mkdir $fn, 0777;
                dbg('msg', "created directory $fn\n");
            }
            my $stream = next_transno($f[2]);
            my $ref = DXMsg->alloc($stream, "$main::root/$f[3]", $self->call, time, !$f[4], $f[3], ' ', '0', '0');

            # forwarding variables
            $ref->{fromnode} = $f[1];
            $ref->{tonode} = $f[2];
            $ref->{linesreq} = $f[5];
            $ref->{stream} = $stream;
            $ref->{count} = 0;  # no of lines between PC31s
            $ref->{file} = 1;
            $ref->{lastt} = $main::systime;
            $work{"$f[2]$stream"} = $ref; # store in work
            $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack

            last SWITCH;
        }

        if ($pcno == 42) {      # abort transfer
            dbg('msg', "stream $f[3]: abort received\n");
            my $ref = $work{"$f[2]$f[3]"};
            if ($ref) {
                $ref->stop_msg($self->call);
                $ref = undef;
            }

            last SWITCH;
        }

        if ($pcno == 49) {      # global delete on subject

            # del_msg rebuilds @msg, so pick out the messages to go first
            # rather than deleting from the array whilst walking down it
            # (which makes the loop skip entries)
            my @doomed = grep { $_->{from} eq $f[1] && $_->{subject} eq $f[2] } @msg;
            for my $m (@doomed) {
                $m->del_msg();
                Log('msg', "Message $m->{msgno} from $m->{from} ($m->{subject}) fully deleted");
                DXProt::broadcast_ak1a($line, $self);
            }
            last SWITCH;
        }
    }
}
385
386
# store a message (or an incoming file) away on disc
#
# NOTE the second arg is a REFERENCE to the list of lines, not a list.
# Anything already in the target file is overwritten. Dies (confess) if
# the target cannot be opened for writing.
sub store
{
    my ($ref, $lines) = @_;

    # we only proceed if there are actually any lines in the file
#   if (!$lines || @{$lines} == 0) {
#       return;
#   }

    if ($ref->{file}) {         # a file, the TO field holds the path name
        my $path = $ref->{to};
        dbg('msg', "To be stored in $path\n");

        my $out = IO::File->new("$path", "w");
        confess "can't open file $ref->{to} $!" unless defined $out;

        $out->print("$_\n") for @{$lines};
        $out->close;
        dbg('msg', "file $ref->{to} stored\n");
        Log('msg', "file $ref->{to} from $ref->{from} stored" );
    } else {                    # a normal message

        # work out where the message lives
        my $fn = filename($ref->{msgno});
        dbg('msg', "To be stored in $fn\n");

        # now save the file, overwriting what's there, YES I KNOW OK! (I will change it if it's a problem)
        my $out = IO::File->new("$fn", "w");
        confess "can't open msg file $fn $!" unless defined $out;

        # two '===' header lines: the message fields, then the nodes that have got it
        my $rr = $ref->{rrreq} ? '1' : '0';
        my $priv = $ref->{private} ? '1': '0';
        $out->print("=== $ref->{msgno}^$ref->{to}^$ref->{from}^$ref->{t}^$priv^$ref->{subject}^$ref->{origin}^$ref->{'read'}^$rr\n");
        $out->print("=== ", join('^', @{$ref->{gotit}}), "\n");

        # then the text, totting up the size (one extra per line for the newline)
        $ref->{size} = 0;
        for my $text (@{$lines}) {
            $ref->{size} += length($text) + 1;
            $out->print("$text\n");
        }
        $out->close;
        dbg('msg', "msg $ref->{msgno} stored\n");
        Log('msg', "msg $ref->{msgno} from $ref->{from} to $ref->{to} stored" );
    }
}
443
# delete a message: take it off the active list and remove its file
sub del_msg
{
    my ($self) = @_;

    # keep everything on the active message list that isn't this message
    @msg = grep { $_ != $self } @msg;

    # belt and braces (one day I will ask someone if this is REALLY necessary)
    delete @{$self}{qw(gotit list)};

    # remove the file
    unlink filename($self->{msgno});
    dbg('msg', "deleting $self->{msgno}\n");
}
460
# clean out old messages from the message queue
#
# a message goes if it is not marked 'keep' and its time stamp is more
# than $maxage seconds in the past
sub clean_old
{
    my $cutoff = $main::systime - $maxage;

    # mark old messages for deletion and get rid of their files
    for my $old (grep { !$_->{keep} && $_->{t} < $cutoff } @msg) {
        $old->{deleteme} = 1;
        delete @{$old}{qw(gotit list)};
        unlink filename($old->{msgno});
        dbg('msg', "deleting old $old->{msgno}\n");
    }

    # remove them all from the active message list
    @msg = grep { !$_->{deleteme} } @msg;
    $last_clean = $main::systime;
}
481
# read in a message header
#
# Takes the path name of a message file and returns a new DXMsg object
# built from the two '=== ' header lines at the top of it: the first holds
# the '^' separated fields handed to alloc, the second the '^' separated
# list of nodes that have already got the message.
#
# Returns undef (after printing a complaint) if the file cannot be opened
# or if either header line is missing or does not start with '==='.
sub read_msg_header
{
    my $fn = shift;

    my $file = IO::File->new;
    if (!open($file, '<', $fn)) {
        print "Error reading $fn $!\n";
        return undef;
    }

    # the size is that of the body only, so each header line (newline and
    # all) comes off the file size; this agrees with what store counts
    my $size = -s $fn;
    my @header;
    for my $which ('first', 'second') {
        my $line = <$file>;
        $line = '' unless defined $line;    # ran out of file, treat as corrupt
        $size -= length $line;
        chomp $line;
        if ($line !~ /^===/o) {
            print "corrupt $which line in $fn ($line)\n";
            close($file);
            return undef;
        }
        $line =~ s/^=== //o;
        push @header, $line;
    }
    close($file);

    my $ref = DXMsg->alloc(split /\^/, $header[0]);
    $ref->{gotit} = [ split /\^/, $header[1] ];
    $ref->{size} = $size;

    return $ref;
}
526
# read in a message body
#
# Returns the text of the message as a list of chomped lines with the
# leading '=== ' header lines (up to two of them) removed. If the file
# cannot be opened a complaint is printed and nothing is returned (an
# empty list, or undef in scalar context), so that a caller pushing the
# result onto a list of lines does not pick up a stray undef.
sub read_msg_body
{
    my $self = shift;
    my $fn = filename($self->{msgno});

    my $file = IO::File->new;
    if (!open($file, '<', $fn)) {
        print "Error reading $fn $!\n";
        return;
    }
    chomp (my @out = <$file>);
    close($file);

    # lose the header lines, taking care with an empty file
    for (1 .. 2) {
        shift @out if @out && $out[0] =~ /^=== /;
    }
    return @out;
}
549
# send a tranche of lines to the other end
#
# up to 'linesreq' lines, starting at line number 'count', go out as PC29s
# on the channel given; if the lines run out before the tranche is full a
# PC32 is added on the end of it
sub send_tranche
{
    my ($self, $dxchan) = @_;
    my ($to, $from, $stream) = ($self->{tonode}, $self->{fromnode}, $self->{stream});
    my $lines = $self->{lines};
    my @out;

    my $sent = 0;                   # lines put into this tranche
    my $next = $self->{count};      # index of the next line to go
    while ($sent < $self->{linesreq} && $next < @$lines) {
        push @out, DXProt::pc29($to, $from, $stream, $lines->[$next]);
        $sent++;
        $next++;
    }
    $self->{count} = $next;

    # a short tranche means that was the last of the message
    push @out, DXProt::pc32($to, $from, $stream) if $sent < $self->{linesreq};
    $dxchan->send(@out);
}
569
570         
# find a message to send out and start the ball rolling
#
# Called with a flag: if it is true only private messages are considered,
# otherwise bulletins are looked at as well. Any further arguments are
# accepted but not used.
sub queue_msg
{
    my $sort = shift;
    my @nodelist = DXProt::get_all_ak1a();

    # bat down the message list looking for one that needs to go off site and whose
    # nearest node is not busy.

    dbg('msg', "queue msg ($sort)\n");
    foreach my $ref (@msg) {
        # firstly, is it private and unread? if so can I find the recipient
        # in my cluster node list offsite?

        # ignore 'delayed' messages until their waiting time has expired
        if (exists $ref->{waitt}) {
            next if $ref->{waitt} > $main::systime;
            delete $ref->{waitt};
        }

        if ($ref->{private}) {
            if ($ref->{'read'} == 0) {
                my $clref = DXCluster->get_exact($ref->{to});
                unless ($clref) {             # otherwise look for a homenode
                    my $uref = DXUser->get($ref->{to});

                    # NB not 'my $hnode = ... if $uref': a conditional 'my' can
                    # hang on to the value from a previous time round the loop
                    my $hnode = $uref ? $uref->homenode : undef;
                    $clref = DXCluster->get_exact($hnode) if $hnode;
                }
                if ($clref && !grep { $clref->{dxchan} == $_ } DXCommandmode::get_all()) {
                    my $dxchan = $clref->{dxchan};
                    $ref->start_msg($dxchan) if $dxchan && !get_busy($dxchan->call) && $dxchan->state eq 'normal';
                }
            }
        } elsif (!$sort) {
            # otherwise we are dealing with a bulletin, compare the gotit list with
            # the nodelist up above, if there are sites that haven't got it yet
            # then start sending it - what happens when we get loops is anyone's
            # guess, use (to, from, time, subject) tuple?
            foreach my $noderef (@nodelist) {
                next if $noderef->call eq $main::mycall;
                next if grep { $_ eq $noderef->call } @{$ref->{gotit}};
                next unless $ref->forward_it($noderef->call);           # check the forwarding file
                # next if $noderef->isolate;               # maybe add code for stuff originated here?
                # next if DXUser->get( ${$ref->{gotit}}[0] )->isolate;  # is the origin isolated?

                # if we are here we have a node that doesn't have this message
                $ref->start_msg($noderef) if !get_busy($noderef->call)  && $noderef->state eq 'normal';
                last;
            }
        }

        # if all the available nodes are busy then stop
        last if @nodelist == scalar grep { get_busy($_->call) } @nodelist;
    }
}
631
# is there a message for me?
#
# returns 1 if there is a private message that has not been read
# addressed to the callsign given (compared in upper case), otherwise 0
sub for_me
{
    my ($call) = @_;
    $call = uc $call;

    for my $m (@msg) {
        next unless $m->{to} eq $call;
        next unless $m->{private};
        return 1 unless $m->{'read'};
    }
    return 0;
}
646
# start the message off on its travels with a PC28
#
# the node at the far end of the channel given is marked busy, the message
# goes into the work queue under that node's call and the PC28 is sent
sub start_msg
{
    my ($self, $dxchan) = @_;

    dbg('msg', "start msg $self->{msgno}\n");

    my $node = $dxchan->call;
    my $me = $main::mycall;

    # forwarding state, 5 lines per tranche
    $self->{linesreq} = 5;
    $self->{count} = 0;
    $self->{tonode} = $node;
    $self->{fromnode} = $me;

    # interlock the node and note the job (no stream number as yet)
    $busy{$node} = $work{$node} = $self;
    $self->{lastt} = $main::systime;

    my $pc28 = DXProt::pc28($node, $me, $self->{to}, $self->{from}, $self->{t}, $self->{private}, $self->{subject}, $self->{origin}, $self->{rrreq});
    $dxchan->send($pc28);
}
662
# get the ref of a busy node
# (the message holding the interlock for that call, undef if there is none)
sub get_busy
{
    my ($call) = @_;
    my $holder = $busy{$call};
    return $holder;
}
669
# get the busy queue
# (all the messages currently holding an interlock)
sub get_all_busy
{
    my @holders = values %busy;
    return @holders;
}
675
# get the forwarding queue
# (all the messages currently in the work queue)
sub get_fwq
{
    my @jobs = values %work;
    return @jobs;
}
681
# stop a message from continuing, clean it out, unlock interlocks etc
#
# called with the call of the node the transfer is with; the work queue
# entries for that node (with and without the stream number) and its busy
# interlock are removed and the forwarding state is wiped from the message
sub stop_msg
{
    my $self = shift;
    my $node = shift;

    # a plain fetch, undef if there is no stream yet (a conditional 'my'
    # has undefined behaviour and so is avoided)
    my $stream = $self->{stream};

    dbg('msg', "stop msg $self->{msgno} -> node $node\n");
    delete $work{$node};
    delete $work{"$node$stream"} if $stream;
    $self->workclean;
    delete $busy{$node};
}
696
# get a new transaction number from the file specified
#
# the name has any non-word characters removed and is then looked for in
# the message directory; the number in that file is bumped by one, written
# back and returned. Dies (confess) if the file cannot be opened.
sub next_transno
{
    my ($name) = @_;
    $name =~ s/\W//og;          # remove non-word characters
    my $fn = "$msgdir/$name";

    my $fh = IO::File->new;
    sysopen($fh, $fn, O_RDWR|O_CREAT, 0666) or confess "can't open $fn $!";
    $fh->autoflush(1);

    # a brand new file has nothing in it, which counts up to 1
    my $msgno = $fh->getline;
    chomp $msgno;
    $msgno++;

    # back to the start to write the new number over the old one
    $fh->seek(0, 0);
    $fh->print("$msgno\n");
    dbg('msg', "msgno $msgno allocated for $name\n");
    $fh->close;

    return $msgno;
}
720
# Initialise the message 'system'.
#
# Loads the bad-message and forwarding control files (printing whatever
# errors they report), then rebuilds @msg from scratch by reading the header
# of every file in $msgdir whose name is an 'm' followed only by digits.
# Messages addressed to a callsign listed in @badmsg are logged and deleted
# instead of being added to the directory.
# Confesses if the message directory cannot be opened.
sub init {
    # load various control files
    my @errors = load_badmsg();
    print "@errors\n" if @errors;
    @errors = load_forward();
    print "@errors\n" if @errors;

    # take a snapshot of the directory contents
    my $dh = IO::File->new;
    unless (opendir($dh, $msgdir)) {
        confess "can't open $msgdir $!";
    }
    my @entries = readdir($dh);
    closedir($dh);

    # walk the message files in sorted filename order
    @msg = ();
    for (grep { /^m\d+$/o } sort @entries) {
        my $ref = read_msg_header("$msgdir/$_");
        next unless $ref;

        # delete any messages to 'badmsg.pl' places
        my $to = $ref->{to};
        if (grep { $to eq $_ } @badmsg) {
            dbg('msg', "'Bad' TO address $ref->{to}");
            Log('msg', "'Bad' TO address $ref->{to}");
            $ref->del_msg;
            next;
        }

        # add the message to the available queue
        add_dir($ref);
    }
}
758
# Append a message to the end of the in-memory directory listing (@msg).
# Confesses if handed something that is not a reference.
sub add_dir {
    my ($ref) = @_;
    ref $ref or confess "tried to add a non-ref to the msg directory";
    push @msg, $ref;
}
766
# Hand back the whole in-memory message directory, in the order it is held
# in @msg (the number of messages when called in scalar context).
sub get_all {
    return @msg;
}
772
# Find a message by number.
#
# @msg is scanned from the front; the scan is abandoned as soon as a message
# with a higher number than the one wanted is met, so this relies on @msg
# being held in ascending msgno order.
# Returns the message ref, or undef when there is no such message.
sub get {
    my ($msgno) = @_;

    foreach my $ref (@msg) {
        my $this = $ref->{msgno};
        return $ref if $this == $msgno;
        last if $this > $msgno;         # gone past where it would have been
    }
    return undef;
}
783
# Return the official filename for a message number: the letter 'm' followed
# by the number zero padded to six digits, inside $msgdir.
#
# $msgdir is passed to sprintf as a %s argument rather than being interpolated
# into the format itself, so a '%' in the directory path cannot be mistaken
# for a conversion specification.
sub filename {
    my $msgno = shift;
    return sprintf "%s/m%06d", $msgdir, $msgno;
}
789
#
# return the names of all the valid fields, i.e. the keys of %valid
# (the number of them when called in scalar context)
#
sub fields {
    my @names = keys %valid;
    return @names;
}
798
#
# return the prompt for a field
#
#   $self - the invocant (not used by the lookup)
#   $ele  - the field name
#
# Returns the entry held in %valid for that field, or undef for an unknown
# field.
#
sub field_prompt
{
    my ($self, $ele) = @_;
    return $valid{$ele};
}
808
#
# The 'send a message' state machine.
#
#   $self - the channel of the user entering the message; the message under
#           construction is kept in $self->{loc}
#   $line - the line the user has just typed
#
# In state 'send1' the line is taken as the subject and the state moves on
# to 'sendbody'.  In state 'sendbody' each line is appended to the body
# until either
#   - ctrl-Z ("\032") or /EX: one message per addressee in $loc->{to} is
#     allocated, stored and added to the directory (only if the body has at
#     least one line), and any addressee connected as a user is sent 'm9';
#   - ctrl-Y ("\031"), /ABORT or /QUIT: the message is discarded.
# In both cases the working data is dropped and the channel goes back to
# the 'prompt' state.
#
# Returns the list (1, @out) where @out holds the lines to send to the user.
#
sub do_send_stuff {
    my ($self, $line) = @_;
    my @out;

    # common tail of both the 'end' and 'abort' cases: forget the message
    # being built and return the channel to normal command processing
    my $finish = sub {
        my $loc = shift;
        delete $loc->{lines};
        delete $loc->{to};
        delete $self->{loc};
        $self->func(undef);
        $self->state('prompt');
    };

    if ($self->state eq 'send1') {
        ref $self->{loc} or confess "local var gone missing";
        my $loc = $self->{loc};
        $loc->{subject} = $line;
        $loc->{lines}   = [];
        $self->state('sendbody');
        push @out, $self->msg('m8');
    }
    elsif ($self->state eq 'sendbody') {
        ref $self->{loc} or confess "local var gone missing";
        my $loc = $self->{loc};

        if ($line eq "\032" || uc($line) eq "/EX") {

            # end of message: an empty body is silently thrown away
            if (@{$loc->{lines}} > 0) {
                foreach my $to (@{$loc->{to}}) {
                    my $now    = $main::systime;
                    my $origin = $main::mycall;

                    # every addressee gets their own numbered copy
                    my $ref = DXMsg->alloc(DXMsg::next_transno('Msgno'),
                                           uc($to),
                                           $self->call,
                                           $now,
                                           $loc->{private},
                                           $loc->{subject},
                                           $origin,
                                           '0',
                                           $loc->{rrreq});
                    $ref->store($loc->{lines});
                    $ref->add_dir();
                    push @out, $self->msg('m11', $ref->{msgno}, $to);

                    # tell the addressee straight away if connected as a user
                    my $dxchan = DXChannel->get(uc($to));
                    if ($dxchan && $dxchan->is_user()) {
                        $dxchan->send($dxchan->msg('m9'));
                    }
                }
            }
            $finish->($loc);
        }
        elsif ($line eq "\031" || uc($line) eq "/ABORT" || uc($line) eq "/QUIT") {
            push @out, $self->msg('m10');
            $finish->($loc);
        }
        else {
            # neither an end nor an abort, so it is body text; an empty
            # line is stored as a single space
            push @{$loc->{lines}}, (length($line) > 0 ? $line : " ");
        }
    }

    return (1, @out);
}
880
# Build the standard one-line directory entry for a message.
#
# The columns are, in order: message number, '-' if the message has been
# read, 'p' if it is private, size, to, from, the date and the time derived
# from the message timestamp (via cldate and ztime) and the subject, each
# padded or truncated to the width given in the format.
sub dir {
    my $ref = shift;

    my $layout = "%6d%s%s%5d %8.8s %8.8s %-6.6s %5.5s %-30.30s";
    my @columns = (
        $ref->msgno,
        ($ref->read    ? '-' : ' '),
        ($ref->private ? 'p' : ' '),
        $ref->size,
        $ref->to,
        $ref->from,
        cldate($ref->t),
        ztime($ref->t),
        $ref->subject,
    );
    return sprintf $layout, @columns;
}
889
# Load the forward table by executing the file named in $forwardfn.
#
# Returns a list of error messages: empty when the file loaded cleanly or
# does not exist, otherwise the error left in $@ by the load.
#
# $@ is only consulted when the file was actually executed, and is cleared
# beforehand, so an error left over from some earlier, unrelated eval or do
# is never reported as a problem with this file.
sub load_forward {
    my @out;
    if (-e $forwardfn) {
        $@ = '';
        do $forwardfn;
        push @out, $@ if $@;
    }
    return @out;
}
898
# Load the bad message table by executing the file named in $badmsgfn.
#
# Returns a list of error messages: empty when the file loaded cleanly or
# does not exist, otherwise the error left in $@ by the load.
#
# $@ is only consulted when the file was actually executed, and is cleared
# beforehand, so an error left over from some earlier, unrelated eval or do
# is never reported as a problem with this file.
sub load_badmsg {
    my @out;
    if (-e $badmsgfn) {
        $@ = '';
        do $badmsgfn;
        push @out, $@ if $@;
    }
    return @out;
}
907
#
# forward that message or not according to the forwarding table
# returns 1 for forward, 0 - to ignore
#
#   $ref  - the message
#   $call - the call of the node we are thinking of forwarding it to
#
# @forward is a flat list read five elements at a time:
#   sort    - 'P' the rule applies to private messages, 'B' to the rest
#   field   - which part of the message to test: 'T' to, 'F' from,
#             'O' origin, 'S' subject
#   pattern - case insensitive regex tried against that field; a false
#             pattern matches everything
#   action  - 'I' means ignore (don't forward) on a match
#   bbs     - the nodes the rule applies to; a false value means any node
#
# The first rule that decides wins; if no rule decides the answer is 0.
#
sub forward_it {
    my $ref  = shift;
    my $call = shift;

    for (my $i = 0; $i < @forward; $i += 5) {
        my ($sort, $field, $pattern, $action, $bbs) = @forward[$i .. $i + 4];

        # Not a rule for this class of message?  Move on to the next rule.
        # (Leaving the loop here would stop every rule that follows a rule
        # of the other class from ever being considered.)
        next if $ref->{private}  && $sort ne 'P';
        next if !$ref->{private} && $sort ne 'B';

        # select field
        my $tested;
        $tested = $ref->{to}      if $field eq 'T';
        $tested = $ref->{from}    if $field eq 'F';
        $tested = $ref->{origin}  if $field eq 'O';
        $tested = $ref->{subject} if $field eq 'S';
        $tested = '' unless defined $tested;    # unknown field or missing value

        if (!$pattern || $tested =~ m{$pattern}i) {
            return 0 if $action eq 'I';
            return 1 if !$bbs;

            # normally an array ref of node calls, but accept a single call
            # given as a plain string as well
            my @nodes = ref $bbs ? @{$bbs} : ($bbs);
            return 1 if grep { $_ eq $call } @nodes;
        }
    }
    return 0;
}
940
no strict;

# Catch-all accessor for the fields named in %valid.
#
# Any otherwise undefined method called on a message lands here.  DESTROY is
# quietly ignored; a name that has no true entry in %valid is a fatal error
# (confess).  Called with an argument the field is set to it and the new
# value returned, called without one the current value is returned.
sub AUTOLOAD {
    my ($self, @args) = @_;
    my $name = $AUTOLOAD;
    return if $name =~ /::DESTROY$/;
    $name =~ s/.*:://o;                 # strip the package qualifier

    confess "Non-existant field '$AUTOLOAD'" if !$valid{$name};
    return $self->{$name} = $args[0] if @args;
    return $self->{$name};
}
952
953 1;
954
955 __END__