fix typo
[spider.git] / perl / Msg.pm
1 #
2 # This has been taken from the 'Advanced Perl Programming' book by Sriram Srinivasan 
3 #
4 # I am presuming that the code is distributed on the same basis as perl itself.
5 #
6 # I have modified it to suit my devious purposes (Dirk Koopman G1TLH)
7 #
8 # $Id$
9 #
10
11 package Msg;
12
13 use strict;
14 use IO::Select;
15 use IO::Socket;
16 use DXDebug;
17 use Timer;
18 use Errno qw(EWOULDBLOCK EAGAIN EINPROGRESS);
19 use POSIX qw(F_GETFL F_SETFL O_NONBLOCK);
20
21 use vars qw(%rd_callbacks %wt_callbacks %er_callbacks $rd_handles $wt_handles $er_handles $now %conns $noconns);
22
23 %rd_callbacks = ();
24 %wt_callbacks = ();
25 %er_callbacks = ();
26 $rd_handles   = IO::Select->new();
27 $wt_handles   = IO::Select->new();
28 $er_handles   = IO::Select->new();
29
30 $now = time;
31 my $blocking_supported = 0;
32
33 BEGIN {
34     # Checks if blocking is supported
35     eval {
36         require POSIX; POSIX->import(qw (F_SETFL O_NONBLOCK));
37     };
38     $blocking_supported = 1 unless $@;
39 }
40
41 my $w = $^W;
42 $^W = 0;
43 my $eagain = eval {EAGAIN()};
44 my $einprogress = eval {EINPROGRESS()};
45 my $ewouldblock = eval {EWOULDBLOCK()};
46 $^W = $w;
47
48 #
49 #-----------------------------------------------------------------
50 # Generalised initializer
51
52 sub new
53 {
54     my ($pkg, $rproc) = @_;
55         my $obj = ref($pkg);
56         my $class = $obj || $pkg;
57
58     my $conn = {
59         rproc => $rproc,
60                 inqueue => [],
61                 outqueue => [],
62                 state => 0,
63                 lineend => "\r\n",
64                 csort => 'telnet',
65                 timeval => 60,
66     };
67
68         $noconns++;
69         dbg('connll', "Connection created ($noconns)");
70         return bless $conn, $class;
71 }
72
73 sub set_error
74 {
75         my $conn = shift;
76         my $callback = shift;
77         $conn->{eproc} = $callback;
78         set_event_handler($conn->{sock}, error => $callback) if exists $conn->{sock};
79 }
80
81 sub set_rproc
82 {
83         my $conn = shift;
84         my $callback = shift;
85         $conn->{rproc} = $callback;
86 }
87
88 sub blocking
89 {
90         return unless $blocking_supported;
91         
92         my $flags = fcntl ($_[0], F_GETFL, 0);
93         if ($_[1]) {
94                 $flags &= ~O_NONBLOCK;
95         } else {
96                 $flags |= O_NONBLOCK;
97         }
98         fcntl ($_[0], F_SETFL, $flags);
99 }
100
101 # save it
102 sub conns
103 {
104         my $pkg = shift;
105         my $call = shift;
106         my $ref;
107         
108         if (ref $pkg) {
109                 $call = $pkg->{call} unless $call;
110                 return undef unless $call;
111                 confess "changing $pkg->{call} to $call" if exists $pkg->{call} && $call ne $pkg->{call};
112                 $pkg->{call} = $call;
113                 $ref = $conns{$call} = $pkg;
114                 dbg('connll', "Connection $call stored");
115         } else {
116                 $ref = $conns{$call};
117         }
118         return $ref;
119 }
120
121 # this is only called by any dependent processes going away unexpectedly
122 sub pid_gone
123 {
124         my ($pkg, $pid) = @_;
125         
126         my @pid = grep {$_->{pid} == $pid} values %conns;
127         for (@pid) {
128                 &{$_->{eproc}}($_, "$pid has gorn") if exists $_->{eproc};
129                 $_->disconnect;
130         }
131 }
132
133 #-----------------------------------------------------------------
134 # Send side routines
135 sub connect {
136     my ($pkg, $to_host, $to_port, $rproc) = @_;
137
138     # Create a connection end-point object
139     my $conn = $pkg;
140         unless (ref $pkg) {
141                 $conn = $pkg->new($rproc);
142         }
143         $conn->{peerhost} = $to_host;
144         $conn->{peerport} = $to_port;
145         $conn->{sort} = 'Outgoing';
146         
147     # Create a new internet socket
148     my $sock = IO::Socket::INET->new();
149     return undef unless $sock;
150         
151         my $proto = getprotobyname('tcp');
152         $sock->socket(AF_INET, SOCK_STREAM, $proto) or return undef;
153         
154         blocking($sock, 0);
155         my $ip = gethostbyname($to_host);
156 #       my $r = $sock->connect($to_port, $ip);
157         my $r = connect($sock, pack_sockaddr_in($to_port, $ip));
158         return undef unless $r || _err_will_block($!);
159         
160         $conn->{sock} = $sock;
161     
162     if ($conn->{rproc}) {
163         my $callback = sub {$conn->_rcv};
164         set_event_handler ($sock, read => $callback);
165     }
166     return $conn;
167 }
168
169 sub disconnect {
170     my $conn = shift;
171         return if exists $conn->{disconnecting};
172
173         $conn->{disconnecting} = 1;
174     my $sock = delete $conn->{sock};
175         $conn->{state} = 'E';
176         $conn->{timeout}->del if $conn->{timeout};
177
178         # be careful to delete the correct one
179         my $call;
180         if ($call = $conn->{call}) {
181                 my $ref = $conns{$call};
182                 delete $conns{$call} if $ref && $ref == $conn;
183         }
184         $call ||= 'unallocated';
185         dbg('connll', "Connection $call disconnected");
186         
187         unless ($^O =~ /^MS/i) {
188                 kill 'TERM', $conn->{pid} if exists $conn->{pid};
189         }
190
191         # get rid of any references
192         for (keys %$conn) {
193                 if (ref($conn->{$_})) {
194                         delete $conn->{$_};
195                 }
196         }
197
198         return unless defined($sock);
199     set_event_handler ($sock, read => undef, write => undef, error => undef);
200     shutdown($sock, 3);
201         close($sock);
202 }
203
204 sub send_now {
205     my ($conn, $msg) = @_;
206     $conn->enqueue($msg);
207     $conn->_send (1); # 1 ==> flush
208 }
209
210 sub send_later {
211     my ($conn, $msg) = @_;
212     $conn->enqueue($msg);
213     my $sock = $conn->{sock};
214     return unless defined($sock);
215     set_event_handler ($sock, write => sub {$conn->_send(0)});
216 }
217
218 sub enqueue {
219     my $conn = shift;
220     push (@{$conn->{outqueue}}, defined $_[0] ? $_[0] : '');
221 }
222
223 sub _send {
224     my ($conn, $flush) = @_;
225     my $sock = $conn->{sock};
226     return unless defined($sock);
227     my $rq = $conn->{outqueue};
228
229     # If $flush is set, set the socket to blocking, and send all
230     # messages in the queue - return only if there's an error
231     # If $flush is 0 (deferred mode) make the socket non-blocking, and
232     # return to the event loop only after every message, or if it
233     # is likely to block in the middle of a message.
234
235         blocking($sock, $flush);
236     my $offset = (exists $conn->{send_offset}) ? $conn->{send_offset} : 0;
237
238     while (@$rq) {
239         my $msg            = $rq->[0];
240                 my $mlth           = length($msg);
241         my $bytes_to_write = $mlth - $offset;
242         my $bytes_written  = 0;
243                 confess("Negative Length! msg: '$msg' lth: $mlth offset: $offset") if $bytes_to_write < 0;
244         while ($bytes_to_write > 0) {
245             $bytes_written = syswrite ($sock, $msg,
246                                        $bytes_to_write, $offset);
247             if (!defined($bytes_written)) {
248                 if (_err_will_block($!)) {
249                     # Should happen only in deferred mode. Record how
250                     # much we have already sent.
251                     $conn->{send_offset} = $offset;
252                     # Event handler should already be set, so we will
253                     # be called back eventually, and will resume sending
254                     return 1;
255                 } else {    # Uh, oh
256                                         &{$conn->{eproc}}($conn, $!) if exists $conn->{eproc};
257                                         $conn->disconnect;
258                     return 0; # fail. Message remains in queue ..
259                 }
260             }
261             $offset         += $bytes_written;
262             $bytes_to_write -= $bytes_written;
263         }
264         delete $conn->{send_offset};
265         $offset = 0;
266         shift @$rq;
267         last unless $flush; # Go back to select and wait
268                             # for it to fire again.
269     }
270     # Call me back if queue has not been drained.
271     if (@$rq) {
272         set_event_handler ($sock, write => sub {$conn->_send(0)});
273     } else {
274         set_event_handler ($sock, write => undef);
275                 if (exists $conn->{close_on_empty}) {
276                         &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc};
277                         $conn->disconnect; 
278                 }
279     }
280     1;  # Success
281 }
282
283 sub _err_will_block {
284         return 0 unless $blocking_supported;
285         return ($_[0] == $eagain || $_[0] == $ewouldblock || $_[0] == $einprogress);
286 }
287
288 sub close_on_empty
289 {
290         my $conn = shift;
291         $conn->{close_on_empty} = 1;
292 }
293
294 #-----------------------------------------------------------------
295 # Receive side routines
296
297 sub new_server {
298     @_ == 4 || die "Msg->new_server (myhost, myport, login_proc\n";
299     my ($pkg, $my_host, $my_port, $login_proc) = @_;
300         my $self = $pkg->new($login_proc);
301         
302     $self->{sock} = IO::Socket::INET->new (
303                                           LocalAddr => $my_host,
304                                           LocalPort => $my_port,
305                                           Listen    => SOMAXCONN,
306                                           Proto     => 'tcp',
307                                           Reuse     => 1);
308     die "Could not create socket: $! \n" unless $self->{sock};
309     set_event_handler ($self->{sock}, read => sub { $self->new_client }  );
310         return $self;
311 }
312
313 sub dequeue
314 {
315         my $conn = shift;
316
317         if ($conn->{msg} =~ /\n/) {
318                 my @lines = split /\r?\n/, $conn->{msg};
319                 if ($conn->{msg} =~ /\n$/) {
320                         delete $conn->{msg};
321                 } else {
322                         $conn->{msg} = pop @lines;
323                 }
324                 for (@lines) {
325                         &{$conn->{rproc}}($conn, defined $_ ? $_ : '');
326                 }
327         }
328 }
329
330 sub _rcv {                     # Complement to _send
331     my $conn = shift; # $rcv_now complement of $flush
332     # Find out how much has already been received, if at all
333     my ($msg, $offset, $bytes_to_read, $bytes_read);
334     my $sock = $conn->{sock};
335     return unless defined($sock);
336
337         my @lines;
338         blocking($sock, 0);
339         $bytes_read = sysread ($sock, $msg, 1024, 0);
340         if (defined ($bytes_read)) {
341                 if ($bytes_read > 0) {
342                         $conn->{msg} .= $msg;
343                 } 
344         } else {
345                 if (_err_will_block($!)) {
346                         return ; 
347                 } else {
348                         $bytes_read = 0;
349                 }
350     }
351
352 FINISH:
353     if (defined $bytes_read && $bytes_read == 0) {
354                 &{$conn->{eproc}}($conn, $!) if exists $conn->{eproc};
355                 $conn->disconnect;
356     } else {
357                 $conn->dequeue if exists $conn->{msg};
358         }
359 }
360
361 sub new_client {
362         my $server_conn = shift;
363     my $sock = $server_conn->{sock}->accept();
364     my $conn = $server_conn->new($server_conn->{rproc});
365         $conn->{sock} = $sock;
366     my ($rproc, $eproc) = &{$server_conn->{rproc}} ($conn, $conn->{peerhost} = $sock->peerhost(), $conn->{peerport} = $sock->peerport());
367         $conn->{sort} = 'Incoming';
368         if ($eproc) {
369                 $conn->{eproc} = $eproc;
370         set_event_handler ($sock, error => $eproc);
371         }
372     if ($rproc) {
373         $conn->{rproc} = $rproc;
374         my $callback = sub {$conn->_rcv};
375         set_event_handler ($sock, read => $callback);
376     } else {  # Login failed
377                 &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc};
378         $conn->disconnect();
379     }
380 }
381
382 sub close_server
383 {
384         my $conn = shift;
385         set_event_handler ($conn->{sock}, read => undef, write => undef, error => undef );
386         $conn->{sock}->close;
387 }
388
389 # close all clients (this is for forking really)
390 sub close_all_clients
391 {
392         for (values %conns) {
393                 $_->disconnect;
394         }
395 }
396
397 #----------------------------------------------------
398 # Event loop routines used by both client and server
399
400 sub set_event_handler {
401     shift unless ref($_[0]); # shift if first arg is package name
402     my ($handle, %args) = @_;
403     my $callback;
404     if (exists $args{'write'}) {
405         $callback = $args{'write'};
406         if ($callback) {
407             $wt_callbacks{$handle} = $callback;
408             $wt_handles->add($handle);
409         } else {
410             delete $wt_callbacks{$handle};
411             $wt_handles->remove($handle);
412         }
413     }
414     if (exists $args{'read'}) {
415         $callback = $args{'read'};
416         if ($callback) {
417             $rd_callbacks{$handle} = $callback;
418             $rd_handles->add($handle);
419         } else {
420             delete $rd_callbacks{$handle};
421             $rd_handles->remove($handle);
422        }
423     }
424     if (exists $args{'error'}) {
425         $callback = $args{'error'};
426         if ($callback) {
427             $er_callbacks{$handle} = $callback;
428             $er_handles->add($handle);
429         } else {
430             delete $er_callbacks{$handle};
431             $er_handles->remove($handle);
432        }
433     }
434 }
435
436 sub event_loop {
437     my ($pkg, $loop_count, $timeout) = @_; # event_loop(1) to process events once
438     my ($conn, $r, $w, $e, $rset, $wset, $eset);
439     while (1) {
440  
441        # Quit the loop if no handles left to process
442         last unless ($rd_handles->count() || $wt_handles->count());
443         
444                 ($rset, $wset) = IO::Select->select($rd_handles, $wt_handles, $er_handles, $timeout);
445                 
446         foreach $e (@$eset) {
447             &{$er_callbacks{$e}}($e) if exists $er_callbacks{$e};
448         }
449         foreach $r (@$rset) {
450             &{$rd_callbacks{$r}}($r) if exists $rd_callbacks{$r};
451         }
452         foreach $w (@$wset) {
453             &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w};
454         }
455
456                 Timer::handler;
457                 
458         if (defined($loop_count)) {
459             last unless --$loop_count;
460         }
461     }
462 }
463
464 sub DESTROY
465 {
466         my $conn = shift;
467         my $call = $conn->{call} || 'unallocated';
468         dbg('connll', "Connection $call being destroyed ($noconns)");
469         $noconns--;
470 }
471
472 1;
473
474 __END__
475