decode AGW messages when there 36 bytes available
[spider.git] / perl / Msg.pm
1 #
2 # This has been taken from the 'Advanced Perl Programming' book by Sriram Srinivasan 
3 #
4 # I am presuming that the code is distributed on the same basis as perl itself.
5 #
6 # I have modified it to suit my devious purposes (Dirk Koopman G1TLH)
7 #
8 # $Id$
9 #
10
11 package Msg;
12
13 use strict;
14 use IO::Select;
15 use IO::Socket;
16 use DXDebug;
17 use Timer;
18
19 use vars qw(%rd_callbacks %wt_callbacks %er_callbacks $rd_handles $wt_handles $er_handles $now %conns $noconns $blocking_supported);
20
21 %rd_callbacks = ();
22 %wt_callbacks = ();
23 %er_callbacks = ();
24 $rd_handles   = IO::Select->new();
25 $wt_handles   = IO::Select->new();
26 $er_handles   = IO::Select->new();
27
28 $now = time;
29
30 BEGIN {
31     # Checks if blocking is supported
32     eval {
33         require POSIX; POSIX->import(qw(O_NONBLOCK F_SETFL F_GETFL))
34     };
35         if ($@ || $main::is_win) {
36                 print STDERR "POSIX Blocking *** NOT *** supported $@\n";
37                 $blocking_supported = 0;
38         } else {
39                 $blocking_supported = 1;
40                 print STDERR "POSIX Blocking enabled\n";
41         }
42
43
44         # import as many of these errno values as are available
45         eval {
46                 require Errno; Errno->import(qw(EAGAIN EINPROGRESS EWOULDBLOCK));
47         };
48 }
49
50 my $w = $^W;
51 $^W = 0;
52 my $eagain = eval {EAGAIN()};
53 my $einprogress = eval {EINPROGRESS()};
54 my $ewouldblock = eval {EWOULDBLOCK()};
55 $^W = $w;
56
57 #
58 #-----------------------------------------------------------------
59 # Generalised initializer
60
61 sub new
62 {
63     my ($pkg, $rproc) = @_;
64         my $obj = ref($pkg);
65         my $class = $obj || $pkg;
66
67     my $conn = {
68         rproc => $rproc,
69                 inqueue => [],
70                 outqueue => [],
71                 state => 0,
72                 lineend => "\r\n",
73                 csort => 'telnet',
74                 timeval => 60,
75                 blocking => 0,
76     };
77
78         $noconns++;
79         dbg('connll', "Connection created ($noconns)");
80         return bless $conn, $class;
81 }
82
83 sub set_error
84 {
85         my $conn = shift;
86         my $callback = shift;
87         $conn->{eproc} = $callback;
88         set_event_handler($conn->{sock}, error => $callback) if exists $conn->{sock};
89 }
90
91 sub set_rproc
92 {
93         my $conn = shift;
94         my $callback = shift;
95         $conn->{rproc} = $callback;
96 }
97
98 sub blocking
99 {
100         return unless $blocking_supported;
101         
102         my $flags = fcntl ($_[0], F_GETFL, 0);
103         if ($_[1]) {
104                 $flags &= ~O_NONBLOCK;
105         } else {
106                 $flags |= O_NONBLOCK;
107         }
108         fcntl ($_[0], F_SETFL, $flags);
109 }
110
111 # save it
112 sub conns
113 {
114         my $pkg = shift;
115         my $call = shift;
116         my $ref;
117         
118         if (ref $pkg) {
119                 $call = $pkg->{call} unless $call;
120                 return undef unless $call;
121                 confess "changing $pkg->{call} to $call" if exists $pkg->{call} && $call ne $pkg->{call};
122                 $pkg->{call} = $call;
123                 $ref = $conns{$call} = $pkg;
124                 dbg('connll', "Connection $call stored");
125         } else {
126                 $ref = $conns{$call};
127         }
128         return $ref;
129 }
130
131 # this is only called by any dependent processes going away unexpectedly
132 sub pid_gone
133 {
134         my ($pkg, $pid) = @_;
135         
136         my @pid = grep {$_->{pid} == $pid} values %conns;
137         for (@pid) {
138                 &{$_->{eproc}}($_, "$pid has gorn") if exists $_->{eproc};
139                 $_->disconnect;
140         }
141 }
142
143 #-----------------------------------------------------------------
144 # Send side routines
145 sub connect {
146     my ($pkg, $to_host, $to_port, $rproc) = @_;
147
148     # Create a connection end-point object
149     my $conn = $pkg;
150         unless (ref $pkg) {
151                 $conn = $pkg->new($rproc);
152         }
153         $conn->{peerhost} = $to_host;
154         $conn->{peerport} = $to_port;
155         $conn->{sort} = 'Outgoing';
156         
157     # Create a new internet socket
158     my $sock = IO::Socket::INET->new();
159     return undef unless $sock;
160         
161         my $proto = getprotobyname('tcp');
162         $sock->socket(AF_INET, SOCK_STREAM, $proto) or return undef;
163         
164         blocking($sock, 0);
165         $conn->{blocking} = 0;
166
167         my $ip = gethostbyname($to_host);
168 #       my $r = $sock->connect($to_port, $ip);
169         my $r = connect($sock, pack_sockaddr_in($to_port, $ip));
170         return undef unless $r || _err_will_block($!);
171         
172         $conn->{sock} = $sock;
173     
174     if ($conn->{rproc}) {
175         my $callback = sub {$conn->_rcv};
176         set_event_handler ($sock, read => $callback);
177     }
178     return $conn;
179 }
180
181 sub disconnect {
182     my $conn = shift;
183         return if exists $conn->{disconnecting};
184
185         $conn->{disconnecting} = 1;
186     my $sock = delete $conn->{sock};
187         $conn->{state} = 'E';
188         $conn->{timeout}->del if $conn->{timeout};
189
190         # be careful to delete the correct one
191         my $call;
192         if ($call = $conn->{call}) {
193                 my $ref = $conns{$call};
194                 delete $conns{$call} if $ref && $ref == $conn;
195         }
196         $call ||= 'unallocated';
197         dbg('connll', "Connection $call disconnected");
198         
199         unless ($main::is_win) {
200                 kill 'TERM', $conn->{pid} if exists $conn->{pid};
201         }
202
203         # get rid of any references
204         for (keys %$conn) {
205                 if (ref($conn->{$_})) {
206                         delete $conn->{$_};
207                 }
208         }
209
210         return unless defined($sock);
211     set_event_handler ($sock, read => undef, write => undef, error => undef);
212     shutdown($sock, 3);
213         close($sock);
214 }
215
216 sub send_now {
217     my ($conn, $msg) = @_;
218     $conn->enqueue($msg);
219     $conn->_send (1); # 1 ==> flush
220 }
221
222 sub send_later {
223     my ($conn, $msg) = @_;
224     $conn->enqueue($msg);
225     my $sock = $conn->{sock};
226     return unless defined($sock);
227     set_event_handler ($sock, write => sub {$conn->_send(0)});
228 }
229
230 sub enqueue {
231     my $conn = shift;
232     push (@{$conn->{outqueue}}, defined $_[0] ? $_[0] : '');
233 }
234
235 sub _send {
236     my ($conn, $flush) = @_;
237     my $sock = $conn->{sock};
238     return unless defined($sock);
239     my $rq = $conn->{outqueue};
240
241     # If $flush is set, set the socket to blocking, and send all
242     # messages in the queue - return only if there's an error
243     # If $flush is 0 (deferred mode) make the socket non-blocking, and
244     # return to the event loop only after every message, or if it
245     # is likely to block in the middle of a message.
246
247         if ($conn->{blocking} != $flush) {
248                 blocking($sock, $flush);
249                 $conn->{blocking} = $flush;
250         }
251     my $offset = (exists $conn->{send_offset}) ? $conn->{send_offset} : 0;
252
253     while (@$rq) {
254         my $msg            = $rq->[0];
255                 my $mlth           = length($msg);
256         my $bytes_to_write = $mlth - $offset;
257         my $bytes_written  = 0;
258                 confess("Negative Length! msg: '$msg' lth: $mlth offset: $offset") if $bytes_to_write < 0;
259         while ($bytes_to_write > 0) {
260             $bytes_written = syswrite ($sock, $msg,
261                                        $bytes_to_write, $offset);
262             if (!defined($bytes_written)) {
263                 if (_err_will_block($!)) {
264                     # Should happen only in deferred mode. Record how
265                     # much we have already sent.
266                     $conn->{send_offset} = $offset;
267                     # Event handler should already be set, so we will
268                     # be called back eventually, and will resume sending
269                     return 1;
270                 } else {    # Uh, oh
271                                         &{$conn->{eproc}}($conn, $!) if exists $conn->{eproc};
272                                         $conn->disconnect;
273                     return 0; # fail. Message remains in queue ..
274                 }
275             }
276             $offset         += $bytes_written;
277             $bytes_to_write -= $bytes_written;
278         }
279         delete $conn->{send_offset};
280         $offset = 0;
281         shift @$rq;
282         last unless $flush; # Go back to select and wait
283                             # for it to fire again.
284     }
285     # Call me back if queue has not been drained.
286     if (@$rq) {
287         set_event_handler ($sock, write => sub {$conn->_send(0)});
288     } else {
289         set_event_handler ($sock, write => undef);
290                 if (exists $conn->{close_on_empty}) {
291                         &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc};
292                         $conn->disconnect; 
293                 }
294     }
295     1;  # Success
296 }
297
298 sub dup_sock
299 {
300         my $conn = shift;
301         my $oldsock = $conn->{sock};
302         my $rc = $rd_callbacks{$oldsock};
303         my $wc = $wt_callbacks{$oldsock};
304         my $ec = $er_callbacks{$oldsock};
305         my $sock = $oldsock->new_from_fd($oldsock, "w+");
306         if ($sock) {
307                 set_event_handler($oldsock, read=>undef, write=>undef, error=>undef);
308                 $conn->{sock} = $sock;
309                 set_event_handler($sock, read=>$rc, write=>$wc, error=>$ec);
310                 $oldsock->close;
311         }
312 }
313
314 sub _err_will_block {
315         return 0 unless $blocking_supported;
316         return ($_[0] == $eagain || $_[0] == $ewouldblock || $_[0] == $einprogress);
317 }
318
319 sub close_on_empty
320 {
321         my $conn = shift;
322         $conn->{close_on_empty} = 1;
323 }
324
325 #-----------------------------------------------------------------
326 # Receive side routines
327
328 sub new_server {
329     @_ == 4 || die "Msg->new_server (myhost, myport, login_proc\n";
330     my ($pkg, $my_host, $my_port, $login_proc) = @_;
331         my $self = $pkg->new($login_proc);
332         
333     $self->{sock} = IO::Socket::INET->new (
334                                           LocalAddr => $my_host,
335                                           LocalPort => $my_port,
336                                           Listen    => SOMAXCONN,
337                                           Proto     => 'tcp',
338                                           Reuse     => 1);
339     die "Could not create socket: $! \n" unless $self->{sock};
340     set_event_handler ($self->{sock}, read => sub { $self->new_client }  );
341         return $self;
342 }
343
344 sub dequeue
345 {
346         my $conn = shift;
347
348         if ($conn->{msg} =~ /\n/) {
349                 my @lines = split /\r?\n/, $conn->{msg};
350                 if ($conn->{msg} =~ /\n$/) {
351                         delete $conn->{msg};
352                 } else {
353                         $conn->{msg} = pop @lines;
354                 }
355                 for (@lines) {
356                         &{$conn->{rproc}}($conn, defined $_ ? $_ : '');
357                 }
358         }
359 }
360
361 sub _rcv {                     # Complement to _send
362     my $conn = shift; # $rcv_now complement of $flush
363     # Find out how much has already been received, if at all
364     my ($msg, $offset, $bytes_to_read, $bytes_read);
365     my $sock = $conn->{sock};
366     return unless defined($sock);
367
368         my @lines;
369         if ($conn->{blocking}) {
370                 blocking($sock, 0);
371                 $conn->{blocking} = 0;
372         }
373         $bytes_read = sysread ($sock, $msg, 1024, 0);
374         if (defined ($bytes_read)) {
375                 if ($bytes_read > 0) {
376                         $conn->{msg} .= $msg;
377                 } 
378         } else {
379                 if (_err_will_block($!)) {
380                         return ; 
381                 } else {
382                         $bytes_read = 0;
383                 }
384     }
385
386 FINISH:
387     if (defined $bytes_read && $bytes_read == 0) {
388                 &{$conn->{eproc}}($conn, $!) if exists $conn->{eproc};
389                 $conn->disconnect;
390     } else {
391                 $conn->dequeue if exists $conn->{msg};
392         }
393 }
394
395 sub new_client {
396         my $server_conn = shift;
397     my $sock = $server_conn->{sock}->accept();
398         if ($sock) {
399                 my $conn = $server_conn->new($server_conn->{rproc});
400                 $conn->{sock} = $sock;
401                 blocking($sock, 0);
402                 $conn->{blocking} = 0;
403                 my ($rproc, $eproc) = &{$server_conn->{rproc}} ($conn, $conn->{peerhost} = $sock->peerhost(), $conn->{peerport} = $sock->peerport());
404                 $conn->{sort} = 'Incoming';
405                 if ($eproc) {
406                         $conn->{eproc} = $eproc;
407                         set_event_handler ($sock, error => $eproc);
408                 }
409                 if ($rproc) {
410                         $conn->{rproc} = $rproc;
411                         my $callback = sub {$conn->_rcv};
412                         set_event_handler ($sock, read => $callback);
413                 } else {  # Login failed
414                         &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc};
415                         $conn->disconnect();
416                 }
417         } else {
418                 dbg('err', "Msg: error on accept ($!)");
419         }
420 }
421
422 sub close_server
423 {
424         my $conn = shift;
425         set_event_handler ($conn->{sock}, read => undef, write => undef, error => undef );
426         $conn->{sock}->close;
427 }
428
429 # close all clients (this is for forking really)
430 sub close_all_clients
431 {
432         for (values %conns) {
433                 $_->disconnect;
434         }
435 }
436
437 #
438 #----------------------------------------------------
439 # Event loop routines used by both client and server
440
441 sub set_event_handler {
442     shift unless ref($_[0]); # shift if first arg is package name
443     my ($handle, %args) = @_;
444     my $callback;
445     if (exists $args{'write'}) {
446         $callback = $args{'write'};
447         if ($callback) {
448             $wt_callbacks{$handle} = $callback;
449             $wt_handles->add($handle);
450         } else {
451             delete $wt_callbacks{$handle};
452             $wt_handles->remove($handle);
453         }
454     }
455     if (exists $args{'read'}) {
456         $callback = $args{'read'};
457         if ($callback) {
458             $rd_callbacks{$handle} = $callback;
459             $rd_handles->add($handle);
460         } else {
461             delete $rd_callbacks{$handle};
462             $rd_handles->remove($handle);
463        }
464     }
465     if (exists $args{'error'}) {
466         $callback = $args{'error'};
467         if ($callback) {
468             $er_callbacks{$handle} = $callback;
469             $er_handles->add($handle);
470         } else {
471             delete $er_callbacks{$handle};
472             $er_handles->remove($handle);
473        }
474     }
475 }
476
477 sub event_loop {
478     my ($pkg, $loop_count, $timeout) = @_; # event_loop(1) to process events once
479     my ($conn, $r, $w, $e, $rset, $wset, $eset);
480     while (1) {
481  
482        # Quit the loop if no handles left to process
483         last unless ($rd_handles->count() || $wt_handles->count());
484         
485                 ($rset, $wset, $eset) = IO::Select->select($rd_handles, $wt_handles, $er_handles, $timeout);
486                 
487         foreach $e (@$eset) {
488             &{$er_callbacks{$e}}($e) if exists $er_callbacks{$e};
489         }
490         foreach $r (@$rset) {
491             &{$rd_callbacks{$r}}($r) if exists $rd_callbacks{$r};
492         }
493         foreach $w (@$wset) {
494             &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w};
495         }
496
497                 Timer::handler;
498                 
499         if (defined($loop_count)) {
500             last unless --$loop_count;
501         }
502     }
503 }
504
505 sub sleep
506 {
507         my ($pkg, $interval) = @_;
508         my $now = time;
509         while (time - $now < $interval) {
510                 $pkg->event_loop(10, 0.01);
511         }
512 }
513
514 sub DESTROY
515 {
516         my $conn = shift;
517         my $call = $conn->{call} || 'unallocated';
518         dbg('connll', "Connection $call being destroyed ($noconns)");
519         $noconns--;
520 }
521
522 1;
523
524 __END__
525