1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.jumpmind.symmetric.extract;
22
23 import java.io.BufferedWriter;
24 import java.io.IOException;
25 import java.io.StringWriter;
26 import java.sql.Connection;
27 import java.sql.SQLException;
28 import java.sql.Statement;
29 import java.util.Date;
30
31 import org.jumpmind.symmetric.common.Constants;
32 import org.jumpmind.symmetric.common.csv.CsvConstants;
33 import org.jumpmind.symmetric.db.IDbDialect;
34 import org.jumpmind.symmetric.db.SequenceIdentifier;
35 import org.jumpmind.symmetric.model.Data;
36 import org.jumpmind.symmetric.model.DataEventType;
37 import org.jumpmind.symmetric.model.OutgoingBatch;
38 import org.jumpmind.symmetric.model.TriggerHistory;
39 import org.jumpmind.symmetric.service.IParameterService;
40 import org.jumpmind.symmetric.test.AbstractDatabaseTest;
41 import org.junit.Assert;
42 import org.junit.Before;
43 import org.junit.Test;
44 import org.springframework.dao.DataAccessException;
45 import org.springframework.jdbc.core.ConnectionCallback;
46
47 public class DataExtractorTest extends AbstractDatabaseTest {
48
49 private static final String CONTEXT_NAME = "extractorContext";
50
51 private static final String TABLE_NAME = "table1";
52
53 private IDataExtractor dataExtractor;
54
55 private IParameterService parameterService;
56
57 private IDbDialect dbDialect;
58
59 private final TestData TD1 = new TestData(999, "foo", "\"abc\", 123, \"xyz\"", "328", "basket_id",
60 "mango, watermellon, grape");
61
62 private final TestData TD2 = new TestData(998, "foo", "\"www\", 888, \"ghi\"", "6578", "basket_id",
63 "mango, watermellon, grape");
64
65 private final TestData TD3 = new TestData(997, "foo", "\"monday\", 879, \"ggg\"", "6502", "basket_id",
66 "grape, tomato, cucumber");
67
68 private final TestData TD4 = new TestData(997, "bar", "\"monday\", 879, \"ggg\"", "6502", "basket_id",
69 "grape, tomato, cucumber");
70
71
72
73 public DataExtractorTest() throws Exception {
74 super();
75 }
76
/**
 * Creates the test, forwarding the given name to the superclass constructor.
 *
 * @param dbName passed unchanged to the superclass constructor
 */
public DataExtractorTest(String dbName) {
    super(dbName);
}
80
81 @Before
82 public void setUp() {
83 dataExtractor = (IDataExtractor) find(Constants.DATA_EXTRACTOR);
84 parameterService = (IParameterService) find(Constants.PARAMETER_SERVICE);
85 dbDialect = (IDbDialect) find(Constants.DB_DIALECT);
86 }
87
88 @Test
89 public void basicTest() {
90 TriggerHistory audit = makeTableSyncAuditId(TD1.keyColumns, TD1.columns);
91
92 try {
93 DataExtractorContext context = (DataExtractorContext) find(CONTEXT_NAME);
94
95 StringWriter stringWriter = new StringWriter();
96 BufferedWriter writer = new BufferedWriter(stringWriter);
97 dataExtractor.init(writer, context);
98
99 long batchId = 998877;
100 OutgoingBatch batch = new OutgoingBatch();
101 batch.setBatchId(batchId);
102 dataExtractor.begin(batch, writer);
103
104 Data data = new Data(TD1.dataId, TD1.key, TD1.rowData, DataEventType.INSERT, TD1.table, new Date(), audit);
105 dataExtractor.write(writer, data, context);
106 dataExtractor.commit(batch, writer);
107
108 ExpectMaster5000 em = new ExpectMaster5000();
109 em.location(parameterService.getExternalId());
110 em.batchBegin(batchId);
111 em.table(TD1.table, TD1.keyColumns, TD1.columns);
112 em.insert(TD1.rowData);
113 em.batchEnd(batchId);
114
115 writer.flush();
116 Assert.assertEquals(stringWriter.toString(), em.toString());
117 } catch (IOException e) {
118 Assert.fail("BasicTeset failed");
119 }
120
121 reset();
122 }
123
124 @Test
125 public void biggerTest() {
126 TriggerHistory audit = makeTableSyncAuditId(TD1.keyColumns, TD1.columns);
127
128 try {
129 DataExtractorContext context = (DataExtractorContext) find(CONTEXT_NAME);
130
131 StringWriter stringWriter = new StringWriter();
132 BufferedWriter writer = new BufferedWriter(stringWriter);
133 dataExtractor.init(writer, context);
134
135 long batchId = 998850;
136 OutgoingBatch batch = new OutgoingBatch();
137 batch.setBatchId(batchId);
138 dataExtractor.begin(batch, writer);
139
140 Data data = new Data(TD1.dataId, TD1.key, TD1.rowData, DataEventType.INSERT, TD1.table, new Date(), audit);
141 dataExtractor.write(writer, data, context);
142
143 data = new Data(TD2.dataId, TD2.key, TD2.rowData, DataEventType.UPDATE, TD2.table, new Date(), audit);
144 dataExtractor.write(writer, data, context);
145 dataExtractor.commit(batch, writer);
146
147 ExpectMaster5000 em = new ExpectMaster5000();
148 em.location(parameterService.getExternalId());
149 em.batchBegin(batchId);
150 em.table(TD1.table, TD1.keyColumns, TD1.columns);
151 em.insert(TD1.rowData);
152 em.update(TD2.rowData, TD2.key);
153 em.batchEnd(batchId);
154
155 writer.flush();
156 Assert.assertEquals(stringWriter.toString(), em.toString());
157 } catch (IOException e) {
158 Assert.fail("BasicTeset failed");
159 }
160
161 reset();
162 }
163
164 @Test
165 public void notherTest() {
166 try {
167 DataExtractorContext context = (DataExtractorContext) find(CONTEXT_NAME);
168
169 StringWriter stringWriter = new StringWriter();
170 BufferedWriter writer = new BufferedWriter(stringWriter);
171 dataExtractor.init(writer, context);
172
173 long batchId = 998860;
174 OutgoingBatch batch = new OutgoingBatch();
175 batch.setBatchId(batchId);
176 dataExtractor.begin(batch, writer);
177
178 TriggerHistory audit = makeTableSyncAuditId(TD1.keyColumns, TD1.columns);
179 Data data = new Data(TD1.dataId, TD1.key, TD1.rowData, DataEventType.INSERT, TD1.table, new Date(), audit);
180 dataExtractor.write(writer, data, context);
181
182 audit = makeTableSyncAuditId(TD3.keyColumns, TD3.columns);
183 data = new Data(TD3.dataId, TD3.key, TD3.rowData, DataEventType.UPDATE, TD3.table, new Date(), audit);
184 dataExtractor.write(writer, data, context);
185 data = new Data(TD3.dataId, TD3.key, TD3.rowData, DataEventType.DELETE, TD3.table, new Date(), audit);
186 dataExtractor.write(writer, data, context);
187 dataExtractor.commit(batch, writer);
188
189 ExpectMaster5000 em = new ExpectMaster5000();
190 em.location(parameterService.getExternalId());
191 em.batchBegin(batchId);
192 em.table(TD1.table, TD1.keyColumns, TD1.columns);
193 em.insert(TD1.rowData);
194 em.table(TD3.table, TD3.keyColumns, TD3.columns);
195 em.update(TD3.rowData, TD3.key);
196 em.delete(TD3.key);
197 em.batchEnd(batchId);
198
199 writer.flush();
200 Assert.assertEquals(stringWriter.toString(), em.toString());
201 } catch (IOException e) {
202 Assert.fail("BasicTest failed");
203 }
204
205 reset();
206 }
207
208 @Test
209 public void changingTables() {
210 TriggerHistory audit = makeTableSyncAuditId(TD1.keyColumns, TD1.columns);
211 TriggerHistory audit2 = makeTableSyncAuditId(TD4.keyColumns, TD4.columns);
212
213 try {
214 DataExtractorContext context = (DataExtractorContext)find(CONTEXT_NAME);
215
216 StringWriter stringWriter = new StringWriter();
217 BufferedWriter writer = new BufferedWriter(stringWriter);
218 dataExtractor.init(writer, context);
219
220 long batchId = 998800;
221 OutgoingBatch batch = new OutgoingBatch();
222 batch.setBatchId(batchId);
223 dataExtractor.begin(batch, writer);
224
225 Data data = new Data(TD1.dataId, TD1.key, TD1.rowData, DataEventType.INSERT, TD1.table, new Date(), audit);
226 dataExtractor.write(writer, data, context);
227
228 data = new Data(TD4.dataId, TD4.key, TD4.rowData, DataEventType.UPDATE, TD4.table, new Date(), audit2);
229 dataExtractor.write(writer, data, context);
230
231 data = new Data(TD2.dataId, TD2.key, TD2.rowData, DataEventType.UPDATE, TD2.table, new Date(), audit);
232 dataExtractor.write(writer, data, context);
233 dataExtractor.commit(batch, writer);
234
235 ExpectMaster5000 em = new ExpectMaster5000();
236 em.location(parameterService.getExternalId());
237 em.batchBegin(batchId);
238 em.table(TD1.table, TD1.keyColumns, TD1.columns);
239 em.insert(TD1.rowData);
240 em.table(TD4.table, TD4.keyColumns, TD4.columns);
241 em.update(TD4.rowData, TD4.key);
242 em.table(TD1.table);
243 em.update(TD2.rowData, TD2.key);
244 em.batchEnd(batchId);
245
246 writer.flush();
247 Assert.assertEquals(stringWriter.toString(), em.toString());
248 } catch (IOException e) {
249 Assert.fail("BasicTeset failed");
250 }
251
252 reset();
253 }
254
255 protected void reset() {
256 this.getJdbcTemplate().execute(new ConnectionCallback() {
257 public Object doInConnection(Connection connection) throws SQLException, DataAccessException {
258 Statement s = connection.createStatement();
259 s.executeUpdate("delete from sym_trigger_hist where source_table_name = '" + TABLE_NAME + "'");
260 return null;
261 }
262 });
263 }
264
265 private TriggerHistory makeTableSyncAuditId(final String pk, final String col) {
266 String sql = "insert into sym_trigger_hist (trigger_hist_id, source_table_name, source_schema_name, trigger_id, column_names, pk_column_names,name_for_update_trigger,name_for_delete_trigger, name_for_insert_trigger,table_hash,trigger_row_hash,last_trigger_build_reason,create_time) values (null, '"
267 + TABLE_NAME + "','symmetric',1,'" + col + "' , '" + pk + "','a','b','c',1,1,'T',current_timestamp)";
268 long key = dbDialect.insertWithGeneratedKey(sql, SequenceIdentifier.TRIGGER_HIST);
269 TriggerHistory audit = new TriggerHistory(TABLE_NAME, pk, col);
270 audit.setTriggerHistoryId((int) key);
271 return audit;
272 }
273
274 class ExpectMaster5000 {
275 StringWriter base;
276
277 BufferedWriter writer;
278
279 ExpectMaster5000() {
280 base = new StringWriter();
281 writer = new BufferedWriter(base);
282 }
283
284 void location(String location) throws IOException {
285 writeCSV(CsvConstants.NODEID);
286 writer.write(location);
287 writer.newLine();
288 }
289
290 void batchBegin(long batchId) throws IOException {
291 writeCSV(CsvConstants.BATCH);
292 writer.write(new Long(batchId).toString());
293 writer.newLine();
294 writeCSV(CsvConstants.BINARY);
295 writer.write(getDbDialect().getBinaryEncoding().name());
296 writer.newLine();
297 }
298
299 void batchEnd(long batchId) throws IOException {
300 writeCSV(CsvConstants.COMMIT);
301 writer.write(Long.toString(batchId));
302 writer.newLine();
303 }
304
305 void table(String tableName, String pk, String cols) throws IOException {
306 writeCSV(CsvConstants.TABLE);
307 writer.write(tableName);
308 writer.newLine();
309 writeCSV(CsvConstants.KEYS);
310 writer.write(pk);
311 writer.newLine();
312 writeCSV(CsvConstants.COLUMNS);
313 writer.write(cols);
314 writer.newLine();
315 }
316
317 void insert(String data) throws IOException {
318 writeCSV(CsvConstants.INSERT);
319 writer.write(data);
320 writer.newLine();
321 }
322
323 void update(String rowData, String pk) throws IOException {
324 writeCSV(CsvConstants.UPDATE);
325 writeCSV(rowData);
326 writer.write(pk);
327 writer.newLine();
328 }
329
330 void delete(String pk) throws IOException {
331 writeCSV(CsvConstants.DELETE);
332 writer.write(pk);
333 writer.newLine();
334 }
335
336 void table(String t) throws IOException {
337 writeCSV(CsvConstants.TABLE);
338 writer.write(t);
339 writer.newLine();
340 }
341
342 private void writeCSV(String constant) throws IOException {
343 writer.write(constant);
344 writer.write(", ");
345 }
346
347 @Override
348 public String toString() {
349 try {
350 writer.flush();
351 return base.toString();
352 } catch (IOException e) {
353 Assert.fail();
354 }
355 return null;
356 }
357
358 @Override
359 public boolean equals(Object other) {
360 if (other instanceof String) {
361 try {
362 String s = (String) other;
363 writer.flush();
364 String out = base.toString();
365 return out.equals(s);
366 } catch (IOException e) {
367 Assert.fail();
368 }
369 }
370
371 return false;
372 }
373 }
374
/**
 * Plain holder for the values the tests feed into {@code Data} and
 * {@code ExpectMaster5000}: one row change plus the key/column lists of its
 * table.
 */
class TestData {
    /** Identifier passed as the data id. */
    long dataId;

    /** Name of the table the row belongs to. */
    String table;

    /** Row values as a single delimited string. */
    String rowData;

    /** Key value(s) of the row as a string. */
    String key;

    /** Key column name list. */
    String keyColumns;

    /** Column name list. */
    String columns;

    /** Stores each argument in the field of the same name. */
    TestData(long dataId, String table, String rowData, String key, String keyColumns, String columns) {
        this.columns = columns;
        this.keyColumns = keyColumns;
        this.key = key;
        this.rowData = rowData;
        this.table = table;
        this.dataId = dataId;
    }
}
397
398 }