b1875d97bb8fa9d5698e30a1f5efcfa547cee159
[spider.git] / perl / AsyncMsg.pm
1 #
2 # This class is the internal subclass that does various Async connects and
3 # retreivals of info. Typical uses (and specific support) include http get and
4 # post.
5
6 # This merely starts up a Msg handler (and no DXChannel) ($conn in other words)
7 # does the GET, parses out the result and the data and then (assuming a positive
8 # result and that the originating callsign is still online) punts out the data
9 # to the caller.
10 #
11 # It isn't designed to be very clever.
12 #
13 # Copyright (c) 2013 - Dirk Koopman G1TLH
14 #
15
16 package AsyncMsg;
17
18 use Msg;
19 use DXDebug;
20 use DXUtil;
21 use DXChannel;
22
23 use vars qw(@ISA $deftimeout);
24
25 @ISA = qw(Msg);
26 $deftimeout = 15;
27
28 my %outstanding;
29
30 sub new 
31 {
32         my $pkg = shift;
33         my $call = shift;
34         my $handler = shift;
35         
36         my $conn = $pkg->SUPER::new($handler);
37         $conn->{caller} = ref $call ? $call->call : $call;
38
39         # make it persistent
40         $outstanding{$conn} = $conn;
41         
42         return $conn;
43 }
44
45 sub handle_getpost
46 {
47         my ($conn, $ua, $tx) = @_;
48
49         # no point in going on if there is no-one wanting the output anymore
50         my $dxchan = DXChannel::get($conn->{caller});
51         unless ($dxchan) {
52                 $conn->disconnect;
53                 return;
54         }
55         
56         my @lines = split qr{\r?\n}, $tx->res->body;
57         
58         foreach my $msg(@lines) {
59                 dbg("AsyncMsg: $conn->{_asstate} $msg") if isdbg('async');
60                 
61                 if (my $filter = $conn->{_asfilter}) {
62                         no strict 'refs';
63                         # this will crash if the command has been redefined and the filter is a
64                         # function defined there whilst the request is in flight,
65                         # but this isn't exactly likely in a production environment.
66                         $filter->($conn, $msg, $dxchan);
67                 } else {
68                         my $prefix = $conn->{prefix} || '';
69                         $dxchan->send("$prefix$msg");
70                 }
71         }
72         
73         $conn->disconnect;
74 }
75
76 # This does a http get on a path on a host and
77 # returns the result (through an optional filter)
78 #
79 # expects to be called something like from a cmd.pl file:
80 #
81 # AsyncMsg->get($self, <host>, <port>, <path>, [<key=>value>...]
82
83 # Standard key => value pairs are:
84 #
85 # filter => CODE ref (e.g. sub { ... })
86 # prefix => <string>                 prefix output with this string
87 #
88 # Anything else is taken and sent as (extra) http header stuff e.g:
89 #
90 # 'User-Agent' => qq{DXSpider;$main::version;$main::build;$^O}
91 # 'Content-Type' => q{text/xml; charset=utf-8}
92 # 'Content-Length' => $lth
93 #
94 # Host: is always set to the name of the host (unless overridden)
95 # User-Agent: is set to default above (unless overridden)
96 #
97 sub _getpost
98 {
99         my $pkg = shift;
100         my $sort = shift;
101         my $call = shift;
102         my $host = shift;
103         my $path = shift;
104         my %args = @_;
105         
106
107         my $conn = $pkg->new($call);
108         $conn->{_asargs} = [@_];
109         $conn->{_asstate} = 'waitreply';
110         $conn->{_asfilter} = delete $args{filter} if exists $args{filter};
111         $conn->{prefix} = delete $args{prefix} if exists $args{prefix};
112         $conn->{prefix} ||= '';
113         $conn->{on_disconnect} = delete $args{on_disc} || delete $args{on_disconnect};
114         $conn->{path} = $path;
115         $conn->{host} = $conn->{peerhost} = $host;
116         $conn->{port} = $conn->{peerport} = delete $args{port} || 80;
117         $conn->{sort} = 'outgoing';
118         $conn->{_assort} = $sort;
119         $conn->{csort} = 'http';
120
121         my $ua =  Mojo::UserAgent->new;
122         my $s;
123         $s .= $host;
124         $s .= ":$port" unless $conn->{port} == 80;
125         $s .= $path;
126         dbg("AsyncMsg: $sort $s") if isdbg('async');
127         
128         my $tx = $ua->build_tx($sort => $s);
129         $ua->on(error => sub { $conn->_error(@_); });
130 #       $tx->on(error => sub { $conn->_error(@_); });
131 #       $tx->on(finish => sub { $conn->disconnect; });
132
133         $ua->on(start => sub {
134                                 my ($ua, $tx) = @_;
135                                 my $data = delete $args{data};
136                                 while (my ($k, $v) = each %args) {
137                                         dbg("AsyncMsg: attaching header $k: $v") if isdbg('async');
138                                         $tx->req->headers->header($k => $v);
139                                 }
140                                 if (defined $data) {
141                                         dbg("AsyncMsg: body ='$data'") if isdbg('async'); 
142                                         $tx->req->body($data);
143                                 }
144                         });
145         
146
147         $ua->start($tx => sub { $conn->handle_getpost(@_) }); 
148
149         
150         $conn->{mojo} = $ua;
151         return $conn if $tx;
152
153         $conn->disconnect;
154         return undef;
155 }
156
157 sub _dxchan_send
158 {
159         my $conn = shift;
160         my $msg = shift;
161         my $dxchan = DXChannel::get($conn->{caller});
162         $dxchan->send($msg) if $dxchan;
163 }
164
165 sub _error
166 {
167         my ($conn, $e, $err);
168         dbg("Async: $conn->host:$conn->port path $conn->{path} error $err") if isdbg('chan');
169         $conn->_dxchan_send("$conn->{prefix}$msg");
170         $conn->disconnect;
171 }
172         
173 sub get
174 {
175         my $pkg = shift;
176         _getpost($pkg, "GET", @_);
177 }
178
179 sub post
180 {
181         my $pkg = shift;
182         _getpost($pkg, "POST", @_);
183 }
184
185 # do a raw connection
186 #
187 # Async->raw($self, <host>, <port>, [handler => CODE ref], [prefix => <string>]);
188 #
189 # With no handler defined, everything sent by the connection will be sent to
190 # the caller.
191 #
192 # One can send stuff out on the connection by doing a standard "$conn->send_later(...)" 
193 # inside the (custom) handler.
194
195 sub raw
196 {
197         my $pkg = shift;
198         my $call = shift;
199         my $host = shift;
200         my $port = shift;
201
202         my %args = @_;
203
204         my $handler = delete $args{handler} || \&handle_raw;
205         my $conn = $pkg->new($call, $handler);
206         $conn->{prefix} = delete $args{prefix} if exists $args{prefix};
207         $conn->{prefix} ||= '';
208         $r = $conn->connect($host, $port, on_connect => &_on_raw_connect);
209         return $r ? $conn : undef;
210 }
211
212
213 # simple raw handler
214 #
215 # Just outputs everything
216 #
217 sub handle_raw
218 {
219         my $conn = shift;
220         my $msg = shift;
221
222         # no point in going on if there is no-one wanting the output anymore
223         my $dxchan = DXChannel::get($conn->{caller});
224         unless ($dxchan) {
225                 $conn->disconnect;
226                 return;
227         }
228
229         # send out the data
230         $dxchan->send("$conn->{prefix}$msg");
231 }
232
233
234 sub _on_raw_connect
235 {
236         my $conn = shift;
237         my $handle = shift;
238         dbg("AsyncMsg: Connected $conn->{cnum} to $conn->{host}:$conn->{port}") if isdbg('async');
239 }
240
241 sub _on_error
242 {
243         my $conn = shift;
244         my $msg = shift;
245         dbg("AsyncMsg: ***Connect $conn->{cnum} Failed to $conn->{host}:$conn->{port} $!") if isdbg('async');   
246 }
247
248 sub connect
249 {
250         my $conn = shift;
251         my $host = shift;
252         my $port = shift;
253         
254         # start a connection
255         my $r = $conn->SUPER::connect($host, $port, @_);
256
257         return $r;
258 }
259
260 sub disconnect
261 {
262         my $conn = shift;
263
264         if (my $ondisc = $conn->{on_disconnect}) {
265                 my $dxchan = DXChannel::get($conn->{caller});
266                 if ($dxchan) {
267                         no strict 'refs';
268                         $ondisc->($conn, $dxchan);
269                 }
270         }
271         delete $conn->{mojo};
272         delete $outstanding{$conn};
273         $conn->SUPER::disconnect;
274 }
275
276 sub DESTROY
277 {
278         my $conn = shift;
279         delete $outstanding{$conn};
280         $conn->SUPER::DESTROY;
281 }
282
283 1;
284