fix simulanious connections
[spider.git] / perl / Msg.pm
1 #
2 # This has been taken from the 'Advanced Perl Programming' book by Sriram Srinivasan 
3 #
4 # I am presuming that the code is distributed on the same basis as perl itself.
5 #
6 # I have modified it to suit my devious purposes (Dirk Koopman G1TLH)
7 #
8 # $Id$
9 #
10
11 package Msg;
12
13 use strict;
14 use IO::Select;
15 use IO::Socket;
16 use Carp;
17
18 use vars qw(%rd_callbacks %wt_callbacks $rd_handles $wt_handles $now @timerchain %conns);
19
20 %rd_callbacks = ();
21 %wt_callbacks = ();
22 $rd_handles   = IO::Select->new();
23 $wt_handles   = IO::Select->new();
24 $now = time;
25 @timerchain = ();
26
27 my $blocking_supported = 0;
28
29 BEGIN {
30     # Checks if blocking is supported
31     eval {
32         require POSIX; POSIX->import(qw (F_SETFL O_NONBLOCK EAGAIN));
33     };
34     $blocking_supported = 1 unless $@;
35 }
36
37 #
38 #-----------------------------------------------------------------
39 # Generalised initializer
40
41 sub new
42 {
43     my ($pkg, $rproc) = @_;
44         my $obj = ref($pkg);
45         my $class = $obj || $pkg;
46
47     my $conn = {
48         rproc => $rproc,
49                 inqueue => [],
50                 outqueue => [],
51                 state => 0,
52                 lineend => "\r\n",
53                 csort => 'telnet',
54                 timeval => 60,
55     };
56
57         return bless $conn, $class;
58 }
59
60 # save it
61 sub conns
62 {
63         my $pkg = shift;
64         my $call = shift;
65         my $ref;
66         
67         if (ref $pkg) {
68                 $call = $pkg->{call} unless $call;
69                 return undef unless $call;
70                 confess "changing $pkg->{call} to $call" if exists $pkg->{call} && $call ne $pkg->{call};
71                 $pkg->{call} = $call;
72                 $ref = $conns{$call} = $pkg;
73         } else {
74                 $ref = $conns{$call};
75         }
76         return $ref;
77 }
78
79 # this is only called by any dependent processes going away unexpectedly
80 sub pid_gone
81 {
82         my ($pkg, $pid) = @_;
83         
84         my @pid = grep {$_->{pid} == $pid} values %conns;
85         for (@pid) {
86                 if ($_->{rproc}) {
87                         &{$_->{rproc}}($_, undef, "$pid has gorn");
88                 } else {
89                         $_->disconnect;
90                 }
91         }
92 }
93
94 #-----------------------------------------------------------------
95 # Send side routines
96 sub connect {
97     my ($pkg, $to_host, $to_port, $rproc) = @_;
98
99     # Create a connection end-point object
100     my $conn = $pkg;
101         unless (ref $pkg) {
102                 $conn = $pkg->new($rproc);
103         }
104         
105     # Create a new internet socket
106     my $sock = IO::Socket::INET->new (
107                                       PeerAddr => $to_host,
108                                       PeerPort => $to_port,
109                                       Proto    => 'tcp',
110                                       Reuse    => 1,
111                                                                           Timeout  => $conn->{timeval} / 2);
112
113     return undef unless $sock;
114
115         $conn->{sock} = $sock;
116     
117     if ($conn->{rproc}) {
118         my $callback = sub {_rcv($conn)};
119         set_event_handler ($sock, "read" => $callback);
120     }
121     return $conn;
122 }
123
124 sub disconnect {
125     my $conn = shift;
126     my $sock = delete $conn->{sock};
127         $conn->{state} = 'E';
128         delete $conn->{cmd};
129         $conn->{timeout}->del_timer if $conn->{timeout};
130
131         # be careful to delete the correct one
132         if (my $call = $conn->{call}) {
133                 my $ref = $conns{$call};
134                 delete $conns{$call} if $ref && $ref == $conn;
135         }
136         
137     set_event_handler ($sock, "read" => undef, "write" => undef);
138         unless ($^O =~ /^MS/i) {
139                 kill 'TERM', $conn->{pid} if exists $conn->{pid};
140         }
141         return unless defined($sock);
142     shutdown($sock, 3);
143         close($sock);
144 }
145
146 sub send_now {
147     my ($conn, $msg) = @_;
148     $conn->enqueue($msg);
149     $conn->_send (1); # 1 ==> flush
150 }
151
152 sub send_later {
153     my ($conn, $msg) = @_;
154     $conn->enqueue($msg);
155     my $sock = $conn->{sock};
156     return unless defined($sock);
157     set_event_handler ($sock, "write" => sub {$conn->_send(0)});
158 }
159
160 sub enqueue {
161     my $conn = shift;
162     push (@{$conn->{outqueue}}, $_[0]);
163 }
164
165 sub _send {
166     my ($conn, $flush) = @_;
167     my $sock = $conn->{sock};
168     return unless defined($sock);
169     my $rq = $conn->{outqueue};
170
171     # If $flush is set, set the socket to blocking, and send all
172     # messages in the queue - return only if there's an error
173     # If $flush is 0 (deferred mode) make the socket non-blocking, and
174     # return to the event loop only after every message, or if it
175     # is likely to block in the middle of a message.
176
177     $flush ? $conn->set_blocking() : $conn->set_non_blocking();
178     my $offset = (exists $conn->{send_offset}) ? $conn->{send_offset} : 0;
179
180     while (@$rq) {
181         my $msg            = $rq->[0];
182                 my $mlth           = length($msg);
183         my $bytes_to_write = $mlth - $offset;
184         my $bytes_written  = 0;
185                 confess("Negative Length! msg: '$msg' lth: $mlth offset: $offset") if $bytes_to_write < 0;
186         while ($bytes_to_write > 0) {
187             $bytes_written = syswrite ($sock, $msg,
188                                        $bytes_to_write, $offset);
189             if (!defined($bytes_written)) {
190                 if (_err_will_block($!)) {
191                     # Should happen only in deferred mode. Record how
192                     # much we have already sent.
193                     $conn->{send_offset} = $offset;
194                     # Event handler should already be set, so we will
195                     # be called back eventually, and will resume sending
196                     return 1;
197                 } else {    # Uh, oh
198                                         delete $conn->{send_offset};
199                     $conn->handle_send_err($!);
200                                         $conn->disconnect;
201                     return 0; # fail. Message remains in queue ..
202                 }
203             }
204             $offset         += $bytes_written;
205             $bytes_to_write -= $bytes_written;
206         }
207         delete $conn->{send_offset};
208         $offset = 0;
209         shift @$rq;
210         last unless $flush; # Go back to select and wait
211                             # for it to fire again.
212     }
213     # Call me back if queue has not been drained.
214     if (@$rq) {
215         set_event_handler ($sock, "write" => sub {$conn->_send(0)});
216     } else {
217         set_event_handler ($sock, "write" => undef);
218     }
219     1;  # Success
220 }
221
222 sub _err_will_block {
223     if ($blocking_supported) {
224         return ($_[0] == EAGAIN());
225     }
226     return 0;
227 }
228 sub set_non_blocking {                        # $conn->set_blocking
229     if ($blocking_supported) {
230         # preserve other fcntl flags
231         my $flags = fcntl ($_[0], F_GETFL(), 0);
232         fcntl ($_[0], F_SETFL(), $flags | O_NONBLOCK());
233     }
234 }
235 sub set_blocking {
236     if ($blocking_supported) {
237         my $flags = fcntl ($_[0], F_GETFL(), 0);
238         $flags  &= ~O_NONBLOCK(); # Clear blocking, but preserve other flags
239         fcntl ($_[0], F_SETFL(), $flags);
240     }
241 }
242
243 sub handle_send_err {
244    # For more meaningful handling of send errors, subclass Msg and
245    # rebless $conn.  
246    my ($conn, $err_msg) = @_;
247    warn "Error while sending: $err_msg \n";
248    set_event_handler ($conn->{sock}, "write" => undef);
249 }
250
251 #-----------------------------------------------------------------
252 # Receive side routines
253
254 sub new_server {
255     @_ == 4 || die "Msg->new_server (myhost, myport, login_proc\n";
256     my ($pkg, $my_host, $my_port, $login_proc) = @_;
257         my $self = $pkg->new($login_proc);
258         
259     $self->{sock} = IO::Socket::INET->new (
260                                           LocalAddr => $my_host,
261                                           LocalPort => $my_port,
262                                           Listen    => 5,
263                                           Proto     => 'tcp',
264                                           Reuse     => 1);
265     die "Could not create socket: $! \n" unless $self->{sock};
266     set_event_handler ($self->{sock}, "read" => sub { $self->new_client }  );
267         return $self;
268 }
269
270 sub dequeue
271 {
272         my $conn = shift;
273         my $msg;
274         
275         while ($msg = shift @{$conn->{inqueue}}){
276                 &{$conn->{rproc}}($conn, $msg, $!);
277                 $! = 0;
278         }
279 }
280
281 sub _rcv {                     # Complement to _send
282     my $conn = shift; # $rcv_now complement of $flush
283     # Find out how much has already been received, if at all
284     my ($msg, $offset, $bytes_to_read, $bytes_read);
285     my $sock = $conn->{sock};
286     return unless defined($sock);
287
288         my @lines;
289     $conn->set_non_blocking();
290         $bytes_read = sysread ($sock, $msg, 1024, 0);
291         if (defined ($bytes_read)) {
292                 if ($bytes_read > 0) {
293                         if ($msg =~ /\n/) {
294                                 @lines = split /\r?\n/, $msg;
295                                 if (@lines) {
296                                         $lines[0] = $conn->{msg} . $lines[0] if exists $conn->{msg};
297                                 } else {
298                                         $lines[0] = $conn->{msg} if exists $conn->{msg};
299                                         push @lines, '' unless @lines;
300                                 }
301                                 if ($msg =~ /\n$/) {
302                                         delete $conn->{msg};
303                                 } else {
304                                         $conn->{msg} = pop @lines;
305                                 }
306                                 push @{$conn->{inqueue}}, @lines if @lines;
307                         } else {
308                                 $conn->{msg} .= $msg;
309                         }
310                 } 
311         } else {
312                 if (_err_will_block($!)) {
313                         return ; 
314                 } else {
315                         $bytes_read = 0;
316                 }
317     }
318
319 FINISH:
320     if (defined $bytes_read && $bytes_read == 0) {
321 #               $conn->disconnect();
322                 &{$conn->{rproc}}($conn, undef, $!);
323                 delete $conn->{inqueue};
324     } else {
325                 $conn->dequeue;
326         }
327 }
328
329 sub new_client {
330         my $server_conn = shift;
331     my $sock = $server_conn->{sock}->accept();
332     my $conn = $server_conn->new($server_conn->{rproc});
333         $conn->{sock} = $sock;
334     my $rproc = &{$server_conn->{rproc}} ($conn, $sock->peerhost(), $sock->peerport());
335     if ($rproc) {
336         $conn->{rproc} = $rproc;
337         my $callback = sub {_rcv($conn)};
338         set_event_handler ($sock, "read" => $callback);
339     } else {  # Login failed
340         $conn->disconnect();
341     }
342 }
343
344 sub close_server
345 {
346         my $conn = shift;
347         set_event_handler ($conn->{sock}, "read" => undef);
348         $conn->{sock}->close;
349 }
350
351 #----------------------------------------------------
352 # Event loop routines used by both client and server
353
354 sub set_event_handler {
355     shift unless ref($_[0]); # shift if first arg is package name
356     my ($handle, %args) = @_;
357     my $callback;
358     if (exists $args{'write'}) {
359         $callback = $args{'write'};
360         if ($callback) {
361             $wt_callbacks{$handle} = $callback;
362             $wt_handles->add($handle);
363         } else {
364             delete $wt_callbacks{$handle};
365             $wt_handles->remove($handle);
366         }
367     }
368     if (exists $args{'read'}) {
369         $callback = $args{'read'};
370         if ($callback) {
371             $rd_callbacks{$handle} = $callback;
372             $rd_handles->add($handle);
373         } else {
374             delete $rd_callbacks{$handle};
375             $rd_handles->remove($handle);
376        }
377     }
378 }
379
380 sub new_timer
381 {
382     my ($pkg, $time, $proc, $recur) = @_;
383         my $obj = ref($pkg);
384         my $class = $obj || $pkg;
385         my $self = bless { t=>$time + time, proc=>$proc }, $class;
386         $self->{interval} = $time if $recur;
387         push @timerchain, $self;
388         return $self;
389 }
390
391 sub del_timer
392 {
393         my $self = shift;
394         @timerchain = grep {$_ != $self} @timerchain;
395 }
396
397 sub event_loop {
398     my ($pkg, $loop_count, $timeout) = @_; # event_loop(1) to process events once
399     my ($conn, $r, $w, $rset, $wset);
400     while (1) {
401  
402        # Quit the loop if no handles left to process
403         last unless ($rd_handles->count() || $wt_handles->count());
404         
405                 ($rset, $wset) =
406             IO::Select->select ($rd_handles, $wt_handles, undef, $timeout);
407                 $now = time;
408                 
409         foreach $r (@$rset) {
410             &{$rd_callbacks{$r}} ($r) if exists $rd_callbacks{$r};
411         }
412         foreach $w (@$wset) {
413             &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w};
414         }
415
416                 # handle things on the timer chain
417                 for (@timerchain) {
418                         if ($now >= $_->{t}) {
419                                 &{$_->{proc}}();
420                                 $_->{t} = $now + $_->{interval} if exists $_->{interval};
421                         }
422                 }
423
424                 # remove dead timers
425                 @timerchain = grep { $_->{t} > $now } @timerchain;
426                 
427         if (defined($loop_count)) {
428             last unless --$loop_count;
429         }
430     }
431 }
432
433 1;
434
435 __END__
436