View Javadoc

1   /*
2    * SymmetricDS is an open source database synchronization solution.
3    *   
4    * Copyright (C) Chris Henson <chenson42@users.sourceforge.net>
5    *
6    * This library is free software; you can redistribute it and/or
7    * modify it under the terms of the GNU Lesser General Public
8    * License as published by the Free Software Foundation; either
9    * version 3 of the License, or (at your option) any later version.
10   *
11   * This library is distributed in the hope that it will be useful,
12   * but WITHOUT ANY WARRANTY; without even the implied warranty of
13   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14   * Lesser General Public License for more details.
15   *
16   * You should have received a copy of the GNU Lesser General Public
17   * License along with this library; if not, see
18   * <http://www.gnu.org/licenses/>.
19   */
20  
21  package org.jumpmind.symmetric.extract;
22  
23  import java.io.BufferedWriter;
24  import java.io.IOException;
25  import java.io.StringWriter;
26  import java.sql.Connection;
27  import java.sql.SQLException;
28  import java.sql.Statement;
29  import java.util.Date;
30  
31  import org.jumpmind.symmetric.common.Constants;
32  import org.jumpmind.symmetric.common.csv.CsvConstants;
33  import org.jumpmind.symmetric.db.IDbDialect;
34  import org.jumpmind.symmetric.db.SequenceIdentifier;
35  import org.jumpmind.symmetric.model.Data;
36  import org.jumpmind.symmetric.model.DataEventType;
37  import org.jumpmind.symmetric.model.OutgoingBatch;
38  import org.jumpmind.symmetric.model.TriggerHistory;
39  import org.jumpmind.symmetric.service.IParameterService;
40  import org.jumpmind.symmetric.test.AbstractDatabaseTest;
41  import org.junit.Assert;
42  import org.junit.Before;
43  import org.junit.Test;
44  import org.springframework.dao.DataAccessException;
45  import org.springframework.jdbc.core.ConnectionCallback;
46  
47  public class DataExtractorTest extends AbstractDatabaseTest {
48      
49      private static final String CONTEXT_NAME = "extractorContext";
50  
51      private static final String TABLE_NAME = "table1";
52  
53      private IDataExtractor dataExtractor;
54  
55      private IParameterService parameterService;
56  
57      private IDbDialect dbDialect;
58  
59      private final TestData TD1 = new TestData(999, "foo", "\"abc\", 123, \"xyz\"", "328", "basket_id",
60              "mango, watermellon, grape");
61  
62      private final TestData TD2 = new TestData(998, "foo", "\"www\", 888, \"ghi\"", "6578", "basket_id",
63              "mango, watermellon, grape");
64  
65      private final TestData TD3 = new TestData(997, "foo", "\"monday\", 879, \"ggg\"", "6502", "basket_id",
66              "grape, tomato, cucumber");
67  
68      private final TestData TD4 = new TestData(997, "bar", "\"monday\", 879, \"ggg\"", "6502", "basket_id",
69              "grape, tomato, cucumber");
70      
71      
72  
73      public DataExtractorTest() throws Exception {
74          super();
75      }
76  
77      public DataExtractorTest(String dbName) {
78          super(dbName);
79      }
80  
81      @Before
82      public void setUp() {
83          dataExtractor = (IDataExtractor) find(Constants.DATA_EXTRACTOR);
84          parameterService = (IParameterService) find(Constants.PARAMETER_SERVICE);
85          dbDialect = (IDbDialect) find(Constants.DB_DIALECT);
86      }
87  
88      @Test
89      public void basicTest() {
90          TriggerHistory audit = makeTableSyncAuditId(TD1.keyColumns, TD1.columns);
91  
92          try {
93              DataExtractorContext context = (DataExtractorContext) find(CONTEXT_NAME);
94  
95              StringWriter stringWriter = new StringWriter();
96              BufferedWriter writer = new BufferedWriter(stringWriter);
97              dataExtractor.init(writer, context);
98  
99              long batchId = 998877;
100             OutgoingBatch batch = new OutgoingBatch();
101             batch.setBatchId(batchId);
102             dataExtractor.begin(batch, writer);
103 
104             Data data = new Data(TD1.dataId, TD1.key, TD1.rowData, DataEventType.INSERT, TD1.table, new Date(), audit);
105             dataExtractor.write(writer, data, context);
106             dataExtractor.commit(batch, writer);
107 
108             ExpectMaster5000 em = new ExpectMaster5000();
109             em.location(parameterService.getExternalId());
110             em.batchBegin(batchId);
111             em.table(TD1.table, TD1.keyColumns, TD1.columns);
112             em.insert(TD1.rowData);
113             em.batchEnd(batchId);
114 
115             writer.flush();
116             Assert.assertEquals(stringWriter.toString(), em.toString());
117         } catch (IOException e) {
118             Assert.fail("BasicTeset failed");
119         }
120 
121         reset();
122     }
123 
124     @Test
125     public void biggerTest() {
126         TriggerHistory audit = makeTableSyncAuditId(TD1.keyColumns, TD1.columns);
127 
128         try {
129             DataExtractorContext context = (DataExtractorContext) find(CONTEXT_NAME);
130 
131             StringWriter stringWriter = new StringWriter();
132             BufferedWriter writer = new BufferedWriter(stringWriter);
133             dataExtractor.init(writer, context);
134 
135             long batchId = 998850;
136             OutgoingBatch batch = new OutgoingBatch();
137             batch.setBatchId(batchId);
138             dataExtractor.begin(batch, writer);
139 
140             Data data = new Data(TD1.dataId, TD1.key, TD1.rowData, DataEventType.INSERT, TD1.table, new Date(), audit);
141             dataExtractor.write(writer, data, context);
142 
143             data = new Data(TD2.dataId, TD2.key, TD2.rowData, DataEventType.UPDATE, TD2.table, new Date(), audit);
144             dataExtractor.write(writer, data, context);
145             dataExtractor.commit(batch, writer);
146 
147             ExpectMaster5000 em = new ExpectMaster5000();
148             em.location(parameterService.getExternalId());
149             em.batchBegin(batchId);
150             em.table(TD1.table, TD1.keyColumns, TD1.columns);
151             em.insert(TD1.rowData);
152             em.update(TD2.rowData, TD2.key);
153             em.batchEnd(batchId);
154 
155             writer.flush();
156             Assert.assertEquals(stringWriter.toString(), em.toString());
157         } catch (IOException e) {
158             Assert.fail("BasicTeset failed");
159         }
160 
161         reset();
162     }
163 
164     @Test
165     public void notherTest() {
166         try {
167             DataExtractorContext context = (DataExtractorContext) find(CONTEXT_NAME);
168 
169             StringWriter stringWriter = new StringWriter();
170             BufferedWriter writer = new BufferedWriter(stringWriter);
171             dataExtractor.init(writer, context);
172 
173             long batchId = 998860;
174             OutgoingBatch batch = new OutgoingBatch();
175             batch.setBatchId(batchId);
176             dataExtractor.begin(batch, writer);
177 
178             TriggerHistory audit = makeTableSyncAuditId(TD1.keyColumns, TD1.columns);
179             Data data = new Data(TD1.dataId, TD1.key, TD1.rowData, DataEventType.INSERT, TD1.table, new Date(), audit);
180             dataExtractor.write(writer, data, context);
181 
182             audit = makeTableSyncAuditId(TD3.keyColumns, TD3.columns);
183             data = new Data(TD3.dataId, TD3.key, TD3.rowData, DataEventType.UPDATE, TD3.table, new Date(), audit);
184             dataExtractor.write(writer, data, context);
185             data = new Data(TD3.dataId, TD3.key, TD3.rowData, DataEventType.DELETE, TD3.table, new Date(), audit);
186             dataExtractor.write(writer, data, context);
187             dataExtractor.commit(batch, writer);
188 
189             ExpectMaster5000 em = new ExpectMaster5000();
190             em.location(parameterService.getExternalId());
191             em.batchBegin(batchId);
192             em.table(TD1.table, TD1.keyColumns, TD1.columns);
193             em.insert(TD1.rowData);
194             em.table(TD3.table, TD3.keyColumns, TD3.columns);
195             em.update(TD3.rowData, TD3.key);
196             em.delete(TD3.key);
197             em.batchEnd(batchId);
198 
199             writer.flush();
200             Assert.assertEquals(stringWriter.toString(), em.toString());
201         } catch (IOException e) {
202             Assert.fail("BasicTest failed");
203         }
204 
205         reset();
206     }
207 
208     @Test
209     public void changingTables() {
210         TriggerHistory audit = makeTableSyncAuditId(TD1.keyColumns, TD1.columns);
211         TriggerHistory audit2 = makeTableSyncAuditId(TD4.keyColumns, TD4.columns);
212 
213         try {
214             DataExtractorContext context = (DataExtractorContext)find(CONTEXT_NAME);
215 
216             StringWriter stringWriter = new StringWriter();
217             BufferedWriter writer = new BufferedWriter(stringWriter);
218             dataExtractor.init(writer, context);
219 
220             long batchId = 998800;
221             OutgoingBatch batch = new OutgoingBatch();
222             batch.setBatchId(batchId);
223             dataExtractor.begin(batch, writer);
224 
225             Data data = new Data(TD1.dataId, TD1.key, TD1.rowData, DataEventType.INSERT, TD1.table, new Date(), audit);
226             dataExtractor.write(writer, data, context);
227 
228             data = new Data(TD4.dataId, TD4.key, TD4.rowData, DataEventType.UPDATE, TD4.table, new Date(), audit2);
229             dataExtractor.write(writer, data, context);
230 
231             data = new Data(TD2.dataId, TD2.key, TD2.rowData, DataEventType.UPDATE, TD2.table, new Date(), audit);
232             dataExtractor.write(writer, data, context);
233             dataExtractor.commit(batch, writer);
234 
235             ExpectMaster5000 em = new ExpectMaster5000();
236             em.location(parameterService.getExternalId());
237             em.batchBegin(batchId);
238             em.table(TD1.table, TD1.keyColumns, TD1.columns);
239             em.insert(TD1.rowData);
240             em.table(TD4.table, TD4.keyColumns, TD4.columns);
241             em.update(TD4.rowData, TD4.key);
242             em.table(TD1.table);
243             em.update(TD2.rowData, TD2.key);
244             em.batchEnd(batchId);
245 
246             writer.flush();
247             Assert.assertEquals(stringWriter.toString(), em.toString());
248         } catch (IOException e) {
249             Assert.fail("BasicTeset failed");
250         }
251 
252         reset();
253     }
254 
255     protected void reset() {
256         this.getJdbcTemplate().execute(new ConnectionCallback() {
257             public Object doInConnection(Connection connection) throws SQLException, DataAccessException {
258                 Statement s = connection.createStatement();
259                 s.executeUpdate("delete from sym_trigger_hist where source_table_name = '" + TABLE_NAME + "'");
260                 return null;
261             }
262         });
263     }
264 
265     private TriggerHistory makeTableSyncAuditId(final String pk, final String col) {
266         String sql = "insert into sym_trigger_hist (trigger_hist_id, source_table_name, source_schema_name, trigger_id, column_names, pk_column_names,name_for_update_trigger,name_for_delete_trigger, name_for_insert_trigger,table_hash,trigger_row_hash,last_trigger_build_reason,create_time) values (null, '"
267                 + TABLE_NAME + "','symmetric',1,'" + col + "' , '" + pk + "','a','b','c',1,1,'T',current_timestamp)";
268         long key = dbDialect.insertWithGeneratedKey(sql, SequenceIdentifier.TRIGGER_HIST);
269         TriggerHistory audit = new TriggerHistory(TABLE_NAME, pk, col);
270         audit.setTriggerHistoryId((int) key);
271         return audit;
272     }
273 
274     class ExpectMaster5000 {
275         StringWriter base;
276 
277         BufferedWriter writer;
278 
279         ExpectMaster5000() {
280             base = new StringWriter();
281             writer = new BufferedWriter(base);
282         }
283 
284         void location(String location) throws IOException {
285             writeCSV(CsvConstants.NODEID);
286             writer.write(location);
287             writer.newLine();
288         }
289 
290         void batchBegin(long batchId) throws IOException {
291             writeCSV(CsvConstants.BATCH);
292             writer.write(new Long(batchId).toString());
293             writer.newLine();
294             writeCSV(CsvConstants.BINARY);
295             writer.write(getDbDialect().getBinaryEncoding().name());
296             writer.newLine();
297         }
298 
299         void batchEnd(long batchId) throws IOException {
300             writeCSV(CsvConstants.COMMIT);
301             writer.write(Long.toString(batchId));
302             writer.newLine();
303         }
304 
305         void table(String tableName, String pk, String cols) throws IOException {
306             writeCSV(CsvConstants.TABLE);
307             writer.write(tableName);
308             writer.newLine();
309             writeCSV(CsvConstants.KEYS);
310             writer.write(pk);
311             writer.newLine();
312             writeCSV(CsvConstants.COLUMNS);
313             writer.write(cols);
314             writer.newLine();
315         }
316 
317         void insert(String data) throws IOException {
318             writeCSV(CsvConstants.INSERT);
319             writer.write(data);
320             writer.newLine();
321         }
322 
323         void update(String rowData, String pk) throws IOException {
324             writeCSV(CsvConstants.UPDATE);
325             writeCSV(rowData);
326             writer.write(pk);
327             writer.newLine();
328         }
329 
330         void delete(String pk) throws IOException {
331             writeCSV(CsvConstants.DELETE);
332             writer.write(pk);
333             writer.newLine();
334         }
335 
336         void table(String t) throws IOException {
337             writeCSV(CsvConstants.TABLE);
338             writer.write(t);
339             writer.newLine();
340         }
341 
342         private void writeCSV(String constant) throws IOException {
343             writer.write(constant);
344             writer.write(", ");
345         }
346 
347         @Override
348         public String toString() {
349             try {
350                 writer.flush();
351                 return base.toString();
352             } catch (IOException e) {
353                 Assert.fail();
354             }
355             return null;
356         }
357 
358         @Override
359         public boolean equals(Object other) {
360             if (other instanceof String) {
361                 try {
362                     String s = (String) other;
363                     writer.flush();
364                     String out = base.toString();
365                     return out.equals(s);
366                 } catch (IOException e) {
367                     Assert.fail();
368                 }
369             }
370 
371             return false;
372         }
373     }
374 
375     class TestData {
376         String table;
377 
378         String rowData;
379 
380         String key;
381 
382         long dataId;
383 
384         String keyColumns;
385 
386         String columns;
387 
388         TestData(long dataId, String table, String rowData, String key, String keyColumns, String columns) {
389             this.dataId = dataId;
390             this.table = table;
391             this.rowData = rowData;
392             this.key = key;
393             this.keyColumns = keyColumns;
394             this.columns = columns;
395         }
396     }
397 
398 }