added echo and noecho for clients to set/unset
[spider.git] / perl / Msg.pm
1 #
2 # This has been taken from the 'Advanced Perl Programming' book by Sriram Srinivasan 
3 #
4 # I am presuming that the code is distributed on the same basis as perl itself.
5 #
6 # I have modified it to suit my devious purposes (Dirk Koopman G1TLH)
7 #
8 # $Id$
9 #
10
11 package Msg;
12
13 require Exporter;
14 @ISA = qw(Exporter);
15
16 use strict;
17 use IO::Select;
18 use IO::Socket;
19 use Carp;
20
21 use vars qw (%rd_callbacks %wt_callbacks $rd_handles $wt_handles);
22
23 %rd_callbacks = ();
24 %wt_callbacks = ();
25 $rd_handles   = IO::Select->new();
26 $wt_handles   = IO::Select->new();
27 my $blocking_supported = 0;
28
29 BEGIN {
30     # Checks if blocking is supported
31     eval {
32         require POSIX; POSIX->import(qw (F_SETFL O_NONBLOCK EAGAIN));
33     };
34     $blocking_supported = 1 unless $@;
35 }
36
37 #-----------------------------------------------------------------
38 # Send side routines
39 sub connect {
40     my ($pkg, $to_host, $to_port,$rcvd_notification_proc) = @_;
41     
42     # Create a new internet socket
43     
44     my $sock = IO::Socket::INET->new (
45                                       PeerAddr => $to_host,
46                                       PeerPort => $to_port,
47                                       Proto    => 'tcp',
48                                       Reuse    => 1);
49
50     return undef unless $sock;
51
52     # Create a connection end-point object
53     my $conn = {
54         sock                   => $sock,
55         rcvd_notification_proc => $rcvd_notification_proc,
56     };
57     
58     if ($rcvd_notification_proc) {
59         my $callback = sub {_rcv($conn)};
60         set_event_handler ($sock, "read" => $callback);
61     }
62     return bless $conn, $pkg;
63 }
64
65 sub disconnect {
66     my $conn = shift;
67     my $sock = delete $conn->{sock};
68     return unless defined($sock);
69     set_event_handler ($sock, "read" => undef, "write" => undef);
70     shutdown($sock, 3);
71         close($sock);
72 }
73
74 sub send_now {
75     my ($conn, $msg) = @_;
76     _enqueue ($conn, $msg);
77     $conn->_send (1); # 1 ==> flush
78 }
79
80 sub send_later {
81     my ($conn, $msg) = @_;
82     _enqueue($conn, $msg);
83     my $sock = $conn->{sock};
84     return unless defined($sock);
85     set_event_handler ($sock, "write" => sub {$conn->_send(0)});
86 }
87
88 sub _enqueue {
89     my ($conn, $msg) = @_;
90     # prepend length (encoded as network long)
91     my $len = length($msg);
92         $msg =~ s/([\%\x00-\x1f\x7f-\xff])/sprintf("%%%02X", ord($1))/eg; 
93     push (@{$conn->{queue}}, $msg . "\n");
94 }
95
96 sub _send {
97     my ($conn, $flush) = @_;
98     my $sock = $conn->{sock};
99     return unless defined($sock);
100     my ($rq) = $conn->{queue};
101
102     # If $flush is set, set the socket to blocking, and send all
103     # messages in the queue - return only if there's an error
104     # If $flush is 0 (deferred mode) make the socket non-blocking, and
105     # return to the event loop only after every message, or if it
106     # is likely to block in the middle of a message.
107
108     $flush ? $conn->set_blocking() : $conn->set_non_blocking();
109     my $offset = (exists $conn->{send_offset}) ? $conn->{send_offset} : 0;
110
111     while (@$rq) {
112         my $msg            = $rq->[0];
113                 my $mlth           = length($msg);
114         my $bytes_to_write = $mlth - $offset;
115         my $bytes_written  = 0;
116                 confess("Negative Length! msg: '$msg' lth: $mlth offset: $offset") if $bytes_to_write < 0;
117         while ($bytes_to_write > 0) {
118             $bytes_written = syswrite ($sock, $msg,
119                                        $bytes_to_write, $offset);
120             if (!defined($bytes_written)) {
121                 if (_err_will_block($!)) {
122                     # Should happen only in deferred mode. Record how
123                     # much we have already sent.
124                     $conn->{send_offset} = $offset;
125                     # Event handler should already be set, so we will
126                     # be called back eventually, and will resume sending
127                     return 1;
128                 } else {    # Uh, oh
129                                         delete $conn->{send_offset};
130                     $conn->handle_send_err($!);
131                                         $conn->disconnect;
132                     return 0; # fail. Message remains in queue ..
133                 }
134             }
135             $offset         += $bytes_written;
136             $bytes_to_write -= $bytes_written;
137         }
138         delete $conn->{send_offset};
139         $offset = 0;
140         shift @$rq;
141         last unless $flush; # Go back to select and wait
142                             # for it to fire again.
143     }
144     # Call me back if queue has not been drained.
145     if (@$rq) {
146         set_event_handler ($sock, "write" => sub {$conn->_send(0)});
147     } else {
148         set_event_handler ($sock, "write" => undef);
149     }
150     1;  # Success
151 }
152
153 sub _err_will_block {
154     if ($blocking_supported) {
155         return ($_[0] == EAGAIN());
156     }
157     return 0;
158 }
159 sub set_non_blocking {                        # $conn->set_blocking
160     if ($blocking_supported) {
161         # preserve other fcntl flags
162         my $flags = fcntl ($_[0], F_GETFL(), 0);
163         fcntl ($_[0], F_SETFL(), $flags | O_NONBLOCK());
164     }
165 }
166 sub set_blocking {
167     if ($blocking_supported) {
168         my $flags = fcntl ($_[0], F_GETFL(), 0);
169         $flags  &= ~O_NONBLOCK(); # Clear blocking, but preserve other flags
170         fcntl ($_[0], F_SETFL(), $flags);
171     }
172 }
173
174 sub handle_send_err {
175    # For more meaningful handling of send errors, subclass Msg and
176    # rebless $conn.  
177    my ($conn, $err_msg) = @_;
178    warn "Error while sending: $err_msg \n";
179    set_event_handler ($conn->{sock}, "write" => undef);
180 }
181
182 #-----------------------------------------------------------------
183 # Receive side routines
184
185 my ($g_login_proc,$g_pkg);
186 my $main_socket = 0;
187 sub new_server {
188     @_ == 4 || die "Msg->new_server (myhost, myport, login_proc)\n";
189     my ($pkg, $my_host, $my_port, $login_proc) = @_;
190     
191     $main_socket = IO::Socket::INET->new (
192                                           LocalAddr => $my_host,
193                                           LocalPort => $my_port,
194                                           Listen    => 5,
195                                           Proto     => 'tcp',
196                                           Reuse     => 1);
197     die "Could not create socket: $! \n" unless $main_socket;
198     set_event_handler ($main_socket, "read" => \&_new_client);
199     $g_login_proc = $login_proc; $g_pkg = $pkg;
200 }
201
202 sub _rcv {                     # Complement to _send
203     my $conn = shift; # $rcv_now complement of $flush
204     # Find out how much has already been received, if at all
205     my ($msg, $offset, $bytes_to_read, $bytes_read);
206     my $sock = $conn->{sock};
207     return unless defined($sock);
208
209         my @lines;
210     $conn->set_non_blocking();
211         $bytes_read = sysread ($sock, $msg, 1024, 0);
212         if (defined ($bytes_read)) {
213                 if ($bytes_read > 0) {
214                         if ($msg =~ /\n/) {
215                                 @lines = split /\n/, $msg;
216                                 $lines[0] = $conn->{msg} . $lines[0] if $conn->{msg};
217                                 if ($msg =~ /\n$/) {
218                                         delete $conn->{msg};
219                                 } else {
220                                         $conn->{msg} = pop @lines;
221                                 }
222                         } else {
223                                 $conn->{msg} .= $msg;
224                         }
225                 } 
226         } else {
227                 if (_err_will_block($!)) {
228                         return ; 
229                 } else {
230                         $bytes_read = 0;
231                 }
232     }
233
234 FINISH:
235     if (defined $bytes_read && $bytes_read == 0) {
236 #               $conn->disconnect();
237                 &{$conn->{rcvd_notification_proc}}($conn, undef, $!);
238                 @lines = ();
239     } 
240
241         while (@lines){
242                 $msg = shift @lines;
243                 $msg =~ s/%([0-9A-Fa-f]{2})/chr(hex($1))/eg;
244                 &{$conn->{rcvd_notification_proc}}($conn, $msg, $!);
245                 $! = 0;
246         }
247 }
248
249 sub _new_client {
250     my $sock = $main_socket->accept();
251     my $conn = bless {
252         'sock' =>  $sock,
253         'state' => 'connected'
254     }, $g_pkg;
255     my $rcvd_notification_proc =
256         &$g_login_proc ($conn, $sock->peerhost(), $sock->peerport());
257     if ($rcvd_notification_proc) {
258         $conn->{rcvd_notification_proc} = $rcvd_notification_proc;
259         my $callback = sub {_rcv($conn)};
260         set_event_handler ($sock, "read" => $callback);
261     } else {  # Login failed
262         $conn->disconnect();
263     }
264 }
265
266 sub close_server
267 {
268         set_event_handler ($main_socket, "read" => undef);
269         $main_socket->close;
270         $main_socket = 0;
271 }
272
273 #----------------------------------------------------
274 # Event loop routines used by both client and server
275
276 sub set_event_handler {
277     shift unless ref($_[0]); # shift if first arg is package name
278     my ($handle, %args) = @_;
279     my $callback;
280     if (exists $args{'write'}) {
281         $callback = $args{'write'};
282         if ($callback) {
283             $wt_callbacks{$handle} = $callback;
284             $wt_handles->add($handle);
285         } else {
286             delete $wt_callbacks{$handle};
287             $wt_handles->remove($handle);
288         }
289     }
290     if (exists $args{'read'}) {
291         $callback = $args{'read'};
292         if ($callback) {
293             $rd_callbacks{$handle} = $callback;
294             $rd_handles->add($handle);
295         } else {
296             delete $rd_callbacks{$handle};
297             $rd_handles->remove($handle);
298        }
299     }
300 }
301
302 sub event_loop {
303     my ($pkg, $loop_count, $timeout) = @_; # event_loop(1) to process events once
304     my ($conn, $r, $w, $rset, $wset);
305     while (1) {
306         # Quit the loop if no handles left to process
307         last unless ($rd_handles->count() || $wt_handles->count());
308         ($rset, $wset) =
309             IO::Select->select ($rd_handles, $wt_handles, undef, $timeout);
310         foreach $r (@$rset) {
311             &{$rd_callbacks{$r}} ($r) if exists $rd_callbacks{$r};
312         }
313         foreach $w (@$wset) {
314             &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w};
315         }
316         if (defined($loop_count)) {
317             last unless --$loop_count;
318         }
319     }
320 }
321
322 1;
323
324 __END__
325