/* jpvmRecvBarrier.java
 *
 * The jpvmRecvBarrier class implements a thread that services one
 * incoming barrier connection: it reads a barrier tag and a party
 * count from the socket, waits until that many connections for the
 * same tag have arrived, and then acknowledges the sender.
 *
 * Adam J Ferrari
 * Sun 05-26-1996
 *
 * Copyright (C) 1996 Adam J Ferrari
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 675 Mass Ave, Cambridge,
 * MA 02139, USA.
 */

package jpvm;

import java.net.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;

/**
 * Services a single barrier connection.
 *
 * <p>Wire format, as implemented here: the peer sends two single bytes, the
 * barrier tag followed by the number of parties. Once that many connections
 * carrying the same tag are waiting, each one is answered with a single
 * {@code 0} byte and its streams are closed. If the rendezvous fails, the
 * streams are closed without an acknowledgement, so the peer sees end of
 * stream instead of blocking forever.
 *
 * <p>Because both values travel as one byte each, tags and party counts are
 * limited to the range 0..255 (and the party count must be at least 1).
 *
 * <p>All accesses this class makes to {@link #number} are serialized on a
 * private lock.
 */
public
class jpvmRecvBarrier extends Thread {
    /** Guards every read-modify-write this class performs on {@link #number}. */
    private static final Object REGISTRY_LOCK = new Object();

    private InputStream  instrm;
    public  OutputStream out;

    /**
     * Barriers that are currently collecting parties, keyed by tag. An entry
     * is added by the first connection for a tag and removed when the barrier
     * trips or is found broken.
     */
    public static Map<Integer,CyclicBarrier> number =
        new ConcurrentHashMap<Integer,CyclicBarrier>();

    /** Tag read from the connection; stays 0 if no header could be read. */
    public int tagId;

    /** Barrier this connection waits on, or null if the setup failed. */
    private CyclicBarrier barrier;

    /**
     * Reads the barrier header from the socket and joins (or creates) the
     * barrier registered for its tag.
     *
     * <p>A null socket, an I/O error or a malformed header leaves the
     * instance without a barrier; {@link #run()} then only closes whatever
     * streams were opened.
     *
     * @param sock connected socket of the peer entering the barrier; may be
     *             null, in which case nothing is read
     */
    public jpvmRecvBarrier(Socket sock) {
        if (sock == null) return;
        try {
            instrm = sock.getInputStream();
            out    = sock.getOutputStream();

            tagId = instrm.read();       // barrier tag, -1 at end of stream
            int total = instrm.read();   // number of parties, -1 at end of stream

            // read() reports EOF as -1, and CyclicBarrier rejects a party
            // count below 1, so refuse the header instead of throwing.
            if (tagId < 0 || total < 1) {
                System.err.println("jpvmRecvBarrier: malformed barrier header"
                                   + " (tag=" + tagId + ", total=" + total + ")");
                return;
            }
            barrier = joinBarrier(tagId, total);
        }
        catch (IOException ioe) {
            ioe.printStackTrace();
        }
    }

    /**
     * Waits for all parties of the barrier, acknowledges the peer with a
     * single {@code 0} byte and closes the connection. The streams are closed
     * on every path, including a broken barrier or an interrupt.
     */
    public void run() {
        boolean released = false;
        try {
            if (barrier == null) return;   // setup failed; just close below

            barrier.await();
            released = true;

            out.write(0);   // acknowledgement byte that unblocks the sender
            out.flush();
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();   // keep the interrupt visible
            ie.printStackTrace();
        }
        catch (BrokenBarrierException bbe) {
            bbe.printStackTrace();
        }
        catch (IOException ioe) {
            ioe.printStackTrace();
        }
        finally {
            // A barrier that never tripped is broken and must not be handed
            // to later connections using the same tag.
            if (!released && barrier != null) discardBarrier(tagId, barrier);
            closeQuietly(out);
            closeQuietly(instrm);
        }
    }

    /**
     * Returns the barrier registered for a tag, creating it for the first
     * arrival. The party count of later arrivals is ignored; the first
     * connection for a tag decides it.
     */
    private static CyclicBarrier joinBarrier(int tag, int parties) {
        synchronized (REGISTRY_LOCK) {
            if (number == null) {
                // Preserves the original lazy creation in case the public
                // field was reset from outside this class.
                number = new ConcurrentHashMap<Integer,CyclicBarrier>();
            }
            CyclicBarrier current = number.get(tag);
            if (current == null) {
                // The action runs in the last arriving thread before any
                // waiter is released, so the tag is free again before a peer
                // can possibly start the next round.
                current = new CyclicBarrier(parties, new Unregister(tag));
                number.put(tag, current);
            }
            return current;
        }
    }

    /** Removes the mapping for a tag, but only if it still refers to the given barrier. */
    private static void discardBarrier(int tag, CyclicBarrier stale) {
        synchronized (REGISTRY_LOCK) {
            if (number != null && number.get(tag) == stale) {
                number.remove(tag);
            }
        }
    }

    /** Closes a stream, reporting but not propagating any failure. */
    private static void closeQuietly(Closeable stream) {
        if (stream == null) return;
        try {
            stream.close();
        }
        catch (IOException ioe) {
            ioe.printStackTrace();
        }
    }

    /** Barrier action that unregisters a tag at the moment its barrier trips. */
    private static final class Unregister implements Runnable {
        private final int tag;

        Unregister(int tag) {
            this.tag = tag;
        }

        public void run() {
            synchronized (REGISTRY_LOCK) {
                if (number != null) number.remove(tag);
            }
        }
    }

};