fix non-blocking so that it actually doesn't block!
[spider.git] / perl / Msg.pm
1 #
2 # This has been taken from the 'Advanced Perl Programming' book by Sriram Srinivasan 
3 #
4 # I am presuming that the code is distributed on the same basis as perl itself.
5 #
6 # I have modified it to suit my devious purposes (Dirk Koopman G1TLH)
7 #
8 # $Id$
9 #
10
11 package Msg;
12
13 use strict;
14 use IO::Select;
15 use IO::Socket;
16 use DXDebug;
17 use Timer;
18
19 use vars qw(%rd_callbacks %wt_callbacks %er_callbacks $rd_handles $wt_handles $er_handles $now %conns $noconns $blocking_supported);
20
21 %rd_callbacks = ();
22 %wt_callbacks = ();
23 %er_callbacks = ();
24 $rd_handles   = IO::Select->new();
25 $wt_handles   = IO::Select->new();
26 $er_handles   = IO::Select->new();
27
28 $now = time;
29
30 BEGIN {
31     # Checks if blocking is supported
32     eval {
33         require POSIX; POSIX->import(qw(O_NONBLOCK F_SETFL F_GETFL))
34     };
35         if ($@) {
36                 print STDERR "POSIX Blocking *** NOT *** supported $@\n";
37         } else {
38                 $blocking_supported = 1;
39                 print STDERR "POSIX Blocking enabled\n";
40         }
41
42
43         # import as many of these errno values as are available
44         eval {
45                 require Errno; Errno->import(qw(EAGAIN EINPROGRESS EWOULDBLOCK));
46         };
47 }
48
49 my $w = $^W;
50 $^W = 0;
51 my $eagain = eval {EAGAIN()};
52 my $einprogress = eval {EINPROGRESS()};
53 my $ewouldblock = eval {EWOULDBLOCK()};
54 $^W = $w;
55
56 #
57 #-----------------------------------------------------------------
58 # Generalised initializer
59
60 sub new
61 {
62     my ($pkg, $rproc) = @_;
63         my $obj = ref($pkg);
64         my $class = $obj || $pkg;
65
66     my $conn = {
67         rproc => $rproc,
68                 inqueue => [],
69                 outqueue => [],
70                 state => 0,
71                 lineend => "\r\n",
72                 csort => 'telnet',
73                 timeval => 60,
74                 blocking => 0,
75     };
76
77         $noconns++;
78         dbg('connll', "Connection created ($noconns)");
79         return bless $conn, $class;
80 }
81
82 sub set_error
83 {
84         my $conn = shift;
85         my $callback = shift;
86         $conn->{eproc} = $callback;
87         set_event_handler($conn->{sock}, error => $callback) if exists $conn->{sock};
88 }
89
90 sub set_rproc
91 {
92         my $conn = shift;
93         my $callback = shift;
94         $conn->{rproc} = $callback;
95 }
96
97 sub blocking
98 {
99         return unless $blocking_supported;
100         
101         my $flags = fcntl ($_[0], F_GETFL, 0);
102         if ($_[1]) {
103                 $flags &= ~O_NONBLOCK;
104         } else {
105                 $flags |= O_NONBLOCK;
106         }
107         fcntl ($_[0], F_SETFL, $flags);
108 }
109
110 # save it
111 sub conns
112 {
113         my $pkg = shift;
114         my $call = shift;
115         my $ref;
116         
117         if (ref $pkg) {
118                 $call = $pkg->{call} unless $call;
119                 return undef unless $call;
120                 confess "changing $pkg->{call} to $call" if exists $pkg->{call} && $call ne $pkg->{call};
121                 $pkg->{call} = $call;
122                 $ref = $conns{$call} = $pkg;
123                 dbg('connll', "Connection $call stored");
124         } else {
125                 $ref = $conns{$call};
126         }
127         return $ref;
128 }
129
130 # this is only called by any dependent processes going away unexpectedly
131 sub pid_gone
132 {
133         my ($pkg, $pid) = @_;
134         
135         my @pid = grep {$_->{pid} == $pid} values %conns;
136         for (@pid) {
137                 &{$_->{eproc}}($_, "$pid has gorn") if exists $_->{eproc};
138                 $_->disconnect;
139         }
140 }
141
142 #-----------------------------------------------------------------
143 # Send side routines
144 sub connect {
145     my ($pkg, $to_host, $to_port, $rproc) = @_;
146
147     # Create a connection end-point object
148     my $conn = $pkg;
149         unless (ref $pkg) {
150                 $conn = $pkg->new($rproc);
151         }
152         $conn->{peerhost} = $to_host;
153         $conn->{peerport} = $to_port;
154         $conn->{sort} = 'Outgoing';
155         
156     # Create a new internet socket
157     my $sock = IO::Socket::INET->new();
158     return undef unless $sock;
159         
160         my $proto = getprotobyname('tcp');
161         $sock->socket(AF_INET, SOCK_STREAM, $proto) or return undef;
162         
163         blocking($sock, 0);
164         $conn->{blocking} = 0;
165
166         my $ip = gethostbyname($to_host);
167 #       my $r = $sock->connect($to_port, $ip);
168         my $r = connect($sock, pack_sockaddr_in($to_port, $ip));
169         return undef unless $r || _err_will_block($!);
170         
171         $conn->{sock} = $sock;
172     
173     if ($conn->{rproc}) {
174         my $callback = sub {$conn->_rcv};
175         set_event_handler ($sock, read => $callback);
176     }
177     return $conn;
178 }
179
180 sub disconnect {
181     my $conn = shift;
182         return if exists $conn->{disconnecting};
183
184         $conn->{disconnecting} = 1;
185     my $sock = delete $conn->{sock};
186         $conn->{state} = 'E';
187         $conn->{timeout}->del if $conn->{timeout};
188
189         # be careful to delete the correct one
190         my $call;
191         if ($call = $conn->{call}) {
192                 my $ref = $conns{$call};
193                 delete $conns{$call} if $ref && $ref == $conn;
194         }
195         $call ||= 'unallocated';
196         dbg('connll', "Connection $call disconnected");
197         
198         unless ($^O =~ /^MS/i) {
199                 kill 'TERM', $conn->{pid} if exists $conn->{pid};
200         }
201
202         # get rid of any references
203         for (keys %$conn) {
204                 if (ref($conn->{$_})) {
205                         delete $conn->{$_};
206                 }
207         }
208
209         return unless defined($sock);
210     set_event_handler ($sock, read => undef, write => undef, error => undef);
211     shutdown($sock, 3);
212         close($sock);
213 }
214
215 sub send_now {
216     my ($conn, $msg) = @_;
217     $conn->enqueue($msg);
218     $conn->_send (1); # 1 ==> flush
219 }
220
221 sub send_later {
222     my ($conn, $msg) = @_;
223     $conn->enqueue($msg);
224     my $sock = $conn->{sock};
225     return unless defined($sock);
226     set_event_handler ($sock, write => sub {$conn->_send(0)});
227 }
228
229 sub enqueue {
230     my $conn = shift;
231     push (@{$conn->{outqueue}}, defined $_[0] ? $_[0] : '');
232 }
233
234 sub _send {
235     my ($conn, $flush) = @_;
236     my $sock = $conn->{sock};
237     return unless defined($sock);
238     my $rq = $conn->{outqueue};
239
240     # If $flush is set, set the socket to blocking, and send all
241     # messages in the queue - return only if there's an error
242     # If $flush is 0 (deferred mode) make the socket non-blocking, and
243     # return to the event loop only after every message, or if it
244     # is likely to block in the middle of a message.
245
246         if ($conn->{blocking} != $flush) {
247                 blocking($sock, $flush);
248                 $conn->{blocking} = $flush;
249         }
250     my $offset = (exists $conn->{send_offset}) ? $conn->{send_offset} : 0;
251
252     while (@$rq) {
253         my $msg            = $rq->[0];
254                 my $mlth           = length($msg);
255         my $bytes_to_write = $mlth - $offset;
256         my $bytes_written  = 0;
257                 confess("Negative Length! msg: '$msg' lth: $mlth offset: $offset") if $bytes_to_write < 0;
258         while ($bytes_to_write > 0) {
259             $bytes_written = syswrite ($sock, $msg,
260                                        $bytes_to_write, $offset);
261             if (!defined($bytes_written)) {
262                 if (_err_will_block($!)) {
263                     # Should happen only in deferred mode. Record how
264                     # much we have already sent.
265                     $conn->{send_offset} = $offset;
266                     # Event handler should already be set, so we will
267                     # be called back eventually, and will resume sending
268                     return 1;
269                 } else {    # Uh, oh
270                                         &{$conn->{eproc}}($conn, $!) if exists $conn->{eproc};
271                                         $conn->disconnect;
272                     return 0; # fail. Message remains in queue ..
273                 }
274             }
275             $offset         += $bytes_written;
276             $bytes_to_write -= $bytes_written;
277         }
278         delete $conn->{send_offset};
279         $offset = 0;
280         shift @$rq;
281         last unless $flush; # Go back to select and wait
282                             # for it to fire again.
283     }
284     # Call me back if queue has not been drained.
285     if (@$rq) {
286         set_event_handler ($sock, write => sub {$conn->_send(0)});
287     } else {
288         set_event_handler ($sock, write => undef);
289                 if (exists $conn->{close_on_empty}) {
290                         &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc};
291                         $conn->disconnect; 
292                 }
293     }
294     1;  # Success
295 }
296
297 sub dup_sock
298 {
299         my $conn = shift;
300         my $oldsock = $conn->{sock};
301         my $rc = $rd_callbacks{$oldsock};
302         my $wc = $wt_callbacks{$oldsock};
303         my $ec = $er_callbacks{$oldsock};
304         my $sock = $oldsock->new_from_fd($oldsock, "w+");
305         if ($sock) {
306                 set_event_handler($oldsock, read=>undef, write=>undef, error=>undef);
307                 $conn->{sock} = $sock;
308                 set_event_handler($sock, read=>$rc, write=>$wc, error=>$ec);
309                 $oldsock->close;
310         }
311 }
312
313 sub _err_will_block {
314         return 0 unless $blocking_supported;
315         return ($_[0] == $eagain || $_[0] == $ewouldblock || $_[0] == $einprogress);
316 }
317
318 sub close_on_empty
319 {
320         my $conn = shift;
321         $conn->{close_on_empty} = 1;
322 }
323
324 #-----------------------------------------------------------------
325 # Receive side routines
326
327 sub new_server {
328     @_ == 4 || die "Msg->new_server (myhost, myport, login_proc\n";
329     my ($pkg, $my_host, $my_port, $login_proc) = @_;
330         my $self = $pkg->new($login_proc);
331         
332     $self->{sock} = IO::Socket::INET->new (
333                                           LocalAddr => $my_host,
334                                           LocalPort => $my_port,
335                                           Listen    => SOMAXCONN,
336                                           Proto     => 'tcp',
337                                           Reuse     => 1);
338     die "Could not create socket: $! \n" unless $self->{sock};
339     set_event_handler ($self->{sock}, read => sub { $self->new_client }  );
340         return $self;
341 }
342
343 sub dequeue
344 {
345         my $conn = shift;
346
347         if ($conn->{msg} =~ /\n/) {
348                 my @lines = split /\r?\n/, $conn->{msg};
349                 if ($conn->{msg} =~ /\n$/) {
350                         delete $conn->{msg};
351                 } else {
352                         $conn->{msg} = pop @lines;
353                 }
354                 for (@lines) {
355                         &{$conn->{rproc}}($conn, defined $_ ? $_ : '');
356                 }
357         }
358 }
359
360 sub _rcv {                     # Complement to _send
361     my $conn = shift; # $rcv_now complement of $flush
362     # Find out how much has already been received, if at all
363     my ($msg, $offset, $bytes_to_read, $bytes_read);
364     my $sock = $conn->{sock};
365     return unless defined($sock);
366
367         my @lines;
368         if ($conn->{blocking}) {
369                 blocking($sock, 0);
370                 $conn->{blocking} = 0;
371         }
372         $bytes_read = sysread ($sock, $msg, 1024, 0);
373         if (defined ($bytes_read)) {
374                 if ($bytes_read > 0) {
375                         $conn->{msg} .= $msg;
376                 } 
377         } else {
378                 if (_err_will_block($!)) {
379                         return ; 
380                 } else {
381                         $bytes_read = 0;
382                 }
383     }
384
385 FINISH:
386     if (defined $bytes_read && $bytes_read == 0) {
387                 &{$conn->{eproc}}($conn, $!) if exists $conn->{eproc};
388                 $conn->disconnect;
389     } else {
390                 $conn->dequeue if exists $conn->{msg};
391         }
392 }
393
394 sub new_client {
395         my $server_conn = shift;
396     my $sock = $server_conn->{sock}->accept();
397         if ($sock) {
398                 my $conn = $server_conn->new($server_conn->{rproc});
399                 $conn->{sock} = $sock;
400                 blocking($sock, 0);
401                 $conn->{blocking} = 0;
402                 my ($rproc, $eproc) = &{$server_conn->{rproc}} ($conn, $conn->{peerhost} = $sock->peerhost(), $conn->{peerport} = $sock->peerport());
403                 $conn->{sort} = 'Incoming';
404                 if ($eproc) {
405                         $conn->{eproc} = $eproc;
406                         set_event_handler ($sock, error => $eproc);
407                 }
408                 if ($rproc) {
409                         $conn->{rproc} = $rproc;
410                         my $callback = sub {$conn->_rcv};
411                         set_event_handler ($sock, read => $callback);
412                 } else {  # Login failed
413                         &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc};
414                         $conn->disconnect();
415                 }
416         } else {
417                 dbg('err', "Msg: error on accept ($!)");
418         }
419 }
420
421 sub close_server
422 {
423         my $conn = shift;
424         set_event_handler ($conn->{sock}, read => undef, write => undef, error => undef );
425         $conn->{sock}->close;
426 }
427
428 # close all clients (this is for forking really)
429 sub close_all_clients
430 {
431         for (values %conns) {
432                 $_->disconnect;
433         }
434 }
435
436 #
437 #----------------------------------------------------
438 # Event loop routines used by both client and server
439
440 sub set_event_handler {
441     shift unless ref($_[0]); # shift if first arg is package name
442     my ($handle, %args) = @_;
443     my $callback;
444     if (exists $args{'write'}) {
445         $callback = $args{'write'};
446         if ($callback) {
447             $wt_callbacks{$handle} = $callback;
448             $wt_handles->add($handle);
449         } else {
450             delete $wt_callbacks{$handle};
451             $wt_handles->remove($handle);
452         }
453     }
454     if (exists $args{'read'}) {
455         $callback = $args{'read'};
456         if ($callback) {
457             $rd_callbacks{$handle} = $callback;
458             $rd_handles->add($handle);
459         } else {
460             delete $rd_callbacks{$handle};
461             $rd_handles->remove($handle);
462        }
463     }
464     if (exists $args{'error'}) {
465         $callback = $args{'error'};
466         if ($callback) {
467             $er_callbacks{$handle} = $callback;
468             $er_handles->add($handle);
469         } else {
470             delete $er_callbacks{$handle};
471             $er_handles->remove($handle);
472        }
473     }
474 }
475
476 sub event_loop {
477     my ($pkg, $loop_count, $timeout) = @_; # event_loop(1) to process events once
478     my ($conn, $r, $w, $e, $rset, $wset, $eset);
479     while (1) {
480  
481        # Quit the loop if no handles left to process
482         last unless ($rd_handles->count() || $wt_handles->count());
483         
484                 ($rset, $wset) = IO::Select->select($rd_handles, $wt_handles, $er_handles, $timeout);
485                 
486         foreach $e (@$eset) {
487             &{$er_callbacks{$e}}($e) if exists $er_callbacks{$e};
488         }
489         foreach $r (@$rset) {
490             &{$rd_callbacks{$r}}($r) if exists $rd_callbacks{$r};
491         }
492         foreach $w (@$wset) {
493             &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w};
494         }
495
496                 Timer::handler;
497                 
498         if (defined($loop_count)) {
499             last unless --$loop_count;
500         }
501     }
502 }
503
504 sub sleep
505 {
506         my ($pkg, $interval) = @_;
507         my $now = time;
508         while (time - $now < $interval) {
509                 $pkg->event_loop(10, 0.01);
510         }
511 }
512
513 sub DESTROY
514 {
515         my $conn = shift;
516         my $call = $conn->{call} || 'unallocated';
517         dbg('connll', "Connection $call being destroyed ($noconns)");
518         $noconns--;
519 }
520
521 1;
522
523 __END__
524