00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.core.body.migration;
00032
00033 import java.util.Timer;
00034
00035 import org.apache.log4j.Logger;
00036 import org.objectweb.proactive.Body;
00037 import org.objectweb.proactive.core.body.LocalBodyStore;
00038 import org.objectweb.proactive.core.body.UniversalBody;
00039 import org.objectweb.proactive.core.body.reply.ReplyReceiver;
00040 import org.objectweb.proactive.core.body.reply.ReplyReceiverForwarder;
00041 import org.objectweb.proactive.core.body.request.RequestReceiver;
00042 import org.objectweb.proactive.core.body.request.RequestReceiverForwarder;
00043 import org.objectweb.proactive.core.event.AbstractEventProducer;
00044 import org.objectweb.proactive.core.event.MigrationEvent;
00045 import org.objectweb.proactive.core.event.MigrationEventListener;
00046 import org.objectweb.proactive.core.event.ProActiveEvent;
00047 import org.objectweb.proactive.core.event.ProActiveListener;
00048 import org.objectweb.proactive.core.node.Node;
00049 import org.objectweb.proactive.core.node.NodeFactory;
00050 import org.objectweb.proactive.core.runtime.ProActiveRuntime;
00051 import org.objectweb.proactive.core.util.log.Loggers;
00052 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00053 import org.objectweb.proactive.ext.locationserver.LocationServer;
00054 import org.objectweb.proactive.ext.locationserver.LocationServerFactory;
00055
00056 public class MigrationManagerImpl extends AbstractEventProducer implements
00057 MigrationManager, java.io.Serializable {
00058 protected static Logger logger = ProActiveLogger
00059 .getLogger(Loggers.MIGRATION);
00060
00061 transient protected LocationServer locationServer;
00062
00063 protected int nbOfMigrationWithoutUpdate;
00064
00065 protected int migrationCounter;
00066
00067 public static final int INFINITE_TTL = -1;
00068
00069 public static final int INFINITE_MAX_MIGRATION_NB = -1;
00070
00071 public static final int INFINITE_MAX_TIME_ON_SITE = -1;
00072
00073
00074
00075 private int ttl;
00076
00077 private boolean updatingForwarder;
00078
00079 private int maxMigrationNb;
00080
00081 private int maxTimeOnSite;
00082
00083 private transient Timer maxTimeOnSiteTimer;
00084
00085 private transient Timer ttlTimer;
00086
00087
00088
00089
00090 public MigrationManagerImpl() {
00091 super(true);
00092
00093 if (System.getProperty("proactive.mixedlocation.ttl") != null) {
00094 this.ttl = Integer.valueOf(
00095 System.getProperty("proactive.mixedlocation.ttl")).intValue();
00096 } else {
00097 this.ttl = INFINITE_TTL;
00098 }
00099
00100 if (System.getProperty("proactive.mixedlocation.updatingForwarder") != null) {
00101 this.updatingForwarder = Boolean.parseBoolean(System
00102 .getProperty("proactive.mixedlocation.updatingForwarder"));
00103 } else {
00104 this.updatingForwarder = false;
00105 }
00106
00107 if (System.getProperty("proactive.mixedlocation.maxMigrationNb") != null) {
00108 this.maxMigrationNb = Integer.valueOf(
00109 System.getProperty("proactive.mixedlocation.maxMigrationNb")).intValue();
00110 } else {
00111 this.maxMigrationNb = INFINITE_MAX_MIGRATION_NB;
00112 }
00113
00114 if (System.getProperty("proactive.mixedlocation.maxTimeOnSite") != null) {
00115 this.maxTimeOnSite = Integer.valueOf(
00116 System.getProperty("proactive.mixedlocation.maxTimeOnSite")).intValue();
00117 } else {
00118 this.maxTimeOnSite = INFINITE_MAX_TIME_ON_SITE;
00119 }
00120
00121 this.nbOfMigrationWithoutUpdate = 0;
00122 this.migrationCounter = 0;
00123
00124 }
00125
00126
00127
00128
00129 public void updateLocation(UniversalBody body) {
00130 if (locationServer == null) {
00131 this.locationServer = LocationServerFactory.getLocationServer();
00132 }
00133 if (locationServer != null) {
00134 locationServer.updateLocation(body.getID(),
00135 body.getRemoteAdapter(), this.migrationCounter);
00136 }
00137 resetNbOfMigrationWithoutUpdate();
00138 }
00139
00140
00141 public void resetNbOfMigrationWithoutUpdate() {
00142 this.nbOfMigrationWithoutUpdate = 0;
00143 }
00144
00145 public void launchTimeToLive(MigratableBody body, UniversalBody migratedBody) {
00146 if (this.ttl != INFINITE_TTL) {
00147 ttlTimer = new Timer();
00148 ttlTimer
00149 .schedule(
00150 new TimeToLiveTimerTask(this, body, migratedBody),
00151 this.ttl);
00152 }
00153 }
00154
00155
00156
00157
00158
00159
00160
00161 public Node checkNode(Node node) throws MigrationException {
00162 if (node == null) {
00163 throw new MigrationException(
00164 "The RemoteNodeImpl could not be found");
00165 }
00166
00167
00168 if (NodeFactory.isNodeLocal(node)) {
00169 MigrationException me = new MigrationException("The given node "
00170 + node.getNodeInformation().getURL()
00171 + " is in the same virtual machine");
00172 if (hasListeners()) {
00173 notifyAllListeners(new MigrationEvent(me));
00174 }
00175 throw me;
00176 }
00177 return node;
00178 }
00179
00180 public UniversalBody migrateTo(Node node, Body body)
00181 throws MigrationException {
00182 if (hasListeners()) {
00183 notifyAllListeners(new MigrationEvent(body,
00184 MigrationEvent.BEFORE_MIGRATION));
00185 }
00186 try {
00187 long l1 = 0;
00188 if (logger.isDebugEnabled()) {
00189 l1 = System.currentTimeMillis();
00190 }
00191
00192
00193
00194
00195 ProActiveRuntime part = node.getProActiveRuntime();
00196 UniversalBody remoteBody = part.receiveBody(node
00197 .getNodeInformation().getName(), body);
00198
00199 if (logger.isDebugEnabled()) {
00200 logger.debug("runtime = " + part);
00201 logger.debug("remoteBody = " + remoteBody);
00202 }
00203
00204
00205
00206
00207 long l2 = 0;
00208 if (logger.isDebugEnabled()) {
00209 l2 = System.currentTimeMillis();
00210 logger.debug("Migration took " + (l2 - l1));
00211 }
00212 if (hasListeners()) {
00213 notifyAllListeners(new MigrationEvent(body,
00214 MigrationEvent.AFTER_MIGRATION));
00215 }
00216
00217
00218
00219
00220 if (maxTimeOnSiteTimer != null)
00221 maxTimeOnSiteTimer.cancel();
00222
00223 return remoteBody;
00224
00225 } catch (Exception e) {
00226 MigrationException me = new MigrationException(
00227 "Exception while sending the Object", e.getCause());
00228 if (hasListeners()) {
00229 notifyAllListeners(new MigrationEvent(me));
00230 }
00231 throw me;
00232 }
00233 }
00234
00240 public void changeBodyAfterMigration(MigratableBody body,
00241 UniversalBody migratedBody) {
00242 if (this.ttl == 0) {
00243
00244
00245 body.terminate();
00246 } else {
00247 body.setRequestReceiver(createRequestReceiver(migratedBody, body
00248 .getRequestReceiver()));
00249 body.setReplyReceiver(createReplyReceiver(migratedBody, body
00250 .getReplyReceiver()));
00251
00252 body.setHasMigrated();
00253
00254 LocalBodyStore.getInstance().registerForwarder(body);
00255
00256
00257
00258
00259 launchTimeToLive(body, migratedBody);
00260 }
00261
00262 }
00263
00264 public void startingAfterMigration(Body body) {
00265 if (hasListeners()) {
00266 notifyAllListeners(new MigrationEvent(body,
00267 MigrationEvent.RESTARTING_AFTER_MIGRATING));
00268 }
00269
00270 this.nbOfMigrationWithoutUpdate++;
00271 this.migrationCounter++;
00272 if (logger.isDebugEnabled()) {
00273 logger.debug("XXX counter == " + this.nbOfMigrationWithoutUpdate);
00274 }
00275
00276
00277 if (this.maxMigrationNb != INFINITE_MAX_MIGRATION_NB
00278 && this.nbOfMigrationWithoutUpdate >= this.maxMigrationNb) {
00279 updateLocation(body);
00280 }
00281
00282
00283 else if (this.maxTimeOnSite != INFINITE_MAX_TIME_ON_SITE) {
00284 maxTimeOnSiteTimer = new Timer();
00285 maxTimeOnSiteTimer.schedule(new MaxTimeOnSiteTimerTask(this, body),
00286 maxTimeOnSite);
00287 }
00288 }
00289
00290 public RequestReceiver createRequestReceiver(UniversalBody remoteBody,
00291 RequestReceiver currentRequestReceiver) {
00292 return new RequestReceiverForwarder(remoteBody);
00293 }
00294
00295 public ReplyReceiver createReplyReceiver(UniversalBody remoteBody,
00296 ReplyReceiver currentReplyReceiver) {
00297 return new ReplyReceiverForwarder(remoteBody);
00298 }
00299
00300 public void addMigrationEventListener(MigrationEventListener listener) {
00301 addListener(listener);
00302 }
00303
00304 public void removeMigrationEventListener(MigrationEventListener listener) {
00305 removeListener(listener);
00306 }
00307
00308 public void setMigrationStrategy(int ttl, boolean updatingForwarder, int maxMigrationNb, int maxTimeOnSite){
00309 this.ttl = ttl;
00310 this.updatingForwarder = updatingForwarder;
00311 this.maxMigrationNb = maxMigrationNb;
00312 this.maxTimeOnSite = maxTimeOnSite;
00313 }
00314
00315
00316
00317
00318 protected void notifyOneListener(ProActiveListener listener,
00319 ProActiveEvent event) {
00320 MigrationEvent migrationEvent = (MigrationEvent) event;
00321 MigrationEventListener migrationEventListener = (MigrationEventListener) listener;
00322 switch (event.getType()) {
00323 case MigrationEvent.BEFORE_MIGRATION:
00324 migrationEventListener.migrationAboutToStart(migrationEvent);
00325 break;
00326 case MigrationEvent.AFTER_MIGRATION:
00327 migrationEventListener.migrationFinished(migrationEvent);
00328 break;
00329 case MigrationEvent.MIGRATION_EXCEPTION:
00330 migrationEventListener.migrationExceptionThrown(migrationEvent);
00331 break;
00332 case MigrationEvent.RESTARTING_AFTER_MIGRATING:
00333 migrationEventListener.migratedBodyRestarted(migrationEvent);
00334 break;
00335 }
00336 }
00337
00338
00339
00340
00341 protected class MaxTimeOnSiteTimerTask extends java.util.TimerTask {
00342 protected MigrationManagerImpl migrationManager;
00343
00344 protected Body body;
00345
00346 public MaxTimeOnSiteTimerTask(MigrationManagerImpl migrationManager,
00347 Body body) {
00348 this.migrationManager = migrationManager;
00349 this.body = body;
00350
00351 }
00352
00353 public void run() {
00354 this.body.enterInThreadStore();
00355
00356 if ((this.body instanceof Migratable)
00357 && !((Migratable) this.body).hasJustMigrated()) {
00358 this.migrationManager.updateLocation(this.body);
00359 }
00360 this.body.exitFromThreadStore();
00361 }
00362 }
00363
00364 protected class TimeToLiveTimerTask extends java.util.TimerTask {
00365 protected MigrationManagerImpl migrationManager;
00366
00367 protected MigratableBody body;
00368
00369 protected UniversalBody migratedBody;
00370
00371 private long creationTime;
00372
00373 public TimeToLiveTimerTask(MigrationManagerImpl migrationManager,
00374 MigratableBody body, UniversalBody migratedBody) {
00375 this.migrationManager = migrationManager;
00376 this.body = body;
00377 this.migratedBody = migratedBody;
00378 this.creationTime = System.currentTimeMillis();
00379 }
00380
00381 public void run() {
00382
00383 if (this.migrationManager.updatingForwarder)
00384 this.migrationManager.updateLocation(this.migratedBody.getRemoteAdapter());
00385
00386
00387 LocalBodyStore.getInstance().unregisterForwarder(body);
00388
00389 this.body.terminate();
00390 body.setRequestReceiver(null);
00391 body.setReplyReceiver(null);
00392
00393
00394
00395 }
00396 }
00397 }