add missing defined for M$
[spider.git] / perl / Msg.pm
1 #
2 # This has been taken from the 'Advanced Perl Programming' book by Sriram Srinivasan 
3 #
4 # I am presuming that the code is distributed on the same basis as perl itself.
5 #
6 # I have modified it to suit my devious purposes (Dirk Koopman G1TLH)
7 #
8 # $Id$
9 #
10
11 package Msg;
12
13 use strict;
14
15 use vars qw($VERSION $BRANCH);
16 $VERSION = sprintf( "%d.%03d", q$Revision$ =~ /(\d+)\.(\d+)/ );
17 $BRANCH = sprintf( "%d.%03d", q$Revision$ =~ /\d+\.\d+\.(\d+)\.(\d+)/ ) || 0;
18 $main::build += $VERSION;
19 $main::branch += $BRANCH;
20
21 use IO::Select;
22 use IO::Socket;
23 use DXDebug;
24 use Timer;
25
26 use vars qw(%rd_callbacks %wt_callbacks %er_callbacks $rd_handles $wt_handles $er_handles $now %conns $noconns $blocking_supported $cnum);
27
28 %rd_callbacks = ();
29 %wt_callbacks = ();
30 %er_callbacks = ();
31 $rd_handles   = IO::Select->new();
32 $wt_handles   = IO::Select->new();
33 $er_handles   = IO::Select->new();
34
35 $now = time;
36
37 BEGIN {
38     # Checks if blocking is supported
39     eval {
40         require POSIX; POSIX->import(qw(O_NONBLOCK F_SETFL F_GETFL))
41     };
42         if ($@ || $main::is_win) {
43 #               print STDERR "POSIX Blocking *** NOT *** supported $@\n";
44                 $blocking_supported = 0;
45         } else {
46                 $blocking_supported = 1;
47 #               print STDERR "POSIX Blocking enabled\n";
48         }
49
50
51         # import as many of these errno values as are available
52         eval {
53                 require Errno; Errno->import(qw(EAGAIN EINPROGRESS EWOULDBLOCK));
54         };
55
56         eval {
57                 require Socket; Socket->import(qw(IPPROTO_TCP TCP_NODELAY));
58         };
59         unless (*IPPROTO_TCP  && !$^O =~ /^MS/) {
60                 dbg("IPPROTO_TCP and TCP_NODELAY manually defined");
61                 eval '*IPPROTO_TCP     = sub {     6 };';
62                 eval '*TCP_NODELAY     = sub {     1 };';
63         }
64         # http://support.microsoft.com/support/kb/articles/Q150/5/37.asp
65         # defines EINPROGRESS as 10035.  We provide it here because some
66         # Win32 users report POSIX::EINPROGRESS is not vendor-supported.
67         if ($^O eq 'MSWin32') { 
68                 eval '*EINPROGRESS = sub { 10036 };';
69                 eval '*EWOULDBLOCK = *EAGAIN = sub { 10035 };';
70                 eval '*F_GETFL     = sub {     0 };';
71                 eval '*F_SETFL     = sub {     0 };';
72                 eval '*IPPROTO_TCP     = sub {     6 };';
73                 eval '*TCP_NODELAY     = sub {     1 };';
74                 $blocking_supported = 1;
75         } 
76 }
77
78 my $w = $^W;
79 $^W = 0;
80 my $eagain = eval {EAGAIN()};
81 my $einprogress = eval {EINPROGRESS()};
82 my $ewouldblock = eval {EWOULDBLOCK()};
83 $^W = $w;
84 $cnum = 0;
85
86
87 #
88 #-----------------------------------------------------------------
89 # Generalised initializer
90
91 sub new
92 {
93     my ($pkg, $rproc) = @_;
94         my $obj = ref($pkg);
95         my $class = $obj || $pkg;
96
97     my $conn = {
98         rproc => $rproc,
99                 inqueue => [],
100                 outqueue => [],
101                 state => 0,
102                 lineend => "\r\n",
103                 csort => 'telnet',
104                 timeval => 60,
105                 blocking => 0,
106                 cnum => (($cnum < 999) ? (++$cnum) : ($cnum = 1)),
107     };
108
109         $noconns++;
110         
111         dbg("Connection created ($noconns)") if isdbg('connll');
112         return bless $conn, $class;
113 }
114
115 sub set_error
116 {
117         my $conn = shift;
118         my $callback = shift;
119         $conn->{eproc} = $callback;
120         set_event_handler($conn->{sock}, error => $callback) if exists $conn->{sock};
121 }
122
123 sub set_rproc
124 {
125         my $conn = shift;
126         my $callback = shift;
127         $conn->{rproc} = $callback;
128 }
129
130 sub blocking
131 {
132         return unless $blocking_supported;
133
134         # Make the handle stop blocking, the Windows way.
135         if ($main::is_win) { 
136           # 126 is FIONBIO (some docs say 0x7F << 16)
137                 ioctl( $_[0],
138                            0x80000000 | (4 << 16) | (ord('f') << 8) | 126,
139                            "$_[1]"
140                          );
141         } else {
142                 my $flags = fcntl ($_[0], F_GETFL, 0);
143                 if ($_[1]) {
144                         $flags &= ~O_NONBLOCK;
145                 } else {
146                         $flags |= O_NONBLOCK;
147                 }
148                 fcntl ($_[0], F_SETFL, $flags);
149         }
150 }
151
152 # save it
153 sub conns
154 {
155         my $pkg = shift;
156         my $call = shift;
157         my $ref;
158         
159         if (ref $pkg) {
160                 $call = $pkg->{call} unless $call;
161                 return undef unless $call;
162                 dbg("changing $pkg->{call} to $call") if isdbg('connll') && exists $pkg->{call} && $call ne $pkg->{call};
163                 delete $conns{$pkg->{call}} if exists $pkg->{call} && exists $conns{$pkg->{call}} && $pkg->{call} ne $call; 
164                 $pkg->{call} = $call;
165                 $ref = $conns{$call} = $pkg;
166                 dbg("Connection $pkg->{cnum} $call stored") if isdbg('connll');
167         } else {
168                 $ref = $conns{$call};
169         }
170         return $ref;
171 }
172
173 # this is only called by any dependent processes going away unexpectedly
174 sub pid_gone
175 {
176         my ($pkg, $pid) = @_;
177         
178         my @pid = grep {$_->{pid} == $pid} values %conns;
179         foreach my $p (@pid) {
180                 &{$p->{eproc}}($p, "$pid has gorn") if exists $p->{eproc};
181                 $p->disconnect;
182         }
183 }
184
185 #-----------------------------------------------------------------
186 # Send side routines
187 sub connect {
188     my ($pkg, $to_host, $to_port, $rproc) = @_;
189
190     # Create a connection end-point object
191     my $conn = $pkg;
192         unless (ref $pkg) {
193                 $conn = $pkg->new($rproc);
194         }
195         $conn->{peerhost} = $to_host;
196         $conn->{peerport} = $to_port;
197         $conn->{sort} = 'Outgoing';
198         
199     # Create a new internet socket
200     my $sock = IO::Socket::INET->new();
201     return undef unless $sock;
202         
203         my $proto = getprotobyname('tcp');
204         $sock->socket(AF_INET, SOCK_STREAM, $proto) or return undef;
205         
206         blocking($sock, 0);
207         $conn->{blocking} = 0;
208
209         my $ip = gethostbyname($to_host);
210 #       my $r = $sock->connect($to_port, $ip);
211         my $r = connect($sock, pack_sockaddr_in($to_port, $ip));
212         return undef unless $r || _err_will_block($!);
213         
214         $conn->{sock} = $sock;
215     
216     if ($conn->{rproc}) {
217         my $callback = sub {$conn->_rcv};
218         set_event_handler ($sock, read => $callback);
219     }
220     return $conn;
221 }
222
223 sub disconnect {
224     my $conn = shift;
225         return if exists $conn->{disconnecting};
226
227         $conn->{disconnecting} = 1;
228     my $sock = delete $conn->{sock};
229         $conn->{state} = 'E';
230         $conn->{timeout}->del if $conn->{timeout};
231
232         # be careful to delete the correct one
233         my $call;
234         if ($call = $conn->{call}) {
235                 my $ref = $conns{$call};
236                 delete $conns{$call} if $ref && $ref == $conn;
237         }
238         $call ||= 'unallocated';
239         dbg("Connection $conn->{cnum} $call disconnected") if isdbg('connll');
240         
241         # get rid of any references
242         for (keys %$conn) {
243                 if (ref($conn->{$_})) {
244                         delete $conn->{$_};
245                 }
246         }
247
248         if (defined($sock)) {
249                 set_event_handler ($sock, read => undef, write => undef, error => undef);
250                 shutdown($sock, 3);
251                 close($sock);
252         }
253         
254         unless ($main::is_win) {
255                 kill 'TERM', $conn->{pid} if exists $conn->{pid};
256         }
257
258 }
259
260 sub send_now {
261     my ($conn, $msg) = @_;
262     $conn->enqueue($msg);
263     $conn->_send (1); # 1 ==> flush
264 }
265
266 sub send_later {
267     my ($conn, $msg) = @_;
268     $conn->enqueue($msg);
269     my $sock = $conn->{sock};
270     return unless defined($sock);
271     set_event_handler ($sock, write => sub {$conn->_send(0)});
272 }
273
274 sub enqueue {
275     my $conn = shift;
276     push (@{$conn->{outqueue}}, defined $_[0] ? $_[0] : '');
277 }
278
279 sub _send {
280     my ($conn, $flush) = @_;
281     my $sock = $conn->{sock};
282     return unless defined($sock);
283     my $rq = $conn->{outqueue};
284
285     # If $flush is set, set the socket to blocking, and send all
286     # messages in the queue - return only if there's an error
287     # If $flush is 0 (deferred mode) make the socket non-blocking, and
288     # return to the event loop only after every message, or if it
289     # is likely to block in the middle of a message.
290
291         if ($conn->{blocking} != $flush) {
292                 blocking($sock, $flush);
293                 $conn->{blocking} = $flush;
294         }
295     my $offset = (exists $conn->{send_offset}) ? $conn->{send_offset} : 0;
296
297     while (@$rq) {
298         my $msg            = $rq->[0];
299                 my $mlth           = length($msg);
300         my $bytes_to_write = $mlth - $offset;
301         my $bytes_written  = 0;
302                 confess("Negative Length! msg: '$msg' lth: $mlth offset: $offset") if $bytes_to_write < 0;
303         while ($bytes_to_write > 0) {
304             $bytes_written = syswrite ($sock, $msg,
305                                        $bytes_to_write, $offset);
306             if (!defined($bytes_written)) {
307                 if (_err_will_block($!)) {
308                     # Should happen only in deferred mode. Record how
309                     # much we have already sent.
310                     $conn->{send_offset} = $offset;
311                     # Event handler should already be set, so we will
312                     # be called back eventually, and will resume sending
313                     return 1;
314                 } else {    # Uh, oh
315                                         &{$conn->{eproc}}($conn, $!) if exists $conn->{eproc};
316                                         $conn->disconnect;
317                     return 0; # fail. Message remains in queue ..
318                 }
319             } elsif (isdbg('raw')) {
320                                 my $call = $conn->{call} || 'none';
321                                 dbgdump('raw', "$call send $bytes_written: ", $msg);
322                         }
323             $offset         += $bytes_written;
324             $bytes_to_write -= $bytes_written;
325         }
326         delete $conn->{send_offset};
327         $offset = 0;
328         shift @$rq;
329         #last unless $flush; # Go back to select and wait
330                             # for it to fire again.
331     }
332     # Call me back if queue has not been drained.
333     unless (@$rq) {
334         set_event_handler ($sock, write => undef);
335                 if (exists $conn->{close_on_empty}) {
336                         &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc};
337                         $conn->disconnect; 
338                 }
339     }
340     1;  # Success
341 }
342
343 sub dup_sock
344 {
345         my $conn = shift;
346         my $oldsock = $conn->{sock};
347         my $rc = $rd_callbacks{$oldsock};
348         my $wc = $wt_callbacks{$oldsock};
349         my $ec = $er_callbacks{$oldsock};
350         my $sock = $oldsock->new_from_fd($oldsock, "w+");
351         if ($sock) {
352                 set_event_handler($oldsock, read=>undef, write=>undef, error=>undef);
353                 $conn->{sock} = $sock;
354                 set_event_handler($sock, read=>$rc, write=>$wc, error=>$ec);
355                 $oldsock->close;
356         }
357 }
358
359 sub _err_will_block {
360         return 0 unless $blocking_supported;
361         return ($_[0] == $eagain || $_[0] == $ewouldblock || $_[0] == $einprogress);
362 }
363
364 sub close_on_empty
365 {
366         my $conn = shift;
367         $conn->{close_on_empty} = 1;
368 }
369
370 #-----------------------------------------------------------------
371 # Receive side routines
372
373 sub new_server {
374     @_ == 4 || die "Msg->new_server (myhost, myport, login_proc\n";
375     my ($pkg, $my_host, $my_port, $login_proc) = @_;
376         my $self = $pkg->new($login_proc);
377         
378     $self->{sock} = IO::Socket::INET->new (
379                                           LocalAddr => "$my_host:$my_port",
380 #                                          LocalPort => $my_port,
381                                           Listen    => SOMAXCONN,
382                                           Proto     => 'tcp',
383                                           Reuse => 1);
384     die "Could not create socket: $! \n" unless $self->{sock};
385     set_event_handler ($self->{sock}, read => sub { $self->new_client }  );
386         return $self;
387 }
388
389
390 sub nolinger
391 {
392         my $conn = shift;
393
394         if (isdbg('sock')) {
395                 my ($l, $t) = unpack "ll", getsockopt($conn->{sock}, SOL_SOCKET, SO_LINGER); 
396                 my $k = unpack 'l', getsockopt($conn->{sock}, SOL_SOCKET, SO_KEEPALIVE);
397                 my $n = unpack "l", getsockopt($conn->{sock}, IPPROTO_TCP, TCP_NODELAY);
398                 dbg("Linger is: $l $t, keepalive: $k, nagle: $n");
399         }
400
401         setsockopt($conn->{sock}, SOL_SOCKET, SO_LINGER, pack("ll", 0, 0)) or confess "setsockopt linger: $!";
402         setsockopt($conn->{sock}, SOL_SOCKET, SO_KEEPALIVE, 1) or confess "setsockopt keepalive: $!";
403         unless ($main::is_win) {
404                 setsockopt($conn->{sock}, IPPROTO_TCP, TCP_NODELAY, 1) or confess "setsockopt: $!";
405         } 
406         $conn->{sock}->autoflush(0);
407         
408         if (isdbg('sock')) {
409                 my ($l, $t) = unpack "ll", getsockopt($conn->{sock}, SOL_SOCKET, SO_LINGER); 
410                 my $k = unpack 'l', getsockopt($conn->{sock}, SOL_SOCKET, SO_KEEPALIVE);
411                 my $n = unpack "l", getsockopt($conn->{sock}, IPPROTO_TCP, TCP_NODELAY);
412                 dbg("Linger is: $l $t, keepalive: $k, nagle: $n");
413         }
414 }
415
416 sub dequeue
417 {
418         my $conn = shift;
419
420         if ($conn->{msg} =~ /\n/) {
421                 my @lines = split /\r?\n/, $conn->{msg};
422                 if ($conn->{msg} =~ /\n$/) {
423                         delete $conn->{msg};
424                 } else {
425                         $conn->{msg} = pop @lines;
426                 }
427                 for (@lines) {
428                         &{$conn->{rproc}}($conn, defined $_ ? $_ : '');
429                 }
430         }
431 }
432
433 sub _rcv {                     # Complement to _send
434     my $conn = shift; # $rcv_now complement of $flush
435     # Find out how much has already been received, if at all
436     my ($msg, $offset, $bytes_to_read, $bytes_read);
437     my $sock = $conn->{sock};
438     return unless defined($sock);
439
440         my @lines;
441         if ($conn->{blocking}) {
442                 blocking($sock, 0);
443                 $conn->{blocking} = 0;
444         }
445         $bytes_read = sysread ($sock, $msg, 1024, 0);
446         if (defined ($bytes_read)) {
447                 if ($bytes_read > 0) {
448                         if (isdbg('raw')) {
449                                 my $call = $conn->{call} || 'none';
450                                 dbgdump('raw', "$call read $bytes_read: ", $msg);
451                         }
452                         if ($conn->{echo}) {
453                                 my @ch = split //, $msg;
454                                 my $out;
455                                 for (@ch) {
456                                         if (/[\cH\x7f]/) {
457                                                 $out .= "\cH \cH";
458                                                 $conn->{msg} =~ s/.$//;
459                                         } else {
460                                                 $out .= $_;
461                                                 $conn->{msg} .= $_;
462                                         }
463                                 }
464                                 if (defined $out) {
465                                         set_event_handler ($sock, write => sub{$conn->_send(0)});
466                                         push @{$conn->{outqueue}}, $out;
467                                 }
468                         } else {
469                                 $conn->{msg} .= $msg;
470                         }
471                 } 
472         } else {
473                 if (_err_will_block($!)) {
474                         return ; 
475                 } else {
476                         $bytes_read = 0;
477                 }
478     }
479
480 FINISH:
481     if (defined $bytes_read && $bytes_read == 0) {
482                 &{$conn->{eproc}}($conn, $!) if exists $conn->{eproc};
483                 $conn->disconnect;
484     } else {
485                 unless ($conn->{disable_read}) {
486                         $conn->dequeue if exists $conn->{msg};
487                 }
488         }
489 }
490
491 sub new_client {
492         my $server_conn = shift;
493     my $sock = $server_conn->{sock}->accept();
494         if ($sock) {
495                 my $conn = $server_conn->new($server_conn->{rproc});
496                 $conn->{sock} = $sock;
497                 blocking($sock, 0);
498                 $conn->nolinger;
499                 $conn->{blocking} = 0;
500                 my ($rproc, $eproc) = &{$server_conn->{rproc}} ($conn, $conn->{peerhost} = $sock->peerhost(), $conn->{peerport} = $sock->peerport());
501                 $conn->{sort} = 'Incoming';
502                 if ($eproc) {
503                         $conn->{eproc} = $eproc;
504                         set_event_handler ($sock, error => $eproc);
505                 }
506                 if ($rproc) {
507                         $conn->{rproc} = $rproc;
508                         my $callback = sub {$conn->_rcv};
509                         set_event_handler ($sock, read => $callback);
510                 } else {  # Login failed
511                         &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc};
512                         $conn->disconnect();
513                 }
514         } else {
515                 dbg("Msg: error on accept ($!)") if isdbg('err');
516         }
517 }
518
519 sub close_server
520 {
521         my $conn = shift;
522         set_event_handler ($conn->{sock}, read => undef, write => undef, error => undef );
523         $conn->{sock}->close;
524 }
525
526 # close all clients (this is for forking really)
527 sub close_all_clients
528 {
529         foreach my $conn (values %conns) {
530                 $conn->disconnect;
531         }
532 }
533
534 sub disable_read
535 {
536         my $conn = shift;
537         set_event_handler ($conn->{sock}, read => undef);
538         return $_[0] ? $conn->{disable_read} = $_[0] : $_[0];
539 }
540
541 #
542 #----------------------------------------------------
543 # Event loop routines used by both client and server
544
545 sub set_event_handler {
546     shift unless ref($_[0]); # shift if first arg is package name
547     my ($handle, %args) = @_;
548     my $callback;
549     if (exists $args{'write'}) {
550         $callback = $args{'write'};
551         if ($callback) {
552             $wt_callbacks{$handle} = $callback;
553             $wt_handles->add($handle);
554         } else {
555             delete $wt_callbacks{$handle};
556             $wt_handles->remove($handle);
557         }
558     }
559     if (exists $args{'read'}) {
560         $callback = $args{'read'};
561         if ($callback) {
562             $rd_callbacks{$handle} = $callback;
563             $rd_handles->add($handle);
564         } else {
565             delete $rd_callbacks{$handle};
566             $rd_handles->remove($handle);
567        }
568     }
569     if (exists $args{'error'}) {
570         $callback = $args{'error'};
571         if ($callback) {
572             $er_callbacks{$handle} = $callback;
573             $er_handles->add($handle);
574         } else {
575             delete $er_callbacks{$handle};
576             $er_handles->remove($handle);
577        }
578     }
579 }
580
581 sub event_loop {
582     my ($pkg, $loop_count, $timeout) = @_; # event_loop(1) to process events once
583     my ($conn, $r, $w, $e, $rset, $wset, $eset);
584     while (1) {
585  
586        # Quit the loop if no handles left to process
587         last unless ($rd_handles->count() || $wt_handles->count());
588         
589                 ($rset, $wset, $eset) = IO::Select->select($rd_handles, $wt_handles, $er_handles, $timeout);
590                 
591         foreach $e (@$eset) {
592             &{$er_callbacks{$e}}($e) if exists $er_callbacks{$e};
593         }
594         foreach $r (@$rset) {
595             &{$rd_callbacks{$r}}($r) if exists $rd_callbacks{$r};
596         }
597         foreach $w (@$wset) {
598             &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w};
599         }
600
601                 Timer::handler;
602                 
603         if (defined($loop_count)) {
604             last unless --$loop_count;
605         }
606     }
607 }
608
609 sub sleep
610 {
611         my ($pkg, $interval) = @_;
612         my $now = time;
613         while (time - $now < $interval) {
614                 $pkg->event_loop(10, 0.01);
615         }
616 }
617
618 sub DESTROY
619 {
620         my $conn = shift;
621         my $call = $conn->{call} || 'unallocated';
622         my $host = $conn->{peerhost} || '';
623         my $port = $conn->{peerport} || '';
624         dbg("Connection $conn->{cnum} $call [$host $port] being destroyed") if isdbg('connll');
625         $noconns--;
626 }
627
628 1;
629
630 __END__
631