001/* jpvmEnvironment.java 002 * 003 * The jpvmEnvironment class implements the jpvm run time environment 004 * of a task enrolled in the jpvm parallel virtual machine. 005 * 006 * Adam J Ferrari 007 * Sat 05-25-1996 008 * 009 * Copyright (C) 1996 Adam J Ferrari 010 * 011 * This library is free software; you can redistribute it and/or 012 * modify it under the terms of the GNU Library General Public 013 * License as published by the Free Software Foundation; either 014 * version 2 of the License, or (at your option) any later version. 015 * 016 * This library is distributed in the hope that it will be useful, 017 * but WITHOUT ANY WARRANTY; without even the implied warranty of 018 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 019 * Library General Public License for more details. 020 * 021 * You should have received a copy of the GNU Library General Public 022 * License along with this library; if not, write to the 023 * Free Software Foundation, Inc., 675 Mass Ave, Cambridge, 024 * MA 02139, USA. 025 */ 026 027package jpvm; 028import jpvm.jpvmTaskId; 029import jpvm.jpvmMessage; 030import jpvm.jpvmMessageQueue; 031import jpvm.jpvmConnectionSet; 032import jpvm.jpvmConnectionServer; 033import jpvm.jpvmConfiguration; 034import jpvm.jpvmTaskStatus; 035import java.io.*; 036import java.net.*; 037 038public 039class jpvmEnvironment { 040 public static final jpvmTaskId PvmNoParent = null; 041 042 private static jpvmTaskId myTid; 043 private static jpvmTaskId parentTid; 044 private static jpvmTaskId daemonTid; 045 private static jpvmMessageQueue myMessageQueue; 046 private static jpvmConnectionSet myConnectionSet; 047 private static jpvmConnectionServer connectionServer; 048 private static String myName; 049 private static int registrationNumber = -1; 050 051 public jpvmEnvironment() throws jpvmException { 052 init(false,null); 053 } 054 055 public jpvmEnvironment(String taskName) throws jpvmException { 056 init(false,taskName); 057 } 058 059 public jpvmEnvironment(boolean isDaemon) throws jpvmException { 060 init(isDaemon, null); 061 } 062 063 public jpvmEnvironment(boolean isDaemon, int port) throws jpvmException { 064 init(isDaemon, null, port); 065 } 066 067 public jpvmEnvironment(boolean isDaemon, String taskName) 068 throws jpvmException { 069 init(isDaemon, taskName); 070 } 071 072 public jpvmTaskId pvm_mytid() { 073 return myTid; 074 } 075 076 public jpvmTaskId pvm_parent() { 077 return parentTid; 078 } 079 080 public int pvm_spawn(String task_name, int num, jpvmTaskId tids[]) 081 throws jpvmException { 082 int ret = 0; 083 jpvmBuffer buf = new jpvmBuffer(); 084 buf.pack(num); 085 buf.pack(task_name); 086 buf.pack(512); //default maximum heap size; 087 pvm_send(buf,daemonTid,jpvmDaemonMessageTag.jpvmdSpawnTask); 088 try { 089 jpvmMessage m = pvm_recv(jpvmDaemonMessageTag.jpvmdSpawnTask); 090 ret = m.buffer.upkint(); 091 m.buffer.unpack(tids,ret,1); 092 } 093 catch (jpvmException jpe) { 094 jpvmDebug.error("pvm_spawn, internal error"); 095 } 096 return ret; 097 } 098 099 public int pvm_spawn(String task_name, int num, jpvmTaskId tids[], int heapSize) 100 throws jpvmException { 101 int ret = 0; 102 jpvmBuffer buf = new jpvmBuffer(); 103 buf.pack(num); 104 buf.pack(task_name); 105 buf.pack(heapSize); 106 pvm_send(buf,daemonTid,jpvmDaemonMessageTag.jpvmdSpawnTask); 107 try { 108 jpvmMessage m = pvm_recv(jpvmDaemonMessageTag.jpvmdSpawnTask); 109 ret = m.buffer.upkint(); 110 m.buffer.unpack(tids,ret,1); 111 } 112 catch (jpvmException jpe) { 113 jpvmDebug.error("pvm_spawn, internal error"); 114 } 115 return ret; 116 } 117 118 119 public synchronized void pvm_send(jpvmBuffer buf, jpvmTaskId tid, 120 int tag) throws jpvmException { 121 jpvmDebug.note("pvm_send, sending message to "+tid.toString()); 122 jpvmMessage message = new jpvmMessage(buf, tid, myTid, tag); 123 if(myTid.equals(tid)) { 124 // Just enqueue the message 125 myMessageQueue.enqueue(message); 126 } 127 else { 128 jpvmSendConnection conn = getConnection(tid); 129 message.send(conn); 130 } 131 } 132 133 public synchronized void pvm_barrier(String host, int tag, int num) throws jpvmException,Exception { 134 Socket sock = new Socket(host,11111); 135 try { 136 sock.setTcpNoDelay(true); 137 } 138 catch (NoSuchMethodError er) {} 139 { 140 } 141 142 OutputStream outstrm = sock.getOutputStream(); 143 InputStream input = sock.getInputStream(); 144 outstrm.write(tag); 145 outstrm.write(num); 146 input.read(); 147 outstrm.close(); 148 input.close(); 149 sock.close(); 150// System.out.println("over"); 151 /* 152 BufferedOutputStream oustrm = new BufferedOutputStream(outstrm); 153 DataOutputStream strm = new DataOutputStream(oustrm); 154 strm.writeInt(tag); 155 System.out.println("write tag"); 156 strm.writeInt(num); 157 System.out.println("write number"); 158 strm.writeInt(0); 159 System.out.println("write 0"); 160 */ 161 /* 162 strm.close(); 163 oustrm.close(); 164 outstrm.close(); 165 sock.close(); 166 */ 167 } 168 169 public synchronized void pvm_mcast(jpvmBuffer buf, jpvmTaskId tids[], 170 int ntids, int tag) throws jpvmException { 171 int exceptions = 0; 172 jpvmMessage message = new jpvmMessage(buf, null, myTid, tag); 173 for(int i=0;i<ntids; i++) { 174 jpvmTaskId tid = tids[i]; 175 message.destTid = tid; 176 jpvmDebug.note("pvm_mcast, sending message to "+ 177 tid.toString()); 178 try { 179 jpvmSendConnection conn = getConnection(tid); 180 message.send(conn); 181 } 182 catch (jpvmException jpe) { 183 exceptions++; 184 } 185 } 186 if(exceptions>0) 187 throw new jpvmException("pvm_mcast, some messages "+ 188 "failed"); 189 } 190 191 public jpvmMessage pvm_recv(jpvmTaskId tid, int tag) 192 throws jpvmException{ 193 // 194 // Thanks to Professor Thomas R. James of the Dept. of 195 // Mathematical Sciences at Otterbein College for finding 196 // and fixing race condition in the previous implementation 197 // of pvm_recv. 198 // 199 // Adam Ferrari - Mon Feb 1 13:11:12 EST 1999 200 // 201 return myMessageQueue.dequeue(tid,tag); 202 } 203 204 public jpvmMessage pvm_recv(jpvmTaskId tid) throws jpvmException { 205 return myMessageQueue.dequeue(tid); 206 } 207 208 public jpvmMessage pvm_recv(int tag) throws jpvmException { 209 return myMessageQueue.dequeue(tag); 210 } 211 212 public jpvmMessage pvm_recv() throws jpvmException { 213 return myMessageQueue.dequeue(); 214 } 215 216 public jpvmMessage pvm_nrecv(jpvmTaskId tid, int tag) 217 throws jpvmException { 218 return myMessageQueue.dequeueNonBlock(tid,tag); 219 } 220 221 public jpvmMessage pvm_nrecv(jpvmTaskId tid) throws jpvmException { 222 return myMessageQueue.dequeueNonBlock(tid); 223 } 224 225 public jpvmMessage pvm_nrecv(int tag) throws jpvmException { 226 return myMessageQueue.dequeueNonBlock(tag); 227 } 228 229 public jpvmMessage pvm_nrecv() throws jpvmException { 230 return myMessageQueue.dequeueNonBlock(); 231 } 232 233 public boolean pvm_probe(jpvmTaskId tid, int tag) throws jpvmException { 234 return myMessageQueue.probe(tid,tag); 235 } 236 237 public boolean pvm_probe(jpvmTaskId tid) throws jpvmException { 238 return myMessageQueue.probe(tid); 239 } 240 241 public boolean pvm_probe(int tag) throws jpvmException { 242 return myMessageQueue.probe(tag); 243 } 244 245 public boolean pvm_probe() throws jpvmException { 246 return myMessageQueue.probe(); 247 } 248 249 public void pvm_exit() throws jpvmException { 250 jpvmBuffer buf = new jpvmBuffer(); 251 try { 252 Thread.sleep(1000); 253 } 254 catch (InterruptedException ie) { 255 } 256 pvm_send(buf,daemonTid,jpvmDaemonMessageTag.jpvmdDeleteTask); 257 } 258 259 public jpvmConfiguration pvm_config() { 260 jpvmBuffer buf = new jpvmBuffer(); 261 jpvmConfiguration ret = null; 262 try { 263 pvm_send(buf,daemonTid,jpvmDaemonMessageTag.jpvmdHostStatus); 264 jpvmMessage m = pvm_recv(jpvmDaemonMessageTag.jpvmdHostStatus); 265 int n = m.buffer.upkint(); 266 ret = new jpvmConfiguration(n); 267 for(int i=0;i<n;i++) 268 ret.hostNames[i] = m.buffer.upkstr(); 269 m.buffer.unpack(ret.hostDaemonTids,n,1); 270 } 271 catch (jpvmException jpe) { 272 jpvmDebug.error("pvm_config, internal error"); 273 } 274 return ret; 275 } 276 277 public jpvmTaskStatus pvm_tasks(jpvmConfiguration conf, int which) { 278 jpvmTaskStatus ret = null; 279 if(conf==null || which<0 || which>=conf.numHosts) 280 return null; 281 282 try { 283 jpvmBuffer buf = new jpvmBuffer(); 284 pvm_send(buf,conf.hostDaemonTids[which], 285 jpvmDaemonMessageTag.jpvmdTaskStatus); 286 jpvmMessage m = pvm_recv(jpvmDaemonMessageTag.jpvmdTaskStatus); 287 ret = new jpvmTaskStatus(); 288 ret.hostName = conf.hostNames[which]; 289 ret.numTasks = m.buffer.upkint(); 290 if(ret.numTasks==0) { 291 ret.taskNames = null; 292 ret.taskTids = null; 293 return ret; 294 } 295 ret.taskNames = new String[ret.numTasks]; 296 ret.taskTids = new jpvmTaskId[ret.numTasks]; 297 for(int i=0;i<ret.numTasks;i++) 298 ret.taskNames[i] = m.buffer.upkstr(); 299 m.buffer.unpack(ret.taskTids,ret.numTasks,1); 300 } 301 catch (jpvmException jpe) { 302 jpvmDebug.error("pvm_tasks, internal error"); 303 } 304 return ret; 305 } 306 307 public void pvm_halt() throws jpvmException { 308 jpvmBuffer buf = new jpvmBuffer(); 309 pvm_send(buf,daemonTid,jpvmDaemonMessageTag.jpvmdHalt); 310 } 311 312 public void pvm_deleteTasks() throws jpvmException { 313 jpvmBuffer buf = new jpvmBuffer(); 314 pvm_send(buf,daemonTid,jpvmDaemonMessageTag.jpvmdDeleteAll); 315 } 316 317 public void pvm_addhosts(int nhosts, String hostnames[], 318 jpvmTaskId daemonTids[]) throws jpvmException { 319 jpvmBuffer buf = new jpvmBuffer(); 320 buf.pack(nhosts); 321 for(int i=0;i<nhosts;i++) 322 buf.pack(hostnames[i]); 323 buf.pack(daemonTids,nhosts,1); 324 pvm_send(buf,daemonTid,jpvmDaemonMessageTag.jpvmdAddHost); 325 } 326 327 public void pvm_delhosts(String hostname) throws jpvmException { 328 jpvmBuffer buf = new jpvmBuffer(); 329 buf.pack(hostname); 330 pvm_send(buf,daemonTid,jpvmDaemonMessageTag.jpvmdDelHost); 331 } 332 333 334 // Internal methods: 335 public void init(boolean isDaemon, String taskName) 336 throws jpvmException { 337 myMessageQueue = new jpvmMessageQueue(); 338 myConnectionSet = new jpvmConnectionSet(); 339 connectionServer= new jpvmConnectionServer(myConnectionSet, 340 myMessageQueue); 341 myTid = new jpvmTaskId(connectionServer.getConnectionPort()); 342 connectionServer.setDaemon(true); 343 connectionServer.start(); 344 if(!isDaemon) { 345 findDaemon(); 346 findParent(); 347 registerDaemon(taskName); 348 } 349 } 350 351 private void init(boolean isDaemon, String taskName, int port) 352 throws jpvmException { 353 myMessageQueue = new jpvmMessageQueue(); 354 myConnectionSet = new jpvmConnectionSet(); 355 connectionServer= new jpvmConnectionServer(myConnectionSet, 356 myMessageQueue, port); 357 myTid = new jpvmTaskId(connectionServer.getConnectionPort()); 358 connectionServer.setDaemon(true); 359 connectionServer.start(); 360 if(!isDaemon) { 361 findDaemon(); 362 findParent(); 363 registerDaemon(taskName); 364 } 365 } 366 private jpvmTaskId initTaskId() { 367 jpvmTaskId ret = null; 368 return ret; 369 } 370 371 private jpvmSendConnection getConnection(jpvmTaskId tid) 372 throws jpvmException { 373 jpvmSendConnection ret = null; 374 ret = myConnectionSet.lookupSendConnection(tid); 375 if(ret!=null) { 376 // Had a cached connection... 377 return ret; 378 } 379 // Must establish a new connection. 380 ret = jpvmSendConnection.connect(tid,myTid); 381 if(ret!=null) { 382 myConnectionSet.insertSendConnection(ret); 383 return ret; 384 } 385 throw new jpvmException("getConnection, connect failed"); 386 } 387 388 private void findDaemon() throws jpvmException{ 389 int daemonPort = -1; 390 String daemonPortStr = System.getProperty("jpvm.daemon"); 391 if(daemonPortStr != null) { 392 try { 393 daemonPort = Integer.valueOf(daemonPortStr).intValue(); 394 } 395 catch (NumberFormatException nfe) { 396 jpvmDebug.error("couldn't bind to daemon, " + 397 "jpvm.daemon not an integer"); 398 daemonPort = -1; 399 } 400 } 401 else { 402 try 403 { 404 daemonPort = readDaemonFile(); 405 } 406 catch (IOException ion) 407 { 408 throw new jpvmException(); 409 } 410 } 411 if(daemonPort==-1) { 412 jpvmDebug.error("couldn't bind to daemon, " + "jpvm.daemon not defined"); 413 throw new jpvmException(); 414 } 415 daemonTid = new jpvmTaskId(daemonPort); 416 } 417 418 private void findParent() throws jpvmException { 419 String parentHost = System.getProperty("jpvm.parhost"); 420 int parentPort = 0; 421 if(parentHost == null) { 422 parentTid = null; 423 return; 424 } 425 String parentPortStr = System.getProperty("jpvm.parport"); 426 try { 427 parentPort = Integer.valueOf(parentPortStr).intValue(); 428 } 429 catch (NumberFormatException nfe) { 430 jpvmDebug.error("couldn't bind to parent, " + 431 "jpvm.parport not an integer"); 432 } 433 parentTid = new jpvmTaskId(parentHost,parentPort); 434 435 // Since we have a parent, register with the daemon 436 String regStr = System.getProperty("jpvm.regnum"); 437 if(regStr==null) { 438 jpvmDebug.error("no task registration number"); 439 } 440 else { 441 try { 442 registrationNumber = 443 Integer.valueOf(regStr).intValue(); 444 } 445 catch (NumberFormatException nfe) { 446 jpvmDebug.error("invalid task registration number"); 447 } 448 } 449 jpvmBuffer buf = new jpvmBuffer(); 450 buf.pack(registrationNumber); 451 pvm_send(buf,daemonTid,jpvmDaemonMessageTag.jpvmdRegisterChild); 452 } 453 454 private void registerDaemon(String taskName) throws jpvmException { 455 // Find out the name of this task 456 String point; 457 Integer a=-1; 458 if(taskName == null) { 459 myName = System.getProperty("jpvm.taskname"); 460 point = System.getProperty("jpvm.pointer"); 461 if(point != null) 462 { 463 a = Integer.parseInt(point); 464 } 465 else 466 { 467 a = -1; 468 } 469 if(myName == null) myName="(command line jpvm task)"; 470 } 471 else { 472 myName = new String(taskName); 473 } 474 475 // Register this task with the daemon 476 jpvmBuffer buf = new jpvmBuffer(); 477 buf.pack(myName); 478 buf.pack(a); 479 pvm_send(buf,daemonTid,jpvmDaemonMessageTag.jpvmdRegisterTask); 480 } 481 482 public int readDaemonFile() throws IOException{ 483 int port = -1; 484 String fileName = pvm_daemon_file_name(); 485 try { 486 File f = new File(fileName); 487 FileInputStream fin = new FileInputStream(f); 488 DataInputStream din = new DataInputStream(fin); 489 port = din.readInt(); 490 } 491 catch (IOException ioe) { 492 //ioe.printStackTrace(); 493 //jpvmDebug.error("error reading \""+fileName+"\""); 494 port = -1; 495 throw new IOException(); 496 } 497 return port; 498 } 499 500 public static String pvm_daemon_file_name() { 501 String osName = System.getProperty("os.name"); 502 String userName = System.getProperty("user.name"); 503 String fileName = null; 504 //if(osName.equals("Windows 95") || osName.equals("Windows NT") || osName.equals("Windows 3.1") || osName.equals("Windows XP")) 505 String hostName = myTid.getHost(); 506 if(osName.startsWith("Windows")) 507 { 508 //fileName = "c:\\WINDOWS\\Temp\\jpvmd-"+userName+".txt"; 509 fileName = "log\\jpvmd-"+userName+"-"+hostName+".txt"; 510 } 511 else { 512 //fileName = "/tmp/jpvmd."+userName; 513 fileName = "log/jpvmd-"+userName+"-"+hostName+".txt"; 514 } 515 return fileName; 516 } 517};