001/* jpvmEnvironment.java
002 *
003 * The jpvmEnvironment class implements the jpvm run time environment
004 * of a task enrolled in the jpvm parallel virtual machine.
005 *
006 * Adam J Ferrari
007 * Sat 05-25-1996
008 *
009 * Copyright (C) 1996  Adam J Ferrari
010 * 
011 * This library is free software; you can redistribute it and/or
012 * modify it under the terms of the GNU Library General Public
013 * License as published by the Free Software Foundation; either
014 * version 2 of the License, or (at your option) any later version.
015 * 
016 * This library is distributed in the hope that it will be useful,
017 * but WITHOUT ANY WARRANTY; without even the implied warranty of
018 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
019 * Library General Public License for more details.
020 * 
021 * You should have received a copy of the GNU Library General Public
022 * License along with this library; if not, write to the
023 * Free Software Foundation, Inc., 675 Mass Ave, Cambridge,
024 * MA 02139, USA.
025 */
026
027package jpvm;
028import jpvm.jpvmTaskId;
029import jpvm.jpvmMessage;
030import jpvm.jpvmMessageQueue;
031import jpvm.jpvmConnectionSet;
032import jpvm.jpvmConnectionServer;
033import jpvm.jpvmConfiguration;
034import jpvm.jpvmTaskStatus;
035import java.io.*;
036import java.net.*;
037
038public
039class jpvmEnvironment {
040        public  static final jpvmTaskId         PvmNoParent     = null;
041
042        private static jpvmTaskId               myTid;
043        private static jpvmTaskId               parentTid;
044        private static jpvmTaskId               daemonTid;
045        private static jpvmMessageQueue         myMessageQueue;
046        private static jpvmConnectionSet        myConnectionSet;
047        private static jpvmConnectionServer     connectionServer;
048        private static String                   myName;
049        private static int                      registrationNumber = -1;
050
051        public jpvmEnvironment() throws jpvmException {
052                init(false,null);
053        }
054
055        public jpvmEnvironment(String taskName) throws jpvmException {
056                init(false,taskName);
057        }
058
059        public jpvmEnvironment(boolean isDaemon) throws jpvmException {
060                init(isDaemon, null);
061        }
062
063        public jpvmEnvironment(boolean isDaemon, int port) throws jpvmException {
064                init(isDaemon, null, port);
065        }
066
067        public jpvmEnvironment(boolean isDaemon, String taskName) 
068                throws jpvmException {
069                init(isDaemon, taskName);
070        }
071
072        public jpvmTaskId pvm_mytid() {
073                return myTid;
074        }
075
076        public jpvmTaskId pvm_parent() {
077                return parentTid;
078        }
079
080        public int pvm_spawn(String task_name, int num, jpvmTaskId tids[]) 
081          throws jpvmException {
082            int ret = 0;
083            jpvmBuffer buf = new jpvmBuffer();
084            buf.pack(num);
085            buf.pack(task_name);
086            buf.pack(512); //default maximum heap size;
087            pvm_send(buf,daemonTid,jpvmDaemonMessageTag.jpvmdSpawnTask);
088            try {
089                jpvmMessage m = pvm_recv(jpvmDaemonMessageTag.jpvmdSpawnTask);
090                ret = m.buffer.upkint();
091                m.buffer.unpack(tids,ret,1);
092            }
093            catch (jpvmException jpe) {
094                jpvmDebug.error("pvm_spawn, internal error");
095            }
096            return ret;
097        }
098
099        public int pvm_spawn(String task_name, int num, jpvmTaskId tids[], int heapSize) 
100          throws jpvmException {
101            int ret = 0;
102            jpvmBuffer buf = new jpvmBuffer();
103            buf.pack(num);
104            buf.pack(task_name);
105            buf.pack(heapSize);
106            pvm_send(buf,daemonTid,jpvmDaemonMessageTag.jpvmdSpawnTask);
107            try {
108                jpvmMessage m = pvm_recv(jpvmDaemonMessageTag.jpvmdSpawnTask);
109                ret = m.buffer.upkint();
110                m.buffer.unpack(tids,ret,1);
111            }
112            catch (jpvmException jpe) {
113                jpvmDebug.error("pvm_spawn, internal error");
114            }
115            return ret;
116        }
117
118
119        public synchronized void pvm_send(jpvmBuffer buf, jpvmTaskId tid,
120            int tag) throws jpvmException {
121                jpvmDebug.note("pvm_send, sending message to "+tid.toString());
122                jpvmMessage message = new jpvmMessage(buf, tid, myTid, tag);
123                if(myTid.equals(tid)) {
124                        // Just enqueue the message
125                        myMessageQueue.enqueue(message);
126                }
127                else {
128                        jpvmSendConnection conn = getConnection(tid);
129                        message.send(conn);
130                }
131        }
132
133        public synchronized void pvm_barrier(String host, int tag, int num) throws jpvmException,Exception {
134                Socket sock = new Socket(host,11111);
135                try {
136                        sock.setTcpNoDelay(true);
137                }
138                catch (NoSuchMethodError er) {}
139                {
140                }
141
142                OutputStream outstrm = sock.getOutputStream();
143                InputStream input = sock.getInputStream();
144                outstrm.write(tag);
145                outstrm.write(num);
146                input.read();
147                outstrm.close();
148                input.close();
149                sock.close();
150//              System.out.println("over");
151                /*
152                BufferedOutputStream oustrm = new BufferedOutputStream(outstrm);
153                DataOutputStream strm = new DataOutputStream(oustrm);
154                strm.writeInt(tag);
155                System.out.println("write tag");
156                strm.writeInt(num);
157                System.out.println("write number");
158                strm.writeInt(0);
159                System.out.println("write 0");
160                */
161                /*
162                strm.close();
163                oustrm.close();
164                outstrm.close();
165                sock.close();
166                */
167        }
168
169        public synchronized void pvm_mcast(jpvmBuffer buf, jpvmTaskId tids[],
170            int ntids, int tag) throws jpvmException {
171                int exceptions = 0;
172                jpvmMessage message = new jpvmMessage(buf, null, myTid, tag);
173                for(int i=0;i<ntids; i++) {
174                        jpvmTaskId tid = tids[i];
175                        message.destTid = tid;
176                        jpvmDebug.note("pvm_mcast, sending message to "+
177                                tid.toString());
178                        try {
179                            jpvmSendConnection conn = getConnection(tid);
180                            message.send(conn);
181                        }
182                        catch (jpvmException jpe) {
183                                exceptions++;
184                        }
185                }
186                if(exceptions>0) 
187                        throw new jpvmException("pvm_mcast, some messages "+
188                                "failed");
189        }
190
191        public jpvmMessage pvm_recv(jpvmTaskId tid, int tag)
192            throws jpvmException{
193                //
194                // Thanks to Professor Thomas R. James of the Dept. of
195                // Mathematical Sciences at Otterbein College for finding
196                // and fixing race condition in the previous implementation
197                // of pvm_recv.
198                //
199                // Adam Ferrari - Mon Feb  1 13:11:12 EST 1999
200                //
201                return myMessageQueue.dequeue(tid,tag);
202        }
203
204        public jpvmMessage pvm_recv(jpvmTaskId tid) throws jpvmException {
205                return myMessageQueue.dequeue(tid);
206        }
207
208        public jpvmMessage pvm_recv(int tag) throws jpvmException {
209                return myMessageQueue.dequeue(tag);
210        }
211
212        public jpvmMessage pvm_recv() throws jpvmException {
213                return myMessageQueue.dequeue();
214        }
215
216        public jpvmMessage pvm_nrecv(jpvmTaskId tid, int tag) 
217            throws jpvmException {
218                return myMessageQueue.dequeueNonBlock(tid,tag);
219        }
220
221        public jpvmMessage pvm_nrecv(jpvmTaskId tid) throws jpvmException {
222                return myMessageQueue.dequeueNonBlock(tid);
223        }
224
225        public jpvmMessage pvm_nrecv(int tag) throws jpvmException {
226                return myMessageQueue.dequeueNonBlock(tag);
227        }
228
229        public jpvmMessage pvm_nrecv() throws jpvmException {
230                return myMessageQueue.dequeueNonBlock();
231        }
232
233        public boolean pvm_probe(jpvmTaskId tid, int tag) throws jpvmException {
234                return myMessageQueue.probe(tid,tag);
235        }
236
237        public boolean pvm_probe(jpvmTaskId tid) throws jpvmException {
238                return myMessageQueue.probe(tid);
239        }
240
241        public boolean pvm_probe(int tag) throws jpvmException {
242                return myMessageQueue.probe(tag);
243        }
244
245        public boolean pvm_probe() throws jpvmException {
246                return myMessageQueue.probe();
247        }
248
249        public void pvm_exit() throws jpvmException {
250                jpvmBuffer buf = new jpvmBuffer();
251                try {
252                        Thread.sleep(1000);
253                }
254                catch (InterruptedException ie) {
255                }
256                pvm_send(buf,daemonTid,jpvmDaemonMessageTag.jpvmdDeleteTask);
257        }
258
259        public jpvmConfiguration pvm_config() {
260            jpvmBuffer buf = new jpvmBuffer();
261            jpvmConfiguration ret = null;
262            try {
263                pvm_send(buf,daemonTid,jpvmDaemonMessageTag.jpvmdHostStatus);
264                jpvmMessage m = pvm_recv(jpvmDaemonMessageTag.jpvmdHostStatus);
265                int n = m.buffer.upkint();
266                ret = new jpvmConfiguration(n);
267                for(int i=0;i<n;i++)
268                        ret.hostNames[i] = m.buffer.upkstr();
269                m.buffer.unpack(ret.hostDaemonTids,n,1);
270            }
271            catch (jpvmException jpe) {
272                jpvmDebug.error("pvm_config, internal error");
273            }
274            return ret;
275        }
276
277        public jpvmTaskStatus pvm_tasks(jpvmConfiguration conf, int which) {
278            jpvmTaskStatus ret = null;
279            if(conf==null || which<0 || which>=conf.numHosts)
280                return null;
281
282            try {
283                jpvmBuffer buf = new jpvmBuffer();
284                pvm_send(buf,conf.hostDaemonTids[which],
285                        jpvmDaemonMessageTag.jpvmdTaskStatus);
286                jpvmMessage m = pvm_recv(jpvmDaemonMessageTag.jpvmdTaskStatus);
287                ret = new jpvmTaskStatus();
288                ret.hostName = conf.hostNames[which];
289                ret.numTasks = m.buffer.upkint();
290                if(ret.numTasks==0) {
291                        ret.taskNames = null;
292                        ret.taskTids = null;
293                        return ret;
294                }
295                ret.taskNames = new String[ret.numTasks];
296                ret.taskTids = new jpvmTaskId[ret.numTasks];
297                for(int i=0;i<ret.numTasks;i++)
298                        ret.taskNames[i] = m.buffer.upkstr();
299                m.buffer.unpack(ret.taskTids,ret.numTasks,1);
300            }
301            catch (jpvmException jpe) {
302                jpvmDebug.error("pvm_tasks, internal error");
303            }
304            return ret;
305        } 
306
307        public void pvm_halt() throws jpvmException {
308                jpvmBuffer buf = new jpvmBuffer();
309                pvm_send(buf,daemonTid,jpvmDaemonMessageTag.jpvmdHalt);
310        }
311
312        public void pvm_deleteTasks() throws jpvmException {
313                jpvmBuffer buf = new jpvmBuffer();
314                pvm_send(buf,daemonTid,jpvmDaemonMessageTag.jpvmdDeleteAll);
315        }
316
317        public void pvm_addhosts(int nhosts, String hostnames[],
318            jpvmTaskId daemonTids[]) throws jpvmException {
319                jpvmBuffer buf = new jpvmBuffer();
320                buf.pack(nhosts);
321                for(int i=0;i<nhosts;i++)
322                        buf.pack(hostnames[i]);
323                buf.pack(daemonTids,nhosts,1);
324                pvm_send(buf,daemonTid,jpvmDaemonMessageTag.jpvmdAddHost);
325        }
326
327        public void pvm_delhosts(String hostname) throws jpvmException {
328                jpvmBuffer buf = new jpvmBuffer();
329                buf.pack(hostname);
330                pvm_send(buf,daemonTid,jpvmDaemonMessageTag.jpvmdDelHost);
331        }
332
333
334        // Internal methods:
335        public void init(boolean isDaemon, String taskName) 
336            throws jpvmException {
337                myMessageQueue  = new jpvmMessageQueue();
338                myConnectionSet = new jpvmConnectionSet();
339                connectionServer= new jpvmConnectionServer(myConnectionSet,
340                        myMessageQueue);
341                myTid = new jpvmTaskId(connectionServer.getConnectionPort());
342                connectionServer.setDaemon(true);
343                connectionServer.start();
344                if(!isDaemon) {
345                        findDaemon();
346                        findParent();
347                        registerDaemon(taskName);
348                }
349        }
350
351        private void init(boolean isDaemon, String taskName, int port) 
352            throws jpvmException {
353                myMessageQueue  = new jpvmMessageQueue();
354                myConnectionSet = new jpvmConnectionSet();
355                connectionServer= new jpvmConnectionServer(myConnectionSet,
356                        myMessageQueue, port);
357                myTid = new jpvmTaskId(connectionServer.getConnectionPort());
358                connectionServer.setDaemon(true);
359                connectionServer.start();
360                if(!isDaemon) {
361                        findDaemon();
362                        findParent();
363                        registerDaemon(taskName);
364                }
365        }
366        private jpvmTaskId initTaskId() {
367                jpvmTaskId ret = null;
368                return ret;
369        }
370
371        private jpvmSendConnection getConnection(jpvmTaskId tid) 
372            throws jpvmException {
373                jpvmSendConnection ret = null;
374                ret = myConnectionSet.lookupSendConnection(tid);
375                if(ret!=null) {
376                        // Had a cached connection...
377                        return ret;
378                }
379                // Must establish a new connection.
380                ret = jpvmSendConnection.connect(tid,myTid);
381                if(ret!=null) {
382                        myConnectionSet.insertSendConnection(ret);
383                        return ret;
384                }
385                throw new jpvmException("getConnection, connect failed");
386        }
387
388        private void findDaemon() throws jpvmException{
389                int daemonPort = -1;
390                String daemonPortStr = System.getProperty("jpvm.daemon");
391                if(daemonPortStr != null) {
392                    try {
393                        daemonPort = Integer.valueOf(daemonPortStr).intValue();
394                    }
395                    catch (NumberFormatException nfe) {
396                        jpvmDebug.error("couldn't bind to daemon, " +
397                                "jpvm.daemon not an integer");
398                        daemonPort = -1;
399                    }
400                }
401                else {
402                        try
403                        {
404                        daemonPort = readDaemonFile();
405                        }
406                        catch (IOException ion)
407                        {
408                        throw new jpvmException();
409                        }
410                }
411                if(daemonPort==-1) {
412                        jpvmDebug.error("couldn't bind to daemon, " + "jpvm.daemon not defined");
413                        throw new jpvmException();
414                }
415                daemonTid = new jpvmTaskId(daemonPort);
416        }
417
418        private void findParent() throws jpvmException {
419                String parentHost = System.getProperty("jpvm.parhost");
420                int parentPort = 0;
421                if(parentHost == null) {
422                        parentTid = null;
423                        return;
424                }
425                String parentPortStr = System.getProperty("jpvm.parport");
426                try {
427                        parentPort = Integer.valueOf(parentPortStr).intValue();
428                }
429                catch (NumberFormatException nfe) {
430                        jpvmDebug.error("couldn't bind to parent, " +
431                                "jpvm.parport not an integer");
432                }
433                parentTid = new jpvmTaskId(parentHost,parentPort);
434
435                // Since we have a parent, register with the daemon
436                String regStr = System.getProperty("jpvm.regnum");
437                if(regStr==null) {
438                        jpvmDebug.error("no task registration number");
439                }
440                else {
441                        try {
442                                registrationNumber =
443                                        Integer.valueOf(regStr).intValue();
444                        }
445                        catch (NumberFormatException nfe) {
446                            jpvmDebug.error("invalid task registration number");
447                        }
448                }
449                jpvmBuffer buf = new jpvmBuffer();
450                buf.pack(registrationNumber);
451                pvm_send(buf,daemonTid,jpvmDaemonMessageTag.jpvmdRegisterChild);
452        }
453
454        private void registerDaemon(String taskName) throws jpvmException {
455                // Find out the name of this task
456                String point;
457                Integer a=-1;
458                if(taskName == null) {
459                        myName = System.getProperty("jpvm.taskname");
460                        point =  System.getProperty("jpvm.pointer");
461                        if(point != null)
462                        {
463                                a = Integer.parseInt(point);
464                        }
465                        else
466                        {
467                                a = -1;
468                        }
469                        if(myName == null) myName="(command line jpvm task)";
470                }
471                else {  
472                        myName = new String(taskName);
473                }
474
475                // Register this task with the daemon
476                jpvmBuffer buf = new jpvmBuffer();
477                buf.pack(myName);
478                buf.pack(a);
479                pvm_send(buf,daemonTid,jpvmDaemonMessageTag.jpvmdRegisterTask);
480        }
481
482        public int readDaemonFile() throws IOException{
483                int port = -1;
484                String fileName = pvm_daemon_file_name();
485                try {
486                        File f = new File(fileName);
487                        FileInputStream fin = new FileInputStream(f);
488                        DataInputStream din = new DataInputStream(fin);
489                        port = din.readInt();
490                }
491                catch (IOException ioe) {
492                        //ioe.printStackTrace();
493                        //jpvmDebug.error("error reading \""+fileName+"\"");
494                        port = -1;
495                        throw new IOException();
496                }
497                return port;
498        }
499
500        public static String pvm_daemon_file_name() {
501                String osName   = System.getProperty("os.name");
502                String userName = System.getProperty("user.name");
503                String fileName = null;
504                //if(osName.equals("Windows 95") || osName.equals("Windows NT") || osName.equals("Windows 3.1") || osName.equals("Windows XP")) 
505                String hostName = myTid.getHost();
506                if(osName.startsWith("Windows"))
507                {
508                        //fileName = "c:\\WINDOWS\\Temp\\jpvmd-"+userName+".txt";
509                        fileName = "log\\jpvmd-"+userName+"-"+hostName+".txt";
510                }
511                else {
512                        //fileName = "/tmp/jpvmd."+userName;
513                        fileName = "log/jpvmd-"+userName+"-"+hostName+".txt";
514                }
515                return fileName;
516        }
517};