View Javadoc

1   /*
2    * SymmetricDS is an open source database synchronization solution.
3    *   
4    * Copyright (C) Eric Long <erilong@users.sourceforge.net>,
5    *               Chris Henson <chenson42@users.sourceforge.net>
6    *
7    * This library is free software; you can redistribute it and/or
8    * modify it under the terms of the GNU Lesser General Public
9    * License as published by the Free Software Foundation; either
10   * version 3 of the License, or (at your option) any later version.
11   *
12   * This library is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this library; if not, see
19   * <http://www.gnu.org/licenses/>.
20   */
21  
22  package org.jumpmind.symmetric.service.impl;
23  
24  import java.io.ByteArrayInputStream;
25  import java.io.ByteArrayOutputStream;
26  import java.util.List;
27  
28  import org.apache.commons.lang.ArrayUtils;
29  import org.apache.log4j.Level;
30  import org.apache.log4j.Logger;
31  import org.jumpmind.symmetric.common.Constants;
32  import org.jumpmind.symmetric.common.csv.CsvConstants;
33  import org.jumpmind.symmetric.ext.NodeGroupTestDataLoaderFilter;
34  import org.jumpmind.symmetric.ext.TestDataLoaderFilter;
35  import org.jumpmind.symmetric.load.AbstractDataLoaderTest;
36  import org.jumpmind.symmetric.load.csv.CsvLoader;
37  import org.jumpmind.symmetric.model.IncomingBatch;
38  import org.jumpmind.symmetric.model.IncomingBatchHistory;
39  import org.jumpmind.symmetric.model.Node;
40  import org.jumpmind.symmetric.service.IParameterService;
41  import org.jumpmind.symmetric.test.TestConstants;
42  import org.jumpmind.symmetric.transport.internal.InternalIncomingTransport;
43  import org.junit.Test;
44  
45  import com.csvreader.CsvWriter;
46  
47  public class DataLoaderServiceTest extends AbstractDataLoaderTest {
48  
49      protected Node client = new Node(TestConstants.TEST_CLIENT_EXTERNAL_ID, null, null);;
50  
51      public DataLoaderServiceTest() throws Exception {
52          super();
53      }
54  
55      public DataLoaderServiceTest(String dbType) {
56          super(dbType);
57      }
58  
59      protected Level setLoggingLevelForTest(Level level) {
60          Level old = Logger.getLogger(DataLoaderService.class).getLevel();
61          Logger.getLogger(DataLoaderService.class).setLevel(level);
62          Logger.getLogger(CsvLoader.class).setLevel(level);
63          return old;
64      }
65  
66      @Test
67      public void testStatistics() throws Exception {
68          Level old = setLoggingLevelForTest(Level.OFF);
69          String[] updateValues = new String[TEST_COLUMNS.length + 1];
70          updateValues[0] = updateValues[updateValues.length - 1] = getNextId();
71          updateValues[2] = updateValues[4] = "required string";
72          String[] insertValues = (String[]) ArrayUtils.subarray(updateValues, 0, updateValues.length - 1);
73  
74          ByteArrayOutputStream out = new ByteArrayOutputStream();
75          CsvWriter writer = getWriter(out);
76          writer.writeRecord(new String[] { CsvConstants.NODEID, TestConstants.TEST_CLIENT_EXTERNAL_ID });
77          writeTable(writer, TEST_TABLE, TEST_KEYS, TEST_COLUMNS);
78          String nextBatchId = getNextBatchId();
79          writer.writeRecord(new String[] { CsvConstants.BATCH, nextBatchId });
80  
81          // Update becomes fallback insert
82          writer.write(CsvConstants.UPDATE);
83          writer.writeRecord(updateValues, true);
84  
85          // Insert becomes fallback update
86          writer.write(CsvConstants.INSERT);
87          writer.writeRecord(insertValues, true);
88  
89          // Insert becomes fallback update
90          writer.write(CsvConstants.INSERT);
91          writer.writeRecord(insertValues, true);
92  
93          // Clean insert
94          insertValues[0] = getNextId();
95          writer.write(CsvConstants.INSERT);
96          writer.writeRecord(insertValues, true);
97  
98          // Delete affects no rows
99          writer.writeRecord(new String[] { CsvConstants.DELETE, getNextId() }, true);
100         writer.writeRecord(new String[] { CsvConstants.DELETE, getNextId() }, true);
101         writer.writeRecord(new String[] { CsvConstants.DELETE, getNextId() }, true);
102 
103         // Failing statement
104         insertValues[0] = getNextId();
105         insertValues[5] = "i am an invalid date";
106         writer.write(CsvConstants.INSERT);
107         writer.writeRecord(insertValues, true);
108 
109         writer.writeRecord(new String[] { CsvConstants.COMMIT, nextBatchId });
110         writer.close();
111         load(out);
112 
113         List<IncomingBatchHistory> list = getIncomingBatchService().findIncomingBatchHistory(batchId,
114                 TestConstants.TEST_CLIENT_EXTERNAL_ID);
115         assertEquals(list.size(), 1, "Wrong number of history. " + printDatabase());
116         IncomingBatchHistory history = list.get(0);
117         assertEquals(history.getStatus(), IncomingBatchHistory.Status.ER, "Wrong status. " + printDatabase());
118         assertNotNull(history.getStartTime(), "Start time cannot be null. " + printDatabase());
119         assertNotNull(history.getEndTime(), "End time cannot be null. " + printDatabase());
120         assertEquals(history.getFailedRowNumber(), 8l, "Wrong failed row number. " + printDatabase());
121         assertEquals(history.getByteCount(), 322l, "Wrong byte count. " + printDatabase());
122         assertEquals(history.getStatementCount(), 8l, "Wrong statement count. " + printDatabase());
123         assertEquals(history.getFallbackInsertCount(), 1l, "Wrong fallback insert count. " + printDatabase());
124         assertEquals(history.getFallbackUpdateCount(), 2l, "Wrong fallback update count. " + printDatabase());
125         assertEquals(history.getMissingDeleteCount(), 3l, "Wrong missing delete count. " + printDatabase());
126         setLoggingLevelForTest(old);
127     }
128 
129     @Test
130     public void testUpdateCollision() throws Exception {
131         Level old = setLoggingLevelForTest(Level.OFF);
132         String[] insertValues = new String[TEST_COLUMNS.length];
133         insertValues[0] = getNextId();
134         insertValues[2] = insertValues[4] = "inserted row for testUpdateCollision";
135 
136         String[] updateValues = new String[TEST_COLUMNS.length];
137         updateValues[0] = getId();
138         updateValues[TEST_COLUMNS.length - 1] = getNextId();
139         updateValues[2] = updateValues[4] = "update will become an insert that violates PK";
140 
141         ByteArrayOutputStream out = new ByteArrayOutputStream();
142         CsvWriter writer = getWriter(out);
143         writer.writeRecord(new String[] { CsvConstants.NODEID, TestConstants.TEST_CLIENT_EXTERNAL_ID });
144         writeTable(writer, TEST_TABLE, TEST_KEYS, TEST_COLUMNS);
145 
146         String nextBatchId = getNextBatchId();
147 
148         writer.writeRecord(new String[] { CsvConstants.BATCH, nextBatchId });
149 
150         // This insert will be OK
151         writer.write(CsvConstants.INSERT);
152         writer.writeRecord(insertValues, true);
153 
154         // Update becomes fallback insert, and then violate the primary key
155         writer.write(CsvConstants.UPDATE);
156         writer.writeRecord(updateValues, true);
157 
158         writer.writeRecord(new String[] { CsvConstants.COMMIT, nextBatchId });
159         writer.close();
160         load(out);
161 
162         List<IncomingBatchHistory> list = getIncomingBatchService().findIncomingBatchHistory(batchId,
163                 TestConstants.TEST_CLIENT_EXTERNAL_ID);
164         assertEquals(list.size(), 1, "Wrong number of history");
165 
166         load(out);
167 
168         IncomingBatchHistory history = list.get(0);
169         assertEquals(history.getStatus(), IncomingBatchHistory.Status.ER, "Wrong status");
170 
171         list = getIncomingBatchService().findIncomingBatchHistory(batchId, TestConstants.TEST_CLIENT_EXTERNAL_ID);
172         assertEquals(list.size(), 2, "Wrong number of history");
173 
174         history = list.get(0);
175         assertEquals(history.getStatus(), IncomingBatchHistory.Status.ER, "Wrong status");
176 
177         history = list.get(1);
178         assertEquals(history.getStatus(), IncomingBatchHistory.Status.ER, "Wrong status");
179         setLoggingLevelForTest(old);
180     }
181 
182     @Test
183     public void testSqlStatistics() throws Exception {
184         Level old = setLoggingLevelForTest(Level.OFF);
185         String[] insertValues = new String[TEST_COLUMNS.length];
186         insertValues[2] = insertValues[4] = "sql stat test";
187 
188         ByteArrayOutputStream out = new ByteArrayOutputStream();
189         CsvWriter writer = getWriter(out);
190         writer.writeRecord(new String[] { CsvConstants.NODEID, TestConstants.TEST_CLIENT_EXTERNAL_ID });
191         String nextBatchId = getNextBatchId();
192         writer.writeRecord(new String[] { CsvConstants.BATCH, nextBatchId });
193         writeTable(writer, TEST_TABLE, TEST_KEYS, TEST_COLUMNS);
194 
195         // Clean insert
196         String firstId = getNextId();
197         insertValues[0] = firstId;
198         writer.write(CsvConstants.INSERT);
199         writer.writeRecord(insertValues, true);
200 
201         // Clean insert
202         String secondId = getNextId();
203         insertValues[0] = secondId;
204         writer.write(CsvConstants.INSERT);
205         writer.writeRecord(insertValues, true);
206 
207         String thirdId = getNextId();
208         insertValues[0] = thirdId;
209         insertValues[2] = "This is a very long string that will fail upon insert into the database.";
210         writer.write(CsvConstants.INSERT);
211         writer.writeRecord(insertValues, true);
212 
213         writer.writeRecord(new String[] { CsvConstants.COMMIT, nextBatchId });
214         writer.close();
215         load(out);
216 
217         List<IncomingBatchHistory> list = getIncomingBatchService().findIncomingBatchHistory(batchId,
218                 TestConstants.TEST_CLIENT_EXTERNAL_ID);
219         assertEquals(list.size(), 1, "Wrong number of history");
220         IncomingBatchHistory history = list.get(0);
221         assertEquals(history.getStatus(), IncomingBatchHistory.Status.ER, "Wrong status. " + printDatabase());
222         assertNotNull(history.getStartTime(), "Start time cannot be null. " + printDatabase());
223         assertNotNull(history.getEndTime(), "End time cannot be null. " + printDatabase());
224         assertEquals(history.getFailedRowNumber(), 3l, "Wrong failed row number. " + printDatabase());
225         assertEquals(history.getByteCount(), 390l, "Wrong byte count. " + printDatabase());
226         assertEquals(history.getStatementCount(), 3l, "Wrong statement count. " + printDatabase());
227         assertEquals(history.getFallbackInsertCount(), 0l, "Wrong fallback insert count. " + printDatabase());
228         assertEquals(history.getFallbackUpdateCount(), 1l, "Wrong fallback update count. " + printDatabase());
229         assertEquals(history.getMissingDeleteCount(), 0l, "Wrong missing delete count. " + printDatabase());
230         assertNotNull(history.getSqlState(), "Sql state should not be null. " + printDatabase());
231         assertNotNull(history.getSqlMessage(), "Sql message should not be null. " + printDatabase());
232         setLoggingLevelForTest(old);
233     }
234 
235     @Test
236     public void testSkippingResentBatch() throws Exception {
237         String[] values = { getNextId(), "resend string", "resend string not null", "resend char",
238                 "resend char not null", "2007-01-25 00:00:00.0", "2007-01-25 01:01:01.0", "0", "7", "10.10",
239                 "0.474"};
240         getNextBatchId();
241         for (int i = 0; i < 7; i++) {
242             batchId--;
243             testSimple(CsvConstants.INSERT, values, values);
244             IncomingBatchHistory.Status expectedStatus = IncomingBatchHistory.Status.OK;
245             long expectedCount = 1;
246             if (i > 0) {
247                 expectedStatus = IncomingBatchHistory.Status.SK;
248                 expectedCount = 0;
249             }
250             assertEquals(findIncomingBatchStatus(batchId, TestConstants.TEST_CLIENT_EXTERNAL_ID),
251                     IncomingBatch.Status.OK, "Wrong status");
252             List<IncomingBatchHistory> list = getIncomingBatchService().findIncomingBatchHistory(batchId,
253                     TestConstants.TEST_CLIENT_EXTERNAL_ID);
254             assertEquals(list.size(), i + 1, "Wrong number of history");
255             IncomingBatchHistory history = list.get(i);
256             assertEquals(history.getStatus(), expectedStatus, "Wrong status");
257             assertEquals(history.getFailedRowNumber(), 0l, "Wrong failed row number");
258             assertEquals(history.getStatementCount(), expectedCount, "Wrong statement count");
259             assertEquals(history.getFallbackInsertCount(), 0l, "Wrong fallback insert count");
260             assertEquals(history.getFallbackUpdateCount(), 0l, "Wrong fallback update count");
261             // pause to make sure we get a different start time on the incoming
262             // batch history
263             Thread.sleep(10);
264         }
265     }
266 
267     @Test
268     public void testErrorWhileSkip() throws Exception {
269         Level old = setLoggingLevelForTest(Level.OFF);
270         String[] values = { getNextId(), "string2", "string not null2", "char2", "char not null2",
271                 "2007-01-02 00:00:00.0", "2007-02-03 04:05:06.0", "0", "47", "67.89", "0.474" };
272 
273         testSimple(CsvConstants.INSERT, values, values);
274         assertEquals(findIncomingBatchStatus(batchId, TestConstants.TEST_CLIENT_EXTERNAL_ID),
275                 IncomingBatch.Status.OK, "Wrong status");
276         List<IncomingBatchHistory> list = getIncomingBatchService().findIncomingBatchHistory(batchId,
277                 TestConstants.TEST_CLIENT_EXTERNAL_ID);
278         assertEquals(list.size(), 1, "Wrong number of history");
279         IncomingBatchHistory history = list.get(0);
280         assertEquals(history.getStatus(), IncomingBatchHistory.Status.OK, "Wrong status");
281         assertEquals(history.getFailedRowNumber(), 0l, "Wrong failed row number");
282         assertEquals(history.getStatementCount(), 1l, "Wrong statement count");
283 
284         ByteArrayOutputStream out = new ByteArrayOutputStream();
285         CsvWriter writer = getWriter(out);
286         writer.writeRecord(new String[] { CsvConstants.NODEID, TestConstants.TEST_CLIENT_EXTERNAL_ID });
287         writer.writeRecord(new String[] { CsvConstants.BATCH, getBatchId() });
288         writer.writeRecord(new String[] { CsvConstants.TABLE, TEST_TABLE });
289         writer.write("UnknownTokenWithinBatch");
290         writer.writeRecord(new String[] { CsvConstants.COMMIT, getBatchId() });
291         writer.close();
292         // Pause a moment to guarantee our history comes back in time order
293         Thread.sleep(10);
294         load(out);
295         assertEquals(findIncomingBatchStatus(batchId, TestConstants.TEST_CLIENT_EXTERNAL_ID),
296                 IncomingBatch.Status.OK, "Wrong status");
297         list = getIncomingBatchService().findIncomingBatchHistory(batchId, TestConstants.TEST_CLIENT_EXTERNAL_ID);
298         assertEquals(list.size(), 2, "Wrong number of history");
299         history = list.get(1);
300         assertEquals(history.getStatus(), IncomingBatchHistory.Status.ER, "Wrong status");
301         assertEquals(history.getFailedRowNumber(), 0l, "Wrong failed row number");
302         assertEquals(history.getStatementCount(), 0l, "Wrong statement count");
303         setLoggingLevelForTest(old);
304     }
305 
306     @Test
307     public void testDataIntregrityError() throws Exception {
308         Level old = setLoggingLevelForTest(Level.OFF);
309         String[] values = { getNextId(), "string3", "string not null3", "char3", "char not null3",
310                 "2007-01-02 00:00:00.0", "2007-02-03 04:05:06.0", "0", "47", "67.89", "0.474" };
311 
312         IParameterService paramService = (IParameterService) find(Constants.PARAMETER_SERVICE);
313         paramService.saveParameter("dataloader.enable.fallback.update", "false");
314         ByteArrayOutputStream out = new ByteArrayOutputStream();
315         CsvWriter writer = getWriter(out);
316         writer.writeRecord(new String[] { CsvConstants.NODEID, TestConstants.TEST_CLIENT_EXTERNAL_ID });
317         writer.writeRecord(new String[] { CsvConstants.BATCH, getNextBatchId() });
318         writeTable(writer, TEST_TABLE, TEST_KEYS, TEST_COLUMNS);
319         writer.write(CsvConstants.INSERT);
320         writer.writeRecord(values, true);
321         writer.write(CsvConstants.INSERT);
322         writer.writeRecord(values, true);
323         writer.writeRecord(new String[] { CsvConstants.COMMIT, getBatchId() });
324         writer.close();
325         load(out);
326 
327         assertEquals(findIncomingBatchStatus(batchId, TestConstants.TEST_CLIENT_EXTERNAL_ID),
328                 IncomingBatch.Status.ER, "Wrong status");
329         List<IncomingBatchHistory> list = getIncomingBatchService().findIncomingBatchHistory(
330                 batchId, TestConstants.TEST_CLIENT_EXTERNAL_ID);
331         assertEquals(list.size(), 1, "Wrong number of history");
332         IncomingBatchHistory history = list.get(0);
333         assertEquals(history.getStatus(), IncomingBatchHistory.Status.ER, "Wrong status");
334         assertEquals(history.getFailedRowNumber(), 2l, "Wrong failed row number");
335         assertEquals(history.getStatementCount(), 2l, "Wrong statement count");
336 
337         load(out);
338         assertEquals(findIncomingBatchStatus(batchId, TestConstants.TEST_CLIENT_EXTERNAL_ID),
339                 IncomingBatch.Status.ER, "Wrong status");
340         list = getIncomingBatchService().findIncomingBatchHistory(
341                 batchId, TestConstants.TEST_CLIENT_EXTERNAL_ID);
342         assertEquals(list.size(), 2, "Wrong number of history");
343         history = list.get(1);
344         assertEquals(history.getStatus(), IncomingBatchHistory.Status.ER, "Wrong status");
345         assertEquals(history.getFailedRowNumber(), 2l, "Wrong failed row number");
346         assertEquals(history.getStatementCount(), 2l, "Wrong statement count");
347 
348         paramService.saveParameter("dataloader.enable.fallback.update", "true");
349         setLoggingLevelForTest(old);
350     }
351 
352     
353     @Test
354     public void testErrorWhileParsing() throws Exception {
355         Level old = setLoggingLevelForTest(Level.OFF);
356         String[] values = { getNextId(), "should not reach database", "string not null", "char", "char not null",
357                 "2007-01-02", "2007-02-03 04:05:06.0", "0", "47", "67.89", "0.474"};
358 
359         ByteArrayOutputStream out = new ByteArrayOutputStream();
360         CsvWriter writer = getWriter(out);
361         writer.writeRecord(new String[] { CsvConstants.NODEID, TestConstants.TEST_CLIENT_EXTERNAL_ID });
362         writer.write("UnknownTokenOutsideBatch");
363         String nextBatchId = getNextBatchId();
364         writer.writeRecord(new String[] { CsvConstants.BATCH, nextBatchId });
365         writer.writeRecord(new String[] { CsvConstants.TABLE, TEST_TABLE });
366         writeTable(writer, TEST_TABLE, TEST_KEYS, TEST_COLUMNS);
367         writer.write(CsvConstants.INSERT);
368         writer.writeRecord(values, true);
369         writer.writeRecord(new String[] { CsvConstants.COMMIT, nextBatchId });
370         writer.close();
371         load(out);
372         assertEquals(findIncomingBatchStatus(batchId, TestConstants.TEST_CLIENT_EXTERNAL_ID), null,
373                 "Wrong status");
374         List<IncomingBatchHistory> list = getIncomingBatchService().findIncomingBatchHistory(batchId,
375                 TestConstants.TEST_CLIENT_EXTERNAL_ID);
376         assertEquals(list.size(), 0, "Wrong number of history");
377         setLoggingLevelForTest(old);
378     }
379 
380     @Test
381     public void testErrorThenSuccessBatch() throws Exception {
382         Logger.getLogger(DataLoaderServiceTest.class).warn("testErrorThenSuccessBatch");
383         Level old = setLoggingLevelForTest(Level.OFF);
384         String[] values = { getNextId(), "This string is too large and will cause the statement to fail",
385                 "string not null2", "char2", "char not null2", "2007-01-02 00:00:00.0", "2007-02-03 04:05:06.0", "0",
386                 "47", "67.89", "0.474" };
387         getNextBatchId();
388         int retries = 3;
389         for (int i = 0; i < retries; i++) {
390             batchId--;
391             testSimple(CsvConstants.INSERT, values, null);
392             assertEquals(findIncomingBatchStatus(batchId, TestConstants.TEST_CLIENT_EXTERNAL_ID),
393                     IncomingBatch.Status.ER, "Wrong status");
394             List<IncomingBatchHistory> list = getIncomingBatchService().findIncomingBatchHistory(batchId,
395                     TestConstants.TEST_CLIENT_EXTERNAL_ID);
396             assertEquals(list.size(), i + 1, "Wrong number of history. " + printDatabase());
397             IncomingBatchHistory history = list.get(i);
398             assertEquals(history.getStatus(), IncomingBatchHistory.Status.ER, "Wrong status. "
399                             + printDatabase());
400             assertEquals(history.getFailedRowNumber(), 1l, "Wrong failed row number. " + printDatabase());
401             assertEquals(history.getStatementCount(), 1l, "Wrong statement count. " + printDatabase());
402             // pause to make sure we get a different start time on the incoming
403             // batch history
404             Thread.sleep(10);
405         }
406 
407         batchId--;
408         values[1] = "A smaller string that will succeed";
409         testSimple(CsvConstants.INSERT, values, values);
410         assertEquals(findIncomingBatchStatus(batchId, TestConstants.TEST_CLIENT_EXTERNAL_ID),
411                 IncomingBatch.Status.OK, "Wrong status. " + printDatabase());
412         List<IncomingBatchHistory> list = getIncomingBatchService().findIncomingBatchHistory(batchId,
413                 TestConstants.TEST_CLIENT_EXTERNAL_ID);
414         assertEquals(list.size(), retries + 1, "Wrong number of history. " + printDatabase());
415         IncomingBatchHistory history = list.get(retries);
416         assertEquals(history.getStatus(), IncomingBatchHistory.Status.OK, "Wrong status. " + printDatabase());
417         assertEquals(history.getFailedRowNumber(), 0l, "Wrong failed row number. " + printDatabase());
418         assertEquals(history.getStatementCount(), 1l, "Wrong statement count. " + printDatabase());
419         setLoggingLevelForTest(old);
420     }
421 
422     @Test
423     public void testMultipleBatch() throws Exception {
424         Level old = setLoggingLevelForTest(Level.OFF);
425         String[] values = { getNextId(), "string", "string not null2", "char2", "char not null2",
426                 "2007-01-02 00:00:00.0", "2007-02-03 04:05:06.0", "0", "47", "67.89", "0.474" };
427         String[] values2 = { getNextId(), "This string is too large and will cause the statement to fail",
428                 "string not null2", "char2", "char not null2", "2007-01-02 00:00:00.0", "2007-02-03 04:05:06.0", "0",
429                 "47", "67.89", "0.474" };
430 
431         ByteArrayOutputStream out = new ByteArrayOutputStream();
432         CsvWriter writer = getWriter(out);
433         writer.writeRecord(new String[] { CsvConstants.NODEID, TestConstants.TEST_CLIENT_EXTERNAL_ID });
434         writeTable(writer, TEST_TABLE, TEST_KEYS, TEST_COLUMNS);
435 
436         String nextBatchId = getNextBatchId();
437         writer.writeRecord(new String[] { CsvConstants.BATCH, nextBatchId });
438         writer.write(CsvConstants.INSERT);
439         writer.writeRecord(values, true);
440         writer.writeRecord(new String[] { CsvConstants.COMMIT, nextBatchId });
441 
442         String nextBatchId2 = getNextBatchId();
443         writer.writeRecord(new String[] { CsvConstants.BATCH, nextBatchId2 });
444         writer.write(CsvConstants.INSERT);
445         writer.writeRecord(values2, true);
446         writer.writeRecord(new String[] { CsvConstants.COMMIT, nextBatchId2 });
447 
448         writer.close();
449         load(out);
450         assertTestTableEquals(values[0], values);
451         assertTestTableEquals(values2[0], null);
452 
453         assertEquals(findIncomingBatchStatus(Integer.parseInt(nextBatchId),
454                 TestConstants.TEST_CLIENT_EXTERNAL_ID), IncomingBatch.Status.OK, "Wrong status. " + printDatabase());
455         assertEquals(findIncomingBatchStatus(Integer.parseInt(nextBatchId2),
456                 TestConstants.TEST_CLIENT_EXTERNAL_ID), IncomingBatch.Status.ER, "Wrong status. " + printDatabase());
457         setLoggingLevelForTest(old);
458     }
459 
460     protected void load(ByteArrayOutputStream out) throws Exception {
461         ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
462         getTransportManager().setIncomingTransport(new InternalIncomingTransport(in));
463         getDataLoaderService().loadData(client, null);
464     }
465 
466     protected IncomingBatch.Status findIncomingBatchStatus(int batchId, String nodeId) {
467         IncomingBatch batch = getIncomingBatchService().findIncomingBatch(batchId, nodeId);
468         IncomingBatch.Status status = null;
469         if (batch != null) {
470             status = batch.getStatus();
471         }
472         return status;
473     }
474 
475     @Test
476     public void testAutoRegisteredExtensionPoint() {
477         TestDataLoaderFilter registeredFilter = (TestDataLoaderFilter) find("registeredDataFilter");
478         TestDataLoaderFilter unRegisteredFilter = (TestDataLoaderFilter) find(
479                 "unRegisteredDataFilter");
480         assertTrue(registeredFilter.getNumberOfTimesCalled() > 0);
481         assertTrue(unRegisteredFilter.getNumberOfTimesCalled() == 0);
482 
483         NodeGroupTestDataLoaderFilter registeredNodeGroupFilter = (NodeGroupTestDataLoaderFilter) find("registeredNodeGroupTestDataFilter");
484         NodeGroupTestDataLoaderFilter unRegisteredNodeGroupFilter = (NodeGroupTestDataLoaderFilter) find("unRegisteredNodeGroupTestDataFilter");
485         assertTrue(registeredNodeGroupFilter.getNumberOfTimesCalled() > 0);
486         assertTrue(unRegisteredNodeGroupFilter.getNumberOfTimesCalled() == 0);
487     }
488 
489 }