add a timeout to the outgoing connect
[spider.git] / perl / Msg.pm
1 #
2 # This has been taken from the 'Advanced Perl Programming' book by Sriram Srinivasan 
3 #
4 # I am presuming that the code is distributed on the same basis as perl itself.
5 #
6 # I have modified it to suit my devious purposes (Dirk Koopman G1TLH)
7 #
8 # $Id$
9 #
10
11 package Msg;
12
13 use strict;
14 use IO::Select;
15 use IO::Socket;
16 #use DXDebug;
17
18 use vars qw(%rd_callbacks %wt_callbacks $rd_handles $wt_handles $now @timerchain);
19
20 %rd_callbacks = ();
21 %wt_callbacks = ();
22 $rd_handles   = IO::Select->new();
23 $wt_handles   = IO::Select->new();
24 $now = time;
25 @timerchain = ();
26
27 my $blocking_supported = 0;
28
29 BEGIN {
30     # Checks if blocking is supported
31     eval {
32         require POSIX; POSIX->import(qw (F_SETFL O_NONBLOCK EAGAIN));
33     };
34     $blocking_supported = 1 unless $@;
35 }
36
37 #
38 #-----------------------------------------------------------------
39 # Generalised initializer
40
41 sub new
42 {
43     my ($pkg, $rproc) = @_;
44         my $obj = ref($pkg);
45         my $class = $obj || $pkg;
46
47     my $conn = {
48         rproc => $rproc,
49                 inqueue => [],
50                 outqueue => [],
51                 state => 0,
52                 lineend => "\r\n",
53                 csort => 'telnet',
54                 timeval => 60,
55     };
56
57         return bless $conn, $class;
58 }
59
60 #-----------------------------------------------------------------
61 # Send side routines
62 sub connect {
63     my ($pkg, $to_host, $to_port, $rproc) = @_;
64
65     # Create a connection end-point object
66     my $conn = $pkg;
67         unless (ref $pkg) {
68                 $conn = $pkg->new($rproc);
69         }
70         
71     # Create a new internet socket
72     my $sock = IO::Socket::INET->new (
73                                       PeerAddr => $to_host,
74                                       PeerPort => $to_port,
75                                       Proto    => 'tcp',
76                                       Reuse    => 1,
77                                                                           Timeout  => $conn->{timeval} / 2);
78
79     return undef unless $sock;
80
81         $conn->{sock} = $sock;
82     
83     if ($conn->{rproc}) {
84         my $callback = sub {_rcv($conn)};
85         set_event_handler ($sock, "read" => $callback);
86     }
87     return $conn;
88 }
89
90 sub disconnect {
91     my $conn = shift;
92     my $sock = delete $conn->{sock};
93         $conn->{state} = 'E';
94         delete $conn->{cmd};
95         $conn->{timeout}->del_timer if $conn->{timeout};
96         return unless defined($sock);
97     set_event_handler ($sock, "read" => undef, "write" => undef);
98     shutdown($sock, 3);
99         close($sock);
100 }
101
102 sub send_now {
103     my ($conn, $msg) = @_;
104     $conn->enqueue($msg);
105     $conn->_send (1); # 1 ==> flush
106 }
107
108 sub send_later {
109     my ($conn, $msg) = @_;
110     $conn->enqueue($msg);
111     my $sock = $conn->{sock};
112     return unless defined($sock);
113     set_event_handler ($sock, "write" => sub {$conn->_send(0)});
114 }
115
116 sub enqueue {
117     my $conn = shift;
118     push (@{$conn->{outqueue}}, $_[0]);
119 }
120
121 sub _send {
122     my ($conn, $flush) = @_;
123     my $sock = $conn->{sock};
124     return unless defined($sock);
125     my $rq = $conn->{outqueue};
126
127     # If $flush is set, set the socket to blocking, and send all
128     # messages in the queue - return only if there's an error
129     # If $flush is 0 (deferred mode) make the socket non-blocking, and
130     # return to the event loop only after every message, or if it
131     # is likely to block in the middle of a message.
132
133     $flush ? $conn->set_blocking() : $conn->set_non_blocking();
134     my $offset = (exists $conn->{send_offset}) ? $conn->{send_offset} : 0;
135
136     while (@$rq) {
137         my $msg            = $rq->[0];
138                 my $mlth           = length($msg);
139         my $bytes_to_write = $mlth - $offset;
140         my $bytes_written  = 0;
141                 confess("Negative Length! msg: '$msg' lth: $mlth offset: $offset") if $bytes_to_write < 0;
142         while ($bytes_to_write > 0) {
143             $bytes_written = syswrite ($sock, $msg,
144                                        $bytes_to_write, $offset);
145             if (!defined($bytes_written)) {
146                 if (_err_will_block($!)) {
147                     # Should happen only in deferred mode. Record how
148                     # much we have already sent.
149                     $conn->{send_offset} = $offset;
150                     # Event handler should already be set, so we will
151                     # be called back eventually, and will resume sending
152                     return 1;
153                 } else {    # Uh, oh
154                                         delete $conn->{send_offset};
155                     $conn->handle_send_err($!);
156                                         $conn->disconnect;
157                     return 0; # fail. Message remains in queue ..
158                 }
159             }
160             $offset         += $bytes_written;
161             $bytes_to_write -= $bytes_written;
162         }
163         delete $conn->{send_offset};
164         $offset = 0;
165         shift @$rq;
166         last unless $flush; # Go back to select and wait
167                             # for it to fire again.
168     }
169     # Call me back if queue has not been drained.
170     if (@$rq) {
171         set_event_handler ($sock, "write" => sub {$conn->_send(0)});
172     } else {
173         set_event_handler ($sock, "write" => undef);
174     }
175     1;  # Success
176 }
177
178 sub _err_will_block {
179     if ($blocking_supported) {
180         return ($_[0] == EAGAIN());
181     }
182     return 0;
183 }
184 sub set_non_blocking {                        # $conn->set_blocking
185     if ($blocking_supported) {
186         # preserve other fcntl flags
187         my $flags = fcntl ($_[0], F_GETFL(), 0);
188         fcntl ($_[0], F_SETFL(), $flags | O_NONBLOCK());
189     }
190 }
191 sub set_blocking {
192     if ($blocking_supported) {
193         my $flags = fcntl ($_[0], F_GETFL(), 0);
194         $flags  &= ~O_NONBLOCK(); # Clear blocking, but preserve other flags
195         fcntl ($_[0], F_SETFL(), $flags);
196     }
197 }
198
199 sub handle_send_err {
200    # For more meaningful handling of send errors, subclass Msg and
201    # rebless $conn.  
202    my ($conn, $err_msg) = @_;
203    warn "Error while sending: $err_msg \n";
204    set_event_handler ($conn->{sock}, "write" => undef);
205 }
206
207 #-----------------------------------------------------------------
208 # Receive side routines
209
210 sub new_server {
211     @_ == 4 || die "Msg->new_server (myhost, myport, login_proc\n";
212     my ($pkg, $my_host, $my_port, $login_proc) = @_;
213         my $self = $pkg->new($login_proc);
214         
215     $self->{sock} = IO::Socket::INET->new (
216                                           LocalAddr => $my_host,
217                                           LocalPort => $my_port,
218                                           Listen    => 5,
219                                           Proto     => 'tcp',
220                                           Reuse     => 1);
221     die "Could not create socket: $! \n" unless $self->{sock};
222     set_event_handler ($self->{sock}, "read" => sub { $self->new_client }  );
223         return $self;
224 }
225
226 sub dequeue
227 {
228         my $conn = shift;
229         my $msg;
230         
231         while ($msg = shift @{$conn->{inqueue}}){
232                 &{$conn->{rproc}}($conn, $msg, $!);
233                 $! = 0;
234         }
235 }
236
237 sub _rcv {                     # Complement to _send
238     my $conn = shift; # $rcv_now complement of $flush
239     # Find out how much has already been received, if at all
240     my ($msg, $offset, $bytes_to_read, $bytes_read);
241     my $sock = $conn->{sock};
242     return unless defined($sock);
243
244         my @lines;
245     $conn->set_non_blocking();
246         $bytes_read = sysread ($sock, $msg, 1024, 0);
247         if (defined ($bytes_read)) {
248                 if ($bytes_read > 0) {
249                         if ($msg =~ /\n/) {
250                                 @lines = split /\r?\n/, $msg;
251                                 if (@lines) {
252                                         $lines[0] = $conn->{msg} . $lines[0] if exists $conn->{msg};
253                                 } else {
254                                         $lines[0] = $conn->{msg} if exists $conn->{msg};
255                                         push @lines, '' unless @lines;
256                                 }
257                                 if ($msg =~ /\n$/) {
258                                         delete $conn->{msg};
259                                 } else {
260                                         $conn->{msg} = pop @lines;
261                                 }
262                                 push @{$conn->{inqueue}}, @lines if @lines;
263                         } else {
264                                 $conn->{msg} .= $msg;
265                         }
266                 } 
267         } else {
268                 if (_err_will_block($!)) {
269                         return ; 
270                 } else {
271                         $bytes_read = 0;
272                 }
273     }
274
275 FINISH:
276     if (defined $bytes_read && $bytes_read == 0) {
277 #               $conn->disconnect();
278                 &{$conn->{rproc}}($conn, undef, $!);
279                 delete $conn->{inqueue};
280     } else {
281                 $conn->dequeue;
282         }
283 }
284
285 sub new_client {
286         my $server_conn = shift;
287     my $sock = $server_conn->{sock}->accept();
288     my $conn = $server_conn->new($server_conn->{rproc});
289         $conn->{sock} = $sock;
290     my $rproc = &{$server_conn->{rproc}} ($conn, $sock->peerhost(), $sock->peerport());
291     if ($rproc) {
292         $conn->{rproc} = $rproc;
293         my $callback = sub {_rcv($conn)};
294         set_event_handler ($sock, "read" => $callback);
295     } else {  # Login failed
296         $conn->disconnect();
297     }
298 }
299
300 sub close_server
301 {
302         my $conn = shift;
303         set_event_handler ($conn->{sock}, "read" => undef);
304         $conn->{sock}->close;
305 }
306
307 #----------------------------------------------------
308 # Event loop routines used by both client and server
309
310 sub set_event_handler {
311     shift unless ref($_[0]); # shift if first arg is package name
312     my ($handle, %args) = @_;
313     my $callback;
314     if (exists $args{'write'}) {
315         $callback = $args{'write'};
316         if ($callback) {
317             $wt_callbacks{$handle} = $callback;
318             $wt_handles->add($handle);
319         } else {
320             delete $wt_callbacks{$handle};
321             $wt_handles->remove($handle);
322         }
323     }
324     if (exists $args{'read'}) {
325         $callback = $args{'read'};
326         if ($callback) {
327             $rd_callbacks{$handle} = $callback;
328             $rd_handles->add($handle);
329         } else {
330             delete $rd_callbacks{$handle};
331             $rd_handles->remove($handle);
332        }
333     }
334 }
335
336 sub new_timer
337 {
338     my ($pkg, $time, $proc, $recur) = @_;
339         my $obj = ref($pkg);
340         my $class = $obj || $pkg;
341         my $self = bless { t=>$time + time, proc=>$proc }, $class;
342         $self->{interval} = $time if $recur;
343         push @timerchain, $self;
344         return $self;
345 }
346
347 sub del_timer
348 {
349         my $self = shift;
350         @timerchain = grep {$_ != $self} @timerchain;
351 }
352
353 sub event_loop {
354     my ($pkg, $loop_count, $timeout) = @_; # event_loop(1) to process events once
355     my ($conn, $r, $w, $rset, $wset);
356     while (1) {
357  
358        # Quit the loop if no handles left to process
359         last unless ($rd_handles->count() || $wt_handles->count());
360         
361                 ($rset, $wset) =
362             IO::Select->select ($rd_handles, $wt_handles, undef, $timeout);
363                 $now = time;
364                 
365         foreach $r (@$rset) {
366             &{$rd_callbacks{$r}} ($r) if exists $rd_callbacks{$r};
367         }
368         foreach $w (@$wset) {
369             &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w};
370         }
371
372                 # handle things on the timer chain
373                 for (@timerchain) {
374                         if ($now >= $_->{t}) {
375                                 &{$_->{proc}}();
376                                 $_->{t} = $now + $_->{interval} if exists $_->{interval};
377                         }
378                 }
379
380                 # remove dead timers
381                 @timerchain = grep { $_->{t} > $now } @timerchain;
382                 
383         if (defined($loop_count)) {
384             last unless --$loop_count;
385         }
386     }
387 }
388
389 1;
390
391 __END__
392