View Javadoc

1   /*
2    * SymmetricDS is an open source database synchronization solution.
3    *   
4    * Copyright (C) Chris Henson <chenson42@users.sourceforge.net>
5    *
6    * This library is free software; you can redistribute it and/or
7    * modify it under the terms of the GNU Lesser General Public
8    * License as published by the Free Software Foundation; either
9    * version 3 of the License, or (at your option) any later version.
10   *
11   * This library is distributed in the hope that it will be useful,
12   * but WITHOUT ANY WARRANTY; without even the implied warranty of
13   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14   * Lesser General Public License for more details.
15   *
16   * You should have received a copy of the GNU Lesser General Public
17   * License along with this library; if not, see
18   * <http://www.gnu.org/licenses/>.
19   */
20  package org.jumpmind.symmetric.web;
21  
22  import java.io.IOException;
23  
24  import javax.servlet.FilterChain;
25  import javax.servlet.ServletException;
26  import javax.servlet.ServletRequest;
27  import javax.servlet.ServletResponse;
28  import javax.servlet.http.HttpServletResponse;
29  
30  import org.jumpmind.symmetric.common.Constants;
31  import org.jumpmind.symmetric.common.ParameterConstants;
32  import org.jumpmind.symmetric.service.IParameterService;
33  import org.jumpmind.symmetric.test.AbstractDatabaseTest;
34  import org.jumpmind.symmetric.transport.IConcurrentConnectionManager;
35  import org.junit.Assert;
36  import org.junit.Test;
37  import org.springframework.mock.web.MockHttpServletRequest;
38  import org.springframework.mock.web.MockHttpServletResponse;
39  
40  public class NodeConcurrencyFilterTest extends AbstractDatabaseTest {
41  
42      public NodeConcurrencyFilterTest() throws Exception {
43          super();
44      }
45  
46      public NodeConcurrencyFilterTest(String dbName) {
47          super(dbName);
48      }
49  
50      @Test(timeout = 60000)
51      public void testPullConcurrency() throws Exception {
52          IParameterService parameterService = getParameterService();
53          parameterService.saveParameter(ParameterConstants.CONCURRENT_WORKERS, 3);
54  
55          NodeConcurrencyFilter filter = (NodeConcurrencyFilter) find(Constants.NODE_CONCURRENCY_FILTER);
56  
57          MockWorker one = new MockWorker("00001", filter, "pull", "GET");
58          MockWorker two = new MockWorker("00002", filter, "pull", "GET");
59          MockWorker three = new MockWorker("00003", filter, "pull", "GET");
60          MockWorker four = new MockWorker("00004", filter, "pull", "GET");
61  
62          one.start();
63          Thread.sleep(500);
64  
65          two.start();
66          Thread.sleep(500);
67  
68          three.start();
69          Thread.sleep(500);
70  
71          four.start();
72          Thread.sleep(500);
73  
74          Assert.assertEquals(one.reached, true);
75          Assert.assertEquals(two.reached, true);
76          Assert.assertEquals(three.reached, true);
77          Assert.assertEquals(four.reached, false);
78  
79          one.hold = false;
80          two.hold = false;
81          three.hold = false;
82          four.hold = false;
83  
84          Thread.sleep(500);
85  
86          Assert.assertEquals(one.success, true);
87          Assert.assertEquals(two.success, true);
88          Assert.assertEquals(three.success, true);
89          Assert.assertEquals(four.success, false);
90  
91          MockWorker five = new MockWorker("00005", filter, "pull", "GET");
92          five.hold = false;
93          five.start();
94          Thread.sleep(500);
95  
96          Assert.assertEquals(five.success, true);
97  
98      }
99  
100     @Test(timeout = 60000)
101     public void testPushConcurrency() throws Exception {
102         IParameterService parameterService = getParameterService();
103         parameterService.saveParameter(ParameterConstants.CONCURRENT_WORKERS, 2);
104 
105         NodeConcurrencyFilter filter = (NodeConcurrencyFilter) find(Constants.NODE_CONCURRENCY_FILTER);
106 
107         IConcurrentConnectionManager manager = (IConcurrentConnectionManager) find(Constants.CONCURRENT_CONNECTION_MANGER);
108 
109         MockWorker one = new MockWorker("00001", filter, "push", "HEAD");
110         MockWorker two = new MockWorker("00002", filter, "push", "HEAD");
111 
112         one.start();
113         two.start();
114         Thread.sleep(500);
115 
116         Assert.assertEquals(manager.getReservationCount("/sync/push"), 2);
117 
118         one = new MockWorker("00001", filter, "push", "PUT");
119         two = new MockWorker("00002", filter, "push", "PUT");
120 
121         one.start();
122         two.start();
123         Thread.sleep(500);
124 
125         Assert.assertEquals(one.reached, true);
126         Assert.assertEquals(two.reached, true);
127 
128         Assert.assertEquals(manager.getReservationCount("/sync/push"), 2);
129 
130         MockWorker five = new MockWorker("00005", filter, "push", "PUT");
131         five.hold = false;
132         five.start();
133         Thread.sleep(500);
134 
135         Assert.assertEquals(five.reached, false);
136         Assert.assertEquals(manager.getReservationCount("/sync/push"), 2);
137 
138         one.hold = false;
139         two.hold = false;
140         Thread.sleep(500);
141 
142         Assert.assertEquals(manager.getReservationCount("/sync/push"), 0);
143 
144     }
145 
146     class MockWorker extends Thread {
147 
148         private String servletPath;
149         private String httpMethod;
150         Exception inError;
151         NodeConcurrencyFilter filter;
152         String nodeId;
153         boolean success = false;
154         boolean hold = true;
155         boolean reached = false;
156 
157         MockWorker(String nodeId, NodeConcurrencyFilter filter, String path, String httpMethod) {
158             this.setDaemon(true);
159             this.nodeId = nodeId;
160             this.filter = filter;
161             this.httpMethod = httpMethod;
162             this.servletPath = path;
163         }
164 
165         public void run() {
166 
167             MockHttpServletRequest req = new MockHttpServletRequest(httpMethod, "/sync/" + servletPath);
168             req.addParameter(WebConstants.NODE_ID, nodeId);
169             req.setServletPath(servletPath);
170 
171             HttpServletResponse resp = new MockHttpServletResponse();
172             try {
173                 filter.doFilter(req, resp, new FilterChain() {
174                     public void doFilter(ServletRequest request, ServletResponse response) throws IOException,
175                             ServletException {
176                         reached = true;
177 
178                         while (hold) {
179                             try {
180                                 Thread.sleep(50);
181                             } catch (InterruptedException e) {
182                             }
183                         }
184 
185                         success = true;
186                     }
187                 });
188             } catch (Exception e) {
189                 this.inError = e;
190             }
191 
192         }
193 
194     }
195 }