tidy it up a bit more
[spider.git] / perl / Msg.pm
1 #
2 # This has been taken from the 'Advanced Perl Programming' book by Sriram Srinivasan 
3 #
4 # I am presuming that the code is distributed on the same basis as perl itself.
5 #
6 # I have modified it to suit my devious purposes (Dirk Koopman G1TLH)
7 #
8 # $Id$
9 #
10
11 package Msg;
12
13 use strict;
14
15 use vars qw($VERSION $BRANCH);
16 $VERSION = sprintf( "%d.%03d", q$Revision$ =~ /(\d+)\.(\d+)/ );
17 $BRANCH = sprintf( "%d.%03d", q$Revision$ =~ /\d+\.\d+\.(\d+)\.(\d+)/ ) || 0;
18 $main::build += $VERSION;
19 $main::branch += $BRANCH;
20
21 use IO::Select;
22 use IO::Socket;
23 use DXDebug;
24 use Timer;
25
26 use vars qw(%rd_callbacks %wt_callbacks %er_callbacks $rd_handles $wt_handles $er_handles $now %conns $noconns $blocking_supported $cnum);
27
28 %rd_callbacks = ();
29 %wt_callbacks = ();
30 %er_callbacks = ();
31 $rd_handles   = IO::Select->new();
32 $wt_handles   = IO::Select->new();
33 $er_handles   = IO::Select->new();
34
35 $now = time;
36
37 BEGIN {
38     # Checks if blocking is supported
39     eval {
40         require POSIX; POSIX->import(qw(O_NONBLOCK F_SETFL F_GETFL))
41     };
42         if ($@ || $main::is_win) {
43 #               print STDERR "POSIX Blocking *** NOT *** supported $@\n";
44                 $blocking_supported = 0;
45         } else {
46                 $blocking_supported = 1;
47 #               print STDERR "POSIX Blocking enabled\n";
48         }
49
50
51         # import as many of these errno values as are available
52         eval {
53                 require Errno; Errno->import(qw(EAGAIN EINPROGRESS EWOULDBLOCK));
54         };
55
56         eval {
57                 require Socket; Socket->import(qw(IPPROTO_TCP TCP_NODELAY));
58         };
59         if ($@ && !$^O =~ /^MS/) {
60                 dbg("IPPROTO_TCP and TCP_NODELAY manually defined");
61                 eval '*IPPROTO_TCP     = sub {     6 };';
62                 eval '*TCP_NODELAY     = sub {     1 };';
63         }
64         # http://support.microsoft.com/support/kb/articles/Q150/5/37.asp
65         # defines EINPROGRESS as 10035.  We provide it here because some
66         # Win32 users report POSIX::EINPROGRESS is not vendor-supported.
67         if ($^O eq 'MSWin32') { 
68                 eval '*EINPROGRESS = sub { 10036 };';
69                 eval '*EWOULDBLOCK = *EAGAIN = sub { 10035 };';
70                 eval '*F_GETFL     = sub {     0 };';
71                 eval '*F_SETFL     = sub {     0 };';
72                 $blocking_supported = 1;
73         } 
74 }
75
76 my $w = $^W;
77 $^W = 0;
78 my $eagain = eval {EAGAIN()};
79 my $einprogress = eval {EINPROGRESS()};
80 my $ewouldblock = eval {EWOULDBLOCK()};
81 $^W = $w;
82 $cnum = 0;
83
84
85 #
86 #-----------------------------------------------------------------
87 # Generalised initializer
88
89 sub new
90 {
91     my ($pkg, $rproc) = @_;
92         my $obj = ref($pkg);
93         my $class = $obj || $pkg;
94
95     my $conn = {
96         rproc => $rproc,
97                 inqueue => [],
98                 outqueue => [],
99                 state => 0,
100                 lineend => "\r\n",
101                 csort => 'telnet',
102                 timeval => 60,
103                 blocking => 0,
104                 cnum => (($cnum < 999) ? (++$cnum) : ($cnum = 1)),
105     };
106
107         $noconns++;
108         
109         dbg("Connection created ($noconns)") if isdbg('connll');
110         return bless $conn, $class;
111 }
112
113 sub set_error
114 {
115         my $conn = shift;
116         my $callback = shift;
117         $conn->{eproc} = $callback;
118         set_event_handler($conn->{sock}, error => $callback) if exists $conn->{sock};
119 }
120
121 sub set_rproc
122 {
123         my $conn = shift;
124         my $callback = shift;
125         $conn->{rproc} = $callback;
126 }
127
128 sub blocking
129 {
130         return unless $blocking_supported;
131
132         # Make the handle stop blocking, the Windows way.
133         if ($main::is_win) { 
134           # 126 is FIONBIO (some docs say 0x7F << 16)
135                 ioctl( $_[0],
136                            0x80000000 | (4 << 16) | (ord('f') << 8) | 126,
137                            "$_[1]"
138                          );
139         } else {
140                 my $flags = fcntl ($_[0], F_GETFL, 0);
141                 if ($_[1]) {
142                         $flags &= ~O_NONBLOCK;
143                 } else {
144                         $flags |= O_NONBLOCK;
145                 }
146                 fcntl ($_[0], F_SETFL, $flags);
147         }
148 }
149
150 # save it
151 sub conns
152 {
153         my $pkg = shift;
154         my $call = shift;
155         my $ref;
156         
157         if (ref $pkg) {
158                 $call = $pkg->{call} unless $call;
159                 return undef unless $call;
160                 dbg("changing $pkg->{call} to $call") if isdbg('connll') && exists $pkg->{call} && $call ne $pkg->{call};
161                 delete $conns{$pkg->{call}} if exists $pkg->{call} && exists $conns{$pkg->{call}} && $pkg->{call} ne $call; 
162                 $pkg->{call} = $call;
163                 $ref = $conns{$call} = $pkg;
164                 dbg("Connection $pkg->{cnum} $call stored") if isdbg('connll');
165         } else {
166                 $ref = $conns{$call};
167         }
168         return $ref;
169 }
170
171 # this is only called by any dependent processes going away unexpectedly
172 sub pid_gone
173 {
174         my ($pkg, $pid) = @_;
175         
176         my @pid = grep {$_->{pid} == $pid} values %conns;
177         foreach my $p (@pid) {
178                 &{$p->{eproc}}($p, "$pid has gorn") if exists $p->{eproc};
179                 $p->disconnect;
180         }
181 }
182
183 #-----------------------------------------------------------------
184 # Send side routines
185 sub connect {
186     my ($pkg, $to_host, $to_port, $rproc) = @_;
187
188     # Create a connection end-point object
189     my $conn = $pkg;
190         unless (ref $pkg) {
191                 $conn = $pkg->new($rproc);
192         }
193         $conn->{peerhost} = $to_host;
194         $conn->{peerport} = $to_port;
195         $conn->{sort} = 'Outgoing';
196         
197     # Create a new internet socket
198     my $sock = IO::Socket::INET->new();
199     return undef unless $sock;
200         
201         my $proto = getprotobyname('tcp');
202         $sock->socket(AF_INET, SOCK_STREAM, $proto) or return undef;
203         
204         blocking($sock, 0);
205         $conn->{blocking} = 0;
206
207         my $ip = gethostbyname($to_host);
208 #       my $r = $sock->connect($to_port, $ip);
209         my $r = connect($sock, pack_sockaddr_in($to_port, $ip));
210         return undef unless $r || _err_will_block($!);
211         
212         $conn->{sock} = $sock;
213     
214     if ($conn->{rproc}) {
215         my $callback = sub {$conn->_rcv};
216         set_event_handler ($sock, read => $callback);
217     }
218     return $conn;
219 }
220
221 sub disconnect {
222     my $conn = shift;
223         return if exists $conn->{disconnecting};
224
225         $conn->{disconnecting} = 1;
226     my $sock = delete $conn->{sock};
227         $conn->{state} = 'E';
228         $conn->{timeout}->del if $conn->{timeout};
229
230         # be careful to delete the correct one
231         my $call;
232         if ($call = $conn->{call}) {
233                 my $ref = $conns{$call};
234                 delete $conns{$call} if $ref && $ref == $conn;
235         }
236         $call ||= 'unallocated';
237         dbg("Connection $conn->{cnum} $call disconnected") if isdbg('connll');
238         
239         # get rid of any references
240         for (keys %$conn) {
241                 if (ref($conn->{$_})) {
242                         delete $conn->{$_};
243                 }
244         }
245
246         if (defined($sock)) {
247                 set_event_handler ($sock, read => undef, write => undef, error => undef);
248                 shutdown($sock, 3);
249                 close($sock);
250         }
251         
252         unless ($main::is_win) {
253                 kill 'TERM', $conn->{pid} if exists $conn->{pid};
254         }
255
256 }
257
258 sub send_now {
259     my ($conn, $msg) = @_;
260     $conn->enqueue($msg);
261     $conn->_send (1); # 1 ==> flush
262 }
263
264 sub send_later {
265     my ($conn, $msg) = @_;
266     $conn->enqueue($msg);
267     my $sock = $conn->{sock};
268     return unless defined($sock);
269     set_event_handler ($sock, write => sub {$conn->_send(0)});
270 }
271
272 sub enqueue {
273     my $conn = shift;
274     push (@{$conn->{outqueue}}, defined $_[0] ? $_[0] : '');
275 }
276
277 sub _send {
278     my ($conn, $flush) = @_;
279     my $sock = $conn->{sock};
280     return unless defined($sock);
281     my $rq = $conn->{outqueue};
282
283     # If $flush is set, set the socket to blocking, and send all
284     # messages in the queue - return only if there's an error
285     # If $flush is 0 (deferred mode) make the socket non-blocking, and
286     # return to the event loop only after every message, or if it
287     # is likely to block in the middle of a message.
288
289         if ($conn->{blocking} != $flush) {
290                 blocking($sock, $flush);
291                 $conn->{blocking} = $flush;
292         }
293     my $offset = (exists $conn->{send_offset}) ? $conn->{send_offset} : 0;
294
295     while (@$rq) {
296         my $msg            = $rq->[0];
297                 my $mlth           = length($msg);
298         my $bytes_to_write = $mlth - $offset;
299         my $bytes_written  = 0;
300                 confess("Negative Length! msg: '$msg' lth: $mlth offset: $offset") if $bytes_to_write < 0;
301         while ($bytes_to_write > 0) {
302             $bytes_written = syswrite ($sock, $msg,
303                                        $bytes_to_write, $offset);
304             if (!defined($bytes_written)) {
305                 if (_err_will_block($!)) {
306                     # Should happen only in deferred mode. Record how
307                     # much we have already sent.
308                     $conn->{send_offset} = $offset;
309                     # Event handler should already be set, so we will
310                     # be called back eventually, and will resume sending
311                     return 1;
312                 } else {    # Uh, oh
313                                         &{$conn->{eproc}}($conn, $!) if exists $conn->{eproc};
314                                         $conn->disconnect;
315                     return 0; # fail. Message remains in queue ..
316                 }
317             } elsif (isdbg('raw')) {
318                                 my $call = $conn->{call} || 'none';
319                                 dbgdump('raw', "$call send $bytes_written: ", $msg);
320                         }
321             $offset         += $bytes_written;
322             $bytes_to_write -= $bytes_written;
323         }
324         delete $conn->{send_offset};
325         $offset = 0;
326         shift @$rq;
327         #last unless $flush; # Go back to select and wait
328                             # for it to fire again.
329     }
330     # Call me back if queue has not been drained.
331     unless (@$rq) {
332         set_event_handler ($sock, write => undef);
333                 if (exists $conn->{close_on_empty}) {
334                         &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc};
335                         $conn->disconnect; 
336                 }
337     }
338     1;  # Success
339 }
340
341 sub dup_sock
342 {
343         my $conn = shift;
344         my $oldsock = $conn->{sock};
345         my $rc = $rd_callbacks{$oldsock};
346         my $wc = $wt_callbacks{$oldsock};
347         my $ec = $er_callbacks{$oldsock};
348         my $sock = $oldsock->new_from_fd($oldsock, "w+");
349         if ($sock) {
350                 set_event_handler($oldsock, read=>undef, write=>undef, error=>undef);
351                 $conn->{sock} = $sock;
352                 set_event_handler($sock, read=>$rc, write=>$wc, error=>$ec);
353                 $oldsock->close;
354         }
355 }
356
357 sub _err_will_block {
358         return 0 unless $blocking_supported;
359         return ($_[0] == $eagain || $_[0] == $ewouldblock || $_[0] == $einprogress);
360 }
361
362 sub close_on_empty
363 {
364         my $conn = shift;
365         $conn->{close_on_empty} = 1;
366 }
367
368 #-----------------------------------------------------------------
369 # Receive side routines
370
371 sub new_server {
372     @_ == 4 || die "Msg->new_server (myhost, myport, login_proc\n";
373     my ($pkg, $my_host, $my_port, $login_proc) = @_;
374         my $self = $pkg->new($login_proc);
375         
376     $self->{sock} = IO::Socket::INET->new (
377                                           LocalAddr => "$my_host:$my_port",
378 #                                          LocalPort => $my_port,
379                                           Listen    => SOMAXCONN,
380                                           Proto     => 'tcp',
381                                           Reuse => 1);
382     die "Could not create socket: $! \n" unless $self->{sock};
383     set_event_handler ($self->{sock}, read => sub { $self->new_client }  );
384         return $self;
385 }
386
387
388 sub nolinger
389 {
390         my $conn = shift;
391
392         if (isdbg('sock')) {
393                 my ($l, $t) = unpack "ll", getsockopt($conn->{sock}, SOL_SOCKET, SO_LINGER); 
394                 my $k = unpack 'l', getsockopt($conn->{sock}, SOL_SOCKET, SO_KEEPALIVE);
395                 my $n = unpack "l", getsockopt($conn->{sock}, IPPROTO_TCP, TCP_NODELAY);
396                 dbg("Linger is: $l $t, keepalive: $k, nagle: $n");
397         }
398
399         setsockopt($conn->{sock}, SOL_SOCKET, SO_LINGER, pack("ll", 0, 0)) or confess "setsockopt linger: $!";
400         setsockopt($conn->{sock}, SOL_SOCKET, SO_KEEPALIVE, 1) or confess "setsockopt keepalive: $!";
401         unless ($main::is_win) {
402                 setsockopt($conn->{sock}, IPPROTO_TCP, TCP_NODELAY, 1) or confess "setsockopt: $!";
403         } 
404         $conn->{sock}->autoflush(0);
405         
406         if (isdbg('sock')) {
407                 my ($l, $t) = unpack "ll", getsockopt($conn->{sock}, SOL_SOCKET, SO_LINGER); 
408                 my $k = unpack 'l', getsockopt($conn->{sock}, SOL_SOCKET, SO_KEEPALIVE);
409                 my $n = unpack "l", getsockopt($conn->{sock}, IPPROTO_TCP, TCP_NODELAY);
410                 dbg("Linger is: $l $t, keepalive: $k, nagle: $n");
411         }
412 }
413
414 sub dequeue
415 {
416         my $conn = shift;
417
418         if ($conn->{msg} =~ /\n/) {
419                 my @lines = split /\r?\n/, $conn->{msg};
420                 if ($conn->{msg} =~ /\n$/) {
421                         delete $conn->{msg};
422                 } else {
423                         $conn->{msg} = pop @lines;
424                 }
425                 for (@lines) {
426                         &{$conn->{rproc}}($conn, defined $_ ? $_ : '');
427                 }
428         }
429 }
430
431 sub _rcv {                     # Complement to _send
432     my $conn = shift; # $rcv_now complement of $flush
433     # Find out how much has already been received, if at all
434     my ($msg, $offset, $bytes_to_read, $bytes_read);
435     my $sock = $conn->{sock};
436     return unless defined($sock);
437
438         my @lines;
439         if ($conn->{blocking}) {
440                 blocking($sock, 0);
441                 $conn->{blocking} = 0;
442         }
443         $bytes_read = sysread ($sock, $msg, 1024, 0);
444         if (defined ($bytes_read)) {
445                 if ($bytes_read > 0) {
446                         if (isdbg('raw')) {
447                                 my $call = $conn->{call} || 'none';
448                                 dbgdump('raw', "$call read $bytes_read: ", $msg);
449                         }
450                         if ($conn->{echo}) {
451                                 my @ch = split //, $msg;
452                                 my $out;
453                                 for (@ch) {
454                                         if (/[\cH\x7f]/) {
455                                                 $out .= "\cH \cH";
456                                                 $conn->{msg} =~ s/.$//;
457                                         } else {
458                                                 $out .= $_;
459                                                 $conn->{msg} .= $_;
460                                         }
461                                 }
462                                 if (defined $out) {
463                                         set_event_handler ($sock, write => sub{$conn->_send(0)});
464                                         push @{$conn->{outqueue}}, $out;
465                                 }
466                         } else {
467                                 $conn->{msg} .= $msg;
468                         }
469                 } 
470         } else {
471                 if (_err_will_block($!)) {
472                         return ; 
473                 } else {
474                         $bytes_read = 0;
475                 }
476     }
477
478 FINISH:
479     if (defined $bytes_read && $bytes_read == 0) {
480                 &{$conn->{eproc}}($conn, $!) if exists $conn->{eproc};
481                 $conn->disconnect;
482     } else {
483                 unless ($conn->{disable_read}) {
484                         $conn->dequeue if exists $conn->{msg};
485                 }
486         }
487 }
488
489 sub new_client {
490         my $server_conn = shift;
491     my $sock = $server_conn->{sock}->accept();
492         if ($sock) {
493                 my $conn = $server_conn->new($server_conn->{rproc});
494                 $conn->{sock} = $sock;
495                 blocking($sock, 0);
496                 $conn->nolinger;
497                 $conn->{blocking} = 0;
498                 my ($rproc, $eproc) = &{$server_conn->{rproc}} ($conn, $conn->{peerhost} = $sock->peerhost(), $conn->{peerport} = $sock->peerport());
499                 $conn->{sort} = 'Incoming';
500                 if ($eproc) {
501                         $conn->{eproc} = $eproc;
502                         set_event_handler ($sock, error => $eproc);
503                 }
504                 if ($rproc) {
505                         $conn->{rproc} = $rproc;
506                         my $callback = sub {$conn->_rcv};
507                         set_event_handler ($sock, read => $callback);
508                 } else {  # Login failed
509                         &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc};
510                         $conn->disconnect();
511                 }
512         } else {
513                 dbg("Msg: error on accept ($!)") if isdbg('err');
514         }
515 }
516
517 sub close_server
518 {
519         my $conn = shift;
520         set_event_handler ($conn->{sock}, read => undef, write => undef, error => undef );
521         $conn->{sock}->close;
522 }
523
524 # close all clients (this is for forking really)
525 sub close_all_clients
526 {
527         foreach my $conn (values %conns) {
528                 $conn->disconnect;
529         }
530 }
531
532 sub disable_read
533 {
534         my $conn = shift;
535         set_event_handler ($conn->{sock}, read => undef);
536         return $_[0] ? $conn->{disable_read} = $_[0] : $_[0];
537 }
538
539 #
540 #----------------------------------------------------
541 # Event loop routines used by both client and server
542
543 sub set_event_handler {
544     shift unless ref($_[0]); # shift if first arg is package name
545     my ($handle, %args) = @_;
546     my $callback;
547     if (exists $args{'write'}) {
548         $callback = $args{'write'};
549         if ($callback) {
550             $wt_callbacks{$handle} = $callback;
551             $wt_handles->add($handle);
552         } else {
553             delete $wt_callbacks{$handle};
554             $wt_handles->remove($handle);
555         }
556     }
557     if (exists $args{'read'}) {
558         $callback = $args{'read'};
559         if ($callback) {
560             $rd_callbacks{$handle} = $callback;
561             $rd_handles->add($handle);
562         } else {
563             delete $rd_callbacks{$handle};
564             $rd_handles->remove($handle);
565        }
566     }
567     if (exists $args{'error'}) {
568         $callback = $args{'error'};
569         if ($callback) {
570             $er_callbacks{$handle} = $callback;
571             $er_handles->add($handle);
572         } else {
573             delete $er_callbacks{$handle};
574             $er_handles->remove($handle);
575        }
576     }
577 }
578
579 sub event_loop {
580     my ($pkg, $loop_count, $timeout) = @_; # event_loop(1) to process events once
581     my ($conn, $r, $w, $e, $rset, $wset, $eset);
582     while (1) {
583  
584        # Quit the loop if no handles left to process
585         last unless ($rd_handles->count() || $wt_handles->count());
586         
587                 ($rset, $wset, $eset) = IO::Select->select($rd_handles, $wt_handles, $er_handles, $timeout);
588                 
589         foreach $e (@$eset) {
590             &{$er_callbacks{$e}}($e) if exists $er_callbacks{$e};
591         }
592         foreach $r (@$rset) {
593             &{$rd_callbacks{$r}}($r) if exists $rd_callbacks{$r};
594         }
595         foreach $w (@$wset) {
596             &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w};
597         }
598
599                 Timer::handler;
600                 
601         if (defined($loop_count)) {
602             last unless --$loop_count;
603         }
604     }
605 }
606
607 sub sleep
608 {
609         my ($pkg, $interval) = @_;
610         my $now = time;
611         while (time - $now < $interval) {
612                 $pkg->event_loop(10, 0.01);
613         }
614 }
615
616 sub DESTROY
617 {
618         my $conn = shift;
619         my $call = $conn->{call} || 'unallocated';
620         my $host = $conn->{peerhost} || '';
621         my $port = $conn->{peerport} || '';
622         dbg("Connection $conn->{cnum} $call [$host $port] being destroyed") if isdbg('connll');
623         $noconns--;
624 }
625
626 1;
627
628 __END__
629