00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.core.body.migration;
00032
00033 import java.io.IOException;
00034 import java.util.ArrayList;
00035
00036 import org.apache.log4j.Logger;
00037 import org.objectweb.proactive.core.UniqueID;
00038 import org.objectweb.proactive.core.body.BodyImpl;
00039 import org.objectweb.proactive.core.body.MetaObjectFactory;
00040 import org.objectweb.proactive.core.body.UniversalBody;
00041 import org.objectweb.proactive.core.body.reply.ReplyReceiver;
00042 import org.objectweb.proactive.core.body.request.RequestReceiver;
00043 import org.objectweb.proactive.core.event.MigrationEventListener;
00044 import org.objectweb.proactive.core.node.Node;
00045 import org.objectweb.proactive.core.runtime.ProActiveRuntime;
00046 import org.objectweb.proactive.core.util.log.Loggers;
00047 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00048 import org.objectweb.proactive.ext.security.InternalBodySecurity;
00049 import org.objectweb.proactive.ext.security.SecurityContext;
00050 import org.objectweb.proactive.ext.security.exceptions.SecurityNotAvailableException;
00051 import org.objectweb.proactive.ext.security.securityentity.Entity;
00052
00053
00054 public class MigratableBody extends BodyImpl implements Migratable,
00055 java.io.Serializable {
00056 protected static Logger bodyLogger = ProActiveLogger.getLogger(Loggers.BODY);
00057 protected static Logger migrationLogger = ProActiveLogger.getLogger(Loggers.MIGRATION);
00058
00059
00060
00061
00062
00064 protected MigrationManager migrationManager;
00065
00067 protected transient boolean hasJustMigrated;
00068
00069
00070
00071
00072 public MigratableBody() {
00073 }
00074
00075 public MigratableBody(Object reifiedObject, String nodeURL,
00076 MetaObjectFactory factory, String jobID) {
00077 super(reifiedObject, nodeURL, factory, jobID);
00078 this.migrationManager = factory.newMigrationManagerFactory()
00079 .newMigrationManager();
00080 }
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090 public boolean hasJustMigrated(){
00091 return this.hasJustMigrated;
00092 }
00093
00094 public UniversalBody migrateTo(Node node) throws MigrationException {
00095 return internalMigrateTo(node, false);
00096 }
00097
00098 public UniversalBody cloneTo(Node node) throws MigrationException {
00099 return internalMigrateTo(node, true);
00100 }
00101
00102 public void addMigrationEventListener(MigrationEventListener listener) {
00103 if (migrationManager != null) {
00104 migrationManager.addMigrationEventListener(listener);
00105 }
00106 }
00107
00108 public void removeMigrationEventListener(MigrationEventListener listener) {
00109 if (migrationManager != null) {
00110 migrationManager.removeMigrationEventListener(listener);
00111 }
00112 }
00113
00114 public MigrationManager getMigrationManager() {
00115 return migrationManager;
00116 }
00117
00118
00119
00120
00121
00125 protected void activityStarted() {
00126 super.activityStarted();
00127
00128 if (migrationLogger.isDebugEnabled()) {
00129 migrationLogger.debug("Body run on node " + nodeURL +
00130 " migration=" + hasJustMigrated);
00131 }
00132 if (bodyLogger.isDebugEnabled()) {
00133 bodyLogger.debug("Body run on node " + nodeURL + " migration=" +
00134 hasJustMigrated);
00135 }
00136 if (hasJustMigrated) {
00137 if (migrationManager != null) {
00138 migrationManager.startingAfterMigration(this);
00139 }
00140 hasJustMigrated = false;
00141 }
00142 }
00143
00144 protected void setRequestReceiver(RequestReceiver requestReceiver){
00145 this.requestReceiver = requestReceiver;
00146 }
00147
00148 protected void setReplyReceiver(ReplyReceiver replyReceiver){
00149 this.replyReceiver = replyReceiver;
00150 }
00151
00152 protected void setHasMigrated(){
00153 this.hasJustMigrated = true;
00154 }
00155
00156 protected RequestReceiver getRequestReceiver(){
00157 return this.requestReceiver;
00158 }
00159
00160 protected ReplyReceiver getReplyReceiver(){
00161 return this.replyReceiver;
00162 }
00163
00164
00165
00166
00167 private UniversalBody internalMigrateTo(Node node, boolean byCopy)
00168 throws MigrationException {
00169 UniqueID savedID = null;
00170 UniversalBody migratedBody = null;
00171
00172 if (!isAlive()) {
00173 throw new MigrationException(
00174 "Attempt to migrate a dead body that has been terminated");
00175 }
00176
00177 if (!isActive()) {
00178 throw new MigrationException("Attempt to migrate a non active body");
00179 }
00180
00181
00182 node = migrationManager.checkNode(node);
00183
00184
00185 String saveNodeURL = nodeURL;
00186 nodeURL = node.getNodeInformation().getURL();
00187
00188 try {
00189 if (this.isSecurityOn) {
00190
00191 try {
00192 ProActiveRuntime runtimeDestination = node.getProActiveRuntime();
00193
00194 ArrayList<Entity> entitiesFrom = null;
00195 ArrayList<Entity> entitiesTo = null;
00196
00197 entitiesFrom = this.getEntities();
00198 entitiesTo = runtimeDestination.getEntities();
00199
00200 SecurityContext sc = new SecurityContext(SecurityContext.MIGRATION_TO,
00201 entitiesFrom, entitiesTo);
00202
00203 SecurityContext result = null;
00204
00205 if (isSecurityOn) {
00206 result = psm.getPolicy(sc);
00207
00208 if (!result.isMigration()) {
00209 ProActiveLogger.getLogger(Loggers.SECURITY)
00210 .info("NOTE : Security manager forbids the migration");
00211 return this;
00212 }
00213 } else {
00214
00215 result = runtimeDestination.getPolicy(sc);
00216
00217 if (!result.isMigration()) {
00218 ProActiveLogger.getLogger(Loggers.SECURITY)
00219 .info("NOTE : Security manager forbids the migration");
00220 return this;
00221 }
00222 }
00223 } catch (SecurityNotAvailableException e1) {
00224 bodyLogger.debug("Security not available");
00225 e1.printStackTrace();
00226 } catch (IOException e) {
00227 e.printStackTrace();
00228 }
00229 }
00230 nodeURL = node.getNodeInformation().getURL();
00231
00232
00233 blockCommunication();
00234
00235
00236 savedID = bodyID;
00237 if (byCopy) {
00238
00239
00240 bodyID = null;
00241 }
00242
00243
00244
00245 if (this.isSecurityOn) {
00246 openedSessions = psm.getOpenedConnexion();
00247 }
00248
00249
00250 migratedBody = migrationManager.migrateTo(node, this);
00251 if (isSecurityOn) {
00252 this.internalBodySecurity.setDistantBody(migratedBody);
00253 }
00254
00255
00256
00257
00258
00259 if (this.ftmanager != null) {
00260 this.ftmanager.updateLocationAtServer(savedID, migratedBody);
00261 }
00262
00263 } catch (MigrationException e) {
00264 openedSessions = null;
00265 nodeURL = saveNodeURL;
00266 bodyID = savedID;
00267 localBodyStrategy.getFuturePool().unsetMigrationTag();
00268 if (this.isSecurityOn) {
00269 this.internalBodySecurity.setDistantBody(null);
00270 }
00271 acceptCommunication();
00272 throw e;
00273 }
00274
00275 if (!byCopy) {
00276 this.migrationManager.changeBodyAfterMigration(this,migratedBody);
00277 activityStopped();
00278
00279 } else {
00280 bodyID = savedID;
00281 nodeURL = saveNodeURL;
00282 }
00283 acceptCommunication();
00284
00285 return migratedBody;
00286 }
00287
00288
00289
00290
00291 private void writeObject(java.io.ObjectOutputStream out)
00292 throws java.io.IOException {
00293 if (migrationLogger.isDebugEnabled()) {
00294 migrationLogger.debug("stream = " + out);
00295 }
00296 out.defaultWriteObject();
00297 }
00298
00299 private void readObject(java.io.ObjectInputStream in)
00300 throws java.io.IOException, ClassNotFoundException {
00301 if (migrationLogger.isDebugEnabled()) {
00302 migrationLogger.debug("stream = " + in);
00303 }
00304 in.defaultReadObject();
00305 hasJustMigrated = true;
00306 if (this.isSecurityOn) {
00307 internalBodySecurity = new InternalBodySecurity(null);
00308 psm.setBody(this);
00309 }
00310 }
00311
00312
00313
00314
00315 public long getNextSequenceID() {
00316 return localBodyStrategy.getNextSequenceID();
00317 }
00318 }