-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathSimpleCommManager.java
More file actions
148 lines (124 loc) · 3.43 KB
/
SimpleCommManager.java
File metadata and controls
148 lines (124 loc) · 3.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
 * Simulates a simple communication manager with two APIs: {@link #read()} and
 * {@link #write(int)}.
 *
 * <p>The nested worker threads form a small pipeline around two queues:
 * <ul>
 *   <li>{@link ReadThread} reads values and routes them to the send queue (10..20) or the
 *       receive queue (everything else);</li>
 *   <li>{@link RunnerThread} drains the receive queue and forwards values below 10 to the
 *       send queue, dropping the rest;</li>
 *   <li>{@link WriteThread} drains the send queue and writes each value.</li>
 * </ul>
 *
 * <p>Each worker stops when its {@code run} flag is cleared or when it is interrupted.
 */
public class SimpleCommManager {

    /** Exclusive upper bound of the values produced by {@link #read()}. */
    private static final int BOUND = 50;

    /** How long a consumer waits on an empty queue before re-checking its run flag. */
    private static final long POLL_TIMEOUT_MS = 10;

    private final Random random = new Random();

    /**
     * Simulates reading one value from the communication channel.
     *
     * @return a pseudo-random value in the range {@code [0, 50)}
     */
    public int read() {
        return random.nextInt(BOUND);
    }

    /**
     * Simulates writing one value to the communication channel by printing it to standard output.
     *
     * @param value the value to write
     */
    public void write(int value) {
        System.out.println("write " + value);
    }

    /**
     * Producer: repeatedly reads from the manager and routes each value to one of two queues.
     */
    public static class ReadThread extends Thread {
        /** Cleared by another thread to request shutdown. */
        volatile boolean run = true;
        SimpleCommManager mgr;
        LinkedBlockingQueue<Integer> sendqueue;
        LinkedBlockingQueue<Integer> recvqueue;

        /**
         * @param mgr       manager to read values from
         * @param sendqueue receives values in the range 10..20 (inclusive)
         * @param recvqueue receives every other value
         */
        public ReadThread(SimpleCommManager mgr, LinkedBlockingQueue<Integer> sendqueue,
                LinkedBlockingQueue<Integer> recvqueue) {
            this.mgr = mgr;
            this.sendqueue = sendqueue;
            this.recvqueue = recvqueue;
        }

        /** Reads and routes values until the run flag is cleared or the thread is interrupted. */
        @Override
        public void run() {
            // Thread.currentThread() rather than 'this': the object may be used as a plain
            // Runnable inside another Thread, in which case 'this' is not the running thread.
            while (run && !Thread.currentThread().isInterrupted()) {
                int value = mgr.read();
                if (value >= 10 && value <= 20) {
                    sendqueue.offer(value);
                } else {
                    recvqueue.offer(value);
                }
            }
        }
    }

    /**
     * Consumer: drains the send queue and writes every value through the manager.
     */
    public static class WriteThread extends Thread {
        /** Cleared by another thread to request shutdown. */
        public volatile boolean run = true;
        SimpleCommManager mgr;
        LinkedBlockingQueue<Integer> sendqueue;

        /**
         * @param mgr       manager used to write the values
         * @param sendqueue queue of values waiting to be written
         */
        public WriteThread(SimpleCommManager mgr, LinkedBlockingQueue<Integer> sendqueue) {
            this.mgr = mgr;
            this.sendqueue = sendqueue;
        }

        /** Writes queued values until the run flag is cleared or the thread is interrupted. */
        @Override
        public void run() {
            while (run) {
                Integer value;
                try {
                    // Timed poll so the run flag is re-checked even when the queue stays empty.
                    value = sendqueue.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    // Treat interruption as a stop request; keep the flag set for the caller.
                    Thread.currentThread().interrupt();
                    return;
                }
                if (value != null) {
                    mgr.write(value);
                }
            }
        }
    }

    /**
     * Processor: drains the receive queue and forwards values below 10 to the send queue.
     * All other received values are discarded.
     */
    public static class RunnerThread extends Thread {
        /** Cleared by another thread to request shutdown. */
        volatile boolean run = true;
        SimpleCommManager mgr;
        LinkedBlockingQueue<Integer> sendqueue;
        LinkedBlockingQueue<Integer> recvqueue;

        /**
         * @param mgr       manager this runner belongs to (stored, not used by {@link #run()})
         * @param sendqueue receives the forwarded values
         * @param recvqueue queue of received values to process
         */
        public RunnerThread(SimpleCommManager mgr, LinkedBlockingQueue<Integer> sendqueue,
                LinkedBlockingQueue<Integer> recvqueue) {
            this.mgr = mgr;
            this.sendqueue = sendqueue;
            this.recvqueue = recvqueue;
        }

        /** Processes received values until the run flag is cleared or the thread is interrupted. */
        @Override
        public void run() {
            while (run) {
                Integer value;
                try {
                    // Timed poll so the run flag is re-checked even when the queue stays empty.
                    value = recvqueue.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    // Treat interruption as a stop request; keep the flag set for the caller.
                    Thread.currentThread().interrupt();
                    return;
                }
                if (value != null && value < 10) {
                    sendqueue.offer(value);
                }
            }
        }
    }

    /**
     * Waits for {@code thread} to terminate, retrying if the wait itself is interrupted.
     *
     * @return {@code true} if the calling thread was interrupted while waiting
     */
    private static boolean joinUninterruptibly(Thread thread) {
        boolean wasInterrupted = false;
        while (true) {
            try {
                thread.join();
                return wasInterrupted;
            } catch (InterruptedException e) {
                wasInterrupted = true;
            }
        }
    }

    /**
     * Runs the pipeline for roughly 50 ms, then stops the workers, empties both queues and
     * prints the final queue sizes.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting for shutdown
     */
    public static void main(String[] args) throws InterruptedException {
        SimpleCommManager mgr = new SimpleCommManager();
        LinkedBlockingQueue<Integer> sendqueue = new LinkedBlockingQueue<>();
        LinkedBlockingQueue<Integer> recvqueue = new LinkedBlockingQueue<>();

        ReadThread reader = new ReadThread(mgr, sendqueue, recvqueue);
        WriteThread writer = new WriteThread(mgr, sendqueue);
        RunnerThread runner = new RunnerThread(mgr, sendqueue, recvqueue);

        Thread gc = new Thread(() -> {
            boolean wasInterrupted = false;
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                // Shut down early, but remember the interrupt so it can be restored below.
                wasInterrupted = true;
            }
            System.out.println("GC awake");

            reader.run = false;
            runner.run = false;
            writer.run = false;

            // Wait for the workers to finish before clearing; otherwise a worker that is still
            // mid-iteration could enqueue a value after the queues were emptied.
            for (Thread worker : new Thread[] {reader, runner, writer}) {
                wasInterrupted |= joinUninterruptibly(worker);
            }
            sendqueue.clear();
            recvqueue.clear();

            if (wasInterrupted) {
                Thread.currentThread().interrupt();
            }
        }, "GC");

        // The workers are Threads themselves, so start them directly instead of wrapping them.
        reader.start();
        runner.start();
        writer.start();
        gc.start();
        gc.join();
        System.out.println("size of recvqueue " + recvqueue.size() + ": " + sendqueue.size());
    }
}