update html docs
[spider.git] / perl / Msg.pm
1 #
2 # This has been taken from the 'Advanced Perl Programming' book by Sriram Srinivasan 
3 #
4 # I am presuming that the code is distributed on the same basis as perl itself.
5 #
6 # I have modified it to suit my devious purposes (Dirk Koopman G1TLH)
7 #
8 # $Id$
9 #
10
11 package Msg;
12
13 use strict;
14 use IO::Select;
15 use IO::Socket;
16 use DXDebug;
17 use Timer;
18
19 use vars qw(%rd_callbacks %wt_callbacks %er_callbacks $rd_handles $wt_handles $er_handles $now %conns $noconns $blocking_supported);
20
21 %rd_callbacks = ();
22 %wt_callbacks = ();
23 %er_callbacks = ();
24 $rd_handles   = IO::Select->new();
25 $wt_handles   = IO::Select->new();
26 $er_handles   = IO::Select->new();
27
28 $now = time;
29
30 BEGIN {
31     # Checks if blocking is supported
32     eval {
33         require POSIX; POSIX->import(qw(O_NONBLOCK F_SETFL F_GETFL))
34     };
35         if ($@ || $main::is_win) {
36                 print STDERR "POSIX Blocking *** NOT *** supported $@\n";
37                 $blocking_supported = 0;
38         } else {
39                 $blocking_supported = 1;
40                 print STDERR "POSIX Blocking enabled\n";
41         }
42
43
44         # import as many of these errno values as are available
45         eval {
46                 require Errno; Errno->import(qw(EAGAIN EINPROGRESS EWOULDBLOCK));
47         };
48 }
49
50 my $w = $^W;
51 $^W = 0;
52 my $eagain = eval {EAGAIN()};
53 my $einprogress = eval {EINPROGRESS()};
54 my $ewouldblock = eval {EWOULDBLOCK()};
55 $^W = $w;
56
57 #
58 #-----------------------------------------------------------------
59 # Generalised initializer
60
61 sub new
62 {
63     my ($pkg, $rproc) = @_;
64         my $obj = ref($pkg);
65         my $class = $obj || $pkg;
66
67     my $conn = {
68         rproc => $rproc,
69                 inqueue => [],
70                 outqueue => [],
71                 state => 0,
72                 lineend => "\r\n",
73                 csort => 'telnet',
74                 timeval => 60,
75                 blocking => 0,
76                 cnum => ++$noconns,
77     };
78
79         dbg('connll', "Connection created ($noconns)");
80         return bless $conn, $class;
81 }
82
83 sub set_error
84 {
85         my $conn = shift;
86         my $callback = shift;
87         $conn->{eproc} = $callback;
88         set_event_handler($conn->{sock}, error => $callback) if exists $conn->{sock};
89 }
90
91 sub set_rproc
92 {
93         my $conn = shift;
94         my $callback = shift;
95         $conn->{rproc} = $callback;
96 }
97
98 sub blocking
99 {
100         return unless $blocking_supported;
101         
102         my $flags = fcntl ($_[0], F_GETFL, 0);
103         if ($_[1]) {
104                 $flags &= ~O_NONBLOCK;
105         } else {
106                 $flags |= O_NONBLOCK;
107         }
108         fcntl ($_[0], F_SETFL, $flags);
109 }
110
111 # save it
112 sub conns
113 {
114         my $pkg = shift;
115         my $call = shift;
116         my $ref;
117         
118         if (ref $pkg) {
119                 $call = $pkg->{call} unless $call;
120                 return undef unless $call;
121                 dbg('connll', "changing $pkg->{call} to $call") if exists $pkg->{call} && $call ne $pkg->{call};
122                 $pkg->{call} = $call;
123                 $ref = $conns{$call} = $pkg;
124                 dbg('connll', "Connection $pkg->{cnum} $call stored");
125         } else {
126                 $ref = $conns{$call};
127         }
128         return $ref;
129 }
130
131 # this is only called by any dependent processes going away unexpectedly
132 sub pid_gone
133 {
134         my ($pkg, $pid) = @_;
135         
136         my @pid = grep {$_->{pid} == $pid} values %conns;
137         foreach my $p (@pid) {
138                 &{$p->{eproc}}($p, "$pid has gorn") if exists $p->{eproc};
139                 $p->disconnect;
140         }
141 }
142
143 #-----------------------------------------------------------------
144 # Send side routines
145 sub connect {
146     my ($pkg, $to_host, $to_port, $rproc) = @_;
147
148     # Create a connection end-point object
149     my $conn = $pkg;
150         unless (ref $pkg) {
151                 $conn = $pkg->new($rproc);
152         }
153         $conn->{peerhost} = $to_host;
154         $conn->{peerport} = $to_port;
155         $conn->{sort} = 'Outgoing';
156         
157     # Create a new internet socket
158     my $sock = IO::Socket::INET->new();
159     return undef unless $sock;
160         
161         my $proto = getprotobyname('tcp');
162         $sock->socket(AF_INET, SOCK_STREAM, $proto) or return undef;
163         
164         blocking($sock, 0);
165         $conn->{blocking} = 0;
166
167         my $ip = gethostbyname($to_host);
168 #       my $r = $sock->connect($to_port, $ip);
169         my $r = connect($sock, pack_sockaddr_in($to_port, $ip));
170         return undef unless $r || _err_will_block($!);
171         
172         $conn->{sock} = $sock;
173     
174     if ($conn->{rproc}) {
175         my $callback = sub {$conn->_rcv};
176         set_event_handler ($sock, read => $callback);
177     }
178     return $conn;
179 }
180
181 sub disconnect {
182     my $conn = shift;
183         return if exists $conn->{disconnecting};
184
185         $conn->{disconnecting} = 1;
186     my $sock = delete $conn->{sock};
187         $conn->{state} = 'E';
188         $conn->{timeout}->del if $conn->{timeout};
189
190         # be careful to delete the correct one
191         my $call;
192         if ($call = $conn->{call}) {
193                 my $ref = $conns{$call};
194                 delete $conns{$call} if $ref && $ref == $conn;
195         }
196         $call ||= 'unallocated';
197         dbg('connll', "Connection $conn->{cnum} $call disconnected");
198         
199         unless ($main::is_win) {
200                 kill 'TERM', $conn->{pid} if exists $conn->{pid};
201         }
202
203         # get rid of any references
204         for (keys %$conn) {
205                 if (ref($conn->{$_})) {
206                         delete $conn->{$_};
207                 }
208         }
209
210         return unless defined($sock);
211     set_event_handler ($sock, read => undef, write => undef, error => undef);
212     shutdown($sock, 3);
213         close($sock);
214 }
215
216 sub send_now {
217     my ($conn, $msg) = @_;
218     $conn->enqueue($msg);
219     $conn->_send (1); # 1 ==> flush
220 }
221
222 sub send_later {
223     my ($conn, $msg) = @_;
224     $conn->enqueue($msg);
225     my $sock = $conn->{sock};
226     return unless defined($sock);
227     set_event_handler ($sock, write => sub {$conn->_send(0)});
228 }
229
230 sub enqueue {
231     my $conn = shift;
232     push (@{$conn->{outqueue}}, defined $_[0] ? $_[0] : '');
233 }
234
235 sub _send {
236     my ($conn, $flush) = @_;
237     my $sock = $conn->{sock};
238     return unless defined($sock);
239     my $rq = $conn->{outqueue};
240
241     # If $flush is set, set the socket to blocking, and send all
242     # messages in the queue - return only if there's an error
243     # If $flush is 0 (deferred mode) make the socket non-blocking, and
244     # return to the event loop only after every message, or if it
245     # is likely to block in the middle of a message.
246
247         if ($conn->{blocking} != $flush) {
248                 blocking($sock, $flush);
249                 $conn->{blocking} = $flush;
250         }
251     my $offset = (exists $conn->{send_offset}) ? $conn->{send_offset} : 0;
252
253     while (@$rq) {
254         my $msg            = $rq->[0];
255                 my $mlth           = length($msg);
256         my $bytes_to_write = $mlth - $offset;
257         my $bytes_written  = 0;
258                 confess("Negative Length! msg: '$msg' lth: $mlth offset: $offset") if $bytes_to_write < 0;
259         while ($bytes_to_write > 0) {
260             $bytes_written = syswrite ($sock, $msg,
261                                        $bytes_to_write, $offset);
262             if (!defined($bytes_written)) {
263                 if (_err_will_block($!)) {
264                     # Should happen only in deferred mode. Record how
265                     # much we have already sent.
266                     $conn->{send_offset} = $offset;
267                     # Event handler should already be set, so we will
268                     # be called back eventually, and will resume sending
269                     return 1;
270                 } else {    # Uh, oh
271                                         &{$conn->{eproc}}($conn, $!) if exists $conn->{eproc};
272                                         $conn->disconnect;
273                     return 0; # fail. Message remains in queue ..
274                 }
275             } elsif (isdbg('raw')) {
276                                 my $call = $conn->{call} || 'none';
277                                 dbgdump('raw', "$call send $bytes_written: ", $msg);
278                         }
279             $offset         += $bytes_written;
280             $bytes_to_write -= $bytes_written;
281         }
282         delete $conn->{send_offset};
283         $offset = 0;
284         shift @$rq;
285         last unless $flush; # Go back to select and wait
286                             # for it to fire again.
287     }
288     # Call me back if queue has not been drained.
289     if (@$rq) {
290         set_event_handler ($sock, write => sub {$conn->_send(0)});
291     } else {
292         set_event_handler ($sock, write => undef);
293                 if (exists $conn->{close_on_empty}) {
294                         &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc};
295                         $conn->disconnect; 
296                 }
297     }
298     1;  # Success
299 }
300
301 sub dup_sock
302 {
303         my $conn = shift;
304         my $oldsock = $conn->{sock};
305         my $rc = $rd_callbacks{$oldsock};
306         my $wc = $wt_callbacks{$oldsock};
307         my $ec = $er_callbacks{$oldsock};
308         my $sock = $oldsock->new_from_fd($oldsock, "w+");
309         if ($sock) {
310                 set_event_handler($oldsock, read=>undef, write=>undef, error=>undef);
311                 $conn->{sock} = $sock;
312                 set_event_handler($sock, read=>$rc, write=>$wc, error=>$ec);
313                 $oldsock->close;
314         }
315 }
316
317 sub _err_will_block {
318         return 0 unless $blocking_supported;
319         return ($_[0] == $eagain || $_[0] == $ewouldblock || $_[0] == $einprogress);
320 }
321
322 sub close_on_empty
323 {
324         my $conn = shift;
325         $conn->{close_on_empty} = 1;
326 }
327
328 #-----------------------------------------------------------------
329 # Receive side routines
330
331 sub new_server {
332     @_ == 4 || die "Msg->new_server (myhost, myport, login_proc\n";
333     my ($pkg, $my_host, $my_port, $login_proc) = @_;
334         my $self = $pkg->new($login_proc);
335         
336     $self->{sock} = IO::Socket::INET->new (
337                                           LocalAddr => $my_host,
338                                           LocalPort => $my_port,
339                                           Listen    => SOMAXCONN,
340                                           Proto     => 'tcp',
341                                           Reuse     => 1);
342     die "Could not create socket: $! \n" unless $self->{sock};
343     set_event_handler ($self->{sock}, read => sub { $self->new_client }  );
344         return $self;
345 }
346
347 sub dequeue
348 {
349         my $conn = shift;
350
351         if ($conn->{msg} =~ /\n/) {
352                 my @lines = split /\r?\n/, $conn->{msg};
353                 if ($conn->{msg} =~ /\n$/) {
354                         delete $conn->{msg};
355                 } else {
356                         $conn->{msg} = pop @lines;
357                 }
358                 for (@lines) {
359                         &{$conn->{rproc}}($conn, defined $_ ? $_ : '');
360                 }
361         }
362 }
363
364 sub _rcv {                     # Complement to _send
365     my $conn = shift; # $rcv_now complement of $flush
366     # Find out how much has already been received, if at all
367     my ($msg, $offset, $bytes_to_read, $bytes_read);
368     my $sock = $conn->{sock};
369     return unless defined($sock);
370
371         my @lines;
372         if ($conn->{blocking}) {
373                 blocking($sock, 0);
374                 $conn->{blocking} = 0;
375         }
376         $bytes_read = sysread ($sock, $msg, 1024, 0);
377         if (defined ($bytes_read)) {
378                 if ($bytes_read > 0) {
379                         $conn->{msg} .= $msg;
380                         if (isdbg('raw')) {
381                                 my $call = $conn->{call} || 'none';
382                                 dbgdump('raw', "$call read $bytes_read: ", $msg);
383                         }
384                 } 
385         } else {
386                 if (_err_will_block($!)) {
387                         return ; 
388                 } else {
389                         $bytes_read = 0;
390                 }
391     }
392
393 FINISH:
394     if (defined $bytes_read && $bytes_read == 0) {
395                 &{$conn->{eproc}}($conn, $!) if exists $conn->{eproc};
396                 $conn->disconnect;
397     } else {
398                 $conn->dequeue if exists $conn->{msg};
399         }
400 }
401
402 sub new_client {
403         my $server_conn = shift;
404     my $sock = $server_conn->{sock}->accept();
405         if ($sock) {
406                 my $conn = $server_conn->new($server_conn->{rproc});
407                 $conn->{sock} = $sock;
408                 blocking($sock, 0);
409                 $conn->{blocking} = 0;
410                 my ($rproc, $eproc) = &{$server_conn->{rproc}} ($conn, $conn->{peerhost} = $sock->peerhost(), $conn->{peerport} = $sock->peerport());
411                 $conn->{sort} = 'Incoming';
412                 if ($eproc) {
413                         $conn->{eproc} = $eproc;
414                         set_event_handler ($sock, error => $eproc);
415                 }
416                 if ($rproc) {
417                         $conn->{rproc} = $rproc;
418                         my $callback = sub {$conn->_rcv};
419                         set_event_handler ($sock, read => $callback);
420                 } else {  # Login failed
421                         &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc};
422                         $conn->disconnect();
423                 }
424         } else {
425                 dbg('err', "Msg: error on accept ($!)");
426         }
427 }
428
429 sub close_server
430 {
431         my $conn = shift;
432         set_event_handler ($conn->{sock}, read => undef, write => undef, error => undef );
433         $conn->{sock}->close;
434 }
435
436 # close all clients (this is for forking really)
437 sub close_all_clients
438 {
439         foreach my $conn (values %conns) {
440                 $conn->disconnect;
441         }
442 }
443
444 #
445 #----------------------------------------------------
446 # Event loop routines used by both client and server
447
448 sub set_event_handler {
449     shift unless ref($_[0]); # shift if first arg is package name
450     my ($handle, %args) = @_;
451     my $callback;
452     if (exists $args{'write'}) {
453         $callback = $args{'write'};
454         if ($callback) {
455             $wt_callbacks{$handle} = $callback;
456             $wt_handles->add($handle);
457         } else {
458             delete $wt_callbacks{$handle};
459             $wt_handles->remove($handle);
460         }
461     }
462     if (exists $args{'read'}) {
463         $callback = $args{'read'};
464         if ($callback) {
465             $rd_callbacks{$handle} = $callback;
466             $rd_handles->add($handle);
467         } else {
468             delete $rd_callbacks{$handle};
469             $rd_handles->remove($handle);
470        }
471     }
472     if (exists $args{'error'}) {
473         $callback = $args{'error'};
474         if ($callback) {
475             $er_callbacks{$handle} = $callback;
476             $er_handles->add($handle);
477         } else {
478             delete $er_callbacks{$handle};
479             $er_handles->remove($handle);
480        }
481     }
482 }
483
484 sub event_loop {
485     my ($pkg, $loop_count, $timeout) = @_; # event_loop(1) to process events once
486     my ($conn, $r, $w, $e, $rset, $wset, $eset);
487     while (1) {
488  
489        # Quit the loop if no handles left to process
490         last unless ($rd_handles->count() || $wt_handles->count());
491         
492                 ($rset, $wset, $eset) = IO::Select->select($rd_handles, $wt_handles, $er_handles, $timeout);
493                 
494         foreach $e (@$eset) {
495             &{$er_callbacks{$e}}($e) if exists $er_callbacks{$e};
496         }
497         foreach $r (@$rset) {
498             &{$rd_callbacks{$r}}($r) if exists $rd_callbacks{$r};
499         }
500         foreach $w (@$wset) {
501             &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w};
502         }
503
504                 Timer::handler;
505                 
506         if (defined($loop_count)) {
507             last unless --$loop_count;
508         }
509     }
510 }
511
512 sub sleep
513 {
514         my ($pkg, $interval) = @_;
515         my $now = time;
516         while (time - $now < $interval) {
517                 $pkg->event_loop(10, 0.01);
518         }
519 }
520
521 sub DESTROY
522 {
523         my $conn = shift;
524         my $call = $conn->{call} || 'unallocated';
525         my $host = $conn->{peerhost} || '';
526         my $port = $conn->{peerport} || '';
527         dbg('connll', "Connection $conn->{cnum} $call [$host $port] being destroyed");
528         $noconns--;
529 }
530
531 1;
532
533 __END__
534