00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package org.objectweb.proactive.p2p.service;
00032
00033 import java.io.BufferedReader;
00034 import java.io.FileNotFoundException;
00035 import java.io.FileReader;
00036 import java.io.IOException;
00037 import java.net.UnknownHostException;
00038 import java.rmi.AlreadyBoundException;
00039 import java.util.Vector;
00040
00041 import org.apache.log4j.Logger;
00042 import org.objectweb.proactive.ProActive;
00043 import org.objectweb.proactive.core.ProActiveException;
00044 import org.objectweb.proactive.core.config.ProActiveConfiguration;
00045 import org.objectweb.proactive.core.runtime.ProActiveRuntime;
00046 import org.objectweb.proactive.core.runtime.RuntimeFactory;
00047 import org.objectweb.proactive.core.util.UrlBuilder;
00048 import org.objectweb.proactive.core.util.log.Loggers;
00049 import org.objectweb.proactive.core.util.log.ProActiveLogger;
00050 import org.objectweb.proactive.p2p.service.util.P2PConstants;
00051
00052
00058 public class StartP2PService implements P2PConstants {
00059
00060 private static final Logger logger = ProActiveLogger.getLogger(Loggers.P2P_STARTSERVICE);
00061
00062 static {
00063 ProActiveConfiguration.load();
00064 }
00065
00066
00067
00068
00069 private static final String USAGE = "java " +
00070 StartP2PService.class.getName() + " [-acq acquisitionMethod]" +
00071 " [-port portNumber]" + " [-s Peer ...] [-f PeerListFile]\n" +
00072 "More options:\n" + " -noa NOA is in number of peers\n" +
00073 " -ttu TTU is in minutes\n" + " -ttl TTL is in hop\n" +
00074 " -capacity List capacity of message sequence number\n" +
00075 " -exploring Percentage of agree response to determine response to exploring messag\n" +
00076 " -booking Expiration time for booking without use a shared node\n" +
00077 " -node_acq Timeout for node acquisition\n" +
00078 " -lookup Lookup frequency for nodes\n" +
00079 " -no_multi_proc_nodes to share only a node. Otherwise, 1 node by CPU\n" +
00080 " -xml_path Deployment descriptor path\n" +
00081 " -no_sharing Start the service with none shared nodes";
00082
00083
00084
00085
00086 private Vector peers = new Vector();
00087 private P2PService p2pService = null;
00088
00089
00090
00091
00092 private StartP2PService(Args parsed) {
00093
00094 this.peers.addAll(parsed.peers);
00095
00096
00097 if (parsed.peerListFile != null) {
00098 this.peerFileParser(parsed.peerListFile);
00099 }
00100 }
00101
00111 public StartP2PService() {
00112
00113 }
00114
00126 public StartP2PService(String peerListFile) {
00127 this.peerFileParser(peerListFile);
00128 }
00129
00141 public StartP2PService(Vector peers) {
00142 if (peers != null) {
00143 this.peers.addAll(peers);
00144 }
00145 }
00146
00151 private static void usage(String msg) {
00152 if (msg != null) {
00153 System.err.println(msg);
00154 }
00155
00156 System.err.println("Usage:\n" + USAGE);
00157 System.exit(1);
00158 }
00159
00165 private static Args parseArgs(String[] args) {
00166 Args parsed = new Args();
00167 int index = 0;
00168
00169 while (index < args.length) {
00170 String argname = args[index];
00171
00172 if ("-acq".equalsIgnoreCase(argname)) {
00173 index++;
00174
00175 if (index > args.length) {
00176 usage(argname);
00177 }
00178
00179 parsed.acquisitionMethod = args[index];
00180 index++;
00181
00182 continue;
00183 } else if ("-port".equalsIgnoreCase(argname)) {
00184 index++;
00185
00186 if (index > args.length) {
00187 usage(argname);
00188 }
00189
00190 parsed.portNumber = args[index];
00191 index++;
00192
00193 continue;
00194 } else if ("-s".equalsIgnoreCase(argname)) {
00195 index++;
00196
00197 if (index > args.length) {
00198 usage(argname);
00199 }
00200
00201 parsed.peers.add(args[index]);
00202 index++;
00203
00204 while ((index < args.length) && !args[index].startsWith("-")) {
00205 parsed.peers.add(args[index]);
00206 index++;
00207 }
00208
00209 continue;
00210 } else if ("-f".equalsIgnoreCase(argname)) {
00211 index++;
00212
00213 if (index > args.length) {
00214 usage(argname);
00215 }
00216
00217 parsed.peerListFile = args[index];
00218 index++;
00219
00220 continue;
00221 } else if ("-noa".equalsIgnoreCase(argname)) {
00222 index++;
00223
00224 if (index > args.length) {
00225 usage(argname);
00226 }
00227
00228 parsed.noa = args[index];
00229 index++;
00230
00231 continue;
00232 } else if ("-ttu".equalsIgnoreCase(argname)) {
00233 index++;
00234
00235 if (index > args.length) {
00236 usage(argname);
00237 }
00238
00239 parsed.ttu = args[index];
00240 index++;
00241
00242 continue;
00243 } else if ("-ttl".equalsIgnoreCase(argname)) {
00244 index++;
00245
00246 if (index > args.length) {
00247 usage(argname);
00248 }
00249
00250 parsed.ttl = args[index];
00251 index++;
00252
00253 continue;
00254 } else if ("-capacity".equalsIgnoreCase(argname)) {
00255 index++;
00256
00257 if (index > args.length) {
00258 usage(argname);
00259 }
00260
00261 parsed.msg_capacity = args[index];
00262 index++;
00263
00264 continue;
00265 } else if ("-exploring".equalsIgnoreCase(argname)) {
00266 index++;
00267
00268 if (index > args.length) {
00269 usage(argname);
00270 }
00271
00272 parsed.expl_msg = args[index];
00273 index++;
00274
00275 continue;
00276 } else if ("-node_acq".equalsIgnoreCase(argname)) {
00277 index++;
00278
00279 if (index > args.length) {
00280 usage(argname);
00281 }
00282
00283 parsed.nodes_acq_to = args[index];
00284 index++;
00285
00286 continue;
00287 } else if ("-lookup".equalsIgnoreCase(argname)) {
00288 index++;
00289
00290 if (index > args.length) {
00291 usage(argname);
00292 }
00293
00294 parsed.lookup_freq = args[index];
00295 index++;
00296
00297 continue;
00298 } else if ("-xml_path".equalsIgnoreCase(argname)) {
00299 index++;
00300
00301 if (index > args.length) {
00302 usage(argname);
00303 }
00304
00305 parsed.xml_path = args[index];
00306 index++;
00307
00308 continue;
00309 } else if ("-no_multi_proc_nodes".equalsIgnoreCase(argname)) {
00310 parsed.multi_proc_nodes = "false";
00311 index++;
00312
00313 continue;
00314 } else if ("-no_sharing".equalsIgnoreCase(argname)) {
00315 parsed.no_sharing = "true";
00316 index++;
00317
00318 continue;
00319 }
00320
00321 usage("Unknow argumnent " + argname);
00322 }
00323
00324 return parsed;
00325 }
00326
00332 private static void initP2PProperties(Args parsed) {
00333 System.setProperty(PROPERTY_ACQUISITION, parsed.acquisitionMethod);
00334 System.setProperty(PROPERTY_PORT, parsed.portNumber);
00335 System.setProperty(PROPERTY_NOA, parsed.noa);
00336 System.setProperty(PROPERTY_TTU, parsed.ttu);
00337 System.setProperty(PROPERTY_TTL, parsed.ttl);
00338 System.setProperty(PROPERTY_MSG_MEMORY, parsed.msg_capacity);
00339 System.setProperty(PROPERTY_EXPLORING_MSG, parsed.expl_msg);
00340 System.setProperty(PROPERTY_NODES_ACQUISITION_T0, parsed.nodes_acq_to);
00341 System.setProperty(PROPERTY_LOOKUP_FREQ, parsed.lookup_freq);
00342 System.setProperty(PROPERTY_MULTI_PROC_NODES, parsed.multi_proc_nodes);
00343
00344 if (parsed.xml_path != null) {
00345 System.setProperty(PROPERPY_XML_PATH, parsed.xml_path);
00346 }
00347
00348 System.setProperty(PROPERTY_NO_SHARING,
00349 (parsed.no_sharing == null) ? "flase" : parsed.no_sharing);
00350 }
00351
00352
00353
00354
00355 public static void main(String[] args) {
00356
00357 Args parsed = parseArgs(args);
00358
00359
00360 initP2PProperties(parsed);
00361
00362
00363 try {
00364 logger.info("**** Starting jvm on " +
00365 UrlBuilder.getHostNameorIP(java.net.InetAddress.getLocalHost()));
00366 } catch (UnknownHostException e) {
00367 logger.warn("Couldn't get local host name", e);
00368 }
00369
00370
00371 if (logger.isDebugEnabled()) {
00372 logger.debug("**** Starting jvm with classpath " +
00373 System.getProperty("java.class.path"));
00374 logger.debug("**** with bootclasspath " +
00375 System.getProperty("sun.boot.class.path"));
00376 logger.debug("Acquisition method: " +
00377 System.getProperty("proactive.p2p.acq"));
00378 logger.debug("Port number: " +
00379 System.getProperty("proactive.p2p.port"));
00380
00381 if (parsed.peerListFile != null) {
00382 logger.debug("Peer list file: " + parsed.peerListFile);
00383 }
00384
00385 if (parsed.peers.size() != 0) {
00386 String servers = "";
00387
00388 for (int i = 0; i < parsed.peers.size(); i++) {
00389 servers += (" " + parsed.peers.get(i));
00390 }
00391
00392 logger.debug("Peers:" + servers);
00393 }
00394 }
00395
00396 StartP2PService service = new StartP2PService(parsed);
00397
00398 try {
00399
00400 service.start();
00401 } catch (ProActiveException e) {
00402 logger.fatal("Failed to active the P2P service", e);
00403 }
00404 }
00405
00406
00407
00408
00409 private void peerFileParser(String fileURL) {
00410 try {
00411 FileReader serverList = new FileReader(fileURL);
00412 BufferedReader in = new BufferedReader(serverList);
00413 String line;
00414
00415 while ((line = in.readLine()) != null) {
00416 this.peers.add(line);
00417 }
00418
00419 in.close();
00420 } catch (FileNotFoundException e) {
00421 logger.warn("Couldn't open peer list file", e);
00422 } catch (IOException e) {
00423 logger.warn("Problem appear during treating peer list file", e);
00424 }
00425 }
00426
00430 public static Vector checkingPeersUrl(Vector peerList) {
00431 int nbUrls = peerList.size();
00432 Vector newPeerList = new Vector(nbUrls);
00433
00434 for (int i = 0; i < nbUrls; i++) {
00435 String url = (String) peerList.get(i);
00436
00437 if (url.indexOf("//") < 0) {
00438 url = System.getProperty(P2PConstants.PROPERTY_ACQUISITION) +
00439 "://" + url;
00440 }
00441
00442 if (!url.matches(".*:[0-9]+.*")) {
00443 url += (":" + System.getProperty(P2PConstants.PROPERTY_PORT));
00444 }
00445
00446 newPeerList.add(url);
00447 }
00448
00449 return newPeerList;
00450 }
00451
00457 public void start() throws ProActiveException {
00458
00459 this.peers = StartP2PService.checkingPeersUrl(this.peers);
00460
00461
00462 String acquisitionMethod = System.getProperty(P2PConstants.PROPERTY_ACQUISITION);
00463 String portNumber = System.getProperty(P2PConstants.PROPERTY_PORT);
00464
00465
00466 String bckPortValue = null;
00467
00468 if (!acquisitionMethod.equals("ibis")) {
00469 bckPortValue = System.getProperty("proactive." + acquisitionMethod +
00470 ".port");
00471 System.setProperty("proactive." + acquisitionMethod + ".port",
00472 portNumber);
00473 } else {
00474 bckPortValue = System.getProperty("proactive.rmi.port");
00475 System.setProperty("proactive.rmi.port", portNumber);
00476 }
00477
00478
00479 ProActiveRuntime paRuntime = RuntimeFactory.getProtocolSpecificRuntime(acquisitionMethod +
00480 ":");
00481
00482
00483 if (bckPortValue != null) {
00484 if (!acquisitionMethod.equals("ibis")) {
00485 System.setProperty("proactive." + acquisitionMethod + ".port",
00486 bckPortValue);
00487 } else {
00488 System.setProperty("proactive.rmi.port", bckPortValue);
00489 }
00490 }
00491
00492
00493 String url = null;
00494
00495 try {
00496 url = paRuntime.createLocalNode(P2PConstants.P2P_NODE_NAME, false,
00497 null, paRuntime.getVMInformation().getName(),
00498 paRuntime.getJobID());
00499 } catch (AlreadyBoundException e) {
00500 logger.warn("This name " + P2PConstants.P2P_NODE_NAME +
00501 " is already bound in the registry", e);
00502 }
00503
00504
00505 this.p2pService = (P2PService) ProActive.newActive(P2PService.class.getName(),
00506 null, url);
00507
00508 try {
00509 ProActive.enableAC(this.p2pService);
00510 } catch (IOException e) {
00511 if (logger.isDebugEnabled()) {
00512 logger.debug("Couldn't enable AC for the P2P service", e);
00513 }
00514 }
00515
00516 logger.info(
00517 "///////////////// STARTING P2P SERVICE //////////////////");
00518
00519
00520 if (!this.peers.isEmpty()) {
00521 this.p2pService.firstContact(this.peers);
00522 }
00523 }
00524
00525
00526
00527
00528 public P2PService getP2PService() {
00529 return this.p2pService;
00530 }
00531
00532 private static class Args {
00533 private String acquisitionMethod = System.getProperty(PROPERTY_ACQUISITION);
00534 private String portNumber = System.getProperty(PROPERTY_PORT);
00535 private String noa = System.getProperty(PROPERTY_NOA);
00536 private String ttu = System.getProperty(PROPERTY_TTU);
00537 private String ttl = System.getProperty(PROPERTY_TTL);
00538 private String msg_capacity = System.getProperty(PROPERTY_MSG_MEMORY);
00539 private String expl_msg = System.getProperty(PROPERTY_EXPLORING_MSG);
00540 private String nodes_acq_to = System.getProperty(PROPERTY_NODES_ACQUISITION_T0);
00541 private String lookup_freq = System.getProperty(PROPERTY_LOOKUP_FREQ);
00542 private String multi_proc_nodes = System.getProperty(PROPERTY_MULTI_PROC_NODES);
00543 private String xml_path = System.getProperty(PROPERPY_XML_PATH);
00544 private String peerListFile = null;
00545 private Vector peers = new Vector();
00546 private String no_sharing = System.getProperty(PROPERTY_NO_SHARING);
00547 }
00548 }