00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.core.body.ft.servers.faultdetection;
00032
00033 import java.rmi.RemoteException;
00034 import java.util.ArrayList;
00035 import java.util.Iterator;
00036
00037 import org.apache.log4j.Logger;
00038 import org.objectweb.proactive.core.body.UniversalBody;
00039 import org.objectweb.proactive.core.body.ft.exception.NotImplementedException;
00040 import org.objectweb.proactive.core.body.ft.internalmsg.Heartbeat;
00041 import org.objectweb.proactive.core.body.ft.servers.FTServer;
00042 import org.objectweb.proactive.core.util.log.Loggers;
00043 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00044
00045
00050 public class FaultDetectorImpl implements FaultDetector {
00051
00052 protected static Logger logger = ProActiveLogger.getLogger(Loggers.FAULT_TOLERANCE);
00053
00054
00055 private FTServer server;
00056
00057
00058 private FaultDetectorThread fdt;
00059
00060
00061 private static final Heartbeat hbe = new Heartbeat();
00062
00063
00064 private long faultDetectionPeriod;
00065
00069 public FaultDetectorImpl(FTServer server, long faultDetectPeriod) {
00070 this.faultDetectionPeriod = faultDetectPeriod;
00071 this.server = server;
00072 this.fdt = new FaultDetectorThread();
00073 }
00074
00078 public boolean isUnreachable(UniversalBody body) throws RemoteException {
00079 Object res = null;
00080 try {
00081 res = body.receiveFTMessage(FaultDetectorImpl.hbe);
00082 } catch (Exception e) {
00083
00084 return true;
00085 }
00086 if (res.equals(FaultDetector.OK)) {
00087
00088 return false;
00089 } else {
00090
00091 return true;
00092 }
00093 }
00094
00098 public void startFailureDetector() throws RemoteException {
00099 this.fdt.start();
00100 }
00101
00105 public void suspendFailureDetector() throws RemoteException {
00106 throw new NotImplementedException();
00107 }
00108
00112 public void stopFailureDetector() throws RemoteException {
00113 throw new NotImplementedException();
00114 }
00115
00119 public void forceDetection() throws RemoteException {
00120 this.fdt.wakeUp();
00121 }
00122
00123
00124
00125
00126
00127 private class FaultDetectorThread extends Thread {
00128 private boolean kill;
00129 private boolean wakeUpCalled;
00130
00131 public FaultDetectorThread() {
00132 this.kill = false;
00133 this.wakeUpCalled = false;
00134 this.setName("FaultDetectorThread");
00135 }
00136
00137 public synchronized void wakeUp() {
00138 notifyAll();
00139 this.wakeUpCalled = true;
00140 }
00141
00142 public synchronized void killMe() {
00143 this.kill = true;
00144 notifyAll();
00145 }
00146
00147 public synchronized void pause() {
00148 try {
00149 this.wakeUpCalled = false;
00150 this.wait(FaultDetectorImpl.this.faultDetectionPeriod);
00151 } catch (InterruptedException e) {
00152 e.printStackTrace();
00153 }
00154 }
00155
00156 public void run() {
00157 while (true) {
00158 try {
00159 if (kill) {
00160 break;
00161 }
00162
00163
00164 ArrayList al = FaultDetectorImpl.this.server.getAllLocations();
00165 Iterator it = al.iterator();
00166 logger.info("[FAULT DETECTOR] Scanning " + al.size() +
00167 " objects ...");
00168 while (it.hasNext()) {
00169 if (kill) {
00170 break;
00171 }
00172 if (wakeUpCalled) {
00173
00174 it = al.iterator();
00175 this.wakeUpCalled = false;
00176 }
00177 UniversalBody current = (UniversalBody) (it.next());
00178 if (FaultDetectorImpl.this.server.isUnreachable(current)) {
00179 FaultDetectorImpl.this.server.failureDetected(current.getID());
00180
00181 break;
00182 }
00183 }
00184 logger.info("[FAULT DETECTOR] End scanning.");
00185 if (kill) {
00186 break;
00187 }
00188 this.pause();
00189 } catch (RemoteException e) {
00190 e.printStackTrace();
00191 }
00192 }
00193 }
00194 }
00195
00199 public void initialize() throws RemoteException {
00200 this.fdt.killMe();
00201 this.fdt = new FaultDetectorThread();
00202 }
00203 }