alter changing %conns callsign logic slightly
[spider.git] / perl / Msg.pm
1 #
2 # This has been taken from the 'Advanced Perl Programming' book by Sriram Srinivasan 
3 #
4 # I am presuming that the code is distributed on the same basis as perl itself.
5 #
6 # I have modified it to suit my devious purposes (Dirk Koopman G1TLH)
7 #
8 # $Id$
9 #
10
11 package Msg;
12
13 use strict;
14 use IO::Select;
15 use IO::Socket;
16 use DXDebug;
17 use Timer;
18
19 use vars qw(%rd_callbacks %wt_callbacks %er_callbacks $rd_handles $wt_handles $er_handles $now %conns $noconns $blocking_supported);
20
21 %rd_callbacks = ();
22 %wt_callbacks = ();
23 %er_callbacks = ();
24 $rd_handles   = IO::Select->new();
25 $wt_handles   = IO::Select->new();
26 $er_handles   = IO::Select->new();
27
28 $now = time;
29
30 BEGIN {
31     # Checks if blocking is supported
32     eval {
33         require POSIX; POSIX->import(qw(O_NONBLOCK F_SETFL F_GETFL))
34     };
35         if ($@ || $main::is_win) {
36                 print STDERR "POSIX Blocking *** NOT *** supported $@\n";
37                 $blocking_supported = 0;
38         } else {
39                 $blocking_supported = 1;
40                 print STDERR "POSIX Blocking enabled\n";
41         }
42
43
44         # import as many of these errno values as are available
45         eval {
46                 require Errno; Errno->import(qw(EAGAIN EINPROGRESS EWOULDBLOCK));
47         };
48 }
49
50 my $w = $^W;
51 $^W = 0;
52 my $eagain = eval {EAGAIN()};
53 my $einprogress = eval {EINPROGRESS()};
54 my $ewouldblock = eval {EWOULDBLOCK()};
55 $^W = $w;
56
57 #
58 #-----------------------------------------------------------------
59 # Generalised initializer
60
61 sub new
62 {
63     my ($pkg, $rproc) = @_;
64         my $obj = ref($pkg);
65         my $class = $obj || $pkg;
66
67     my $conn = {
68         rproc => $rproc,
69                 inqueue => [],
70                 outqueue => [],
71                 state => 0,
72                 lineend => "\r\n",
73                 csort => 'telnet',
74                 timeval => 60,
75                 blocking => 0,
76                 cnum => ++$noconns,
77     };
78
79         dbg('connll', "Connection created ($noconns)");
80         return bless $conn, $class;
81 }
82
83 sub set_error
84 {
85         my $conn = shift;
86         my $callback = shift;
87         $conn->{eproc} = $callback;
88         set_event_handler($conn->{sock}, error => $callback) if exists $conn->{sock};
89 }
90
91 sub set_rproc
92 {
93         my $conn = shift;
94         my $callback = shift;
95         $conn->{rproc} = $callback;
96 }
97
98 sub blocking
99 {
100         return unless $blocking_supported;
101         
102         my $flags = fcntl ($_[0], F_GETFL, 0);
103         if ($_[1]) {
104                 $flags &= ~O_NONBLOCK;
105         } else {
106                 $flags |= O_NONBLOCK;
107         }
108         fcntl ($_[0], F_SETFL, $flags);
109 }
110
111 # save it
112 sub conns
113 {
114         my $pkg = shift;
115         my $call = shift;
116         my $ref;
117         
118         if (ref $pkg) {
119                 $call = $pkg->{call} unless $call;
120                 return undef unless $call;
121                 dbg('connll', "changing $pkg->{call} to $call") if exists $pkg->{call} && $call ne $pkg->{call};
122                 delete $conns{$pkg->{call}} if $pkg->{call} ne $call; 
123                 $pkg->{call} = $call;
124                 $ref = $conns{$call} = $pkg;
125                 dbg('connll', "Connection $pkg->{cnum} $call stored");
126         } else {
127                 $ref = $conns{$call};
128         }
129         return $ref;
130 }
131
132 # this is only called by any dependent processes going away unexpectedly
133 sub pid_gone
134 {
135         my ($pkg, $pid) = @_;
136         
137         my @pid = grep {$_->{pid} == $pid} values %conns;
138         foreach my $p (@pid) {
139                 &{$p->{eproc}}($p, "$pid has gorn") if exists $p->{eproc};
140                 $p->disconnect;
141         }
142 }
143
144 #-----------------------------------------------------------------
145 # Send side routines
146 sub connect {
147     my ($pkg, $to_host, $to_port, $rproc) = @_;
148
149     # Create a connection end-point object
150     my $conn = $pkg;
151         unless (ref $pkg) {
152                 $conn = $pkg->new($rproc);
153         }
154         $conn->{peerhost} = $to_host;
155         $conn->{peerport} = $to_port;
156         $conn->{sort} = 'Outgoing';
157         
158     # Create a new internet socket
159     my $sock = IO::Socket::INET->new();
160     return undef unless $sock;
161         
162         my $proto = getprotobyname('tcp');
163         $sock->socket(AF_INET, SOCK_STREAM, $proto) or return undef;
164         
165         blocking($sock, 0);
166         $conn->{blocking} = 0;
167
168         my $ip = gethostbyname($to_host);
169 #       my $r = $sock->connect($to_port, $ip);
170         my $r = connect($sock, pack_sockaddr_in($to_port, $ip));
171         return undef unless $r || _err_will_block($!);
172         
173         $conn->{sock} = $sock;
174     
175     if ($conn->{rproc}) {
176         my $callback = sub {$conn->_rcv};
177         set_event_handler ($sock, read => $callback);
178     }
179     return $conn;
180 }
181
182 sub disconnect {
183     my $conn = shift;
184         return if exists $conn->{disconnecting};
185
186         $conn->{disconnecting} = 1;
187     my $sock = delete $conn->{sock};
188         $conn->{state} = 'E';
189         $conn->{timeout}->del if $conn->{timeout};
190
191         # be careful to delete the correct one
192         my $call;
193         if ($call = $conn->{call}) {
194                 my $ref = $conns{$call};
195                 delete $conns{$call} if $ref && $ref == $conn;
196         }
197         $call ||= 'unallocated';
198         dbg('connll', "Connection $conn->{cnum} $call disconnected");
199         
200         unless ($main::is_win) {
201                 kill 'TERM', $conn->{pid} if exists $conn->{pid};
202         }
203
204         # get rid of any references
205         for (keys %$conn) {
206                 if (ref($conn->{$_})) {
207                         delete $conn->{$_};
208                 }
209         }
210
211         return unless defined($sock);
212     set_event_handler ($sock, read => undef, write => undef, error => undef);
213     shutdown($sock, 3);
214         close($sock);
215 }
216
217 sub send_now {
218     my ($conn, $msg) = @_;
219     $conn->enqueue($msg);
220     $conn->_send (1); # 1 ==> flush
221 }
222
223 sub send_later {
224     my ($conn, $msg) = @_;
225     $conn->enqueue($msg);
226     my $sock = $conn->{sock};
227     return unless defined($sock);
228     set_event_handler ($sock, write => sub {$conn->_send(0)});
229 }
230
231 sub enqueue {
232     my $conn = shift;
233     push (@{$conn->{outqueue}}, defined $_[0] ? $_[0] : '');
234 }
235
236 sub _send {
237     my ($conn, $flush) = @_;
238     my $sock = $conn->{sock};
239     return unless defined($sock);
240     my $rq = $conn->{outqueue};
241
242     # If $flush is set, set the socket to blocking, and send all
243     # messages in the queue - return only if there's an error
244     # If $flush is 0 (deferred mode) make the socket non-blocking, and
245     # return to the event loop only after every message, or if it
246     # is likely to block in the middle of a message.
247
248         if ($conn->{blocking} != $flush) {
249                 blocking($sock, $flush);
250                 $conn->{blocking} = $flush;
251         }
252     my $offset = (exists $conn->{send_offset}) ? $conn->{send_offset} : 0;
253
254     while (@$rq) {
255         my $msg            = $rq->[0];
256                 my $mlth           = length($msg);
257         my $bytes_to_write = $mlth - $offset;
258         my $bytes_written  = 0;
259                 confess("Negative Length! msg: '$msg' lth: $mlth offset: $offset") if $bytes_to_write < 0;
260         while ($bytes_to_write > 0) {
261             $bytes_written = syswrite ($sock, $msg,
262                                        $bytes_to_write, $offset);
263             if (!defined($bytes_written)) {
264                 if (_err_will_block($!)) {
265                     # Should happen only in deferred mode. Record how
266                     # much we have already sent.
267                     $conn->{send_offset} = $offset;
268                     # Event handler should already be set, so we will
269                     # be called back eventually, and will resume sending
270                     return 1;
271                 } else {    # Uh, oh
272                                         &{$conn->{eproc}}($conn, $!) if exists $conn->{eproc};
273                                         $conn->disconnect;
274                     return 0; # fail. Message remains in queue ..
275                 }
276             } elsif (isdbg('raw')) {
277                                 my $call = $conn->{call} || 'none';
278                                 dbgdump('raw', "$call send $bytes_written: ", $msg);
279                         }
280             $offset         += $bytes_written;
281             $bytes_to_write -= $bytes_written;
282         }
283         delete $conn->{send_offset};
284         $offset = 0;
285         shift @$rq;
286         last unless $flush; # Go back to select and wait
287                             # for it to fire again.
288     }
289     # Call me back if queue has not been drained.
290     if (@$rq) {
291         set_event_handler ($sock, write => sub {$conn->_send(0)});
292     } else {
293         set_event_handler ($sock, write => undef);
294                 if (exists $conn->{close_on_empty}) {
295                         &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc};
296                         $conn->disconnect; 
297                 }
298     }
299     1;  # Success
300 }
301
302 sub dup_sock
303 {
304         my $conn = shift;
305         my $oldsock = $conn->{sock};
306         my $rc = $rd_callbacks{$oldsock};
307         my $wc = $wt_callbacks{$oldsock};
308         my $ec = $er_callbacks{$oldsock};
309         my $sock = $oldsock->new_from_fd($oldsock, "w+");
310         if ($sock) {
311                 set_event_handler($oldsock, read=>undef, write=>undef, error=>undef);
312                 $conn->{sock} = $sock;
313                 set_event_handler($sock, read=>$rc, write=>$wc, error=>$ec);
314                 $oldsock->close;
315         }
316 }
317
318 sub _err_will_block {
319         return 0 unless $blocking_supported;
320         return ($_[0] == $eagain || $_[0] == $ewouldblock || $_[0] == $einprogress);
321 }
322
323 sub close_on_empty
324 {
325         my $conn = shift;
326         $conn->{close_on_empty} = 1;
327 }
328
329 #-----------------------------------------------------------------
330 # Receive side routines
331
332 sub new_server {
333     @_ == 4 || die "Msg->new_server (myhost, myport, login_proc\n";
334     my ($pkg, $my_host, $my_port, $login_proc) = @_;
335         my $self = $pkg->new($login_proc);
336         
337     $self->{sock} = IO::Socket::INET->new (
338                                           LocalAddr => $my_host,
339                                           LocalPort => $my_port,
340                                           Listen    => SOMAXCONN,
341                                           Proto     => 'tcp',
342                                           Reuse     => 1);
343     die "Could not create socket: $! \n" unless $self->{sock};
344     set_event_handler ($self->{sock}, read => sub { $self->new_client }  );
345         return $self;
346 }
347
348 sub dequeue
349 {
350         my $conn = shift;
351
352         if ($conn->{msg} =~ /\n/) {
353                 my @lines = split /\r?\n/, $conn->{msg};
354                 if ($conn->{msg} =~ /\n$/) {
355                         delete $conn->{msg};
356                 } else {
357                         $conn->{msg} = pop @lines;
358                 }
359                 for (@lines) {
360                         &{$conn->{rproc}}($conn, defined $_ ? $_ : '');
361                 }
362         }
363 }
364
365 sub _rcv {                     # Complement to _send
366     my $conn = shift; # $rcv_now complement of $flush
367     # Find out how much has already been received, if at all
368     my ($msg, $offset, $bytes_to_read, $bytes_read);
369     my $sock = $conn->{sock};
370     return unless defined($sock);
371
372         my @lines;
373         if ($conn->{blocking}) {
374                 blocking($sock, 0);
375                 $conn->{blocking} = 0;
376         }
377         $bytes_read = sysread ($sock, $msg, 1024, 0);
378         if (defined ($bytes_read)) {
379                 if ($bytes_read > 0) {
380                         $conn->{msg} .= $msg;
381                         if (isdbg('raw')) {
382                                 my $call = $conn->{call} || 'none';
383                                 dbgdump('raw', "$call read $bytes_read: ", $msg);
384                         }
385                 } 
386         } else {
387                 if (_err_will_block($!)) {
388                         return ; 
389                 } else {
390                         $bytes_read = 0;
391                 }
392     }
393
394 FINISH:
395     if (defined $bytes_read && $bytes_read == 0) {
396                 &{$conn->{eproc}}($conn, $!) if exists $conn->{eproc};
397                 $conn->disconnect;
398     } else {
399                 $conn->dequeue if exists $conn->{msg};
400         }
401 }
402
403 sub new_client {
404         my $server_conn = shift;
405     my $sock = $server_conn->{sock}->accept();
406         if ($sock) {
407                 my $conn = $server_conn->new($server_conn->{rproc});
408                 $conn->{sock} = $sock;
409                 blocking($sock, 0);
410                 $conn->{blocking} = 0;
411                 my ($rproc, $eproc) = &{$server_conn->{rproc}} ($conn, $conn->{peerhost} = $sock->peerhost(), $conn->{peerport} = $sock->peerport());
412                 $conn->{sort} = 'Incoming';
413                 if ($eproc) {
414                         $conn->{eproc} = $eproc;
415                         set_event_handler ($sock, error => $eproc);
416                 }
417                 if ($rproc) {
418                         $conn->{rproc} = $rproc;
419                         my $callback = sub {$conn->_rcv};
420                         set_event_handler ($sock, read => $callback);
421                 } else {  # Login failed
422                         &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc};
423                         $conn->disconnect();
424                 }
425         } else {
426                 dbg('err', "Msg: error on accept ($!)");
427         }
428 }
429
430 sub close_server
431 {
432         my $conn = shift;
433         set_event_handler ($conn->{sock}, read => undef, write => undef, error => undef );
434         $conn->{sock}->close;
435 }
436
437 # close all clients (this is for forking really)
438 sub close_all_clients
439 {
440         foreach my $conn (values %conns) {
441                 $conn->disconnect;
442         }
443 }
444
445 #
446 #----------------------------------------------------
447 # Event loop routines used by both client and server
448
449 sub set_event_handler {
450     shift unless ref($_[0]); # shift if first arg is package name
451     my ($handle, %args) = @_;
452     my $callback;
453     if (exists $args{'write'}) {
454         $callback = $args{'write'};
455         if ($callback) {
456             $wt_callbacks{$handle} = $callback;
457             $wt_handles->add($handle);
458         } else {
459             delete $wt_callbacks{$handle};
460             $wt_handles->remove($handle);
461         }
462     }
463     if (exists $args{'read'}) {
464         $callback = $args{'read'};
465         if ($callback) {
466             $rd_callbacks{$handle} = $callback;
467             $rd_handles->add($handle);
468         } else {
469             delete $rd_callbacks{$handle};
470             $rd_handles->remove($handle);
471        }
472     }
473     if (exists $args{'error'}) {
474         $callback = $args{'error'};
475         if ($callback) {
476             $er_callbacks{$handle} = $callback;
477             $er_handles->add($handle);
478         } else {
479             delete $er_callbacks{$handle};
480             $er_handles->remove($handle);
481        }
482     }
483 }
484
485 sub event_loop {
486     my ($pkg, $loop_count, $timeout) = @_; # event_loop(1) to process events once
487     my ($conn, $r, $w, $e, $rset, $wset, $eset);
488     while (1) {
489  
490        # Quit the loop if no handles left to process
491         last unless ($rd_handles->count() || $wt_handles->count());
492         
493                 ($rset, $wset, $eset) = IO::Select->select($rd_handles, $wt_handles, $er_handles, $timeout);
494                 
495         foreach $e (@$eset) {
496             &{$er_callbacks{$e}}($e) if exists $er_callbacks{$e};
497         }
498         foreach $r (@$rset) {
499             &{$rd_callbacks{$r}}($r) if exists $rd_callbacks{$r};
500         }
501         foreach $w (@$wset) {
502             &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w};
503         }
504
505                 Timer::handler;
506                 
507         if (defined($loop_count)) {
508             last unless --$loop_count;
509         }
510     }
511 }
512
513 sub sleep
514 {
515         my ($pkg, $interval) = @_;
516         my $now = time;
517         while (time - $now < $interval) {
518                 $pkg->event_loop(10, 0.01);
519         }
520 }
521
522 sub DESTROY
523 {
524         my $conn = shift;
525         my $call = $conn->{call} || 'unallocated';
526         my $host = $conn->{peerhost} || '';
527         my $port = $conn->{peerport} || '';
528         dbg('connll', "Connection $conn->{cnum} $call [$host $port] being destroyed");
529         $noconns--;
530 }
531
532 1;
533
534 __END__
535