extend the Web interface protocol further
[spider.git] / perl / Msg.pm
index d62cb744034377f54592a3a7233571f9e88f842d..ad09c85da0f784d98b12634f01ad1a0e421144d5 100644 (file)
@@ -20,14 +20,14 @@ use Mojo::IOLoop::Stream;
 use DXDebug;
 use Timer;
 
-use vars qw($now %conns $noconns $cnum $total_in $total_out);
+use vars qw($now %conns $noconns $cnum $total_in $total_out $connect_timeout);
 
 $total_in = $total_out = 0;
 
 $now = time;
 
 $cnum = 0;
-
+$connect_timeout = 5;
 
 #
 #-----------------------------------------------------------------
@@ -61,14 +61,14 @@ sub set_error
 {
        my $conn = shift;
        my $callback = shift;
-       $conn->{sock}->on(error => sub {my ($stream, $err) = @_; $callback->($conn, $err);});
+       $conn->{sock}->on(error => sub {$callback->($_[1]);});
 }
 
 sub set_on_eof
 {
        my $conn = shift;
        my $callback = shift;
-       $conn->{sock}->on(close => sub {$callback->($conn);});
+       $conn->{sock}->on(close => sub {$callback->()});
 }
 
 sub set_rproc
@@ -120,20 +120,50 @@ sub ax25
 sub peerhost
 {
        my $conn = shift;
-       $conn->{peerhost} ||= 'ax25' if $conn->ax25;
-       $conn->{peerhost} ||= $conn->{sock}->handle->peerhost if $conn->{sock};
-       $conn->{peerhost} ||= 'UNKNOWN';
+       unless ($conn->{peerhost}) {
+               $conn->{peerhost} ||= 'ax25' if $conn->ax25;
+               $conn->{peerhost} ||= $conn->{sock}->handle->peerhost if $conn->{sock};
+               $conn->{peerhost} ||= 'UNKNOWN';
+       }
        return $conn->{peerhost};
 }
 
 #-----------------------------------------------------------------
 # Send side routines
-sub connect {
-    my ($pkg, $to_host, $to_port, $rproc) = @_;
 
+sub _on_connect
+{
+       my $conn = shift;
+       my $handle = shift;
+       undef $conn->{sock};
+       my $sock = $conn->{sock} = Mojo::IOLoop::Stream->new($handle);
+       $sock->on(read => sub {$conn->_rcv($_[1]);} );
+       $sock->on(error => sub {delete $conn->{sock}; $conn->disconnect;});
+       $sock->on(close => sub {delete $conn->{sock}; $conn->disconnect;});
+       $sock->timeout(0);
+       $sock->start;
+       $conn->{peerhost} = eval { $handle->peerhost; };
+       dbg((ref $conn) . " connected $conn->{cnum} to $conn->{peerhost}:$conn->{peerport}") if isdbg('connll');
+       if ($conn->{on_connect}) {
+               &{$conn->{on_connect}}($conn, $handle);
+       }
+}
+
+sub is_connected
+{
+       my $conn = shift;
+       my $sock = $conn->{sock};
+       return ref $sock && $sock->isa('Mojo::IOLoop::Stream');
+}
+
+sub connect {
+    my ($pkg, $to_host, $to_port, %args) = @_;
+       my $timeout = delete $args{timeout} || $connect_timeout;
+       
     # Create a connection end-point object
     my $conn = $pkg;
        unless (ref $pkg) {
+               my $rproc = delete $args{rproc}; 
                $conn = $pkg->new($rproc);
        }
        $conn->{peerhost} = $to_host;
@@ -144,17 +174,17 @@ sub connect {
        
        my $sock;
        $conn->{sock} = $sock = Mojo::IOLoop::Client->new;
-       $sock->on(connect => sub { dbg((ref $conn) . " connected $conn->{cnum} to $to_host:$to_port") if isdbg('connll');}, 
-                         error => {$conn->disconnect},
-                         close => {$conn->disconnect});
+       $sock->on(connect => sub {$conn->_on_connect($_[1])} );
+       $sock->on(error => sub {&{$conn->{eproc}}($conn, $_[1]) if exists $conn->{eproc}; $conn->disconnect});
+       $sock->on(close => sub {$conn->disconnect});
+
+       # copy any args like on_connect, on_disconnect etc
+       while (my ($k, $v) = each %args) {
+               $conn->{$k} = $v;
+       }
        
-       $sock->connect(address => $to_host, port => $to_port);
+       $sock->connect(address => $to_host, port => $to_port, timeout => $timeout);
        
-       dbg((ref $conn) . " connected $conn->{cnum} to $to_host:$to_port") if isdbg('connll');
-
-    if ($conn->{rproc}) {
-               $sock->on(read => sub {my ($stream, $msg) = @_; $conn->_rcv($msg);} );
-    }
     return $conn;
 }
 
@@ -226,6 +256,10 @@ sub disconnect
        $call ||= 'unallocated';
        dbg((ref $conn) . " Connection $conn->{cnum} $call disconnected") if isdbg('connll');
        
+       if ($conn->{on_disconnect}) {
+               &{$conn->{on_disconnect}}($conn);
+       }
+
        # get rid of any references
        for (keys %$conn) {
                if (ref($conn->{$_})) {
@@ -233,9 +267,8 @@ sub disconnect
                }
        }
 
-       if (defined($sock)) {
-               $sock->remove;
-       }
+       $sock->close_gracefully if defined $sock && $sock->can('close_gracefully');
+       undef $sock;
        
        unless ($main::is_win) {
                kill 'TERM', $conn->{pid} if exists $conn->{pid};
@@ -247,6 +280,9 @@ sub _send_stuff
        my $conn = shift;
        my $rq = $conn->{outqueue};
     my $sock = $conn->{sock};
+       return unless defined $sock;
+       return if $conn->{disconnecting};
+       
        while (@$rq) {
                my $data = shift @$rq;
                my $lth = length $data;
@@ -256,9 +292,9 @@ sub _send_stuff
                                dbgdump('raw', "$call send $lth: ", $lth);
                        }
                }
-               if (defined $sock && !$sock->destroyed) {
+               if (defined $sock) {
                        $sock->write($data);
-                       $total_out = $lth;
+                       $total_out += $lth;
                } else {
                        dbg("_send_stuff $call ending data ignored: $data");
                }
@@ -275,6 +311,13 @@ sub send_later {
        goto &send_now;
 }
 
+sub send_raw
+{
+    my ($conn, $msg) = @_;
+       push @{$conn->{outqueue}}, $msg;
+       _send_stuff($conn);
+}
+
 sub enqueue {
     my $conn = shift;
     push @{$conn->{outqueue}}, defined $_[0] ? $_[0] : '';
@@ -300,9 +343,10 @@ sub new_server
        my ($pkg, $my_host, $my_port, $login_proc) = @_;
        my $conn = $pkg->new($login_proc);
        
-    $conn->{sock} = Mojo::IOLoop::Server->new;
-       $conn->{sock}->on(accept=>sub{$conn->new_client()});
-       $conn->{sock}->listen(address=>$my_host, port=>$my_port);
+    my $sock = $conn->{sock} = Mojo::IOLoop::Server->new;
+       $sock->on(accept=>sub{$conn->new_client($_[1]);});
+       $sock->listen(address=>$my_host, port=>$my_port);
+       $sock->start;
        
     die "Could not create socket: $! \n" unless $conn->{sock};
        return $conn;
@@ -335,73 +379,55 @@ sub dequeue
 
 sub _rcv {                     # Complement to _send
     my $conn = shift; # $rcv_now complement of $flush
-    # Find out how much has already been received, if at all
-    my ($msg, $offset, $bytes_to_read, $bytes_read);
+       my $msg = shift;
     my $sock = $conn->{sock};
     return unless defined($sock);
+       return if $conn->{disconnecting};
+
+       $total_in += length $msg;
 
        my @lines;
-#      if ($conn->{blocking}) {
-#              blocking($sock, 0);
-#              $conn->{blocking} = 0;
-#      }
-       $bytes_read = sysread ($sock, $msg, 1024, 0);
-       if (defined ($bytes_read)) {
-               if ($bytes_read > 0) {
-                       $total_in += $bytes_read;
-                       if (isdbg('raw')) {
-                               my $call = $conn->{call} || 'none';
-                               dbgdump('raw', "$call read $bytes_read: ", $msg);
-                       }
-                       if ($conn->{echo}) {
-                               my @ch = split //, $msg;
-                               my $out;
-                               for (@ch) {
-                                       if (/[\cH\x7f]/) {
-                                               $out .= "\cH \cH";
-                                               $conn->{msg} =~ s/.$//;
-                                       } else {
-                                               $out .= $_;
-                                               $conn->{msg} .= $_;
-                                       }
-                               }
-                               if (defined $out) {
-                                       set_event_handler ($sock, write => sub{$conn->_send(0)});
-                                       push @{$conn->{outqueue}}, $out;
+       if (isdbg('raw')) {
+               my $call = $conn->{call} || 'none';
+               my $lth = length $msg;
+               dbgdump('raw', "$call read $lth: ", $msg);
+       }
+       if ($conn->{echo}) {
+               my @ch = split //, $msg;
+                       my $out;
+                       for (@ch) {
+                               if (/[\cH\x7f]/) {
+                                       $out .= "\cH \cH";
+                                       $conn->{msg} =~ s/.$//;
+                               } else {
+                                       $out .= $_;
+                                       $conn->{msg} .= $_;
                                }
-                       } else {
-                               $conn->{msg} .= $msg;
                        }
-               } 
+                       if (defined $out) {
+                               $conn->send_raw($out);
+                       }
        } else {
-               if (_err_will_block($!)) {
-                       return ; 
-               } else {
-                       $bytes_read = 0;
-               }
-    }
+               $conn->{msg} .= $msg;
+       }
 
-FINISH:
-    if (defined $bytes_read && $bytes_read == 0) {
-               &{$conn->{eproc}}($conn, $!) if exists $conn->{eproc};
-               $conn->disconnect;
-    } else {
-               unless ($conn->{disable_read}) {
-                       $conn->dequeue if exists $conn->{msg};
-               }
+       unless ($conn->{disable_read}) {
+               $conn->dequeue if exists $conn->{msg};
        }
 }
 
 sub new_client {
        my $server_conn = shift;
-       my $client = shift;
+       my $handle = shift;
        
        my $conn = $server_conn->new($server_conn->{rproc});
-       my $sock = $conn->{sock} = Mojo::IOLoop::Stream->new($client);
+       my $sock = $conn->{sock} = Mojo::IOLoop::Stream->new($handle);
        $sock->on(read => sub {$conn->_rcv($_[1])});
+       $sock->timeout(0);
+       $sock->start;
        dbg((ref $conn) . "accept $conn->{cnum} from $conn->{peerhost} $conn->{peerport}") if isdbg('connll');
 
-       my ($rproc, $eproc) = &{$server_conn->{rproc}} ($conn, $conn->{peerhost} = $client->peerhost, $conn->{peerport} = $client->peerport);
+       my ($rproc, $eproc) = &{$server_conn->{rproc}} ($conn, $conn->{peerhost} = $handle->peerhost, $conn->{peerport} = $handle->peerport);
        $conn->{sort} = 'Incoming';
        if ($eproc) {
                $conn->{eproc} = $eproc;
@@ -412,6 +438,7 @@ sub new_client {
                &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc};
                $conn->disconnect();
        }
+       return $conn;
 }
 
 sub close_server
@@ -466,6 +493,10 @@ sub DESTROY
        my $call = $conn->{call} || 'unallocated';
        my $host = $conn->{peerhost} || '';
        my $port = $conn->{peerport} || '';
+       my $sock = $conn->{sock};
+
+       $sock->close_gracefully if defined $sock && $sock->can('close_gracefully');
+       
        $noconns--;
        dbg((ref $conn) . " Connection $conn->{cnum} $call [$host $port] being destroyed (total $noconns)") if isdbg('connll');
 }