ec9f39467942ef2014557938a393f44482068164
[spider.git] / perl / RBN.pm
1 #
2 # The RBN connection system
3 #
4 # Copyright (c) 2020 Dirk Koopman G1TLH
5 #
6
7 use warnings;
8 use strict;
9
10 package RBN;
11
12 use 5.10.1;
13
14 use lib qw {.};
15
16 use DXDebug;
17 use DXUtil;
18 use DXLog;
19 use DXUser;
20 use DXChannel;
21 use Math::Round qw(nearest nearest_floor);
22 use Date::Parse;
23 use Time::HiRes qw(gettimeofday);
24 use Spot;
25 use DXJSON;
26 use IO::File;
27
# Field indices of the per-skimmer record assembled in normal():
# [origin, qrg, call, mode, strength, time, utz, respot, qra, spotdata]
use constant {
	ROrigin   => 0,
	RQrg      => 1,
	RCall     => 2,
	RMode     => 3,
	RStrength => 4,
	RTime     => 5,
	RUtz      => 6,
	Respot    => 7,
	RQra      => 8,
	RSpotData => 9,
};

# Field indices of the prepared spot list returned by Spot::prepare()
# (only the fields this module touches are named).
use constant {
	SQrg     => 0,
	SCall    => 1,
	STime    => 2,
	SComment => 3,
	SOrigin  => 4,
	SZone    => 11,
};

# NOTE(review): no use of the O* indices is visible in this part of the
# file - confirm what record they describe.
use constant {
	OQual    => 0,
	OAvediff => 1,
	OSpare   => 2,
	ODiff    => 3,
};

# Layout of a candidate entry in the spot cache: [time, quality, record, record, ...]
use constant {
	CTime => 0,
	CQual => 1,
	CData => 2,
};

# Layout of a "SKIM|origin|band" skimmer entry in the spot cache
# (created in process() as [1, 0, 0, $now, []]).
use constant {
	DScore   => 0,
	DGood    => 1,
	DBad     => 2,
	DLastin  => 3,
	DEviants => 4,
};


our $DATA_VERSION = 1;

our @ISA = qw(DXChannel);

# Don't send anything out until this timer has expired. This allows the
# feed to "warm up" with duplicates so that the "big rush" doesn't happen.
our $startup_delay = 5*60;

# The time between respots of a callsign - if a call is still being
# spotted (on the same freq) and it has been spotted before, it's spotted
# again after this time, until the next minspottime has passed.
our $minspottime = 30*60;

# Same as minspottime, but for beacons (and shorter).
our $beacontime = 5*60;

# The amount of time to wait for duplicates before issuing a spot to the
# user (no doubt waiting with bated breath).
our $dwelltime = 10;

# If there are fewer than $minqual candidates and $dwelltime has expired
# then allow this spot to live a bit longer. It may simply be that it is
# not in standard spot coverage. (ask G4PIQ about this).
our $limbotime = 5*60;

# We use the same filter definition as the Spot system.
our $filterdef = $Spot::filterdef;

my $spots;						# the GLOBAL spot cache

my %runtime;					# how long each channel has been running

our $cachefn = localdata('rbn_cache');
our $cache_valid = 4*60;		# the cache file is considered valid if it is not more than this old

# NOTE(review): the original comment here was cut off at "the maximum";
# presumably the maximum QRG difference tolerated - confirm units and use.
our $maxqrgdiff = 10;
our $minqual = 2;				# the minimum quality we will accept for output
our $maxqual = 9;				# if there is enough quality, then short circuit any remaining dwelltime.

my $json;
my $noinrush = 0;				# override the inrushpreventor if set
our $maxdeviants = 5;			# the number of deviant QRGs to record for skimmer records
109
# One-off module initialisation.
#
# Creates the shared JSON codec and decides whether the inrush preventor
# may be bypassed ($noinrush): that is the case when check_cache() reports
# success or when we are running under the Perl debugger. If the cache
# check fails, the spot cache is started afresh with just a VERSION marker.
# check_cache() is defined elsewhere in this file; presumably it loads the
# spot cache itself when it succeeds - confirm.
sub init
{
	$json = DXJSON->new;
	$json->canonical(0);

	my $cache_ok = check_cache();
	$spots = {VERSION=>$DATA_VERSION} unless $cache_ok;

	# $DB::VERSION is only defined when running under the debugger
	my $debugging = defined $DB::VERSION;
	$json->indent(1) if $debugging;

	$noinrush = 1 if $cache_ok || $debugging;
}
125
# Constructor: allocate a channel via DXChannel::alloc (passing our
# arguments straight through) and preset the RBN statistics and settings.
#
# Returns the new channel object.
sub new 
{
	my $self = DXChannel::alloc(@_);

	# plain counters and flags all start from zero
	$self->{$_} = 0 foreach qw(
		last
		noraw   nospot   norbn
		noraw10 nospot10 norbn10
		norawhour nospothour norbnhour
		showstats pingint nopings
	);

	# hashes (each key gets its own fresh, empty hash)
	# NOTE(review): dx_spot() increments 'nousers', 'nousers10' and
	# 'nousershour' rather than the 'nouser*' keys set up here - confirm
	# which spelling is intended.
	$self->{$_} = {} foreach qw(nouser nouser10 nouserhour queue);

	$self->{sort} = 'N';
	$self->{lasttime} = $main::systime;

	# per channel copies of the respot intervals
	$self->{minspottime} = $minspottime;
	$self->{beacontime} = $beacontime;

	return $self;
}
158
# Called when an RBN channel connects.
#
# Arguments: ($self, $line, $sort)
#   $line - the connection line; it may carry a "host=<address>" item which
#           overrides the peer address as the recorded hostname and is
#           removed before the line is saved as the connection type.
#   $sort - accepted but not used here.
#
# Records the hostname, display name, language and width on the channel,
# reads the input RBN filter, fills in a missing/invalid QRA locator on the
# user record from lat/long and arms (or disables) the inrush preventor.
sub start
{
	my ($self, $line, $sort) = @_;
	my $user = $self->{user};
	my $call = $self->{call};
	my $name = $user->{name};
		
	# log it
	my $host = $self->{conn}->peerhost;
	$host ||= "unknown";
	$self->{hostname} = $host;

	$self->{name} = $name ? $name : $call;
	$self->state('prompt');		# a bit of room for further expansion, passwords etc
	$self->{lang} = $user->lang || $main::lang || 'en';
	if ($line =~ /host=/) {
		# try a dotted quad first ...
		my ($h) = $line =~ /host=(\d+\.\d+\.\d+\.\d+)/;
		$line =~ s/\s*host=\d+\.\d+\.\d+\.\d+// if $h;
		unless ($h) {
			# ... then an IPv6 address. The class needs real hex ranges
			# (a-f, not "a..f") and keeps '.' so that IPv4-mapped forms
			# such as ::ffff:1.2.3.4 are captured whole.
			($h) = $line =~ /host=([\da-fA-F:.]+)/;
			$line =~ s/\s*host=[\da-fA-F:.]+// if $h;
		}
		if ($h) {
			$h =~ s/^::ffff://;
			$self->{hostname} = $h;
		}
	}
	$self->{width} = 80 unless $self->{width} && $self->{width} > 80;
	$self->{consort} = $line;	# save the connection type

	LogDbg('DXCommand', "$call connected from $self->{hostname}");

	# set some necessary flags on the user if they are connecting
	$self->{registered} = 1;
	# sort out privilege reduction
	$self->{priv} = 0;

	# get the filters: the channel's own, else the node default
	$self->{inrbnfilter} = Filter::read_in('rbn', $call, 1) 
		|| Filter::read_in('rbn', 'node_default', 1);
	
	# clean up qra locators
	my $qra = $user->qra;
	$qra = undef if ($qra && !DXBearing::is_qra($qra));
	unless ($qra) {
		my $lat = $user->lat;
		my $long = $user->long;
		$user->qra(DXBearing::lltoqra($lat, $long)) if (defined $lat && defined $long);  
	}

	# if we have been running and stopped for a while 
	# if the cache is warm enough don't operate the inrush preventor
	my $warm = (exists $runtime{$call} && $runtime{$call} > $startup_delay) || $noinrush;
	$self->{inrushpreventor} = $warm ? 0 : $main::systime + $startup_delay;
	dbg("RBN: noinrush: $noinrush, setting inrushpreventor on $self->{call} to $self->{inrushpreventor}");
}
217
my @queue;						# the queue of spots ready to send

# Handle one line of input from an RBN feed.
#
# Arguments: ($self, $line)
#
# Lines that do not start with "DX de" are ignored. Otherwise the line is
# split into its fields, validated (spotted callsign, spotter callsign, bad
# spotter/node lists, frequency format) and merged into the global spot
# cache under the key "call|qrg" (qrg in units of 100Hz), probing a few
# slots either side for an existing candidate. Unless the inrush preventor
# is still active, a record is then built, passed through the channel's
# input filter, appended to the candidate and the key is queued on this
# channel for process() to deal with.
#
# The return value is not meaningful: rejected lines simply return early
# (a malformed time returns (0)).
sub normal
{
	my $self = shift;
	my $line = shift;
	my $dbgrbn = isdbg('rbn');
	
	# remove leading and trailing spaces
	chomp $line;
	$line =~ s/^\s*//;
	$line =~ s/\s*$//;

	# add base RBN

	my $now = $main::systime;

	# parse line
	dbg "RBN:RAW,$line" if isdbg('rbnraw');
	return unless $line=~/^DX\s+de/;

	my (undef, undef, $origin, $qrg, $call, $mode, $s, $m, $spd, $u, $sort, $t, $tx) = split /[:\s]+/, $line;

	# fix up FT8 spots from 7001
	$t = $u, $u = '' if !$t && is_ztime($u);
	$t = $sort, $sort = '' if !$t && is_ztime($sort);

	# a locator may turn up in the speed field; declared unconditionally
	# because "my $x = ... if ..." has undefined behaviour in Perl
	my $qra;
	if (is_qra($spd)) {
		$qra = $spd;
		$spd = '';
	}
	$u = $qra if $qra;

	# is this anything like a callsign?
	unless (is_callsign($call)) {
		dbg("RBN: ERROR $call from $origin on $qrg is invalid, dumped");
		return;
	}

	# remove all extraneous crap from the origin - just leave the base callsign
	my $norigin = basecall($origin);
	unless ($norigin) {
		dbg("RBN: ERROR '$origin' is an invalid callsign, dumped");
		return;
	}
	$origin = $norigin;

	# is this callsign in badspotter list?
	if ($DXProt::badspotter->in($origin) || $DXProt::badnode->in($origin)) {
		dbg("RBN: ERROR $origin is a bad spotter/node, dumped");
		return;
	}
	
	# is the qrg valid
	unless ($qrg =~ /^\d+\.\d{1,3}$/) {
		dbg("RBN: ERROR qrg $qrg from $origin invalid, dumped");
		return;
	}

	$sort ||= '';
	$tx ||= '';
	$qra ||= '';
    dbg qq{RBN:input decode or:$origin qr:$qrg ca:$call mo:$mode s:$s m:$m sp:$spd u:$u sort:$sort t:$t tx:$tx qra:$qra} if $dbgrbn && isdbg('rbn');

	++$self->{noraw};
	++$self->{noraw10};
	++$self->{norawhour};
	
	if ($t || $tx) {

		# fix up times for things like 'NXDXF B' etc: when the token in the
		# time slot is NOT a time, the following token must be one and is
		# used instead; if neither is a time the line is rejected.
		if ($tx && !is_ztime($t)) {
			if (is_ztime($tx)) {
				$t = $tx;
			} else {
				dbg "RBN:ERR,$line";
				return (0);
			}
		}
		if ($sort && $sort eq 'NCDXF') {
			$mode = 'DXF';
			$t = $tx;
		}
		if ($sort && $sort eq 'BEACON') {
			$mode = 'BCN';
		}
		if ($mode =~ /^PSK/) {
			$mode = 'PSK';
		}
		if ($mode eq 'RTTY') {
			$mode = 'RTT';
		}

		# The main de-duping key is [call, $frequency], but we probe a bit around that frequency to find a
		# range of concurrent frequencies that might be in play. 

		# The key to this is deducing the true callsign by "majority voting" (the greater the number of spotters
		# the more effective this is) together with some lexical analysis, probably in conjunction with DXSpider
		# data sources (for singleton spots) to then generate a "centre" from and to zone (whatever that will mean if it isn't the usual one)
		# and some heuristical "Kwalitee" rating given distance from the zone centres of spotter, recipient user
		# and spotted. A map can be generated once per user and spotter as they are essentially mostly static. 
		# The spotted will only get a coarse position unless other info is available. Programs that parse 
		# DX bulletins and the online databases could be used and then cached. 

		# Obviously users have to opt in to receiving RBN spots and other users will simply be passed over and
		# ignored.

		# Clearly this will only work in the 'mojo' branch of DXSpider where it is possible to pass off external
		# data requests to ephemeral or semi resident forked processes that do any grunt work and the main
		# process to just the standard "message passing" which has been shown to be able to sustain over 5000 
		# per second (limited by the test program's output and network speed, rather than DXSpider's handling).

		my $search = 5;
		my $nqrg = nearest(1, $qrg * 10);  # normalised to the nearest 100Hz step (qrg is in kHz)
		my $sp = "$call|$nqrg";		  # hopefully the skimmers will be calibrated at least this well!

		# find it?
		my $cand = $spots->{$sp};
		unless ($cand) {
			# probe upwards; 'last' leaves $i on the slot that matched
			my ($i, $new);
			for ($i = $nqrg; !$cand && $i <= $nqrg+$search; $i += 1) {
				$new = "$call|$i";
				$cand = $spots->{$new}, last if exists $spots->{$new};
			}
			if ($cand) {
				my $diff = $i - $nqrg;
				dbg(qq{RBN: QRG Diff using $new (+$diff) for $sp for qrg $qrg}) if (isdbg('rbnqrg') || ($dbgrbn && isdbg('rbn')));
				$sp = $new;
			}
		}
		unless ($cand) {
			# probe downwards
			my ($i, $new);
			for ($i = $nqrg; !$cand && $i >= $nqrg-$search; $i -= 1) {
				$new = "$call|$i";
				$cand = $spots->{$new}, last if exists $spots->{$new};
			}
			if ($cand) {
				my $diff = $nqrg - $i;
				dbg(qq{RBN: QRG Diff using $new (-$diff) for $sp for qrg $qrg}) if (isdbg('rbnqrg') || ($dbgrbn && isdbg('rbn')));
				$sp = $new;
			}
		}
		
		# if we have one and there is only one slot and that slot's time isn't expired for respot then return
		my $respot = 0;
		if ($cand && ref $cand) {
			if (@$cand <= CData) {
				if ($self->{minspottime} > 0 && $now - $cand->[CTime] < $self->{minspottime}) {
					dbg("RBN: key: '$sp' call: $call qrg: $qrg DUPE \@ ". atime(int $cand->[CTime])) if $dbgrbn && isdbg('rbn');
					return;
				}
				
				dbg("RBN: key: '$sp' RESPOTTING call: $call qrg: $qrg last seen \@ ". atime(int $cand->[CTime])) if $dbgrbn && isdbg('rbn');
				$cand->[CTime] = $now;
				++$respot;
			}

			# otherwise we have a spot being built up at the moment
		} elsif ($cand) {
			dbg("RBN: key '$sp' = '$cand' not ref");
			return;
		} else {
			# new spot / frequency
			$spots->{$sp} = $cand = [$now, 0];
			dbg("RBN: key: '$sp' call: $call qrg: $qrg NEW" . ($respot ? ' RESPOT' : '')) if $dbgrbn && isdbg('rbn');
		}

		# add me to the display queue unless we are waiting for initial in rush to finish
		return unless $noinrush || $self->{inrushpreventor} < $main::systime;

		# build up a new record and store it in the buildup
		# deal with the unix time
		my ($hh,$mm) = $t =~ /(\d\d)(\d\d)Z$/;
		my $utz = $hh*3600 + $mm*60 + $main::systime_daystart; # possible issue with late spot from previous day
		$utz -= 86400 if $utz > $now+3600;					   # too far ahead, drag it back one day

		# create record and add into the buildup
		my $r = [$origin, nearest(.1, $qrg), $call, $mode, $s, $t, $utz, $respot, $u];
		my @s =  Spot::prepare($r->[RQrg], $r->[RCall], $r->[RUtz], '', $r->[ROrigin]);
		if ($s[5] == 666) {
			dbg("RBN: ERROR invalid prefix/callsign $call from $origin-# on $qrg, dumped");
			return;
		}
		
		# the filter works on the prepared spot (as it does in dx_spot),
		# not on the signal strength scalar $s
		if ($self->{inrbnfilter}) {
			my ($want, undef) = $self->{inrbnfilter}->it(\@s);
			return unless $want;	
		}
		$r->[RSpotData] = \@s;

		++$self->{queue}->{$sp};# unless @$cand>= CData; # queue the KEY (not the record)

		dbg("RBN: key: '$sp' ADD RECORD call: $call qrg: $qrg origin: $origin respot: $respot") if $dbgrbn && isdbg('rbn');

		push @$cand, $r;

	} else {
		dbg "RBN:DATA,$line" if $dbgrbn && isdbg('rbn');
	}
}
418
# Offer a finished candidate to every connected user channel.
#
# Arguments: ($self, $quality, $cand)
#   $cand - the candidate array from the spot cache; the mode is taken from
#           its first record as all the modes will be the same.
#
# A user is only considered if it is a user channel whose user record has
# wantrbn set, and the spot is only passed on (via dx_spot) if at least one
# of the user's mode class flags matches the mode.
sub send_dx_spot
{
	my ($self, $quality, $cand) = @_;

	++$self->{$_} foreach qw(norbn norbn10 norbnhour);

	my $mode = $cand->[CData]->[RMode];

	# user flag => the modes it covers.
	# NOTE(review): in /^BCN|DXF/ and /^PSK|FSK|MSK/ the ^ anchors only the
	# first alternative, the others match anywhere in the mode - confirm
	# whether /^(?:...)/ was intended.
	my @modeclass = (
		[wantbeacon => qr/^BCN|DXF/],
		[wantcw     => qr/^CW/],
		[wantrtty   => qr/^RTT/],
		[wantpsk    => qr/^PSK|FSK|MSK/],
		[wantft     => qr/^FT/],
	);

	foreach my $dxchan (DXChannel::get_all()) {
		next unless $dxchan->is_user;
		my $user = $dxchan->{user};
		next unless $user &&  $user->wantrbn;

		# does this user want this sort of spot at all?
		my $want = 0;
		foreach my $class (@modeclass) {
			my ($flag, $re) = @$class;
			++$want if $user->$flag() && $mode =~ $re;
		}

		dbg(sprintf("RBN: spot selection for $dxchan->{call} mode: '$mode' want: $want flags rbn:%d ft:%d bcn:%d cw:%d psk:%d rtty:%d",
					$user->wantrbn,
					$user->wantft,
					$user->wantbeacon,
					$user->wantcw,
					$user->wantpsk,
					$user->wantrtty,
				   )) if isdbg('rbnll');

		# send one spot to one user out of the ones that we have
		$self->dx_spot($dxchan, $quality, $cand) if $want;
	}
}
462
# Pick and send one spot from a candidate to one user channel.
#
# Arguments: ($self, $dxchan, $quality, $cand)
#   $dxchan  - the user channel to send to
#   $quality - the quality value placed at the end of the spot comment
#   $cand    - the candidate array; non-reference entries (the time and
#              quality slots) are skipped
#
# Every record gets a freshly generated comment. If the user's 'rbnseeme'
# flag is set each record is sent as is. Otherwise the record with the
# lowest signal strength is remembered and, if the channel has an rbn (or
# spots) filter, the last record to pass that filter replaces it - so
# nothing is sent when nothing passes. The zones of the other spotters are
# appended to the comment as " Z:a,b,c". When a spot was sent and one of the
# records carried a QRA locator, the spotted station's user record is
# updated with it (if it has no valid one) and written back.
sub dx_spot
{
	my $self = shift;
	my $dxchan = shift;
	my $quality = shift;
	my $cand = shift;
	my $call = $dxchan->{call};
	my $seeme = $dxchan->user->rbnseeme();
	my $strength = 100;		# because it could if we talk about FTx
	my $saver;
	my %zone;
	my $qra;

	# NOTE(review): new() initialises 'nouser', 'nouser10' and 'nouserhour',
	# not the 'nousers*' keys counted here - confirm which is intended.
	++$self->{nousers}->{$call};
	++$self->{nousers10}->{$call};
	++$self->{nousershour}->{$call};

	my $filtered;
	my $rf = $dxchan->{rbnfilter} || $dxchan->{spotsfilter};
	
	foreach my $r (@$cand) {
		# $r = [$origin, $qrg, $call, $mode, $s, $t, $utz, $respot, $qra];
		# Spot::prepare($qrg, $call, $utz, $comment, $origin);
		next unless $r && ref $r;

		# take the first valid locator that any of the records carries
		$qra = $r->[RQra] if !$qra && $r->[RQra] && is_qra($r->[RQra]);

		# $quality is passed as an argument so that it can never be
		# interpreted as part of the format
		my $comment = sprintf "%-3s %2ddB %s", $r->[RMode], $r->[RStrength], $quality;
		my $s = $r->[RSpotData];		# the prepared spot
		$s->[SComment] = $comment;		# apply new generated comment

		++$zone{$s->[SZone]};		# save the spotter's zone

		# if the 'see me' flag is set, then show all the spots without further adornment (see set/rbnseeme for more info)
		if ($seeme) {
			send_final($dxchan, $s);
			next;
		}

		# save the lowest strength one
		if ($r->[RStrength] < $strength) {
			# log before updating so that the comparison shown is the one made
			dbg("RBN: STRENGTH spot: $s->[SCall] qrg: $s->[SQrg] origin: $s->[SOrigin] dB: $r->[RStrength] < $strength") if isdbg 'rbnll';
			$strength = $r->[RStrength];
			$saver = $s;
		}

		if ($rf) {
			my ($want, undef) = $rf->it($s);
			dbg("RBN: FILTERING for $call spot: $s->[SCall] qrg: $s->[SQrg] origin: $s->[SOrigin] dB: $r->[RStrength] com: '$s->[SComment]' want: " . ($want ? 'YES':'NO')) if isdbg 'rbnll';
			next unless $want;
			$filtered = $s;
		}
	}

	if ($rf) {
		$saver = $filtered;		# if nothing passed the filter's lips then $saver == $filtered == undef !
	}
	
	if ($saver) {
		# create a zone list of spotters
		delete $zone{$saver->[SZone]};  # remove this spotter's zone (leaving all the other zones)
		my $z = join ',', sort {$a <=> $b} keys %zone;

		# alter spot data accordingly
		$saver->[SComment] .= " Z:$z" if $z;
		
		send_final($dxchan, $saver);
		
		++$self->{nospot};
		++$self->{nospot10};
		++$self->{nospothour};
		
		if ($qra) {
			my $user = DXUser::get_current($saver->[SCall]) || DXUser->new($saver->[SCall]);
			unless ($user->qra && is_qra($user->qra)) {
				$user->qra($qra);
				dbg("RBN: update qra on $saver->[SCall] to $qra");
			}
			# update lastseen if nothing else
			$user->put;
		}
	}
}
549
# Format one prepared spot and send it to one channel.
#
# Arguments: ($dxchan, $saver)
#   $dxchan - the channel to send to
#   $saver  - the prepared spot (array ref)
#
# For the duration of the formatting the spot's origin gets '-#' appended
# (after being cut to 6 characters for anything that is not a ve7cc
# channel); the original origin is put back before the line is sent.
sub send_final
{
	my ($dxchan, $saver) = @_;
	my $call = $dxchan->{call};
	
	dbg("RBN: SENDING to $call spot: $saver->[SCall] qrg: $saver->[SQrg] origin: $saver->[SOrigin] $saver->[SComment]") if isdbg 'rbnll';

	my $origin = $saver->[SOrigin];	# remembered so that it can be restored
	my $buf;
	if ($dxchan->{ve7cc}) {
		$saver->[SOrigin] = $origin . '-#';
		$buf = VE7CC::dx_spot($dxchan, @$saver);
	} else {
		$saver->[SOrigin] = substr($origin, 0, 6) . '-#';
		$buf = $dxchan->format_dx_spot(@$saver);
	}
	$saver->[SOrigin] = $origin;

	$dxchan->local_send('N', $buf);
}
572
573 # per second
574 sub process
575 {
576         my $rbnskim = isdbg('rbnskim');
577         
578         foreach my $dxchan (DXChannel::get_all()) {
579                 next unless $dxchan->is_rbn;
580
581                 # At this point we run the queue to see if anything can be sent onwards to the punter
582                 my $now = $main::systime;
583                 my $ta = [gettimeofday];
584                 my $items = 0;
585                 
586                 # now run the waiting queue which just contains KEYS ($call|$qrg)
587                 foreach my $sp (keys %{$dxchan->{queue}}) {
588                         my $cand = $spots->{$sp};
589                         ++$items;
590                         
591                         unless ($cand && $cand->[CTime]) {
592                                 dbg "RBN Cand $sp " . ($cand ? 'def' : 'undef') . " [CTime] " . ($cand->[CTime] ? 'def' : 'undef') . " dwell $dwelltime";
593                                 delete $spots->{$sp};
594                                 delete $dxchan->{queue}->{$sp};    # remove
595                                 next;
596                         }
597                         
598                         my $ctime = $cand->[CTime];
599                         my $quality = @$cand - CData;
600                         my $dwellsecs =  $now - $ctime;
601                         if ($quality >= $maxqual || $dwellsecs >= $dwelltime || $dwellsecs >= $limbotime) {
602                                 # we have a candidate, create qualitee value(s);
603                                 unless (@$cand > CData) {
604                                         dbg "RBN: QUEUE key '$sp' MISSING RECORDS, IGNORED" . dd($cand) if isdbg 'rbnqueue';
605                                         delete $spots->{$sp}; # don't remember it either - this means that a spot HAS to come in with sufficient spotters to be processed.
606                                         delete $dxchan->{queue}->{$sp};
607                                         next;
608                                 }
609                                 dbg "RBN: QUEUE PROCESSING key: '$sp' $now >= $cand->[CTime]" if isdbg 'rbnqueue'; 
610                                 my $spotters = $quality;
611
612                                 # dump it and remove it from the queue if it is of unadequate quality, but only if it is no longer in Limbo and can be reasonably passed on to its demise
613                                 my $r = $cand->[CData];
614                                 if ($dwellsecs > $limbotime && $quality < $minqual) {
615                                         if ( $rbnskim && isdbg('rbnskim')) {
616                                                 $r = $cand->[CData];
617                                                 if ($r) {
618                                                         my $lastin = difft($ctime, $now, 2);
619                                                         my $s = "RBN:SKIM time in Limbo exceeded DUMPED (lastin: $lastin Q:$quality < Q:$minqual) key: '$sp' = $r->[RCall] on $r->[RQrg] by $r->[ROrigin] \@ $r->[RTime] route: $dxchan->{call}";
620                                                         dbg($s);
621                                                 }
622                                         }
623                                         delete $spots->{$sp}; # don't remember it either - this means that a spot HAS to come in with sufficient spotters to be processed.
624                                         delete $dxchan->{queue}->{$sp};
625                                         next;
626                                 }
627
628                                 # we have a possible removal from Limbo, check for more than one skimmer and reset the quality if required
629                                 # DOES THIS TEST CAUSE RACES?
630                                 if (!$r->[Respot] && $quality >= $minqual && $dwellsecs > $dwelltime+1) {
631
632                                         # because we don't need to check for repeats by the same skimmer in the normal case, we do here
633                                         my %seen;
634                                         my @origin;
635                                         foreach my $wr (@$cand) {
636                                                 next unless ref $wr;
637                                                 push @origin, $wr->[ROrigin];
638                                                 if (exists $seen{$wr->[ROrigin]}) {
639                                                         next;
640                                                 }
641                                                 $seen{$wr->[ROrigin]} = $wr;
642                                         }
643                                         # reset the quality to ignore dupes
644                                         my $oq = $quality;
645                                         $quality = keys %seen;
646                                         if ($quality >= $minqual) {
647                                                 if ( $rbnskim && isdbg('rbnskim')) {
648                                                         my $lastin = difft($ctime, $now, 2);
649                                                         my $sk = join ' ', keys %seen;
650                                                         my $or = join ' ', @origin;
651                                                         my $s = "RBN:SKIM promoted from Limbo - key: '$sp' (lastin: $lastin Q now: $quality was $oq skimmers now: $sk";
652                                                         $s .= " was $or" if $or ne $sk;
653                                                         $s .= ')';
654                                                         dbg($s);
655                                                 } 
656                                         } elsif ($oq != $quality) {
657                                                 if ( $rbnskim && isdbg('rbnskim')) {
658                                                         my $lastin = difft($ctime, $now, 2);
659                                                         my $sk = join ' ', keys %seen;
660                                                         my $or = join ' ', @origin;
661                                                         my $s = "RBN:SKIM quality reset key: '$sp' (lastin: $lastin Q now: $quality was $oq skimmers now: $sk was: $or)";
662                                                         dbg($s);
663                                                 }
664                                                 # remove the excess
665                                                 my @ncand = (@$cand[CTime, CQual], values %seen);
666                                                 $spots->{$sp} = \@ncand;
667                                         }
668                                 }
669
670                                 # we now kick this spot into Limbo 
671                                 if ($quality < $minqual) {
672                                         next;
673                                 }
674
675                                 $quality = 9 if $quality > 9;
676                                 $cand->[CQual] = $quality if $quality > $cand->[CQual];
677
678                                 # this scores each candidate according to its skimmer's QRG score (i.e. how often it agrees with its peers)
679                                 # what happens is hash of all QRGs in candidates are incremented by that skimmer's reputation for "accuracy"
680                                 # or, more exactly, past agreement with the consensus. This score can be from -5 -> +5. 
681                                 my %qrg = ();
682                                 my $skimmer;
683                                 my $sk;
684                                 my $band;
685                                 my %seen = ();
686                                 foreach $r (@$cand) {
687                                         next unless ref $r;
688                                         if (exists $seen{$r->[ROrigin]}) {
689                                                 $r = 0;
690                                                 next;
691                                         }
692                                         $seen{$r->[ROrigin]} = 1;
693                                         $band ||= int $r->[RQrg] / 1000;
694                                         $sk = "SKIM|$r->[ROrigin]|$band"; # thus only once per set of candidates
695                                         $skimmer = $spots->{$sk};
696                                         unless ($skimmer) {
697                                                 $skimmer = $spots->{$sk} = [1, 0, 0, $now, []]; # this first time, this new skimmer gets the benefit of the doubt on frequency.
698                                                 dbg("RBN:SKIM new slot $sk " . $json->encode($skimmer)) if  $rbnskim && isdbg('rbnskim');
699                                         }
700                                         $qrg{$r->[RQrg]} += ($skimmer->[DScore] || 1);
701                                 }
702                                 
703                                 # determine the most likely qrg and then set it - NOTE (-)ve votes, generated by the skimmer scoring system above, are ignored
704                                 my @deviant;
705                                 my $c = 0;
706                                 my $mv = 0;
707                                 my $qrg = 0;
708                                 while (my ($k, $votes) = each %qrg) {
709                                         if ($votes >= $mv) {
710                                                 $qrg = $k;
711                                                 $mv = $votes;
712                                         }
713                                         ++$c;
714                                 }
715
716                                 # Ignore possible spots with 0 QRG score - as determined by the skimmer scoring system above -  as they are likely to be wrong 
717                                 unless ($qrg > 0) {
718                                         if ( $rbnskim && isdbg('rbnskim')) {
719                                                 my $keys;
720                                                 while (my ($k, $v) = (each %qrg)) {
721                                                         $keys .= "$k=>$v, ";
722                                                 }
723                                                 $keys =~ /,\s*$/;
724                                                 my $i = 0;
725                                                 foreach $r (@$cand) {
726                                                         next unless $r && ref $r;
727                                                         dbg "RBN:SKIM cand $i QRG likely wrong from '$sp' = $r->[RCall] on $r->[RQrg] by $r->[ROrigin] \@ $r->[RTime] (qrgs: $keys c: $c) route: $dxchan->{call}, ignored";
728                                                         ++$i;
729                                                 }
730                                         }
731                                         delete $spots->{$sp}; # get rid
732                                         delete $dxchan->{queue}->{$sp};
733                                         next;
734                                 }
735
736                                 # determine and spit out the deviants. Then adjust the scores according to whether it is a deviant or good
737                                 # NOTE: deviant nodes can become good (or less bad), and good nodes bad (or less good) on each spot that
738                                 # they generate. This is based solely on each skimmer's agreement (or not) with the "consensus" score generated
739                                 # above ($qrg). The resultant score + good + bad is stored per band and will be used the next time a spot
740                                 # appears on this band from each skimmer.
741                                 foreach $r (@$cand) {
742                                         next unless $r && ref $r;
743                                         my $diff = $c > 1 ? nearest(.1, $r->[RQrg] - $qrg) : 0;
744                                         $sk = "SKIM|$r->[ROrigin]|$band";
745                                         $skimmer = $spots->{$sk};
746                                         if ($diff) {
747                                                 ++$skimmer->[DBad] if $skimmer->[DBad] < $maxdeviants;
748                                                 --$skimmer->[DGood] if $skimmer->[DGood] > 0;
749                                                 push @deviant, sprintf("$r->[ROrigin]:%+.1f", $diff);
750                                                 push @{$skimmer->[DEviants]}, $diff;
751                                                 shift @{$skimmer->[DEviants]} while @{$skimmer->[DEviants]} > $maxdeviants;
752                                         } else {
753                                                 ++$skimmer->[DGood] if $skimmer->[DGood] < $maxdeviants;
754                                                 --$skimmer->[DBad] if $skimmer->[DBad] > 0;
755                                                 shift @{$skimmer->[DEviants]};
756                                         }
757                                         $skimmer->[DScore] = $skimmer->[DGood] - $skimmer->[DBad];
758                                         if ($rbnskim && isdbg('rbnskim')) {
759                                                 my $lastin = difft($skimmer->[DLastin], $now, 2);
760                                                 my $difflist = join(', ', @{$skimmer->[DEviants]});
761                                                 $difflist = " band qrg diffs: $difflist" if $difflist;
762                                                 dbg("RBN:SKIM key $sp slot $sk $r->[RQrg] - $qrg = $diff Skimmer score: $skimmer->[DGood] - $skimmer->[DBad] = $skimmer->[DScore] lastseen:$lastin ago$difflist"); 
763                                         }
764                                         $skimmer->[DLastin] = $now;
765                                         $r->[RSpotData]->[SQrg] = $qrg if $qrg && $c > 1; # set all the QRGs to the agreed value
766                                 }
767
768                                 $qrg = (sprintf "%.1f",  $qrg)+0;
769                                 $r = $cand->[CData];
770                                 $r->[RQrg] = $qrg;
771                                 my $squality = "Q:$cand->[CQual]";
772                                 $squality .= '*' if $c > 1; 
773                                 $squality .= '+' if $r->[Respot];
774
775                                 if (isdbg('progress')) {
776                                         my $rt = difft($ctime, $now, 2);
777                                         my $s = "RBN: SPOT key: '$sp' = $r->[RCall] on $r->[RQrg] by $r->[ROrigin] \@ $r->[RTime] $squality route: $dxchan->{call} dwell:$rt";
778                                         my $td = @deviant;
779                                         $s .= " QRGScore: $mv Deviants: $td/$spotters";
780                                         $s .= ' (' . join(', ', sort @deviant) . ')' if $td;
781                                         dbg($s);
782                                 }
783
784                                 # finally send it out to any waiting public
785                                 send_dx_spot($dxchan, $squality, $cand);
786                                 
787                                 # clear out the data and make this now just "spotted", but no further action required until respot time
788                                 dbg "RBN: QUEUE key '$sp' cleared" if isdbg 'rbn';
789
790                                 delete $dxchan->{queue}->{$sp};
791
792                                 # calculate new sp (which will be 70% likely the same as the old one)
793                                 # we do this to cope with the fact that the first spotter may well be "wrongly calibrated" giving a qrg that disagrees with the majority.
794                                 # and we want to store the key that corresponds to majority opinion. 
795                                 my $nqrg = nearest(1, $qrg * 10);  # normalised to nearest Khz
796                                 my $nsp = "$r->[RCall]|$nqrg";
797                                 if ($sp ne $nsp) {
798                                         dbg("RBN:SKIM CHANGE KEY sp '$sp' -> '$nsp' for storage") if  $rbnskim && isdbg('rbnskim');
799                                         delete $spots->{$sp};
800                                         $spots->{$nsp} = [$now, $cand->[CQual]];
801                                 } else {
802                                         $spots->{$sp} = [$now, $cand->[CQual]];
803                                 }
804                         }
805                         else {
806                                 dbg sprintf("RBN: QUEUE key: '$sp' SEND time not yet reached %.1f secs left", $cand->[CTime] + $dwelltime - $now) if isdbg 'rbnqueue'; 
807                         }
808                 }
809                 if (isdbg('rbntimer')) {
810                         my $diff = _diffus($ta);
811                         dbg "RBN: TIMER process queue for call: $dxchan->{call} $items spots $diff uS";
812                 }
813         }
814 }
815
# Once-a-minute housekeeping for every RBN channel: optionally log the
# per-minute counters, disconnect channels that produced no raw input,
# zero the per-minute counters, add 60 to the channel's %runtime entry
# and finally save the spot cache.
sub per_minute
{
	for my $chan (DXChannel::get_all()) {
		$chan->is_rbn or next;

		if (isdbg('rbnstats')) {
			my $nusers = keys %{$chan->{nousers}};
			dbg("RBN:STATS minute $chan->{call} raw: $chan->{noraw} retrieved spots: $chan->{norbn} delivered: $chan->{nospot} after filtering to users: $nusers");
		}

		# nothing at all arrived during the last minute?
		# NOTE(review): {lasttime} is compared directly with 60 - if it holds an
		# absolute timestamp this half of the test is always true - confirm.
		my $silent = $chan->{noraw} == 0;
		if ($silent && $chan->{lasttime} > 60) {
			LogDbg('RBN', "RBN: no input from $chan->{call}, disconnecting");
			$chan->disconnect;
		}

		# start the next minute with clean counters
		$chan->{$_} = 0 for qw(nospot norbn noraw);
		$chan->{nousers} = {};

		$runtime{$chan->{call}} += 60;
	}

	# save the spot cache
	# NOTE(review): this compares (systime + startup_delay) with systime itself,
	# so the hold-off can only be true for a negative $startup_delay; presumably
	# a comparison against the start time was intended - confirm.
	my $holdoff = $main::systime + $startup_delay < $main::systime;
	write_cache() unless $holdoff;
}
832
# Ten-minute housekeeping: expire spot cache entries older than twice
# $minspottime (leaving the VERSION, 'O|...' and 'SKIM|...' keys alone),
# then log and reset the 10-minute counters of every RBN channel.
sub per_10_minute
{
	my $kept = 0;
	my $expired = 0;
	my $maxage = $minspottime*2;

	# work from a snapshot of the keys so that deleting is straightforward
	for my $key (keys %{$spots}) {
		next if $key eq 'VERSION' or $key =~ /^O\|/ or $key =~ /^SKIM\|/;

		my $age = $main::systime - $spots->{$key}->[CTime];
		if ($age > $maxage) {
			delete $spots->{$key};
			++$expired;
			next;
		}
		++$kept;
	}
	dbg("RBN:STATS spot cache remain: $kept removed: $expired"); # if isdbg('rbn');

	for my $chan (DXChannel::get_all()) {
		$chan->is_rbn or next;

		my $queued = keys %{$chan->{queue}};

		# percentage of raw lines that turned into spots (guarding against /0)
		my $pct = '0.0%';
		if ($chan->{noraw10}) {
			$pct = sprintf("%.1f%%", $chan->{norbn10}*100/$chan->{noraw10});
		}

		my $nusers = keys %{$chan->{nousers10}};
		dbg("RBN:STATS 10-minute $chan->{call} queue: $queued raw: $chan->{noraw10} retrieved spots: $chan->{norbn10} ($pct) delivered: $chan->{nospot10} after filtering to  users: $nusers");

		# reset for the next 10 minute period
		$chan->{$_} = 0 for qw(nospot10 norbn10 noraw10);
		$chan->{nousers10} = {};
	}
}
859
# Hourly housekeeping: log the hourly counters of every RBN channel and
# then reset them.
sub per_hour
{
	for my $chan (DXChannel::get_all()) {
		$chan->is_rbn or next;

		my $queued = keys %{$chan->{queue}};

		# percentage of raw lines that turned into spots (guarding against /0)
		my $pct = '0.0%';
		if ($chan->{norawhour}) {
			$pct = sprintf("%.1f%%", $chan->{norbnhour}*100/$chan->{norawhour});
		}

		my $nusers = keys %{$chan->{nousershour}};
		dbg("RBN:STATS hour $chan->{call} queue: $queued raw: $chan->{norawhour} retrieved spots: $chan->{norbnhour} ($pct) delivered: $chan->{nospothour} after filtering to users: $nusers");

		# reset for the next hour
		$chan->{$_} = 0 for qw(nospothour norbnhour norawhour);
		$chan->{nousershour} = {};
	}
}
870
# Shutdown hook: all it does is save the spot cache via write_cache().
sub finish
{
	return write_cache();
}
875
# Serialise the spot cache ($spots) to JSON and write it to $cachefn.
#
# When 'rbncache' debugging is enabled the output is indented and has
# sorted keys. The shared $json encoder is put back to its compact
# settings as soon as the encode has finished, whether or not it worked.
#
# Failure to encode, or to write/close the file, is logged with dbg and the
# sub returns early. Failure to open the file for writing is fatal (confess),
# as before.
sub write_cache
{
	my $ta = [ gettimeofday ];

	$json->indent(1)->canonical(1) if isdbg 'rbncache';
	my $s = eval {$json->encode($spots)};
	my $err = $@;				# grab it before anything else can clobber $@

	# reset the encoder here, so an encode failure cannot leave it in "pretty" mode
	$json->indent(0)->canonical(0);

	unless (defined $s && length $s) {
		dbg("RBN:Write_cache error '$err'");
		return;
	}

	# explicit mode argument: the filename is never parsed for mode characters
	my $fh = IO::File->new($cachefn, 'w') or confess("writing $cachefn $!");

	# buffered write errors may only surface at close, so check both
	my $werr;
	$werr = "$!" unless $fh->print($s);
	unless ($fh->close) {
		$werr = "$!" unless defined $werr;
	}
	if (defined $werr) {
		dbg("RBN:Write_cache error writing '$cachefn' $werr");
		return;
	}

	my $diff = _diffms($ta);
	my $size = sprintf('%.3fKB', (length($s) / 1000));
	dbg("RBN:WRITE_CACHE size: $size time to write: $diff mS");
}
894
# Try to restore the spot cache ($spots) from $cachefn.
#
# The file is only used if it exists, is younger than $cache_valid seconds,
# decodes to a hash and carries a VERSION equal to $DATA_VERSION. Entries
# that still hold candidate data beyond [CTime, CQual] (spots that were being
# built up when the cache was written) are cut back to just those two fields.
#
# $spots is only replaced when all of these checks pass; on any failure it is
# left exactly as it was.
#
# Returns 1 if the cache was restored, undef otherwise.
sub check_cache
{
	unless (-e $cachefn) {
		dbg("RBN:check_cache '$cachefn' spot cache not present");
		return undef;
	}

	my $mt = (stat($cachefn))[9];
	# precedence made explicit: a missing mtime is treated as 1, i.e. very old
	my $t = $main::systime - ($mt || 1);
	my $p = difft($mt, 2);
	unless ($t < $cache_valid) {
		my $d = difft($main::systime-$cache_valid);
		dbg("RBN::checkcache '$cachefn' created $p ago is too old (> $d), ignored");
		return undef;
	}
	dbg("RBN:check_cache '$cachefn' spot cache exists, created $p ago and not too old");

	# explicit read mode: the filename is never parsed for mode characters
	my $fh = IO::File->new($cachefn, 'r');
	unless ($fh) {
		dbg("RBN:check_cache file read error $!");
		return undef;
	}
	my $s = do { local $/ = undef; <$fh> };		# slurp the whole file
	$fh->close;
	$s = '' unless defined $s;			# an empty file reads as undef
	dbg("RBN:check_cache cache read size " . length $s);
	return undef unless $s;

	# decode into a private variable so that a bad cache cannot clobber $spots
	my $data = eval { $json->decode($s) };
	my $err = $@;
	unless ($data && ref $data eq 'HASH') {
		dbg("RBN::checkcache error decoding $err");
		return undef;
	}
	unless (exists $data->{VERSION} && $data->{VERSION} == $DATA_VERSION) {
		dbg("RBN::checkcache cache data version mismatch (want $DATA_VERSION), ignored");
		return undef;
	}

	# now clean out anything that has spot build ups in progress
	for my $k (keys %$data) {
		next if $k eq 'VERSION';
		next if $k =~ /^O\|/;
		next if $k =~ /^SKIM\|/;
		my $cand = $data->{$k};
		next unless ref $cand eq 'ARRAY';	# don't die on a malformed entry
		$data->{$k} = [$cand->[CTime], $cand->[CQual]] if @$cand > CData;
	}

	$spots = $data;
	dbg("RBN:check_cache spot cache restored");
	return 1;
}
943
944 1;