use IO::Socket;
use DXDebug;
use Timer;
-use Errno qw(EWOULDBLOCK EAGAIN EINPROGRESS);
-use POSIX qw(F_GETFL F_SETFL O_NONBLOCK);
use vars qw(%rd_callbacks %wt_callbacks %er_callbacks $rd_handles $wt_handles $er_handles $now %conns $noconns);
$er_handles = IO::Select->new();
$now = time;
+my $blocking_supported = 0;
+
+BEGIN {
+ # Checks if blocking is supported
+ eval {
+ require POSIX; POSIX->import(qw (F_SETFL F_GETFL O_NONBLOCK));
+ };
+ $blocking_supported = 1 unless $@;
+
+ # import as many of these errno values as are available
+ eval {
+ require Errno; Errno->import(qw(EAGAIN EINPROGRESS EWOULDBLOCK));
+ };
+}
+
+my $w = $^W;
+$^W = 0;
+my $eagain = eval {EAGAIN()};
+my $einprogress = eval {EINPROGRESS()};
+my $ewouldblock = eval {EWOULDBLOCK()};
+$^W = $w;
#
#-----------------------------------------------------------------
lineend => "\r\n",
csort => 'telnet',
timeval => 60,
+ blocking => 0,
};
$noconns++;
set_event_handler($conn->{sock}, error => $callback) if exists $conn->{sock};
}
+sub set_rproc
+{
+ my $conn = shift;
+ my $callback = shift;
+ $conn->{rproc} = $callback;
+}
+
sub blocking
{
+ return unless $blocking_supported;
+
my $flags = fcntl ($_[0], F_GETFL, 0);
if ($_[1]) {
$flags &= ~O_NONBLOCK;
my $proto = getprotobyname('tcp');
$sock->socket(AF_INET, SOCK_STREAM, $proto) or return undef;
- blocking($sock, 0);
- my $ip = gethostbyname($to_host);
- my $r = $sock->connect($to_port, $ip);
- unless ($r) {
- return undef unless $! == EINPROGRESS;
+ if ($conn->{blocking}) {
+ blocking($sock, 0);
+ $conn->{blocking} = 0;
}
+
+ my $ip = gethostbyname($to_host);
+# my $r = $sock->connect($to_port, $ip);
+ my $r = connect($sock, pack_sockaddr_in($to_port, $ip));
+ return undef unless $r || _err_will_block($!);
$conn->{sock} = $sock;
if ($conn->{rproc}) {
- my $callback = sub {_rcv($conn)};
- set_event_handler ($sock, "read" => $callback);
+ my $callback = sub {$conn->_rcv};
+ set_event_handler ($sock, read => $callback);
}
return $conn;
}
$conn->{disconnecting} = 1;
my $sock = delete $conn->{sock};
$conn->{state} = 'E';
- delete $conn->{cmd};
- delete $conn->{eproc};
- delete $conn->{rproc};
$conn->{timeout}->del if $conn->{timeout};
# be careful to delete the correct one
$call ||= 'unallocated';
dbg('connll', "Connection $call disconnected");
- set_event_handler ($sock, read => undef, write => undef, error => undef);
unless ($^O =~ /^MS/i) {
kill 'TERM', $conn->{pid} if exists $conn->{pid};
}
+
+ # get rid of any references
+ for (keys %$conn) {
+ if (ref($conn->{$_})) {
+ delete $conn->{$_};
+ }
+ }
+
return unless defined($sock);
+ set_event_handler ($sock, read => undef, write => undef, error => undef);
shutdown($sock, 3);
close($sock);
}
$conn->enqueue($msg);
my $sock = $conn->{sock};
return unless defined($sock);
- set_event_handler ($sock, "write" => sub {$conn->_send(0)});
+ set_event_handler ($sock, write => sub {$conn->_send(0)});
}
sub enqueue {
# return to the event loop only after every message, or if it
# is likely to block in the middle of a message.
- blocking($sock, $flush);
+ if ($conn->{blocking} != $flush) {
+ blocking($sock, $flush);
+ $conn->{blocking} = $flush;
+ }
my $offset = (exists $conn->{send_offset}) ? $conn->{send_offset} : 0;
while (@$rq) {
}
# Call me back if queue has not been drained.
if (@$rq) {
- set_event_handler ($sock, "write" => sub {$conn->_send(0)});
+ set_event_handler ($sock, write => sub {$conn->_send(0)});
} else {
- set_event_handler ($sock, "write" => undef);
+ set_event_handler ($sock, write => undef);
if (exists $conn->{close_on_empty}) {
&{$conn->{eproc}}($conn, undef) if exists $conn->{eproc};
$conn->disconnect;
1; # Success
}
+sub dup_sock
+{
+ my $conn = shift;
+ my $oldsock = $conn->{sock};
+ my $rc = $rd_callbacks{$oldsock};
+ my $wc = $wt_callbacks{$oldsock};
+ my $ec = $er_callbacks{$oldsock};
+ my $sock = $oldsock->new_from_fd($oldsock, "w+");
+ if ($sock) {
+ set_event_handler($oldsock, read=>undef, write=>undef, error=>undef);
+ $conn->{sock} = $sock;
+ set_event_handler($sock, read=>$rc, write=>$wc, error=>$ec);
+ $oldsock->close;
+ }
+}
+
sub _err_will_block {
- return ($_[0] == EAGAIN || $_[0] == EWOULDBLOCK || $_[0] == EINPROGRESS);
+ return 0 unless $blocking_supported;
+ return ($_[0] == $eagain || $_[0] == $ewouldblock || $_[0] == $einprogress);
}
sub close_on_empty
Proto => 'tcp',
Reuse => 1);
die "Could not create socket: $! \n" unless $self->{sock};
- set_event_handler ($self->{sock}, "read" => sub { $self->new_client } );
+ set_event_handler ($self->{sock}, read => sub { $self->new_client } );
return $self;
}
return unless defined($sock);
my @lines;
- blocking($sock, 0);
+ if ($conn->{blocking}) {
+ blocking($sock, 0);
+ $conn->{blocking} = 0;
+ }
$bytes_read = sysread ($sock, $msg, 1024, 0);
if (defined ($bytes_read)) {
if ($bytes_read > 0) {
FINISH:
if (defined $bytes_read && $bytes_read == 0) {
- &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc};
- $conn->disconnect();
+ &{$conn->{eproc}}($conn, $!) if exists $conn->{eproc};
+ $conn->disconnect;
} else {
$conn->dequeue if exists $conn->{msg};
}
$conn->{sort} = 'Incoming';
if ($eproc) {
$conn->{eproc} = $eproc;
- set_event_handler ($sock, "error" => $eproc);
+ set_event_handler ($sock, error => $eproc);
}
if ($rproc) {
$conn->{rproc} = $rproc;
- my $callback = sub {_rcv($conn)};
- set_event_handler ($sock, "read" => $callback);
+ my $callback = sub {$conn->_rcv};
+ set_event_handler ($sock, read => $callback);
} else { # Login failed
&{$conn->{eproc}}($conn, undef) if exists $conn->{eproc};
$conn->disconnect();
sub close_server
{
my $conn = shift;
- set_event_handler ($conn->{sock}, "read" => undef);
+ set_event_handler ($conn->{sock}, read => undef, write => undef, error => undef );
$conn->{sock}->close;
}
}
}
+#
#----------------------------------------------------
# Event loop routines used by both client and server
last unless ($rd_handles->count() || $wt_handles->count());
($rset, $wset) = IO::Select->select($rd_handles, $wt_handles, $er_handles, $timeout);
- $now = time;
foreach $e (@$eset) {
&{$er_callbacks{$e}}($e) if exists $er_callbacks{$e};
}
}
+sub sleep
+{
+ my ($pkg, $interval) = @_;
+ my $now = time;
+ while (time - $now < $interval) {
+ $pkg->event_loop(10, 0.01);
+ }
+}
+
sub DESTROY
{
my $conn = shift;