8c2fbaadc5911079367118faac7a1c71ac4987e6
[spider.git] / perl / RBN.pm
1 #
2 # The RBN connection system
3 #
4 # Copyright (c) 2020 Dirk Koopman G1TLH
5 #
6
7 use warnings;
8 use strict;
9
10 package RBN;
11
12 use 5.10.1;
13
14 use lib qw {.};
15
16 use DXDebug;
17 use DXUtil;
18 use DXLog;
19 use DXUser;
20 use DXChannel;
21 use Math::Round qw(nearest nearest_floor);
22 use Date::Parse;
23 use Time::HiRes qw(gettimeofday);
24 use Spot;
25 use DXJSON;
26 use IO::File;
27
28 use constant {
29                           ROrigin => 0,
30                           RQrg => 1,
31                           RCall => 2,
32                           RMode => 3,
33                           RStrength => 4,
34                           RTime => 5,
35                           RUtz => 6,
36                           Respot => 7,
37                           RQra => 8,
38                           RSpotData => 9,
39                          };
40
41 use constant {
42                           SQrg => 0,
43                           SCall => 1,
44                           STime => 2,
45                           SComment => 3,
46                           SOrigin => 4,
47                           SZone => 11,
48                          };
49 use constant {
50                           OQual => 0,
51                           OAvediff => 1,
52                           OSpare => 2,
53                           ODiff => 3,
54                          };
55 use constant {
56                           CTime => 0,
57                           CQual => 1,
58                           CData => 2,
59                          };
60
61 use constant {
62                           DScore => 0,
63                           DGood => 1,
64                           DBad => 2,
65                           DLastin => 3,
66                           DEviants => 4,
67                          };
68
69
70 our $DATA_VERSION = 1;
71
72 our @ISA = qw(DXChannel);
73
74 our $startup_delay = 5*60;              # don't send anything out until this timer has expired
75                                 # this is to allow the feed to "warm up" with duplicates
76                                 # so that the "big rush" doesn't happen.
77
78 our $minspottime = 30*60;               # the time between respots of a callsign - if a call is
79                                 # still being spotted (on the same freq) and it has been
80                                 # spotted before, it's spotted again after this time
81                                 # until the next minspottime has passed.
82
83 our $beacontime = 5*60;                 # same as minspottime, but for beacons (and shorter)
84
85 our $dwelltime = 10;                    # the amount of time to wait for duplicates before issuing
86                                 # a spot to the user (no doubt waiting with bated breath).
87
88 our $limbotime = 5*60;                  # if there are fewer than $minqual candidates and $dwelltime
89                                 # has expired then allow this spot to live a bit longer. It may
90                                 # simply be that it is not in standard spot coverage. (ask G4PIQ
91                                 # about this).
92
93 our $filterdef = $Spot::filterdef; # we use the same filter as the Spot system. Can't think why :-).
94
95 my $spots;                                              # the GLOBAL spot cache
96
97 my %runtime;                                    # how long each channel has been running
98
99 our $cachefn = localdata('rbn_cache');
100 our $cache_valid = 4*60;                # The cache file is considered valid if it is not more than this old
101
102 our $maxqrgdiff = 10;                   # the maximum
103 our $minqual = 2;                               # the minimum quality we will accept for output
104 our $maxqual = 9;                               # if there is enough quality, then short circuit any remaining dwelltime.
105
106 my $json;
107 my $noinrush = 0;                               # override the inrushpreventor if set
108 our $maxdeviants = 5;                   # the number of deviant QRGs to record for skimmer records
109
110 sub init
111 {
112         $json = DXJSON->new;
113         $json->canonical(0);
114         if (check_cache()) {
115                 $noinrush = 1;
116         } else {
117                 $spots = {VERSION=>$DATA_VERSION};
118         }
119         if (defined $DB::VERSION) {
120                 $noinrush = 1;
121                 $json->indent(1);
122         }
123         
124 }
125
126 sub new 
127 {
128         my $self = DXChannel::alloc(@_);
129
130         # routing, this must go out here to prevent race condx
131         my $pkg = shift;
132         my $call = shift;
133
134         $self->{last} = 0;
135         $self->{noraw} = 0;
136         $self->{nospot} = 0;
137         $self->{nouser} = {};
138         $self->{norbn} = 0;
139         $self->{noraw10} = 0;
140         $self->{nospot10} = 0;
141         $self->{nouser10} = {};
142         $self->{norbn10} = 0;
143         $self->{nospothour} = 0;
144         $self->{nouserhour} = {};
145         $self->{norbnhour} = 0;
146         $self->{norawhour} = 0;
147         $self->{sort} = 'N';
148         $self->{lasttime} = $main::systime;
149         $self->{minspottime} = $minspottime;
150         $self->{beacontime} = $beacontime;
151         $self->{showstats} = 0;
152         $self->{pingint} = 0;
153         $self->{nopings} = 0;
154         $self->{queue} = {};
155
156         return $self;
157 }
158
159 sub start
160
161         my ($self, $line, $sort) = @_;
162         my $user = $self->{user};
163         my $call = $self->{call};
164         my $name = $user->{name};
165                 
166         # log it
167         my $host = $self->{conn}->peerhost;
168         $host ||= "unknown";
169         $self->{hostname} = $host;
170
171         $self->{name} = $name ? $name : $call;
172         $self->state('prompt');         # a bit of room for further expansion, passwords etc
173         $self->{lang} = $user->lang || $main::lang || 'en';
174         if ($line =~ /host=/) {
175                 my ($h) = $line =~ /host=(\d+\.\d+\.\d+\.\d+)/;
176                 $line =~ s/\s*host=\d+\.\d+\.\d+\.\d+// if $h;
177                 unless ($h) {
178                         ($h) = $line =~ /host=([\da..fA..F:]+)/;
179                         $line =~ s/\s*host=[\da..fA..F:]+// if $h;
180                 }
181                 if ($h) {
182                         $h =~ s/^::ffff://;
183                         $self->{hostname} = $h;
184                 }
185         }
186         $self->{width} = 80 unless $self->{width} && $self->{width} > 80;
187         $self->{consort} = $line;       # save the connection type
188
189         LogDbg('DXCommand', "$call connected from $self->{hostname}");
190
191         # set some necessary flags on the user if they are connecting
192         $self->{registered} = 1;
193         # sort out privilege reduction
194         $self->{priv} = 0;
195
196         # get the filters
197         my $nossid = $call;
198         $nossid =~ s/-\d+$//;
199
200         $self->{inrbnfilter} = Filter::read_in('rbn', $call, 1) 
201                 || Filter::read_in('rbn', 'node_default', 1);
202         
203         # clean up qra locators
204         my $qra = $user->qra;
205         $qra = undef if ($qra && !DXBearing::is_qra($qra));
206         unless ($qra) {
207                 my $lat = $user->lat;
208                 my $long = $user->long;
209                 $user->qra(DXBearing::lltoqra($lat, $long)) if (defined $lat && defined $long);  
210         }
211
212         # if we have been running and stopped for a while 
213         # if the cache is warm enough don't operate the inrush preventor
214         $self->{inrushpreventor} = exists $runtime{$call} && $runtime{$call} > $startup_delay || $noinrush ?  0 : $main::systime + $startup_delay;
215         dbg("RBN: noinrush: $noinrush, setting inrushpreventor on $self->{call} to $self->{inrushpreventor}");
216 }
217
218 my @queue;                                              # the queue of spots ready to send
219
220 sub normal
221 {
222         my $self = shift;
223         my $line = shift;
224         my @ans;
225         my $dbgrbn = isdbg('rbn');
226         
227         # remove leading and trailing spaces
228         chomp $line;
229         $line =~ s/^\s*//;
230         $line =~ s/\s*$//;
231
232         # add base RBN
233
234         my $now = $main::systime;
235
236         # parse line
237         dbg "RBN:RAW,$line" if isdbg('rbnraw');
238         return unless $line=~/^DX\s+de/;
239
240         my (undef, undef, $origin, $qrg, $call, $mode, $s, $m, $spd, $u, $sort, $t, $tx) = split /[:\s]+/, $line;
241
242         # fix up FT8 spots from 7001
243         $t = $u, $u = '' if !$t && is_ztime($u);
244         $t = $sort, $sort = '' if !$t && is_ztime($sort);
245         my $qra = $spd, $spd = '' if is_qra($spd);
246         $u = $qra if $qra;
247
248         # is this anything like a callsign?
249         unless (is_callsign($call)) {
250                 dbg("RBN: ERROR $call from $origin on $qrg is invalid, dumped");
251                 return;
252         }
253
254         $origin =~ s/\-(?:\d{1,2}\-)?\#$//; # get rid of all the crap we aren't interested in
255
256
257         $sort ||= '';
258         $tx ||= '';
259         $qra ||= '';
260     dbg qq{RBN:input decode or:$origin qr:$qrg ca:$call mo:$mode s:$s m:$m sp:$spd u:$u sort:$sort t:$t tx:$tx qra:$qra} if $dbgrbn && isdbg('rbn');
261
262         ++$self->{noraw};
263         ++$self->{noraw10};
264         ++$self->{norawhour};
265         
266         my $b;
267         
268         if ($t || $tx) {
269
270                 # fix up times for things like 'NXDXF B' etc
271                 if ($tx && is_ztime($t)) {
272                         if (is_ztime($tx)) {
273                                 $b = $t;
274                                 $t = $tx;
275                         } else {
276                                 dbg "RBN:ERR,$line";
277                                 return (0);
278                         }
279                 }
280                 if ($sort && $sort eq 'NCDXF') {
281                         $mode = 'DXF';
282                         $t = $tx;
283                 }
284                 if ($sort && $sort eq 'BEACON') {
285                         $mode = 'BCN';
286                 }
287                 if ($mode =~ /^PSK/) {
288                         $mode = 'PSK';
289                 }
290                 if ($mode eq 'RTTY') {
291                         $mode = 'RTT';
292                 }
293
294                 # The main de-duping key is [call, $frequency], but we probe a bit around that frequency to find a
295                 # range of concurrent frequencies that might be in play. 
296
297                 # The key to this is deducing the true callsign by "majority voting" (the greater the number of spotters
298         # the more effective this is) together with some lexical analsys probably in conjuction with DXSpider
299                 # data sources (for singleton spots) to then generate a "centre" from and to zone (whatever that will mean if it isn't the usual one)
300                 # and some heuristical "Kwalitee" rating given distance from the zone centres of spotter, recipient user
301         # and spotted. A map can be generated once per user and spotter as they are essentially mostly static. 
302                 # The spotted will only get a coarse position unless other info is available. Programs that parse 
303                 # DX bulletins and the online data online databases could be be used and then cached. 
304
305                 # Obviously users have to opt in to receiving RBN spots and other users will simply be passed over and
306                 # ignored.
307
308                 # Clearly this will only work in the 'mojo' branch of DXSpider where it is possible to pass off external
309                 # data requests to ephemeral or semi resident forked processes that do any grunt work and the main
310                 # process to just the standard "message passing" which has been shown to be able to sustain over 5000 
311                 # per second (limited by the test program's output and network speed, rather than DXSpider's handling).
312
313                 my $search = 5;
314                 my $nqrg = nearest(1, $qrg * 10);  # normalised to nearest Khz
315                 my $sp = "$call|$nqrg";           # hopefully the skimmers will be calibrated at least this well!
316
317                 # find it?
318                 my $cand = $spots->{$sp};
319                 unless ($cand) {
320                         my ($i, $new);
321                         for ($i = $nqrg; !$cand && $i <= $nqrg+$search; $i += 1) {
322                                 $new = "$call|$i";
323                                 $cand = $spots->{$new}, last if exists $spots->{$new};
324                         }
325                         if ($cand) {
326                                 my $diff = $i - $nqrg;
327                                 dbg(qq{RBN: QRG Diff using $new (+$diff) for $sp for qrg $qrg}) if (isdbg('rbnqrg') || ($dbgrbn && isdbg('rbn')));
328                                 $sp = $new;
329                         }
330                 }
331                 unless ($cand) {
332                         my ($i, $new);
333                         for ($i = $nqrg; !$cand && $i >= $nqrg-$search; $i -= 1) {
334                                 $new = "$call|$i";
335                                 $cand = $spots->{$new}, last if exists $spots->{$new};
336                         }
337                         if ($cand) {
338                                 my $diff = $nqrg - $i;
339                                 dbg(qq{RBN: QRG Diff using $new (-$diff) for $sp for qrg $qrg}) if (isdbg('rbnqrg') || ($dbgrbn && isdbg('rbn')));
340                                 $sp = $new;
341                         }
342                 }
343                 
344                 # if we have one and there is only one slot and that slot's time isn't expired for respot then return
345                 my $respot = 0;
346                 if ($cand && ref $cand) {
347                         if (@$cand <= CData) {
348                                 if ($self->{minspottime} > 0 && $now - $cand->[CTime] < $self->{minspottime}) {
349                                         dbg("RBN: key: '$sp' call: $call qrg: $qrg DUPE \@ ". atime(int $cand->[CTime])) if $dbgrbn && isdbg('rbn');
350                                         return;
351                                 }
352                                 
353                                 dbg("RBN: key: '$sp' RESPOTTING call: $call qrg: $qrg last seen \@ ". atime(int $cand->[CTime])) if $dbgrbn && isdbg('rbn');
354                                 $cand->[CTime] = $now;
355                                 ++$respot;
356                         }
357
358                         # otherwise we have a spot being built up at the moment
359                 } elsif ($cand) {
360                         dbg("RBN: key '$sp' = '$cand' not ref");
361                         return;
362                 } else {
363                         # new spot / frequency
364                         $spots->{$sp} = $cand = [$now, 0];
365                         dbg("RBN: key: '$sp' call: $call qrg: $qrg NEW" . ($respot ? ' RESPOT' : '')) if $dbgrbn && isdbg('rbn');
366                 }
367
368                 # add me to the display queue unless we are waiting for initial in rush to finish
369                 return unless $noinrush || $self->{inrushpreventor} < $main::systime;
370
371                 # build up a new record and store it in the buildup
372                 # deal with the unix time
373                 my ($hh,$mm) = $t =~ /(\d\d)(\d\d)Z$/;
374                 my $utz = $hh*3600 + $mm*60 + $main::systime_daystart; # possible issue with late spot from previous day
375                 $utz -= 86400 if $utz > $now+3600;                                         # too far ahead, drag it back one day
376
377                 # create record and add into the buildup
378                 my $r = [$origin, nearest(.1, $qrg), $call, $mode, $s, $t, $utz, $respot, $u];
379                 my @s =  Spot::prepare($r->[RQrg], $r->[RCall], $r->[RUtz], '', $r->[ROrigin]);
380                 if ($s[5] == 666) {
381                         dbg("RBN: ERROR invalid prefix/callsign $call from $origin-# on $qrg, dumped");
382                         return;
383                 }
384                 
385                 if ($self->{inrbnfilter}) {
386                         my ($want, undef) = $self->{inrbnfilter}->it($s);
387                         return unless $want;    
388                 }
389                 $r->[RSpotData] = \@s;
390
391                 ++$self->{queue}->{$sp};# unless @$cand>= CData; # queue the KEY (not the record)
392
393                 dbg("RBN: key: '$sp' ADD RECORD call: $call qrg: $qrg origin: $origin respot: $respot") if $dbgrbn && isdbg('rbn');
394
395                 push @$cand, $r;
396
397         } else {
398                 dbg "RBN:DATA,$line" if $dbgrbn && isdbg('rbn');
399         }
400 }
401
402 # we should get the spot record minus the time, so just an array of record (arrays)
403 sub send_dx_spot
404 {
405         my $self = shift;
406         my $quality = shift;
407         my $cand = shift;
408
409         ++$self->{norbn};
410         ++$self->{norbn10};
411         ++$self->{norbnhour};
412         
413         # $r = [$origin, $qrg, $call, $mode, $s, $utz, $respot];
414
415         my $mode = $cand->[CData]->[RMode]; # as all the modes will be the same;
416         
417         my @dxchan = DXChannel::get_all();
418
419         foreach my $dxchan (@dxchan) {
420                 next unless $dxchan->is_user;
421                 my $user = $dxchan->{user};
422                 next unless $user &&  $user->wantrbn;
423
424                 # does this user want this sort of spot at all?
425                 my $want = 0;
426                 ++$want if $user->wantbeacon && $mode =~ /^BCN|DXF/;
427                 ++$want if $user->wantcw && $mode =~ /^CW/;
428                 ++$want if $user->wantrtty && $mode =~ /^RTT/;
429                 ++$want if $user->wantpsk && $mode =~ /^PSK|FSK|MSK/;
430                 ++$want if $user->wantft && $mode =~ /^FT/;
431
432                 dbg(sprintf("RBN: spot selection for $dxchan->{call} mode: '$mode' want: $want flags rbn:%d ft:%d bcn:%d cw:%d psk:%d rtty:%d",
433                                         $user->wantrbn,
434                                         $user->wantft,
435                                         $user->wantbeacon,
436                                         $user->wantcw,
437                                         $user->wantpsk,
438                                         $user->wantrtty,
439                                    )) if isdbg('rbnll');
440
441                 # send one spot to one user out of the ones that we have
442                 $self->dx_spot($dxchan, $quality, $cand) if $want;
443         }
444 }
445
446 sub dx_spot
447 {
448         my $self = shift;
449         my $dxchan = shift;
450         my $quality = shift;
451         my $cand = shift;
452         my $call = $dxchan->{call};
453         my $seeme = $dxchan->user->rbnseeme();
454         my $strength = 100;             # because it could if we talk about FTx
455         my $saver;
456         my %zone;
457         my $respot;
458         my $qra;
459
460         ++$self->{nousers}->{$call};
461         ++$self->{nousers10}->{$call};
462         ++$self->{nousershour}->{$call};
463
464         my $filtered;
465         my $rf = $dxchan->{rbnfilter} || $dxchan->{spotsfilter};
466         my $comment;
467         
468         foreach my $r (@$cand) {
469                 # $r = [$origin, $qrg, $call, $mode, $s, $t, $utz, $respot, $qra];
470                 # Spot::prepare($qrg, $call, $utz, $comment, $origin);
471                 next unless $r && ref $r;
472
473                 $qra = $r->[RQra] if !$qra && $r->[RQra] && is_qra($r->[RQra]);
474
475                 $comment = sprintf "%-3s %2ddB $quality", $r->[RMode], $r->[RStrength];
476                 my $s = $r->[RSpotData];                # the prepared spot
477                 $s->[SComment] = $comment;              # apply new generated comment
478
479                 ++$zone{$s->[SZone]};           # save the spotter's zone
480
481                 # if the 'see me' flag is set, then show all the spots without further adornment (see set/rbnseeme for more info)
482                 if ($seeme) {
483                         send_final($dxchan, $s);
484                         next;
485                 }
486
487                 # save the lowest strength one
488                 if ($r->[RStrength] < $strength) {
489                         $strength = $r->[RStrength];
490                         $saver = $s;
491                         dbg("RBN: STRENGTH spot: $s->[SCall] qrg: $s->[SQrg] origin: $s->[SOrigin] dB: $r->[RStrength] < $strength") if isdbg 'rbnll';
492                 }
493
494                 if ($rf) {
495                         my ($want, undef) = $rf->it($s);
496                         dbg("RBN: FILTERING for $call spot: $s->[SCall] qrg: $s->[SQrg] origin: $s->[SOrigin] dB: $r->[RStrength] com: '$s->[SComment]' want: " . ($want ? 'YES':'NO')) if isdbg 'rbnll';
497                         next unless $want;
498                         $filtered = $s;
499                 }
500         }
501
502         if ($rf) {
503                 $saver = $filtered;             # if nothing passed the filter's lips then $saver == $filtered == undef !
504         }
505         
506         if ($saver) {
507                 my $buf;
508                 # create a zone list of spotters
509                 delete $zone{$saver->[SZone]};  # remove this spotter's zone (leaving all the other zones)
510                 my $z = join ',', sort {$a <=> $b} keys %zone;
511
512                 # alter spot data accordingly
513                 $saver->[SComment] .= " Z:$z" if $z;
514                 
515                 send_final($dxchan, $saver);
516                 
517                 ++$self->{nospot};
518                 ++$self->{nospot10};
519                 ++$self->{nospothour};
520                 
521                 if ($qra) {
522                         my $user = DXUser::get_current($saver->[SCall]) || DXUser->new($saver->[SCall]);
523                         unless ($user->qra && is_qra($user->qra)) {
524                                 $user->qra($qra);
525                                 dbg("RBN: update qra on $saver->[SCall] to $qra");
526                                 $user->put;
527                         }
528                 }
529         }
530 }
531
532 sub send_final
533 {
534         my $dxchan = shift;
535         my $saver = shift;
536         my $call = $dxchan->{call};
537         my $buf;
538         
539         dbg("RBN: SENDING to $call spot: $saver->[SCall] qrg: $saver->[SQrg] origin: $saver->[SOrigin] $saver->[SComment]") if isdbg 'rbnll';
540         if ($dxchan->{ve7cc}) {
541                 my $call = $saver->[SOrigin];
542                 $saver->[SOrigin] .= '-#';
543                 $buf = VE7CC::dx_spot($dxchan, @$saver);
544                 $saver->[SOrigin] = $call;
545         } else {
546                 my $call = $saver->[SOrigin];
547                 $saver->[SOrigin] = substr($call, 0, 6);
548                 $saver->[SOrigin] .= '-#';
549                 $buf = $dxchan->format_dx_spot(@$saver);
550                 $saver->[SOrigin] = $call;
551         }
552         $dxchan->local_send('N', $buf);
553 }
554
555 # per second
556 sub process
557 {
558         my $rbnskim = isdbg('rbnskim');
559         
560         foreach my $dxchan (DXChannel::get_all()) {
561                 next unless $dxchan->is_rbn;
562
563                 # At this point we run the queue to see if anything can be sent onwards to the punter
564                 my $now = $main::systime;
565                 my $ta = [gettimeofday];
566                 my $items = 0;
567                 
568                 # now run the waiting queue which just contains KEYS ($call|$qrg)
569                 foreach my $sp (keys %{$dxchan->{queue}}) {
570                         my $cand = $spots->{$sp};
571                         ++$items;
572                         
573                         unless ($cand && $cand->[CTime]) {
574                                 dbg "RBN Cand $sp " . ($cand ? 'def' : 'undef') . " [CTime] " . ($cand->[CTime] ? 'def' : 'undef') . " dwell $dwelltime";
575                                 delete $spots->{$sp};
576                                 delete $dxchan->{queue}->{$sp};    # remove
577                                 next;
578                         }
579                         
580                         my $ctime = $cand->[CTime];
581                         my $quality = @$cand - CData;
582                         my $dwellsecs =  $now - $ctime;
583                         if ($quality >= $maxqual || $dwellsecs >= $dwelltime || $dwellsecs >= $limbotime) {
584                                 # we have a candidate, create qualitee value(s);
585                                 unless (@$cand > CData) {
586                                         dbg "RBN: QUEUE key '$sp' MISSING RECORDS, IGNORED" . dd($cand) if isdbg 'rbnqueue';
587                                         delete $spots->{$sp}; # don't remember it either - this means that a spot HAS to come in with sufficient spotters to be processed.
588                                         delete $dxchan->{queue}->{$sp};
589                                         next;
590                                 }
591                                 dbg "RBN: QUEUE PROCESSING key: '$sp' $now >= $cand->[CTime]" if isdbg 'rbnqueue'; 
592                                 my $spotters = $quality;
593
594                                 # dump it and remove it from the queue if it is of unadequate quality, but only if it is no longer in Limbo and can be reasonably passed on to its demise
595                                 my $r = $cand->[CData];
596                                 if ($dwellsecs > $limbotime && $quality < $minqual) {
597                                         if ( $rbnskim && isdbg('rbnskim')) {
598                                                 $r = $cand->[CData];
599                                                 if ($r) {
600                                                         my $lastin = difft($ctime, $now, 2);
601                                                         my $s = "RBN:SKIM time in Limbo exceeded DUMPED (lastin: $lastin Q:$quality < Q:$minqual) key: '$sp' = $r->[RCall] on $r->[RQrg] by $r->[ROrigin] \@ $r->[RTime] route: $dxchan->{call}";
602                                                         dbg($s);
603                                                 }
604                                         }
605                                         delete $spots->{$sp}; # don't remember it either - this means that a spot HAS to come in with sufficient spotters to be processed.
606                                         delete $dxchan->{queue}->{$sp};
607                                         next;
608                                 }
609
610                                 # we have a possible removal from Limbo, check for more than one skimmer and reset the quality if required
611                                 # DOES THIS TEST CAUSE RACES?
612                                 if (!$r->[Respot] && $quality >= $minqual && $dwellsecs > $dwelltime+1) {
613
614                                         # because we don't need to check for repeats by the same skimmer in the normal case, we do here
615                                         my %seen;
616                                         my @origin;
617                                         foreach my $wr (@$cand) {
618                                                 next unless ref $wr;
619                                                 push @origin, $wr->[ROrigin];
620                                                 if (exists $seen{$wr->[ROrigin]}) {
621                                                         next;
622                                                 }
623                                                 $seen{$wr->[ROrigin]} = $wr;
624                                         }
625                                         # reset the quality to ignore dupes
626                                         my $oq = $quality;
627                                         $quality = keys %seen;
628                                         if ($quality >= $minqual) {
629                                                 if ( $rbnskim && isdbg('rbnskim')) {
630                                                         my $lastin = difft($ctime, $now, 2);
631                                                         my $sk = join ' ', keys %seen;
632                                                         my $or = join ' ', @origin;
633                                                         my $s = "RBN:SKIM promoted from Limbo - key: '$sp' (lastin: $lastin Q now: $quality was $oq skimmers now: $sk";
634                                                         $s .= " was $or" if $or ne $sk;
635                                                         $s .= ')';
636                                                         dbg($s);
637                                                 } 
638                                         } elsif ($oq != $quality) {
639                                                 if ( $rbnskim && isdbg('rbnskim')) {
640                                                         my $lastin = difft($ctime, $now, 2);
641                                                         my $sk = join ' ', keys %seen;
642                                                         my $or = join ' ', @origin;
643                                                         my $s = "RBN:SKIM quality reset key: '$sp' (lastin: $lastin Q now: $quality was $oq skimmers now: $sk was: $or)";
644                                                         dbg($s);
645                                                 }
646                                                 # remove the excess
647                                                 my @ncand = (@$cand[CTime, CQual], values %seen);
648                                                 $spots->{$sp} = \@ncand;
649                                         }
650                                 }
651
652                                 # we now kick this spot into Limbo 
653                                 if ($quality < $minqual) {
654                                         next;
655                                 }
656
657                                 $quality = 9 if $quality > 9;
658                                 $cand->[CQual] = $quality if $quality > $cand->[CQual];
659
660                                 # this scores each candidate according to its skimmer's QRG score (i.e. how often it agrees with its peers)
661                                 # what happens is hash of all QRGs in candidates are incremented by that skimmer's reputation for "accuracy"
662                                 # or, more exactly, past agreement with the consensus. This score can be from -5 -> +5. 
663                                 my %qrg = ();
664                                 my $skimmer;
665                                 my $sk;
666                                 my $band;
667                                 my %seen = ();
668                                 foreach $r (@$cand) {
669                                         next unless ref $r;
670                                         if (exists $seen{$r->[ROrigin]}) {
671                                                 $r = 0;
672                                                 next;
673                                         }
674                                         $seen{$r->[ROrigin]} = 1;
675                                         $band ||= int $r->[RQrg] / 1000;
676                                         $sk = "SKIM|$r->[ROrigin]|$band"; # thus only once per set of candidates
677                                         $skimmer = $spots->{$sk};
678                                         unless ($skimmer) {
679                                                 $skimmer = $spots->{$sk} = [1, 0, 0, $now, []]; # this first time, this new skimmer gets the benefit of the doubt on frequency.
680                                                 dbg("RBN:SKIM new slot $sk " . $json->encode($skimmer)) if  $rbnskim && isdbg('rbnskim');
681                                         }
682                                         $qrg{$r->[RQrg]} += ($skimmer->[DScore] || 1);
683                                 }
684                                 
685                                 # determine the most likely qrg and then set it - NOTE (-)ve votes, generated by the skimmer scoring system above, are ignored
686                                 my @deviant;
687                                 my $c = 0;
688                                 my $mv = 0;
689                                 my $qrg = 0;
690                                 while (my ($k, $votes) = each %qrg) {
691                                         if ($votes >= $mv) {
692                                                 $qrg = $k;
693                                                 $mv = $votes;
694                                         }
695                                         ++$c;
696                                 }
697
698                                 # Ignore possible spots with 0 QRG score - as determined by the skimmer scoring system above -  as they are likely to be wrong 
699                                 unless ($qrg > 0) {
700                                         if ( $rbnskim && isdbg('rbnskim')) {
701                                                 my $keys;
702                                                 while (my ($k, $v) = (each %qrg)) {
703                                                         $keys .= "$k=>$v, ";
704                                                 }
705                                                 $keys =~ /,\s*$/;
706                                                 my $i = 0;
707                                                 foreach $r (@$cand) {
708                                                         next unless $r && ref $r;
709                                                         dbg "RBN:SKIM cand $i QRG likely wrong from '$sp' = $r->[RCall] on $r->[RQrg] by $r->[ROrigin] \@ $r->[RTime] (qrgs: $keys c: $c) route: $dxchan->{call}, ignored";
710                                                         ++$i;
711                                                 }
712                                         }
713                                         delete $spots->{$sp}; # get rid
714                                         delete $dxchan->{queue}->{$sp};
715                                         next;
716                                 }
717
718                                 # detemine and spit out the deviants. Then adjust the scores according to whether it is a deviant or good
719                                 # NOTE: deviant nodes can become good (or less bad), and good nodes bad (or less good) on each spot that
720                                 # they generate. This is based solely on each skimmer's agreement (or not) with the "consensus" score generated
721                                 # above ($qrg). The resultant score + good + bad is stored per band and will be used the next time a spot
722                                 # appears on this band from each skimmer.
723                                 foreach $r (@$cand) {
724                                         next unless $r && ref $r;
725                                         my $diff = $c > 1 ? nearest(.1, $r->[RQrg] - $qrg) : 0;
726                                         $sk = "SKIM|$r->[ROrigin]|$band";
727                                         $skimmer = $spots->{$sk};
728                                         if ($diff) {
729                                                 ++$skimmer->[DBad] if $skimmer->[DBad] < $maxdeviants;
730                                                 --$skimmer->[DGood] if $skimmer->[DGood] > 0;
731                                                 push @deviant, sprintf("$r->[ROrigin]:%+.1f", $diff);
732                                                 push @{$skimmer->[DEviants]}, $diff;
733                                                 shift @{$skimmer->[DEviants]} while @{$skimmer->[DEviants]} > $maxdeviants;
734                                         } else {
735                                                 ++$skimmer->[DGood] if $skimmer->[DGood] < $maxdeviants;
736                                                 --$skimmer->[DBad] if $skimmer->[DBad] > 0;
737                                                 shift @{$skimmer->[DEviants]};
738                                         }
739                                         $skimmer->[DScore] = $skimmer->[DGood] - $skimmer->[DBad];
740                                         if ($rbnskim && isdbg('rbnskim')) {
741                                                 my $lastin = difft($skimmer->[DLastin], $now, 2);
742                                                 my $difflist = join(', ', @{$skimmer->[DEviants]});
743                                                 $difflist = " band qrg diffs: $difflist" if $difflist;
744                                                 dbg("RBN:SKIM key $sp slot $sk $r->[RQrg] - $qrg = $diff Skimmer score: $skimmer->[DGood] - $skimmer->[DBad] = $skimmer->[DScore] lastseen:$lastin ago$difflist"); 
745                                         }
746                                         $skimmer->[DLastin] = $now;
747                                         $r->[RSpotData]->[SQrg] = $qrg if $qrg && $c > 1; # set all the QRGs to the agreed value
748                                 }
749
750                                 $qrg = (sprintf "%.1f",  $qrg)+0;
751                                 $r = $cand->[CData];
752                                 $r->[RQrg] = $qrg;
753                                 my $squality = "Q:$cand->[CQual]";
754                                 $squality .= '*' if $c > 1; 
755                                 $squality .= '+' if $r->[Respot];
756
757                                 if (isdbg('progress')) {
758                                         my $rt = difft($ctime, $now, 2);
759                                         my $s = "RBN: SPOT key: '$sp' = $r->[RCall] on $r->[RQrg] by $r->[ROrigin] \@ $r->[RTime] $squality route: $dxchan->{call} dwell:$rt";
760                                         my $td = @deviant;
761                                         $s .= " QRGScore: $mv Deviants: $td/$spotters";
762                                         $s .= ' (' . join(', ', sort @deviant) . ')' if $td;
763                                         dbg($s);
764                                 }
765
766                                 # finally send it out to any waiting public
767                                 send_dx_spot($dxchan, $squality, $cand);
768                                 
769                                 # clear out the data and make this now just "spotted", but no further action required until respot time
770                                 dbg "RBN: QUEUE key '$sp' cleared" if isdbg 'rbn';
771
772                                 delete $dxchan->{queue}->{$sp};
773
774                                 # calculate new sp (which will be 70% likely the same as the old one)
775                                 # we do this to cope with the fact that the first spotter may well be "wrongly calibrated" giving a qrg that disagrees with the majority.
776                                 # and we want to store the key that corresponds to majority opinion. 
777                                 my $nqrg = nearest(1, $qrg * 10);  # normalised to nearest Khz
778                                 my $nsp = "$r->[RCall]|$nqrg";
779                                 if ($sp ne $nsp) {
780                                         dbg("RBN:SKIM CHANGE KEY sp '$sp' -> '$nsp' for storage") if  $rbnskim && isdbg('rbnskim');
781                                         delete $spots->{$sp};
782                                         $spots->{$nsp} = [$now, $cand->[CQual]];
783                                 } else {
784                                         $spots->{$sp} = [$now, $cand->[CQual]];
785                                 }
786                         }
787                         else {
788                                 dbg sprintf("RBN: QUEUE key: '$sp' SEND time not yet reached %.1f secs left", $cand->[CTime] + $dwelltime - $now) if isdbg 'rbnqueue'; 
789                         }
790                 }
791                 if (isdbg('rbntimer')) {
792                         my $diff = _diffus($ta);
793                         dbg "RBN: TIMER process queue for call: $dxchan->{call} $items spots $diff uS";
794                 }
795         }
796 }
797
798 sub per_minute
799 {
800         foreach my $dxchan (DXChannel::get_all()) {
801                 next unless $dxchan->is_rbn;
802                 dbg "RBN:STATS minute $dxchan->{call} raw: $dxchan->{noraw} retrieved spots: $dxchan->{norbn} delivered: $dxchan->{nospot} after filtering to users: " . scalar keys %{$dxchan->{nousers}} if isdbg('rbnstats');
803                 if ($dxchan->{noraw} == 0 && $dxchan->{lasttime} > 60) {
804                         LogDbg('RBN', "RBN: no input from $dxchan->{call}, disconnecting");
805                         $dxchan->disconnect;
806                 }
807                 $dxchan->{noraw} = $dxchan->{norbn} = $dxchan->{nospot} = 0; $dxchan->{nousers} = {};
808                 $runtime{$dxchan->{call}} += 60;
809         }
810
811         # save the spot cache
812         write_cache() unless $main::systime + $startup_delay < $main::systime;;
813 }
814
815 sub per_10_minute
816 {
817         my $count = 0;
818         my $removed = 0;
819         while (my ($k,$cand) = each %{$spots}) {
820                 next if $k eq 'VERSION';
821                 next if $k =~ /^O\|/;
822                 next if $k =~ /^SKIM\|/;
823                 
824                 if ($main::systime - $cand->[CTime] > $minspottime*2) {
825                         delete $spots->{$k};
826                         ++$removed;
827                 }
828                 else {
829                         ++$count;
830                 }
831         }
832         dbg "RBN:STATS spot cache remain: $count removed: $removed"; # if isdbg('rbn');
833         foreach my $dxchan (DXChannel::get_all()) {
834                 next unless $dxchan->is_rbn;
835                 my $nq = keys %{$dxchan->{queue}};
836                 my $pc = $dxchan->{noraw10} ? sprintf("%.1f%%",$dxchan->{norbn10}*100/$dxchan->{noraw10}) : '0.0%';
837                 dbg "RBN:STATS 10-minute $dxchan->{call} queue: $nq raw: $dxchan->{noraw10} retrieved spots: $dxchan->{norbn10} ($pc) delivered: $dxchan->{nospot10} after filtering to  users: " . scalar keys %{$dxchan->{nousers10}};
838                 $dxchan->{noraw10} = $dxchan->{norbn10} = $dxchan->{nospot10} = 0; $dxchan->{nousers10} = {};
839         }
840 }
841
842 sub per_hour
843 {
844         foreach my $dxchan (DXChannel::get_all()) {
845                 next unless $dxchan->is_rbn;
846                 my $nq = keys %{$dxchan->{queue}};
847                 my $pc = $dxchan->{norawhour} ? sprintf("%.1f%%",$dxchan->{norbnhour}*100/$dxchan->{norawhour}) : '0.0%';
848                 dbg "RBN:STATS hour $dxchan->{call} queue: $nq raw: $dxchan->{norawhour} retrieved spots: $dxchan->{norbnhour} ($pc) delivered: $dxchan->{nospothour} after filtering to users: " . scalar keys %{$dxchan->{nousershour}};
849                 $dxchan->{norawhour} = $dxchan->{norbnhour} = $dxchan->{nospothour} = 0; $dxchan->{nousershour} = {};
850         }
851 }
852
853 sub finish
854 {
855         write_cache();
856 }
857
858 sub write_cache
859 {
860         my $ta = [ gettimeofday ];
861         $json->indent(1)->canonical(1) if isdbg 'rbncache';
862         my $s = eval {$json->encode($spots)};
863         if ($s) {
864                 my $fh = IO::File->new(">$cachefn") or confess("writing $cachefn $!");
865                 $fh->print($s);
866                 $fh->close;
867         } else {
868                 dbg("RBN:Write_cache error '$@'");
869                 return;
870         }
871         $json->indent(0)->canonical(0);
872         my $diff = _diffms($ta);
873         my $size = sprintf('%.3fKB', (length($s) / 1000));
874         dbg("RBN:WRITE_CACHE size: $size time to write: $diff mS");
875 }
876
877 sub check_cache
878 {
879         if (-e $cachefn) {
880                 my $mt = (stat($cachefn))[9];
881                 my $t = $main::systime - $mt || 1;
882                 my $p = difft($mt, 2);
883                 if ($t < $cache_valid) {
884                         dbg("RBN:check_cache '$cachefn' spot cache exists, created $p ago and not too old");
885                         my $fh = IO::File->new($cachefn);
886                         my $s;
887                         if ($fh) {
888                                 local $/ = undef;
889                                 $s = <$fh>;
890                                 dbg("RBN:check_cache cache read size " . length $s);
891                                 $fh->close;
892                         } else {
893                                 dbg("RBN:check_cache file read error $!");
894                                 return undef;
895                         }
896                         if ($s) {
897                                 eval {$spots = $json->decode($s)};
898                                 if ($spots && ref $spots) {     
899                                         if (exists $spots->{VERSION} && $spots->{VERSION} == $DATA_VERSION) {
900                                                 # now clean out anything that has spot build ups in progress
901                                                 while (my ($k, $cand) = each %$spots) {
902                                                         next if $k eq 'VERSION';
903                                                         next if $k =~ /^O\|/;
904                                                         next if $k =~ /^SKIM\|/;
905                                                         if (@$cand > CData) {
906                                                                 $spots->{$k} = [$cand->[CTime], $cand->[CQual]];
907                                                         }
908                                                 }
909                                                 dbg("RBN:check_cache spot cache restored");
910                                                 return 1;
911                                         } 
912                                 }
913                                 dbg("RBN::checkcache error decoding $@");
914                         }
915                 } else {
916                         my $d = difft($main::systime-$cache_valid);
917                         dbg("RBN::checkcache '$cachefn' created $p ago is too old (> $d), ignored");
918                 }
919         } else {
920                 dbg("RBN:check_cache '$cachefn' spot cache not present");
921         }
922         
923         return undef;
924 }
925
926 1;