The guts of the new Msg system
[spider.git] / perl / Msg.pm
1 #
2 # This has been taken from the 'Advanced Perl Programming' book by Sriram Srinivasan 
3 #
4 # I am presuming that the code is distributed on the same basis as perl itself.
5 #
6 # I have modified it to suit my devious purposes (Dirk Koopman G1TLH)
7 #
8 # $Id$
9 #
10
11 package Msg;
12
13 use strict;
14 use IO::Select;
15 use IO::Socket;
16 #use DXDebug;
17
18 use vars qw(%rd_callbacks %wt_callbacks $rd_handles $wt_handles $now @timerchain);
19
20 %rd_callbacks = ();
21 %wt_callbacks = ();
22 $rd_handles   = IO::Select->new();
23 $wt_handles   = IO::Select->new();
24 $now = time;
25 @timerchain = ();
26
27 my $blocking_supported = 0;
28
29 BEGIN {
30     # Checks if blocking is supported
31     eval {
32         require POSIX; POSIX->import(qw (F_SETFL O_NONBLOCK EAGAIN));
33     };
34     $blocking_supported = 1 unless $@;
35 }
36
37 #
38 #-----------------------------------------------------------------
39 # Generalised initializer
40
41 sub new
42 {
43     my ($pkg, $rproc) = @_;
44         my $obj = ref($pkg);
45         my $class = $obj || $pkg;
46
47     my $conn = {
48         rproc => $rproc,
49                 inqueue => [],
50                 state => 0,
51                 lineend => "\r\n",
52                 csort => 'telnet',
53     };
54
55         return bless $conn, $class;
56 }
57
58 #-----------------------------------------------------------------
59 # Send side routines
60 sub connect {
61     my ($pkg, $to_host, $to_port, $rproc) = @_;
62
63     # Create a new internet socket
64     my $sock = IO::Socket::INET->new (
65                                       PeerAddr => $to_host,
66                                       PeerPort => $to_port,
67                                       Proto    => 'tcp',
68                                       Reuse    => 1);
69
70     return undef unless $sock;
71
72     # Create a connection end-point object
73     my $conn = $pkg;
74         unless (ref $pkg) {
75                 $conn = $pkg->new($rproc);
76         }
77         $conn->{sock} = $sock;
78     
79     if ($conn->{rproc}) {
80         my $callback = sub {_rcv($conn)};
81         set_event_handler ($sock, "read" => $callback);
82     }
83     return $conn;
84 }
85
86 sub disconnect {
87     my $conn = shift;
88     my $sock = delete $conn->{sock};
89         $conn->{state} = 'E';
90         delete $conn->{cmd};
91         $conn->{timeout}->del_timer if $conn->{timeout};
92         return unless defined($sock);
93     set_event_handler ($sock, "read" => undef, "write" => undef);
94     shutdown($sock, 3);
95         close($sock);
96 }
97
98 sub send_now {
99     my ($conn, $msg) = @_;
100     $conn->enqueue($msg);
101     $conn->_send (1); # 1 ==> flush
102 }
103
104 sub send_later {
105     my ($conn, $msg) = @_;
106     $conn->enqueue($msg);
107     my $sock = $conn->{sock};
108     return unless defined($sock);
109     set_event_handler ($sock, "write" => sub {$conn->_send(0)});
110 }
111
112 sub enqueue {
113     my $conn = shift;
114     push (@{$conn->{outqueue}}, $_[0]);
115 }
116
117 sub _send {
118     my ($conn, $flush) = @_;
119     my $sock = $conn->{sock};
120     return unless defined($sock);
121     my ($rq) = $conn->{outqueue};
122
123     # If $flush is set, set the socket to blocking, and send all
124     # messages in the queue - return only if there's an error
125     # If $flush is 0 (deferred mode) make the socket non-blocking, and
126     # return to the event loop only after every message, or if it
127     # is likely to block in the middle of a message.
128
129     $flush ? $conn->set_blocking() : $conn->set_non_blocking();
130     my $offset = (exists $conn->{send_offset}) ? $conn->{send_offset} : 0;
131
132     while (@$rq) {
133         my $msg            = $rq->[0];
134                 my $mlth           = length($msg);
135         my $bytes_to_write = $mlth - $offset;
136         my $bytes_written  = 0;
137                 confess("Negative Length! msg: '$msg' lth: $mlth offset: $offset") if $bytes_to_write < 0;
138         while ($bytes_to_write > 0) {
139             $bytes_written = syswrite ($sock, $msg,
140                                        $bytes_to_write, $offset);
141             if (!defined($bytes_written)) {
142                 if (_err_will_block($!)) {
143                     # Should happen only in deferred mode. Record how
144                     # much we have already sent.
145                     $conn->{send_offset} = $offset;
146                     # Event handler should already be set, so we will
147                     # be called back eventually, and will resume sending
148                     return 1;
149                 } else {    # Uh, oh
150                                         delete $conn->{send_offset};
151                     $conn->handle_send_err($!);
152                                         $conn->disconnect;
153                     return 0; # fail. Message remains in queue ..
154                 }
155             }
156             $offset         += $bytes_written;
157             $bytes_to_write -= $bytes_written;
158         }
159         delete $conn->{send_offset};
160         $offset = 0;
161         shift @$rq;
162         last unless $flush; # Go back to select and wait
163                             # for it to fire again.
164     }
165     # Call me back if queue has not been drained.
166     if (@$rq) {
167         set_event_handler ($sock, "write" => sub {$conn->_send(0)});
168     } else {
169         set_event_handler ($sock, "write" => undef);
170     }
171     1;  # Success
172 }
173
174 sub _err_will_block {
175     if ($blocking_supported) {
176         return ($_[0] == EAGAIN());
177     }
178     return 0;
179 }
180 sub set_non_blocking {                        # $conn->set_blocking
181     if ($blocking_supported) {
182         # preserve other fcntl flags
183         my $flags = fcntl ($_[0], F_GETFL(), 0);
184         fcntl ($_[0], F_SETFL(), $flags | O_NONBLOCK());
185     }
186 }
187 sub set_blocking {
188     if ($blocking_supported) {
189         my $flags = fcntl ($_[0], F_GETFL(), 0);
190         $flags  &= ~O_NONBLOCK(); # Clear blocking, but preserve other flags
191         fcntl ($_[0], F_SETFL(), $flags);
192     }
193 }
194
195 sub handle_send_err {
196    # For more meaningful handling of send errors, subclass Msg and
197    # rebless $conn.  
198    my ($conn, $err_msg) = @_;
199    warn "Error while sending: $err_msg \n";
200    set_event_handler ($conn->{sock}, "write" => undef);
201 }
202
203 #-----------------------------------------------------------------
204 # Receive side routines
205
206 sub new_server {
207     @_ == 4 || die "Msg->new_server (myhost, myport, login_proc\n";
208     my ($pkg, $my_host, $my_port, $login_proc) = @_;
209         my $self = $pkg->new($login_proc);
210         
211     $self->{sock} = IO::Socket::INET->new (
212                                           LocalAddr => $my_host,
213                                           LocalPort => $my_port,
214                                           Listen    => 5,
215                                           Proto     => 'tcp',
216                                           Reuse     => 1);
217     die "Could not create socket: $! \n" unless $self->{sock};
218     set_event_handler ($self->{sock}, "read" => sub { $self->new_client }  );
219         return $self;
220 }
221
222 sub dequeue
223 {
224         my $conn = shift;
225         my $msg;
226         
227         while ($msg = shift @{$conn->{inqueue}}){
228                 &{$conn->{rproc}}($conn, $msg, $!);
229                 $! = 0;
230         }
231 }
232
233 sub _rcv {                     # Complement to _send
234     my $conn = shift; # $rcv_now complement of $flush
235     # Find out how much has already been received, if at all
236     my ($msg, $offset, $bytes_to_read, $bytes_read);
237     my $sock = $conn->{sock};
238     return unless defined($sock);
239
240         my @lines;
241     $conn->set_non_blocking();
242         $bytes_read = sysread ($sock, $msg, 1024, 0);
243         if (defined ($bytes_read)) {
244                 if ($bytes_read > 0) {
245                         if ($msg =~ /\n/) {
246                                 @lines = split /\r?\n/, $msg;
247                                 push @lines, ' ' unless @lines;
248                                 
249                                 $lines[0] = $conn->{msg} . $lines[0] if exists $conn->{msg};
250                                 if ($msg =~ /\n$/) {
251                                         delete $conn->{msg};
252                                 } else {
253                                         $conn->{msg} = pop @lines;
254                                 }
255                                 push @{$conn->{inqueue}}, @lines if @lines;
256                         } else {
257                                 $conn->{msg} .= $msg;
258                         }
259                 } 
260         } else {
261                 if (_err_will_block($!)) {
262                         return ; 
263                 } else {
264                         $bytes_read = 0;
265                 }
266     }
267
268 FINISH:
269     if (defined $bytes_read && $bytes_read == 0) {
270 #               $conn->disconnect();
271                 &{$conn->{rproc}}($conn, undef, $!);
272                 delete $conn->{inqueue};
273     } else {
274                 $conn->dequeue;
275         }
276 }
277
278 sub new_client {
279         my $server_conn = shift;
280     my $sock = $server_conn->{sock}->accept();
281     my $conn = $server_conn->new($server_conn->{rproc});
282         $conn->{sock} = $sock;
283     my $rproc = &{$server_conn->{rproc}} ($conn, $sock->peerhost(), $sock->peerport());
284     if ($rproc) {
285         $conn->{rproc} = $rproc;
286         my $callback = sub {_rcv($conn)};
287         set_event_handler ($sock, "read" => $callback);
288     } else {  # Login failed
289         $conn->disconnect();
290     }
291 }
292
293 sub close_server
294 {
295         my $conn = shift;
296         set_event_handler ($conn->{sock}, "read" => undef);
297         $conn->{sock}->close;
298 }
299
300 #----------------------------------------------------
301 # Event loop routines used by both client and server
302
303 sub set_event_handler {
304     shift unless ref($_[0]); # shift if first arg is package name
305     my ($handle, %args) = @_;
306     my $callback;
307     if (exists $args{'write'}) {
308         $callback = $args{'write'};
309         if ($callback) {
310             $wt_callbacks{$handle} = $callback;
311             $wt_handles->add($handle);
312         } else {
313             delete $wt_callbacks{$handle};
314             $wt_handles->remove($handle);
315         }
316     }
317     if (exists $args{'read'}) {
318         $callback = $args{'read'};
319         if ($callback) {
320             $rd_callbacks{$handle} = $callback;
321             $rd_handles->add($handle);
322         } else {
323             delete $rd_callbacks{$handle};
324             $rd_handles->remove($handle);
325        }
326     }
327 }
328
329 sub new_timer
330 {
331     my ($pkg, $time, $proc, $recur) = @_;
332         my $obj = ref($pkg);
333         my $class = $obj || $pkg;
334         my $self = bless { t=>$time + time, proc=>$proc }, $class;
335         $self->{interval} = $time if $recur;
336         push @timerchain, $self;
337         return $self;
338 }
339
340 sub del_timer
341 {
342         my $self = shift;
343         @timerchain = grep {$_ != $self} @timerchain;
344 }
345
346 sub event_loop {
347     my ($pkg, $loop_count, $timeout) = @_; # event_loop(1) to process events once
348     my ($conn, $r, $w, $rset, $wset);
349     while (1) {
350  
351        # Quit the loop if no handles left to process
352         last unless ($rd_handles->count() || $wt_handles->count());
353         
354                 ($rset, $wset) =
355             IO::Select->select ($rd_handles, $wt_handles, undef, $timeout);
356                 $now = time;
357                 
358         foreach $r (@$rset) {
359             &{$rd_callbacks{$r}} ($r) if exists $rd_callbacks{$r};
360         }
361         foreach $w (@$wset) {
362             &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w};
363         }
364
365                 # handle things on the timer chain
366                 for (@timerchain) {
367                         if ($now >= $_->{t}) {
368                                 &{$_->{proc}}();
369                                 $_->{t} = $now + $_->{interval} if exists $_->{interval};
370                         }
371                 }
372
373                 # remove dead timers
374                 @timerchain = grep { $_->{t} > $now } @timerchain;
375                 
376         if (defined($loop_count)) {
377             last unless --$loop_count;
378         }
379     }
380 }
381
382 1;
383
384 __END__
385