001/* jpvmRecvConnection.java
002 *
003 * The jpvmRecvConnection class implements objects that represent
004 * connections to remote jpvm processes from which messages may be
005 * received.
006 *
007 * Adam J Ferrari
008 * Sun 05-26-1996
009 *
010 * Copyright (C) 1996  Adam J Ferrari
011 * 
012 * This library is free software; you can redistribute it and/or
013 * modify it under the terms of the GNU Library General Public
014 * License as published by the Free Software Foundation; either
015 * version 2 of the License, or (at your option) any later version.
016 * 
017 * This library is distributed in the hope that it will be useful,
018 * but WITHOUT ANY WARRANTY; without even the implied warranty of
019 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
020 * Library General Public License for more details.
021 * 
022 * You should have received a copy of the GNU Library General Public
023 * License along with this library; if not, write to the
024 * Free Software Foundation, Inc., 675 Mass Ave, Cambridge,
025 * MA 02139, USA.
026 */
027
028package jpvm;
029
030import java.net.*;
031import java.io.*;
032import java.util.*;
033import java.util.concurrent.BrokenBarrierException;
034import java.util.concurrent.CyclicBarrier;
035
036public
037class jpvmRecvBarrier extends Thread {
038        private InputStream     instrm;
039        public OutputStream     out;
040
041//      public static Map<Integer,Integer> key; 
042        public static Map<Integer,CyclicBarrier> number; 
043//      public static Map<Integer,Integer> totalNumber; 
044        public int tagId; //current tagId
045
046
047
048        public jpvmRecvBarrier(Socket sock) {
049                if(sock==null) return;
050//              if(key == null) key = new HashMap<Integer,Integer>();
051                if(number == null) number = new HashMap<Integer,CyclicBarrier>();
052//              if(totalNumber == null) totalNumber = new HashMap<Integer,Integer>();
053                try {
054                        instrm = sock.getInputStream();
055                        out = sock.getOutputStream();
056                        //System.out.println(sock.getLocalSocketAddress());     
057                        //System.out.println(sock.getRemoteSocketAddress());    
058//                      //System.out.println(instrm.read());
059                        /*
060
061                        if(!sock.isConnected())System.out.println("not connected");
062                        if(instrm ==null) System.out.println("empty sock input");
063                        BufferedInputStream intrm = new BufferedInputStream(instrm);
064                        if(intrm ==null) System.out.println("empty buffer input");
065
066                        strm = new DataInputStream(intrm);
067                        if(strm ==null) System.out.println("empty datainputstream input");
068                        */
069                        tagId = instrm.read(); //get tag id
070                        //System.out.println("receve tagid"+tagId);
071                        int total = instrm.read(); //get tag id
072                        //System.out.println("receve total"+total);
073                        /*
074                        if(!key.containsKey(tagId))
075                        {
076                         key.put(tagId,new Integer(tagId));
077                        }
078                        */
079                        CyclicBarrier cyc = number.get(tagId);
080                        if(cyc==null) 
081                        {
082                                number.put(tagId,new CyclicBarrier(total));
083                        }
084                        /*
085                        else
086                        {
087                                count = count+1;
088                                number.put(key.get(tagId),count);
089                        }
090                        */
091                        //totalNumber.put(key.get(tagId),total);
092                }
093                catch (IOException ioe) {
094                        //strm  = null;
095                        ioe.printStackTrace();
096                }
097//              if(strm == null) return;
098        }
099
100        public void run() {
101                //System.out.println("tag id"+tagId);
102                try{
103
104                        //System.out.println("now"+number.get(key.get(tagId))+" fu:"+totalNumber.get(key.get(tagId)));
105                        /*
106                        synchronized(key.get(tagId))
107                        {
108                                if(number.get(key.get(tagId)) == totalNumber.get(key.get(tagId))) key.get(tagId).notifyAll();
109                                while( number.get(key.get(tagId)) != totalNumber.get(key.get(tagId)) )
110                                {
111                                        key.get(tagId).wait();
112                                }
113
114                        }
115                        */
116                        number.get(tagId).await();
117
118                        out.write(0); // read the last element to un block the sender;
119                        out.flush();
120                        //System.out.println("read the last one");
121                        out.close();
122                        instrm.close();
123                }
124                catch(Exception e)
125                {
126                        e.printStackTrace();
127                }
128                /*
129                synchronized(key.get(tagId))
130                {
131                        Integer count = number.get(key.get(tagId));
132                        count=0;
133                        number.put(key.get(tagId),count);
134                }
135                */
136                number.remove(tagId);
137        }
138
139};