View Javadoc

1   /*
2    * SymmetricDS is an open source database synchronization solution.
3    *   
4    * Copyright (C) Chris Henson <chenson42@users.sourceforge.net>,
5    *               Eric Long <erilong@users.sourceforge.net>
6    *
7    * This library is free software; you can redistribute it and/or
8    * modify it under the terms of the GNU Lesser General Public
9    * License as published by the Free Software Foundation; either
10   * version 3 of the License, or (at your option) any later version.
11   *
12   * This library is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this library; if not, see
19   * <http://www.gnu.org/licenses/>.
20   */
21  
22  package org.jumpmind.symmetric.service.impl;
23  
24  import java.io.BufferedWriter;
25  import java.io.IOException;
26  import java.io.OutputStream;
27  import java.sql.Connection;
28  import java.sql.PreparedStatement;
29  import java.sql.ResultSet;
30  import java.sql.SQLException;
31  import java.util.ArrayList;
32  import java.util.Date;
33  import java.util.List;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.jumpmind.symmetric.Version;
38  import org.jumpmind.symmetric.common.Constants;
39  import org.jumpmind.symmetric.common.TableConstants;
40  import org.jumpmind.symmetric.db.IDbDialect;
41  import org.jumpmind.symmetric.extract.DataExtractorContext;
42  import org.jumpmind.symmetric.extract.IDataExtractor;
43  import org.jumpmind.symmetric.extract.IExtractorFilter;
44  import org.jumpmind.symmetric.extract.csv.Util;
45  import org.jumpmind.symmetric.model.BatchType;
46  import org.jumpmind.symmetric.model.Data;
47  import org.jumpmind.symmetric.model.DataEventType;
48  import org.jumpmind.symmetric.model.Node;
49  import org.jumpmind.symmetric.model.NodeChannel;
50  import org.jumpmind.symmetric.model.OutgoingBatch;
51  import org.jumpmind.symmetric.model.OutgoingBatchHistory;
52  import org.jumpmind.symmetric.model.Trigger;
53  import org.jumpmind.symmetric.model.TriggerHistory;
54  import org.jumpmind.symmetric.model.OutgoingBatch.Status;
55  import org.jumpmind.symmetric.service.IAcknowledgeService;
56  import org.jumpmind.symmetric.service.IConfigurationService;
57  import org.jumpmind.symmetric.service.IDataExtractorService;
58  import org.jumpmind.symmetric.service.IExtractListener;
59  import org.jumpmind.symmetric.service.INodeService;
60  import org.jumpmind.symmetric.service.IOutgoingBatchService;
61  import org.jumpmind.symmetric.transport.IOutgoingTransport;
62  import org.jumpmind.symmetric.transport.TransportUtils;
63  import org.springframework.beans.BeansException;
64  import org.springframework.beans.factory.BeanFactory;
65  import org.springframework.beans.factory.BeanFactoryAware;
66  import org.springframework.dao.DataAccessException;
67  import org.springframework.jdbc.core.ConnectionCallback;
68  import org.springframework.jdbc.support.JdbcUtils;
69  
70  public class DataExtractorService extends AbstractService implements IDataExtractorService, BeanFactoryAware {
71  
72      protected static final Log logger = LogFactory.getLog(DataExtractorService.class);
73  
74      private IOutgoingBatchService outgoingBatchService;
75  
76      private IConfigurationService configurationService;
77  
78      private IAcknowledgeService acknowledgeService;
79  
80      private INodeService nodeService;
81  
82      private IDbDialect dbDialect;
83  
84      private BeanFactory beanFactory;
85  
86      private DataExtractorContext clonableContext;
87  
88      private List<IExtractorFilter> extractorFilters;
89  
90      /***
91       * @see DataExtractorService#extractConfigurationStandalone(Node,
92       *      BufferedWriter)
93       */
94      public void extractConfigurationStandalone(Node node, OutputStream out) throws IOException {
95          this.extractConfigurationStandalone(node, TransportUtils.toWriter(out));
96      }
97  
98      /***
99       * Extract the SymmetricDS configuration for the passed in {@link Node}.
100      * Note that this method will insert an already acknowledged batch to
101      * indicate that the configuration was sent. If the configuration fails to
102      * load for some reason on the client the batch status will NOT reflect the
103      * failure.
104      */
105     public void extractConfigurationStandalone(Node node, BufferedWriter writer) throws IOException {
106 
107         try {
108             OutgoingBatch batch = new OutgoingBatch(node, Constants.CHANNEL_CONFIG, BatchType.INITIAL_LOAD);
109             outgoingBatchService.insertOutgoingBatch(batch);
110             OutgoingBatchHistory history = new OutgoingBatchHistory(batch);
111 
112             final IDataExtractor dataExtractor = getDataExtractor(node.getSymmetricVersion());
113             final DataExtractorContext ctxCopy = clonableContext.copy(dataExtractor);
114 
115             dataExtractor.init(writer, ctxCopy);
116             dataExtractor.begin(batch, writer);
117 
118             extractConfiguration(node, writer, ctxCopy);
119 
120             dataExtractor.commit(batch, writer);
121 
122             history.setStatus(OutgoingBatchHistory.Status.SE);
123             history.setEndTime(new Date());
124             outgoingBatchService.insertOutgoingBatchHistory(history);
125 
126             // acknowledge right away, because the acknowledgment is not
127             // built into the registration protocol.
128             acknowledgeService.ack(batch.getBatchInfo());
129 
130         } finally {
131             writer.flush();
132         }
133     }
134 
135     public void extractConfiguration(Node node, BufferedWriter writer, DataExtractorContext ctx) throws IOException {
136         List<Trigger> triggers = configurationService.getConfigurationTriggers(parameterService.getNodeGroupId(), node
137                 .getNodeGroupId(), true);
138         if (node != null && node.isVersionGreaterThanOrEqualTo(1, 5, 0)) {
139             for (int i = triggers.size() - 1; i >= 0; i--) {
140                 Trigger trigger = triggers.get(i);
141                 StringBuilder sql = new StringBuilder(dbDialect.createPurgeSqlFor(node, trigger, null));
142                 addPurgeCriteriaToConfigurationTables(trigger.getSourceTableName(), sql);
143                 Util.writeSql(sql.toString(), writer);
144             }
145         }
146 
147         for (int i = 0; i < triggers.size(); i++) {
148             Trigger trigger = triggers.get(i);
149             TriggerHistory hist = new TriggerHistory(dbDialect.getMetaDataFor(trigger, false), trigger);
150             hist.setTriggerHistoryId(Integer.MAX_VALUE - i);
151             if (!trigger.getSourceTableName().endsWith(TableConstants.SYM_NODE_IDENTITY)) {
152                 writeInitialLoad(node, trigger, hist, writer, null, ctx);
153             } else {
154                 Data data = new Data(1, null, node.getNodeId(), DataEventType.INSERT, trigger.getSourceTableName(),
155                         null, hist);
156                 ctx.getDataExtractor().write(writer, data, ctx);
157             }
158         }
159     }
160 
161     private void addPurgeCriteriaToConfigurationTables(String sourceTableName, StringBuilder sql) {
162         if ((TableConstants.getTableName(dbDialect.getTablePrefix(), TableConstants.SYM_NODE)
163                 .equalsIgnoreCase(sourceTableName))
164                 || TableConstants.getTableName(dbDialect.getTablePrefix(), TableConstants.SYM_NODE_SECURITY)
165                         .equalsIgnoreCase(sourceTableName)) {
166             Node me = nodeService.findIdentity();
167             if (me != null) {
168                 sql.append(String.format(" where created_at_node_id='%s'", me.getNodeId()));
169             }
170         }
171     }
172 
173     private IDataExtractor getDataExtractor(String version) {
174         String beanName = Constants.DATA_EXTRACTOR;
175         // Version 1.4.1-appaji accepts "old" token, so it's like a 1.5 version
176         if (version != null) {
177             int[] versions = Version.parseVersion(version);
178             if (versions[0] == 1) {
179                 if (versions[1] <= 2) {
180                     beanName += "10";
181                 } else if (versions[1] <= 3) {
182                     beanName += "13";
183                 } else if (versions[1] <= 4 && !version.equals("1.4.1-appaji")) {
184                     beanName += "14";
185                 }
186             }
187         }
188         return (IDataExtractor) beanFactory.getBean(beanName);
189     }
190 
191     public OutgoingBatch extractInitialLoadFor(Node node, Trigger trigger, BufferedWriter writer) {
192         OutgoingBatch batch = new OutgoingBatch(node, trigger.getChannelId(), BatchType.INITIAL_LOAD);
193         outgoingBatchService.insertOutgoingBatch(batch);
194         OutgoingBatchHistory history = new OutgoingBatchHistory(batch);
195         writeInitialLoad(node, trigger, writer, batch, null);
196         history.setStatus(OutgoingBatchHistory.Status.SE);
197         history.setEndTime(new Date());
198         outgoingBatchService.insertOutgoingBatchHistory(history);
199         return batch;
200     }
201 
202     public void extractInitialLoadWithinBatchFor(Node node, final Trigger trigger, BufferedWriter writer,
203             DataExtractorContext ctx) {
204         writeInitialLoad(node, trigger, writer, null, ctx);
205     }
206 
207     protected void writeInitialLoad(Node node, Trigger trigger, BufferedWriter writer, final OutgoingBatch batch,
208             final DataExtractorContext ctx) {
209         writeInitialLoad(node, trigger, configurationService.getLatestHistoryRecordFor(trigger.getTriggerId()), writer,
210                 batch, ctx);
211     }
212 
213     /***
214      * 
215      * @param node
216      * @param trigger
217      * @param hist
218      * @param transport
219      * @param batch
220      *                If null, then assume this 'initial load' is part of
221      *                another batch.
222      * @param ctx
223      */
224     protected void writeInitialLoad(Node node, final Trigger trigger, final TriggerHistory hist,
225             final BufferedWriter writer, final OutgoingBatch batch, final DataExtractorContext ctx) {
226 
227         final String sql = dbDialect.createInitalLoadSqlFor(node, trigger);
228 
229         final IDataExtractor dataExtractor = ctx != null ? ctx.getDataExtractor() : getDataExtractor(node
230                 .getSymmetricVersion());
231 
232         jdbcTemplate.execute(new ConnectionCallback() {
233             public Object doInConnection(Connection conn) throws SQLException, DataAccessException {
234                 try {
235                     PreparedStatement st = null;
236                     ResultSet rs = null;
237                     try {
238                         st = conn.prepareStatement(sql, java.sql.ResultSet.TYPE_FORWARD_ONLY,
239                                 java.sql.ResultSet.CONCUR_READ_ONLY);
240                         st.setFetchSize(dbDialect.getStreamingResultsFetchSize());
241                         rs = st.executeQuery();
242                         final DataExtractorContext ctxCopy = ctx == null ? clonableContext.copy(dataExtractor) : ctx;
243                         if (batch != null) {
244                             dataExtractor.init(writer, ctxCopy);
245                             dataExtractor.begin(batch, writer);
246                         }
247                         while (rs.next()) {
248                             dataExtractor.write(writer, new Data(0, null, rs.getString(1), DataEventType.INSERT, hist
249                                     .getSourceTableName(), null, hist), ctxCopy);
250                         }
251                         if (batch != null) {
252                             dataExtractor.commit(batch, writer);
253                         }
254                     } finally {
255                         JdbcUtils.closeResultSet(rs);
256                         JdbcUtils.closeStatement(st);
257                     }
258                     return null;
259                 } catch (Exception e) {
260                     throw new RuntimeException("Error during SQL: " + sql, e);
261                 }
262             }
263         });
264     }
265 
266     public boolean extract(Node node, IOutgoingTransport transport) throws Exception {
267         IDataExtractor dataExtractor = getDataExtractor(node.getSymmetricVersion());
268         ExtractStreamHandler handler = new ExtractStreamHandler(dataExtractor, transport);
269         return extract(node, handler);
270     }
271 
272     /***
273      * Allow a handler callback to do the work so we can route the extracted
274      * data to other types of handlers for processing.
275      */
276     public boolean extract(Node node, final IExtractListener handler) throws Exception {
277 
278         List<NodeChannel> channels = configurationService.getChannels();
279 
280         for (NodeChannel nodeChannel : channels) {
281             outgoingBatchService.buildOutgoingBatches(node.getNodeId(), nodeChannel);
282         }
283 
284         List<OutgoingBatch> batches = outgoingBatchService.getOutgoingBatches(node.getNodeId());
285 
286         if (batches != null && batches.size() > 0) {
287             OutgoingBatchHistory history = null;
288             try {
289                 handler.init();
290                 for (final OutgoingBatch batch : batches) {
291                     history = new OutgoingBatchHistory(batch);
292                     handler.startBatch(batch);
293                     selectEventDataToExtract(handler, batch);
294                     handler.endBatch(batch);
295                     history.setStatus(OutgoingBatchHistory.Status.SE);
296                     history.setEndTime(new Date());
297                     outgoingBatchService.insertOutgoingBatchHistory(history);
298                 }
299             } catch (RuntimeException e) {
300                 SQLException se = unwrapSqlException(e);
301                 if (history != null) {
302                     if (se != null) {
303                         history.setSqlState(se.getSQLState());
304                         history.setSqlCode(se.getErrorCode());
305                         history.setSqlMessage(se.getMessage());
306                     } else {
307                         history.setSqlMessage(e.getMessage());
308                     }
309                     history.setStatus(OutgoingBatchHistory.Status.SE);
310                     history.setEndTime(new Date());
311                     outgoingBatchService.setBatchStatus(history.getBatchId(), Status.ER);
312                     outgoingBatchService.insertOutgoingBatchHistory(history);
313                 } else {
314                     logger.error(
315                             "Could not log the outgoing batch status because the batch history has not been created.",
316                             e);
317                 }
318                 throw e;
319             } finally {
320                 handler.done();
321             }
322             return true;
323         }
324         return false;
325     }
326 
327     public boolean extractBatchRange(IOutgoingTransport transport, String startBatchId, String endBatchId)
328             throws Exception {
329         IDataExtractor dataExtractor = getDataExtractor(null);
330         ExtractStreamHandler handler = new ExtractStreamHandler(dataExtractor, transport);
331         return extractBatchRange(handler, startBatchId, endBatchId);
332     }
333 
334     public boolean extractBatchRange(final IExtractListener handler, String startBatchId, String endBatchId)
335             throws Exception {
336 
337         List<OutgoingBatch> batches = outgoingBatchService.getOutgoingBatchRange(startBatchId, endBatchId);
338 
339         if (batches != null && batches.size() > 0) {
340             try {
341                 handler.init();
342                 for (final OutgoingBatch batch : batches) {
343                     handler.startBatch(batch);
344                     selectEventDataToExtract(handler, batch);
345                     handler.endBatch(batch);
346                 }
347             } finally {
348                 handler.done();
349             }
350             return true;
351         }
352         return false;
353     }
354 
355     private void selectEventDataToExtract(final IExtractListener handler, final OutgoingBatch batch) {
356         jdbcTemplate.execute(new ConnectionCallback() {
357             public Object doInConnection(Connection conn) throws SQLException, DataAccessException {
358                 PreparedStatement ps = conn.prepareStatement(getSql("selectEventDataToExtractSql"),
359                         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
360                 ps.setFetchSize(dbDialect.getStreamingResultsFetchSize());
361                 ps.setString(1, batch.getNodeId());
362                 ps.setLong(2, batch.getBatchId());
363                 ResultSet rs = ps.executeQuery();
364                 try {
365                     while (rs.next()) {
366                         try {
367                             handler.dataExtracted(next(rs));
368                         } catch (RuntimeException e) {
369                             throw e;
370                         } catch (Exception e) {
371                             throw new RuntimeException(e);
372                         }
373                     }
374                 } finally {
375                     JdbcUtils.closeResultSet(rs);
376                     JdbcUtils.closeStatement(ps);
377                 }
378                 return null;
379             }
380         });
381     }
382 
383     private Data next(ResultSet results) throws SQLException {
384         long dataId = results.getLong(1);
385         String tableName = results.getString(2);
386         DataEventType eventType = DataEventType.getEventType(results.getString(3));
387         String rowData = results.getString(4);
388         String pk = results.getString(5);
389         String oldData = results.getString(6);
390         Date created = results.getDate(7);
391         int histId = results.getInt(8);
392         TriggerHistory hist = configurationService.getHistoryRecordFor(histId);
393         Data data = new Data(dataId, pk, rowData, eventType, tableName, created, hist);
394         data.setOldData(oldData);
395         return data;
396     }
397 
398     public void setOutgoingBatchService(IOutgoingBatchService batchBuilderService) {
399         this.outgoingBatchService = batchBuilderService;
400     }
401 
402     public void setContext(DataExtractorContext context) {
403         this.clonableContext = context;
404     }
405 
406     public void setDbDialect(IDbDialect dialect) {
407         this.dbDialect = dialect;
408     }
409 
410     public void setConfigurationService(IConfigurationService configurationService) {
411         this.configurationService = configurationService;
412     }
413 
414     class ExtractStreamHandler implements IExtractListener {
415 
416         IOutgoingTransport transport;
417 
418         IDataExtractor dataExtractor;
419 
420         DataExtractorContext context;
421 
422         BufferedWriter writer;
423 
424         ExtractStreamHandler(IDataExtractor dataExtractor, IOutgoingTransport transport) throws Exception {
425             this.transport = transport;
426             this.dataExtractor = dataExtractor;
427         }
428 
429         public void dataExtracted(Data data) throws Exception {
430             if (extractorFilters != null) {
431                 for (IExtractorFilter filter : extractorFilters) {
432                     if (!filter.filterData(data, context)) {
433                         // short circuit the extract if instructed
434                         return;
435                     }
436                 }
437             }
438             dataExtractor.write(writer, data, context);
439         }
440 
441         public void done() throws IOException {
442         }
443 
444         public void endBatch(OutgoingBatch batch) throws Exception {
445             dataExtractor.commit(batch, writer);
446         }
447 
448         public void init() throws Exception {
449             this.writer = transport.open();
450             this.context = DataExtractorService.this.clonableContext.copy(dataExtractor);
451             dataExtractor.init(writer, context);
452         }
453 
454         public void startBatch(OutgoingBatch batch) throws Exception {
455             context.setBatch(batch);
456             dataExtractor.begin(batch, writer);
457         }
458 
459     }
460 
461     public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
462         this.beanFactory = beanFactory;
463     }
464 
465     public void addExtractorFilter(IExtractorFilter extractorFilter) {
466         if (this.extractorFilters == null) {
467             this.extractorFilters = new ArrayList<IExtractorFilter>();
468         }
469         this.extractorFilters.add(extractorFilter);
470     }
471 
472     public void setExtractorFilters(List<IExtractorFilter> extractorFilters) {
473         this.extractorFilters = extractorFilters;
474     }
475 
476     public void setAcknowledgeService(IAcknowledgeService acknowledgeService) {
477         this.acknowledgeService = acknowledgeService;
478     }
479 
480     public void setNodeService(INodeService nodeService) {
481         this.nodeService = nodeService;
482     }
483 
484 }