From: minima Date: Tue, 13 Mar 2001 14:22:13 +0000 (+0000) Subject: do non blocking connects X-Git-Tag: R_1_47~134 X-Git-Url: http://dxcluster.net/gitweb/gitweb.cgi?a=commitdiff_plain;h=586cbb347e7639f5575b48572e75140501a109c0;p=spider.git do non blocking connects fix memory leak in connects --- diff --git a/Changes b/Changes index e5828220..549e49d8 100644 --- a/Changes +++ b/Changes @@ -1,3 +1,6 @@ +13Mar01======================================================================= +1. implemented first cut at non blocking connect +2. removed memory leakage in connects 10Mar01======================================================================= 1. minor changes to the admin manual to reflect differences in distibutions thanks to pa3ezl (g0vgs) diff --git a/cmd/Aliases b/cmd/Aliases index c6aab03c..b916742c 100644 --- a/cmd/Aliases +++ b/cmd/Aliases @@ -65,6 +65,7 @@ package CmdAlias; '^l$', 'directory', 'directory', '^ll$', 'directory', 'directory', '^ll/(\d+)', 'directory $1', 'directory', + '^lm$', 'directory own', 'directory', ], 'm' => [ ], diff --git a/cmd/bye.pl b/cmd/bye.pl index 1fc73f66..d6e2d14b 100644 --- a/cmd/bye.pl +++ b/cmd/bye.pl @@ -5,5 +5,15 @@ # my $self = shift; + +# log out text +if ($self->is_user && -e "$main::data/logout") { + open(I, "$main::data/logout") or confess; + my @in = ; + close(I); + $self->send_now('D', @in); + sleep(1); +} + $self->state('bye'); return (1); diff --git a/cmd/disconnect.pl b/cmd/disconnect.pl index 9207d73b..9e6b67df 100644 --- a/cmd/disconnect.pl +++ b/cmd/disconnect.pl @@ -23,7 +23,7 @@ foreach $call (@calls) { } $dxchan->disconnect; push @out, $self->msg('disc2', $call); - } elsif (my $conn = Msg->call($call)) { + } elsif (my $conn = Msg->conns($call)) { $conn->disconnect; } else { push @out, $self->msg('e10', $call); diff --git a/cmd/stat/channel.pl b/cmd/stat/channel.pl index 147c150a..8b20e0cb 100644 --- a/cmd/stat/channel.pl +++ b/cmd/stat/channel.pl @@ -15,7 +15,7 @@ foreach $call (@list) { $call = uc $call; my $ref = DXChannel->get($call); if ($ref) { - @out = print_all_fields($self, $ref, "Channe Information $call"); + @out = print_all_fields($self, $ref, "Channel Information $call"); } else { return (0, "Channel: $call not found") if !$ref; } diff --git a/perl/DXChannel.pm b/perl/DXChannel.pm index b7b2d4bb..3e32f429 100644 --- a/perl/DXChannel.pm +++ b/perl/DXChannel.pm @@ -370,12 +370,11 @@ sub disconnect { my $self = shift; my $user = $self->{user}; - my $conn = $self->{conn}; my $call = $self->{call}; - $self->finish($conn); + $self->finish; $user->close() if defined $user; - $conn->disconnect() if $conn; + $self->{conn}->disconnect; $self->del(); } diff --git a/perl/DXCommandmode.pm b/perl/DXCommandmode.pm index 3c45aee9..aea2064e 100644 --- a/perl/DXCommandmode.pm +++ b/perl/DXCommandmode.pm @@ -387,7 +387,6 @@ sub process sub finish { my $self = shift; - my $conn = shift; my $call = $self->call; # reset the redirection of messages back to 'normal' if we are the sysop @@ -399,20 +398,6 @@ sub finish # I was the last node visited $self->user->node($main::mycall); - # log out text - if ($conn && -e "$main::data/logout") { - open(I, "$main::data/logout") or confess; - my @in = ; - close(I); - $self->send_now('D', @in); - sleep(1); - } - -# if ($call eq $main::myalias) { # unset the channel if it is us really -# my $node = DXNode->get($main::mycall); -# $node->{dxchan} = 0; -# } - # issue a pc17 to everybody interested my $nchan = DXChannel->get($main::mycall); my $pc17 = $nchan->pc17($self); diff --git a/perl/DXProt.pm b/perl/DXProt.pm index 516779b6..fd47a28a 100644 --- a/perl/DXProt.pm +++ b/perl/DXProt.pm @@ -1148,7 +1148,6 @@ sub finish { my $self = shift; my $call = $self->call; - my $conn = shift; my $ref = DXCluster->get_exact($call); # unbusy and stop and outgoing mail diff --git a/perl/ExtMsg.pm b/perl/ExtMsg.pm index cd18eb93..87729553 100644 --- a/perl/ExtMsg.pm +++ b/perl/ExtMsg.pm @@ -57,47 +57,56 @@ sub dequeue { my $conn = shift; my $msg; - - while (@{$conn->{inqueue}}){ - $msg = shift @{$conn->{inqueue}}; - dbg('connect', $msg) unless $conn->{state} eq 'C'; - - $msg =~ s/\xff\xfa.*\xff\xf0|\xff[\xf0-\xfe].//g; # remove telnet options - $msg =~ s/[\x00-\x08\x0a-\x1f\x80-\x9f]/./g; # immutable CSI sequence + control characters - if ($conn->{state} eq 'C') { - &{$conn->{rproc}}($conn, "I$conn->{call}|$msg", $!); - $! = 0; - } elsif ($conn->{state} eq 'WL' ) { - $msg = uc $msg; - if (is_callsign($msg)) { - &{$conn->{rproc}}($conn, "A$msg|telnet"); - _send_file($conn, "$main::data/connected"); - $conn->{state} = 'C'; - } else { - $conn->send_now("Sorry $msg is an invalid callsign"); - $conn->disconnect; - } - } elsif ($conn->{state} eq 'WC') { - if (exists $conn->{cmd} && @{$conn->{cmd}}) { - $conn->_docmd($msg); - if ($conn->{state} eq 'WC' && exists $conn->{cmd} && @{$conn->{cmd}} == 0) { - $conn->{state} = 'C'; - &{$conn->{rproc}}($conn, "O$conn->{call}|telnet"); - delete $conn->{cmd}; - $conn->{timeout}->del_timer if $conn->{timeout}; - } - } + if ($conn->{state} eq 'WC') { + if (exists $conn->{cmd}) { + if (@{$conn->{cmd}}) { + dbg('connect', $conn->{msg}); + $conn->_docmd($conn->{msg}); + } } - } - if ($conn->{msg} && $conn->{state} eq 'WC' && exists $conn->{cmd} && @{$conn->{cmd}}) { - dbg('connect', $conn->{msg}); - $conn->_docmd($conn->{msg}); if ($conn->{state} eq 'WC' && exists $conn->{cmd} && @{$conn->{cmd}} == 0) { $conn->{state} = 'C'; &{$conn->{rproc}}($conn, "O$conn->{call}|telnet"); delete $conn->{cmd}; - $conn->{timeout}->del_timer if $conn->{timeout}; + $conn->{timeout}->del if $conn->{timeout}; + } + } elsif ($conn->{msg} =~ /\n/) { + my @lines = split /\r?\n/, $conn->{msg}; + if ($conn->{msg} =~ /\n$/) { + delete $conn->{msg}; + } else { + $conn->{msg} = pop @lines; + } + while (defined ($msg = shift @lines)) { + dbg('connect', $msg) unless $conn->{state} eq 'C'; + + $msg =~ s/\xff\xfa.*\xff\xf0|\xff[\xf0-\xfe].//g; # remove telnet options + $msg =~ s/[\x00-\x08\x0a-\x1f\x80-\x9f]/./g; # immutable CSI sequence + control characters + + if ($conn->{state} eq 'C') { + &{$conn->{rproc}}($conn, "I$conn->{call}|$msg"); + } elsif ($conn->{state} eq 'WL' ) { + $msg = uc $msg; + if (is_callsign($msg)) { + &{$conn->{rproc}}($conn, "A$msg|telnet"); + _send_file($conn, "$main::data/connected"); + $conn->{state} = 'C'; + } else { + $conn->send_now("Sorry $msg is an invalid callsign"); + $conn->disconnect; + } + } elsif ($conn->{state} eq 'WC') { + if (exists $conn->{cmd} && @{$conn->{cmd}}) { + $conn->_docmd($msg); + if ($conn->{state} eq 'WC' && exists $conn->{cmd} && @{$conn->{cmd}} == 0) { + $conn->{state} = 'C'; + &{$conn->{rproc}}($conn, "O$conn->{call}|telnet"); + delete $conn->{cmd}; + $conn->{timeout}->del if $conn->{timeout}; + } + } + } } } } @@ -108,7 +117,11 @@ sub new_client { my $conn = $server_conn->new($server_conn->{rproc}); $conn->{sock} = $sock; - my $rproc = &{$server_conn->{rproc}} ($conn, $sock->peerhost(), $sock->peerport()); + my ($rproc, $eproc) = &{$server_conn->{rproc}} ($conn, $conn->{peerhost} = $sock->peerhost(), $conn->{peerport} = $sock->peerport()); + if ($eproc) { + $conn->{eproc} = $eproc; + set_event_handler ($sock, "error" => $eproc); + } if ($rproc) { $conn->{rproc} = $rproc; my $callback = sub {$conn->_rcv}; @@ -209,8 +222,8 @@ sub _dotimeout my $conn = shift; my $val = shift; dbg('connect', "timeout set to $val"); - $conn->{timeout}->del_timer if $conn->{timeout}; - $conn->{timeout} = ExtMsg->new_timer($val, sub{ _timeout($conn); }); + my $old = $conn->{timeout}->del if $conn->{timeout}; + $conn->{timeout} = Timer->new($val, sub{ &_timeout($conn) }); $conn->{timeval} = $val; } @@ -269,7 +282,7 @@ sub _doclient $conn->{state} = 'C'; &{$conn->{rproc}}($conn, "O$conn->{call}|telnet"); delete $conn->{cmd}; - $conn->{timeout}->del_timer if $conn->{timeout}; + $conn->{timeout}->del if $conn->{timeout}; } sub _send_file @@ -287,5 +300,4 @@ sub _send_file $f->close; } } - $! = undef; } diff --git a/perl/IntMsg.pm b/perl/IntMsg.pm index 150ec91a..8065f302 100644 --- a/perl/IntMsg.pm +++ b/perl/IntMsg.pm @@ -26,12 +26,23 @@ sub enqueue sub dequeue { my $conn = shift; - my $msg; - - while ($msg = shift @{$conn->{inqueue}}){ - $msg =~ s/\%([2-9A-F][0-9A-F])/chr(hex($1))/eg; - $msg =~ s/[\x00-\x08\x0a-\x1f\x80-\x9f]/./g; # immutable CSI sequence + control characters - &{$conn->{rproc}}($conn, $msg, $!); - $! = 0; + + if ($conn->{msg} =~ /\n/) { + my @lines = split /\r?\n/, $conn->{msg}; + if ($conn->{msg} =~ /\n$/) { + delete $conn->{msg}; + } else { + $conn->{msg} = pop @lines; + } + for (@lines) { + if (defined $_) { + s/\%([0-9A-F][0-9A-F])/chr(hex($1))/eg; + s/[\x00-\x08\x0a-\x1f\x80-\x9f]/./g; # immutable CSI sequence + control characters + } else { + $_ = ''; + } + &{$conn->{rproc}}($conn, $_); + } } } + diff --git a/perl/Msg.pm b/perl/Msg.pm index 7d5b4072..449f1790 100644 --- a/perl/Msg.pm +++ b/perl/Msg.pm @@ -13,26 +13,21 @@ package Msg; use strict; use IO::Select; use IO::Socket; -use Carp; +use DXDebug; +use Timer; +use Errno qw(EWOULDBLOCK EAGAIN EINPROGRESS); +use POSIX qw(F_GETFL F_SETFL O_NONBLOCK); -use vars qw(%rd_callbacks %wt_callbacks $rd_handles $wt_handles $now @timerchain %conns); +use vars qw(%rd_callbacks %wt_callbacks %er_callbacks $rd_handles $wt_handles $er_handles $now %conns $noconns); %rd_callbacks = (); %wt_callbacks = (); +%er_callbacks = (); $rd_handles = IO::Select->new(); $wt_handles = IO::Select->new(); -$now = time; -@timerchain = (); - -my $blocking_supported = 0; +$er_handles = IO::Select->new(); -BEGIN { - # Checks if blocking is supported - eval { - require POSIX; POSIX->import(qw (F_SETFL O_NONBLOCK EAGAIN)); - }; - $blocking_supported = 1 unless $@; -} +$now = time; # #----------------------------------------------------------------- @@ -54,9 +49,30 @@ sub new timeval => 60, }; + $noconns++; + dbg('connll', "Connection created ($noconns)"); return bless $conn, $class; } +sub set_error +{ + my $conn = shift; + my $callback = shift; + $conn->{eproc} = $callback; + set_event_handler($conn->{sock}, error => $callback) if exists $conn->{sock}; +} + +sub blocking +{ + my $flags = fcntl ($_[0], F_GETFL, 0); + if ($_[1]) { + $flags &= ~O_NONBLOCK; + } else { + $flags |= O_NONBLOCK; + } + fcntl ($_[0], F_SETFL, $flags); +} + # save it sub conns { @@ -70,6 +86,7 @@ sub conns confess "changing $pkg->{call} to $call" if exists $pkg->{call} && $call ne $pkg->{call}; $pkg->{call} = $call; $ref = $conns{$call} = $pkg; + dbg('connll', "Connection $call stored"); } else { $ref = $conns{$call}; } @@ -83,11 +100,8 @@ sub pid_gone my @pid = grep {$_->{pid} == $pid} values %conns; for (@pid) { - if ($_->{rproc}) { - &{$_->{rproc}}($_, undef, "$pid has gorn"); - } else { - $_->disconnect; - } + &{$_->{eproc}}($_, "$pid has gorn") if exists $_->{eproc}; + $_->disconnect; } } @@ -101,17 +115,24 @@ sub connect { unless (ref $pkg) { $conn = $pkg->new($rproc); } + $conn->{peerhost} = $to_host; + $conn->{peerport} = $to_port; + $conn->{sort} = 'Outgoing'; # Create a new internet socket - my $sock = IO::Socket::INET->new ( - PeerAddr => $to_host, - PeerPort => $to_port, - Proto => 'tcp', - Reuse => 1, - Timeout => $conn->{timeval} / 2); - + my $sock = IO::Socket::INET->new(); return undef unless $sock; - + + my $proto = getprotobyname('tcp'); + $sock->socket(AF_INET, SOCK_STREAM, $proto) or return undef; + + blocking($sock, 0); + my $ip = gethostbyname($to_host); + my $r = $sock->connect($to_port, $ip); + unless ($r) { + return undef unless $! == EINPROGRESS; + } + $conn->{sock} = $sock; if ($conn->{rproc}) { @@ -123,18 +144,26 @@ sub connect { sub disconnect { my $conn = shift; + return if exists $conn->{disconnecting}; + + $conn->{disconnecting} = 1; my $sock = delete $conn->{sock}; $conn->{state} = 'E'; delete $conn->{cmd}; - $conn->{timeout}->del_timer if $conn->{timeout}; + delete $conn->{eproc}; + delete $conn->{rproc}; + $conn->{timeout}->del if $conn->{timeout}; # be careful to delete the correct one - if (my $call = $conn->{call}) { + my $call; + if ($call = $conn->{call}) { my $ref = $conns{$call}; delete $conns{$call} if $ref && $ref == $conn; } + $call ||= 'unallocated'; + dbg('connll', "Connection $call disconnected"); - set_event_handler ($sock, "read" => undef, "write" => undef); + set_event_handler ($sock, read => undef, write => undef, error => undef); unless ($^O =~ /^MS/i) { kill 'TERM', $conn->{pid} if exists $conn->{pid}; } @@ -159,7 +188,7 @@ sub send_later { sub enqueue { my $conn = shift; - push (@{$conn->{outqueue}}, $_[0]); + push (@{$conn->{outqueue}}, defined $_[0] ? $_[0] : ''); } sub _send { @@ -174,7 +203,7 @@ sub _send { # return to the event loop only after every message, or if it # is likely to block in the middle of a message. - $flush ? $conn->set_blocking() : $conn->set_non_blocking(); + blocking($sock, $flush); my $offset = (exists $conn->{send_offset}) ? $conn->{send_offset} : 0; while (@$rq) { @@ -195,8 +224,7 @@ sub _send { # be called back eventually, and will resume sending return 1; } else { # Uh, oh - delete $conn->{send_offset}; - $conn->handle_send_err($!); + &{$conn->{eproc}}($conn, $!) if exists $conn->{eproc}; $conn->disconnect; return 0; # fail. Message remains in queue .. } @@ -215,37 +243,22 @@ sub _send { set_event_handler ($sock, "write" => sub {$conn->_send(0)}); } else { set_event_handler ($sock, "write" => undef); + if (exists $conn->{close_on_empty}) { + &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc}; + $conn->disconnect; + } } 1; # Success } sub _err_will_block { - if ($blocking_supported) { - return ($_[0] == EAGAIN()); - } - return 0; -} -sub set_non_blocking { # $conn->set_blocking - if ($blocking_supported) { - # preserve other fcntl flags - my $flags = fcntl ($_[0], F_GETFL(), 0); - fcntl ($_[0], F_SETFL(), $flags | O_NONBLOCK()); - } -} -sub set_blocking { - if ($blocking_supported) { - my $flags = fcntl ($_[0], F_GETFL(), 0); - $flags &= ~O_NONBLOCK(); # Clear blocking, but preserve other flags - fcntl ($_[0], F_SETFL(), $flags); - } + return ($_[0] == EAGAIN || $_[0] == EWOULDBLOCK || $_[0] == EINPROGRESS); } -sub handle_send_err { - # For more meaningful handling of send errors, subclass Msg and - # rebless $conn. - my ($conn, $err_msg) = @_; - warn "Error while sending: $err_msg \n"; - set_event_handler ($conn->{sock}, "write" => undef); +sub close_on_empty +{ + my $conn = shift; + $conn->{close_on_empty} = 1; } #----------------------------------------------------------------- @@ -259,7 +272,7 @@ sub new_server { $self->{sock} = IO::Socket::INET->new ( LocalAddr => $my_host, LocalPort => $my_port, - Listen => 5, + Listen => SOMAXCONN, Proto => 'tcp', Reuse => 1); die "Could not create socket: $! \n" unless $self->{sock}; @@ -270,11 +283,17 @@ sub new_server { sub dequeue { my $conn = shift; - my $msg; - - while ($msg = shift @{$conn->{inqueue}}){ - &{$conn->{rproc}}($conn, $msg, $!); - $! = 0; + + if ($conn->{msg} =~ /\n/) { + my @lines = split /\r?\n/, $conn->{msg}; + if ($conn->{msg} =~ /\n$/) { + delete $conn->{msg}; + } else { + $conn->{msg} = pop @lines; + } + for (@lines) { + &{$conn->{rproc}}($conn, defined $_ ? $_ : ''); + } } } @@ -286,27 +305,11 @@ sub _rcv { # Complement to _send return unless defined($sock); my @lines; - $conn->set_non_blocking(); + blocking($sock, 0); $bytes_read = sysread ($sock, $msg, 1024, 0); if (defined ($bytes_read)) { if ($bytes_read > 0) { - if ($msg =~ /\n/) { - @lines = split /\r?\n/, $msg; - if (@lines) { - $lines[0] = $conn->{msg} . $lines[0] if exists $conn->{msg}; - } else { - $lines[0] = $conn->{msg} if exists $conn->{msg}; - push @lines, '' unless @lines; - } - if ($msg =~ /\n$/) { - delete $conn->{msg}; - } else { - $conn->{msg} = pop @lines; - } - push @{$conn->{inqueue}}, @lines if @lines; - } else { - $conn->{msg} .= $msg; - } + $conn->{msg} .= $msg; } } else { if (_err_will_block($!)) { @@ -318,11 +321,10 @@ sub _rcv { # Complement to _send FINISH: if (defined $bytes_read && $bytes_read == 0) { -# $conn->disconnect(); - &{$conn->{rproc}}($conn, undef, $!); - delete $conn->{inqueue}; + &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc}; + $conn->disconnect(); } else { - $conn->dequeue; + $conn->dequeue if exists $conn->{msg}; } } @@ -331,12 +333,18 @@ sub new_client { my $sock = $server_conn->{sock}->accept(); my $conn = $server_conn->new($server_conn->{rproc}); $conn->{sock} = $sock; - my $rproc = &{$server_conn->{rproc}} ($conn, $sock->peerhost(), $sock->peerport()); + my ($rproc, $eproc) = &{$server_conn->{rproc}} ($conn, $conn->{peerhost} = $sock->peerhost(), $conn->{peerport} = $sock->peerport()); + $conn->{sort} = 'Incoming'; + if ($eproc) { + $conn->{eproc} = $eproc; + set_event_handler ($sock, "error" => $eproc); + } if ($rproc) { $conn->{rproc} = $rproc; my $callback = sub {_rcv($conn)}; set_event_handler ($sock, "read" => $callback); } else { # Login failed + &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc}; $conn->disconnect(); } } @@ -348,6 +356,14 @@ sub close_server $conn->{sock}->close; } +# close all clients (this is for forking really) +sub close_all_clients +{ + for (values %conns) { + $_->disconnect; + } +} + #---------------------------------------------------- # Event loop routines used by both client and server @@ -375,54 +391,40 @@ sub set_event_handler { $rd_handles->remove($handle); } } -} - -sub new_timer -{ - my ($pkg, $time, $proc, $recur) = @_; - my $obj = ref($pkg); - my $class = $obj || $pkg; - my $self = bless { t=>$time + time, proc=>$proc }, $class; - $self->{interval} = $time if $recur; - push @timerchain, $self; - return $self; -} - -sub del_timer -{ - my $self = shift; - @timerchain = grep {$_ != $self} @timerchain; + if (exists $args{'error'}) { + $callback = $args{'error'}; + if ($callback) { + $er_callbacks{$handle} = $callback; + $er_handles->add($handle); + } else { + delete $er_callbacks{$handle}; + $er_handles->remove($handle); + } + } } sub event_loop { my ($pkg, $loop_count, $timeout) = @_; # event_loop(1) to process events once - my ($conn, $r, $w, $rset, $wset); + my ($conn, $r, $w, $e, $rset, $wset, $eset); while (1) { # Quit the loop if no handles left to process last unless ($rd_handles->count() || $wt_handles->count()); - ($rset, $wset) = - IO::Select->select ($rd_handles, $wt_handles, undef, $timeout); + ($rset, $wset) = IO::Select->select($rd_handles, $wt_handles, $er_handles, $timeout); $now = time; + foreach $e (@$eset) { + &{$er_callbacks{$e}}($e) if exists $er_callbacks{$e}; + } foreach $r (@$rset) { - &{$rd_callbacks{$r}} ($r) if exists $rd_callbacks{$r}; + &{$rd_callbacks{$r}}($r) if exists $rd_callbacks{$r}; } foreach $w (@$wset) { &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w}; } - # handle things on the timer chain - for (@timerchain) { - if ($now >= $_->{t}) { - &{$_->{proc}}(); - $_->{t} = $now + $_->{interval} if exists $_->{interval}; - } - } - - # remove dead timers - @timerchain = grep { $_->{t} > $now } @timerchain; + Timer::handler; if (defined($loop_count)) { last unless --$loop_count; @@ -430,6 +432,14 @@ sub event_loop { } } +sub DESTROY +{ + my $conn = shift; + my $call = $conn->{call} || 'unallocated'; + dbg('connll', "Connection $call being destroyed ($noconns)"); + $noconns--; +} + 1; __END__ diff --git a/perl/Timer.pm b/perl/Timer.pm new file mode 100644 index 00000000..8969756f --- /dev/null +++ b/perl/Timer.pm @@ -0,0 +1,49 @@ +# +# Polled Timer handling +# +# This uses callbacks. BE CAREFUL!!!! +# +# $Id$ +# +# Copyright (c) 2001 Dirk Koopman G1TLH +# + +package Timer; + +use vars qw(@timerchain); + +@timerchain = (); + +sub new +{ + my ($pkg, $time, $proc, $recur) = @_; + my $obj = ref($pkg); + my $class = $obj || $pkg; + my $self = bless { t=>$time + time, proc=>$proc }, $class; + $self->{interval} = $time if $recur; + push @timerchain, $self; + return $self; +} + +sub del +{ + my $self = shift; + my $old = delete $self->{proc}; + @timerchain = grep {$_ != $self} @timerchain; + return $old; +} + +sub handler +{ + my $now = time; + + # handle things on the timer chain + for (@timerchain) { + if ($now >= $_->{t}) { + &{$_->{proc}}(); + $_->{t} = $now + $_->{interval} if exists $_->{interval}; + } + } +} + +1; diff --git a/perl/cluster.pl b/perl/cluster.pl index 2c94dcc1..eef7a40c 100755 --- a/perl/cluster.pl +++ b/perl/cluster.pl @@ -94,38 +94,27 @@ sub already_conn dbg('chan', "-> D $call $mess\n"); $conn->send_now("D$call|$mess"); - sleep(1); - dbg('chan', "-> Z $call bye\n"); - $conn->send_now("Z$call|bye"); # this will cause 'client' to disconnect - sleep(1); + sleep(2); $conn->disconnect; } +sub error_handler +{ + my $dxchan = shift; + $dxchan->disconnect; +} + # handle incoming messages sub rec { - my ($conn, $msg, $err) = @_; + my ($conn, $msg) = @_; my $dxchan = DXChannel->get_by_cnum($conn); # get the dxconnnect object for this message + my ($sort, $call, $line) = DXChannel::decode_input(0, $msg); + return unless defined $sort; - if (!defined $msg || (defined $err && $err)) { - if ($dxchan) { - if (defined $err) { - $conn->disconnect; - undef $conn; - $dxchan->conn(undef); - } - $dxchan->disconnect; - } elsif ($conn) { - $conn->disconnect; - } - return; - } - - # set up the basic channel info - this needs a bit more thought - there is duplication here + # set up the basic channel info if (!defined $dxchan) { - my ($sort, $call, $line) = DXChannel::decode_input(0, $msg); - return unless defined $sort; - + # is there one already connected to me - locally? my $user = DXUser->get($call); if ($sort ne 'O' && Msg->conns($call)) { @@ -158,13 +147,13 @@ sub rec # is he locked out ? if ($user->lockout) { Log('DXCommand', "$call is locked out, disconnected"); - $conn->send_now("Z$call|bye"); # this will cause 'client' to disconnect $conn->disconnect; return; } # mark him up $conn->conns($call) unless $sort eq 'O'; + $conn->set_error(sub {error_handler($dxchan)}); # create the channel $dxchan = DXCommandmode->new($call, $conn, $user) if $user->is_user; @@ -335,10 +324,14 @@ DXUser->init($userfn, 1); use Listeners; dbg('err', "starting listeners ..."); -push @listeners, IntMsg->new_server("$clusteraddr", $clusterport, \&login); +my $conn = IntMsg->new_server($clusteraddr, $clusterport, \&login); +$conn->conns("Server $clusteraddr/$clusterport"); +push @listeners, $conn; dbg('err', "Internal port: $clusteraddr $clusterport"); for (@main::listen) { - push @listeners, ExtMsg->new_server($_->[0], $_->[1], \&login); + $conn = ExtMsg->new_server($_->[0], $_->[1], \&login); + $conn->conns("Server $_->[0]/$_->[1]"); + push @listeners, $conn; dbg('err', "External Port: $_->[0] $_->[1]"); } @@ -422,7 +415,7 @@ dbg('err', "orft we jolly well go ..."); for (;;) { # $DB::trace = 1; - Msg->event_loop(1, 0.1); + Msg->event_loop(10, 0.001); my $timenow = time; process_inqueue(); # read in lines from the input queue and despatch them # $DB::trace = 0; diff --git a/perl/console.pl b/perl/console.pl index 208eb90a..aee1bc9b 100755 --- a/perl/console.pl +++ b/perl/console.pl @@ -104,9 +104,7 @@ sub do_resize sub cease { my $sendz = shift; -# if ($conn && $sendz) { -# $conn->send_now("Z$call|bye..."); -# } + $conn->disconnect if $conn; endwin(); dbgclose(); print @_ if @_; @@ -437,6 +435,8 @@ if (! $conn) { exit(0); } +$conn->set_error(sub{cease(0)}); + unless ($DB::VERSION) { $SIG{'INT'} = \&sig_term;