View Javadoc

1   /*
2    * SymmetricDS is an open source database synchronization solution.
3    *   
4    * Copyright (C) Chris Henson <chenson42@users.sourceforge.net>
5    *               Eric Long <erilong@users.sourceforge.net>
6    *
7    * This library is free software; you can redistribute it and/or
8    * modify it under the terms of the GNU Lesser General Public
9    * License as published by the Free Software Foundation; either
10   * version 3 of the License, or (at your option) any later version.
11   *
12   * This library is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this library; if not, see
19   * <http://www.gnu.org/licenses/>.
20   */
21  
22  package org.jumpmind.symmetric.service.impl;
23  
24  import java.sql.Connection;
25  import java.sql.PreparedStatement;
26  import java.sql.ResultSet;
27  import java.sql.SQLException;
28  import java.util.List;
29  import java.util.Set;
30  
31  import org.jumpmind.symmetric.common.Constants;
32  import org.jumpmind.symmetric.model.BatchInfo;
33  import org.jumpmind.symmetric.model.BatchType;
34  import org.jumpmind.symmetric.model.Data;
35  import org.jumpmind.symmetric.model.DataEventType;
36  import org.jumpmind.symmetric.model.NodeChannel;
37  import org.jumpmind.symmetric.model.OutgoingBatch;
38  import org.jumpmind.symmetric.model.TriggerHistory;
39  import org.jumpmind.symmetric.model.OutgoingBatch.Status;
40  import org.jumpmind.symmetric.service.IAcknowledgeService;
41  import org.jumpmind.symmetric.service.IConfigurationService;
42  import org.jumpmind.symmetric.service.IDataService;
43  import org.jumpmind.symmetric.service.IOutgoingBatchService;
44  import org.jumpmind.symmetric.test.AbstractDatabaseTest;
45  import org.jumpmind.symmetric.test.TestConstants;
46  import org.junit.Before;
47  import org.junit.Test;
48  import org.springframework.dao.DataAccessException;
49  import org.springframework.jdbc.core.ConnectionCallback;
50  
51  public class OutgoingBatchServiceTest extends AbstractDatabaseTest {
52  
53      private IOutgoingBatchService batchService;
54  
55      private IDataService dataService;
56  
57      private IAcknowledgeService ackService;
58  
59      private IConfigurationService configService;
60  
61      private int triggerHistId;
62  
63      public OutgoingBatchServiceTest() throws Exception {
64          super();
65      }
66  
67      public OutgoingBatchServiceTest(String dbName) {
68          super(dbName);
69      }
70  
71      @Before
72      public void setUp() {
73          ackService = (IAcknowledgeService) find(Constants.ACKNOWLEDGE_SERVICE);
74          batchService = (IOutgoingBatchService) find(Constants.OUTGOING_BATCH_SERVICE);
75          dataService = (IDataService) find(Constants.DATA_SERVICE);
76          configService = (IConfigurationService) find(Constants.CONFIG_SERVICE);
77          Set<Long> histKeys = configService.getHistoryRecords().keySet();
78          assertFalse(histKeys.isEmpty());
79          triggerHistId = histKeys.iterator().next().intValue();
80      }
81  
82      @SuppressWarnings("deprecation")
83      @Test
84      public void test() {
85          List<NodeChannel> channels = configService.getChannels();
86          cleanSlate(TestConstants.TEST_PREFIX + "data_event", TestConstants.TEST_PREFIX + "data",
87                  TestConstants.TEST_PREFIX + "outgoing_batch");
88          // create a batch
89          createDataEvent("Foo", triggerHistId, TestConstants.TEST_CHANNEL_ID, DataEventType.INSERT,
90                  TestConstants.TEST_CLIENT_EXTERNAL_ID);
91          batchService.buildOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID, channels);
92          List<OutgoingBatch> list = batchService.getOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID);
93          assertTrue(list != null);
94          assertEquals(list.size(), 1);
95          assertTrue(list.get(0).getChannelId().equals(TestConstants.TEST_CHANNEL_ID));
96  
97          // create another batch
98          createDataEvent("Foo", triggerHistId, TestConstants.TEST_CHANNEL_ID, DataEventType.INSERT,
99                  TestConstants.TEST_CLIENT_EXTERNAL_ID);
100         createDataEvent("Foo", triggerHistId, TestConstants.TEST_CHANNEL_ID, DataEventType.INSERT,
101                 TestConstants.TEST_CLIENT_EXTERNAL_ID);
102         batchService.buildOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID, channels);
103         list = batchService.getOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID);
104         assertTrue(list != null);
105         assertTrue(list.size() == 2);
106         assertTrue(list.get(0).getChannelId().equals(TestConstants.TEST_CHANNEL_ID));
107         assertTrue(list.get(1).getChannelId().equals(TestConstants.TEST_CHANNEL_ID));
108 
109         // mark the first batch as sent (should still be eligible to be resent)
110         batchService.markOutgoingBatchSent(list.get(0));
111         list = batchService.getOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID);
112         assertTrue(list.size() == 2);
113         assertTrue(list.get(0).getChannelId().equals(TestConstants.TEST_CHANNEL_ID));
114         assertTrue(list.get(1).getChannelId().equals(TestConstants.TEST_CHANNEL_ID));
115 
116         // set the second batch status to ok
117         batchService.setBatchStatus(list.get(0).getBatchId(), Status.OK);
118         list = batchService.getOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID);
119         assertTrue(list.size() == 1);
120         assertTrue(list.get(0).getChannelId().equals(TestConstants.TEST_CHANNEL_ID));
121 
122         // test for initial load (batch type == IL)
123         OutgoingBatch ilBatch = new OutgoingBatch();
124         ilBatch.setNodeId(TestConstants.TEST_CLIENT_EXTERNAL_ID);
125         ilBatch.setChannelId(TestConstants.TEST_CHANNEL_ID);
126         ilBatch.setBatchType(BatchType.INITIAL_LOAD);
127         batchService.insertOutgoingBatch(ilBatch);
128         list = batchService.getOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID);
129         assertTrue(list.size() == 2);
130         assertTrue(list.get(0).getChannelId().equals(TestConstants.TEST_CHANNEL_ID));
131         assertTrue(list.get(1).getChannelId().equals(TestConstants.TEST_CHANNEL_ID));
132 
133         // now mark the IL batch as complete
134         batchService.setBatchStatus(ilBatch.getBatchId(), Status.OK);
135         list = batchService.getOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID);
136         assertTrue(list.size() == 1);
137         assertTrue(list.get(0).getChannelId().equals(TestConstants.TEST_CHANNEL_ID));
138     }
139 
140     @SuppressWarnings("deprecation")
141     @Test
142     public void testBatchBoundary() {
143         List<NodeChannel> channels = configService.getChannels();
144 
145         cleanSlate(TestConstants.TEST_PREFIX + "data_event", TestConstants.TEST_PREFIX + "data",
146                 TestConstants.TEST_PREFIX + "outgoing_batch");
147         int size = 50;
148         int count = 3; // must be <= size
149         assertTrue(count <= size);
150 
151         for (int i = 0; i < size * count; i++) {
152             createDataEvent("Foo", triggerHistId, TestConstants.TEST_CHANNEL_ID, DataEventType.INSERT,
153                     TestConstants.TEST_CLIENT_EXTERNAL_ID);
154         }
155 
156         for (int i = 0; i < count; i++) {
157             batchService.buildOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID, channels);
158         }
159 
160         List<OutgoingBatch> list = batchService.getOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID);
161         assertNotNull(list);
162         assertEquals(list.size(), count);
163 
164         for (int i = 0; i < count; i++) {
165             assertTrue(getBatchSize(list.get(i).getBatchId()) <= size + 1);
166         }
167     }
168 
169     @SuppressWarnings("deprecation")
170     @Test
171     public void testMultipleChannels() {
172         List<NodeChannel> channels = configService.getChannels();
173 
174         cleanSlate(TestConstants.TEST_PREFIX + "data_event", TestConstants.TEST_PREFIX + "data",
175                 TestConstants.TEST_PREFIX + "outgoing_batch");
176         createDataEvent("Foo", triggerHistId, "testchannel", DataEventType.INSERT,
177                 TestConstants.TEST_CLIENT_EXTERNAL_ID);
178         createDataEvent("Foo", triggerHistId, "config", DataEventType.INSERT, TestConstants.TEST_CLIENT_EXTERNAL_ID);
179 
180         batchService.buildOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID, channels);
181 
182         List<OutgoingBatch> list = batchService.getOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID);
183         assertNotNull(list);
184         assertEquals(list.size(), 2);
185     }
186 
187     @Test
188     public void testDisabledChannel() {
189         cleanSlate(TestConstants.TEST_PREFIX + "data_event", TestConstants.TEST_PREFIX + "data",
190                 TestConstants.TEST_PREFIX + "outgoing_batch");
191         int size = 50; // magic number
192         int count = 3; // must be <= size
193         assertTrue(count <= size);
194 
195         for (int i = 0; i < size * count; i++) {
196             createDataEvent("Foo", triggerHistId, TestConstants.TEST_CHANNEL_ID, DataEventType.INSERT,
197                     TestConstants.TEST_CLIENT_EXTERNAL_ID);
198         }
199 
200         List<OutgoingBatch> list = batchService.getOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID);
201         assertNotNull(list);
202         assertEquals(list.size(), 0);
203     }
204 
205     @SuppressWarnings("deprecation")
206     @Test
207     public void testErrorChannel() {
208         IConfigurationService configService = (IConfigurationService) find(Constants.CONFIG_SERVICE);
209         List<NodeChannel> channels = configService.getChannels();
210 
211         cleanSlate(TestConstants.TEST_PREFIX + "data_event", TestConstants.TEST_PREFIX + "data",
212                 TestConstants.TEST_PREFIX + "outgoing_batch");
213         // Create data events for two different channels
214         createDataEvent("TestTable1", triggerHistId, "testchannel", DataEventType.INSERT,
215                 TestConstants.TEST_CLIENT_EXTERNAL_ID);
216 
217         // Build the batch, make sure this event gets its own batch
218         batchService.buildOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID, channels);
219 
220         createDataEvent("TestTable1", triggerHistId, "testchannel", DataEventType.INSERT,
221                 TestConstants.TEST_CLIENT_EXTERNAL_ID);
222         createDataEvent("TestTable2", triggerHistId, "config", DataEventType.INSERT,
223                 TestConstants.TEST_CLIENT_EXTERNAL_ID);
224 
225         // Build the batches, which should be one for each channel
226         batchService.buildOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID, channels);
227 
228         // Make sure we got three batches
229         List<OutgoingBatch> batches = batchService.getOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID);
230         assertNotNull(batches);
231         assertEquals(batches.size(), 3);
232         long firstBatchId = batches.get(0).getBatchId();
233         long secondBatchId = batches.get(1).getBatchId();
234         long thirdBatchId = batches.get(2).getBatchId();
235 
236         // Ack the first batch as an error, leaving the others as new
237         ackService.ack(new BatchInfo(firstBatchId, 1));
238 
239         // Get the batches again. The error channel batches should be last
240         batches = batchService.getOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID);
241         assertNotNull(batches);
242         assertEquals(batches.size(), 3);
243         assertEquals(batches.get(0).getBatchId(), secondBatchId,
244                 "Channel in error should have batches last - missing new batch");
245         assertEquals(batches.get(1).getBatchId(), thirdBatchId,
246                 "Channel in error should have batches last - missing error batch");
247         assertEquals(batches.get(2).getBatchId(), firstBatchId,
248                 "Channel in error should have batches last - missing new batch");
249 
250     }
251 
252     protected void createDataEvent(String tableName, int auditId, String channelId, DataEventType type, String nodeId) {
253         TriggerHistory audit = new TriggerHistory();
254         audit.setTriggerHistoryId(auditId);
255         Data data = new Data(tableName, type, "r.o.w., dat-a", "p-k d.a.t.a", audit);
256         dataService.insertDataEvent(data, channelId, nodeId);
257     }
258 
259     protected int getBatchSize(final long batchId) {
260         return (Integer) getJdbcTemplate().execute(new ConnectionCallback() {
261             public Object doInConnection(Connection conn) throws SQLException, DataAccessException {
262                 PreparedStatement s = conn.prepareStatement("select count(*) " + "from " + TestConstants.TEST_PREFIX
263                         + "data_event where batch_id = ?");
264                 s.setLong(1, batchId);
265                 ResultSet rs = s.executeQuery();
266                 rs.next();
267                 return rs.getInt(1);
268             }
269         });
270     }
271 
272 }