1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.jumpmind.symmetric.service.impl;
23
24 import java.sql.Connection;
25 import java.sql.PreparedStatement;
26 import java.sql.ResultSet;
27 import java.sql.SQLException;
28 import java.util.List;
29 import java.util.Set;
30
31 import org.jumpmind.symmetric.common.Constants;
32 import org.jumpmind.symmetric.model.BatchInfo;
33 import org.jumpmind.symmetric.model.BatchType;
34 import org.jumpmind.symmetric.model.Data;
35 import org.jumpmind.symmetric.model.DataEventType;
36 import org.jumpmind.symmetric.model.NodeChannel;
37 import org.jumpmind.symmetric.model.OutgoingBatch;
38 import org.jumpmind.symmetric.model.TriggerHistory;
39 import org.jumpmind.symmetric.model.OutgoingBatch.Status;
40 import org.jumpmind.symmetric.service.IAcknowledgeService;
41 import org.jumpmind.symmetric.service.IConfigurationService;
42 import org.jumpmind.symmetric.service.IDataService;
43 import org.jumpmind.symmetric.service.IOutgoingBatchService;
44 import org.jumpmind.symmetric.test.AbstractDatabaseTest;
45 import org.jumpmind.symmetric.test.TestConstants;
46 import org.junit.Before;
47 import org.junit.Test;
48 import org.springframework.dao.DataAccessException;
49 import org.springframework.jdbc.core.ConnectionCallback;
50
51 public class OutgoingBatchServiceTest extends AbstractDatabaseTest {
52
53 private IOutgoingBatchService batchService;
54
55 private IDataService dataService;
56
57 private IAcknowledgeService ackService;
58
59 private IConfigurationService configService;
60
61 private int triggerHistId;
62
63 public OutgoingBatchServiceTest() throws Exception {
64 super();
65 }
66
67 public OutgoingBatchServiceTest(String dbName) {
68 super(dbName);
69 }
70
71 @Before
72 public void setUp() {
73 ackService = (IAcknowledgeService) find(Constants.ACKNOWLEDGE_SERVICE);
74 batchService = (IOutgoingBatchService) find(Constants.OUTGOING_BATCH_SERVICE);
75 dataService = (IDataService) find(Constants.DATA_SERVICE);
76 configService = (IConfigurationService) find(Constants.CONFIG_SERVICE);
77 Set<Long> histKeys = configService.getHistoryRecords().keySet();
78 assertFalse(histKeys.isEmpty());
79 triggerHistId = histKeys.iterator().next().intValue();
80 }
81
82 @SuppressWarnings("deprecation")
83 @Test
84 public void test() {
85 List<NodeChannel> channels = configService.getChannels();
86 cleanSlate(TestConstants.TEST_PREFIX + "data_event", TestConstants.TEST_PREFIX + "data",
87 TestConstants.TEST_PREFIX + "outgoing_batch");
88
89 createDataEvent("Foo", triggerHistId, TestConstants.TEST_CHANNEL_ID, DataEventType.INSERT,
90 TestConstants.TEST_CLIENT_EXTERNAL_ID);
91 batchService.buildOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID, channels);
92 List<OutgoingBatch> list = batchService.getOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID);
93 assertTrue(list != null);
94 assertEquals(list.size(), 1);
95 assertTrue(list.get(0).getChannelId().equals(TestConstants.TEST_CHANNEL_ID));
96
97
98 createDataEvent("Foo", triggerHistId, TestConstants.TEST_CHANNEL_ID, DataEventType.INSERT,
99 TestConstants.TEST_CLIENT_EXTERNAL_ID);
100 createDataEvent("Foo", triggerHistId, TestConstants.TEST_CHANNEL_ID, DataEventType.INSERT,
101 TestConstants.TEST_CLIENT_EXTERNAL_ID);
102 batchService.buildOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID, channels);
103 list = batchService.getOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID);
104 assertTrue(list != null);
105 assertTrue(list.size() == 2);
106 assertTrue(list.get(0).getChannelId().equals(TestConstants.TEST_CHANNEL_ID));
107 assertTrue(list.get(1).getChannelId().equals(TestConstants.TEST_CHANNEL_ID));
108
109
110 batchService.markOutgoingBatchSent(list.get(0));
111 list = batchService.getOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID);
112 assertTrue(list.size() == 2);
113 assertTrue(list.get(0).getChannelId().equals(TestConstants.TEST_CHANNEL_ID));
114 assertTrue(list.get(1).getChannelId().equals(TestConstants.TEST_CHANNEL_ID));
115
116
117 batchService.setBatchStatus(list.get(0).getBatchId(), Status.OK);
118 list = batchService.getOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID);
119 assertTrue(list.size() == 1);
120 assertTrue(list.get(0).getChannelId().equals(TestConstants.TEST_CHANNEL_ID));
121
122
123 OutgoingBatch ilBatch = new OutgoingBatch();
124 ilBatch.setNodeId(TestConstants.TEST_CLIENT_EXTERNAL_ID);
125 ilBatch.setChannelId(TestConstants.TEST_CHANNEL_ID);
126 ilBatch.setBatchType(BatchType.INITIAL_LOAD);
127 batchService.insertOutgoingBatch(ilBatch);
128 list = batchService.getOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID);
129 assertTrue(list.size() == 2);
130 assertTrue(list.get(0).getChannelId().equals(TestConstants.TEST_CHANNEL_ID));
131 assertTrue(list.get(1).getChannelId().equals(TestConstants.TEST_CHANNEL_ID));
132
133
134 batchService.setBatchStatus(ilBatch.getBatchId(), Status.OK);
135 list = batchService.getOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID);
136 assertTrue(list.size() == 1);
137 assertTrue(list.get(0).getChannelId().equals(TestConstants.TEST_CHANNEL_ID));
138 }
139
140 @SuppressWarnings("deprecation")
141 @Test
142 public void testBatchBoundary() {
143 List<NodeChannel> channels = configService.getChannels();
144
145 cleanSlate(TestConstants.TEST_PREFIX + "data_event", TestConstants.TEST_PREFIX + "data",
146 TestConstants.TEST_PREFIX + "outgoing_batch");
147 int size = 50;
148 int count = 3;
149 assertTrue(count <= size);
150
151 for (int i = 0; i < size * count; i++) {
152 createDataEvent("Foo", triggerHistId, TestConstants.TEST_CHANNEL_ID, DataEventType.INSERT,
153 TestConstants.TEST_CLIENT_EXTERNAL_ID);
154 }
155
156 for (int i = 0; i < count; i++) {
157 batchService.buildOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID, channels);
158 }
159
160 List<OutgoingBatch> list = batchService.getOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID);
161 assertNotNull(list);
162 assertEquals(list.size(), count);
163
164 for (int i = 0; i < count; i++) {
165 assertTrue(getBatchSize(list.get(i).getBatchId()) <= size + 1);
166 }
167 }
168
169 @SuppressWarnings("deprecation")
170 @Test
171 public void testMultipleChannels() {
172 List<NodeChannel> channels = configService.getChannels();
173
174 cleanSlate(TestConstants.TEST_PREFIX + "data_event", TestConstants.TEST_PREFIX + "data",
175 TestConstants.TEST_PREFIX + "outgoing_batch");
176 createDataEvent("Foo", triggerHistId, "testchannel", DataEventType.INSERT,
177 TestConstants.TEST_CLIENT_EXTERNAL_ID);
178 createDataEvent("Foo", triggerHistId, "config", DataEventType.INSERT, TestConstants.TEST_CLIENT_EXTERNAL_ID);
179
180 batchService.buildOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID, channels);
181
182 List<OutgoingBatch> list = batchService.getOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID);
183 assertNotNull(list);
184 assertEquals(list.size(), 2);
185 }
186
187 @Test
188 public void testDisabledChannel() {
189 cleanSlate(TestConstants.TEST_PREFIX + "data_event", TestConstants.TEST_PREFIX + "data",
190 TestConstants.TEST_PREFIX + "outgoing_batch");
191 int size = 50;
192 int count = 3;
193 assertTrue(count <= size);
194
195 for (int i = 0; i < size * count; i++) {
196 createDataEvent("Foo", triggerHistId, TestConstants.TEST_CHANNEL_ID, DataEventType.INSERT,
197 TestConstants.TEST_CLIENT_EXTERNAL_ID);
198 }
199
200 List<OutgoingBatch> list = batchService.getOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID);
201 assertNotNull(list);
202 assertEquals(list.size(), 0);
203 }
204
205 @SuppressWarnings("deprecation")
206 @Test
207 public void testErrorChannel() {
208 IConfigurationService configService = (IConfigurationService) find(Constants.CONFIG_SERVICE);
209 List<NodeChannel> channels = configService.getChannels();
210
211 cleanSlate(TestConstants.TEST_PREFIX + "data_event", TestConstants.TEST_PREFIX + "data",
212 TestConstants.TEST_PREFIX + "outgoing_batch");
213
214 createDataEvent("TestTable1", triggerHistId, "testchannel", DataEventType.INSERT,
215 TestConstants.TEST_CLIENT_EXTERNAL_ID);
216
217
218 batchService.buildOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID, channels);
219
220 createDataEvent("TestTable1", triggerHistId, "testchannel", DataEventType.INSERT,
221 TestConstants.TEST_CLIENT_EXTERNAL_ID);
222 createDataEvent("TestTable2", triggerHistId, "config", DataEventType.INSERT,
223 TestConstants.TEST_CLIENT_EXTERNAL_ID);
224
225
226 batchService.buildOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID, channels);
227
228
229 List<OutgoingBatch> batches = batchService.getOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID);
230 assertNotNull(batches);
231 assertEquals(batches.size(), 3);
232 long firstBatchId = batches.get(0).getBatchId();
233 long secondBatchId = batches.get(1).getBatchId();
234 long thirdBatchId = batches.get(2).getBatchId();
235
236
237 ackService.ack(new BatchInfo(firstBatchId, 1));
238
239
240 batches = batchService.getOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID);
241 assertNotNull(batches);
242 assertEquals(batches.size(), 3);
243 assertEquals(batches.get(0).getBatchId(), secondBatchId,
244 "Channel in error should have batches last - missing new batch");
245 assertEquals(batches.get(1).getBatchId(), thirdBatchId,
246 "Channel in error should have batches last - missing error batch");
247 assertEquals(batches.get(2).getBatchId(), firstBatchId,
248 "Channel in error should have batches last - missing new batch");
249
250 }
251
252 protected void createDataEvent(String tableName, int auditId, String channelId, DataEventType type, String nodeId) {
253 TriggerHistory audit = new TriggerHistory();
254 audit.setTriggerHistoryId(auditId);
255 Data data = new Data(tableName, type, "r.o.w., dat-a", "p-k d.a.t.a", audit);
256 dataService.insertDataEvent(data, channelId, nodeId);
257 }
258
259 protected int getBatchSize(final long batchId) {
260 return (Integer) getJdbcTemplate().execute(new ConnectionCallback() {
261 public Object doInConnection(Connection conn) throws SQLException, DataAccessException {
262 PreparedStatement s = conn.prepareStatement("select count(*) " + "from " + TestConstants.TEST_PREFIX
263 + "data_event where batch_id = ?");
264 s.setLong(1, batchId);
265 ResultSet rs = s.executeQuery();
266 rs.next();
267 return rs.getInt(1);
268 }
269 });
270 }
271
272 }