001 package cnslab.cnsnetwork; 002 003 import java.io.*; 004 005 import jpvm.*; 006 import org.slf4j.Logger; 007 import org.slf4j.LoggerFactory; 008 009 /*********************************************************************** 010 * Trial Host class. 011 * 012 * It listens to the message from mainhost and nethosts; it synchronizes 013 * the Nethosts at the beginning and end of the trial. 014 * 015 * @version 016 * $Date: 2012-08-04 13:43:22 -0500 (Sat, 04 Aug 2012) $ 017 * $Rev: 104 $ 018 * $Author: croft $ 019 * @author 020 * Yi Dong 021 * @author 022 * David Wallace Croft 023 ***********************************************************************/ 024 public final class TrialHost 025 //////////////////////////////////////////////////////////////////////// 026 //////////////////////////////////////////////////////////////////////// 027 { 028 029 private static final Class<TrialHost> 030 CLASS = TrialHost.class; 031 032 private static final Logger 033 LOGGER = LoggerFactory.getLogger ( CLASS ); 034 035 // 036 037 private boolean 038 stopthis = true; 039 040 private double 041 rootTime, 042 tickTime; 043 044 private double [ ] 045 localTime; 046 047 private int 048 counting, 049 expId, 050 trialId = -1; 051 052 private JpvmInfo 053 info; 054 055 //////////////////////////////////////////////////////////////////////// 056 //////////////////////////////////////////////////////////////////////// 057 058 public static void main ( final String [ ] args ) 059 //////////////////////////////////////////////////////////////////////// 060 { 061 new TrialHost ( ).run ( ); 062 } 063 064 public void run ( ) 065 //////////////////////////////////////////////////////////////////////// 066 { 067 // Directing debug output to System.out since JPVM only prints to 068 // console messages directed to standard output, not standard error. 069 070// LoggingLib.enableDebugLogging ( System.out ); 071 072 LOGGER.trace ( "entering" ); 073 074 try 075 { 076 // int seedInt; 077 078 // Enroll in the parallel virtual machine... 079 080 info = new JpvmInfo ( ); 081 082 info.jpvm = new jpvmEnvironment ( ); 083 084 info.myJpvmTaskId = info.jpvm.pvm_mytid ( ); 085 086 // Get my parent's task id... 087 088 info.parentJpvmTaskId = info.jpvm.pvm_parent ( ); 089 090 // receive information about its peers 091 092 jpvmMessage m = info.jpvm.pvm_recv ( NetMessageTag.sendTids ); 093 094 LOGGER.trace ( "Received message" ); 095 096 // FileOutputStream outt = new FileOutputStream( 097 // "log/preinfo"+iter_int+".txt"); 098 // 099 // PrintStream p = new PrintStream(outt); 100 101 int peerNum = m.buffer.upkint ( ); // get # of peers 102 103 final jpvmTaskId [ ] peerTids = new jpvmTaskId [ peerNum ]; 104 105 m.buffer.unpack ( peerTids, peerNum, 1 ); 106 107 int iter_int; 108 109 for ( iter_int = 0; iter_int < peerNum; iter_int++ ) 110 { 111 if ( info.myJpvmTaskId.equals ( peerTids [ iter_int ] ) ) 112 { 113 break; 114 } 115 } 116 117 info.idIndex = iter_int; 118 119 tickTime = m.buffer.upkdouble ( ) / 2.0; 120 121 m = info.jpvm.pvm_recv ( NetMessageTag.sendTids2 ); 122 123 info.numTasks = m.buffer.upkint(); 124 125 info.tids = new jpvmTaskId [ info.numTasks ]; 126 127 m.buffer.unpack ( 128 info.tids, 129 info.numTasks, 130 1 ); 131 132 // 133 134 localTime = new double [ info.numTasks ]; 135 136 // FileOutputStream outt = new FileOutputStream( 137 // "log/trialHost"+info.idIndex+".txt"); 138 // 139 // PrintStream p = new PrintStream(outt); 140 141 // Barrier Sync 142 143 jpvmBuffer buff = new jpvmBuffer ( ); 144 145 // send out ready info 146 147 buff.pack ( "TrialHost " + info.jpvm.pvm_mytid ( ).toString ( ) 148 +" is ready to go" ); 149 150 info.jpvm.pvm_send ( 151 buff, 152 info.parentJpvmTaskId, 153 NetMessageTag.readySig ); 154 155 // m = info.jpvm.pvm_recv(NetMessageTag.readySig); 156 157 // Barrier Sync 158 159 buff = new jpvmBuffer ( ); 160 161 // send the id who is available 162 163 buff.pack ( info.idIndex ); 164 165 // p.println("T"+trialId+" is done"); 166 167 info.jpvm.pvm_send ( 168 buff, 169 info.parentJpvmTaskId, 170 NetMessageTag.trialReady ); 171 172 while ( stopthis ) 173 { 174 // get info from root to start new trials 175 176 m = info.jpvm.pvm_recv ( ); 177 178 switch ( m.messageTag ) 179 { 180 case NetMessageTag.checkTime: 181 182 processCheckTime ( ); 183 184 break; 185 186 case NetMessageTag.oneTrial: 187 188 processOneTrial ( m ); 189 190 break; 191 192 case NetMessageTag.sendTick: 193 194 processSendTick ( m ); 195 196 break; 197 198 case NetMessageTag.stopSig: 199 200 processStopSig ( ); 201 202 break; 203 204 case NetMessageTag.tempStopSig: 205 206 processTempStopSig ( ); 207 208 break; 209 210 case NetMessageTag.trialDone: 211 212 processTrialDone ( ); 213 214 break; 215 } 216 } 217 218 // p.close(); 219 220 info.jpvm.pvm_exit ( ); 221 } 222 catch ( final jpvmException jpe ) 223 { 224 System.out.println ( "Error - jpvm exception" ); 225 226 try 227 { 228 FileOutputStream out = new FileOutputStream("log/myfile.txt"); 229 230 PrintStream p = new PrintStream(out); 231 232 p.println(jpe); 233 234 p.close(); 235 } 236 catch(Exception ex) 237 { 238 // 239 } 240 } 241 catch (Exception a) 242 { 243 try 244 { 245 FileOutputStream out = new FileOutputStream("log/myfile.txt"); 246 247 PrintStream p = new PrintStream(out); 248 249 a.printStackTrace(p); 250 251 p.close(); 252 } 253 catch(Exception ex) 254 { 255 // 256 } 257 } 258 259 LOGGER.trace ( "exiting" ); 260 } 261 262 //////////////////////////////////////////////////////////////////////// 263 // private methods 264 //////////////////////////////////////////////////////////////////////// 265 266 private void processCheckTime ( ) 267 throws jpvmException 268 //////////////////////////////////////////////////////////////////////// 269 { 270 final jpvmBuffer buff = new jpvmBuffer ( ); 271 272 // send the available Host id 273 274 buff.pack ( info.idIndex ); 275 276 buff.pack ( expId ); 277 278 buff.pack ( rootTime ); 279 280 buff.pack ( trialId ); 281 282 info.jpvm.pvm_send ( 283 buff, 284 info.parentJpvmTaskId, 285 NetMessageTag.checkTime ); 286 } 287 288 private void processOneTrial ( final jpvmMessage m ) 289 throws jpvmException 290 //////////////////////////////////////////////////////////////////////// 291 { 292 trialId = m.buffer.upkint ( ); 293 294 expId = m.buffer.upkint ( ); 295 296 final jpvmBuffer buff = new jpvmBuffer ( ); 297 298 // buff.pack(pas.exp.subExp[expId].trialLength); 299 300 buff.pack ( trialId ); 301 302 buff.pack ( expId ); 303 304 // p.println("E"+expId+" T"+trialId); 305 // System.out.println("E"+expId+" T"+trialId); 306 307 /* 308 jpvmMessage backM; 309 for(int i=0;i<info.numTasks;i++) 310 { 311 // get sig from sub nethosts for synchronization 312 backM = info.jpvm.pvm_recv(NetMessageTag.syncTrialHost); 313 } 314 */ 315 316 //Thread.sleep(500); 317 318 // wait up hosts for new trials 319 320 info.jpvm.pvm_mcast ( 321 buff, 322 info.tids, 323 info.numTasks, 324 NetMessageTag.trialDone ); 325 } 326 327 private void processSendTick ( final jpvmMessage m ) 328 throws jpvmException 329 //////////////////////////////////////////////////////////////////////// 330 { 331 final double sysTime = m.buffer.upkdouble ( ); 332 333 final int hostId = m.buffer.upkint ( ); 334 335 boolean flag = true; 336 337 localTime [ hostId ] = sysTime; 338 339 for ( int i = 0; i < localTime.length; i++ ) 340 { 341 if ( localTime [ i ] - tickTime < rootTime ) 342 { 343 // if one of the host not satisfied, we can't broadcast 344 345 flag = false; 346 } 347 } 348 349 if ( flag ) 350 { 351 rootTime += tickTime; 352 353 //buff = new jpvmBuffer(); 354 //buff.pack(rootTime); 355 ////broadcast the root time 356 //info.jpvm.pvm_mcast( 357 // buff, 358 // info.tids, 359 // info.numTasks, 360 // NetMessageTag.syncRoot); 361 } 362 } 363 364 private void processStopSig ( ) 365 throws jpvmException 366 //////////////////////////////////////////////////////////////////////// 367 { 368 final jpvmBuffer buff = new jpvmBuffer ( ); 369 370 // broadcast the root time 371 372 info.jpvm.pvm_mcast ( 373 buff, 374 info.tids, 375 info.numTasks, 376 NetMessageTag.stopSig ); 377 378 stopthis = false; 379 } 380 381 private void processTempStopSig ( ) 382 throws jpvmException 383 //////////////////////////////////////////////////////////////////////// 384 { 385 final jpvmBuffer buff = new jpvmBuffer ( ); 386 387 // p.println("tempStopSig is received"); 388 // p.flush(); 389 390 //broadcast the root time 391 392 info.jpvm.pvm_mcast ( 393 buff, 394 info.tids, 395 info.numTasks, 396 NetMessageTag.tempStopSig ); 397 398 // p.println("sending to"+info.numTasks 399 // + " tids"+info.tids[0]+" "+info.tids[1]); 400 // p.flush(); 401 } 402 403 private void processTrialDone ( ) 404 throws jpvmException 405 //////////////////////////////////////////////////////////////////////// 406 { 407 counting++; 408 409 // new trial should be begin 410 411 if ( counting == info.numTasks ) 412 { 413 // reset the counting 414 415 counting = 0; 416 417 // reset the root time; 418 419 rootTime = 0.0; 420 421 for ( int i = 0; i < localTime.length; i++ ) 422 { 423 //reset the hosts time 424 425 localTime [ i ] = 0.0; 426 } 427 428 final jpvmBuffer buff = new jpvmBuffer ( ); 429 430 // send the id who is available 431 432 buff.pack ( info.idIndex ); 433 434 // p.println("T"+trialId+" is done"); 435 436 info.jpvm.pvm_send ( 437 buff, 438 info.parentJpvmTaskId, 439 NetMessageTag.trialReady ); 440 } 441 } 442 443 //////////////////////////////////////////////////////////////////////// 444 //////////////////////////////////////////////////////////////////////// 445 }