Fix (now) missing dxchan error_handler
[spider.git] / perl / cluster.pl
index 10dca5eb265c8bf08a030608d39fea7223f0032d..ab6673247fcf04c899d1756eb24766493fce7641 100755 (executable)
@@ -10,7 +10,7 @@
 #
 #
 
-require 5.10;
+require 5.10.1;
 
 # make sure that modules are searched in the order local then perl
 BEGIN {
@@ -138,7 +138,9 @@ $maxconnect_user = 3;                       # the maximum no of concurrent connections a user can ha
 $maxconnect_node = 0;                  # Ditto but for nodes. In either case if a new incoming connection
                                                                # takes the no of references in the routing table above these numbers
                                                                # then the connection is refused. This only affects INCOMING connections.
-$idle_interval = 0.100;                        # the wait between invocations of the main idle loop processing.
+$idle_interval = 0.500;                        # the wait between invocations of the main idle loop processing.
+our $ending;                                                              # signal that we are ending;
+
 
 # send a message to call on conn and disconnect
 sub already_conn
@@ -152,13 +154,6 @@ sub already_conn
        $conn->disconnect;
 }
 
-sub error_handler
-{
-       my $dxchan = shift;
-       $dxchan->{conn}->set_error(undef) if exists $dxchan->{conn};
-       $dxchan->disconnect(1);
-}
-
 # handle incoming messages
 sub new_channel
 {
@@ -177,7 +172,7 @@ sub new_channel
        my $dxchan = DXChannel::get($call);
        if ($dxchan) {
                if ($user && $user->is_node) {
-                       already_conn($conn, $call, DXM::msg($lang, 'concluster', $call, $main::mycall));
+                       already_conn($conn, $call, DXM::msg($lang, 'conother', $call, $main::mycall));
                        return;
                }
                if ($bumpexisting) {
@@ -242,7 +237,8 @@ sub new_channel
        $conn->conns($call) if $conn->isa('IntMsg');
 
        # set callbacks
-       $conn->set_error(sub {error_handler($dxchan)});
+       $conn->set_error(sub {my $err = shift; LogDbg('DXCommand', "Comms error '$err' received for call $dxchan->{call}"); $dxchan->disconnect(1);});
+       $conn->set_on_eof(sub {$dxchan->disconnect});
        $conn->set_rproc(sub {my ($conn,$msg) = @_; $dxchan->rec($msg);});
        $dxchan->rec($msg);
 }
@@ -253,11 +249,17 @@ sub login
        return \&new_channel;
 }
 
+our $ceasing;
+
 # cease running this program, close down all the connections nicely
 sub cease
 {
        my $dxchan;
 
+       cluck("ceasing") if $ceasing; 
+       
+       return if $ceasing++;
+       
        unless ($is_win) {
                $SIG{'TERM'} = 'IGNORE';
                $SIG{'INT'} = 'IGNORE';
@@ -272,15 +274,6 @@ sub cease
                dbg("Local::finish error $@") if $@;
        }
 
-       # disconnect nodes
-       foreach $dxchan (DXChannel::get_all_nodes) {
-           $dxchan->disconnect(2) unless $dxchan == $main::me;
-       }
-
-       # disconnect users
-       foreach $dxchan (DXChannel::get_all_users) {
-               $dxchan->disconnect;
-       }
 
        # disconnect AGW
        AGWMsg::finish();
@@ -309,8 +302,6 @@ sub cease
        $dbh->finish if $dbh;
 
        unlink $lockfn;
-#      $SIG{__WARN__} = $SIG{__DIE__} =  sub {my $a = shift; cluck($a); };
-       exit(0);
 }
 
 # the reaper of children
@@ -344,11 +335,14 @@ sub AGWrestart
        AGWMsg::init(\&new_channel);
 }
 
+our $io_disconnected;
+
 sub idle_loop
 {
        my $timenow = time;
 
-       DXChannel::process();
+       BPQMsg::process();
+#      DXChannel::process();
 
        #      $DB::trace = 0;
 
@@ -371,23 +365,16 @@ sub idle_loop
                DXDb::process();
                DXUser::process();
                DXDupe::process();
-               $systime_days = $days;
-               $systime_daystart = $days * 86400;
+               DXCron::process();                      # do cron jobs
+               IsoTime::update($systime);
+               DXProt::process();                      # process ongoing ak1a pcxx stuff
+               DXConnect::process();
+               DXUser::process();
+               AGWMsg::process();
+               
+               Timer::handler();
+               DXLog::flush_all();
        }
-       IsoTime::update($systime);
-       DXCron::process();                      # do cron jobs
-       DXCommandmode::process();       # process ongoing command mode stuff
-       DXXml::process();
-       DXProt::process();                      # process ongoing ak1a pcxx stuff
-       DXConnect::process();
-       DXMsg::process();
-       DXDb::process();
-       DXUser::process();
-       DXDupe::process();
-       AGWMsg::process();
-       BPQMsg::process();
-
-       Timer::handler();
 
        if (defined &Local::process) {
                eval {
@@ -395,6 +382,29 @@ sub idle_loop
                };
                dbg("Local::process error $@") if $@;
        }
+
+       while ($ending) {
+               my $dxchan;
+
+               dbg("DXSpider Ending $ending");
+
+               unless ($io_disconnected++) {
+
+                       # disconnect users
+                       foreach $dxchan (DXChannel::get_all_users) {
+                               $dxchan->disconnect;
+                       }
+
+                       # disconnect nodes
+                       foreach $dxchan (DXChannel::get_all_nodes) {
+                               next if $dxchan == $main::me;
+                               $dxchan->disconnect(2);
+                       }
+                       $main::me->disconnect;
+               }
+
+               Mojo::IOLoop->stop if --$ending <= 0;
+       }
 }
 
 
@@ -502,7 +512,7 @@ dbg("load badwords: " . (BadWords::load or "Ok"));
 
 # prime some signals
 unless ($DB::VERSION) {
-       $SIG{INT} = $SIG{TERM} = sub { Mojo::IOLoop->stop; };
+       $SIG{INT} = $SIG{TERM} = sub { $ending = 10; };
 }
 
 unless ($is_win) {
@@ -595,7 +605,7 @@ my $main_loop = Mojo::IOLoop->recurring($idle_interval => \&idle_loop);
 
 Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
 
+dbg("After Mojo::IOLoop");
 cease(0);
 exit(0);
 
-