org/objectweb/proactive/p2p/loadbalancer/P2PLoadBalancer.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.p2p.loadbalancer;
00032 
00033 import java.io.BufferedReader;
00034 import java.io.IOException;
00035 import java.io.InputStreamReader;
00036 import java.util.Iterator;
00037 import java.util.Random;
00038 import java.util.Vector;
00039 
00040 import org.objectweb.proactive.ActiveObjectCreationException;
00041 import org.objectweb.proactive.Body;
00042 import org.objectweb.proactive.ProActive;
00043 import org.objectweb.proactive.ProActiveInternalObject;
00044 import org.objectweb.proactive.RunActive;
00045 import org.objectweb.proactive.Service;
00046 import org.objectweb.proactive.core.exceptions.NonFunctionalException;
00047 import org.objectweb.proactive.core.node.Node;
00048 import org.objectweb.proactive.core.node.NodeException;
00049 import org.objectweb.proactive.core.node.NodeFactory;
00050 import org.objectweb.proactive.loadbalancing.LoadBalancer;
00051 import org.objectweb.proactive.loadbalancing.LoadBalancingConstants;
00052 import org.objectweb.proactive.loadbalancing.LoadMonitor;
00053 import org.objectweb.proactive.loadbalancing.metrics.CPURanking.LinuxCPURanking;
00054 import org.objectweb.proactive.p2p.service.P2PService;
00055 import org.objectweb.proactive.p2p.service.util.P2PConstants;
00056 
00057 public class P2PLoadBalancer extends LoadBalancer implements RunActive,ProActiveInternalObject{
00058         static int MAX_KNOWN_PEERS = 10;
00059         static long MAX_DISTANCE = 100;
00060         protected String balancerName;
00061     protected Random randomizer;
00062         protected P2PService p2pService;
00063     protected Vector acquaintances, forBalancing, forStealing;
00064     protected P2PLoadBalancer myThis;
00065     
00066     protected double ranking;
00067     
00068     
00069     public P2PLoadBalancer() {}
00070 
00071     protected void addToBalanceList(int n) {
00072         int i=0;
00073         Iterator it = acquaintances.iterator();
00074         while (i < n && it.hasNext()) {
00075                 String itAddress = null;
00076                 try {
00077                         P2PService oService = (P2PService) it.next();
00078                         itAddress = oService.getAddress().stringValue();
00079                         itAddress = itAddress.substring(0,itAddress.lastIndexOf("/"))+"/"+this.balancerName;
00080                                 P2PLoadBalancer oLB =  (P2PLoadBalancer) ProActive.lookupActive(P2PLoadBalancer.class.getName(),itAddress);
00081                                 
00082                                 if (forBalancing.indexOf(oLB) < 0) {
00083                                         long distance = ping(itAddress);
00084                                         if (distance < MAX_DISTANCE) {
00085                                                 forBalancing.add(oLB);
00086                                                 i++;
00087                                                 }
00088                                         }
00089                                 } catch (ActiveObjectCreationException e) {
00090                                 logger.error("[P2PLB] ActiveObjectCreationException");
00091                                 } catch (IOException e) {
00092                                 logger.error("[P2PLB] IOException");
00093                                 } catch (NonFunctionalException e) {
00094                                         logger.error("[P2PLoadBalancing] Trying to reach a non-existing peer from "+myNode.getNodeInformation().getHostName());
00095                                 }
00096                 }
00097         if (i >= n) return;
00098         
00099         // Still missing acquaintances
00100 
00101         it = acquaintances.iterator();
00102         while (i < n && it.hasNext()) {
00103                 try {
00104                         P2PService oService = (P2PService) it.next();
00105                         String itAddress = oService.getAddress().stringValue();
00106                         itAddress = itAddress.substring(0,itAddress.lastIndexOf("/"))+"/"+this.balancerName;
00107                                 P2PLoadBalancer oLB =  (P2PLoadBalancer) ProActive.lookupActive(P2PLoadBalancer.class.getName(),itAddress);
00108                                 
00109                                 if (!forBalancing.contains(oLB)) {
00110                                         forBalancing.add(oLB);
00111                                         i++;
00112                                         }
00113                 } catch (ActiveObjectCreationException e) {
00114                         logger.error("[P2PLB] ActiveObjectCreationException");
00115                         } catch (IOException e) {
00116                         logger.error("[P2PLB] IOException");
00117                         } catch (NonFunctionalException e) {
00118                                 logger.error("[P2PLoadBalancing] Trying to reach a non-existing peer from "+myNode.getNodeInformation().getHostName());
00119                         }
00120                 }
00121     }
00122 
00123     
00124     protected void addToStealList(int n) {
00125         int i=0;
00126         Iterator it = acquaintances.iterator();
00127         while (i < n && it.hasNext()) {
00128                 String itAddress = null;
00129                 try {
00130                         P2PService oService = (P2PService) it.next();
00131                         itAddress = oService.getAddress().stringValue();
00132                         itAddress = itAddress.substring(0,itAddress.lastIndexOf("/"))+"/robinhood";
00133                                 P2PLoadBalancer oLB =  (P2PLoadBalancer) ProActive.lookupActive(P2PLoadBalancer.class.getName(),itAddress);
00134                                 
00135                                 if (!forStealing.contains(oLB)) {
00136                                         long distance = ping(itAddress);
00137                                         if (distance < MAX_DISTANCE ) {
00138                                                 forStealing.add(oLB);
00139                                                 i++;
00140                                                 }
00141                                         }
00142                         } catch (ActiveObjectCreationException e) {
00143                         logger.error("[P2PLB] ActiveObjectCreationException");
00144                         } catch (IOException e) {
00145                         logger.error("[P2PLB] IOException");
00146                         } catch (NonFunctionalException e) {
00147                                 logger.error("[P2PLoadBalancing] Trying to reach a non-existing peer from "+myNode.getNodeInformation().getHostName());
00148                         }
00149                 }
00150         if (i >= n) return;
00151         
00152         // Still missing acquaintances
00153 
00154         it = acquaintances.iterator();
00155         while (i < n && it.hasNext()) {
00156                 String itAddress = null;
00157                 try {
00158                         P2PService oService = (P2PService) it.next();
00159                         itAddress = oService.getAddress().stringValue();
00160                         itAddress = itAddress.substring(0,itAddress.lastIndexOf("/"))+"/"+this.balancerName;
00161                                 P2PLoadBalancer oLB =  (P2PLoadBalancer) ProActive.lookupActive(P2PLoadBalancer.class.getName(),itAddress);
00162                                 
00163                                 if (!forStealing.contains(oLB)) {
00164                                         forBalancing.add(oLB);
00165                                                 i++;
00166                                         }
00167                         } catch (ActiveObjectCreationException e) {
00168                         logger.error("[P2PLB] ActiveObjectCreationException");
00169                         } catch (IOException e) {
00170                         logger.error("[P2PLB] IOException");
00171                         } catch (NonFunctionalException e) {
00172                                 logger.error("[P2PLoadBalancing] Trying to reach a non-existing peer from "+myNode.getNodeInformation().getHostName());
00173                         }
00174                 }
00175         
00176         
00177     }
00178 
00179     public double getRanking() {
00180         return this.ranking;
00181     }
00182     
00183     protected long ping(String nodeAddress) {
00184         // nodeAddress come in format "protocol://host:port/nodename"
00185     long timeResp = Long.MAX_VALUE;
00186         String itAddress = new String(nodeAddress.substring(0,nodeAddress.lastIndexOf("/")));
00187         if (itAddress.lastIndexOf(':') >= itAddress.length()-6) itAddress = itAddress.substring(0,itAddress.lastIndexOf(':'));
00188         if (itAddress.lastIndexOf('/') >= 0) itAddress = itAddress.substring(itAddress.lastIndexOf('/')+1);
00189 
00190     BufferedReader in = null;
00191     Runtime rtime = Runtime.getRuntime();
00192     Process s;
00193         try {
00194                 s = rtime.exec("/bin/ping -c 3 -l 2 -q " + itAddress);
00195             in = new BufferedReader(new InputStreamReader(s.getInputStream()));
00196             String line;
00197             while ((line = in.readLine()) != null) {
00198                  if (line.indexOf('=') > 0)  {
00199                          line = line.substring(line.indexOf('=')+2,line.indexOf("ms")-1);
00200                          // rtt min/avg/max/mdev = 0.246/0.339/0.413/0.069 ms, pipe 3
00201                          String pingStats[] = line.split("/");
00202                          timeResp = 1+Math.round(Double.parseDouble(pingStats[1])); 
00203                   }
00204                 } 
00205             in.close();
00206                 } catch (IOException e) {
00207                         logger.error("[P2PLB] PING ERROR! ");
00208                 }
00209     return timeResp;
00210     }
00215     public void startBalancing() {
00216         int size = forBalancing.size();
00217         if (size < 1) return;
00218 
00219         int badRemote = 0;
00220         
00221         int first = randomizer.nextInt(size);
00222         for (int i = 0; i < LoadBalancingConstants.SUBSET_SIZE  && size > 0; i++) {
00223                 P2PLoadBalancer remoteP2Plb = ((P2PLoadBalancer) forBalancing.get((first+i)%size));
00224                 try {
00225                                 remoteP2Plb.getActiveObjectsFrom(myThis,ranking);
00226                         } catch (NonFunctionalException e) {
00227                                 badRemote++;
00228                                 forStealing.remove((first+i)%size);
00229                         size--;
00230                         }
00231         }
00232         if (badRemote > 0) addToStealList(badRemote);
00233     }
00234 
00235     protected void getActiveObjectsFrom (P2PLoadBalancer remoteBalancer, double remoteRanking){
00236                 if (remoteRanking < ranking * LoadBalancingConstants.BALANCE_FACTOR) { // I'm better than him!
00237                         remoteBalancer.sendActiveObjectsTo(myNode);                     
00238                 }
00239         }
00240 
00245     public void stealWork() {
00246         int size = forStealing.size();
00247         if (size < 1) return;
00248 
00249         int badRemote = 0;
00250         int first = randomizer.nextInt(size);
00251         for (int i = 0; i < LoadBalancingConstants.NEIGHBORS_TO_STEAL && size > 0; i++) {
00252                 P2PLoadBalancer remoteP2Plb = ((P2PLoadBalancer) forStealing.get((first+i)%size));
00253                         try {
00254                                 remoteP2Plb.sendActiveObjectsTo(myNode,ranking);
00255                         } catch (NonFunctionalException e) {
00256                                 badRemote++;
00257                                 forStealing.remove((first+i)%size);
00258                         size--;
00259                         }
00260         }
00261         if (badRemote > 0) addToStealList(badRemote);
00262     }
00263 
00264 public void sendActiveObjectsTo(Node remoteNode, double remoteRanking) {
00265 
00266     if (this.ranking < remoteRanking * LoadBalancingConstants.STEAL_FACTOR) { // it's better than me!
00267         sendActiveObjectsTo(remoteNode);
00268         }
00269         }
00270 
00271 public void runActivity(Body body) {
00272 
00273         this.myThis = (P2PLoadBalancer) ProActive.getStubOnThis();
00274     this.balancerName = "robinhood";
00275     
00276     /* Updating the node reference */
00277     try {
00278         String itAddress = body.getNodeURL();
00279         itAddress = itAddress.substring(0,itAddress.lastIndexOf("/"))+"/"+P2PConstants.SHARED_NODE_NAME+"_0";
00280         this.myNode = NodeFactory.getNode(itAddress);
00281                 } catch (NodeException e) {
00282                         e.printStackTrace();
00283                 }
00284 
00285     // registering myself
00286     try {
00287                 String itAddress = body.getNodeURL();
00288                 itAddress = itAddress.substring(0,itAddress.lastIndexOf("/"))+"/"+this.balancerName;
00289                 ProActive.register(myThis,itAddress);
00290         } catch (IOException e) {
00291                 e.printStackTrace();
00292         }
00293 
00294         Service service = new Service(body);
00295         
00296         while (body.isActive()) {
00297             service.blockingServeOldest(); 
00298             }
00299         }
00300 
00301         public void killMePlease() {
00302                 lm.killMePlease();
00303                 ProActive.terminateActiveObject(myThis,true);
00304         }
00305 
00306         public void init() {
00307         this.forBalancing = new Vector(MAX_KNOWN_PEERS);
00308         this.forStealing = new Vector(MAX_KNOWN_PEERS);
00309         this.randomizer = new Random();
00310         /* We update the ranking */
00311         LinuxCPURanking thisCPURanking = new LinuxCPURanking();
00312         ranking = thisCPURanking.getRanking();
00313 
00314                 try {
00315                         this.acquaintances = ((P2PService) P2PService.getLocalP2PService()).getAcquaintanceList();
00316                 } catch (Exception e) {
00317                         e.printStackTrace();
00318                 }
00319                 
00320             /* We update the lists */
00321             this.addToBalanceList(MAX_KNOWN_PEERS);
00322             this.addToStealList(MAX_KNOWN_PEERS);
00323 
00324             // by now we use only P2P over Linux
00325             lm = new LoadMonitor(myThis,thisCPURanking);
00326             new Thread(lm).start();
00327 
00328         }
00329 
00330 }

Generated on Mon Jan 22 15:16:11 2007 for ProActive by  doxygen 1.5.1