1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.jumpmind.symmetric.service.impl;
23
24 import java.io.BufferedWriter;
25 import java.io.IOException;
26 import java.io.OutputStream;
27 import java.sql.Connection;
28 import java.sql.PreparedStatement;
29 import java.sql.ResultSet;
30 import java.sql.SQLException;
31 import java.util.ArrayList;
32 import java.util.Date;
33 import java.util.List;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.jumpmind.symmetric.Version;
38 import org.jumpmind.symmetric.common.Constants;
39 import org.jumpmind.symmetric.common.TableConstants;
40 import org.jumpmind.symmetric.db.IDbDialect;
41 import org.jumpmind.symmetric.extract.DataExtractorContext;
42 import org.jumpmind.symmetric.extract.IDataExtractor;
43 import org.jumpmind.symmetric.extract.IExtractorFilter;
44 import org.jumpmind.symmetric.extract.csv.Util;
45 import org.jumpmind.symmetric.model.BatchType;
46 import org.jumpmind.symmetric.model.Data;
47 import org.jumpmind.symmetric.model.DataEventType;
48 import org.jumpmind.symmetric.model.Node;
49 import org.jumpmind.symmetric.model.NodeChannel;
50 import org.jumpmind.symmetric.model.OutgoingBatch;
51 import org.jumpmind.symmetric.model.OutgoingBatchHistory;
52 import org.jumpmind.symmetric.model.Trigger;
53 import org.jumpmind.symmetric.model.TriggerHistory;
54 import org.jumpmind.symmetric.model.OutgoingBatch.Status;
55 import org.jumpmind.symmetric.service.IAcknowledgeService;
56 import org.jumpmind.symmetric.service.IConfigurationService;
57 import org.jumpmind.symmetric.service.IDataExtractorService;
58 import org.jumpmind.symmetric.service.IExtractListener;
59 import org.jumpmind.symmetric.service.INodeService;
60 import org.jumpmind.symmetric.service.IOutgoingBatchService;
61 import org.jumpmind.symmetric.transport.IOutgoingTransport;
62 import org.jumpmind.symmetric.transport.TransportUtils;
63 import org.springframework.beans.BeansException;
64 import org.springframework.beans.factory.BeanFactory;
65 import org.springframework.beans.factory.BeanFactoryAware;
66 import org.springframework.dao.DataAccessException;
67 import org.springframework.jdbc.core.ConnectionCallback;
68 import org.springframework.jdbc.support.JdbcUtils;
69
70 public class DataExtractorService extends AbstractService implements IDataExtractorService, BeanFactoryAware {
71
72 protected static final Log logger = LogFactory.getLog(DataExtractorService.class);
73
74 private IOutgoingBatchService outgoingBatchService;
75
76 private IConfigurationService configurationService;
77
78 private IAcknowledgeService acknowledgeService;
79
80 private INodeService nodeService;
81
82 private IDbDialect dbDialect;
83
84 private BeanFactory beanFactory;
85
86 private DataExtractorContext clonableContext;
87
88 private List<IExtractorFilter> extractorFilters;
89
90 /***
91 * @see DataExtractorService#extractConfigurationStandalone(Node,
92 * BufferedWriter)
93 */
94 public void extractConfigurationStandalone(Node node, OutputStream out) throws IOException {
95 this.extractConfigurationStandalone(node, TransportUtils.toWriter(out));
96 }
97
98 /***
99 * Extract the SymmetricDS configuration for the passed in {@link Node}.
100 * Note that this method will insert an already acknowledged batch to
101 * indicate that the configuration was sent. If the configuration fails to
102 * load for some reason on the client the batch status will NOT reflect the
103 * failure.
104 */
105 public void extractConfigurationStandalone(Node node, BufferedWriter writer) throws IOException {
106
107 try {
108 OutgoingBatch batch = new OutgoingBatch(node, Constants.CHANNEL_CONFIG, BatchType.INITIAL_LOAD);
109 outgoingBatchService.insertOutgoingBatch(batch);
110 OutgoingBatchHistory history = new OutgoingBatchHistory(batch);
111
112 final IDataExtractor dataExtractor = getDataExtractor(node.getSymmetricVersion());
113 final DataExtractorContext ctxCopy = clonableContext.copy(dataExtractor);
114
115 dataExtractor.init(writer, ctxCopy);
116 dataExtractor.begin(batch, writer);
117
118 extractConfiguration(node, writer, ctxCopy);
119
120 dataExtractor.commit(batch, writer);
121
122 history.setStatus(OutgoingBatchHistory.Status.SE);
123 history.setEndTime(new Date());
124 outgoingBatchService.insertOutgoingBatchHistory(history);
125
126
127
128 acknowledgeService.ack(batch.getBatchInfo());
129
130 } finally {
131 writer.flush();
132 }
133 }
134
135 public void extractConfiguration(Node node, BufferedWriter writer, DataExtractorContext ctx) throws IOException {
136 List<Trigger> triggers = configurationService.getConfigurationTriggers(parameterService.getNodeGroupId(), node
137 .getNodeGroupId(), true);
138 if (node != null && node.isVersionGreaterThanOrEqualTo(1, 5, 0)) {
139 for (int i = triggers.size() - 1; i >= 0; i--) {
140 Trigger trigger = triggers.get(i);
141 StringBuilder sql = new StringBuilder(dbDialect.createPurgeSqlFor(node, trigger, null));
142 addPurgeCriteriaToConfigurationTables(trigger.getSourceTableName(), sql);
143 Util.writeSql(sql.toString(), writer);
144 }
145 }
146
147 for (int i = 0; i < triggers.size(); i++) {
148 Trigger trigger = triggers.get(i);
149 TriggerHistory hist = new TriggerHistory(dbDialect.getMetaDataFor(trigger, false), trigger);
150 hist.setTriggerHistoryId(Integer.MAX_VALUE - i);
151 if (!trigger.getSourceTableName().endsWith(TableConstants.SYM_NODE_IDENTITY)) {
152 writeInitialLoad(node, trigger, hist, writer, null, ctx);
153 } else {
154 Data data = new Data(1, null, node.getNodeId(), DataEventType.INSERT, trigger.getSourceTableName(),
155 null, hist);
156 ctx.getDataExtractor().write(writer, data, ctx);
157 }
158 }
159 }
160
161 private void addPurgeCriteriaToConfigurationTables(String sourceTableName, StringBuilder sql) {
162 if ((TableConstants.getTableName(dbDialect.getTablePrefix(), TableConstants.SYM_NODE)
163 .equalsIgnoreCase(sourceTableName))
164 || TableConstants.getTableName(dbDialect.getTablePrefix(), TableConstants.SYM_NODE_SECURITY)
165 .equalsIgnoreCase(sourceTableName)) {
166 Node me = nodeService.findIdentity();
167 if (me != null) {
168 sql.append(String.format(" where created_at_node_id='%s'", me.getNodeId()));
169 }
170 }
171 }
172
173 private IDataExtractor getDataExtractor(String version) {
174 String beanName = Constants.DATA_EXTRACTOR;
175
176 if (version != null) {
177 int[] versions = Version.parseVersion(version);
178 if (versions[0] == 1) {
179 if (versions[1] <= 2) {
180 beanName += "10";
181 } else if (versions[1] <= 3) {
182 beanName += "13";
183 } else if (versions[1] <= 4 && !version.equals("1.4.1-appaji")) {
184 beanName += "14";
185 }
186 }
187 }
188 return (IDataExtractor) beanFactory.getBean(beanName);
189 }
190
191 public OutgoingBatch extractInitialLoadFor(Node node, Trigger trigger, BufferedWriter writer) {
192 OutgoingBatch batch = new OutgoingBatch(node, trigger.getChannelId(), BatchType.INITIAL_LOAD);
193 outgoingBatchService.insertOutgoingBatch(batch);
194 OutgoingBatchHistory history = new OutgoingBatchHistory(batch);
195 writeInitialLoad(node, trigger, writer, batch, null);
196 history.setStatus(OutgoingBatchHistory.Status.SE);
197 history.setEndTime(new Date());
198 outgoingBatchService.insertOutgoingBatchHistory(history);
199 return batch;
200 }
201
202 public void extractInitialLoadWithinBatchFor(Node node, final Trigger trigger, BufferedWriter writer,
203 DataExtractorContext ctx) {
204 writeInitialLoad(node, trigger, writer, null, ctx);
205 }
206
207 protected void writeInitialLoad(Node node, Trigger trigger, BufferedWriter writer, final OutgoingBatch batch,
208 final DataExtractorContext ctx) {
209 writeInitialLoad(node, trigger, configurationService.getLatestHistoryRecordFor(trigger.getTriggerId()), writer,
210 batch, ctx);
211 }
212
213 /***
214 *
215 * @param node
216 * @param trigger
217 * @param hist
218 * @param transport
219 * @param batch
220 * If null, then assume this 'initial load' is part of
221 * another batch.
222 * @param ctx
223 */
224 protected void writeInitialLoad(Node node, final Trigger trigger, final TriggerHistory hist,
225 final BufferedWriter writer, final OutgoingBatch batch, final DataExtractorContext ctx) {
226
227 final String sql = dbDialect.createInitalLoadSqlFor(node, trigger);
228
229 final IDataExtractor dataExtractor = ctx != null ? ctx.getDataExtractor() : getDataExtractor(node
230 .getSymmetricVersion());
231
232 jdbcTemplate.execute(new ConnectionCallback() {
233 public Object doInConnection(Connection conn) throws SQLException, DataAccessException {
234 try {
235 PreparedStatement st = null;
236 ResultSet rs = null;
237 try {
238 st = conn.prepareStatement(sql, java.sql.ResultSet.TYPE_FORWARD_ONLY,
239 java.sql.ResultSet.CONCUR_READ_ONLY);
240 st.setFetchSize(dbDialect.getStreamingResultsFetchSize());
241 rs = st.executeQuery();
242 final DataExtractorContext ctxCopy = ctx == null ? clonableContext.copy(dataExtractor) : ctx;
243 if (batch != null) {
244 dataExtractor.init(writer, ctxCopy);
245 dataExtractor.begin(batch, writer);
246 }
247 while (rs.next()) {
248 dataExtractor.write(writer, new Data(0, null, rs.getString(1), DataEventType.INSERT, hist
249 .getSourceTableName(), null, hist), ctxCopy);
250 }
251 if (batch != null) {
252 dataExtractor.commit(batch, writer);
253 }
254 } finally {
255 JdbcUtils.closeResultSet(rs);
256 JdbcUtils.closeStatement(st);
257 }
258 return null;
259 } catch (Exception e) {
260 throw new RuntimeException("Error during SQL: " + sql, e);
261 }
262 }
263 });
264 }
265
266 public boolean extract(Node node, IOutgoingTransport transport) throws Exception {
267 IDataExtractor dataExtractor = getDataExtractor(node.getSymmetricVersion());
268 ExtractStreamHandler handler = new ExtractStreamHandler(dataExtractor, transport);
269 return extract(node, handler);
270 }
271
272 /***
273 * Allow a handler callback to do the work so we can route the extracted
274 * data to other types of handlers for processing.
275 */
276 public boolean extract(Node node, final IExtractListener handler) throws Exception {
277
278 List<NodeChannel> channels = configurationService.getChannels();
279
280 for (NodeChannel nodeChannel : channels) {
281 outgoingBatchService.buildOutgoingBatches(node.getNodeId(), nodeChannel);
282 }
283
284 List<OutgoingBatch> batches = outgoingBatchService.getOutgoingBatches(node.getNodeId());
285
286 if (batches != null && batches.size() > 0) {
287 OutgoingBatchHistory history = null;
288 try {
289 handler.init();
290 for (final OutgoingBatch batch : batches) {
291 history = new OutgoingBatchHistory(batch);
292 handler.startBatch(batch);
293 selectEventDataToExtract(handler, batch);
294 handler.endBatch(batch);
295 history.setStatus(OutgoingBatchHistory.Status.SE);
296 history.setEndTime(new Date());
297 outgoingBatchService.insertOutgoingBatchHistory(history);
298 }
299 } catch (RuntimeException e) {
300 SQLException se = unwrapSqlException(e);
301 if (history != null) {
302 if (se != null) {
303 history.setSqlState(se.getSQLState());
304 history.setSqlCode(se.getErrorCode());
305 history.setSqlMessage(se.getMessage());
306 } else {
307 history.setSqlMessage(e.getMessage());
308 }
309 history.setStatus(OutgoingBatchHistory.Status.SE);
310 history.setEndTime(new Date());
311 outgoingBatchService.setBatchStatus(history.getBatchId(), Status.ER);
312 outgoingBatchService.insertOutgoingBatchHistory(history);
313 } else {
314 logger.error(
315 "Could not log the outgoing batch status because the batch history has not been created.",
316 e);
317 }
318 throw e;
319 } finally {
320 handler.done();
321 }
322 return true;
323 }
324 return false;
325 }
326
327 public boolean extractBatchRange(IOutgoingTransport transport, String startBatchId, String endBatchId)
328 throws Exception {
329 IDataExtractor dataExtractor = getDataExtractor(null);
330 ExtractStreamHandler handler = new ExtractStreamHandler(dataExtractor, transport);
331 return extractBatchRange(handler, startBatchId, endBatchId);
332 }
333
334 public boolean extractBatchRange(final IExtractListener handler, String startBatchId, String endBatchId)
335 throws Exception {
336
337 List<OutgoingBatch> batches = outgoingBatchService.getOutgoingBatchRange(startBatchId, endBatchId);
338
339 if (batches != null && batches.size() > 0) {
340 try {
341 handler.init();
342 for (final OutgoingBatch batch : batches) {
343 handler.startBatch(batch);
344 selectEventDataToExtract(handler, batch);
345 handler.endBatch(batch);
346 }
347 } finally {
348 handler.done();
349 }
350 return true;
351 }
352 return false;
353 }
354
355 private void selectEventDataToExtract(final IExtractListener handler, final OutgoingBatch batch) {
356 jdbcTemplate.execute(new ConnectionCallback() {
357 public Object doInConnection(Connection conn) throws SQLException, DataAccessException {
358 PreparedStatement ps = conn.prepareStatement(getSql("selectEventDataToExtractSql"),
359 ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
360 ps.setFetchSize(dbDialect.getStreamingResultsFetchSize());
361 ps.setString(1, batch.getNodeId());
362 ps.setLong(2, batch.getBatchId());
363 ResultSet rs = ps.executeQuery();
364 try {
365 while (rs.next()) {
366 try {
367 handler.dataExtracted(next(rs));
368 } catch (RuntimeException e) {
369 throw e;
370 } catch (Exception e) {
371 throw new RuntimeException(e);
372 }
373 }
374 } finally {
375 JdbcUtils.closeResultSet(rs);
376 JdbcUtils.closeStatement(ps);
377 }
378 return null;
379 }
380 });
381 }
382
383 private Data next(ResultSet results) throws SQLException {
384 long dataId = results.getLong(1);
385 String tableName = results.getString(2);
386 DataEventType eventType = DataEventType.getEventType(results.getString(3));
387 String rowData = results.getString(4);
388 String pk = results.getString(5);
389 String oldData = results.getString(6);
390 Date created = results.getDate(7);
391 int histId = results.getInt(8);
392 TriggerHistory hist = configurationService.getHistoryRecordFor(histId);
393 Data data = new Data(dataId, pk, rowData, eventType, tableName, created, hist);
394 data.setOldData(oldData);
395 return data;
396 }
397
398 public void setOutgoingBatchService(IOutgoingBatchService batchBuilderService) {
399 this.outgoingBatchService = batchBuilderService;
400 }
401
402 public void setContext(DataExtractorContext context) {
403 this.clonableContext = context;
404 }
405
406 public void setDbDialect(IDbDialect dialect) {
407 this.dbDialect = dialect;
408 }
409
410 public void setConfigurationService(IConfigurationService configurationService) {
411 this.configurationService = configurationService;
412 }
413
414 class ExtractStreamHandler implements IExtractListener {
415
416 IOutgoingTransport transport;
417
418 IDataExtractor dataExtractor;
419
420 DataExtractorContext context;
421
422 BufferedWriter writer;
423
424 ExtractStreamHandler(IDataExtractor dataExtractor, IOutgoingTransport transport) throws Exception {
425 this.transport = transport;
426 this.dataExtractor = dataExtractor;
427 }
428
429 public void dataExtracted(Data data) throws Exception {
430 if (extractorFilters != null) {
431 for (IExtractorFilter filter : extractorFilters) {
432 if (!filter.filterData(data, context)) {
433
434 return;
435 }
436 }
437 }
438 dataExtractor.write(writer, data, context);
439 }
440
441 public void done() throws IOException {
442 }
443
444 public void endBatch(OutgoingBatch batch) throws Exception {
445 dataExtractor.commit(batch, writer);
446 }
447
448 public void init() throws Exception {
449 this.writer = transport.open();
450 this.context = DataExtractorService.this.clonableContext.copy(dataExtractor);
451 dataExtractor.init(writer, context);
452 }
453
454 public void startBatch(OutgoingBatch batch) throws Exception {
455 context.setBatch(batch);
456 dataExtractor.begin(batch, writer);
457 }
458
459 }
460
461 public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
462 this.beanFactory = beanFactory;
463 }
464
465 public void addExtractorFilter(IExtractorFilter extractorFilter) {
466 if (this.extractorFilters == null) {
467 this.extractorFilters = new ArrayList<IExtractorFilter>();
468 }
469 this.extractorFilters.add(extractorFilter);
470 }
471
472 public void setExtractorFilters(List<IExtractorFilter> extractorFilters) {
473 this.extractorFilters = extractorFilters;
474 }
475
476 public void setAcknowledgeService(IAcknowledgeService acknowledgeService) {
477 this.acknowledgeService = acknowledgeService;
478 }
479
480 public void setNodeService(INodeService nodeService) {
481 this.nodeService = nodeService;
482 }
483
484 }