00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030  
00031 package org.objectweb.proactive.p2p.loadbalancer;
00032 
00033 import java.io.BufferedReader;
00034 import java.io.IOException;
00035 import java.io.InputStreamReader;
00036 import java.util.Iterator;
00037 import java.util.Random;
00038 import java.util.Vector;
00039 
00040 import org.objectweb.proactive.ActiveObjectCreationException;
00041 import org.objectweb.proactive.Body;
00042 import org.objectweb.proactive.ProActive;
00043 import org.objectweb.proactive.ProActiveInternalObject;
00044 import org.objectweb.proactive.RunActive;
00045 import org.objectweb.proactive.Service;
00046 import org.objectweb.proactive.core.exceptions.NonFunctionalException;
00047 import org.objectweb.proactive.core.node.Node;
00048 import org.objectweb.proactive.core.node.NodeException;
00049 import org.objectweb.proactive.core.node.NodeFactory;
00050 import org.objectweb.proactive.loadbalancing.LoadBalancer;
00051 import org.objectweb.proactive.loadbalancing.LoadBalancingConstants;
00052 import org.objectweb.proactive.loadbalancing.LoadMonitor;
00053 import org.objectweb.proactive.loadbalancing.metrics.CPURanking.LinuxCPURanking;
00054 import org.objectweb.proactive.p2p.service.P2PService;
00055 import org.objectweb.proactive.p2p.service.util.P2PConstants;
00056 
00057 public class P2PLoadBalancer extends LoadBalancer implements RunActive,ProActiveInternalObject{
00058         static int MAX_KNOWN_PEERS = 10;
00059         static long MAX_DISTANCE = 100;
00060         protected String balancerName;
00061     protected Random randomizer;
00062         protected P2PService p2pService;
00063     protected Vector acquaintances, forBalancing, forStealing;
00064     protected P2PLoadBalancer myThis;
00065     
00066     protected double ranking;
00067     
00068     
00069     public P2PLoadBalancer() {}
00070 
00071     protected void addToBalanceList(int n) {
00072         int i=0;
00073         Iterator it = acquaintances.iterator();
00074         while (i < n && it.hasNext()) {
00075                 String itAddress = null;
00076                 try {
00077                         P2PService oService = (P2PService) it.next();
00078                         itAddress = oService.getAddress().stringValue();
00079                         itAddress = itAddress.substring(0,itAddress.lastIndexOf("/"))+"/"+this.balancerName;
00080                                 P2PLoadBalancer oLB =  (P2PLoadBalancer) ProActive.lookupActive(P2PLoadBalancer.class.getName(),itAddress);
00081                                 
00082                                 if (forBalancing.indexOf(oLB) < 0) {
00083                                         long distance = ping(itAddress);
00084                                         if (distance < MAX_DISTANCE) {
00085                                                 forBalancing.add(oLB);
00086                                                 i++;
00087                                                 }
00088                                         }
00089                                 } catch (ActiveObjectCreationException e) {
00090                                 logger.error("[P2PLB] ActiveObjectCreationException");
00091                                 } catch (IOException e) {
00092                                 logger.error("[P2PLB] IOException");
00093                                 } catch (NonFunctionalException e) {
00094                                         logger.error("[P2PLoadBalancing] Trying to reach a non-existing peer from "+myNode.getNodeInformation().getHostName());
00095                                 }
00096                 }
00097         if (i >= n) return;
00098         
00099         
00100 
00101         it = acquaintances.iterator();
00102         while (i < n && it.hasNext()) {
00103                 try {
00104                         P2PService oService = (P2PService) it.next();
00105                         String itAddress = oService.getAddress().stringValue();
00106                         itAddress = itAddress.substring(0,itAddress.lastIndexOf("/"))+"/"+this.balancerName;
00107                                 P2PLoadBalancer oLB =  (P2PLoadBalancer) ProActive.lookupActive(P2PLoadBalancer.class.getName(),itAddress);
00108                                 
00109                                 if (!forBalancing.contains(oLB)) {
00110                                         forBalancing.add(oLB);
00111                                         i++;
00112                                         }
00113                 } catch (ActiveObjectCreationException e) {
00114                         logger.error("[P2PLB] ActiveObjectCreationException");
00115                         } catch (IOException e) {
00116                         logger.error("[P2PLB] IOException");
00117                         } catch (NonFunctionalException e) {
00118                                 logger.error("[P2PLoadBalancing] Trying to reach a non-existing peer from "+myNode.getNodeInformation().getHostName());
00119                         }
00120                 }
00121     }
00122 
00123     
00124     protected void addToStealList(int n) {
00125         int i=0;
00126         Iterator it = acquaintances.iterator();
00127         while (i < n && it.hasNext()) {
00128                 String itAddress = null;
00129                 try {
00130                         P2PService oService = (P2PService) it.next();
00131                         itAddress = oService.getAddress().stringValue();
00132                         itAddress = itAddress.substring(0,itAddress.lastIndexOf("/"))+"/robinhood";
00133                                 P2PLoadBalancer oLB =  (P2PLoadBalancer) ProActive.lookupActive(P2PLoadBalancer.class.getName(),itAddress);
00134                                 
00135                                 if (!forStealing.contains(oLB)) {
00136                                         long distance = ping(itAddress);
00137                                         if (distance < MAX_DISTANCE ) {
00138                                                 forStealing.add(oLB);
00139                                                 i++;
00140                                                 }
00141                                         }
00142                         } catch (ActiveObjectCreationException e) {
00143                         logger.error("[P2PLB] ActiveObjectCreationException");
00144                         } catch (IOException e) {
00145                         logger.error("[P2PLB] IOException");
00146                         } catch (NonFunctionalException e) {
00147                                 logger.error("[P2PLoadBalancing] Trying to reach a non-existing peer from "+myNode.getNodeInformation().getHostName());
00148                         }
00149                 }
00150         if (i >= n) return;
00151         
00152         
00153 
00154         it = acquaintances.iterator();
00155         while (i < n && it.hasNext()) {
00156                 String itAddress = null;
00157                 try {
00158                         P2PService oService = (P2PService) it.next();
00159                         itAddress = oService.getAddress().stringValue();
00160                         itAddress = itAddress.substring(0,itAddress.lastIndexOf("/"))+"/"+this.balancerName;
00161                                 P2PLoadBalancer oLB =  (P2PLoadBalancer) ProActive.lookupActive(P2PLoadBalancer.class.getName(),itAddress);
00162                                 
00163                                 if (!forStealing.contains(oLB)) {
00164                                         forBalancing.add(oLB);
00165                                                 i++;
00166                                         }
00167                         } catch (ActiveObjectCreationException e) {
00168                         logger.error("[P2PLB] ActiveObjectCreationException");
00169                         } catch (IOException e) {
00170                         logger.error("[P2PLB] IOException");
00171                         } catch (NonFunctionalException e) {
00172                                 logger.error("[P2PLoadBalancing] Trying to reach a non-existing peer from "+myNode.getNodeInformation().getHostName());
00173                         }
00174                 }
00175         
00176         
00177     }
00178 
00179     public double getRanking() {
00180         return this.ranking;
00181     }
00182     
00183     protected long ping(String nodeAddress) {
00184         
00185     long timeResp = Long.MAX_VALUE;
00186         String itAddress = new String(nodeAddress.substring(0,nodeAddress.lastIndexOf("/")));
00187         if (itAddress.lastIndexOf(':') >= itAddress.length()-6) itAddress = itAddress.substring(0,itAddress.lastIndexOf(':'));
00188         if (itAddress.lastIndexOf('/') >= 0) itAddress = itAddress.substring(itAddress.lastIndexOf('/')+1);
00189 
00190     BufferedReader in = null;
00191     Runtime rtime = Runtime.getRuntime();
00192     Process s;
00193         try {
00194                 s = rtime.exec("/bin/ping -c 3 -l 2 -q " + itAddress);
00195             in = new BufferedReader(new InputStreamReader(s.getInputStream()));
00196             String line;
00197             while ((line = in.readLine()) != null) {
00198                  if (line.indexOf('=') > 0)  {
00199                          line = line.substring(line.indexOf('=')+2,line.indexOf("ms")-1);
00200                          
00201                          String pingStats[] = line.split("/");
00202                          timeResp = 1+Math.round(Double.parseDouble(pingStats[1])); 
00203                   }
00204                 } 
00205             in.close();
00206                 } catch (IOException e) {
00207                         logger.error("[P2PLB] PING ERROR! ");
00208                 }
00209     return timeResp;
00210     }
00215     public void startBalancing() {
00216         int size = forBalancing.size();
00217         if (size < 1) return;
00218 
00219         int badRemote = 0;
00220         
00221         int first = randomizer.nextInt(size);
00222         for (int i = 0; i < LoadBalancingConstants.SUBSET_SIZE  && size > 0; i++) {
00223                 P2PLoadBalancer remoteP2Plb = ((P2PLoadBalancer) forBalancing.get((first+i)%size));
00224                 try {
00225                                 remoteP2Plb.getActiveObjectsFrom(myThis,ranking);
00226                         } catch (NonFunctionalException e) {
00227                                 badRemote++;
00228                                 forStealing.remove((first+i)%size);
00229                         size--;
00230                         }
00231         }
00232         if (badRemote > 0) addToStealList(badRemote);
00233     }
00234 
00235     protected void getActiveObjectsFrom (P2PLoadBalancer remoteBalancer, double remoteRanking){
00236                 if (remoteRanking < ranking * LoadBalancingConstants.BALANCE_FACTOR) { 
00237                         remoteBalancer.sendActiveObjectsTo(myNode);                     
00238                 }
00239         }
00240 
00245     public void stealWork() {
00246         int size = forStealing.size();
00247         if (size < 1) return;
00248 
00249         int badRemote = 0;
00250         int first = randomizer.nextInt(size);
00251         for (int i = 0; i < LoadBalancingConstants.NEIGHBORS_TO_STEAL && size > 0; i++) {
00252                 P2PLoadBalancer remoteP2Plb = ((P2PLoadBalancer) forStealing.get((first+i)%size));
00253                         try {
00254                                 remoteP2Plb.sendActiveObjectsTo(myNode,ranking);
00255                         } catch (NonFunctionalException e) {
00256                                 badRemote++;
00257                                 forStealing.remove((first+i)%size);
00258                         size--;
00259                         }
00260         }
00261         if (badRemote > 0) addToStealList(badRemote);
00262     }
00263 
00264 public void sendActiveObjectsTo(Node remoteNode, double remoteRanking) {
00265 
00266     if (this.ranking < remoteRanking * LoadBalancingConstants.STEAL_FACTOR) { 
00267         sendActiveObjectsTo(remoteNode);
00268         }
00269         }
00270 
00271 public void runActivity(Body body) {
00272 
00273         this.myThis = (P2PLoadBalancer) ProActive.getStubOnThis();
00274     this.balancerName = "robinhood";
00275     
00276     
00277     try {
00278         String itAddress = body.getNodeURL();
00279         itAddress = itAddress.substring(0,itAddress.lastIndexOf("/"))+"/"+P2PConstants.SHARED_NODE_NAME+"_0";
00280         this.myNode = NodeFactory.getNode(itAddress);
00281                 } catch (NodeException e) {
00282                         e.printStackTrace();
00283                 }
00284 
00285     
00286     try {
00287                 String itAddress = body.getNodeURL();
00288                 itAddress = itAddress.substring(0,itAddress.lastIndexOf("/"))+"/"+this.balancerName;
00289                 ProActive.register(myThis,itAddress);
00290         } catch (IOException e) {
00291                 e.printStackTrace();
00292         }
00293 
00294         Service service = new Service(body);
00295         
00296         while (body.isActive()) {
00297             service.blockingServeOldest(); 
00298             }
00299         }
00300 
00301         public void killMePlease() {
00302                 lm.killMePlease();
00303                 ProActive.terminateActiveObject(myThis,true);
00304         }
00305 
00306         public void init() {
00307         this.forBalancing = new Vector(MAX_KNOWN_PEERS);
00308         this.forStealing = new Vector(MAX_KNOWN_PEERS);
00309         this.randomizer = new Random();
00310         
00311         LinuxCPURanking thisCPURanking = new LinuxCPURanking();
00312         ranking = thisCPURanking.getRanking();
00313 
00314                 try {
00315                         this.acquaintances = ((P2PService) P2PService.getLocalP2PService()).getAcquaintanceList();
00316                 } catch (Exception e) {
00317                         e.printStackTrace();
00318                 }
00319                 
00320             
00321             this.addToBalanceList(MAX_KNOWN_PEERS);
00322             this.addToStealList(MAX_KNOWN_PEERS);
00323 
00324             
00325             lm = new LoadMonitor(myThis,thisCPURanking);
00326             new Thread(lm).start();
00327 
00328         }
00329 
00330 }