001    package cnslab.cnsnetwork;
002    
003    import java.io.*;
004    import java.util.concurrent.*;
005
006    import jpvm.*;
007
008    import cnslab.cnsmath.Seed;
009    import edu.jhu.mb.ernst.engine.DiscreteEvent;
010    import edu.jhu.mb.ernst.engine.DiscreteEventQueue;
011    import edu.jhu.mb.ernst.engine.queue.DiscreteEventQueueLib;
012
013    /***********************************************************************
014    * The thread used by NetHost to do the real work, i.e. process the queue
015    * events.  It also checks whether process proceeding is allowed or not.
016    * 
017    * @version
018    *   $Date: 2012-08-04 13:43:22 -0500 (Sat, 04 Aug 2012) $
019    *   $Rev: 104 $
020    *   $Author: croft $
021    * @author
022    *   Yi Dong
023    * @author
024    *   David Wallace Croft
025    ***********************************************************************/
026    public final class  PRun
027      implements Runnable
028    ////////////////////////////////////////////////////////////////////////
029    ////////////////////////////////////////////////////////////////////////
030    {
031      
032    /** Fire event id */
033    public static final int  FIRE_EVENT = 0;
034    
035    /** internal Input event id  */
036    public static final int  INPUT_EVENT = 1;
037    
038    /** Poisson Input event id */
039    public static final int  POI_EVENT = 2;
040    
041    /** Reset  id */
042    @Deprecated
043    public static final int  RESET = 3;
044    
045    public static final int  DISCRETE_EVENT = 4;
046    
047    //
048
049    /** Minimum of synaptic delay */
050    public double  minTime;
051
052    //
053
054    private final Network        network;
055
056    private final Object         lock;
057
058    private final CyclicBarrier  barrier;
059    
060    private final DiscreteEventQueue
061      discreteEventQueue;
062    
063    //
064
065    private FireEvent      firstFireEvent;
066    
067    private InputEvent     firstInputEvent;
068    
069    private PInputEvent    pInputEvent;
070    
071    private int            type;
072
073    ////////////////////////////////////////////////////////////////////////
074    ////////////////////////////////////////////////////////////////////////
075    
076    public  PRun (
077      final DiscreteEventQueue  discreteEventQueue,
078      final Network             network,
079      final Object              lock,
080      final CyclicBarrier       barrier )
081    ////////////////////////////////////////////////////////////////////////
082    {
083      this.discreteEventQueue = discreteEventQueue;
084      
085      this.network = network;
086      
087      this.lock = lock;
088      
089      this.barrier = barrier;
090    }
091
092    ////////////////////////////////////////////////////////////////////////
093    ////////////////////////////////////////////////////////////////////////
094    
095    /***********************************************************************
096    * @return
097    *   true if the communication thread is allowed to proceed
098    ***********************************************************************/
099    public boolean  runAllowed ( )
100    ////////////////////////////////////////////////////////////////////////
101    {
102      // if(network.stop) return true;
103      
104      // take out the first fire element
105      
106      firstFireEvent = network.getFirstFireEvent ( );
107      
108       // take out the first input element
109      
110      firstInputEvent = network.getFirstInputEvent ( );
111      
112      // take out the first poisson input element
113      
114      pInputEvent = network.getFirstPInputEvent ( );
115      
116      final DiscreteEvent  discreteEvent = discreteEventQueue.peek ( );
117
118      double
119        fireTime,
120        inputTime,
121        pInputTime,
122        discreteEventTime;
123
124      // System.out.println("running...");
125      
126      if ( firstFireEvent != null )
127      {
128        fireTime = firstFireEvent.time;
129      }
130      else
131      {
132        fireTime = Double.MAX_VALUE;
133      }
134
135      if ( firstInputEvent != null )
136      {
137        inputTime = firstInputEvent.time;
138      }
139      else
140      {
141        inputTime = Double.MAX_VALUE;
142      }
143
144      if ( pInputEvent != null )
145      {
146        pInputTime = pInputEvent.time;
147      }
148      else
149      {
150        pInputTime = Double.MAX_VALUE;
151      }
152      
153      if ( discreteEvent != null )
154      {
155        discreteEventTime = discreteEvent.getTime ( );
156      }
157      else
158      {
159        discreteEventTime = Double.MAX_VALUE;
160      }
161
162      if ( fireTime < inputTime )
163      {
164        type = PRun.FIRE_EVENT;
165        
166        minTime = fireTime;
167      }
168      else
169      {
170        type = PRun.INPUT_EVENT;
171        
172        minTime = inputTime;
173      }
174
175      if ( minTime > pInputTime )
176      {
177        type = PRun.POI_EVENT;
178        
179        minTime = pInputTime;
180      }
181      
182      if ( minTime > discreteEventTime )
183      {
184        type = PRun.DISCRETE_EVENT;
185        
186        minTime = discreteEventTime;
187        
188        return true;
189      }
190
191      /*
192      if(poiInput==null && firstInput==null && firstFire==null)
193      {
194        type=PRun.RESET;
195        network.p.println("network reset");
196        return true;
197      }
198      */
199
200      // if(network.rootTime+network.minDelay>network.endOfTrial)
201      // {
202      //   return true;
203      // }
204      
205      if ( network.info.numTasks == 1
206        && network.experiment.recorder.intraEle.size ( ) == 0 )
207      {
208        network.spikeState = true;
209        
210        return true;
211      }
212      
213      /*  test it
214      if(network.rootTime+network.minDelay<minTime)
215      {
216        // network.p.println(
217        //   "root time"+network.rootTime+" systime:"+minTime);
218       
219        return false;
220      }
221      */
222      
223      /*
224      if ( type == PRun.FIRE_EVENT
225        && firstFire.index == network.neurons.length + network.base - 1
226        && minTime > network.endOfTrial
227        && network.rootTime + network.minDelay/2.0 < network.endOfTrial)
228      {
229        return false;
230      }
231      */
232
233      if ( type == PRun.FIRE_EVENT
234        && firstFireEvent.index
235          == network.neurons.length + network.base - 1 )
236      {
237        // tick neuron
238        
239        network.spikeState = true;
240        
241        for ( int  iter = 0; iter < network.info.numTasks; iter++ )
242        {       
243          if ( iter != network.info.idIndex
244            && network.received [ iter ] == 0 )
245          {
246            network.spikeState = false;
247          }
248        }
249
250        // network.p.println("state "+ network.spikeState);
251        //
252        // for(int it=0;it<network.info.numTasks;it++)
253        // {       
254        //   if(it!=network.info.idIndex && !network.received[it].empty())              
255        //   {                                                                  
256        //     network.p.println(
257        //       "host:"+it+" value"+(Double)(network.received[it].peek()));
258        //   }
259        // }
260
261        if ( !network.spikeState )
262        {
263          // if(network.rootTime>network.endOfTrial)return true;
264          // network.p.println(
265          //   "tick neuron stuck at "
266          //   + minTime + "root Time"+network.rootTime);
267          // network.p.println( "type "+type+" spkikeState"
268          //   +network.spikeState + " neuron id: "+ firstFire.index);
269          // for(int iter=0;iter<network.info.numTasks;iter++)
270          // {
271          //   if(iter!=network.info.idIndex)
272          //   {
273          //     network.p.print(
274          //       "host"+iter+" "+network.received[iter].empty());
275          //   }
276          // }
277          
278          return false;
279        }
280        
281        // network.p.println("tick neuron run through "+minTime);
282        
283        for ( int  iter = 0; iter < network.info.numTasks; iter++ )
284        {
285          if ( iter != network.info.idIndex && !network.trialDone )
286          {
287            // network.p.println("pop"+iter);
288            
289            ( network.received [ iter ] )--;
290          }
291        }
292        
293        network.spikeState = false;
294        
295        if ( network.info.numTasks == 1 )
296        {
297          network.spikeState = true;
298        }
299        
300        return true;
301      }
302      
303      return true;
304    }
305
306    @Override
307    public void  run ( )
308    ////////////////////////////////////////////////////////////////////////
309    {
310      try
311      {
312        // Thread.sleep(100);
313        
314        // double tmp_firetime;
315        
316        // boolean isFire;
317        
318        final Seed  seed = network.seed;
319        
320        network.p.println ( "seed " + seed.seed );
321        
322        // network.saveSeed = network.idum.seed;
323        // synchronized (synLock)
324        // {
325
326        while ( !network.stop )
327        {
328          // network.info.jpvm.pvm_barrier(
329          //   network.info.parent.getHost(),
330          //   network.subExpId*network.exp.subExp.length+network.trialId,
331          //   network.info.numTasks);
332
333          // synchronized (synLock)
334          // {
335          //   network.p.println("trial wait");
336          //
337          //   synLock.wait(); //synchronizing lock
338          //
339          //   network.p.println("begin");
340          // }
341          //
342          // lock.wait();
343          //
344          // synchronized(synLock)
345          // {
346          //   network.p.println(" wait begin");
347          //   network.p.flush();
348          //   network.idum.seed = network.saveSeed;
349          //   network.p.println("Run over seed "+network.idum.seed);
350          //
351          //   while(!network.startSig)
352          //   {
353          //     jpvmBuffer buff = new jpvmBuffer();
354          //
355          //     network.info.jpvm.pvm_send(
356          //       buff,
357          //       network.info.parent,
358          //       NetMessageTag.syncTrialHost); //send to trialHost
359          
360          barrier.await ( );
361          
362          //   }
363          //
364          //   network.startSig=false;
365          //
366          //   network.p.println("Run start seed "+network.idum.seed);
367          //
368          //   network.p.println(" wait end");
369          //
370          //   network.p.flush();
371          // }
372
373
374          synchronized (lock)
375          {
376            // network.p.print("trial startting");
377            // network.p.flush();
378            // jpvmBuffer buf = new jpvmBuffer();
379            // buf.pack(network.recordBuff);
380            // network.info.jpvm.pvm_send(
381            //   buf,
382            //   network.info.parent,
383            //   NetMessageTag.trialDone); 
384            // network.p.print(
385            //   "after wait "+"E"+network.subExpId+" T"+network.trialId);
386            // network.p.flush();
387            // System.out.println("out");
388            // network.p.println("before init");
389            
390            network.init ( );
391            
392            // Thread.sleep(100);
393            // network.p.println("after init");
394            // network.p.flush();
395
396            network.rootTime = 0.0;
397
398            network.recordBuff.buff.clear ( );
399            
400            network.intraRecBuffers.init ( );
401            
402            // network.p.println("after init seed "+network.idum.seed);
403            // network.p.println("all initialized");
404
405            // network.p.println(
406            //   "ini done and trialDone is "+ network.trialDone);
407            // network.p.flush();
408            
409            /*
410            network.rootTime=0.0;
411            network.fireQueue.clear();
412            network.inputQueue.clear();
413            network.poissonQueue.clear();
414            network.init();
415            */
416            //          
417
418            // lock.notify();
419
420
421            // network.p.println("Communication is running");
422            
423            while ( !network.trialDone )
424            {
425              // network.p.println("Communication inside");
426
427              // while ( !communicationAllowed(
428              //   firstFire=( ((tmpFire=network.fireQueue.firstItem())
429              //   !=null) ?
430              //   new FireEvent(tmpFire.index, tmpFire.time) : null )))
431              
432              while ( !runAllowed ( ) && !network.trialDone )
433              {
434                // network.p.println("lock at"+minTime );
435                // network.p.flush();
436                // System.out.println("lock at"+minTime);
437
438                lock.wait ( 1000 );
439                
440                if ( network.trialDone )
441                {
442                  break;
443                }
444              }
445
446              if ( !network.trialDone )
447              {
448                // network.p.println("process "+type );
449                // network.p.flush();
450
451                switch ( type ) 
452                {
453                  case PRun.DISCRETE_EVENT:
454                    
455                    DiscreteEventQueueLib.process ( discreteEventQueue );
456                    
457                    break;
458                    
459                  case PRun.FIRE_EVENT:
460                    
461                    // network.p.println("fire event process "+minTime );
462                    
463                    network.pFireProcess ( firstFireEvent );
464                    
465                    break;
466                    
467                  case PRun.INPUT_EVENT:
468                    
469                    // network.p.println("input event process "+minTime );
470                    
471                    network.pInputProcess ( firstInputEvent );
472                    
473                    break;
474                    
475                  case PRun.POI_EVENT:
476                    
477                    //network.p.println("Poisson event process "+minTime );
478                    
479                    network.pPoissonProcess ( pInputEvent );
480                    
481                    break;
482                    
483                  default:
484                    
485                    break;
486                }
487              }
488            } // END: while
489          }
490        } // END: while
491      }
492      catch ( jpvmException  exx )
493      {
494        try
495        {
496          final FileOutputStream  out = new FileOutputStream (
497            "log/"+network.info.jpvm.pvm_mytid().getHost()
498              + "error"+network.info.idIndex+".txt" );
499          
500          final PrintStream  p = new  PrintStream ( out );
501          
502          exx.printStackTrace ( p );
503
504          p.close ( );
505          
506          // System.exit(0);
507        }
508        catch ( Exception  ex )
509        {
510          //
511        }
512      }
513      catch(InterruptedException ex) 
514      {
515        //TODO: Add Exception handler here
516        
517        ex.printStackTrace();
518      }
519      catch(Exception a)
520      {
521        a.printStackTrace ( );
522        
523        try
524        {
525          FileOutputStream  out = new FileOutputStream (
526            "log/"+network.info.jpvm.pvm_mytid().getHost()
527            +"error"+network.info.idIndex+".txt");
528          
529          final PrintStream  printStream = new PrintStream ( out );
530          
531          a.printStackTrace(printStream);
532          
533          printStream.println("input from neuron");
534          
535          printStream.println(firstInputEvent);
536          
537          printStream.println("input from external source");
538          
539          printStream.println(pInputEvent);
540          
541          printStream.close();
542          
543          System.exit(0);
544        }
545        catch(Exception ex)
546        {
547          ex.printStackTrace ( );
548        }
549      }
550    }
551    
552    ////////////////////////////////////////////////////////////////////////
553    ////////////////////////////////////////////////////////////////////////
554    }