remove all set sockopts for M$
[spider.git] / perl / Msg.pm
1 #
2 # This has been taken from the 'Advanced Perl Programming' book by Sriram Srinivasan 
3 #
4 # I am presuming that the code is distributed on the same basis as perl itself.
5 #
6 # I have modified it to suit my devious purposes (Dirk Koopman G1TLH)
7 #
8 # $Id$
9 #
10
11 package Msg;
12
13 use strict;
14
15 use vars qw($VERSION $BRANCH);
16 $VERSION = sprintf( "%d.%03d", q$Revision$ =~ /(\d+)\.(\d+)/ );
17 $BRANCH = sprintf( "%d.%03d", q$Revision$ =~ /\d+\.\d+\.(\d+)\.(\d+)/ ) || 0;
18 $main::build += $VERSION;
19 $main::branch += $BRANCH;
20
21 use IO::Select;
22 use IO::Socket;
23 use DXDebug;
24 use Timer;
25
26 use vars qw(%rd_callbacks %wt_callbacks %er_callbacks $rd_handles $wt_handles $er_handles $now %conns $noconns $blocking_supported $cnum);
27
28 %rd_callbacks = ();
29 %wt_callbacks = ();
30 %er_callbacks = ();
31 $rd_handles   = IO::Select->new();
32 $wt_handles   = IO::Select->new();
33 $er_handles   = IO::Select->new();
34
35 $now = time;
36
37 BEGIN {
38     # Checks if blocking is supported
39     eval {
40         require POSIX; POSIX->import(qw(O_NONBLOCK F_SETFL F_GETFL))
41     };
42         if ($@ || $main::is_win) {
43 #               print STDERR "POSIX Blocking *** NOT *** supported $@\n";
44                 $blocking_supported = 0;
45         } else {
46                 $blocking_supported = 1;
47 #               print STDERR "POSIX Blocking enabled\n";
48         }
49
50
51         # import as many of these errno values as are available
52         eval {
53                 require Errno; Errno->import(qw(EAGAIN EINPROGRESS EWOULDBLOCK));
54         };
55
56         unless ($^O eq 'MSWin32') {
57                 if ($] >= 5.6) {
58                         eval {
59                                 require Socket; Socket->import(qw(IPPROTO_TCP TCP_NODELAY));
60                         };
61                 } else {
62                         dbg("IPPROTO_TCP and TCP_NODELAY manually defined");
63                         eval 'sub IPPROTO_TCP {     6 };';
64                         eval 'sub TCP_NODELAY {     1 };';
65                 }
66         }
67         # http://support.microsoft.com/support/kb/articles/Q150/5/37.asp
68         # defines EINPROGRESS as 10035.  We provide it here because some
69         # Win32 users report POSIX::EINPROGRESS is not vendor-supported.
70         if ($^O eq 'MSWin32') { 
71                 eval '*EINPROGRESS = sub { 10036 };';
72                 eval '*EWOULDBLOCK = *EAGAIN = sub { 10035 };';
73                 eval '*F_GETFL     = sub {     0 };';
74                 eval '*F_SETFL     = sub {     0 };';
75                 eval '*IPPROTO_TCP     = sub {     6 };';
76                 eval '*TCP_NODELAY     = sub {     1 };';
77                 $blocking_supported = 0;   # it appears that this DOESN'T work :-(
78         } 
79 }
80
81 my $w = $^W;
82 $^W = 0;
83 my $eagain = eval {EAGAIN()};
84 my $einprogress = eval {EINPROGRESS()};
85 my $ewouldblock = eval {EWOULDBLOCK()};
86 $^W = $w;
87 $cnum = 0;
88
89
90 #
91 #-----------------------------------------------------------------
92 # Generalised initializer
93
94 sub new
95 {
96     my ($pkg, $rproc) = @_;
97         my $obj = ref($pkg);
98         my $class = $obj || $pkg;
99
100     my $conn = {
101         rproc => $rproc,
102                 inqueue => [],
103                 outqueue => [],
104                 state => 0,
105                 lineend => "\r\n",
106                 csort => 'telnet',
107                 timeval => 60,
108                 blocking => 0,
109                 cnum => (($cnum < 999) ? (++$cnum) : ($cnum = 1)),
110     };
111
112         $noconns++;
113         
114         dbg("Connection created ($noconns)") if isdbg('connll');
115         return bless $conn, $class;
116 }
117
118 sub set_error
119 {
120         my $conn = shift;
121         my $callback = shift;
122         $conn->{eproc} = $callback;
123         set_event_handler($conn->{sock}, error => $callback) if exists $conn->{sock};
124 }
125
126 sub set_rproc
127 {
128         my $conn = shift;
129         my $callback = shift;
130         $conn->{rproc} = $callback;
131 }
132
133 sub blocking
134 {
135         return unless $blocking_supported;
136
137         # Make the handle stop blocking, the Windows way.
138         if ($main::is_win) { 
139           # 126 is FIONBIO (some docs say 0x7F << 16)
140                 ioctl( $_[0],
141                            0x80000000 | (4 << 16) | (ord('f') << 8) | 126,
142                            "$_[1]"
143                          );
144         } else {
145                 my $flags = fcntl ($_[0], F_GETFL, 0);
146                 if ($_[1]) {
147                         $flags &= ~O_NONBLOCK;
148                 } else {
149                         $flags |= O_NONBLOCK;
150                 }
151                 fcntl ($_[0], F_SETFL, $flags);
152         }
153 }
154
155 # save it
156 sub conns
157 {
158         my $pkg = shift;
159         my $call = shift;
160         my $ref;
161         
162         if (ref $pkg) {
163                 $call = $pkg->{call} unless $call;
164                 return undef unless $call;
165                 dbg("changing $pkg->{call} to $call") if isdbg('connll') && exists $pkg->{call} && $call ne $pkg->{call};
166                 delete $conns{$pkg->{call}} if exists $pkg->{call} && exists $conns{$pkg->{call}} && $pkg->{call} ne $call; 
167                 $pkg->{call} = $call;
168                 $ref = $conns{$call} = $pkg;
169                 dbg("Connection $pkg->{cnum} $call stored") if isdbg('connll');
170         } else {
171                 $ref = $conns{$call};
172         }
173         return $ref;
174 }
175
176 # this is only called by any dependent processes going away unexpectedly
177 sub pid_gone
178 {
179         my ($pkg, $pid) = @_;
180         
181         my @pid = grep {$_->{pid} == $pid} values %conns;
182         foreach my $p (@pid) {
183                 &{$p->{eproc}}($p, "$pid has gorn") if exists $p->{eproc};
184                 $p->disconnect;
185         }
186 }
187
188 #-----------------------------------------------------------------
189 # Send side routines
190 sub connect {
191     my ($pkg, $to_host, $to_port, $rproc) = @_;
192
193     # Create a connection end-point object
194     my $conn = $pkg;
195         unless (ref $pkg) {
196                 $conn = $pkg->new($rproc);
197         }
198         $conn->{peerhost} = $to_host;
199         $conn->{peerport} = $to_port;
200         $conn->{sort} = 'Outgoing';
201         
202     # Create a new internet socket
203     my $sock = IO::Socket::INET->new();
204     return undef unless $sock;
205         
206         my $proto = getprotobyname('tcp');
207         $sock->socket(AF_INET, SOCK_STREAM, $proto) or return undef;
208         
209         blocking($sock, 0);
210         $conn->{blocking} = 0;
211
212         my $ip = gethostbyname($to_host);
213 #       my $r = $sock->connect($to_port, $ip);
214         my $r = connect($sock, pack_sockaddr_in($to_port, $ip));
215         return undef unless $r || _err_will_block($!);
216         
217         $conn->{sock} = $sock;
218     
219     if ($conn->{rproc}) {
220         my $callback = sub {$conn->_rcv};
221         set_event_handler ($sock, read => $callback);
222     }
223     return $conn;
224 }
225
226 sub disconnect {
227     my $conn = shift;
228         return if exists $conn->{disconnecting};
229
230         $conn->{disconnecting} = 1;
231     my $sock = delete $conn->{sock};
232         $conn->{state} = 'E';
233         $conn->{timeout}->del if $conn->{timeout};
234
235         # be careful to delete the correct one
236         my $call;
237         if ($call = $conn->{call}) {
238                 my $ref = $conns{$call};
239                 delete $conns{$call} if $ref && $ref == $conn;
240         }
241         $call ||= 'unallocated';
242         dbg("Connection $conn->{cnum} $call disconnected") if isdbg('connll');
243         
244         # get rid of any references
245         for (keys %$conn) {
246                 if (ref($conn->{$_})) {
247                         delete $conn->{$_};
248                 }
249         }
250
251         if (defined($sock)) {
252                 set_event_handler ($sock, read => undef, write => undef, error => undef);
253                 shutdown($sock, 3);
254                 close($sock);
255         }
256         
257         unless ($main::is_win) {
258                 kill 'TERM', $conn->{pid} if exists $conn->{pid};
259         }
260
261 }
262
263 sub send_now {
264     my ($conn, $msg) = @_;
265     $conn->enqueue($msg);
266     $conn->_send (1); # 1 ==> flush
267 }
268
269 sub send_later {
270     my ($conn, $msg) = @_;
271     $conn->enqueue($msg);
272     my $sock = $conn->{sock};
273     return unless defined($sock);
274     set_event_handler ($sock, write => sub {$conn->_send(0)});
275 }
276
277 sub enqueue {
278     my $conn = shift;
279     push (@{$conn->{outqueue}}, defined $_[0] ? $_[0] : '');
280 }
281
282 sub _send {
283     my ($conn, $flush) = @_;
284     my $sock = $conn->{sock};
285     return unless defined($sock);
286     my $rq = $conn->{outqueue};
287
288     # If $flush is set, set the socket to blocking, and send all
289     # messages in the queue - return only if there's an error
290     # If $flush is 0 (deferred mode) make the socket non-blocking, and
291     # return to the event loop only after every message, or if it
292     # is likely to block in the middle of a message.
293
294         if ($conn->{blocking} != $flush) {
295                 blocking($sock, $flush);
296                 $conn->{blocking} = $flush;
297         }
298     my $offset = (exists $conn->{send_offset}) ? $conn->{send_offset} : 0;
299
300     while (@$rq) {
301         my $msg            = $rq->[0];
302                 my $mlth           = length($msg);
303         my $bytes_to_write = $mlth - $offset;
304         my $bytes_written  = 0;
305                 confess("Negative Length! msg: '$msg' lth: $mlth offset: $offset") if $bytes_to_write < 0;
306         while ($bytes_to_write > 0) {
307             $bytes_written = syswrite ($sock, $msg,
308                                        $bytes_to_write, $offset);
309             if (!defined($bytes_written)) {
310                 if (_err_will_block($!)) {
311                     # Should happen only in deferred mode. Record how
312                     # much we have already sent.
313                     $conn->{send_offset} = $offset;
314                     # Event handler should already be set, so we will
315                     # be called back eventually, and will resume sending
316                     return 1;
317                 } else {    # Uh, oh
318                                         &{$conn->{eproc}}($conn, $!) if exists $conn->{eproc};
319                                         $conn->disconnect;
320                     return 0; # fail. Message remains in queue ..
321                 }
322             } elsif (isdbg('raw')) {
323                                 my $call = $conn->{call} || 'none';
324                                 dbgdump('raw', "$call send $bytes_written: ", $msg);
325                         }
326             $offset         += $bytes_written;
327             $bytes_to_write -= $bytes_written;
328         }
329         delete $conn->{send_offset};
330         $offset = 0;
331         shift @$rq;
332         #last unless $flush; # Go back to select and wait
333                             # for it to fire again.
334     }
335     # Call me back if queue has not been drained.
336     unless (@$rq) {
337         set_event_handler ($sock, write => undef);
338                 if (exists $conn->{close_on_empty}) {
339                         &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc};
340                         $conn->disconnect; 
341                 }
342     }
343     1;  # Success
344 }
345
346 sub dup_sock
347 {
348         my $conn = shift;
349         my $oldsock = $conn->{sock};
350         my $rc = $rd_callbacks{$oldsock};
351         my $wc = $wt_callbacks{$oldsock};
352         my $ec = $er_callbacks{$oldsock};
353         my $sock = $oldsock->new_from_fd($oldsock, "w+");
354         if ($sock) {
355                 set_event_handler($oldsock, read=>undef, write=>undef, error=>undef);
356                 $conn->{sock} = $sock;
357                 set_event_handler($sock, read=>$rc, write=>$wc, error=>$ec);
358                 $oldsock->close;
359         }
360 }
361
362 sub _err_will_block {
363         return 0 unless $blocking_supported;
364         return ($_[0] == $eagain || $_[0] == $ewouldblock || $_[0] == $einprogress);
365 }
366
367 sub close_on_empty
368 {
369         my $conn = shift;
370         $conn->{close_on_empty} = 1;
371 }
372
373 #-----------------------------------------------------------------
374 # Receive side routines
375
376 sub new_server {
377     @_ == 4 || die "Msg->new_server (myhost, myport, login_proc\n";
378     my ($pkg, $my_host, $my_port, $login_proc) = @_;
379         my $self = $pkg->new($login_proc);
380         
381     $self->{sock} = IO::Socket::INET->new (
382                                           LocalAddr => "$my_host:$my_port",
383 #                                          LocalPort => $my_port,
384                                           Listen    => SOMAXCONN,
385                                           Proto     => 'tcp',
386                                           Reuse => 1);
387     die "Could not create socket: $! \n" unless $self->{sock};
388     set_event_handler ($self->{sock}, read => sub { $self->new_client }  );
389         return $self;
390 }
391
392
393 sub nolinger
394 {
395         my $conn = shift;
396
397         unless ($main::is_win) {
398                 if (isdbg('sock')) {
399                         my ($l, $t) = unpack "ll", getsockopt($conn->{sock}, SOL_SOCKET, SO_LINGER); 
400                         my $k = unpack 'l', getsockopt($conn->{sock}, SOL_SOCKET, SO_KEEPALIVE);
401                         my $n = $main::is_win ? 0 : unpack "l", getsockopt($conn->{sock}, IPPROTO_TCP, TCP_NODELAY);
402                         dbg("Linger is: $l $t, keepalive: $k, nagle: $n");
403                 }
404                 
405                 setsockopt($conn->{sock}, SOL_SOCKET, SO_KEEPALIVE, 1) or confess "setsockopt keepalive: $!";
406                 setsockopt($conn->{sock}, SOL_SOCKET, SO_LINGER, pack("ll", 0, 0)) or confess "setsockopt linger: $!";
407                 setsockopt($conn->{sock}, IPPROTO_TCP, TCP_NODELAY, 1) or confess "setsockopt: $!";
408
409                 if (isdbg('sock')) {
410                         my ($l, $t) = unpack "ll", getsockopt($conn->{sock}, SOL_SOCKET, SO_LINGER); 
411                         my $k = unpack 'l', getsockopt($conn->{sock}, SOL_SOCKET, SO_KEEPALIVE);
412                         my $n = $main::is_win ? 0 : unpack "l", getsockopt($conn->{sock}, IPPROTO_TCP, TCP_NODELAY);
413                         dbg("Linger is: $l $t, keepalive: $k, nagle: $n");
414                 }
415         } 
416         $conn->{sock}->autoflush(0);
417 }
418
419 sub dequeue
420 {
421         my $conn = shift;
422
423         if ($conn->{msg} =~ /\n/) {
424                 my @lines = split /\r?\n/, $conn->{msg};
425                 if ($conn->{msg} =~ /\n$/) {
426                         delete $conn->{msg};
427                 } else {
428                         $conn->{msg} = pop @lines;
429                 }
430                 for (@lines) {
431                         &{$conn->{rproc}}($conn, defined $_ ? $_ : '');
432                 }
433         }
434 }
435
436 sub _rcv {                     # Complement to _send
437     my $conn = shift; # $rcv_now complement of $flush
438     # Find out how much has already been received, if at all
439     my ($msg, $offset, $bytes_to_read, $bytes_read);
440     my $sock = $conn->{sock};
441     return unless defined($sock);
442
443         my @lines;
444         if ($conn->{blocking}) {
445                 blocking($sock, 0);
446                 $conn->{blocking} = 0;
447         }
448         $bytes_read = sysread ($sock, $msg, 1024, 0);
449         if (defined ($bytes_read)) {
450                 if ($bytes_read > 0) {
451                         if (isdbg('raw')) {
452                                 my $call = $conn->{call} || 'none';
453                                 dbgdump('raw', "$call read $bytes_read: ", $msg);
454                         }
455                         if ($conn->{echo}) {
456                                 my @ch = split //, $msg;
457                                 my $out;
458                                 for (@ch) {
459                                         if (/[\cH\x7f]/) {
460                                                 $out .= "\cH \cH";
461                                                 $conn->{msg} =~ s/.$//;
462                                         } else {
463                                                 $out .= $_;
464                                                 $conn->{msg} .= $_;
465                                         }
466                                 }
467                                 if (defined $out) {
468                                         set_event_handler ($sock, write => sub{$conn->_send(0)});
469                                         push @{$conn->{outqueue}}, $out;
470                                 }
471                         } else {
472                                 $conn->{msg} .= $msg;
473                         }
474                 } 
475         } else {
476                 if (_err_will_block($!)) {
477                         return ; 
478                 } else {
479                         $bytes_read = 0;
480                 }
481     }
482
483 FINISH:
484     if (defined $bytes_read && $bytes_read == 0) {
485                 &{$conn->{eproc}}($conn, $!) if exists $conn->{eproc};
486                 $conn->disconnect;
487     } else {
488                 unless ($conn->{disable_read}) {
489                         $conn->dequeue if exists $conn->{msg};
490                 }
491         }
492 }
493
494 sub new_client {
495         my $server_conn = shift;
496     my $sock = $server_conn->{sock}->accept();
497         if ($sock) {
498                 my $conn = $server_conn->new($server_conn->{rproc});
499                 $conn->{sock} = $sock;
500                 blocking($sock, 0);
501                 $conn->nolinger;
502                 $conn->{blocking} = 0;
503                 my ($rproc, $eproc) = &{$server_conn->{rproc}} ($conn, $conn->{peerhost} = $sock->peerhost(), $conn->{peerport} = $sock->peerport());
504                 $conn->{sort} = 'Incoming';
505                 if ($eproc) {
506                         $conn->{eproc} = $eproc;
507                         set_event_handler ($sock, error => $eproc);
508                 }
509                 if ($rproc) {
510                         $conn->{rproc} = $rproc;
511                         my $callback = sub {$conn->_rcv};
512                         set_event_handler ($sock, read => $callback);
513                 } else {  # Login failed
514                         &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc};
515                         $conn->disconnect();
516                 }
517         } else {
518                 dbg("Msg: error on accept ($!)") if isdbg('err');
519         }
520 }
521
522 sub close_server
523 {
524         my $conn = shift;
525         set_event_handler ($conn->{sock}, read => undef, write => undef, error => undef );
526         $conn->{sock}->close;
527 }
528
529 # close all clients (this is for forking really)
530 sub close_all_clients
531 {
532         foreach my $conn (values %conns) {
533                 $conn->disconnect;
534         }
535 }
536
537 sub disable_read
538 {
539         my $conn = shift;
540         set_event_handler ($conn->{sock}, read => undef);
541         return $_[0] ? $conn->{disable_read} = $_[0] : $_[0];
542 }
543
544 #
545 #----------------------------------------------------
546 # Event loop routines used by both client and server
547
548 sub set_event_handler {
549     shift unless ref($_[0]); # shift if first arg is package name
550     my ($handle, %args) = @_;
551     my $callback;
552     if (exists $args{'write'}) {
553         $callback = $args{'write'};
554         if ($callback) {
555             $wt_callbacks{$handle} = $callback;
556             $wt_handles->add($handle);
557         } else {
558             delete $wt_callbacks{$handle};
559             $wt_handles->remove($handle);
560         }
561     }
562     if (exists $args{'read'}) {
563         $callback = $args{'read'};
564         if ($callback) {
565             $rd_callbacks{$handle} = $callback;
566             $rd_handles->add($handle);
567         } else {
568             delete $rd_callbacks{$handle};
569             $rd_handles->remove($handle);
570        }
571     }
572     if (exists $args{'error'}) {
573         $callback = $args{'error'};
574         if ($callback) {
575             $er_callbacks{$handle} = $callback;
576             $er_handles->add($handle);
577         } else {
578             delete $er_callbacks{$handle};
579             $er_handles->remove($handle);
580        }
581     }
582 }
583
584 sub event_loop {
585     my ($pkg, $loop_count, $timeout) = @_; # event_loop(1) to process events once
586     my ($conn, $r, $w, $e, $rset, $wset, $eset);
587     while (1) {
588  
589        # Quit the loop if no handles left to process
590         last unless ($rd_handles->count() || $wt_handles->count());
591         
592                 ($rset, $wset, $eset) = IO::Select->select($rd_handles, $wt_handles, $er_handles, $timeout);
593                 
594         foreach $e (@$eset) {
595             &{$er_callbacks{$e}}($e) if exists $er_callbacks{$e};
596         }
597         foreach $r (@$rset) {
598             &{$rd_callbacks{$r}}($r) if exists $rd_callbacks{$r};
599         }
600         foreach $w (@$wset) {
601             &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w};
602         }
603
604                 Timer::handler;
605                 
606         if (defined($loop_count)) {
607             last unless --$loop_count;
608         }
609     }
610 }
611
612 sub sleep
613 {
614         my ($pkg, $interval) = @_;
615         my $now = time;
616         while (time - $now < $interval) {
617                 $pkg->event_loop(10, 0.01);
618         }
619 }
620
621 sub DESTROY
622 {
623         my $conn = shift;
624         my $call = $conn->{call} || 'unallocated';
625         my $host = $conn->{peerhost} || '';
626         my $port = $conn->{peerport} || '';
627         dbg("Connection $conn->{cnum} $call [$host $port] being destroyed") if isdbg('connll');
628         $noconns--;
629 }
630
631 1;
632
633 __END__
634