1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.jumpmind.symmetric.service.impl;
22
23 import java.io.ByteArrayOutputStream;
24 import java.io.IOException;
25 import java.io.OutputStreamWriter;
26 import java.sql.PreparedStatement;
27 import java.sql.SQLException;
28 import java.sql.Types;
29 import java.util.ArrayList;
30 import java.util.HashMap;
31 import java.util.List;
32 import java.util.ListIterator;
33 import java.util.Map;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.jumpmind.symmetric.common.Constants;
38 import org.jumpmind.symmetric.common.ParameterConstants;
39 import org.jumpmind.symmetric.common.csv.CsvUtil;
40 import org.jumpmind.symmetric.db.IDbDialect;
41 import org.jumpmind.symmetric.db.SequenceIdentifier;
42 import org.jumpmind.symmetric.load.IReloadListener;
43 import org.jumpmind.symmetric.model.Data;
44 import org.jumpmind.symmetric.model.DataEvent;
45 import org.jumpmind.symmetric.model.DataEventType;
46 import org.jumpmind.symmetric.model.Node;
47 import org.jumpmind.symmetric.model.NodeChannel;
48 import org.jumpmind.symmetric.model.Trigger;
49 import org.jumpmind.symmetric.model.TriggerHistory;
50 import org.jumpmind.symmetric.service.IConfigurationService;
51 import org.jumpmind.symmetric.service.IDataService;
52 import org.jumpmind.symmetric.service.INodeService;
53 import org.jumpmind.symmetric.service.IOutgoingBatchService;
54 import org.jumpmind.symmetric.service.IPurgeService;
55 import org.springframework.dao.DataAccessException;
56 import org.springframework.jdbc.core.PreparedStatementCallback;
57
58 import com.csvreader.CsvWriter;
59
60 public class DataService extends AbstractService implements IDataService {
61
62 static final Log logger = LogFactory.getLog(DataService.class);
63
64 private IConfigurationService configurationService;
65
66 private INodeService nodeService;
67
68 private IPurgeService purgeService;
69
70 private IOutgoingBatchService outgoingBatchService;
71
72 private String tablePrefix;
73
74 private IDbDialect dbDialect;
75
76 private List<IReloadListener> listeners;
77
78 public void insertReloadEvent(final Node targetNode, final Trigger trigger) {
79 insertReloadEvent(targetNode, trigger, null);
80 }
81
82 public void insertReloadEvent(final Node targetNode, final Trigger trigger, final String overrideInitialLoadSelect) {
83 TriggerHistory history = lookupTriggerHistory(trigger);
84
85
86 Data data = new Data(history.getSourceTableName(), DataEventType.RELOAD, overrideInitialLoadSelect, null,
87 history);
88 insertDataEvent(data, Constants.CHANNEL_RELOAD, targetNode.getNodeId());
89 }
90
91 public void insertResendConfigEvent(final Node targetNode) {
92 Data data = new Data(Constants.NA, DataEventType.CONFIG, null, null, null);
93 insertDataEvent(data, Constants.CHANNEL_CONFIG, targetNode.getNodeId());
94 }
95
96 private TriggerHistory lookupTriggerHistory(Trigger trigger) {
97 TriggerHistory history = configurationService.getLatestHistoryRecordFor(trigger.getTriggerId());
98
99 if (history == null) {
100 throw new RuntimeException("Cannot find history for trigger " + trigger.getTriggerId() + ", "
101 + trigger.getSourceTableName());
102 }
103 return history;
104 }
105
106 public void insertPurgeEvent(final Node targetNode, final Trigger trigger) {
107 String sql = dbDialect.createPurgeSqlFor(targetNode, trigger, lookupTriggerHistory(trigger));
108 insertSqlEvent(targetNode, trigger, sql);
109 }
110
111 public void insertSqlEvent(final Node targetNode, final Trigger trigger, String sql) {
112 TriggerHistory history = configurationService.getLatestHistoryRecordFor(trigger.getTriggerId());
113 Data data = new Data(trigger.getSourceTableName(), DataEventType.SQL, CsvUtil.escapeCsvData(sql), null, history);
114 insertDataEvent(data, Constants.CHANNEL_RELOAD, targetNode.getNodeId());
115 }
116
117 public void insertSqlEvent(final Node targetNode, String sql) {
118 Data data = new Data(Constants.NA, DataEventType.SQL, CsvUtil.escapeCsvData(sql), null, null);
119 insertDataEvent(data, Constants.CHANNEL_RELOAD, targetNode.getNodeId());
120 }
121
122 public void insertCreateEvent(final Node targetNode, final Trigger trigger, String xml) {
123 TriggerHistory history = configurationService.getLatestHistoryRecordFor(trigger.getTriggerId());
124 Data data = new Data(trigger.getSourceTableName(), DataEventType.CREATE, CsvUtil.escapeCsvData(xml), null,
125 history);
126 insertDataEvent(data, Constants.CHANNEL_RELOAD, targetNode.getNodeId());
127 }
128
129 public long insertData(final Data data) {
130 return dbDialect.insertWithGeneratedKey(getSql("insertIntoDataSql"), SequenceIdentifier.DATA,
131 new PreparedStatementCallback() {
132 public Object doInPreparedStatement(PreparedStatement ps) throws SQLException, DataAccessException {
133 ps.setString(1, data.getTableName());
134 ps.setString(2, data.getEventType().getCode());
135 ps.setString(3, data.getRowData());
136 ps.setString(4, data.getPkData());
137 ps.setString(5, data.getOldData());
138 ps.setLong(6, data.getAudit() != null ? data.getAudit().getTriggerHistoryId() : -1);
139 return null;
140 }
141 });
142 }
143
144 public void insertDataEvent(DataEvent dataEvent) {
145 jdbcTemplate.update(getSql("insertIntoDataEventSql"), new Object[] { dataEvent.getDataId(),
146 dataEvent.getNodeId(), dataEvent.getChannelId(), dataEvent.getTransactionId(), dataEvent.getBatchId(),
147 dataEvent.isBatched() ? 1 : 0 }, new int[] { Types.INTEGER, Types.VARCHAR, Types.VARCHAR,
148 Types.VARCHAR, Types.INTEGER, Types.INTEGER });
149 }
150
151 public void insertDataEvent(Data data, String channelId, List<Node> nodes) {
152 insertDataEvent(data, channelId, null, nodes);
153 }
154
155 public void insertDataEvent(Data data, String channelId, String transactionId, List<Node> nodes) {
156 long dataId = insertData(data);
157 for (Node node : nodes) {
158 insertDataEvent(new DataEvent(dataId, node.getNodeId(), channelId, transactionId));
159 }
160 }
161
162 public void insertDataEvent(Data data, String channelId, String nodeId) {
163 long dataId = insertData(data);
164 insertDataEvent(new DataEvent(dataId, nodeId, channelId));
165 }
166
167 public String reloadNode(String nodeId) {
168 Node targetNode = nodeService.findNode(nodeId);
169 if (targetNode == null) {
170 return "Unknown node " + nodeId;
171 }
172 if (nodeService.setInitialLoadEnabled(nodeId, true)) {
173 return "Successfully opened initial load for node " + nodeId;
174 } else {
175 return "Could not open initial load for " + nodeId;
176 }
177 }
178
179 public void insertReloadEvent(Node targetNode) {
180 Node sourceNode = nodeService.findIdentity();
181 if (listeners != null) {
182 for (IReloadListener listener : listeners) {
183 listener.beforeReload(targetNode);
184 }
185 }
186
187
188 purgeService.purgeAllOutgoingEventsForNode(targetNode.getNodeId());
189
190 insertNodeSecurityUpdate(targetNode);
191 List<Trigger> triggers = configurationService.getActiveTriggersForReload(sourceNode.getNodeGroupId(),
192 targetNode.getNodeGroupId());
193
194 if (parameterService.is(ParameterConstants.AUTO_CREATE_SCHEMA_BEFORE_RELOAD)) {
195 for (Trigger trigger : triggers) {
196 String xml = dbDialect.getCreateTableXML(trigger);
197 insertCreateEvent(targetNode, trigger, xml);
198 buildReloadBatches(targetNode.getNodeId());
199 }
200 }
201 if (parameterService.is(ParameterConstants.AUTO_DELETE_BEFORE_RELOAD)) {
202 for (ListIterator<Trigger> iterator = triggers.listIterator(triggers.size()); iterator.hasPrevious();) {
203 Trigger trigger = iterator.previous();
204 insertPurgeEvent(targetNode, trigger);
205 buildReloadBatches(targetNode.getNodeId());
206 }
207 }
208
209 for (Trigger trigger : triggers) {
210 insertReloadEvent(targetNode, trigger);
211 buildReloadBatches(targetNode.getNodeId());
212 }
213
214 if (listeners != null) {
215 for (IReloadListener listener : listeners) {
216 listener.afterReload(targetNode);
217 }
218 }
219 nodeService.setInitialLoadEnabled(targetNode.getNodeId(), false);
220 insertNodeSecurityUpdate(targetNode);
221
222
223 purgeService.purgeAllIncomingEventsForNode(targetNode.getNodeId());
224 }
225
226 /***
227 * This should be called after a reload event is inserted so there is a one
228 * to one between data events and reload batches.
229 */
230 private void buildReloadBatches(String nodeId) {
231 NodeChannel channel = new NodeChannel();
232 channel.setId(Constants.CHANNEL_RELOAD);
233 channel.setEnabled(true);
234 outgoingBatchService.buildOutgoingBatches(nodeId, channel);
235
236 }
237
238 private void insertNodeSecurityUpdate(Node node) {
239 Data data = createData(tablePrefix + "_node_security", " t.node_id = '" + node.getNodeId() + "'");
240 if (data != null) {
241 insertDataEvent(data, Constants.CHANNEL_RELOAD, node.getNodeId());
242 }
243 }
244
245 public String sendSQL(String nodeId, String tableName, String sql) {
246 Node sourceNode = nodeService.findIdentity();
247 Node targetNode = nodeService.findNode(nodeId);
248 if (targetNode == null) {
249 return "Unknown node " + nodeId;
250 }
251
252 Trigger trigger = configurationService.getTriggerFor(tableName, sourceNode.getNodeGroupId());
253 if (trigger == null) {
254 return "Trigger for table " + tableName + " does not exist from node " + sourceNode.getNodeGroupId();
255 }
256
257 insertSqlEvent(targetNode, trigger, sql);
258 return "Successfully create SQL event for node " + targetNode.getNodeId();
259 }
260
261 public String reloadTable(String nodeId, String tableName) {
262 return reloadTable(nodeId, tableName, null);
263 }
264
265 public String reloadTable(String nodeId, String tableName, String overrideInitialLoadSelect) {
266 Node sourceNode = nodeService.findIdentity();
267 Node targetNode = nodeService.findNode(nodeId);
268 if (targetNode == null) {
269 return "Unknown node " + nodeId;
270 }
271
272 Trigger trigger = configurationService.getTriggerFor(tableName, sourceNode.getNodeGroupId());
273 if (trigger == null) {
274 return "Trigger for table " + tableName + " does not exist from node " + sourceNode.getNodeGroupId();
275 }
276
277 if (parameterService.is(ParameterConstants.AUTO_CREATE_SCHEMA_BEFORE_RELOAD)) {
278 String xml = dbDialect.getCreateTableXML(trigger);
279 insertCreateEvent(targetNode, trigger, xml);
280 } else if (parameterService.is(ParameterConstants.AUTO_DELETE_BEFORE_RELOAD)) {
281 insertPurgeEvent(targetNode, trigger);
282 }
283
284 insertReloadEvent(targetNode, trigger, overrideInitialLoadSelect);
285
286 return "Successfully created event to reload table " + tableName + " for node " + targetNode.getNodeId();
287 }
288
289 /***
290 * Because we can't add a trigger on the _node table, we are artificially
291 * generating heartbeat events.
292 *
293 * @param node
294 */
295 public void insertHeartbeatEvent(Node node) {
296 StringBuilder sql = new StringBuilder(getSql("updateNodeHeartbeatSql"));
297 sql.append("'");
298 sql.append(node.getNodeId());
299 sql.append("'");
300 List<Node> targets = nodeService.findNodesToPushTo();
301 for (Node targetNode : targets) {
302 insertSqlEvent(targetNode, sql.toString());
303 }
304 }
305
306 public Data createData(String tableName) {
307 return createData(tableName, null);
308 }
309
310 public Data createData(String tableName, String whereClause) {
311 Data data = null;
312 Trigger trigger = configurationService.getTriggerFor(tableName, parameterService.getNodeGroupId());
313 if (trigger != null) {
314 String rowData = null;
315 String pkData = null;
316 if (whereClause != null) {
317 rowData = (String) jdbcTemplate.queryForObject(dbDialect.createCsvDataSql(trigger, whereClause),
318 String.class);
319 pkData = (String) jdbcTemplate.queryForObject(dbDialect.createCsvPrimaryKeySql(trigger, whereClause),
320 String.class);
321 }
322 TriggerHistory history = configurationService.getLatestHistoryRecordFor(trigger.getTriggerId());
323 data = new Data(trigger.getSourceTableName(), DataEventType.UPDATE, rowData, pkData, history);
324 }
325 return data;
326 }
327
328 public Map<String, String> getRowDataAsMap(Data data) {
329 Map<String, String> map = new HashMap<String, String>();
330 String[] columnNames = CsvUtil.tokenizeCsvData(data.getAudit().getColumnNames());
331 String[] columnData = CsvUtil.tokenizeCsvData(data.getRowData());
332 for (int i = 0; i < columnNames.length; i++) {
333 map.put(columnNames[i].toLowerCase(), columnData[i]);
334 }
335 return map;
336 }
337
338 public void setRowDataFromMap(Data data, Map<String, String> map) {
339 String[] columnNames = CsvUtil.tokenizeCsvData(data.getAudit().getColumnNames());
340 ByteArrayOutputStream out = new ByteArrayOutputStream();
341 CsvWriter writer = new CsvWriter(new OutputStreamWriter(out), ',');
342 writer.setEscapeMode(CsvWriter.ESCAPE_MODE_BACKSLASH);
343 for (String columnName : columnNames) {
344 try {
345 writer.write(map.get(columnName.toLowerCase()), true);
346 } catch (IOException e) {
347 }
348 }
349 writer.close();
350 data.setRowData(out.toString());
351 }
352
353 @Deprecated
354 public String[] tokenizeCsvData(String csvData) {
355 return CsvUtil.tokenizeCsvData(csvData);
356 }
357
358 public void setReloadListeners(List<IReloadListener> listeners) {
359 this.listeners = listeners;
360 }
361
362 public void addReloadListener(IReloadListener listener) {
363 if (listeners == null) {
364 listeners = new ArrayList<IReloadListener>();
365 }
366 listeners.add(listener);
367 }
368
369 public void removeReloadListener(IReloadListener listener) {
370 listeners.remove(listener);
371 }
372
373 public void setConfigurationService(IConfigurationService configurationService) {
374 this.configurationService = configurationService;
375 }
376
377 public void setNodeService(INodeService nodeService) {
378 this.nodeService = nodeService;
379 }
380
381 public void setDbDialect(IDbDialect dbDialect) {
382 this.dbDialect = dbDialect;
383 }
384
385 public void setTablePrefix(String tablePrefix) {
386 this.tablePrefix = tablePrefix;
387 }
388
389 public void setPurgeService(IPurgeService purgeService) {
390 this.purgeService = purgeService;
391 }
392
393 public void setOutgoingBatchService(IOutgoingBatchService outgoingBatchService) {
394 this.outgoingBatchService = outgoingBatchService;
395 }
396
397 }