- if ($pcno == 32) { # incoming EOM
- dbg("stream $f[3]: EOM received\n") if isdbg('msg');
- my $ref = $work{"$f[2]$f[3]"};
- if ($ref) {
- $self->send(DXProt::pc33($f[2], $f[1], $f[3])); # acknowledge it
+# this is a incoming subject ack
+sub handle_30
+{
+ my $dxchan = shift;
+ my ($tonode, $fromnode, $stream) = @_[1..3];
+
+ my $ref = get_fwq($fromnode); # note no stream at this stage
+ if ($ref) {
+ del_fwq($fromnode);
+ $ref->{stream} = $stream;
+ $ref->{count} = 0;
+ $ref->{linesreq} = 5;
+ set_fwq($fromnode, $stream, $ref); # new ref
+ set_busy($fromnode, $ref); # interlock
+ dbg("incoming subject ack stream $stream\n") if isdbg('msg');
+ $ref->{lines} = [ $ref->read_msg_body ];
+ $ref->send_tranche($dxchan);
+ $ref->{lastt} = $main::systime;
+ } else {
+ dbg("PC30 from unknown stream $stream from $fromnode") if isdbg('msg');
+ $dxchan->send(DXProt::pc42($fromnode, $tonode, $stream)); # unknown stream
+ }
+}
+
+# acknowledge a tranche of lines
+sub handle_31
+{
+ my $dxchan = shift;
+ my ($tonode, $fromnode, $stream) = @_[1..3];
+
+ my $ref = get_fwq($fromnode, $stream);
+ if ($ref) {
+ dbg("tranche ack stream $stream\n") if isdbg('msg');
+ $ref->send_tranche($dxchan);
+ $ref->{lastt} = $main::systime;
+ } else {
+ dbg("PC31 from unknown stream $stream from $fromnode") if isdbg('msg');
+ $dxchan->send(DXProt::pc42($fromnode, $tonode, $stream)); # unknown stream
+ }
+}
+
+# incoming EOM
+sub handle_32
+{
+ my $dxchan = shift;
+ my ($tonode, $fromnode, $stream) = @_[1..3];
+
+ dbg("stream $stream: EOM received\n") if isdbg('msg');
+ my $ref = get_fwq($fromnode, $stream);
+ if ($ref) {
+ $dxchan->send(DXProt::pc33($fromnode, $tonode, $stream)); # acknowledge it