Skip to content

Commit 41d06f0

Browse files
committed
Add ChainedThreadPoolExecutor: tasks marked with the same owner will be
run one by one, tasks with different owners will be run in different threads Fixing bug that HttpRequest has the wrong response code.
1 parent df98c14 commit 41d06f0

File tree

4 files changed

+333
-2
lines changed

4 files changed

+333
-2
lines changed
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package net.sf.j2s.ajax;
2+
3+
import java.util.LinkedList;
4+
import java.util.Queue;
5+
6+
public class ChainedRunnable implements Runnable {
7+
8+
private Runnable task;
9+
10+
private ChainedRunnable next;
11+
12+
private Object owner;
13+
14+
private volatile boolean done;
15+
16+
public ChainedRunnable(Object owner, Runnable task) {
17+
super();
18+
this.owner = owner;
19+
this.task = task;
20+
done = false;
21+
}
22+
23+
public void runTask() {
24+
if (task != null) {
25+
task.run();
26+
}
27+
}
28+
29+
public void run() {
30+
runTask();
31+
// May run into stack overflow!
32+
//if (next != null) {
33+
// next.run();
34+
//}
35+
ChainedRunnable n = next;
36+
Queue<ChainedRunnable> queue = new LinkedList<ChainedRunnable>();
37+
while (n != null) {
38+
queue.add(n);
39+
n.runTask();
40+
n = n.next;
41+
}
42+
// mark task done one by one
43+
while (!queue.isEmpty()) {
44+
ChainedRunnable r = queue.poll();
45+
if (r != null) r.done = true;
46+
}
47+
done = true;
48+
}
49+
50+
public boolean isDone() {
51+
return done;
52+
}
53+
54+
public ChainedRunnable getNext() {
55+
return next;
56+
}
57+
58+
public Runnable getTask() {
59+
return task;
60+
}
61+
62+
public void addNext(ChainedRunnable task) {
63+
ChainedRunnable oThis = this;
64+
do {
65+
if (oThis.next == null) {
66+
oThis.next = task;
67+
return;
68+
} else {
69+
oThis = oThis.next;
70+
}
71+
} while (true);
72+
}
73+
74+
public Object getOwner() {
75+
return owner;
76+
}
77+
78+
}
Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
package net.sf.j2s.ajax;
2+
3+
import java.lang.reflect.Field;
4+
import java.util.concurrent.ConcurrentHashMap;
5+
import java.util.concurrent.RejectedExecutionHandler;
6+
import java.util.concurrent.ThreadFactory;
7+
import java.util.concurrent.TimeUnit;
8+
import java.util.concurrent.locks.*;
9+
import java.util.concurrent.*;
10+
import java.util.*;
11+
12+
public class ChainedThreadPoolExecutor extends SimpleThreadPoolExecutor {
13+
14+
private ReentrantLock internalMainLock = null;
15+
16+
private Set<Runnable> internalWorkers = null;
17+
18+
private Field fieldWorkerThread = null;
19+
20+
private Field fieldWorkerFirstTask = null;
21+
22+
private Map<Runnable, ChainedRunnable> runningTasks = new ConcurrentHashMap<Runnable, ChainedRunnable>();
23+
24+
private Map<Object, ChainedRunnable> lastTasks = new ConcurrentHashMap<Object, ChainedRunnable>();
25+
26+
@SuppressWarnings("unchecked")
27+
private void fetchInternalFields() {
28+
try {
29+
Field fieldWorkers = ThreadPoolExecutor.class.getDeclaredField("workers");
30+
if (fieldWorkers != null) {
31+
fieldWorkers.setAccessible(true);
32+
Object value = fieldWorkers.get(this);
33+
if (value instanceof Set) {
34+
internalWorkers = (Set<Runnable>) value;
35+
}
36+
}
37+
Field fieldMainLock = ThreadPoolExecutor.class.getDeclaredField("mainLock");
38+
if (fieldMainLock != null) {
39+
fieldMainLock.setAccessible(true);
40+
Object value = fieldMainLock.get(this);
41+
if (value instanceof ReentrantLock) {
42+
internalMainLock = (ReentrantLock) value;
43+
}
44+
}
45+
} catch (SecurityException e) {
46+
e.printStackTrace();
47+
} catch (IllegalArgumentException e) {
48+
e.printStackTrace();
49+
} catch (NoSuchFieldException e) {
50+
e.printStackTrace();
51+
} catch (IllegalAccessException e) {
52+
e.printStackTrace();
53+
}
54+
}
55+
56+
public ChainedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, int idlePoolSize, long keepAliveTime,
57+
TimeUnit unit, int queueSize, RejectedExecutionHandler handler) {
58+
super(corePoolSize, maximumPoolSize, idlePoolSize, keepAliveTime, unit, queueSize, handler);
59+
fetchInternalFields();
60+
}
61+
62+
public ChainedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, int idlePoolSize, long keepAliveTime,
63+
TimeUnit unit, int queueSize, String poolName) {
64+
super(corePoolSize, maximumPoolSize, idlePoolSize, keepAliveTime, unit, queueSize, poolName);
65+
fetchInternalFields();
66+
}
67+
68+
public ChainedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, int idlePoolSize, long keepAliveTime,
69+
TimeUnit unit, int queueSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
70+
super(corePoolSize, maximumPoolSize, idlePoolSize, keepAliveTime, unit, queueSize, threadFactory, handler);
71+
fetchInternalFields();
72+
}
73+
74+
public ChainedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, int idlePoolSize, long keepAliveTime,
75+
TimeUnit unit, int queueSize, ThreadFactory threadFactory) {
76+
super(corePoolSize, maximumPoolSize, idlePoolSize, keepAliveTime, unit, queueSize, threadFactory);
77+
fetchInternalFields();
78+
}
79+
80+
public ChainedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, int idlePoolSize, long keepAliveTime,
81+
TimeUnit unit, int queueSize) {
82+
super(corePoolSize, maximumPoolSize, idlePoolSize, keepAliveTime, unit, queueSize);
83+
fetchInternalFields();
84+
}
85+
86+
87+
private boolean addIfInQueue(ChainedRunnable task) {
88+
Object owner = task.getOwner();
89+
final ReentrantLock mainLock = this.internalMainLock;
90+
mainLock.lock();
91+
try {
92+
//System.out.println("Worker size = " + internalWorkers.size());
93+
for (Iterator<Runnable> itr = internalWorkers.iterator(); itr.hasNext();) {
94+
Runnable worker = (Runnable) itr.next();
95+
ChainedRunnable runningTask = runningTasks.get(worker);
96+
if (runningTask != null && runningTask.getOwner() == owner) {
97+
//System.out.println("Appending task " + task + " to running task " + runningTask + " on worker " + worker);
98+
runningTask.addNext(task);
99+
lastTasks.put(owner, task);
100+
return true;
101+
}
102+
ChainedRunnable firstTask = null; // worker.firstTask;
103+
try {
104+
if (fieldWorkerFirstTask == null) {
105+
Field f = worker.getClass().getDeclaredField("firstTask");
106+
if (f != null) {
107+
f.setAccessible(true);
108+
}
109+
fieldWorkerFirstTask = f;
110+
}
111+
if (fieldWorkerFirstTask != null) {
112+
Object value = fieldWorkerFirstTask.get(worker);
113+
if (value instanceof ChainedRunnable) {
114+
firstTask = (ChainedRunnable) value;
115+
}
116+
//System.out.println("Checking firstTask " + value + " for worker " + worker);
117+
}
118+
} catch (SecurityException e) {
119+
e.printStackTrace();
120+
} catch (IllegalArgumentException e) {
121+
e.printStackTrace();
122+
} catch (NoSuchFieldException e) {
123+
e.printStackTrace();
124+
} catch (IllegalAccessException e) {
125+
e.printStackTrace();
126+
}
127+
if (firstTask != null && firstTask.getOwner() == owner) {
128+
//System.out.println("Appending task " + task + " to first task " + runningTask + " on worker " + worker);
129+
firstTask.addNext(task);
130+
lastTasks.put(owner, task);
131+
return true;
132+
}
133+
}
134+
for (Iterator<Runnable> itr = getQueue().iterator(); itr.hasNext();) {
135+
Runnable next = itr.next();
136+
if (next instanceof ChainedRunnable) {
137+
ChainedRunnable r = (ChainedRunnable) next;
138+
if (r.getOwner() == owner) {
139+
//System.out.println("Appending task " + task + " to queued task " + r);
140+
r.addNext(task);
141+
lastTasks.put(owner, task);
142+
return true;
143+
}
144+
}
145+
}
146+
ChainedRunnable last = lastTasks.get(owner);
147+
if (last != null && !last.isDone()) {
148+
//System.out.println("Appending task " + task + " to last task " + last);
149+
last.addNext(task);
150+
lastTasks.put(owner, task);
151+
return true;
152+
}
153+
154+
//System.out.println("Not in queue, starting new worker for " + task);
155+
lastTasks.put(owner, task);
156+
return false;
157+
} finally {
158+
mainLock.unlock();
159+
}
160+
}
161+
162+
public void execute(Object owner, Runnable command) {
163+
ChainedRunnable task = new ChainedRunnable(owner, command);
164+
execute(task);
165+
}
166+
167+
@Override
168+
public void execute(Runnable command) {
169+
if (command == null)
170+
throw new NullPointerException();
171+
if (!(command instanceof ChainedRunnable)) {
172+
throw new RuntimeException("Not a chained runnable task");
173+
}
174+
ChainedRunnable chainCommand = (ChainedRunnable) command;
175+
if (addIfInQueue(chainCommand)) {
176+
return;
177+
}
178+
super.execute(command);
179+
}
180+
181+
@Override
182+
protected void afterExecute(Runnable r, Throwable t) {
183+
final ReentrantLock mainLock = this.internalMainLock;
184+
boolean removeError = false;
185+
mainLock.lock();
186+
try {
187+
for (Map.Entry<Runnable, ChainedRunnable> entry : runningTasks.entrySet()) {
188+
if (entry.getValue() == r) {
189+
runningTasks.remove(entry.getKey());
190+
break;
191+
}
192+
}
193+
if (r instanceof ChainedRunnable) {
194+
ChainedRunnable task = (ChainedRunnable) r;
195+
Object owner = task.getOwner();
196+
ChainedRunnable lastTask = lastTasks.get(owner);
197+
if (lastTask == task) {
198+
ChainedRunnable last = lastTasks.remove(owner);
199+
removeError = last != task;
200+
}
201+
}
202+
} finally {
203+
mainLock.unlock();
204+
}
205+
if (removeError) {
206+
System.out.println("Removed updated last task " + r);
207+
}
208+
super.afterExecute(r, t);
209+
}
210+
211+
@Override
212+
protected void beforeExecute(Thread t, Runnable r) {
213+
final ReentrantLock mainLock = this.internalMainLock;
214+
mainLock.lock();
215+
try {
216+
for (Runnable worker : internalWorkers) {
217+
try {
218+
if (fieldWorkerThread == null) {
219+
Field f = worker.getClass().getDeclaredField("thread");
220+
if (f != null) {
221+
f.setAccessible(true);
222+
}
223+
fieldWorkerThread = f;
224+
}
225+
if (fieldWorkerThread != null) {
226+
Object value = fieldWorkerThread.get(worker);
227+
if (t == value) {
228+
if (r instanceof ChainedRunnable) {
229+
runningTasks.put(worker, (ChainedRunnable) r);
230+
break;
231+
} // else dummy task
232+
}
233+
}
234+
} catch (SecurityException e) {
235+
e.printStackTrace();
236+
} catch (IllegalArgumentException e) {
237+
e.printStackTrace();
238+
} catch (NoSuchFieldException e) {
239+
e.printStackTrace();
240+
} catch (IllegalAccessException e) {
241+
e.printStackTrace();
242+
}
243+
}
244+
} finally {
245+
mainLock.unlock();
246+
}
247+
super.beforeExecute(t, r);
248+
}
249+
250+
}

sources/net.sf.j2s.ajax/ajaxcore/net/sf/j2s/ajax/HttpRequest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,7 @@ public void setRequestHeader(String key, String value) {
337337
* @return String the all response header value.
338338
*/
339339
public String getAllResponseHeaders() {
340+
if (connection != null) return null;
340341
StringBuilder builder = new StringBuilder();
341342
int i = 1;
342343
while (true) {
@@ -362,6 +363,7 @@ public String getAllResponseHeaders() {
362363
* @return String the response header value.
363364
*/
364365
public String getResponseHeader(String key) {
366+
if (connection == null) return null;
365367
Map<String, List<String>> headerFields = connection.getHeaderFields();
366368
List<String> list = headerFields.get(key);
367369
if (list == null) {
@@ -549,6 +551,7 @@ private void request() {
549551
} catch (IOException e) {
550552
if (checkAbort()) return; // exception caused by abort action
551553
//e.printStackTrace();
554+
status = connection.getResponseCode();
552555
readyState = 4;
553556
if (onreadystatechange != null) {
554557
onreadystatechange.onLoaded();

sources/net.sf.j2s.ajax/ajaxrpc/net/sf/j2s/ajax/SimpleSerializable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@
4242
*/
4343
public class SimpleSerializable implements Cloneable {
4444

45-
public static SimpleSerializable UNKNOWN = new SimpleSerializable();
45+
public static final SimpleSerializable UNKNOWN = new SimpleSerializable();
4646

47-
public static SimpleSerializable ERROR = new SimpleSerializable(); // Used to indicate that format error!
47+
public static final SimpleSerializable ERROR = new SimpleSerializable(); // Used to indicate that format error!
4848

4949
@J2SIgnore
5050
public static SimpleFactory fallbackFactory = null;

0 commit comments

Comments
 (0)