00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.p2p.loadbalancer;
00032
00033 import java.io.BufferedReader;
00034 import java.io.IOException;
00035 import java.io.InputStreamReader;
00036 import java.util.Iterator;
00037 import java.util.Random;
00038 import java.util.Vector;
00039
00040 import org.objectweb.proactive.ActiveObjectCreationException;
00041 import org.objectweb.proactive.Body;
00042 import org.objectweb.proactive.ProActive;
00043 import org.objectweb.proactive.ProActiveInternalObject;
00044 import org.objectweb.proactive.RunActive;
00045 import org.objectweb.proactive.Service;
00046 import org.objectweb.proactive.core.exceptions.NonFunctionalException;
00047 import org.objectweb.proactive.core.node.Node;
00048 import org.objectweb.proactive.core.node.NodeException;
00049 import org.objectweb.proactive.core.node.NodeFactory;
00050 import org.objectweb.proactive.loadbalancing.LoadBalancer;
00051 import org.objectweb.proactive.loadbalancing.LoadBalancingConstants;
00052 import org.objectweb.proactive.loadbalancing.LoadMonitor;
00053 import org.objectweb.proactive.loadbalancing.metrics.CPURanking.LinuxCPURanking;
00054 import org.objectweb.proactive.p2p.service.P2PService;
00055 import org.objectweb.proactive.p2p.service.util.P2PConstants;
00056
00057 public class P2PLoadBalancer extends LoadBalancer implements RunActive,ProActiveInternalObject{
00058 static int MAX_KNOWN_PEERS = 10;
00059 static long MAX_DISTANCE = 100;
00060 protected String balancerName;
00061 protected Random randomizer;
00062 protected P2PService p2pService;
00063 protected Vector acquaintances, forBalancing, forStealing;
00064 protected P2PLoadBalancer myThis;
00065
00066 protected double ranking;
00067
00068
00069 public P2PLoadBalancer() {}
00070
00071 protected void addToBalanceList(int n) {
00072 int i=0;
00073 Iterator it = acquaintances.iterator();
00074 while (i < n && it.hasNext()) {
00075 String itAddress = null;
00076 try {
00077 P2PService oService = (P2PService) it.next();
00078 itAddress = oService.getAddress().stringValue();
00079 itAddress = itAddress.substring(0,itAddress.lastIndexOf("/"))+"/"+this.balancerName;
00080 P2PLoadBalancer oLB = (P2PLoadBalancer) ProActive.lookupActive(P2PLoadBalancer.class.getName(),itAddress);
00081
00082 if (forBalancing.indexOf(oLB) < 0) {
00083 long distance = ping(itAddress);
00084 if (distance < MAX_DISTANCE) {
00085 forBalancing.add(oLB);
00086 i++;
00087 }
00088 }
00089 } catch (ActiveObjectCreationException e) {
00090 logger.error("[P2PLB] ActiveObjectCreationException");
00091 } catch (IOException e) {
00092 logger.error("[P2PLB] IOException");
00093 } catch (NonFunctionalException e) {
00094 logger.error("[P2PLoadBalancing] Trying to reach a non-existing peer from "+myNode.getNodeInformation().getHostName());
00095 }
00096 }
00097 if (i >= n) return;
00098
00099
00100
00101 it = acquaintances.iterator();
00102 while (i < n && it.hasNext()) {
00103 try {
00104 P2PService oService = (P2PService) it.next();
00105 String itAddress = oService.getAddress().stringValue();
00106 itAddress = itAddress.substring(0,itAddress.lastIndexOf("/"))+"/"+this.balancerName;
00107 P2PLoadBalancer oLB = (P2PLoadBalancer) ProActive.lookupActive(P2PLoadBalancer.class.getName(),itAddress);
00108
00109 if (!forBalancing.contains(oLB)) {
00110 forBalancing.add(oLB);
00111 i++;
00112 }
00113 } catch (ActiveObjectCreationException e) {
00114 logger.error("[P2PLB] ActiveObjectCreationException");
00115 } catch (IOException e) {
00116 logger.error("[P2PLB] IOException");
00117 } catch (NonFunctionalException e) {
00118 logger.error("[P2PLoadBalancing] Trying to reach a non-existing peer from "+myNode.getNodeInformation().getHostName());
00119 }
00120 }
00121 }
00122
00123
00124 protected void addToStealList(int n) {
00125 int i=0;
00126 Iterator it = acquaintances.iterator();
00127 while (i < n && it.hasNext()) {
00128 String itAddress = null;
00129 try {
00130 P2PService oService = (P2PService) it.next();
00131 itAddress = oService.getAddress().stringValue();
00132 itAddress = itAddress.substring(0,itAddress.lastIndexOf("/"))+"/robinhood";
00133 P2PLoadBalancer oLB = (P2PLoadBalancer) ProActive.lookupActive(P2PLoadBalancer.class.getName(),itAddress);
00134
00135 if (!forStealing.contains(oLB)) {
00136 long distance = ping(itAddress);
00137 if (distance < MAX_DISTANCE ) {
00138 forStealing.add(oLB);
00139 i++;
00140 }
00141 }
00142 } catch (ActiveObjectCreationException e) {
00143 logger.error("[P2PLB] ActiveObjectCreationException");
00144 } catch (IOException e) {
00145 logger.error("[P2PLB] IOException");
00146 } catch (NonFunctionalException e) {
00147 logger.error("[P2PLoadBalancing] Trying to reach a non-existing peer from "+myNode.getNodeInformation().getHostName());
00148 }
00149 }
00150 if (i >= n) return;
00151
00152
00153
00154 it = acquaintances.iterator();
00155 while (i < n && it.hasNext()) {
00156 String itAddress = null;
00157 try {
00158 P2PService oService = (P2PService) it.next();
00159 itAddress = oService.getAddress().stringValue();
00160 itAddress = itAddress.substring(0,itAddress.lastIndexOf("/"))+"/"+this.balancerName;
00161 P2PLoadBalancer oLB = (P2PLoadBalancer) ProActive.lookupActive(P2PLoadBalancer.class.getName(),itAddress);
00162
00163 if (!forStealing.contains(oLB)) {
00164 forBalancing.add(oLB);
00165 i++;
00166 }
00167 } catch (ActiveObjectCreationException e) {
00168 logger.error("[P2PLB] ActiveObjectCreationException");
00169 } catch (IOException e) {
00170 logger.error("[P2PLB] IOException");
00171 } catch (NonFunctionalException e) {
00172 logger.error("[P2PLoadBalancing] Trying to reach a non-existing peer from "+myNode.getNodeInformation().getHostName());
00173 }
00174 }
00175
00176
00177 }
00178
00179 public double getRanking() {
00180 return this.ranking;
00181 }
00182
00183 protected long ping(String nodeAddress) {
00184
00185 long timeResp = Long.MAX_VALUE;
00186 String itAddress = new String(nodeAddress.substring(0,nodeAddress.lastIndexOf("/")));
00187 if (itAddress.lastIndexOf(':') >= itAddress.length()-6) itAddress = itAddress.substring(0,itAddress.lastIndexOf(':'));
00188 if (itAddress.lastIndexOf('/') >= 0) itAddress = itAddress.substring(itAddress.lastIndexOf('/')+1);
00189
00190 BufferedReader in = null;
00191 Runtime rtime = Runtime.getRuntime();
00192 Process s;
00193 try {
00194 s = rtime.exec("/bin/ping -c 3 -l 2 -q " + itAddress);
00195 in = new BufferedReader(new InputStreamReader(s.getInputStream()));
00196 String line;
00197 while ((line = in.readLine()) != null) {
00198 if (line.indexOf('=') > 0) {
00199 line = line.substring(line.indexOf('=')+2,line.indexOf("ms")-1);
00200
00201 String pingStats[] = line.split("/");
00202 timeResp = 1+Math.round(Double.parseDouble(pingStats[1]));
00203 }
00204 }
00205 in.close();
00206 } catch (IOException e) {
00207 logger.error("[P2PLB] PING ERROR! ");
00208 }
00209 return timeResp;
00210 }
00215 public void startBalancing() {
00216 int size = forBalancing.size();
00217 if (size < 1) return;
00218
00219 int badRemote = 0;
00220
00221 int first = randomizer.nextInt(size);
00222 for (int i = 0; i < LoadBalancingConstants.SUBSET_SIZE && size > 0; i++) {
00223 P2PLoadBalancer remoteP2Plb = ((P2PLoadBalancer) forBalancing.get((first+i)%size));
00224 try {
00225 remoteP2Plb.getActiveObjectsFrom(myThis,ranking);
00226 } catch (NonFunctionalException e) {
00227 badRemote++;
00228 forStealing.remove((first+i)%size);
00229 size--;
00230 }
00231 }
00232 if (badRemote > 0) addToStealList(badRemote);
00233 }
00234
00235 protected void getActiveObjectsFrom (P2PLoadBalancer remoteBalancer, double remoteRanking){
00236 if (remoteRanking < ranking * LoadBalancingConstants.BALANCE_FACTOR) {
00237 remoteBalancer.sendActiveObjectsTo(myNode);
00238 }
00239 }
00240
00245 public void stealWork() {
00246 int size = forStealing.size();
00247 if (size < 1) return;
00248
00249 int badRemote = 0;
00250 int first = randomizer.nextInt(size);
00251 for (int i = 0; i < LoadBalancingConstants.NEIGHBORS_TO_STEAL && size > 0; i++) {
00252 P2PLoadBalancer remoteP2Plb = ((P2PLoadBalancer) forStealing.get((first+i)%size));
00253 try {
00254 remoteP2Plb.sendActiveObjectsTo(myNode,ranking);
00255 } catch (NonFunctionalException e) {
00256 badRemote++;
00257 forStealing.remove((first+i)%size);
00258 size--;
00259 }
00260 }
00261 if (badRemote > 0) addToStealList(badRemote);
00262 }
00263
00264 public void sendActiveObjectsTo(Node remoteNode, double remoteRanking) {
00265
00266 if (this.ranking < remoteRanking * LoadBalancingConstants.STEAL_FACTOR) {
00267 sendActiveObjectsTo(remoteNode);
00268 }
00269 }
00270
00271 public void runActivity(Body body) {
00272
00273 this.myThis = (P2PLoadBalancer) ProActive.getStubOnThis();
00274 this.balancerName = "robinhood";
00275
00276
00277 try {
00278 String itAddress = body.getNodeURL();
00279 itAddress = itAddress.substring(0,itAddress.lastIndexOf("/"))+"/"+P2PConstants.SHARED_NODE_NAME+"_0";
00280 this.myNode = NodeFactory.getNode(itAddress);
00281 } catch (NodeException e) {
00282 e.printStackTrace();
00283 }
00284
00285
00286 try {
00287 String itAddress = body.getNodeURL();
00288 itAddress = itAddress.substring(0,itAddress.lastIndexOf("/"))+"/"+this.balancerName;
00289 ProActive.register(myThis,itAddress);
00290 } catch (IOException e) {
00291 e.printStackTrace();
00292 }
00293
00294 Service service = new Service(body);
00295
00296 while (body.isActive()) {
00297 service.blockingServeOldest();
00298 }
00299 }
00300
00301 public void killMePlease() {
00302 lm.killMePlease();
00303 ProActive.terminateActiveObject(myThis,true);
00304 }
00305
00306 public void init() {
00307 this.forBalancing = new Vector(MAX_KNOWN_PEERS);
00308 this.forStealing = new Vector(MAX_KNOWN_PEERS);
00309 this.randomizer = new Random();
00310
00311 LinuxCPURanking thisCPURanking = new LinuxCPURanking();
00312 ranking = thisCPURanking.getRanking();
00313
00314 try {
00315 this.acquaintances = ((P2PService) P2PService.getLocalP2PService()).getAcquaintanceList();
00316 } catch (Exception e) {
00317 e.printStackTrace();
00318 }
00319
00320
00321 this.addToBalanceList(MAX_KNOWN_PEERS);
00322 this.addToStealList(MAX_KNOWN_PEERS);
00323
00324
00325 lm = new LoadMonitor(myThis,thisCPURanking);
00326 new Thread(lm).start();
00327
00328 }
00329
00330 }