package Msg;
-require Exporter;
-@ISA = qw(Exporter);
-
use strict;
use IO::Select;
use IO::Socket;
-use Carp;
+#use DXDebug;
-use vars qw (%rd_callbacks %wt_callbacks $rd_handles $wt_handles);
+use vars qw(%rd_callbacks %wt_callbacks $rd_handles $wt_handles);
%rd_callbacks = ();
%wt_callbacks = ();
};
if ($rcvd_notification_proc) {
- my $callback = sub {_rcv($conn, 0)};
+ my $callback = sub {_rcv($conn)};
set_event_handler ($sock, "read" => $callback);
}
return bless $conn, $pkg;
my ($conn, $msg) = @_;
# prepend length (encoded as network long)
my $len = length($msg);
- $msg = pack ('N', $len) . $msg;
- push (@{$conn->{queue}}, $msg);
+ $msg =~ s/([\%\x00-\x1f\x7f-\xff])/sprintf("%%%02X", ord($1))/eg;
+ push (@{$conn->{queue}}, $msg . "\n");
}
sub _send {
while (@$rq) {
my $msg = $rq->[0];
- my $bytes_to_write = length($msg) - $offset;
+ my $mlth = length($msg);
+ my $bytes_to_write = $mlth - $offset;
my $bytes_written = 0;
- while ($bytes_to_write) {
+ confess("Negative Length! msg: '$msg' lth: $mlth offset: $offset") if $bytes_to_write < 0;
+ while ($bytes_to_write > 0) {
$bytes_written = syswrite ($sock, $msg,
$bytes_to_write, $offset);
if (!defined($bytes_written)) {
# be called back eventually, and will resume sending
return 1;
} else { # Uh, oh
+ delete $conn->{send_offset};
$conn->handle_send_err($!);
+ $conn->disconnect;
return 0; # fail. Message remains in queue ..
}
}
$g_login_proc = $login_proc; $g_pkg = $pkg;
}
-sub rcv_now {
- my ($conn) = @_;
- my ($msg, $err) = _rcv ($conn, 1); # 1 ==> rcv now
- return wantarray ? ($msg, $err) : $msg;
-}
-
sub _rcv { # Complement to _send
- my ($conn, $rcv_now) = @_; # $rcv_now complement of $flush
+ my $conn = shift; # $rcv_now complement of $flush
# Find out how much has already been received, if at all
my ($msg, $offset, $bytes_to_read, $bytes_read);
my $sock = $conn->{sock};
return unless defined($sock);
- if (exists $conn->{msg}) {
- $msg = $conn->{msg};
- $offset = length($msg) - 1; # sysread appends to it.
- $bytes_to_read = $conn->{bytes_to_read};
- delete $conn->{'msg'}; # have made a copy
- } else {
- # The typical case ...
- $msg = ""; # Otherwise -w complains
- $offset = 0 ;
- $bytes_to_read = 0 ; # Will get set soon
- }
- # We want to read the message length in blocking mode. Quite
- # unlikely that we'll get blocked too long reading 4 bytes
- if (!$bytes_to_read) { # Get new length
- my $buf;
- $conn->set_blocking();
- $bytes_read = sysread($sock, $buf, 4);
- if ($! || ($bytes_read != 4)) {
- goto FINISH;
- }
- $bytes_to_read = unpack ('N', $buf);
- }
- $conn->set_non_blocking() unless $rcv_now;
- while ($bytes_to_read) {
- $bytes_read = sysread ($sock, $msg, $bytes_to_read, $offset);
- if (defined ($bytes_read)) {
- if ($bytes_read == 0) {
- last;
- }
- $bytes_to_read -= $bytes_read;
- $offset += $bytes_read;
- } else {
- if (_err_will_block($!)) {
- # Should come here only in non-blocking mode
- $conn->{msg} = $msg;
- $conn->{bytes_to_read} = $bytes_to_read;
- return ; # .. _rcv will be called later
- # when socket is readable again
- } else {
- last;
- }
- }
- }
- FINISH:
- if (length($msg) == 0) {
- $conn->disconnect();
- }
- if ($rcv_now) {
- return ($msg, $!);
- } else {
- &{$conn->{rcvd_notification_proc}}($conn, $msg, $!);
+ my @lines;
+ $conn->set_non_blocking();
+ $bytes_read = sysread ($sock, $msg, 1024, 0);
+ if (defined ($bytes_read)) {
+ if ($bytes_read > 0) {
+ if ($msg =~ /\n/) {
+ @lines = split /\n/, $msg;
+ $lines[0] = $conn->{msg} . $lines[0] if $conn->{msg};
+ if ($msg =~ /\n$/) {
+ delete $conn->{msg};
+ } else {
+ $conn->{msg} = pop @lines;
+ }
+ } else {
+ $conn->{msg} .= $msg;
+ }
+ }
+ } else {
+ if (_err_will_block($!)) {
+ return ;
+ } else {
+ $bytes_read = 0;
+ }
}
+
+FINISH:
+ if (defined $bytes_read && $bytes_read == 0) {
+# $conn->disconnect();
+ &{$conn->{rcvd_notification_proc}}($conn, undef, $!);
+ @lines = ();
+ }
+
+ while (@lines){
+ $msg = shift @lines;
+ $msg =~ s/\%([2-9A-F][0-9A-F])/chr(hex($1))/eg;
+ $msg =~ s/[\x00-\x08\x0a-\x1f\x9b\x8e]/./g; # immutable CSI sequence + control characters
+ &{$conn->{rcvd_notification_proc}}($conn, $msg, $!);
+ $! = 0;
+ }
}
sub _new_client {
&$g_login_proc ($conn, $sock->peerhost(), $sock->peerport());
if ($rcvd_notification_proc) {
$conn->{rcvd_notification_proc} = $rcvd_notification_proc;
- my $callback = sub {_rcv($conn,0)};
+ my $callback = sub {_rcv($conn)};
set_event_handler ($sock, "read" => $callback);
} else { # Login failed
$conn->disconnect();