1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.jumpmind.symmetric.service.impl;
22
23 import java.io.BufferedWriter;
24 import java.util.Set;
25 import java.util.StringTokenizer;
26
27 import org.jumpmind.symmetric.common.Constants;
28 import org.jumpmind.symmetric.db.DbTriggerTest;
29 import org.jumpmind.symmetric.model.Data;
30 import org.jumpmind.symmetric.model.DataEventType;
31 import org.jumpmind.symmetric.model.Node;
32 import org.jumpmind.symmetric.model.Trigger;
33 import org.jumpmind.symmetric.model.TriggerHistory;
34 import org.jumpmind.symmetric.service.IBootstrapService;
35 import org.jumpmind.symmetric.service.IConfigurationService;
36 import org.jumpmind.symmetric.service.IDataExtractorService;
37 import org.jumpmind.symmetric.service.IDataService;
38 import org.jumpmind.symmetric.test.AbstractDatabaseTest;
39 import org.jumpmind.symmetric.test.TestConstants;
40 import org.jumpmind.symmetric.transport.mock.MockOutgoingTransport;
41 import org.junit.Before;
42 import org.junit.Test;
43 import org.springframework.jdbc.core.JdbcTemplate;
44
45 public class DataExtractorServiceTest extends AbstractDatabaseTest {
46
47 protected IDataExtractorService dataExtractorService;
48
49 protected IConfigurationService configurationService;
50
51 protected IDataService dataService;
52
53 private int triggerHistId;
54
55 protected Node node;
56
57 public DataExtractorServiceTest() throws Exception {
58 super();
59 }
60
61 public DataExtractorServiceTest(String dbName) {
62 super(dbName);
63 }
64
65 @Before
66 public void setUp() {
67 dataExtractorService = (IDataExtractorService) find(Constants.DATAEXTRACTOR_SERVICE);
68 configurationService = (IConfigurationService) find(Constants.CONFIG_SERVICE);
69 dataService = (IDataService) find(Constants.DATA_SERVICE);
70 node = new Node();
71 node.setNodeId(TestConstants.TEST_CLIENT_EXTERNAL_ID);
72 node.setNodeGroupId(TestConstants.TEST_CLIENT_NODE_GROUP);
73 Set<Long> histKeys = configurationService.getHistoryRecords().keySet();
74 assertFalse(histKeys.isEmpty());
75 triggerHistId = histKeys.iterator().next().intValue();
76 }
77
78 @Test
79 public void testInitialLoadExtract() throws Exception {
80 ((IBootstrapService) find(Constants.BOOTSTRAP_SERVICE)).syncTriggers();
81 MockOutgoingTransport mockTransport = new MockOutgoingTransport();
82 BufferedWriter writer = mockTransport.open();
83 JdbcTemplate template = getJdbcTemplate();
84 template.update("delete from " + DbTriggerTest.TEST_TRIGGERS_TABLE);
85 Trigger trigger = configurationService.getTriggerFor(DbTriggerTest.TEST_TRIGGERS_TABLE, TestConstants.TEST_CONTINUOUS_NODE_GROUP);
86 dataExtractorService.extractInitialLoadFor(node, trigger, writer);
87 String loadResults = mockTransport.toString();
88 assertEquals(countLines(loadResults), 4, "Unexpected number of lines in the csv result: " + loadResults);
89 assertTrue(loadResults.startsWith("nodeid, 00000"), "Unexpected line at the start of the feed.");
90
91 DbTriggerTest.insert(DbTriggerTest.INSERT1_VALUES, template, getDbDialect());
92 DbTriggerTest.insert(DbTriggerTest.INSERT2_VALUES, template, getDbDialect());
93
94 dataExtractorService.extractInitialLoadFor(node, trigger, writer);
95 loadResults = mockTransport.toString();
96 assertEquals(countLines(loadResults), 13, "Unexpected number of lines in the csv result: " + loadResults);
97
98
99 }
100
101 @Test
102 public void testExtract() throws Exception {
103 cleanSlate(TestConstants.TEST_PREFIX + "data_event", TestConstants.TEST_PREFIX + "data",
104 TestConstants.TEST_PREFIX + "outgoing_batch");
105 createDataEvent("Foo", triggerHistId, TestConstants.TEST_CHANNEL_ID, DataEventType.INSERT, node.getNodeId());
106
107 MockOutgoingTransport mockTransport = new MockOutgoingTransport();
108 mockTransport.open();
109 dataExtractorService.extract(node, mockTransport);
110 String loadResults = mockTransport.toString();
111
112 assertEquals(countLines(loadResults), 8, "Unexpected number of lines in the transport result: "
113 + loadResults);
114 }
115
116 private int countLines(String results) {
117 return new StringTokenizer(results, "\n").countTokens();
118 }
119
120 private void createDataEvent(String tableName, int auditId, String channelId, DataEventType type, String nodeId) {
121 TriggerHistory audit = new TriggerHistory();
122 audit.setTriggerHistoryId(auditId);
123 Data data = new Data(tableName, type, "r.o.w., dat-a", "p-k d.a.t.a", audit);
124 dataService.insertDataEvent(data, channelId, nodeId);
125 }
126 }