001    package cnslab.cnsnetwork;
002
003    import java.io.*;
004
005    import jpvm.*;
006    import org.slf4j.Logger;
007    import org.slf4j.LoggerFactory;
008    
009    /***********************************************************************
010    * Trial Host class.
011    * 
012    * It listens to the message from mainhost and nethosts; it synchronizes
013    * the Nethosts at the beginning and end of the trial. 
014    * 
015    * @version
016    *   $Date: 2012-08-04 13:43:22 -0500 (Sat, 04 Aug 2012) $
017    *   $Rev: 104 $
018    *   $Author: croft $
019    * @author
020    *   Yi Dong
021    * @author
022    *   David Wallace Croft
023    ***********************************************************************/
024    public final class  TrialHost
025    ////////////////////////////////////////////////////////////////////////
026    ////////////////////////////////////////////////////////////////////////
027    {
028      
029    private static final Class<TrialHost>
030      CLASS = TrialHost.class;
031
032    private static final Logger
033      LOGGER = LoggerFactory.getLogger ( CLASS );
034    
035    //
036    
037    private boolean
038      stopthis = true;
039    
040    private double
041      rootTime,
042      tickTime;
043    
044    private double [ ]
045      localTime;
046
047    private int
048      counting,
049      expId,
050      trialId = -1;
051  
052    private JpvmInfo
053      info;
054  
055    ////////////////////////////////////////////////////////////////////////
056    ////////////////////////////////////////////////////////////////////////
057    
058    public static void main ( final String [ ]  args )
059    ////////////////////////////////////////////////////////////////////////
060    {
061      new TrialHost ( ).run ( );
062    }
063  
064    public void  run ( )
065    ////////////////////////////////////////////////////////////////////////
066    {
067      // Directing debug output to System.out since JPVM only prints to
068      // console messages directed to standard output, not standard error.
069      
070//    LoggingLib.enableDebugLogging ( System.out );
071      
072      LOGGER.trace ( "entering" );
073      
074      try
075      {
076        // int seedInt;
077        
078        // Enroll in the parallel virtual machine...
079        
080        info = new JpvmInfo ( );
081        
082        info.jpvm = new jpvmEnvironment ( );
083
084        info.myJpvmTaskId = info.jpvm.pvm_mytid ( );
085        
086        // Get my parent's task id...
087        
088        info.parentJpvmTaskId = info.jpvm.pvm_parent ( );
089
090        // receive information about its peers
091        
092        jpvmMessage  m = info.jpvm.pvm_recv ( NetMessageTag.sendTids );
093        
094        LOGGER.trace ( "Received message" );
095
096        // FileOutputStream outt = new FileOutputStream(
097        //   "log/preinfo"+iter_int+".txt");
098        //
099        // PrintStream p = new  PrintStream(outt);
100
101        int  peerNum = m.buffer.upkint ( ); // get # of peers
102        
103        final jpvmTaskId [ ]  peerTids = new jpvmTaskId [ peerNum ];
104        
105        m.buffer.unpack ( peerTids, peerNum, 1 );
106
107        int iter_int;
108        
109        for ( iter_int = 0; iter_int < peerNum; iter_int++ )
110        {
111          if ( info.myJpvmTaskId.equals ( peerTids [ iter_int ] ) )
112          {
113            break;
114          }
115        }
116        
117        info.idIndex = iter_int;
118
119        tickTime = m.buffer.upkdouble ( ) / 2.0;
120
121        m = info.jpvm.pvm_recv ( NetMessageTag.sendTids2 );
122        
123        info.numTasks = m.buffer.upkint();
124        
125        info.tids = new jpvmTaskId [ info.numTasks ];
126        
127        m.buffer.unpack (
128          info.tids,
129          info.numTasks,
130          1 );
131
132        //
133        
134        localTime = new double [ info.numTasks ];
135        
136        // FileOutputStream outt = new FileOutputStream(
137        //   "log/trialHost"+info.idIndex+".txt");
138        //
139        // PrintStream p = new  PrintStream(outt);
140
141        // Barrier Sync
142        
143        jpvmBuffer  buff = new jpvmBuffer ( );
144        
145        // send out ready info
146        
147        buff.pack ( "TrialHost " + info.jpvm.pvm_mytid ( ).toString ( )
148          +" is ready to go" );
149        
150        info.jpvm.pvm_send (
151          buff,
152          info.parentJpvmTaskId,
153          NetMessageTag.readySig );
154        
155        // m = info.jpvm.pvm_recv(NetMessageTag.readySig);
156        
157        // Barrier Sync
158
159        buff = new jpvmBuffer ( );
160        
161        // send the id who is available
162        
163        buff.pack ( info.idIndex );
164        
165        // p.println("T"+trialId+" is done");
166        
167        info.jpvm.pvm_send (
168          buff,
169          info.parentJpvmTaskId,
170          NetMessageTag.trialReady );
171
172        while ( stopthis )
173        {
174          // get info from root to start new trials
175          
176          m = info.jpvm.pvm_recv ( );
177          
178          switch ( m.messageTag )
179          {
180            case NetMessageTag.checkTime:
181              
182              processCheckTime ( );
183              
184              break;
185              
186            case NetMessageTag.oneTrial:
187              
188              processOneTrial ( m );
189              
190              break;
191              
192            case NetMessageTag.sendTick:
193              
194              processSendTick ( m );
195              
196              break;
197              
198            case NetMessageTag.stopSig:
199              
200              processStopSig ( );
201              
202              break;
203              
204            case NetMessageTag.tempStopSig:
205              
206              processTempStopSig ( );
207              
208              break;
209              
210            case NetMessageTag.trialDone:
211              
212              processTrialDone ( );
213              
214              break;
215          }
216        }
217        
218        // p.close();
219        
220        info.jpvm.pvm_exit ( );
221      } 
222      catch ( final jpvmException  jpe )
223      {
224        System.out.println ( "Error - jpvm exception" );
225        
226        try
227        {
228          FileOutputStream out = new FileOutputStream("log/myfile.txt");
229          
230          PrintStream p = new  PrintStream(out);
231          
232          p.println(jpe);
233          
234          p.close();
235        }
236        catch(Exception ex)
237        {
238          //
239        }
240      }
241      catch (Exception a)
242      {
243        try
244        {
245          FileOutputStream out = new FileOutputStream("log/myfile.txt");
246          
247          PrintStream p = new  PrintStream(out);
248          
249          a.printStackTrace(p);
250          
251          p.close();
252        }
253        catch(Exception ex)
254        {
255          //
256        }
257      }
258      
259      LOGGER.trace ( "exiting" );      
260    }
261    
262    ////////////////////////////////////////////////////////////////////////
263    // private methods
264    ////////////////////////////////////////////////////////////////////////
265    
266    private void  processCheckTime ( )
267      throws jpvmException
268    ////////////////////////////////////////////////////////////////////////
269    {
270      final jpvmBuffer  buff = new jpvmBuffer ( );
271      
272      // send the available Host id
273      
274      buff.pack ( info.idIndex );
275      
276      buff.pack ( expId );
277      
278      buff.pack ( rootTime );
279      
280      buff.pack ( trialId );
281      
282      info.jpvm.pvm_send (
283        buff,
284        info.parentJpvmTaskId,
285        NetMessageTag.checkTime );      
286    }
287    
288    private void  processOneTrial ( final jpvmMessage  m )
289      throws jpvmException
290    ////////////////////////////////////////////////////////////////////////
291    {    
292      trialId = m.buffer.upkint ( );
293    
294      expId = m.buffer.upkint ( );
295    
296      final jpvmBuffer  buff = new jpvmBuffer ( );
297    
298      // buff.pack(pas.exp.subExp[expId].trialLength);
299    
300      buff.pack ( trialId );
301    
302      buff.pack ( expId );
303    
304      // p.println("E"+expId+" T"+trialId);
305      // System.out.println("E"+expId+" T"+trialId);
306    
307      /*
308      jpvmMessage backM;
309      for(int i=0;i<info.numTasks;i++)
310      {
311      // get sig from sub nethosts for synchronization
312      backM =  info.jpvm.pvm_recv(NetMessageTag.syncTrialHost); 
313      }
314      */
315    
316      //Thread.sleep(500);
317    
318      // wait up hosts for new trials
319    
320      info.jpvm.pvm_mcast (
321        buff,
322        info.tids,
323        info.numTasks,
324        NetMessageTag.trialDone );
325    }
326    
327    private void  processSendTick ( final jpvmMessage  m )
328      throws jpvmException
329    ////////////////////////////////////////////////////////////////////////
330    {    
331      final double sysTime = m.buffer.upkdouble ( );
332    
333      final int hostId = m.buffer.upkint ( );
334    
335      boolean flag = true;
336    
337      localTime [ hostId ] = sysTime;
338    
339      for ( int i = 0; i < localTime.length; i++ )
340      {
341        if ( localTime [ i ] - tickTime < rootTime )
342        {
343          // if one of the host not satisfied, we can't broadcast
344        
345          flag = false; 
346        }
347      }
348    
349      if ( flag )
350      {
351        rootTime += tickTime;
352      
353        //buff = new jpvmBuffer();
354        //buff.pack(rootTime);
355        ////broadcast the root time
356        //info.jpvm.pvm_mcast(
357        //  buff,
358        //  info.tids,
359        //  info.numTasks,
360        //  NetMessageTag.syncRoot);
361      }
362    }
363    
364    private void  processStopSig ( )
365      throws jpvmException
366    ////////////////////////////////////////////////////////////////////////
367    {    
368      final jpvmBuffer  buff = new jpvmBuffer ( );
369    
370      // broadcast the root time
371    
372      info.jpvm.pvm_mcast (
373        buff,
374        info.tids,
375        info.numTasks,
376        NetMessageTag.stopSig );
377    
378      stopthis = false;
379    }
380    
381    private void  processTempStopSig ( )
382      throws jpvmException
383    ////////////////////////////////////////////////////////////////////////
384    {
385      final jpvmBuffer  buff = new jpvmBuffer ( );
386    
387      // p.println("tempStopSig is received");
388      // p.flush();
389    
390      //broadcast the root time
391    
392      info.jpvm.pvm_mcast (
393        buff,
394        info.tids,
395        info.numTasks,
396        NetMessageTag.tempStopSig );
397    
398      // p.println("sending to"+info.numTasks
399      //   + " tids"+info.tids[0]+" "+info.tids[1]);
400      // p.flush();
401    }
402    
403    private void  processTrialDone ( )
404      throws jpvmException
405    ////////////////////////////////////////////////////////////////////////
406    {    
407      counting++;
408    
409      // new trial should be begin
410    
411      if ( counting == info.numTasks )
412      {
413        // reset the counting
414        
415        counting = 0;
416        
417        // reset the root time;
418      
419        rootTime = 0.0;
420      
421        for ( int  i = 0; i < localTime.length; i++ )
422        {
423          //reset the hosts time
424          
425          localTime [ i ] = 0.0;
426        }
427      
428        final jpvmBuffer  buff = new jpvmBuffer ( );
429      
430        // send the id who is available
431      
432        buff.pack ( info.idIndex );
433      
434        // p.println("T"+trialId+" is done");
435      
436        info.jpvm.pvm_send (
437          buff,
438          info.parentJpvmTaskId,
439          NetMessageTag.trialReady );
440      }
441    }
442    
443    ////////////////////////////////////////////////////////////////////////
444    ////////////////////////////////////////////////////////////////////////
445    }