View Javadoc

1   /*
2    * SymmetricDS is an open source database synchronization solution.
3    *   
4    * Copyright (C) Eric Long <erilong@users.sourceforge.net>
5    *
6    * This library is free software; you can redistribute it and/or
7    * modify it under the terms of the GNU Lesser General Public
8    * License as published by the Free Software Foundation; either
9    * version 3 of the License, or (at your option) any later version.
10   *
11   * This library is distributed in the hope that it will be useful,
12   * but WITHOUT ANY WARRANTY; without even the implied warranty of
13   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14   * Lesser General Public License for more details.
15   *
16   * You should have received a copy of the GNU Lesser General Public
17   * License along with this library; if not, see
18   * <http://www.gnu.org/licenses/>.
19   */
20  
21  package org.jumpmind.symmetric.service.impl;
22  
23  import java.io.ByteArrayOutputStream;
24  import java.io.IOException;
25  import java.io.OutputStreamWriter;
26  import java.sql.PreparedStatement;
27  import java.sql.SQLException;
28  import java.sql.Types;
29  import java.util.ArrayList;
30  import java.util.HashMap;
31  import java.util.List;
32  import java.util.ListIterator;
33  import java.util.Map;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.jumpmind.symmetric.common.Constants;
38  import org.jumpmind.symmetric.common.ParameterConstants;
39  import org.jumpmind.symmetric.common.csv.CsvUtil;
40  import org.jumpmind.symmetric.db.IDbDialect;
41  import org.jumpmind.symmetric.db.SequenceIdentifier;
42  import org.jumpmind.symmetric.load.IReloadListener;
43  import org.jumpmind.symmetric.model.Data;
44  import org.jumpmind.symmetric.model.DataEvent;
45  import org.jumpmind.symmetric.model.DataEventType;
46  import org.jumpmind.symmetric.model.Node;
47  import org.jumpmind.symmetric.model.NodeChannel;
48  import org.jumpmind.symmetric.model.Trigger;
49  import org.jumpmind.symmetric.model.TriggerHistory;
50  import org.jumpmind.symmetric.service.IConfigurationService;
51  import org.jumpmind.symmetric.service.IDataService;
52  import org.jumpmind.symmetric.service.INodeService;
53  import org.jumpmind.symmetric.service.IOutgoingBatchService;
54  import org.jumpmind.symmetric.service.IPurgeService;
55  import org.springframework.dao.DataAccessException;
56  import org.springframework.jdbc.core.PreparedStatementCallback;
57  
58  import com.csvreader.CsvWriter;
59  
60  public class DataService extends AbstractService implements IDataService {
61  
    // Logger shared by this service; declared without an access modifier (package-visible).
    static final Log logger = LogFactory.getLog(DataService.class);

    // Used to look up triggers and their latest trigger-history records.
    private IConfigurationService configurationService;

    // Used to resolve the identity node and target nodes, and to toggle the initial-load flag.
    private INodeService nodeService;

    // Used to purge outgoing and incoming events for a node during a full reload.
    private IPurgeService purgeService;

    // Used to build outgoing batches on the reload channel.
    private IOutgoingBatchService outgoingBatchService;

    // Table name prefix; used in this class to form "<prefix>_node_security".
    private String tablePrefix;

    // Used for generated-key inserts and for building purge SQL, create-table XML and CSV queries.
    private IDbDialect dbDialect;

    // Reload listeners; remains null until a list is set or the first listener is added.
    private List<IReloadListener> listeners;
77  
78      public void insertReloadEvent(final Node targetNode, final Trigger trigger) {
79          insertReloadEvent(targetNode, trigger, null);
80      }
81      
82      public void insertReloadEvent(final Node targetNode, final Trigger trigger, final String overrideInitialLoadSelect) {
83          TriggerHistory history = lookupTriggerHistory(trigger);
84          // initial_load_select for table can be overridden by populating the
85          // row_data
86          Data data = new Data(history.getSourceTableName(), DataEventType.RELOAD, overrideInitialLoadSelect, null,
87                  history);
88          insertDataEvent(data, Constants.CHANNEL_RELOAD, targetNode.getNodeId());
89      }
90      
91      public void insertResendConfigEvent(final Node targetNode) {
92          Data data = new Data(Constants.NA, DataEventType.CONFIG, null, null, null);
93          insertDataEvent(data, Constants.CHANNEL_CONFIG, targetNode.getNodeId());
94      }    
95  
96      private TriggerHistory lookupTriggerHistory(Trigger trigger) {
97          TriggerHistory history = configurationService.getLatestHistoryRecordFor(trigger.getTriggerId());
98  
99          if (history == null) {
100             throw new RuntimeException("Cannot find history for trigger " + trigger.getTriggerId() + ", "
101                     + trigger.getSourceTableName());
102         }
103         return history;
104     }
105 
106     public void insertPurgeEvent(final Node targetNode, final Trigger trigger) {
107         String sql = dbDialect.createPurgeSqlFor(targetNode, trigger, lookupTriggerHistory(trigger));
108         insertSqlEvent(targetNode, trigger, sql);
109     }
110 
111     public void insertSqlEvent(final Node targetNode, final Trigger trigger, String sql) {
112         TriggerHistory history = configurationService.getLatestHistoryRecordFor(trigger.getTriggerId());
113         Data data = new Data(trigger.getSourceTableName(), DataEventType.SQL, CsvUtil.escapeCsvData(sql), null, history);
114         insertDataEvent(data, Constants.CHANNEL_RELOAD, targetNode.getNodeId());
115     }
116     
117     public void insertSqlEvent(final Node targetNode, String sql) {
118         Data data = new Data(Constants.NA, DataEventType.SQL, CsvUtil.escapeCsvData(sql), null, null);
119         insertDataEvent(data, Constants.CHANNEL_RELOAD, targetNode.getNodeId());
120     }    
121 
122     public void insertCreateEvent(final Node targetNode, final Trigger trigger, String xml) {
123         TriggerHistory history = configurationService.getLatestHistoryRecordFor(trigger.getTriggerId());
124         Data data = new Data(trigger.getSourceTableName(), DataEventType.CREATE, CsvUtil.escapeCsvData(xml), null,
125                 history);
126         insertDataEvent(data, Constants.CHANNEL_RELOAD, targetNode.getNodeId());
127     }
128 
129     public long insertData(final Data data) {
130         return dbDialect.insertWithGeneratedKey(getSql("insertIntoDataSql"), SequenceIdentifier.DATA,
131                 new PreparedStatementCallback() {
132                     public Object doInPreparedStatement(PreparedStatement ps) throws SQLException, DataAccessException {
133                         ps.setString(1, data.getTableName());
134                         ps.setString(2, data.getEventType().getCode());
135                         ps.setString(3, data.getRowData());
136                         ps.setString(4, data.getPkData());
137                         ps.setString(5, data.getOldData());
138                         ps.setLong(6, data.getAudit() != null ? data.getAudit().getTriggerHistoryId() : -1);
139                         return null;
140                     }
141                 });
142     }
143 
144     public void insertDataEvent(DataEvent dataEvent) {
145         jdbcTemplate.update(getSql("insertIntoDataEventSql"), new Object[] { dataEvent.getDataId(),
146                 dataEvent.getNodeId(), dataEvent.getChannelId(), dataEvent.getTransactionId(), dataEvent.getBatchId(),
147                 dataEvent.isBatched() ? 1 : 0 }, new int[] { Types.INTEGER, Types.VARCHAR, Types.VARCHAR,
148                 Types.VARCHAR, Types.INTEGER, Types.INTEGER });
149     }
150 
151     public void insertDataEvent(Data data, String channelId, List<Node> nodes) {
152         insertDataEvent(data, channelId, null, nodes);
153     }
154 
155     public void insertDataEvent(Data data, String channelId, String transactionId, List<Node> nodes) {
156         long dataId = insertData(data);
157         for (Node node : nodes) {
158             insertDataEvent(new DataEvent(dataId, node.getNodeId(), channelId, transactionId));
159         }
160     }
161 
162     public void insertDataEvent(Data data, String channelId, String nodeId) {
163         long dataId = insertData(data);
164         insertDataEvent(new DataEvent(dataId, nodeId, channelId));
165     }
166 
167     public String reloadNode(String nodeId) {
168         Node targetNode = nodeService.findNode(nodeId);
169         if (targetNode == null) {
170             return "Unknown node " + nodeId;
171         }
172         if (nodeService.setInitialLoadEnabled(nodeId, true)) {
173             return "Successfully opened initial load for node " + nodeId;
174         } else {
175             return "Could not open initial load for " + nodeId;
176         }
177     }
178 
179     public void insertReloadEvent(Node targetNode) {
180         Node sourceNode = nodeService.findIdentity();
181         if (listeners != null) {
182             for (IReloadListener listener : listeners) {
183                 listener.beforeReload(targetNode);
184             }
185         }
186         
187         // outgoing data events are pointless because we are reloading all data
188         purgeService.purgeAllOutgoingEventsForNode(targetNode.getNodeId());
189 
190         insertNodeSecurityUpdate(targetNode);
191         List<Trigger> triggers = configurationService.getActiveTriggersForReload(sourceNode.getNodeGroupId(),
192                 targetNode.getNodeGroupId());
193 
194         if (parameterService.is(ParameterConstants.AUTO_CREATE_SCHEMA_BEFORE_RELOAD)) {
195             for (Trigger trigger : triggers) {
196                 String xml = dbDialect.getCreateTableXML(trigger);
197                 insertCreateEvent(targetNode, trigger, xml);
198                 buildReloadBatches(targetNode.getNodeId());
199             }
200         }
201         if (parameterService.is(ParameterConstants.AUTO_DELETE_BEFORE_RELOAD)) {
202             for (ListIterator<Trigger> iterator = triggers.listIterator(triggers.size()); iterator.hasPrevious();) {
203                 Trigger trigger = iterator.previous();
204                 insertPurgeEvent(targetNode, trigger);
205                 buildReloadBatches(targetNode.getNodeId());
206             }
207         }
208 
209         for (Trigger trigger : triggers) {
210             insertReloadEvent(targetNode, trigger);
211             buildReloadBatches(targetNode.getNodeId());
212         }
213 
214         if (listeners != null) {
215             for (IReloadListener listener : listeners) {
216                 listener.afterReload(targetNode);
217             }
218         }
219         nodeService.setInitialLoadEnabled(targetNode.getNodeId(), false);
220         insertNodeSecurityUpdate(targetNode);
221 
222         // remove all incoming events from the node are starting a reload for.
223         purgeService.purgeAllIncomingEventsForNode(targetNode.getNodeId());
224     }
225 
226     /***
227      * This should be called after a reload event is inserted so there is a one
228      * to one between data events and reload batches.
229      */
230     private void buildReloadBatches(String nodeId) {
231         NodeChannel channel = new NodeChannel();
232         channel.setId(Constants.CHANNEL_RELOAD);
233         channel.setEnabled(true);
234         outgoingBatchService.buildOutgoingBatches(nodeId, channel);
235 
236     }
237 
238     private void insertNodeSecurityUpdate(Node node) {
239         Data data = createData(tablePrefix + "_node_security", " t.node_id = '" + node.getNodeId() + "'");
240         if (data != null) {
241             insertDataEvent(data, Constants.CHANNEL_RELOAD, node.getNodeId());
242         }
243     }
244 
245     public String sendSQL(String nodeId, String tableName, String sql) {
246         Node sourceNode = nodeService.findIdentity();
247         Node targetNode = nodeService.findNode(nodeId);
248         if (targetNode == null) {
249             return "Unknown node " + nodeId;
250         }
251 
252         Trigger trigger = configurationService.getTriggerFor(tableName, sourceNode.getNodeGroupId());
253         if (trigger == null) {
254             return "Trigger for table " + tableName + " does not exist from node " + sourceNode.getNodeGroupId();
255         }
256 
257         insertSqlEvent(targetNode, trigger, sql);
258         return "Successfully create SQL event for node " + targetNode.getNodeId();
259     }
260 
261     public String reloadTable(String nodeId, String tableName) {
262         return reloadTable(nodeId, tableName, null);
263     }
264 
265     public String reloadTable(String nodeId, String tableName, String overrideInitialLoadSelect) {
266         Node sourceNode = nodeService.findIdentity();
267         Node targetNode = nodeService.findNode(nodeId);
268         if (targetNode == null) {
269             return "Unknown node " + nodeId;
270         }
271 
272         Trigger trigger = configurationService.getTriggerFor(tableName, sourceNode.getNodeGroupId());
273         if (trigger == null) {
274             return "Trigger for table " + tableName + " does not exist from node " + sourceNode.getNodeGroupId();
275         }
276 
277         if (parameterService.is(ParameterConstants.AUTO_CREATE_SCHEMA_BEFORE_RELOAD)) {
278             String xml = dbDialect.getCreateTableXML(trigger);
279             insertCreateEvent(targetNode, trigger, xml);
280         } else if (parameterService.is(ParameterConstants.AUTO_DELETE_BEFORE_RELOAD)) {
281             insertPurgeEvent(targetNode, trigger);
282         }
283 
284         insertReloadEvent(targetNode, trigger, overrideInitialLoadSelect);
285 
286         return "Successfully created event to reload table " + tableName + " for node " + targetNode.getNodeId();
287     }
288 
289     /***
290      * Because we can't add a trigger on the _node table, we are artificially
291      * generating heartbeat events.
292      * 
293      * @param node
294      */
295     public void insertHeartbeatEvent(Node node) {
296         StringBuilder sql = new StringBuilder(getSql("updateNodeHeartbeatSql"));
297         sql.append("'");
298         sql.append(node.getNodeId());
299         sql.append("'");
300         List<Node> targets = nodeService.findNodesToPushTo();
301         for (Node targetNode : targets) {
302             insertSqlEvent(targetNode, sql.toString());
303         }
304     }
305 
306     public Data createData(String tableName) {
307         return createData(tableName, null);
308     }
309 
310     public Data createData(String tableName, String whereClause) {
311         Data data = null;
312         Trigger trigger = configurationService.getTriggerFor(tableName, parameterService.getNodeGroupId());
313         if (trigger != null) {
314             String rowData = null;
315             String pkData = null;
316             if (whereClause != null) {
317                 rowData = (String) jdbcTemplate.queryForObject(dbDialect.createCsvDataSql(trigger, whereClause),
318                         String.class);
319                 pkData = (String) jdbcTemplate.queryForObject(dbDialect.createCsvPrimaryKeySql(trigger, whereClause),
320                         String.class);
321             }
322             TriggerHistory history = configurationService.getLatestHistoryRecordFor(trigger.getTriggerId());
323             data = new Data(trigger.getSourceTableName(), DataEventType.UPDATE, rowData, pkData, history);
324         }
325         return data;
326     }
327 
328     public Map<String, String> getRowDataAsMap(Data data) {
329         Map<String, String> map = new HashMap<String, String>();
330         String[] columnNames = CsvUtil.tokenizeCsvData(data.getAudit().getColumnNames());
331         String[] columnData = CsvUtil.tokenizeCsvData(data.getRowData());
332         for (int i = 0; i < columnNames.length; i++) {
333             map.put(columnNames[i].toLowerCase(), columnData[i]);
334         }
335         return map;
336     }
337 
338     public void setRowDataFromMap(Data data, Map<String, String> map) {
339         String[] columnNames = CsvUtil.tokenizeCsvData(data.getAudit().getColumnNames());
340         ByteArrayOutputStream out = new ByteArrayOutputStream();
341         CsvWriter writer = new CsvWriter(new OutputStreamWriter(out), ',');
342         writer.setEscapeMode(CsvWriter.ESCAPE_MODE_BACKSLASH);
343         for (String columnName : columnNames) {
344             try {
345                 writer.write(map.get(columnName.toLowerCase()), true);
346             } catch (IOException e) {
347             }
348         }
349         writer.close();
350         data.setRowData(out.toString());
351     }
352 
353     @Deprecated
354     public String[] tokenizeCsvData(String csvData) {
355         return CsvUtil.tokenizeCsvData(csvData);
356     }
357 
    /**
     * Replaces the current reload listener list with the given one. The list
     * is stored as is, not copied.
     *
     * @param listeners listeners to notify before and after a node reload
     */
    public void setReloadListeners(List<IReloadListener> listeners) {
        this.listeners = listeners;
    }
361 
362     public void addReloadListener(IReloadListener listener) {
363         if (listeners == null) {
364             listeners = new ArrayList<IReloadListener>();
365         }
366         listeners.add(listener);
367     }
368 
369     public void removeReloadListener(IReloadListener listener) {
370         listeners.remove(listener);
371     }
372 
    /**
     * Sets the configuration service used for trigger and trigger-history lookups.
     *
     * @param configurationService service to use
     */
    public void setConfigurationService(IConfigurationService configurationService) {
        this.configurationService = configurationService;
    }
376 
    /**
     * Sets the node service used to find nodes and toggle the initial-load flag.
     *
     * @param nodeService service to use
     */
    public void setNodeService(INodeService nodeService) {
        this.nodeService = nodeService;
    }
380 
    /**
     * Sets the database dialect used for generated-key inserts and for
     * building purge SQL, create-table XML and CSV queries.
     *
     * @param dbDialect dialect to use
     */
    public void setDbDialect(IDbDialect dbDialect) {
        this.dbDialect = dbDialect;
    }
384 
    /**
     * Sets the table name prefix; this class uses it to form the
     * "<prefix>_node_security" table name.
     *
     * @param tablePrefix prefix to use
     */
    public void setTablePrefix(String tablePrefix) {
        this.tablePrefix = tablePrefix;
    }
388 
    /**
     * Sets the purge service used to clear a node's outgoing and incoming
     * events during a full reload.
     *
     * @param purgeService service to use
     */
    public void setPurgeService(IPurgeService purgeService) {
        this.purgeService = purgeService;
    }
392 
    /**
     * Sets the outgoing batch service used to build reload-channel batches.
     *
     * @param outgoingBatchService service to use
     */
    public void setOutgoingBatchService(IOutgoingBatchService outgoingBatchService) {
        this.outgoingBatchService = outgoingBatchService;
    }
396 
397 }