X-Git-Url: http://dxcluster.net/gitweb/gitweb.cgi?a=blobdiff_plain;f=perl%2FMsg.pm;h=70009d74f483a94a51500daed2b13bc4c9bebd51;hb=b9dffeff7239952814342dad19db3a51def6fab7;hp=cf15ff76a8ce2242762d01248339d0ea1c61e22e;hpb=403e6bac61ed5d509aeda5b49557481ac88bc08c;p=spider.git diff --git a/perl/Msg.pm b/perl/Msg.pm index cf15ff76..70009d74 100644 --- a/perl/Msg.pm +++ b/perl/Msg.pm @@ -12,18 +12,12 @@ package Msg; use strict; -use vars qw($VERSION $BRANCH); -$VERSION = sprintf( "%d.%03d", q$Revision$ =~ /(\d+)\.(\d+)/ ); -$BRANCH = sprintf( "%d.%03d", q$Revision$ =~ /\d+\.\d+\.(\d+)\.(\d+)/ || (0,0)); -$main::build += $VERSION; -$main::branch += $BRANCH; - use IO::Select; use IO::Socket; use DXDebug; use Timer; -use vars qw(%rd_callbacks %wt_callbacks %er_callbacks $rd_handles $wt_handles $er_handles $now %conns $noconns $blocking_supported $cnum); +use vars qw(%rd_callbacks %wt_callbacks %er_callbacks $rd_handles $wt_handles $er_handles $now %conns $noconns $blocking_supported $cnum $total_in $total_out); %rd_callbacks = (); %wt_callbacks = (); @@ -31,6 +25,7 @@ use vars qw(%rd_callbacks %wt_callbacks %er_callbacks $rd_handles $wt_handles $e $rd_handles = IO::Select->new(); $wt_handles = IO::Select->new(); $er_handles = IO::Select->new(); +$total_in = $total_out = 0; $now = time; @@ -41,11 +36,9 @@ BEGIN { require POSIX; POSIX->import(qw(O_NONBLOCK F_SETFL F_GETFL)) }; if ($@ || $main::is_win) { -# print STDERR "POSIX Blocking *** NOT *** supported $@\n"; - $blocking_supported = 0; + $blocking_supported = IO::Socket->can('blocking') ? 2 : 0; } else { - $blocking_supported = 1; -# print STDERR "POSIX Blocking enabled\n"; + $blocking_supported = IO::Socket->can('blocking') ? 2 : 1; } @@ -138,12 +131,8 @@ sub blocking return unless $blocking_supported; # Make the handle stop blocking, the Windows way. - if ($main::is_win) { - # 126 is FIONBIO (some docs say 0x7F << 16) - ioctl( $_[0], - 0x80000000 | (4 << 16) | (ord('f') << 8) | 126, - "$_[1]" - ); + if ($blocking_supported) { + $_[0]->blocking($_[1]); } else { my $flags = fcntl ($_[0], F_GETFL, 0); if ($_[1]) { @@ -216,7 +205,6 @@ sub connect { my $ip = gethostbyname($to_host); return undef unless $ip; -# my $r = $sock->connect($to_port, $ip); my $r = connect($sock, pack_sockaddr_in($to_port, $ip)); return undef unless $r || _err_will_block($!); @@ -229,7 +217,57 @@ sub connect { return $conn; } -sub disconnect { +sub start_program +{ + my ($conn, $line, $sort) = @_; + my $pid; + + local $^F = 10000; # make sure it ain't closed on exec + my ($a, $b) = IO::Socket->socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC); + if ($a && $b) { + $a->autoflush(1); + $b->autoflush(1); + $pid = fork; + if (defined $pid) { + if ($pid) { + close $b; + $conn->{sock} = $a; + $conn->{csort} = $sort; + $conn->{lineend} = "\cM" if $sort eq 'ax25'; + $conn->{pid} = $pid; + if ($conn->{rproc}) { + my $callback = sub {$conn->_rcv}; + Msg::set_event_handler ($a, read => $callback); + } + dbg("connect $conn->{cnum}: started pid: $conn->{pid} as $line") if isdbg('connect'); + } else { + $^W = 0; + dbgclose(); + STDIN->close; + STDOUT->close; + STDOUT->close; + *STDIN = IO::File->new_from_fd($b, 'r') or die; + *STDOUT = IO::File->new_from_fd($b, 'w') or die; + *STDERR = IO::File->new_from_fd($b, 'w') or die; + close $a; + unless ($main::is_win) { + # $SIG{HUP} = 'IGNORE'; + $SIG{HUP} = $SIG{CHLD} = $SIG{TERM} = $SIG{INT} = 'DEFAULT'; + alarm(0); + } + exec "$line" or dbg("exec '$line' failed $!"); + } + } else { + dbg("cannot fork for $line"); + } + } else { + dbg("no socket pair $! for $line"); + } + return $pid; +} + +sub disconnect +{ my $conn = shift; return if exists $conn->{disconnecting}; @@ -263,7 +301,6 @@ sub disconnect { unless ($main::is_win) { kill 'TERM', $conn->{pid} if exists $conn->{pid}; } - } sub send_now { @@ -297,10 +334,10 @@ sub _send { # return to the event loop only after every message, or if it # is likely to block in the middle of a message. - if ($conn->{blocking} != $flush) { - blocking($sock, $flush); - $conn->{blocking} = $flush; - } +# if ($conn->{blocking} != $flush) { +# blocking($sock, $flush); +# $conn->{blocking} = $flush; +# } my $offset = (exists $conn->{send_offset}) ? $conn->{send_offset} : 0; while (@$rq) { @@ -329,6 +366,7 @@ sub _send { my $call = $conn->{call} || 'none'; dbgdump('raw', "$call send $bytes_written: ", $msg); } + $total_out += $bytes_written; $offset += $bytes_written; $bytes_to_write -= $bytes_written; } @@ -454,6 +492,7 @@ sub _rcv { # Complement to _send $bytes_read = sysread ($sock, $msg, 1024, 0); if (defined ($bytes_read)) { if ($bytes_read > 0) { + $total_in += $bytes_read; if (isdbg('raw')) { my $call = $conn->{call} || 'none'; dbgdump('raw', "$call read $bytes_read: ", $msg);