org/objectweb/proactive/core/body/ft/servers/faultdetection/FaultDetectorImpl.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.core.body.ft.servers.faultdetection;
00032 
00033 import java.rmi.RemoteException;
00034 import java.util.ArrayList;
00035 import java.util.Iterator;
00036 
00037 import org.apache.log4j.Logger;
00038 import org.objectweb.proactive.core.body.UniversalBody;
00039 import org.objectweb.proactive.core.body.ft.exception.NotImplementedException;
00040 import org.objectweb.proactive.core.body.ft.internalmsg.Heartbeat;
00041 import org.objectweb.proactive.core.body.ft.servers.FTServer;
00042 import org.objectweb.proactive.core.util.log.Loggers;
00043 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00044 
00045 
00050 public class FaultDetectorImpl implements FaultDetector {
00051     //logger
00052     protected static Logger logger = ProActiveLogger.getLogger(Loggers.FAULT_TOLERANCE);
00053 
00054     // global server
00055     private FTServer server;
00056 
00057     // detection thread
00058     private FaultDetectorThread fdt;
00059 
00060     // static heartbeat message
00061     private static final Heartbeat hbe = new Heartbeat();
00062 
00063     // detection period
00064     private long faultDetectionPeriod;
00065 
00069     public FaultDetectorImpl(FTServer server, long faultDetectPeriod) {
00070         this.faultDetectionPeriod = faultDetectPeriod;
00071         this.server = server;
00072         this.fdt = new FaultDetectorThread();
00073     }
00074 
00078     public boolean isUnreachable(UniversalBody body) throws RemoteException {
00079         Object res = null;
00080         try {
00081             res = body.receiveFTMessage(FaultDetectorImpl.hbe);
00082         } catch (Exception e) {
00083             // object is unreachable
00084             return true;
00085         }
00086         if (res.equals(FaultDetector.OK)) {
00087             // object is OK
00088             return false;
00089         } else {
00090             // object is dead
00091             return true;
00092         }
00093     }
00094 
00098     public void startFailureDetector() throws RemoteException {
00099         this.fdt.start();
00100     }
00101 
00105     public void suspendFailureDetector() throws RemoteException {
00106         throw new NotImplementedException();
00107     }
00108 
00112     public void stopFailureDetector() throws RemoteException {
00113         throw new NotImplementedException();
00114     }
00115 
00119     public void forceDetection() throws RemoteException {
00120         this.fdt.wakeUp();
00121     }
00122 
00123     /*
00124      * Thread for fault detection. One unique thread scans all active objects
00125      * @author cdelbe
00126      */
00127     private class FaultDetectorThread extends Thread {
00128         private boolean kill;
00129         private boolean wakeUpCalled;
00130 
00131         public FaultDetectorThread() {
00132             this.kill = false;
00133             this.wakeUpCalled = false;
00134             this.setName("FaultDetectorThread");
00135         }
00136 
00137         public synchronized void wakeUp() {
00138             notifyAll();
00139             this.wakeUpCalled = true;
00140         }
00141 
00142         public synchronized void killMe() {
00143             this.kill = true;
00144             notifyAll();
00145         }
00146 
00147         public synchronized void pause() {
00148             try {
00149                 this.wakeUpCalled = false;
00150                 this.wait(FaultDetectorImpl.this.faultDetectionPeriod);
00151             } catch (InterruptedException e) {
00152                 e.printStackTrace();
00153             }
00154         }
00155 
00156         public void run() {
00157             while (true) {
00158                 try {
00159                     if (kill) {
00160                         break;
00161                     }
00162 
00163                     //synchronized (FaultDetectorImpl.this.server){
00164                     ArrayList al = FaultDetectorImpl.this.server.getAllLocations();
00165                     Iterator it = al.iterator();
00166                     logger.info("[FAULT DETECTOR] Scanning " + al.size() +
00167                         " objects ...");
00168                     while (it.hasNext()) {
00169                         if (kill) {
00170                             break;
00171                         }
00172                         if (wakeUpCalled) {
00173                             // a detection is forced, restart...
00174                             it = al.iterator();
00175                             this.wakeUpCalled = false;
00176                         }
00177                         UniversalBody current = (UniversalBody) (it.next());
00178                         if (FaultDetectorImpl.this.server.isUnreachable(current)) {
00179                             FaultDetectorImpl.this.server.failureDetected(current.getID());
00180                             // other failures may be detected by the recoveryProcess
00181                             break;
00182                         }
00183                     }
00184                     logger.info("[FAULT DETECTOR] End scanning.");
00185                     if (kill) {
00186                         break;
00187                     }
00188                     this.pause();
00189                 } catch (RemoteException e) {
00190                     e.printStackTrace();
00191                 }
00192             }
00193         }
00194     }
00195 
00199     public void initialize() throws RemoteException {
00200         this.fdt.killMe();
00201         this.fdt = new FaultDetectorThread();
00202     }
00203 }

Generated on Mon Jan 22 15:16:05 2007 for ProActive by  doxygen 1.5.1