]> dxcluster.net Git - spider.git/blob - perl/Msg.pm
some minor cleanups
[spider.git] / perl / Msg.pm
1 #
2 # This has been taken from the 'Advanced Perl Programming' book by Sriram Srinivasan 
3 #
4 # I am presuming that the code is distributed on the same basis as perl itself.
5 #
6 # I have modified it to suit my devious purposes (Dirk Koopman G1TLH)
7 #
8 # $Id$
9 #
10
11 package Msg;
12
13 use strict;
14 use IO::Select;
15 use IO::Socket;
16 #use DXDebug;
17
18 use vars qw(%rd_callbacks %wt_callbacks $rd_handles $wt_handles $now @timerchain);
19
20 %rd_callbacks = ();
21 %wt_callbacks = ();
22 $rd_handles   = IO::Select->new();
23 $wt_handles   = IO::Select->new();
24 $now = time;
25 @timerchain = ();
26
27 my $blocking_supported = 0;
28
29 BEGIN {
30     # Checks if blocking is supported
31     eval {
32         require POSIX; POSIX->import(qw (F_SETFL O_NONBLOCK EAGAIN));
33     };
34     $blocking_supported = 1 unless $@;
35 }
36
37 #
38 #-----------------------------------------------------------------
39 # Generalised initializer
40
41 sub new
42 {
43     my ($pkg, $rproc) = @_;
44         my $obj = ref($pkg);
45         my $class = $obj || $pkg;
46
47     my $conn = {
48         rproc => $rproc,
49                 inqueue => [],
50                 outqueue => [],
51                 state => 0,
52                 lineend => "\r\n",
53                 csort => 'telnet',
54     };
55
56         return bless $conn, $class;
57 }
58
59 #-----------------------------------------------------------------
60 # Send side routines
61 sub connect {
62     my ($pkg, $to_host, $to_port, $rproc) = @_;
63
64     # Create a new internet socket
65     my $sock = IO::Socket::INET->new (
66                                       PeerAddr => $to_host,
67                                       PeerPort => $to_port,
68                                       Proto    => 'tcp',
69                                       Reuse    => 1);
70
71     return undef unless $sock;
72
73     # Create a connection end-point object
74     my $conn = $pkg;
75         unless (ref $pkg) {
76                 $conn = $pkg->new($rproc);
77         }
78         $conn->{sock} = $sock;
79     
80     if ($conn->{rproc}) {
81         my $callback = sub {_rcv($conn)};
82         set_event_handler ($sock, "read" => $callback);
83     }
84     return $conn;
85 }
86
87 sub disconnect {
88     my $conn = shift;
89     my $sock = delete $conn->{sock};
90         $conn->{state} = 'E';
91         delete $conn->{cmd};
92         $conn->{timeout}->del_timer if $conn->{timeout};
93         return unless defined($sock);
94     set_event_handler ($sock, "read" => undef, "write" => undef);
95     shutdown($sock, 3);
96         close($sock);
97 }
98
99 sub send_now {
100     my ($conn, $msg) = @_;
101     $conn->enqueue($msg);
102     $conn->_send (1); # 1 ==> flush
103 }
104
105 sub send_later {
106     my ($conn, $msg) = @_;
107     $conn->enqueue($msg);
108     my $sock = $conn->{sock};
109     return unless defined($sock);
110     set_event_handler ($sock, "write" => sub {$conn->_send(0)});
111 }
112
113 sub enqueue {
114     my $conn = shift;
115     push (@{$conn->{outqueue}}, $_[0]);
116 }
117
118 sub _send {
119     my ($conn, $flush) = @_;
120     my $sock = $conn->{sock};
121     return unless defined($sock);
122     my $rq = $conn->{outqueue};
123
124     # If $flush is set, set the socket to blocking, and send all
125     # messages in the queue - return only if there's an error
126     # If $flush is 0 (deferred mode) make the socket non-blocking, and
127     # return to the event loop only after every message, or if it
128     # is likely to block in the middle of a message.
129
130     $flush ? $conn->set_blocking() : $conn->set_non_blocking();
131     my $offset = (exists $conn->{send_offset}) ? $conn->{send_offset} : 0;
132
133     while (@$rq) {
134         my $msg            = $rq->[0];
135                 my $mlth           = length($msg);
136         my $bytes_to_write = $mlth - $offset;
137         my $bytes_written  = 0;
138                 confess("Negative Length! msg: '$msg' lth: $mlth offset: $offset") if $bytes_to_write < 0;
139         while ($bytes_to_write > 0) {
140             $bytes_written = syswrite ($sock, $msg,
141                                        $bytes_to_write, $offset);
142             if (!defined($bytes_written)) {
143                 if (_err_will_block($!)) {
144                     # Should happen only in deferred mode. Record how
145                     # much we have already sent.
146                     $conn->{send_offset} = $offset;
147                     # Event handler should already be set, so we will
148                     # be called back eventually, and will resume sending
149                     return 1;
150                 } else {    # Uh, oh
151                                         delete $conn->{send_offset};
152                     $conn->handle_send_err($!);
153                                         $conn->disconnect;
154                     return 0; # fail. Message remains in queue ..
155                 }
156             }
157             $offset         += $bytes_written;
158             $bytes_to_write -= $bytes_written;
159         }
160         delete $conn->{send_offset};
161         $offset = 0;
162         shift @$rq;
163         last unless $flush; # Go back to select and wait
164                             # for it to fire again.
165     }
166     # Call me back if queue has not been drained.
167     if (@$rq) {
168         set_event_handler ($sock, "write" => sub {$conn->_send(0)});
169     } else {
170         set_event_handler ($sock, "write" => undef);
171     }
172     1;  # Success
173 }
174
175 sub _err_will_block {
176     if ($blocking_supported) {
177         return ($_[0] == EAGAIN());
178     }
179     return 0;
180 }
181 sub set_non_blocking {                        # $conn->set_blocking
182     if ($blocking_supported) {
183         # preserve other fcntl flags
184         my $flags = fcntl ($_[0], F_GETFL(), 0);
185         fcntl ($_[0], F_SETFL(), $flags | O_NONBLOCK());
186     }
187 }
188 sub set_blocking {
189     if ($blocking_supported) {
190         my $flags = fcntl ($_[0], F_GETFL(), 0);
191         $flags  &= ~O_NONBLOCK(); # Clear blocking, but preserve other flags
192         fcntl ($_[0], F_SETFL(), $flags);
193     }
194 }
195
196 sub handle_send_err {
197    # For more meaningful handling of send errors, subclass Msg and
198    # rebless $conn.  
199    my ($conn, $err_msg) = @_;
200    warn "Error while sending: $err_msg \n";
201    set_event_handler ($conn->{sock}, "write" => undef);
202 }
203
204 #-----------------------------------------------------------------
205 # Receive side routines
206
207 sub new_server {
208     @_ == 4 || die "Msg->new_server (myhost, myport, login_proc\n";
209     my ($pkg, $my_host, $my_port, $login_proc) = @_;
210         my $self = $pkg->new($login_proc);
211         
212     $self->{sock} = IO::Socket::INET->new (
213                                           LocalAddr => $my_host,
214                                           LocalPort => $my_port,
215                                           Listen    => 5,
216                                           Proto     => 'tcp',
217                                           Reuse     => 1);
218     die "Could not create socket: $! \n" unless $self->{sock};
219     set_event_handler ($self->{sock}, "read" => sub { $self->new_client }  );
220         return $self;
221 }
222
223 sub dequeue
224 {
225         my $conn = shift;
226         my $msg;
227         
228         while ($msg = shift @{$conn->{inqueue}}){
229                 &{$conn->{rproc}}($conn, $msg, $!);
230                 $! = 0;
231         }
232 }
233
234 sub _rcv {                     # Complement to _send
235     my $conn = shift; # $rcv_now complement of $flush
236     # Find out how much has already been received, if at all
237     my ($msg, $offset, $bytes_to_read, $bytes_read);
238     my $sock = $conn->{sock};
239     return unless defined($sock);
240
241         my @lines;
242     $conn->set_non_blocking();
243         $bytes_read = sysread ($sock, $msg, 1024, 0);
244         if (defined ($bytes_read)) {
245                 if ($bytes_read > 0) {
246                         if ($msg =~ /\n/) {
247                                 @lines = split /\r?\n/, $msg;
248                                 if (@lines) {
249                                         $lines[0] = $conn->{msg} . $lines[0] if exists $conn->{msg};
250                                 } else {
251                                         $lines[0] = $conn->{msg} if exists $conn->{msg};
252                                         push @lines, '' unless @lines;
253                                 }
254                                 if ($msg =~ /\n$/) {
255                                         delete $conn->{msg};
256                                 } else {
257                                         $conn->{msg} = pop @lines;
258                                 }
259                                 push @{$conn->{inqueue}}, @lines if @lines;
260                         } else {
261                                 $conn->{msg} .= $msg;
262                         }
263                 } 
264         } else {
265                 if (_err_will_block($!)) {
266                         return ; 
267                 } else {
268                         $bytes_read = 0;
269                 }
270     }
271
272 FINISH:
273     if (defined $bytes_read && $bytes_read == 0) {
274 #               $conn->disconnect();
275                 &{$conn->{rproc}}($conn, undef, $!);
276                 delete $conn->{inqueue};
277     } else {
278                 $conn->dequeue;
279         }
280 }
281
282 sub new_client {
283         my $server_conn = shift;
284     my $sock = $server_conn->{sock}->accept();
285     my $conn = $server_conn->new($server_conn->{rproc});
286         $conn->{sock} = $sock;
287     my $rproc = &{$server_conn->{rproc}} ($conn, $sock->peerhost(), $sock->peerport());
288     if ($rproc) {
289         $conn->{rproc} = $rproc;
290         my $callback = sub {_rcv($conn)};
291         set_event_handler ($sock, "read" => $callback);
292     } else {  # Login failed
293         $conn->disconnect();
294     }
295 }
296
297 sub close_server
298 {
299         my $conn = shift;
300         set_event_handler ($conn->{sock}, "read" => undef);
301         $conn->{sock}->close;
302 }
303
304 #----------------------------------------------------
305 # Event loop routines used by both client and server
306
307 sub set_event_handler {
308     shift unless ref($_[0]); # shift if first arg is package name
309     my ($handle, %args) = @_;
310     my $callback;
311     if (exists $args{'write'}) {
312         $callback = $args{'write'};
313         if ($callback) {
314             $wt_callbacks{$handle} = $callback;
315             $wt_handles->add($handle);
316         } else {
317             delete $wt_callbacks{$handle};
318             $wt_handles->remove($handle);
319         }
320     }
321     if (exists $args{'read'}) {
322         $callback = $args{'read'};
323         if ($callback) {
324             $rd_callbacks{$handle} = $callback;
325             $rd_handles->add($handle);
326         } else {
327             delete $rd_callbacks{$handle};
328             $rd_handles->remove($handle);
329        }
330     }
331 }
332
333 sub new_timer
334 {
335     my ($pkg, $time, $proc, $recur) = @_;
336         my $obj = ref($pkg);
337         my $class = $obj || $pkg;
338         my $self = bless { t=>$time + time, proc=>$proc }, $class;
339         $self->{interval} = $time if $recur;
340         push @timerchain, $self;
341         return $self;
342 }
343
344 sub del_timer
345 {
346         my $self = shift;
347         @timerchain = grep {$_ != $self} @timerchain;
348 }
349
350 sub event_loop {
351     my ($pkg, $loop_count, $timeout) = @_; # event_loop(1) to process events once
352     my ($conn, $r, $w, $rset, $wset);
353     while (1) {
354  
355        # Quit the loop if no handles left to process
356         last unless ($rd_handles->count() || $wt_handles->count());
357         
358                 ($rset, $wset) =
359             IO::Select->select ($rd_handles, $wt_handles, undef, $timeout);
360                 $now = time;
361                 
362         foreach $r (@$rset) {
363             &{$rd_callbacks{$r}} ($r) if exists $rd_callbacks{$r};
364         }
365         foreach $w (@$wset) {
366             &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w};
367         }
368
369                 # handle things on the timer chain
370                 for (@timerchain) {
371                         if ($now >= $_->{t}) {
372                                 &{$_->{proc}}();
373                                 $_->{t} = $now + $_->{interval} if exists $_->{interval};
374                         }
375                 }
376
377                 # remove dead timers
378                 @timerchain = grep { $_->{t} > $now } @timerchain;
379                 
380         if (defined($loop_count)) {
381             last unless --$loop_count;
382         }
383     }
384 }
385
386 1;
387
388 __END__
389