001/* jpvmDaemon.java 002 * 003 * The jpvm Daemon is a special jpvm task that runs on every 004 * host in a jpvm parallel virtual machine. The jpvm Daemon 005 * is responsible for servicing requests to create new jpvm 006 * tasks. Unlike standard PVM, the jpvm Daemon is never 007 * responsible for routing message communication. 008 * 009 * Adam J Ferrari 010 * Sun 05-26-1996 011 * 012 * Copyright (C) 1996 Adam J Ferrari 013 * 014 * This library is free software; you can redistribute it and/or 015 * modify it under the terms of the GNU Library General Public 016 * License as published by the Free Software Foundation; either 017 * version 2 of the License, or (at your option) any later version. 018 * 019 * This library is distributed in the hope that it will be useful, 020 * but WITHOUT ANY WARRANTY; without even the implied warranty of 021 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 022 * Library General Public License for more details. 023 * 024 * You should have received a copy of the GNU Library General Public 025 * License along with this library; if not, write to the 026 * Free Software Foundation, Inc., 675 Mass Ave, Cambridge, 027 * MA 02139, USA. 028 */ 029 030package jpvm; 031import jpvm.jpvmEnvironment; 032import jpvm.jpvmDaemonMessageTag; 033import java.util.*; 034import java.io.*; 035 036class jpvmTaskListRecord { 037 public jpvmTaskId tid; 038 public String name; 039 public Process pro; //process recorded 040 public jpvmTaskListRecord next; 041 042 public jpvmTaskListRecord(jpvmTaskId t, String n, Process p) { 043 tid = t; 044 name = n; 045 next = null; 046 pro=p; 047 } 048 049 public jpvmTaskListRecord(jpvmTaskId t, String n) { 050 tid = t; 051 name = n; 052 next = null; 053 pro=null; 054 } 055}; 056 057class jpvmTaskList { 058 jpvmTaskListRecord tasks; 059 int num_tasks; 060 jpvmTaskListRecord iter; 061 062 public jpvmTaskList() { 063 tasks = null; 064 num_tasks = 0; 065 iter = null; 066 } 067 068 public int numTasks() { 069 return num_tasks; 070 } 071 072 public void addTask(jpvmTaskId tid, String name) { 073 if(find(tid)!=null) { 074 // Already know about this task... 075 return; 076 } 077 jpvmTaskListRecord nw = new jpvmTaskListRecord(tid,name); 078 nw.next = tasks; 079 tasks = nw; 080 num_tasks++; 081 } 082 083 public void addTask(jpvmTaskId tid, String name, Process p) { 084 if(find(tid)!=null) { 085 // Already know about this task... 086 return; 087 } 088 jpvmTaskListRecord nw = new jpvmTaskListRecord(tid,name,p); 089 nw.next = tasks; 090 tasks = nw; 091 num_tasks++; 092 } 093 094 public void deleteTask(jpvmTaskId tid) { 095 if(tasks == null) return; 096 jpvmTaskListRecord tmp = tasks; 097 098 // Check head 099 if(tmp.tid.equals(tid)) { 100 if (iter == tmp) iter=tmp.next; 101 tasks = tasks.next; 102 num_tasks --; 103 return; 104 } 105 // Check body 106 while(tmp.next != null) { 107 if(tmp.next.tid.equals(tid)) { 108 if(iter==tmp.next) iter=tmp.next.next; 109 tmp.next = tmp.next.next; 110 num_tasks --; 111 return; 112 } 113 tmp = tmp.next; 114 } 115 } 116 117 public void deleteAllTask() { 118 jpvmTaskListRecord tmp = firstIter(); 119 if(tmp!=null) 120 { 121 while(tmp != null) { 122 if(tmp.pro!=null) 123 { 124 if(!tmp.name.equals("(command line jpvm task)")) 125 { 126// System.out.println(tmp.name.equals("(command line jpvm task)")); 127// System.out.println(tmp.name+" tid:"+tmp.tid.toString()); 128 tmp.pro.destroy(); //destroy zombies 129 } 130 } 131 tmp = nextIter(); 132 } 133 } 134 tasks=null; 135 num_tasks=0; 136 } 137 138 139 public jpvmTaskListRecord find(jpvmTaskId tid) { 140 jpvmTaskListRecord tmp = tasks; 141 while(tmp != null) { 142 if(tmp.tid.equals(tid)) 143 return tmp; 144 tmp = tmp.next; 145 } 146 return tmp; 147 } 148 149 public jpvmTaskListRecord find(String name) { 150 jpvmTaskListRecord tmp = tasks; 151 while(tmp != null) { 152 if(tmp.name.equals(name)) 153 return tmp; 154 tmp = tmp.next; 155 } 156 return tmp; 157 } 158 159 public jpvmTaskListRecord firstIter() { 160 if (tasks == null) return null; 161 jpvmTaskListRecord ret = tasks; 162 iter = tasks.next; 163 return ret; 164 } 165 166 public jpvmTaskListRecord nextIter() { 167 if (iter == null) return null; 168 jpvmTaskListRecord ret = iter; 169 iter = iter.next; 170 return ret; 171 } 172}; 173 174class jpvmCreateWorkOrder { 175 public int order; 176 public jpvmTaskId client; 177 public boolean outstanding; 178 179 jpvmCreateWorkOrder() { 180 order = 0; 181 client = null; 182 outstanding = false; 183 } 184}; 185 186class jpvmSpawnWorkOrder { 187 public int order; // Which partially completed spawn 188 public jpvmTaskId tids[]; // Tids spawned 189 public int num; // Number to spawn 190 public int numDone;// Number actually done 191 public jpvmTaskId client; // Who placed the order? 192 jpvmSpawnWorkOrder next; // Linked list 193 194 jpvmSpawnWorkOrder(int o, int n) { 195 order = o; //yi changed it 196 //order = 0; 197 num = n; 198 tids = new jpvmTaskId[n]; 199 numDone = 0; 200 client = null; 201 next = null; 202 } 203}; 204 205class jpvmSpawnWorkOrderList { 206 private jpvmSpawnWorkOrder list = null; 207 private int nextOrder = 1; 208 209 jpvmSpawnWorkOrderList() { 210 list = null; 211 nextOrder = 1; 212 } 213 214 public jpvmSpawnWorkOrder newOrder(int num) { 215 jpvmSpawnWorkOrder ret; 216 217 ret = new jpvmSpawnWorkOrder(nextOrder, num); 218 nextOrder ++; 219 ret.next = list; 220 list = ret; 221 return ret; 222 } 223 224 public jpvmSpawnWorkOrder lookup(int order) { 225 jpvmSpawnWorkOrder ret; 226 227 ret = list; 228 229 while (ret != null) { 230 if(ret.order == order) 231 return ret; 232 ret = ret.next; 233 } 234 return null; 235 } 236 237 public void doneOrder(jpvmSpawnWorkOrder order) { 238 jpvmSpawnWorkOrder tmp; 239 if(list == null || order == null) 240 return; 241 if(order==list) { 242 list = list.next; 243 return; 244 } 245 tmp = list; 246 while(tmp.next != null) { 247 if(tmp.next == order) { 248 tmp.next = order.next; 249 return; 250 } 251 tmp = tmp.next; 252 } 253 } 254 255 public void doneOrder(int order) { 256 jpvmSpawnWorkOrder tmp; 257 if(list == null) 258 return; 259 if(order==list.order) { 260 list = list.next; 261 return; 262 } 263 tmp = list; 264 while(tmp.next != null) { 265 if(tmp.next.order == order) { 266 tmp.next = tmp.next.next; 267 return; 268 } 269 tmp = tmp.next; 270 } 271 } 272}; 273 274class jpvmDaemonWaiter extends Thread { 275 private Process process; 276 jpvmDaemonWaiter(Process p) { 277 process = p; 278 } 279 public void run() { 280 boolean wait = true; 281 while(wait) { 282 try { 283 process.waitFor(); 284 wait = false; 285 } 286 catch (InterruptedException ie) { 287 } 288 } 289 } 290}; 291 292class jpvmOutputListen extends Thread { 293 private InputStream in; 294 295 jpvmOutputListen(InputStream in) 296 { 297 this.in = in; 298 } 299 300 public void run() { 301 try { 302 System.out.println("listening output starts"); 303 BufferedReader input = new BufferedReader(new InputStreamReader(in)); 304 String a= input.readLine(); 305 while (a!=null) 306 { 307 // Print file line to screen 308 System.out.println(a); 309 a=input.readLine(); 310 } 311 input.close(); 312 System.out.println("listening over"); 313 } 314 catch(Exception ex) { 315 ex.printStackTrace(); 316 } 317 } 318}; 319 320class jpvmExecTaskThread extends Thread { 321 private jpvmEnvironment jpvm = null; 322 private Process process; 323 private jpvmTaskId client; 324 private String args[]; 325 private boolean log_on = true; 326 private boolean debug_on = false; 327 private jpvmCreateWorkOrder order; 328 329 jpvmExecTaskThread(jpvmEnvironment j, jpvmTaskId c, String a[], 330 jpvmCreateWorkOrder o) { 331 jpvm = j; 332 client = c; 333 args = a; 334 order = o; 335 } 336 337 private boolean doExec() { 338 try { 339 process = Runtime.getRuntime().exec(args); 340 //if(process ==null) throw new RuntimeException("process null"); 341 342 jpvmDaemon.int2P[jpvmDaemon.pointer]=process; 343// System.out.println(process+" id:"+jpvmDaemon.pointer+" pro:"+jpvmDaemon.int2P[jpvmDaemon.pointer]); 344 jpvmDaemon.pointer++; 345 if(jpvmDaemon.pointer==jpvmDaemon.maxHosts)jpvmDaemon.pointer=0; 346 jpvmOutputListen output = new jpvmOutputListen(process.getInputStream()); 347 output.start(); 348 } 349 catch (IOException ioe) { 350 ioe.printStackTrace(); 351 perror("i/o exception on exec"); 352 order.outstanding = false; 353 jpvmBuffer buf = new jpvmBuffer(); 354 buf.pack(order.order); 355 buf.pack(-1); 356 try { 357 jpvm.pvm_send(buf,client, 358 jpvmDaemonMessageTag.jpvmdCreatedTask); 359 } 360 catch (jpvmException jpe) { 361 perror("CreateTask, \""+jpe+" sending " 362 + "to client " + client.toString()); 363 } 364 return false; 365 } 366 catch (Exception ex) 367 { 368 ex.printStackTrace(); 369 } 370 return true; 371 } 372 373 public void run() { 374 boolean wait = doExec(); 375 while(wait) { 376 try { 377 process.waitFor(); 378 wait = false; 379 } 380 catch (InterruptedException ie) { 381 } 382 } 383 } 384 385 private void log(String message) { 386 if(log_on) { 387 System.out.println("jpvm daemon: "+message); 388 System.out.flush(); 389 } 390 } 391 392 private void perror(String message) { 393 System.err.println("jpvm daemon: "+message); 394 System.err.flush(); 395 } 396}; 397 398public class jpvmDaemon { 399 private static jpvmEnvironment jpvm = null; 400 private static jpvmTaskId myTid = null; 401 402 private static jpvmTaskList tasks = null; 403 private static jpvmTaskList hosts = null; 404 private static jpvmSpawnWorkOrderList spawnOrders = null; 405 private static int maxCreateOrders = 256; 406 private static jpvmCreateWorkOrder createOrders[]; 407 private static int nextCreateOrder = 0; 408 409 private static boolean log_on = true; 410 private static boolean debug_on = false; 411 private static String my_host_name = null; 412 413 private static jpvmTaskListRecord curr = null; 414 415 public static Process[] int2P; 416 public static int maxHosts=256; 417 public static int pointer=0; 418 419 // Which version of the JVM should be used to host tasks? 420// private static String java_exec="java -agentlib:hprof=cpu=samples,depth=10 "; 421 private static String java_exec="java"; 422 // private static String java_exec="kaffe"; 423 private static jpvmBarrierServer jbs; 424 425 public static void main(String args[]) { 426 try { 427 int2P = new Process[maxHosts]; 428 int i; 429 // Initialize data structures 430 if (args.length != 1) { 431 jpvm = new jpvmEnvironment(true); 432 } 433 else 434 { 435 jpvm = new jpvmEnvironment(true,Integer.valueOf(args[0])); 436 } 437 myTid = jpvm.pvm_mytid(); 438 tasks = new jpvmTaskList(); 439 hosts = new jpvmTaskList(); 440 spawnOrders = new jpvmSpawnWorkOrderList(); 441 createOrders = new jpvmCreateWorkOrder[maxCreateOrders]; 442 443 //creat barrier server; 444 jbs = new jpvmBarrierServer(11111); 445 jbs.start(); 446 447 448 // Announce location 449 log(jpvm.pvm_mytid().toString()); 450 my_host_name = jpvm.pvm_mytid().getHost(); 451 hosts.addTask(jpvm.pvm_mytid(), my_host_name); 452 453 writeDaemonFile(); 454 455 // Main server loop... 456 while(true) { 457 jpvmMessage req = jpvm.pvm_recv(); 458 jpvmTaskId client = req.sourceTid; 459 int request = req.messageTag; 460 String reqName; 461 462 switch (request) { 463 case (jpvmDaemonMessageTag.jpvmdPingRequest) : 464 reqName = "Ping"; 465 break; 466 case (jpvmDaemonMessageTag.jpvmdRegisterTask) : 467 reqName = "RegisterTask"; 468 break; 469 case (jpvmDaemonMessageTag.jpvmdRegisterChild) : 470 reqName = "RegisterChild"; 471 break; 472 case jpvmDaemonMessageTag.jpvmdSpawnTask: 473 reqName = "SpawnTask"; 474 break; 475 case jpvmDaemonMessageTag.jpvmdCreateTask: 476 reqName = "CreateTask"; 477 break; 478 case jpvmDaemonMessageTag.jpvmdCreatedTask: 479 reqName = "CreateTaskReturn"; 480 break; 481 case jpvmDaemonMessageTag.jpvmdDeleteTask: 482 reqName = "DeleteTask"; 483 break; 484 case jpvmDaemonMessageTag.jpvmdTaskStatus: 485 reqName = "TaskStatus"; 486 break; 487 case jpvmDaemonMessageTag.jpvmdAddHost: 488 reqName = "AddHost"; 489 break; 490 case jpvmDaemonMessageTag.jpvmdAddHostBcast: 491 reqName = "AddHostNotify"; 492 break; 493 case jpvmDaemonMessageTag.jpvmdDeleteHost: 494 reqName = "DeleteHost"; 495 break; 496 case jpvmDaemonMessageTag.jpvmdDeleteHostBcast: 497 reqName = "DeleteHostNotify"; 498 break; 499 case jpvmDaemonMessageTag.jpvmdHostStatus: 500 reqName = "HostStatus"; 501 break; 502 case jpvmDaemonMessageTag.jpvmdHostHalt: 503 reqName = "HostHalt"; 504 break; 505 case jpvmDaemonMessageTag.jpvmdHalt: 506 reqName = "Halt"; 507 break; 508 case jpvmDaemonMessageTag.jpvmdHostDeleteAll: //delete all task records 509 reqName = "HostDeleteAll"; 510 break; 511 case jpvmDaemonMessageTag.jpvmdDeleteAll: //delete task records 512 reqName = "DeleteAll"; 513 break; 514 default: 515 reqName = "Unknown Request"; 516 } 517 518 if(debug_on) { 519 log("new request type="+request+ 520 ",\""+reqName+"\", from "+ 521 client.toString()); 522 } 523 524 switch (request) { 525 case (jpvmDaemonMessageTag.jpvmdPingRequest) : 526 Ping(client,req.buffer); 527 break; 528 case (jpvmDaemonMessageTag.jpvmdRegisterTask) : 529 RegisterTask(client,req.buffer); 530 break; 531 case (jpvmDaemonMessageTag.jpvmdRegisterChild) : 532 RegisterChild(client,req.buffer); 533 break; 534 case jpvmDaemonMessageTag.jpvmdSpawnTask: 535 SpawnTask(client,req.buffer); 536 break; 537 case jpvmDaemonMessageTag.jpvmdCreateTask: 538 CreateTask(client,req.buffer); 539 break; 540 case jpvmDaemonMessageTag.jpvmdCreatedTask: 541 CreatedTask(client,req.buffer); 542 break; 543 case jpvmDaemonMessageTag.jpvmdDeleteTask: 544 DeleteTask(client,req.buffer); 545 break; 546 case jpvmDaemonMessageTag.jpvmdTaskStatus: 547 TaskStatus(client,req.buffer); 548 break; 549 case jpvmDaemonMessageTag.jpvmdAddHost: 550 AddHost(client,req.buffer); 551 break; 552 case jpvmDaemonMessageTag.jpvmdDelHost: 553 DeleteHost(client,req.buffer); 554 break; 555 case jpvmDaemonMessageTag.jpvmdAddHostBcast: 556 AddHostBcast(client,req.buffer); 557 break; 558 case jpvmDaemonMessageTag.jpvmdDelHostBcast: 559 DelHostBcast(client,req.buffer); 560 break; 561 case jpvmDaemonMessageTag.jpvmdDeleteHost: 562 DeleteHost(client,req.buffer); 563 break; 564 case jpvmDaemonMessageTag.jpvmdDeleteHostBcast: 565 DeleteHostBcast(client,req.buffer); 566 break; 567 case jpvmDaemonMessageTag.jpvmdHostStatus: 568 HostStatus(client,req.buffer); 569 break; 570 case jpvmDaemonMessageTag.jpvmdHostHalt: 571 HostHalt(client,req.buffer); 572 break; 573 case jpvmDaemonMessageTag.jpvmdHalt: 574 Halt(client,req.buffer); 575 break; 576 case jpvmDaemonMessageTag.jpvmdHostDeleteAll: 577 HostDeleteAll(client,req.buffer); 578 break; 579 case jpvmDaemonMessageTag.jpvmdDeleteAll: 580 DeleteAll(client,req.buffer); 581 break; 582 default: 583 perror("unknown request type"); 584 } 585 586 } 587 } 588 catch (jpvmException jpe) { 589 jpvmDebug.note("jpvmDaemon, internal jpvm error."); 590 } 591 } 592 593 private static void daemonBcast(jpvmBuffer buf, int tag) { 594 jpvmTaskListRecord tmp = hosts.firstIter(); 595 while(tmp != null) { 596 try { 597// System.out.println("send to"+tmp.tid.toString()); 598 jpvm.pvm_send(buf,tmp.tid,tag); 599 tmp = hosts.nextIter(); 600 } 601 catch (jpvmException jpe) { 602 perror("problem sending to daemon "+tmp.tid.toString()); 603 hosts.deleteTask(tmp.tid); 604 } 605 } 606 } 607 608 private static void RegisterTask(jpvmTaskId client, jpvmBuffer req) { 609 try { 610 String name = req.upkstr(); 611 int id = req.upkint(); 612 if(id!=-1) 613 { 614 if(int2P[id]==null)throw new RuntimeException("empty id:"+id+" name:"+name+" pointer:"+jpvmDaemon.pointer); 615 tasks.addTask(client,name, int2P[id]); 616 } 617 else 618 { 619 tasks.addTask(client,name); 620 } 621 } 622 catch (jpvmException jpe) { 623 log("bad RegisterTask invocation"); 624 } 625 } 626 627 private static void RegisterChild(jpvmTaskId client, jpvmBuffer req) { 628 int regNum = -1; 629 630 // Child process reporting in. Notify the remote client that 631 // requested this local task creation. 632 633 try { 634 regNum = req.upkint(); 635 } 636 catch (jpvmException jpe) { 637 perror("bad RegisterChild invocation"); 638 return; 639 } 640 641 if(regNum<0 || regNum>=maxCreateOrders) { 642 perror("RegisterChild, child registration number " + 643 regNum + "out of bounds"); 644 return; 645 } 646 647 jpvmCreateWorkOrder order = createOrders[regNum]; 648 if(order==null) { 649 perror("RegisterChild, child registration number " + 650 regNum + "not expected"); 651 return; 652 } 653 if(!order.outstanding) { 654 perror("RegisterChild, child registration number " + 655 regNum + "unexpected"); 656 return; 657 } 658 order.outstanding = false; 659 660 // Return the blessed event to the requester 661 try { 662 jpvmBuffer buf = new jpvmBuffer(); 663 buf.pack(1); 664 buf.pack(client); 665 buf.pack(order.order); 666 jpvm.pvm_send(buf,order.client, 667 jpvmDaemonMessageTag.jpvmdCreatedTask); 668 } 669 catch (jpvmException jpe) { 670 perror("RegisterChild, \""+jpe+"\" sending to client " + 671 order.client.toString()); 672 } 673 } 674 675 private static void SpawnTask(jpvmTaskId client, jpvmBuffer req) { 676 int num = 0; 677 String name = null; 678 jpvmBuffer buf = new jpvmBuffer(); 679 int heapSize=0; 680 try { 681 num = req.upkint(); 682 name = req.upkstr(); 683 heapSize = req.upkint(); 684 } 685 catch (jpvmException jpe) { 686 perror("bad SpawnTask invocation"); 687 return; 688 } 689 if(num==0) { //0 means no task is spawned 690 buf.pack(num); 691 try { 692 jpvm.pvm_send(buf,client, 693 jpvmDaemonMessageTag.jpvmdSpawnTask); 694 } 695 catch (jpvmException jpe) { 696 perror("SpawnTask, problem sending " 697 + "to client " + 698 client.toString()); 699 } 700 return; 701 } 702 703 // Create a work order for the spawn 704 jpvmSpawnWorkOrder order; 705 order = spawnOrders.newOrder(num); 706 order.client = client; 707 708 // Create a request to create a task on a remote host 709 jpvmBuffer creq = new jpvmBuffer(); 710 creq.pack(name); // Pack the class to create 711 creq.pack(client); // Pack parent of the created tasks 712 creq.pack(order.order); 713 creq.pack(heapSize); 714 715 // Schedule on known hosts round robin style 716// jpvmTaskListRecord target = null; 717 jpvmTaskListRecord target = curr; 718 719 for(int i=0; i<num ;i++) { 720 if(target==null) {target = hosts.firstIter(); curr= target;} 721 if(target==null) { 722 perror("no hosts in SpawnTask invocation"); 723 return; 724 } 725 if(target.tid.equals(jpvm.pvm_mytid())) { 726 creq.rewind(); 727 CreateTask(jpvm.pvm_mytid(),creq); 728 } 729 else { 730 try { 731 jpvm.pvm_send(creq,target.tid, 732 jpvmDaemonMessageTag.jpvmdCreateTask); 733 } 734 catch (jpvmException jpe) { 735 perror("SpawnTask, error scheduling " + 736 "on host " + target.tid.toString()); 737 } 738 } 739 target = hosts.nextIter(); 740 curr=target; 741 } 742 } 743 744 private static void CreatedTask(jpvmTaskId client, jpvmBuffer req) { 745 int count = -1; 746 jpvmTaskId child = null; 747 int orderNum = -1; 748 749 try { 750 count = req.upkint(); 751 if(count==1) { 752 child = req.upktid(); 753 orderNum = req.upkint(); 754 } 755 } 756 catch (jpvmException jpe) { 757 perror("CreatedTask, bad report from "+ 758 client.toString()); 759 } 760 761 // Look up which spawn order this is in regards to 762 jpvmSpawnWorkOrder order; 763 order = spawnOrders.lookup(orderNum); 764 765 if(order==null) { 766 perror("CreatedTask, order number " + orderNum + 767 " is not valid"); 768 return; 769 } 770 771 // Update the status of the order 772 order.tids[order.numDone] = child; 773 order.numDone ++; 774 775 if(order.numDone == order.num) { 776 // The order is complete - return the good 777 // news to the original client 778 jpvmBuffer buf = new jpvmBuffer(); 779 buf.pack(order.numDone); 780 buf.pack(order.tids,order.numDone,1); 781 try { 782 jpvm.pvm_send(buf,order.client, 783 jpvmDaemonMessageTag.jpvmdSpawnTask); 784 } 785 catch (jpvmException jpe) { 786 perror("CreatedTask, \""+jpe+"\" sending to client " 787 + order.client.toString()); 788 } 789 790 // Throw away the order 791 spawnOrders.doneOrder(order); 792 } 793 } 794 795 private static void CreateTask(jpvmTaskId client, jpvmBuffer req) { 796 String name = null; 797 jpvmTaskId parent = null; 798 int order; 799 int heapSize=0; 800 801 try { 802 name = req.upkstr(); 803 parent = req.upktid(); 804 order = req.upkint(); 805 heapSize = req.upkint(); 806 } 807 catch (jpvmException jpe) { 808 perror("bad CreateTask invocation"); 809 return; 810 } 811 812 if(createOrders[nextCreateOrder]==null) 813 createOrders[nextCreateOrder]=new jpvmCreateWorkOrder(); 814 815 if(createOrders[nextCreateOrder].outstanding) { 816 perror("too many outstanding task creation requests"); 817 return; 818 } 819 820 // Log the task creation request so when the task reports 821 // in we'll know it was expected 822 createOrders[nextCreateOrder].order = order; 823 createOrders[nextCreateOrder].client = client; 824 createOrders[nextCreateOrder].outstanding = true; 825 826 827 // Create a thread to execute the new task 828 String args[] = new String[10]; 829 args[0] = java_exec; 830// args[1] = "-cp /home/doyen/jpvm/simulator.jar"; 831// args[1] = "-cp simulator.jar"; 832 args[1] = "-Djpvm.daemon="+jpvm.pvm_mytid().getPort(); 833 args[2] = "-Djpvm.parhost="+parent.getHost(); 834 args[3] = "-Djpvm.parport="+parent.getPort(); 835 args[4] = "-Djpvm.taskname="+name; 836 args[5] = "-Djpvm.regnum="+nextCreateOrder; 837 args[6] = "-Djpvm.pointer="+pointer; 838 args[7] = "-Xms"+(int)((double)heapSize*0.2)+"m"; 839 args[8] = "-Xmx"+heapSize+"m"; 840 args[9] = name; 841 /* 842 String args[] = new String[10]; 843 args[0] = java_exec; 844 args[1] = "-Djpvm.daemon="+jpvm.pvm_mytid().getPort(); 845 args[2] = "-Djpvm.parhost="+parent.getHost(); 846 args[3] = "-Djpvm.parport="+parent.getPort(); 847 args[4] = "-Djpvm.taskname="+name; 848 args[5] = "-Djpvm.regnum="+nextCreateOrder; 849 args[6] = "-Xms"+(int)((double)heapSize*0.2)+"m"; 850 args[7] = "-Xmx"+heapSize+"m"; 851// args[8] = "-agentlib:hprof=cpu=samples,depth=20,file="+name+".txt"; 852// args[8] = "-agentlib:hprof=cpu=times,depth=20,file="+name+".txt"; 853 args[8] = "-javaagent:shiftone-jrat.jar"; 854// args[8] = "-javaagent:profile.jar"; 855// args[9] = "-Dprofile.properties="+name+".profile"; 856 args[9] = name; 857 */ 858 /* 859 String args[] = new String[10]; 860 args[0] = java_exec; 861 args[1] = "-Djpvm.daemon="+jpvm.pvm_mytid().getPort(); 862 args[2] = "-Djpvm.parhost="+parent.getHost(); 863 args[3] = "-Djpvm.parport="+parent.getPort(); 864 args[4] = "-Djpvm.taskname="+name; 865 args[5] = "-Djpvm.regnum="+nextCreateOrder; 866 args[6] = "-Xms"+(int)((double)heapSize*0.2)+"m"; 867 args[7] = "-Xmx"+heapSize+"m"; 868 Random rn = new Random(); 869 Integer portt = 30000+ (Math.abs(rn.nextInt()) % 1000); 870 args[8] = "-agentlib:jdwp=transport=dt_socket,address="+portt+",server=y,suspend=n"; 871 args[9] = name; 872 */ 873// System.out.println("exec( "+args[0]+" "+args[1]+" "+args[2]+" "+ args[3]+" "+args[4]+" "+args[5]+" "+args[6]+" "+args[7]+" "+args[8]+" "+args[9]+" )"); 874 if(debug_on) 875 log("exec( "+args[0]+" "+args[1]+" "+args[2]+" "+ 876 args[3]+" "+args[4]+" "+args[5]+" )"); 877 878 jpvmExecTaskThread spawnThread; 879 spawnThread = new jpvmExecTaskThread(jpvm, client, 880 args,createOrders[nextCreateOrder]); 881 nextCreateOrder++; 882 spawnThread.start(); 883 } 884 885 private static void DeleteTask(jpvmTaskId client, jpvmBuffer req) { 886 tasks.deleteTask(client); 887 } 888 889 private static void TaskStatus(jpvmTaskId client, jpvmBuffer req) { 890 jpvmBuffer buf = new jpvmBuffer(); 891 int n = tasks.numTasks(); 892 jpvmTaskId tids[] = new jpvmTaskId[n]; 893 buf.pack(n); 894 jpvmTaskListRecord tmp = tasks.firstIter(); 895 int i = 0; 896 while(tmp != null) { 897 tids[i] = tmp.tid; 898 buf.pack(tmp.name); 899 tmp = tasks.nextIter(); 900 i++; 901 } 902 buf.pack(tids,n,1); 903 try { 904 jpvm.pvm_send(buf,client,jpvmDaemonMessageTag.jpvmdTaskStatus); 905 } 906 catch(jpvmException jpe) { 907 perror("TaskStatus, \""+jpe+"\" sending to client " + 908 client.toString()); 909 } 910 } 911 912 private static void AddHost(jpvmTaskId client, jpvmBuffer req) { 913 jpvmBuffer buf = internalAddHosts(req); 914 if(buf==null) { 915// perror("AddHost, problem adding hosts"); 916 return; 917 } 918 daemonBcast(buf,jpvmDaemonMessageTag.jpvmdAddHostBcast); 919 } 920 921 private static void Ping(jpvmTaskId client, jpvmBuffer req) { 922 try { 923 jpvm.pvm_send(req,client, 924 jpvmDaemonMessageTag.jpvmdPingReply); 925 } 926 catch (jpvmException jpe) { 927 perror("ping, \""+jpe+"\" sending to client " + 928 client.toString()); 929 } 930 } 931 932 private static void AddHostBcast(jpvmTaskId client, jpvmBuffer req) { 933 if(client.equals(myTid)) return; 934 try { 935 int i; 936 int num = req.upkint(); 937 String names[] = new String[num]; 938 jpvmTaskId daemonTids[] = new jpvmTaskId[num]; 939 for(i=0;i<num; i++) 940 names[i] = req.upkstr(); 941 req.unpack(daemonTids,num,1); 942 for(i=0;i<num; i++) 943 hosts.addTask(daemonTids[i],names[i]); 944 } 945 catch (jpvmException jpe) { 946 log("bad AddHost invocation"); 947 } 948 } 949 950 private static void DelHostBcast(jpvmTaskId client, jpvmBuffer req) { 951 if(client.equals(myTid)) return; 952 try { 953// int i; 954// int num = req.upkint(); 955// String names[] = new String[num]; 956// jpvmTaskId daemonTids[] = new jpvmTaskId[num]; 957// for(i=0;i<num; i++) 958// names[i] = req.upkstr(); 959// req.unpack(daemonTids,num,1); 960// for(i=0;i<num; i++) 961 jpvmTaskId tid = req.upktid(); 962 hosts.deleteTask(tid); 963 } 964 catch (jpvmException jpe) { 965 log("bad AddHost invocation"); 966 } 967 } 968 969 private static jpvmBuffer internalDelHosts(jpvmBuffer req) { 970 jpvmBuffer ret = new jpvmBuffer(); 971 String hostName=null; 972 try { 973 hostName = req.upkstr(); 974 } 975 catch (jpvmException jpe) { 976 log("bad AddHost invocation"); 977 } 978 979 jpvmTaskListRecord tmp = hosts.find(hostName); 980 981 if(tmp !=null) //find the host 982 { 983 if(tmp.tid.equals(myTid))//delete all the others except myself 984 { 985 jpvmTaskListRecord tmp2 = hosts.firstIter(); 986 while(tmp2 != null) { 987 if(!tmp2.tid.equals(myTid)) 988 { 989 hosts.deleteTask(tmp2.tid); 990 } 991 tmp2 = hosts.nextIter(); 992 } 993 } 994 else 995 { 996 hosts.deleteTask(tmp.tid); //remove it from the list 997 } 998 ret.pack(tmp.tid); //pack it 999 return ret; 1000 } 1001 else 1002 { 1003 return null; 1004 } 1005 } 1006 1007 private static jpvmBuffer internalAddHosts(jpvmBuffer req) { 1008 int i,j; 1009 jpvmBuffer ret = new jpvmBuffer(); 1010 1011 int newNum = 0; 1012 String newNames[] = null; 1013 jpvmTaskId newTids[] = null; 1014 boolean newValid[] = null; 1015 int newValidNum = 0; 1016 try { 1017 // First, get the addresses of all new daemons 1018 newNum = req.upkint(); 1019 newNames = new String[newNum]; 1020 newTids = new jpvmTaskId[newNum]; 1021 newValid = new boolean[newNum]; 1022 for(i=0;i<newNum; i++) newNames[i] = req.upkstr(); 1023 req.unpack(newTids,newNum,1); 1024 } 1025 catch (jpvmException jpe) { 1026 log("bad AddHost call"); 1027 perror("internalAddHost, "+jpe); 1028 return null; 1029 } 1030 1031 // Check the validity of all new names 1032 jpvmBuffer pingBuf = new jpvmBuffer(); 1033 newValidNum = newNum; 1034 for(i=0;i<newNum; i++) { 1035 boolean valid = true; 1036 try { 1037 if(!newTids[i].equals(myTid)) 1038 { 1039 jpvm.pvm_send(pingBuf,newTids[i], jpvmDaemonMessageTag.jpvmdPingRequest); 1040 jpvmMessage pingMess = jpvm.pvm_recv(newTids[i], jpvmDaemonMessageTag.jpvmdPingReply); 1041 } 1042 else 1043 { 1044 valid = false; 1045 newValidNum--; 1046 } 1047 } 1048 catch (jpvmException jpe) { 1049 valid = false; 1050 newValidNum--; 1051 perror("internalAddHost, ping, "+jpe); 1052 } 1053 newValid[i] = valid; 1054 if(valid) 1055 hosts.addTask(newTids[i],newNames[i]); 1056 } 1057 1058 if (newValidNum<1) { 1059 // no hosts to add! 1060 ret = null; 1061 // perror("ValidNum "+newValidNum+" internalAddHost, no hosts added"); 1062 return ret; 1063 } 1064 1065 // Create the message to add all daemons, new and old 1066 int oldNum = hosts.numTasks(); 1067 int totalNum = oldNum + newValidNum; 1068 jpvmTaskId allTids[] = new jpvmTaskId[totalNum]; 1069 jpvmTaskListRecord tmp = hosts.firstIter(); 1070 1071 // Pack in the old names... 1072 ret.pack(totalNum); 1073 i = 0; 1074 while(tmp != null) { 1075 allTids[i] = tmp.tid; 1076 ret.pack(tmp.name); 1077 tmp = hosts.nextIter(); 1078 i++; 1079 } 1080 // Pack in the new names... 1081 for(j=0;j<newNum;j++) if(newValid[j]) { 1082 allTids[i] = newTids[j]; 1083 ret.pack(newNames[j]); 1084 i++; 1085 } 1086 1087 // Pack in all of the tids... 1088 ret.pack(allTids,totalNum,1); 1089 return ret; 1090 } 1091 1092 private static void DeleteHost(jpvmTaskId client, jpvmBuffer req) { 1093 jpvmBuffer ret = new jpvmBuffer(); 1094 String hostName=null; 1095 try { 1096 hostName = req.upkstr(); 1097 } 1098 catch (jpvmException jpe) { 1099 log("bad AddHost invocation"); 1100 } 1101 1102 jpvmTaskListRecord tmp = hosts.find(hostName); 1103 1104 if(tmp !=null) //find the host 1105 { 1106 ret.pack(tmp.tid); //pack it 1107 } 1108 else 1109 { 1110 perror("DelHost, their is no such host registered"); 1111 return; 1112 } 1113 1114 daemonBcast(ret,jpvmDaemonMessageTag.jpvmdDeleteHostBcast); 1115 1116 if(tmp.tid.equals(myTid))//delete all the others except myself 1117 { 1118 jpvmTaskListRecord tmp2 = hosts.firstIter(); 1119 while(tmp2 != null) { 1120 if(!tmp2.tid.equals(myTid)) 1121 { 1122 hosts.deleteTask(tmp2.tid); 1123 } 1124 tmp2 = hosts.nextIter(); 1125 } 1126 } 1127 else 1128 { 1129 hosts.deleteTask(tmp.tid); //remove it from the list 1130 } 1131 1132 } 1133 1134 private static void DeleteHostBcast(jpvmTaskId client, jpvmBuffer req) { 1135 if(client.equals(myTid)) 1136 { 1137 return; 1138 } 1139 1140 try { 1141 jpvmTaskId tid = req.upktid(); 1142 1143 // perror("myid: "+myTid+" deletingid: "+tid); 1144 1145 if(tid.equals(myTid))//delete all the others except myself 1146 { 1147 // perror("myid: "+myTid+" deletingid: "+tid); 1148 jpvmTaskListRecord tmp = hosts.firstIter(); 1149 while(tmp != null) { 1150 if(!tmp.tid.equals(myTid)) 1151 { 1152 hosts.deleteTask(tmp.tid); 1153 } 1154 tmp = hosts.nextIter(); 1155 } 1156 } 1157 else 1158 { 1159 hosts.deleteTask(tid); 1160 } 1161 } 1162 catch (jpvmException jpe) { 1163 log("bad AddHost invocation"); 1164 } 1165 } 1166 1167 private static void HostStatus(jpvmTaskId client, jpvmBuffer req) { 1168 jpvmBuffer buf = new jpvmBuffer(); 1169 int nhosts = hosts.numTasks(); 1170 buf.pack(nhosts); 1171 jpvmTaskId dTids[] = new jpvmTaskId[nhosts]; 1172 jpvmTaskListRecord tmp = hosts.firstIter(); 1173 int i = 0; 1174 while(tmp != null) { 1175 dTids[i] = tmp.tid; 1176 buf.pack(tmp.name); 1177 tmp = hosts.nextIter(); 1178 i++; 1179 } 1180 buf.pack(dTids,nhosts,1); 1181 try { 1182 jpvm.pvm_send(buf,client,jpvmDaemonMessageTag.jpvmdHostStatus); 1183 } 1184 catch (jpvmException jpe) { 1185 perror("HostStatus, \""+jpe+"\" sending to client " + 1186 client.toString()); 1187 } 1188 } 1189 1190 private static void HostHalt(jpvmTaskId client, jpvmBuffer req) { 1191 log("shutting down"); 1192 System.exit(0); 1193 } 1194 1195 private static void Halt(jpvmTaskId client, jpvmBuffer req) { 1196 jpvmBuffer buf = new jpvmBuffer(); 1197 daemonBcast(buf,jpvmDaemonMessageTag.jpvmdHostHalt); 1198 } 1199 1200 private static void HostDeleteAll(jpvmTaskId client, jpvmBuffer req) { 1201 tasks.deleteAllTask(); 1202// task.taksalldelete(); 1203// log("shutting down"); 1204// System.exit(0); 1205 } 1206 1207 private static void DeleteAll(jpvmTaskId client, jpvmBuffer req) { 1208 jpvmBuffer buf = new jpvmBuffer(); 1209 daemonBcast(buf,jpvmDaemonMessageTag.jpvmdHostDeleteAll); 1210 } 1211 1212 1213 private static void log(String message) { 1214 if(log_on) { 1215 System.out.println("jpvm daemon: "+message); 1216 System.out.flush(); 1217 } 1218 } 1219 1220 private static void perror(String message) { 1221 System.err.println("jpvm daemon: "+message); 1222 System.err.flush(); 1223 } 1224 1225 private static void writeDaemonFile() { 1226 String fileName = jpvmEnvironment.pvm_daemon_file_name(); 1227 try { 1228 File f = new File(fileName); 1229 FileOutputStream fout = new FileOutputStream(f); 1230 DataOutputStream dout = new DataOutputStream(fout); 1231 int port = myTid.getPort(); 1232 dout.writeInt(port); 1233 dout.flush(); 1234 } 1235 catch (IOException ioe) { 1236 perror("error writing \""+fileName+"\""); 1237 } 1238 } 1239};