Prepare for git repository
[spider.git] / perl / Msg.pm
1 #
2 # This has been taken from the 'Advanced Perl Programming' book by Sriram Srinivasan 
3 #
4 # I am presuming that the code is distributed on the same basis as perl itself.
5 #
6 # I have modified it to suit my devious purposes (Dirk Koopman G1TLH)
7 #
8 # $Id$
9 #
10
11 package Msg;
12
13 use strict;
14
15 use IO::Select;
16 use IO::Socket;
17 use DXDebug;
18 use Timer;
19
20 use vars qw(%rd_callbacks %wt_callbacks %er_callbacks $rd_handles $wt_handles $er_handles $now %conns $noconns $blocking_supported $cnum $total_in $total_out);
21
22 %rd_callbacks = ();
23 %wt_callbacks = ();
24 %er_callbacks = ();
25 $rd_handles   = IO::Select->new();
26 $wt_handles   = IO::Select->new();
27 $er_handles   = IO::Select->new();
28 $total_in = $total_out = 0;
29
30 $now = time;
31
32 BEGIN {
33     # Checks if blocking is supported
34     eval {
35                 local $^W;
36         require POSIX; POSIX->import(qw(O_NONBLOCK F_SETFL F_GETFL))
37     };
38         if ($@ || $main::is_win) {
39                 $blocking_supported = IO::Socket->can('blocking') ? 2 : 0;
40         } else {
41                 $blocking_supported = IO::Socket->can('blocking') ? 2 : 1;
42         }
43
44
45         # import as many of these errno values as are available
46         eval {
47                 local $^W;
48                 require Errno; Errno->import(qw(EAGAIN EINPROGRESS EWOULDBLOCK));
49         };
50
51         unless ($^O eq 'MSWin32') {
52                 if ($] >= 5.6) {
53                         eval {
54                                 local $^W;
55                                 require Socket; Socket->import(qw(IPPROTO_TCP TCP_NODELAY));
56                         };
57                 } else {
58                         dbg("IPPROTO_TCP and TCP_NODELAY manually defined");
59                         eval 'sub IPPROTO_TCP {     6 };';
60                         eval 'sub TCP_NODELAY {     1 };';
61                 }
62         }
63         # http://support.microsoft.com/support/kb/articles/Q150/5/37.asp
64         # defines EINPROGRESS as 10035.  We provide it here because some
65         # Win32 users report POSIX::EINPROGRESS is not vendor-supported.
66         if ($^O eq 'MSWin32') { 
67                 eval '*EINPROGRESS = sub { 10036 };';
68                 eval '*EWOULDBLOCK = *EAGAIN = sub { 10035 };';
69                 eval '*F_GETFL     = sub {     0 };';
70                 eval '*F_SETFL     = sub {     0 };';
71                 eval '*IPPROTO_TCP     = sub {     6 };';
72                 eval '*TCP_NODELAY     = sub {     1 };';
73                 $blocking_supported = 0;   # it appears that this DOESN'T work :-(
74         } 
75 }
76
77 my $w = $^W;
78 $^W = 0;
79 my $eagain = eval {EAGAIN()};
80 my $einprogress = eval {EINPROGRESS()};
81 my $ewouldblock = eval {EWOULDBLOCK()};
82 $^W = $w;
83 $cnum = 0;
84
85
86 #
87 #-----------------------------------------------------------------
88 # Generalised initializer
89
90 sub new
91 {
92     my ($pkg, $rproc) = @_;
93         my $obj = ref($pkg);
94         my $class = $obj || $pkg;
95
96     my $conn = {
97         rproc => $rproc,
98                 inqueue => [],
99                 outqueue => [],
100                 state => 0,
101                 lineend => "\r\n",
102                 csort => 'telnet',
103                 timeval => 60,
104                 blocking => 0,
105                 cnum => (($cnum < 999) ? (++$cnum) : ($cnum = 1)),
106     };
107
108         $noconns++;
109         
110         dbg("Connection created ($noconns)") if isdbg('connll');
111         return bless $conn, $class;
112 }
113
114 sub set_error
115 {
116         my $conn = shift;
117         my $callback = shift;
118         $conn->{eproc} = $callback;
119         set_event_handler($conn->{sock}, error => $callback) if exists $conn->{sock};
120 }
121
122 sub set_rproc
123 {
124         my $conn = shift;
125         my $callback = shift;
126         $conn->{rproc} = $callback;
127 }
128
129 sub blocking
130 {
131         return unless $blocking_supported;
132
133         # Make the handle stop blocking, the Windows way.
134         if ($blocking_supported) { 
135                 $_[0]->blocking($_[1]);
136         } else {
137                 my $flags = fcntl ($_[0], F_GETFL, 0);
138                 if ($_[1]) {
139                         $flags &= ~O_NONBLOCK;
140                 } else {
141                         $flags |= O_NONBLOCK;
142                 }
143                 fcntl ($_[0], F_SETFL, $flags);
144         }
145 }
146
147 # save it
148 sub conns
149 {
150         my $pkg = shift;
151         my $call = shift;
152         my $ref;
153         
154         if (ref $pkg) {
155                 $call = $pkg->{call} unless $call;
156                 return undef unless $call;
157                 dbg("changing $pkg->{call} to $call") if isdbg('connll') && exists $pkg->{call} && $call ne $pkg->{call};
158                 delete $conns{$pkg->{call}} if exists $pkg->{call} && exists $conns{$pkg->{call}} && $pkg->{call} ne $call; 
159                 $pkg->{call} = $call;
160                 $ref = $conns{$call} = $pkg;
161                 dbg("Connection $pkg->{cnum} $call stored") if isdbg('connll');
162         } else {
163                 $ref = $conns{$call};
164         }
165         return $ref;
166 }
167
168 # this is only called by any dependent processes going away unexpectedly
169 sub pid_gone
170 {
171         my ($pkg, $pid) = @_;
172         
173         my @pid = grep {$_->{pid} == $pid} values %conns;
174         foreach my $p (@pid) {
175                 &{$p->{eproc}}($p, "$pid has gorn") if exists $p->{eproc};
176                 $p->disconnect;
177         }
178 }
179
180 #-----------------------------------------------------------------
181 # Send side routines
182 sub connect {
183     my ($pkg, $to_host, $to_port, $rproc) = @_;
184
185     # Create a connection end-point object
186     my $conn = $pkg;
187         unless (ref $pkg) {
188                 $conn = $pkg->new($rproc);
189         }
190         $conn->{peerhost} = $to_host;
191         $conn->{peerport} = $to_port;
192         $conn->{sort} = 'Outgoing';
193         
194     # Create a new internet socket
195     my $sock = IO::Socket::INET->new();
196     return undef unless $sock;
197         
198         my $proto = getprotobyname('tcp');
199         $sock->socket(AF_INET, SOCK_STREAM, $proto) or return undef;
200         
201         blocking($sock, 0);
202         $conn->{blocking} = 0;
203
204         # does the host resolve?
205         my $ip = gethostbyname($to_host);
206         return undef unless $ip;
207         
208         my $r = connect($sock, pack_sockaddr_in($to_port, $ip));
209         return undef unless $r || _err_will_block($!);
210         
211         $conn->{sock} = $sock;
212     
213     if ($conn->{rproc}) {
214         my $callback = sub {$conn->_rcv};
215         set_event_handler ($sock, read => $callback);
216     }
217     return $conn;
218 }
219
220 sub start_program
221 {
222         my ($conn, $line, $sort) = @_;
223         my $pid;
224         
225         local $^F = 10000;              # make sure it ain't closed on exec
226         my ($a, $b) = IO::Socket->socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC);
227         if ($a && $b) {
228                 $a->autoflush(1);
229                 $b->autoflush(1);
230                 $pid = fork;
231                 if (defined $pid) {
232                         if ($pid) {
233                                 close $b;
234                                 $conn->{sock} = $a;
235                                 $conn->{csort} = $sort;
236                                 $conn->{lineend} = "\cM" if $sort eq 'ax25';
237                                 $conn->{pid} = $pid;
238                                 if ($conn->{rproc}) {
239                                         my $callback = sub {$conn->_rcv};
240                                         Msg::set_event_handler ($a, read => $callback);
241                                 }
242                                 dbg("connect $conn->{cnum}: started pid: $conn->{pid} as $line") if isdbg('connect');
243                         } else {
244                                 $^W = 0;
245                                 dbgclose();
246                                 STDIN->close;
247                                 STDOUT->close;
248                                 STDOUT->close;
249                                 *STDIN = IO::File->new_from_fd($b, 'r') or die;
250                                 *STDOUT = IO::File->new_from_fd($b, 'w') or die;
251                                 *STDERR = IO::File->new_from_fd($b, 'w') or die;
252                                 close $a;
253                                 unless ($main::is_win) {
254                                         #                                               $SIG{HUP} = 'IGNORE';
255                                         $SIG{HUP} = $SIG{CHLD} = $SIG{TERM} = $SIG{INT} = 'DEFAULT';
256                                         alarm(0);
257                                 }
258                                 exec "$line" or dbg("exec '$line' failed $!");
259                         } 
260                 } else {
261                         dbg("cannot fork for $line");
262                 }
263         } else {
264                 dbg("no socket pair $! for $line");
265         }
266         return $pid;
267 }
268
269 sub disconnect 
270 {
271     my $conn = shift;
272         return if exists $conn->{disconnecting};
273
274         $conn->{disconnecting} = 1;
275     my $sock = delete $conn->{sock};
276         $conn->{state} = 'E';
277         $conn->{timeout}->del if $conn->{timeout};
278
279         # be careful to delete the correct one
280         my $call;
281         if ($call = $conn->{call}) {
282                 my $ref = $conns{$call};
283                 delete $conns{$call} if $ref && $ref == $conn;
284         }
285         $call ||= 'unallocated';
286         dbg("Connection $conn->{cnum} $call disconnected") if isdbg('connll');
287         
288         # get rid of any references
289         for (keys %$conn) {
290                 if (ref($conn->{$_})) {
291                         delete $conn->{$_};
292                 }
293         }
294
295         if (defined($sock)) {
296                 set_event_handler ($sock, read => undef, write => undef, error => undef);
297                 shutdown($sock, 3);
298                 close($sock);
299         }
300         
301         unless ($main::is_win) {
302                 kill 'TERM', $conn->{pid} if exists $conn->{pid};
303         }
304 }
305
306 sub send_now {
307     my ($conn, $msg) = @_;
308     $conn->enqueue($msg);
309     $conn->_send (1); # 1 ==> flush
310 }
311
312 sub send_later {
313     my ($conn, $msg) = @_;
314     $conn->enqueue($msg);
315     my $sock = $conn->{sock};
316     return unless defined($sock);
317     set_event_handler ($sock, write => sub {$conn->_send(0)});
318 }
319
320 sub enqueue {
321     my $conn = shift;
322     push (@{$conn->{outqueue}}, defined $_[0] ? $_[0] : '');
323 }
324
325 sub _send {
326     my ($conn, $flush) = @_;
327     my $sock = $conn->{sock};
328     return unless defined($sock);
329     my $rq = $conn->{outqueue};
330
331     # If $flush is set, set the socket to blocking, and send all
332     # messages in the queue - return only if there's an error
333     # If $flush is 0 (deferred mode) make the socket non-blocking, and
334     # return to the event loop only after every message, or if it
335     # is likely to block in the middle of a message.
336
337 #       if ($conn->{blocking} != $flush) {
338 #               blocking($sock, $flush);
339 #               $conn->{blocking} = $flush;
340 #       }
341     my $offset = (exists $conn->{send_offset}) ? $conn->{send_offset} : 0;
342
343     while (@$rq) {
344         my $msg            = $rq->[0];
345                 my $mlth           = length($msg);
346         my $bytes_to_write = $mlth - $offset;
347         my $bytes_written  = 0;
348                 confess("Negative Length! msg: '$msg' lth: $mlth offset: $offset") if $bytes_to_write < 0;
349         while ($bytes_to_write > 0) {
350             $bytes_written = syswrite ($sock, $msg,
351                                        $bytes_to_write, $offset);
352             if (!defined($bytes_written)) {
353                 if (_err_will_block($!)) {
354                     # Should happen only in deferred mode. Record how
355                     # much we have already sent.
356                     $conn->{send_offset} = $offset;
357                     # Event handler should already be set, so we will
358                     # be called back eventually, and will resume sending
359                     return 1;
360                 } else {    # Uh, oh
361                                         &{$conn->{eproc}}($conn, $!) if exists $conn->{eproc};
362                                         $conn->disconnect;
363                     return 0; # fail. Message remains in queue ..
364                 }
365             } elsif (isdbg('raw')) {
366                                 my $call = $conn->{call} || 'none';
367                                 dbgdump('raw', "$call send $bytes_written: ", $msg);
368                         }
369                         $total_out      += $bytes_written;
370             $offset         += $bytes_written;
371             $bytes_to_write -= $bytes_written;
372         }
373         delete $conn->{send_offset};
374         $offset = 0;
375         shift @$rq;
376         #last unless $flush; # Go back to select and wait
377                             # for it to fire again.
378     }
379     # Call me back if queue has not been drained.
380     unless (@$rq) {
381         set_event_handler ($sock, write => undef);
382                 if (exists $conn->{close_on_empty}) {
383                         &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc};
384                         $conn->disconnect; 
385                 }
386     }
387     1;  # Success
388 }
389
390 sub dup_sock
391 {
392         my $conn = shift;
393         my $oldsock = $conn->{sock};
394         my $rc = $rd_callbacks{$oldsock};
395         my $wc = $wt_callbacks{$oldsock};
396         my $ec = $er_callbacks{$oldsock};
397         my $sock = $oldsock->new_from_fd($oldsock, "w+");
398         if ($sock) {
399                 set_event_handler($oldsock, read=>undef, write=>undef, error=>undef);
400                 $conn->{sock} = $sock;
401                 set_event_handler($sock, read=>$rc, write=>$wc, error=>$ec);
402                 $oldsock->close;
403         }
404 }
405
406 sub _err_will_block {
407         return 0 unless $blocking_supported;
408         return ($_[0] == $eagain || $_[0] == $ewouldblock || $_[0] == $einprogress);
409 }
410
411 sub close_on_empty
412 {
413         my $conn = shift;
414         $conn->{close_on_empty} = 1;
415 }
416
417 #-----------------------------------------------------------------
418 # Receive side routines
419
420 sub new_server {
421     @_ == 4 || die "Msg->new_server (myhost, myport, login_proc\n";
422     my ($pkg, $my_host, $my_port, $login_proc) = @_;
423         my $self = $pkg->new($login_proc);
424         
425     $self->{sock} = IO::Socket::INET->new (
426                                           LocalAddr => "$my_host:$my_port",
427 #                                          LocalPort => $my_port,
428                                           Listen    => SOMAXCONN,
429                                           Proto     => 'tcp',
430                                           Reuse => 1);
431     die "Could not create socket: $! \n" unless $self->{sock};
432     set_event_handler ($self->{sock}, read => sub { $self->new_client }  );
433         return $self;
434 }
435
436
437 sub nolinger
438 {
439         my $conn = shift;
440
441         unless ($main::is_win) {
442                 if (isdbg('sock')) {
443                         my ($l, $t) = unpack "ll", getsockopt($conn->{sock}, SOL_SOCKET, SO_LINGER); 
444                         my $k = unpack 'l', getsockopt($conn->{sock}, SOL_SOCKET, SO_KEEPALIVE);
445                         my $n = $main::is_win ? 0 : unpack "l", getsockopt($conn->{sock}, IPPROTO_TCP, TCP_NODELAY);
446                         dbg("Linger is: $l $t, keepalive: $k, nagle: $n");
447                 }
448                 
449                 eval {setsockopt($conn->{sock}, SOL_SOCKET, SO_KEEPALIVE, 1)} or dbg("setsockopt keepalive: $!");
450                 eval {setsockopt($conn->{sock}, SOL_SOCKET, SO_LINGER, pack("ll", 0, 0))} or dbg("setsockopt linger: $!");
451                 eval {setsockopt($conn->{sock}, IPPROTO_TCP, TCP_NODELAY, 1)} or eval {setsockopt($conn->{sock}, SOL_SOCKET, TCP_NODELAY, 1)} or dbg("setsockopt tcp_nodelay: $!");
452                 $conn->{sock}->autoflush(0);
453
454                 if (isdbg('sock')) {
455                         my ($l, $t) = unpack "ll", getsockopt($conn->{sock}, SOL_SOCKET, SO_LINGER); 
456                         my $k = unpack 'l', getsockopt($conn->{sock}, SOL_SOCKET, SO_KEEPALIVE);
457                         my $n = $main::is_win ? 0 : unpack "l", getsockopt($conn->{sock}, IPPROTO_TCP, TCP_NODELAY);
458                         dbg("Linger is: $l $t, keepalive: $k, nagle: $n");
459                 }
460         } 
461 }
462
463 sub dequeue
464 {
465         my $conn = shift;
466
467         if ($conn->{msg} =~ /\n/) {
468                 my @lines = split /\r?\n/, $conn->{msg};
469                 if ($conn->{msg} =~ /\n$/) {
470                         delete $conn->{msg};
471                 } else {
472                         $conn->{msg} = pop @lines;
473                 }
474                 for (@lines) {
475                         &{$conn->{rproc}}($conn, defined $_ ? $_ : '');
476                 }
477         }
478 }
479
480 sub _rcv {                     # Complement to _send
481     my $conn = shift; # $rcv_now complement of $flush
482     # Find out how much has already been received, if at all
483     my ($msg, $offset, $bytes_to_read, $bytes_read);
484     my $sock = $conn->{sock};
485     return unless defined($sock);
486
487         my @lines;
488         if ($conn->{blocking}) {
489                 blocking($sock, 0);
490                 $conn->{blocking} = 0;
491         }
492         $bytes_read = sysread ($sock, $msg, 1024, 0);
493         if (defined ($bytes_read)) {
494                 if ($bytes_read > 0) {
495                         $total_in += $bytes_read;
496                         if (isdbg('raw')) {
497                                 my $call = $conn->{call} || 'none';
498                                 dbgdump('raw', "$call read $bytes_read: ", $msg);
499                         }
500                         if ($conn->{echo}) {
501                                 my @ch = split //, $msg;
502                                 my $out;
503                                 for (@ch) {
504                                         if (/[\cH\x7f]/) {
505                                                 $out .= "\cH \cH";
506                                                 $conn->{msg} =~ s/.$//;
507                                         } else {
508                                                 $out .= $_;
509                                                 $conn->{msg} .= $_;
510                                         }
511                                 }
512                                 if (defined $out) {
513                                         set_event_handler ($sock, write => sub{$conn->_send(0)});
514                                         push @{$conn->{outqueue}}, $out;
515                                 }
516                         } else {
517                                 $conn->{msg} .= $msg;
518                         }
519                 } 
520         } else {
521                 if (_err_will_block($!)) {
522                         return ; 
523                 } else {
524                         $bytes_read = 0;
525                 }
526     }
527
528 FINISH:
529     if (defined $bytes_read && $bytes_read == 0) {
530                 &{$conn->{eproc}}($conn, $!) if exists $conn->{eproc};
531                 $conn->disconnect;
532     } else {
533                 unless ($conn->{disable_read}) {
534                         $conn->dequeue if exists $conn->{msg};
535                 }
536         }
537 }
538
539 sub new_client {
540         my $server_conn = shift;
541     my $sock = $server_conn->{sock}->accept();
542         if ($sock) {
543                 my $conn = $server_conn->new($server_conn->{rproc});
544                 $conn->{sock} = $sock;
545                 blocking($sock, 0);
546                 $conn->nolinger;
547                 $conn->{blocking} = 0;
548                 my ($rproc, $eproc) = &{$server_conn->{rproc}} ($conn, $conn->{peerhost} = $sock->peerhost(), $conn->{peerport} = $sock->peerport());
549                 $conn->{sort} = 'Incoming';
550                 if ($eproc) {
551                         $conn->{eproc} = $eproc;
552                         set_event_handler ($sock, error => $eproc);
553                 }
554                 if ($rproc) {
555                         $conn->{rproc} = $rproc;
556                         my $callback = sub {$conn->_rcv};
557                         set_event_handler ($sock, read => $callback);
558                 } else {  # Login failed
559                         &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc};
560                         $conn->disconnect();
561                 }
562         } else {
563                 dbg("Msg: error on accept ($!)") if isdbg('err');
564         }
565 }
566
567 sub close_server
568 {
569         my $conn = shift;
570         set_event_handler ($conn->{sock}, read => undef, write => undef, error => undef );
571         $conn->{sock}->close;
572 }
573
574 # close all clients (this is for forking really)
575 sub close_all_clients
576 {
577         foreach my $conn (values %conns) {
578                 $conn->disconnect;
579         }
580 }
581
582 sub disable_read
583 {
584         my $conn = shift;
585         set_event_handler ($conn->{sock}, read => undef);
586         return $_[0] ? $conn->{disable_read} = $_[0] : $_[0];
587 }
588
589 #
590 #----------------------------------------------------
591 # Event loop routines used by both client and server
592
593 sub set_event_handler {
594     shift unless ref($_[0]); # shift if first arg is package name
595     my ($handle, %args) = @_;
596     my $callback;
597     if (exists $args{'write'}) {
598         $callback = $args{'write'};
599         if ($callback) {
600             $wt_callbacks{$handle} = $callback;
601             $wt_handles->add($handle);
602         } else {
603             delete $wt_callbacks{$handle};
604             $wt_handles->remove($handle);
605         }
606     }
607     if (exists $args{'read'}) {
608         $callback = $args{'read'};
609         if ($callback) {
610             $rd_callbacks{$handle} = $callback;
611             $rd_handles->add($handle);
612         } else {
613             delete $rd_callbacks{$handle};
614             $rd_handles->remove($handle);
615        }
616     }
617     if (exists $args{'error'}) {
618         $callback = $args{'error'};
619         if ($callback) {
620             $er_callbacks{$handle} = $callback;
621             $er_handles->add($handle);
622         } else {
623             delete $er_callbacks{$handle};
624             $er_handles->remove($handle);
625        }
626     }
627 }
628
629 sub event_loop {
630     my ($pkg, $loop_count, $timeout, $wronly) = @_; # event_loop(1) to process events once
631     my ($conn, $r, $w, $e, $rset, $wset, $eset);
632     while (1) {
633  
634        # Quit the loop if no handles left to process
635                 if ($wronly) {
636                         last unless $wt_handles->count();
637         
638                         ($rset, $wset, $eset) = IO::Select->select(undef, $wt_handles, undef, $timeout);
639                         
640                         foreach $w (@$wset) {
641                                 &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w};
642                         }
643                 } else {
644                         
645                         last unless ($rd_handles->count() || $wt_handles->count());
646         
647                         ($rset, $wset, $eset) = IO::Select->select($rd_handles, $wt_handles, $er_handles, $timeout);
648                         
649                         foreach $e (@$eset) {
650                                 &{$er_callbacks{$e}}($e) if exists $er_callbacks{$e};
651                         }
652                         foreach $r (@$rset) {
653                                 &{$rd_callbacks{$r}}($r) if exists $rd_callbacks{$r};
654                         }
655                         foreach $w (@$wset) {
656                                 &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w};
657                         }
658                 }
659
660                 Timer::handler;
661                 
662         if (defined($loop_count)) {
663             last unless --$loop_count;
664         }
665     }
666 }
667
668 sub sleep
669 {
670         my ($pkg, $interval) = @_;
671         my $now = time;
672         while (time - $now < $interval) {
673                 $pkg->event_loop(10, 0.01);
674         }
675 }
676
677 sub DESTROY
678 {
679         my $conn = shift;
680         my $call = $conn->{call} || 'unallocated';
681         my $host = $conn->{peerhost} || '';
682         my $port = $conn->{peerport} || '';
683         dbg("Connection $conn->{cnum} $call [$host $port] being destroyed") if isdbg('connll');
684         $noconns--;
685 }
686
687 1;
688
689 __END__
690