1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.jumpmind.symmetric.web;
21
22 import java.io.IOException;
23
24 import javax.servlet.FilterChain;
25 import javax.servlet.ServletException;
26 import javax.servlet.ServletRequest;
27 import javax.servlet.ServletResponse;
28 import javax.servlet.http.HttpServletResponse;
29
30 import org.jumpmind.symmetric.common.Constants;
31 import org.jumpmind.symmetric.common.ParameterConstants;
32 import org.jumpmind.symmetric.service.IParameterService;
33 import org.jumpmind.symmetric.test.AbstractDatabaseTest;
34 import org.jumpmind.symmetric.transport.IConcurrentConnectionManager;
35 import org.junit.Assert;
36 import org.junit.Test;
37 import org.springframework.mock.web.MockHttpServletRequest;
38 import org.springframework.mock.web.MockHttpServletResponse;
39
40 public class NodeConcurrencyFilterTest extends AbstractDatabaseTest {
41
42 public NodeConcurrencyFilterTest() throws Exception {
43 super();
44 }
45
46 public NodeConcurrencyFilterTest(String dbName) {
47 super(dbName);
48 }
49
50 @Test(timeout = 60000)
51 public void testPullConcurrency() throws Exception {
52 IParameterService parameterService = getParameterService();
53 parameterService.saveParameter(ParameterConstants.CONCURRENT_WORKERS, 3);
54
55 NodeConcurrencyFilter filter = (NodeConcurrencyFilter) find(Constants.NODE_CONCURRENCY_FILTER);
56
57 MockWorker one = new MockWorker("00001", filter, "pull", "GET");
58 MockWorker two = new MockWorker("00002", filter, "pull", "GET");
59 MockWorker three = new MockWorker("00003", filter, "pull", "GET");
60 MockWorker four = new MockWorker("00004", filter, "pull", "GET");
61
62 one.start();
63 Thread.sleep(500);
64
65 two.start();
66 Thread.sleep(500);
67
68 three.start();
69 Thread.sleep(500);
70
71 four.start();
72 Thread.sleep(500);
73
74 Assert.assertEquals(one.reached, true);
75 Assert.assertEquals(two.reached, true);
76 Assert.assertEquals(three.reached, true);
77 Assert.assertEquals(four.reached, false);
78
79 one.hold = false;
80 two.hold = false;
81 three.hold = false;
82 four.hold = false;
83
84 Thread.sleep(500);
85
86 Assert.assertEquals(one.success, true);
87 Assert.assertEquals(two.success, true);
88 Assert.assertEquals(three.success, true);
89 Assert.assertEquals(four.success, false);
90
91 MockWorker five = new MockWorker("00005", filter, "pull", "GET");
92 five.hold = false;
93 five.start();
94 Thread.sleep(500);
95
96 Assert.assertEquals(five.success, true);
97
98 }
99
100 @Test(timeout = 60000)
101 public void testPushConcurrency() throws Exception {
102 IParameterService parameterService = getParameterService();
103 parameterService.saveParameter(ParameterConstants.CONCURRENT_WORKERS, 2);
104
105 NodeConcurrencyFilter filter = (NodeConcurrencyFilter) find(Constants.NODE_CONCURRENCY_FILTER);
106
107 IConcurrentConnectionManager manager = (IConcurrentConnectionManager) find(Constants.CONCURRENT_CONNECTION_MANGER);
108
109 MockWorker one = new MockWorker("00001", filter, "push", "HEAD");
110 MockWorker two = new MockWorker("00002", filter, "push", "HEAD");
111
112 one.start();
113 two.start();
114 Thread.sleep(500);
115
116 Assert.assertEquals(manager.getReservationCount("/sync/push"), 2);
117
118 one = new MockWorker("00001", filter, "push", "PUT");
119 two = new MockWorker("00002", filter, "push", "PUT");
120
121 one.start();
122 two.start();
123 Thread.sleep(500);
124
125 Assert.assertEquals(one.reached, true);
126 Assert.assertEquals(two.reached, true);
127
128 Assert.assertEquals(manager.getReservationCount("/sync/push"), 2);
129
130 MockWorker five = new MockWorker("00005", filter, "push", "PUT");
131 five.hold = false;
132 five.start();
133 Thread.sleep(500);
134
135 Assert.assertEquals(five.reached, false);
136 Assert.assertEquals(manager.getReservationCount("/sync/push"), 2);
137
138 one.hold = false;
139 two.hold = false;
140 Thread.sleep(500);
141
142 Assert.assertEquals(manager.getReservationCount("/sync/push"), 0);
143
144 }
145
146 class MockWorker extends Thread {
147
148 private String servletPath;
149 private String httpMethod;
150 Exception inError;
151 NodeConcurrencyFilter filter;
152 String nodeId;
153 boolean success = false;
154 boolean hold = true;
155 boolean reached = false;
156
157 MockWorker(String nodeId, NodeConcurrencyFilter filter, String path, String httpMethod) {
158 this.setDaemon(true);
159 this.nodeId = nodeId;
160 this.filter = filter;
161 this.httpMethod = httpMethod;
162 this.servletPath = path;
163 }
164
165 public void run() {
166
167 MockHttpServletRequest req = new MockHttpServletRequest(httpMethod, "/sync/" + servletPath);
168 req.addParameter(WebConstants.NODE_ID, nodeId);
169 req.setServletPath(servletPath);
170
171 HttpServletResponse resp = new MockHttpServletResponse();
172 try {
173 filter.doFilter(req, resp, new FilterChain() {
174 public void doFilter(ServletRequest request, ServletResponse response) throws IOException,
175 ServletException {
176 reached = true;
177
178 while (hold) {
179 try {
180 Thread.sleep(50);
181 } catch (InterruptedException e) {
182 }
183 }
184
185 success = true;
186 }
187 });
188 } catch (Exception e) {
189 this.inError = e;
190 }
191
192 }
193
194 }
195 }