Prepare for git repository
[spider.git] / perl / cluster.pl
index 6d4dc90f66b2fc523a8d735069f231642d253341..37ace021654a4858cab156ab4196e66c083e61e5 100755 (executable)
@@ -68,7 +68,6 @@ use DXCommandmode;
 use DXProtVars;
 use DXProtout;
 use DXProt;
-use QXProt;
 use DXMsg;
 use DXCron;
 use DXConnect;
@@ -99,11 +98,16 @@ use Mrtg;
 use USDB;
 use UDPMsg;
 use QSL;
+use RouteDB;
+use DXXml;
+use DXSql;
+use IsoTime;
 
 use Data::Dumper;
 use IO::File;
 use Fcntl ':flock'; 
 use POSIX ":sys_wait_h";
+use Version;
 
 use Local;
 
@@ -112,25 +116,20 @@ package main;
 use strict;
 use vars qw(@inqueue $systime $version $starttime $lockfn @outstanding_connects 
                        $zombies $root @listeners $lang $myalias @debug $userfn $clusteraddr 
-                       $clusterport $mycall $decease $is_win $routeroot $me $reqreg
+                       $clusterport $mycall $decease $is_win $routeroot $me $reqreg $bumpexisting
+                       $allowdxby $dbh $dsn $dbuser $dbpass $do_xml
                   );
 
 @inqueue = ();                                 # the main input queue, an array of hashes
 $systime = 0;                                  # the time now (in seconds)
-$version = "1.51";                             # the version no of the software
 $starttime = 0;                 # the starting time of the cluster   
-#@outstanding_connects = ();     # list of outstanding connects
+@outstanding_connects = ();     # list of outstanding connects
 @listeners = ();                               # list of listeners
 $reqreg = 0;                                   # 1 = registration required, 2 = deregister people
+$bumpexisting = 1;                             # 1 = allow new connection to disconnect old, 0 - don't allow it
+$allowdxby = 0;                                        # 1 = allow "dx by <othercall>", 0 - don't allow it
 
-use vars qw($VERSION $BRANCH $build $branch);
-$VERSION = sprintf( "%d.%03d", q$Revision$ =~ /(\d+)\.(\d+)/ );
-$BRANCH = sprintf( "%d.%03d", q$Revision$ =~ /\d+\.\d+\.(\d+)\.(\d+)/  || (0,0));
-$main::build += 5;                             # add an offset to make it bigger than last system
-$main::build += $VERSION;
-$main::branch += $BRANCH;
 
-      
 # send a message to call on conn and disconnect
 sub already_conn
 {
@@ -165,11 +164,21 @@ sub new_channel
        # set up the basic channel info
        # is there one already connected to me - locally? 
        my $user = DXUser->get_current($call);
-       my $dxchan = DXChannel->get($call);
+       my $dxchan = DXChannel::get($call);
        if ($dxchan) {
-               my $mess = DXM::msg($lang, ($user && $user->is_node) ? 'concluster' : 'conother', $call, $main::mycall);
-               already_conn($conn, $call, $mess);
-               return;
+               if ($user && $user->is_node) {
+                       already_conn($conn, $call, DXM::msg($lang, 'concluster', $call, $main::mycall));
+                       return;
+               }
+               if ($bumpexisting) {
+                       my $ip = $conn->{peerhost} || 'unknown';
+                       $dxchan->send_now('D', DXM::msg($lang, 'conbump', $call, $ip));
+                       LogDbg('DXCommand', "$call bumped off by $ip, disconnected");
+                       $dxchan->disconnect;
+               } else {
+                       already_conn($conn, $call, DXM::msg($lang, 'conother', $call, $main::mycall));
+                       return;
+               }
        }
 
        # is he locked out ?
@@ -180,7 +189,7 @@ sub new_channel
        if ($baseuser && $baseuser->lockout || $lock) {
                if (!$user || !defined $lock || $lock) {
                        my $host = $conn->{peerhost} || "unknown";
-                       Log('DXCommand', "$call on $host is locked out, disconnected");
+                       LogDbg('DXCommand', "$call on $host is locked out, disconnected");
                        $conn->disconnect;
                        return;
                }
@@ -192,24 +201,8 @@ sub new_channel
                $user = DXUser->new($call);
        }
        
-
        # create the channel
-       if ($user->wantnp) {
-               if ($user->passphrase && $main::me->user->passphrase) {
-                       $dxchan = QXProt->new($call, $conn, $user);
-               } else {
-                       unless ($user->passphrase) {
-                               Log('DXCommand', "$call using NP but has no passphrase");
-                               dbg("$call using NP but has no passphrase");
-                       }
-                       unless ($main::me->user->passphrase) {
-                               Log('DXCommand', "$main::mycall using NP but has no passphrase");
-                               dbg("$main::mycall using NP but has no passphrase");
-                       }
-                       already_conn($conn, $call, "Need to exchange passphrases");
-                       return;
-               }
-       } elsif ($user->is_node) {
+       if ($user->is_node) {
                $dxchan = DXProt->new($call, $conn, $user);
        } elsif ($user->is_user) {
                $dxchan = DXCommandmode->new($call, $conn, $user);
@@ -224,29 +217,10 @@ sub new_channel
 
        # set callbacks
        $conn->set_error(sub {error_handler($dxchan)});
-       $conn->set_rproc(sub {my ($conn,$msg) = @_; rec($dxchan, $conn, $msg);});
-       rec($dxchan, $conn, $msg);
-}
-
-sub rec        
-{
-       my ($dxchan, $conn, $msg) = @_;
-       
-       # queue the message and the channel object for later processing
-       if (defined $msg) {
-               my $self = bless {}, "inqueue";
-               $self->{dxchan} = $dxchan;
-               $self->{data} = $msg;
-               push @inqueue, $self;
-       }
+       $conn->set_rproc(sub {my ($conn,$msg) = @_; $dxchan->rec($msg);});
+       $dxchan->rec($msg);
 }
 
-# remove any outstanding entries on the inqueue after a disconnection (usually)
-sub clean_inqueue
-{
-       my $dxchan = shift;
-       @inqueue = grep {$_->{dxchan} != $dxchan} @inqueue;
-}
 
 sub login
 {
@@ -271,13 +245,13 @@ sub cease
        dbg("Local::finish error $@") if $@;
 
        # disconnect nodes
-       foreach $dxchan (DXChannel->get_all_nodes) {
+       foreach $dxchan (DXChannel::get_all_nodes) {
            $dxchan->disconnect(2) unless $dxchan == $main::me;
        }
        Msg->event_loop(100, 0.01);
 
        # disconnect users
-       foreach $dxchan (DXChannel->get_all_users) {
+       foreach $dxchan (DXChannel::get_all_users) {
                $dxchan->disconnect;
        }
 
@@ -300,10 +274,12 @@ sub cease
                $l->close_server;
        }
 
-       dbg("DXSpider version $version, build $build ended") if isdbg('chan');
-       Log('cluster', "DXSpider V$version, build $build ended");
+       LogDbg('cluster', "DXSpider V$version, build $build ended");
        dbgclose();
        Logclose();
+
+       $dbh->finish if $dbh;
+       
        unlink $lockfn;
 #      $SIG{__WARN__} = $SIG{__DIE__} =  sub {my $a = shift; cluck($a); };
        exit(0);
@@ -323,45 +299,6 @@ sub reap
 
 # this is where the input queue is dealt with and things are dispatched off to other parts of
 # the cluster
-sub process_inqueue
-{
-       while (@inqueue) {
-               my $self = shift @inqueue;
-               return if !$self;
-
-               my $data = $self->{data};
-               my $dxchan = $self->{dxchan};
-               my $error;
-               my ($sort, $call, $line) = DXChannel::decode_input($dxchan, $data);
-               return unless defined $sort;
-       
-               # do the really sexy console interface bit! (Who is going to do the TK interface then?)
-               dbg("<- $sort $call $line") if $sort ne 'D' && isdbg('chan');
-               if ($self->{disconnecting}) {
-                       dbg('In disconnection, ignored');
-                       next;
-               }
-               
-               # handle A records
-               my $user = $dxchan->user;
-               if ($sort eq 'A' || $sort eq 'O') {
-                       $dxchan->start($line, $sort);  
-               } elsif ($sort eq 'I') {
-                       die "\$user not defined for $call" if !defined $user;
-
-                       # normal input
-                       $dxchan->normal($line);
-               } elsif ($sort eq 'Z') {
-                       $dxchan->disconnect;
-               } elsif ($sort eq 'D') {
-                       ;                                       # ignored (an echo)
-               } elsif ($sort eq 'G') {
-                       $dxchan->enhanced($line);
-               } else {
-                       print STDERR atime, " Unknown command letter ($sort) received from $call\n";
-               }
-       }
-}
 
 sub uptime
 {
@@ -388,6 +325,10 @@ sub AGWrestart
 $starttime = $systime = time;
 $lang = 'en' unless $lang;
 
+unless ($DB::VERSION) {
+       $SIG{INT} = $SIG{TERM} = \&cease;
+}
+
 # open the debug file, set various FHs to be unbuffered
 dbginit(\&DXCommandmode::broadcast_debug);
 foreach (@debug) {
@@ -395,21 +336,26 @@ foreach (@debug) {
 }
 STDOUT->autoflush(1);
 
-# calculate build number
-$build += $main::version;
-$build = "$build.$branch" if $branch;
+# try to load the database
+if (DXSql::init($dsn)) {
+       $dbh = DXSql->new($dsn);
+       $dbh = $dbh->connect($dsn, $dbuser, $dbpass) if $dbh;
+}
 
-Log('cluster', "DXSpider V$version, build $build started");
+# try to load XML::Simple
+DXXml::init();
 
 # banner
-dbg("Copyright (c) 1998-2002 Dirk Koopman G1TLH");
-dbg("DXSpider Version $version, build $build started");
+my ($year) = (gmtime)[5];
+$year += 1900;
+LogDbg('cluster', "DXSpider V$version, build $build started");
+dbg("Copyright (c) 1998-$year Dirk Koopman G1TLH");
 
 # load Prefixes
 dbg("loading prefixes ...");
+dbg(USDB::init());
 my $r = Prefix::init();
 confess $r if $r;
-dbg(USDB::init());
 
 # load band data
 dbg("loading band data ...");
@@ -430,14 +376,18 @@ DXUser->init($userfn, 1);
 # start listening for incoming messages/connects
 dbg("starting listeners ...");
 my $conn = IntMsg->new_server($clusteraddr, $clusterport, \&login);
-$conn->conns("Server $clusteraddr/$clusterport");
+$conn->conns("Server $clusteraddr/$clusterport using IntMsg");
 push @listeners, $conn;
-dbg("Internal port: $clusteraddr $clusterport");
+dbg("Internal port: $clusteraddr $clusterport using IntMsg");
 foreach my $l (@main::listen) {
-       $conn = ExtMsg->new_server($l->[0], $l->[1], \&login);
-       $conn->conns("Server $l->[0]/$l->[1]");
+       no strict 'refs';
+       my $pkg = $l->[2] || 'ExtMsg';
+       my $login = $l->[3] || 'login'; 
+       
+       $conn = $pkg->new_server($l->[0], $l->[1], \&{"${pkg}::${login}"});
+       $conn->conns("Server $l->[0]/$l->[1] using ${pkg}::${login}");
        push @listeners, $conn;
-       dbg("External Port: $l->[0] $l->[1]");
+       dbg("External Port: $l->[0] $l->[1] using ${pkg}::${login}");
 }
 
 dbg("AGW Listener") if $AGWMsg::enable;
@@ -497,7 +447,6 @@ Spot->init();
 # initialise the protocol engine
 dbg("Start Protocol Engines ...");
 DXProt->init();
-QXProt->init();
 
 # put in a DXCluster node for us here so we can add users and take them away
 $routeroot = Route::Node->new($mycall, $version*100+5300, Route::here($main::me->here)|Route::conf($main::me->conf));
@@ -541,24 +490,26 @@ for (;;) {
        
        Msg->event_loop(10, 0.010);
        my $timenow = time;
-       process_inqueue();                      # read in lines from the input queue and despatch them
+
+       DXChannel::process();
+       
 #      $DB::trace = 0;
        
        # do timed stuff, ongoing processing happens one a second
        if ($timenow != $systime) {
-               reap if $zombies;
-               $systime = $timenow;
+               reap() if $zombies;
+               IsoTime::update($systime = $timenow);
                DXCron::process();      # do cron jobs
                DXCommandmode::process(); # process ongoing command mode stuff
+               DXXml::process();
                DXProt::process();              # process ongoing ak1a pcxx stuff
-               QXProt::process();
                DXConnect::process();
                DXMsg::process();
                DXDb::process();
                DXUser::process();
                DXDupe::process();
                AGWMsg::process();
-                               
+
                eval { 
                        Local::process();       # do any localised processing
                };