org/objectweb/proactive/loadbalancing/LoadBalancer.java

00001 /* 
00002  * ################################################################
00003  * 
00004  * ProActive: The Java(TM) library for Parallel, Distributed, 
00005  *            Concurrent computing with Security and Mobility
00006  * 
00007  * Copyright (C) 1997-2007 INRIA/University of Nice-Sophia Antipolis
00008  * Contact: proactive@objectweb.org
00009  * 
00010  * This library is free software; you can redistribute it and/or
00011  * modify it under the terms of the GNU Lesser General Public
00012  * License as published by the Free Software Foundation; either
00013  * version 2.1 of the License, or any later version.
00014  *  
00015  * This library is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018  * Lesser General Public License for more details.
00019  * 
00020  * You should have received a copy of the GNU Lesser General Public
00021  * License along with this library; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00023  * USA
00024  *  
00025  *  Initial developer(s):               The ProActive Team
00026  *                        http://www.inria.fr/oasis/ProActive/contacts.html
00027  *  Contributor(s): 
00028  * 
00029  * ################################################################
00030  */ 
00031 package org.objectweb.proactive.loadbalancing;
00032 
00033 import java.util.ArrayList;
00034 import java.util.Iterator;
00035 import java.util.Random;
00036 import org.apache.log4j.Logger;
00037 import org.objectweb.proactive.Body;
00038 import org.objectweb.proactive.ProActive;
00039 import org.objectweb.proactive.ProActiveInternalObject;
00040 import org.objectweb.proactive.core.body.BodyMap;
00041 import org.objectweb.proactive.core.body.LocalBodyStore;
00042 import org.objectweb.proactive.core.body.migration.MigrationException;
00043 import org.objectweb.proactive.core.exceptions.NonFunctionalException;
00044 import org.objectweb.proactive.core.node.Node;
00045 import org.objectweb.proactive.core.node.NodeException;
00046 import org.objectweb.proactive.core.node.NodeFactory;
00047 import org.objectweb.proactive.core.util.log.Loggers;
00048 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00049 import org.objectweb.proactive.loadbalancing.metrics.Metric;
00050 import org.objectweb.proactive.loadbalancing.metrics.MetricFactory;
00051 
00052 
00069 public class LoadBalancer implements ProActiveInternalObject {
00070     public static Logger logger = ProActiveLogger.getLogger(Loggers.LOAD_BALANCING);
00071     
00072     protected LoadMonitor lm;
00073     protected Metric metric;
00074     protected Node myNode;
00075     protected ArrayList<LoadBalancer> loadBalancers;
00076     
00077     private static final int STEAL = 1;
00078     private static final int BALANCE = 2;
00079    
00080         protected String balancerName;
00081     protected Random randomizer;
00082         protected LoadBalancer myThis;
00083         protected InformationRecover informationRecover;
00084 
00085 
00086         public LoadBalancer(){}
00087         
00088         public LoadBalancer(MetricFactory mf){
00089         this.randomizer = new Random();
00090         this.metric = mf.getNewMetric();
00091 
00092        
00093         }
00094         
00095 
00096     public void startBalancing() {
00097         internalAction(BALANCE);
00098     }
00099 
00100     public void stealWork() {
00101         internalAction(STEAL);
00102     }
00103 
00104     
00105     public void sendActiveObjectsTo(Node remoteNode, double remoteRanking) {
00106         if (this.metric == null)
00107                 return;
00108         if (this.metric.getRanking() < remoteRanking * LoadBalancingConstants.STEAL_FACTOR) { // it's better than me!
00109                 sendActiveObjectsTo(remoteNode);
00110                 }
00111         }
00112     
00113     protected void getActiveObjectsFrom (LoadBalancer remoteBalancer, double remoteRanking){
00114         if (this.metric == null)
00115                 return;
00116                 if (remoteRanking < this.metric.getRanking() * LoadBalancingConstants.BALANCE_FACTOR) { // I'm better than him!
00117                         remoteBalancer.sendActiveObjectsTo(myNode);                     
00118                 }
00119         }
00120     
00128     public void sendActiveObjectsTo(Node destNode) {
00129         
00130         if (NodeFactory.isNodeLocal(destNode)) {
00131             return;
00132         }
00133         
00134        try {
00135                 
00136                 BodyMap knownBodies = LocalBodyStore.getInstance().getLocalBodies();
00137 
00138             if (knownBodies.size() < 1) {
00139                 return;
00140             }
00141 
00142             java.util.Iterator bodiesIterator = knownBodies.bodiesIterator();
00143 
00145             int minLength = Integer.MAX_VALUE;
00146             Body minBody = null;
00147             while (bodiesIterator.hasNext()) {
00148                 Body activeObjectBody = (Body) bodiesIterator.next();
00149                 Object testObject = activeObjectBody.getReifiedObject();
00150 
00151                 /********** Only some Active Objects can migrate *************/
00152                 boolean testSerialization = !(testObject instanceof ProActiveInternalObject) && !(testObject instanceof NotLoadBalanceableObject);
00153 
00154                 if (activeObjectBody.isAlive()) {
00155                     if (activeObjectBody.isActive() && testSerialization) {
00156                         int aoQueueLenght = activeObjectBody.getRequestQueue()
00157                                                             .size();
00158                         if (aoQueueLenght < minLength) {
00159                             minLength = aoQueueLenght;
00160                             minBody = activeObjectBody;
00161                         }
00162                     }
00163                 }
00164             }
00165 
00166             /***********  we have the Active Object with shortest queue, so we send the migration call ********/
00167             if ((minBody != null) && minBody.isActive()) {
00168                 /*logger.info("[Loadbalancer] Migrating ("+minBody.getReifiedObject().getClass().getName()+") from " +
00169                     myNode.getNodeInformation().getURL() + " to " +
00170                     destNode.getNodeInformation().getURL());
00171                     */
00172                 ProActive.migrateTo(minBody, destNode, false);  
00173                 informationRecover.register(this.getName(),this.metric.getLoad(), destNode.getNodeInformation().getURL(),minBody.getReifiedObject().getClass().getName());
00174             }
00175         } catch (IllegalArgumentException e) {
00176                 logger.error("[LoadBalancer] "+e.getLocalizedMessage());
00177         } catch (SecurityException e) {
00178                 logger.error("[LoadBalancer] Object doesn't have migrateTo method");
00179         } catch (MigrationException e) {
00180                 logger.error("[LoadBalancer] Object can't migrate (?)");
00182                 }
00183     }
00184 
00185     /*
00195     public String getName(){
00196         return balancerName;
00197     }
00198     
00199     public void init(ArrayList<LoadBalancer> loadBalancers, InformationRecover ir){
00200         try {
00201                         this.myNode = ProActive.getNode();
00202                         this.informationRecover = ir;
00203                 } catch (NodeException e) {
00204                         e.printStackTrace();
00205                 }
00206                 this.loadBalancers = loadBalancers;
00207         this.myThis = (LoadBalancer) ProActive.getStubOnThis();
00208         this.balancerName = myNode.getNodeInformation().getHostName();
00209         
00210         // by now we use only Linux
00211             lm = new LoadMonitor(myThis,metric);
00212             new Thread(lm).start();
00213     } 
00214     
00215     
00216     private void internalAction(int action) {
00217         int size = loadBalancers.size();
00218         if (size < 1) return;
00219         
00220         int first = randomizer.nextInt(size);
00221         for (int i = 0; i < LoadBalancingConstants.SUBSET_SIZE  && size > 0; i++) {
00222                 LoadBalancer remoteLb = loadBalancers.get((first+i)%size);
00223                 try {
00224                         switch (action) {
00225                                 case STEAL:
00226                                         remoteLb.sendActiveObjectsTo(myNode,this.metric.getRanking());
00227                                         break;
00228 
00229                                 case BALANCE:
00230                                         remoteLb.getActiveObjectsFrom(myThis,this.metric.getRanking());
00231                                         break;
00232                                 }
00233                         } catch (NonFunctionalException e) {
00234                                 loadBalancers.remove((first+i)%size);
00235                         size--;
00236                         }
00237         }
00238     }
00239     
00240     public void notifyLoadBalancers(){
00241         LoadBalancer lb;
00242         LoadBalancer myThis = (LoadBalancer)ProActive.getStubOnThis();
00243         Iterator<LoadBalancer> it = loadBalancers.iterator();
00244                 while (it.hasNext()) {
00245                         lb = it.next();
00246                         if(!lb.equals(this)){
00247                                 lb.addNewBalancer(myThis);
00248                         }
00249                         lb = null;
00250 
00251                 }
00252     }
00253     
00254     public void addNewBalancer(LoadBalancer lb){
00255         loadBalancers.add(lb);
00256     }
00257 
00258 }

Generated on Mon Jan 22 15:16:11 2007 for ProActive by  doxygen 1.5.1