org/objectweb/proactive/core/body/migration/MixedLocationServer.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.core.body.migration;
00032 
00033 import java.io.IOException;
00034 import java.net.UnknownHostException;
00035 import java.util.Hashtable;
00036 
00037 import org.apache.log4j.Logger;
00038 import org.objectweb.proactive.ProActive;
00039 import org.objectweb.proactive.Service;
00040 import org.objectweb.proactive.core.UniqueID;
00041 import org.objectweb.proactive.core.body.UniversalBody;
00042 import org.objectweb.proactive.core.body.request.Request;
00043 import org.objectweb.proactive.core.config.ProActiveConfiguration;
00044 import org.objectweb.proactive.core.node.NodeFactory;
00045 import org.objectweb.proactive.core.util.UrlBuilder;
00046 import org.objectweb.proactive.core.util.log.Loggers;
00047 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00048 import org.objectweb.proactive.ext.locationserver.LocationServer;
00049 import org.objectweb.proactive.ext.security.exceptions.RenegotiateSessionException;
00050 
00051 public class MixedLocationServer implements org.objectweb.proactive.RunActive,
00052                 LocationServer {
00053         static Logger logger = ProActiveLogger.getLogger(Loggers.MIGRATION);
00054 
00058         public static final int DELAY_SAME_REPLY = 1000;
00059 
00060         private LocationMap table;
00061 
00062         private Hashtable requestTable;
00063 
00064         private String url;
00065 
00066         public MixedLocationServer() {}
00067 
00068         public MixedLocationServer(String url) {
00069                 this.url = normalizeURL(url);
00070                 this.table = new LocationMap();
00071                 this.requestTable = new Hashtable();
00072         }
00073 
00077         public void updateLocation(UniqueID i, UniversalBody s) {
00078                 updateLocation(i, s, LocationMap.CONSTANT_VERSION);
00079         }
00080 
00085         public void updateLocation(UniqueID i, UniversalBody s, int version) {
00086                 table.updateBody(i, s, version);
00087         }
00088 
00092         public UniversalBody searchObject(UniqueID id) {
00093                 //System.out.print("Searching for "+ id);
00094                 UniversalBody u = (UniversalBody) table.getBody(id);
00095                 return u;
00096         }
00097 
00105         public void runActivity(org.objectweb.proactive.Body body) {
00106 
00107                 this.register();
00108                 Service service = new Service(body);
00109 
00110                 while (body.isActive()) {
00111                         while (service.hasRequestToServe("updateLocation")) {
00112                                 service.serveOldest("updateLocation");
00113                         }
00114                         Request oldest = service.blockingRemoveOldest();
00115 
00116                         if (oldest != null) {
00117                                 synchronized (this.requestTable) {
00118                                         if (oldest.getMethodName().equals("searchObject")) {
00119                                                 // we have to verify if we have received a request of
00120                                                 // the
00121                                                 // same sender for the same ID
00122                                                 UniqueID id = (UniqueID) oldest.getParameter(0);
00123                                                 String requestID = oldest.getSender().getID()
00124                                                                 .toString()
00125                                                                 + id.toString();
00126 
00127                                                 LocationRequestInfo oldRequest = (LocationRequestInfo) requestTable
00128                                                                 .get(requestID);
00129 
00130                                                 if (oldRequest != null) {
00131                                                         int oldVersion = oldRequest.getVersion();
00132                                                         // an old version exists
00133                                                         // if we don't have the same version ==> serve this
00134                                                         // request and save it
00135                                                         int newVersion = table.getVersion(id);
00136                                                         if (oldVersion != newVersion) {
00137                                                                 service.serve(oldest);
00138                                                                 requestTable.put(requestID,
00139                                                                                 new LocationRequestInfo(newVersion,
00140                                                                                                 System.currentTimeMillis(),
00141                                                                                                 true));
00142 
00143                                                         } else {
00144                                                                 // we still have the same version
00145                                                                 // if the previous call hasn't been served ==>
00146                                                                 // have a look to it's creation time
00147                                                                 // else put the request in the table and in the
00148                                                                 // queue
00149                                                                 if (oldRequest.hasBeenServed()) {
00150                                                                         requestTable.put(requestID,
00151                                                                                         new LocationRequestInfo(newVersion,
00152                                                                                                         System.currentTimeMillis(),
00153                                                                                                         false));
00154                                                                         try {
00155                                                                                 body.receiveRequest(oldest);
00156                                                                         } catch (IOException e) {
00157                                                                                 e.printStackTrace();
00158                                                                         } catch (RenegotiateSessionException e) {
00159                                                                                 e.printStackTrace();
00160                                                                         }
00161                                                                 } else {
00162                                                                         // if the delay is gone, serve the request
00163                                                                         // else put it in the queue
00164                                                                         if (oldRequest.getCreationTime()
00165                                                                                         + DELAY_SAME_REPLY < System
00166                                                                                         .currentTimeMillis()) {
00167                                                                                 service.serve(oldest);
00168                                                                                 oldRequest.setServed(true);
00169                                                                         } else {
00170                                                                                 try {
00171                                                                                         body.receiveRequest(oldest);
00172                                                                                 } catch (IOException e) {
00173                                                                                         e.printStackTrace();
00174                                                                                 } catch (RenegotiateSessionException e) {
00175                                                                                         e.printStackTrace();
00176                                                                                 }
00177                                                                         }
00178                                                                 }
00179 
00180                                                         }
00181                                                 } else {
00182                                                         // no old request so serve the request and save it
00183                                                         service.serve(oldest);
00184 
00185                                                         int version = table.getVersion(id);
00186                                                         requestTable.put(requestID,
00187                                                                         new LocationRequestInfo(version, System
00188                                                                                         .currentTimeMillis(), true));
00189                                                 }
00190 
00191                                         } else {
00192                                                 service.serve(oldest);
00193                                         }
00194                                 }
00195                         }
00196                 }
00197         }
00198 
00199         protected String normalizeURL(String url) {
00200                 String tmp = url;
00201 
00202                 try {
00203                         tmp = UrlBuilder.checkUrl(url);
00204                 } catch (UnknownHostException e) {
00205                         e.printStackTrace();
00206                 }
00207                 return tmp;
00208         }
00209 
00210         protected void register() {
00211                 try {
00212                         logger.info("Attempt at binding : " + url);
00213                         ProActive.register(ProActive.getStubOnThis(), url);
00214                         logger.info("Location Server bound in registry : " + url);
00215                 } catch (Exception e) {
00216                         logger.fatal("Cannot bind in registry - aborting " + url);
00217                         e.printStackTrace();
00218                         return;
00219                 }
00220         }
00221 
00222         public static void main(String[] args) {
00223                 ProActiveConfiguration.load();
00224                 String name = ProActiveConfiguration.getLocationServerRmi();
00225 
00226                 Object[] arg = new Object[1];
00227                 arg[0] = name;
00228 
00229                 MixedLocationServer server = null;
00230 
00231                 try {
00232 
00233                         if (args.length == 1) {
00234                                 server = (MixedLocationServer) ProActive.newActive(
00235                                                 MixedLocationServer.class.getName(), arg, NodeFactory
00236                                                                 .getNode(args[0]));
00237                         } else {
00238                                 server = (MixedLocationServer) ProActive.newActive(
00239                                                 MixedLocationServer.class.getName(), arg);
00240                         }
00241                 } catch (Exception e) {
00242                         e.printStackTrace();
00243                 }
00244         }
00245 
00246         //
00247         // -- INNER CLASSES -----------------------------------------------
00248         //
00249 
00250         protected class LocationMap {
00251                 public static final int CONSTANT_VERSION = 0;
00252 
00253                 public static final int NO_VERSION_FOUND = -1;
00254 
00255                 public static final int MIGRATING_OUT = 2000;
00256 
00257                 private Hashtable idToBodyMap;
00258 
00259                 public LocationMap() {
00260                         idToBodyMap = new Hashtable();
00261                 }
00262 
00263                 public void updateBody(UniqueID id, UniversalBody body, int version) {
00264                         synchronized (this.idToBodyMap) {
00265                                 // remove old reference if exists and if is an older version
00266                                 WrappedLocationBody wrappedBody = (WrappedLocationBody) idToBodyMap
00267                                                 .get(id);
00268 
00269                                 if (wrappedBody == null) {
00270                                         // add new reference
00271                                         idToBodyMap.put(id, new WrappedLocationBody(body, version));
00272                                 } else if (wrappedBody.getVersion() <= version) {
00273                                         idToBodyMap.remove(id);
00274 
00275                                         // add new reference
00276                                         idToBodyMap.put(id, new WrappedLocationBody(body, version));
00277                                 }
00278                         }
00279                 }
00280 
00281                 public UniversalBody getBody(UniqueID id) {
00282                         Object o = null;
00283                         if (id != null) {
00284                                 synchronized (this.idToBodyMap) {
00285                                         o = idToBodyMap.get(id);
00286                                         if (o != null) {
00287                                                 /*
00288                                                 WrappedLocationBody wrappedBody = (WrappedLocationBody) o;
00289                                                 
00290                                                 if (wrappedBody.isMigrating()) {
00291                                                         try {
00292                                                                 wait(MIGRATING_OUT);
00293                                                         } catch (InterruptedException e) {
00294                                                                 e.printStackTrace();
00295                                                         }
00296                                                 }
00297                                                 */
00298                                                 return ((WrappedLocationBody) o).getBody();
00299                                         }
00300                                 }
00301                         }
00302 
00303                         return (UniversalBody) o;
00304                 }
00305 
00306                 public int getVersion(UniqueID id) {
00307                         if (id != null) {
00308                                 synchronized (this.idToBodyMap) {
00309                                         Object o = idToBodyMap.get(id);
00310                                         if (o != null) {
00311                                                 WrappedLocationBody wrappedBody = (WrappedLocationBody) o;
00312                                                 return wrappedBody.getVersion();
00313                                         }
00314                                 }
00315                         }
00316                         return NO_VERSION_FOUND;
00317 
00318                 }
00319         }
00320 
00321         protected class WrappedLocationBody {
00322                 private int version;
00323 
00324                 private UniversalBody wrappedBody;
00325 
00326                 private boolean isMigrating;
00327 
00328                 public WrappedLocationBody(UniversalBody body, int version) {
00329                         this.wrappedBody = body;
00330                         this.version = version;
00331                         this.isMigrating = false;
00332                 }
00333 
00334                 public WrappedLocationBody(UniversalBody body, int version,
00335                                 boolean isMigrating) {
00336                         this.wrappedBody = body;
00337                         this.version = version;
00338                         this.isMigrating = isMigrating;
00339                 }
00340 
00341                 public int getVersion() {
00342                         return this.version;
00343                 }
00344 
00345                 public UniversalBody getBody() {
00346                         return this.wrappedBody;
00347                 }
00348 
00349                 public boolean isMigrating() {
00350                         return this.isMigrating;
00351                 }
00352 
00353         }
00354 
00355         protected class LocationRequestInfo {
00356                 private int version;
00357 
00358                 private long creationTime;
00359 
00360                 private boolean served;
00361 
00362                 public LocationRequestInfo(int version, long creationTime,
00363                                 boolean served) {
00364                         this.version = version;
00365                         this.creationTime = creationTime;
00366                         this.served = served;
00367                 }
00368 
00369                 public boolean hasBeenServed() {
00370                         return this.served;
00371                 }
00372 
00373                 public int getVersion() {
00374                         return this.version;
00375                 }
00376 
00377                 public long getCreationTime() {
00378                         return this.creationTime;
00379                 }
00380 
00381                 public void setServed(boolean served) {
00382                         this.served = served;
00383                 }
00384         }
00385 
00386 }

Generated on Mon Jan 22 15:16:06 2007 for ProActive by  doxygen 1.5.1