1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.jumpmind.symmetric.service.impl;
23
24 import java.io.BufferedReader;
25 import java.io.IOException;
26 import java.io.InputStream;
27 import java.io.OutputStream;
28 import java.io.StringReader;
29 import java.net.ConnectException;
30 import java.net.MalformedURLException;
31 import java.net.UnknownHostException;
32 import java.sql.SQLException;
33 import java.util.ArrayList;
34 import java.util.HashMap;
35 import java.util.List;
36 import java.util.Map;
37
38 import org.apache.commons.logging.Log;
39 import org.apache.commons.logging.LogFactory;
40 import org.jumpmind.symmetric.common.Constants;
41 import org.jumpmind.symmetric.common.ErrorConstants;
42 import org.jumpmind.symmetric.common.ParameterConstants;
43 import org.jumpmind.symmetric.db.IDbDialect;
44 import org.jumpmind.symmetric.load.IBatchListener;
45 import org.jumpmind.symmetric.load.IColumnFilter;
46 import org.jumpmind.symmetric.load.IDataLoader;
47 import org.jumpmind.symmetric.load.IDataLoaderFilter;
48 import org.jumpmind.symmetric.load.IDataLoaderStatistics;
49 import org.jumpmind.symmetric.model.IncomingBatch;
50 import org.jumpmind.symmetric.model.IncomingBatchHistory;
51 import org.jumpmind.symmetric.model.Node;
52 import org.jumpmind.symmetric.model.IncomingBatchHistory.Status;
53 import org.jumpmind.symmetric.service.IDataLoaderService;
54 import org.jumpmind.symmetric.service.IIncomingBatchService;
55 import org.jumpmind.symmetric.service.INodeService;
56 import org.jumpmind.symmetric.service.RegistrationNotOpenException;
57 import org.jumpmind.symmetric.service.RegistrationRequiredException;
58 import org.jumpmind.symmetric.statistic.IStatisticManager;
59 import org.jumpmind.symmetric.statistic.StatisticName;
60 import org.jumpmind.symmetric.transport.AuthenticationException;
61 import org.jumpmind.symmetric.transport.ConnectionRejectedException;
62 import org.jumpmind.symmetric.transport.IIncomingTransport;
63 import org.jumpmind.symmetric.transport.ITransportManager;
64 import org.jumpmind.symmetric.transport.TransportException;
65 import org.jumpmind.symmetric.transport.internal.InternalIncomingTransport;
66 import org.springframework.beans.BeansException;
67 import org.springframework.beans.factory.BeanFactory;
68 import org.springframework.beans.factory.BeanFactoryAware;
69 import org.springframework.transaction.TransactionStatus;
70 import org.springframework.transaction.support.TransactionCallbackWithoutResult;
71 import org.springframework.transaction.support.TransactionTemplate;
72
73 public class DataLoaderService extends AbstractService implements IDataLoaderService, BeanFactoryAware {
74
75 protected static final Log logger = LogFactory.getLog(DataLoaderService.class);
76
77 protected IDbDialect dbDialect;
78
79 protected IIncomingBatchService incomingBatchService;
80
81 protected ITransportManager transportManager;
82
83 protected TransactionTemplate transactionTemplate;
84
85 protected BeanFactory beanFactory;
86
87 protected List<IDataLoaderFilter> filters;
88
89 protected IStatisticManager statisticManager;
90
91 protected INodeService nodeService;
92
93 protected Map<String, IColumnFilter> columnFilters = new HashMap<String, IColumnFilter>();
94
95 protected List<IBatchListener> batchListeners;
96
97 /***
98 * Connect to the remote node and pull data. The acknowledgment of
99 * commit/error status is sent separately after the data is processed.
100 */
101 public boolean loadData(Node remote, Node local) throws IOException {
102 boolean wasWorkDone = false;
103 try {
104 List<IncomingBatchHistory> list = loadDataAndReturnBatches(transportManager.getPullTransport(remote, local));
105 if (list.size() > 0) {
106 sendAck(remote, local, list);
107 wasWorkDone = true;
108 }
109 } catch (RegistrationRequiredException e) {
110 logger.warn("Registration was lost, attempting to re-register");
111 loadData(transportManager.getRegisterTransport(local));
112 nodeService.findIdentity(false);
113 wasWorkDone = true;
114 } catch (MalformedURLException e) {
115 logger.error("Could not connect to the " + remote + " node's transport because of a bad URL: "
116 + e.getMessage());
117 }
118 return wasWorkDone;
119 }
120
121 /***
122 * Try a configured number of times to get the ACK through.
123 */
124 private void sendAck(Node remote, Node local, List<IncomingBatchHistory> list) throws IOException {
125 Exception error = null;
126 boolean sendAck = false;
127 int numberOfStatusSendRetries = parameterService.getInt(ParameterConstants.DATA_LOADER_NUM_OF_ACK_RETRIES);
128 for (int i = 0; i < numberOfStatusSendRetries && !sendAck; i++) {
129 try {
130 sendAck = transportManager.sendAcknowledgement(remote, list, local);
131 } catch (IOException ex) {
132 logger.warn("Ack was not sent successfully on try number " + i + 1 + ". " + ex.getMessage());
133 error = ex;
134 } catch (RuntimeException ex) {
135 logger.warn("Ack was not sent successfully on try number " + i + 1 + ". " + ex.getMessage());
136 error = ex;
137 }
138 if (!sendAck) {
139 if (i < numberOfStatusSendRetries - 1) {
140 sleepBetweenFailedAcks();
141 } else if (error instanceof RuntimeException) {
142 throw (RuntimeException) error;
143 } else if (error instanceof IOException) {
144 throw (IOException) error;
145 }
146 }
147 }
148 }
149
150 private final void sleepBetweenFailedAcks() {
151 try {
152 Thread.sleep(parameterService.getLong(ParameterConstants.DATA_LOADER_TIME_BETWEEN_ACK_RETRIES));
153 } catch (InterruptedException e) {
154 }
155 }
156
157 public IDataLoader openDataLoader(BufferedReader reader) throws IOException {
158 IDataLoader dataLoader = (IDataLoader) beanFactory.getBean(Constants.DATALOADER);
159 dataLoader.open(reader, filters, columnFilters);
160 return dataLoader;
161 }
162
163 public IDataLoaderStatistics loadDataBatch(String batchData) throws IOException {
164 BufferedReader reader = new BufferedReader(new StringReader(batchData));
165 IDataLoader dataLoader = openDataLoader(reader);
166 IDataLoaderStatistics stats = null;
167 try {
168 while (dataLoader.hasNext()) {
169 dataLoader.load();
170 IncomingBatchHistory history = new IncomingBatchHistory(dataLoader.getContext());
171 history.setValues(dataLoader.getStatistics(), true);
172 fireBatchComplete(dataLoader, history);
173 }
174 } finally {
175 stats = dataLoader.getStatistics();
176 dataLoader.close();
177 }
178 return stats;
179 }
180
181 /***
182 * Load database from input stream and return a list of batch statuses. This
183 * is used for a pull request that responds with data, and the
184 * acknowledgment is sent later.
185 *
186 * @param in
187 */
188 protected List<IncomingBatchHistory> loadDataAndReturnBatches(IIncomingTransport transport) {
189
190 List<IncomingBatchHistory> list = new ArrayList<IncomingBatchHistory>();
191 IncomingBatch status = null;
192 IncomingBatchHistory history = null;
193 IDataLoader dataLoader = null;
194 try {
195 dataLoader = openDataLoader(transport.open());
196 while (dataLoader.hasNext()) {
197 status = new IncomingBatch(dataLoader.getContext());
198 history = new IncomingBatchHistory(dataLoader.getContext());
199 list.add(history);
200 loadBatch(dataLoader, status, history);
201 status = null;
202 }
203 } catch (RegistrationRequiredException ex) {
204 throw ex;
205 } catch (ConnectException ex) {
206 logger.warn(ErrorConstants.COULD_NOT_CONNECT_TO_TRANSPORT);
207 statisticManager.getStatistic(StatisticName.INCOMING_TRANSPORT_CONNECT_ERROR_COUNT).increment();
208 } catch (UnknownHostException ex) {
209 logger.warn(ErrorConstants.COULD_NOT_CONNECT_TO_TRANSPORT + " Unknown host name of " + ex.getMessage());
210 statisticManager.getStatistic(StatisticName.INCOMING_TRANSPORT_CONNECT_ERROR_COUNT).increment();
211 } catch (RegistrationNotOpenException ex) {
212 logger.warn(ErrorConstants.REGISTRATION_NOT_OPEN);
213 } catch (ConnectionRejectedException ex) {
214 logger.warn(ErrorConstants.TRANSPORT_REJECTED_CONNECTION);
215 statisticManager.getStatistic(StatisticName.INCOMING_TRANSPORT_REJECTED_COUNT).increment();
216 } catch (AuthenticationException ex) {
217 logger.warn(ErrorConstants.NOT_AUTHENTICATED);
218 } catch (Throwable e) {
219 if (dataLoader != null && status != null) {
220 if (e instanceof IOException || e instanceof TransportException) {
221 logger.warn("Failed to load batch " + status.getNodeBatchId() + " because: " + e.getMessage());
222 history.setSqlMessage(e.getMessage());
223 statisticManager.getStatistic(StatisticName.INCOMING_TRANSPORT_ERROR_COUNT).increment();
224 } else {
225 logger.error("Failed to load batch " + status.getNodeBatchId(), e);
226 SQLException se = unwrapSqlException(e);
227 if (se != null) {
228 statisticManager.getStatistic(StatisticName.INCOMING_DATABASE_ERROR_COUNT).increment();
229 history.setSqlState(se.getSQLState());
230 history.setSqlCode(se.getErrorCode());
231 history.setSqlMessage(se.getMessage());
232 } else {
233 history.setSqlMessage(e.getMessage());
234 statisticManager.getStatistic(StatisticName.INCOMING_OTHER_ERROR_COUNT).increment();
235 }
236 }
237 history.setValues(dataLoader.getStatistics(), false);
238 handleBatchError(status, history);
239 } else {
240 if (e instanceof IOException) {
241 logger.error("Failed while reading batch because: " + e.getMessage());
242 } else {
243 logger.error("Failed while parsing batch.", e);
244 }
245 }
246 } finally {
247 if (dataLoader != null) {
248 dataLoader.close();
249 }
250 recordStatistics(list);
251 }
252 return list;
253 }
254
255 private void recordStatistics(List<IncomingBatchHistory> list) {
256 if (list != null) {
257 statisticManager.getStatistic(StatisticName.INCOMING_BATCH_COUNT).add(list.size());
258 for (IncomingBatchHistory incomingBatchHistory : list) {
259 statisticManager.getStatistic(StatisticName.INCOMING_MS_PER_ROW).add(
260 incomingBatchHistory.getDatabaseMillis(), incomingBatchHistory.getStatementCount());
261 statisticManager.getStatistic(StatisticName.INCOMING_BATCH_COUNT).increment();
262 if (IncomingBatchHistory.Status.SK.equals(incomingBatchHistory.getStatus())) {
263 statisticManager.getStatistic(StatisticName.INCOMING_SKIP_BATCH_COUNT).increment();
264 }
265 }
266 }
267 }
268
269 public boolean loadData(IIncomingTransport transport) {
270 boolean inError = false;
271 List<IncomingBatchHistory> list = loadDataAndReturnBatches(transport);
272 if (list != null && list.size() > 0) {
273 for (IncomingBatchHistory incomingBatchHistory : list) {
274 inError |= incomingBatchHistory.getStatus() != Status.OK;
275 }
276 } else {
277 inError = true;
278 }
279 return !inError;
280 }
281
282 protected void loadBatch(final IDataLoader dataLoader, final IncomingBatch status,
283 final IncomingBatchHistory history) {
284 transactionTemplate.execute(new TransactionCallbackWithoutResult() {
285 public void doInTransactionWithoutResult(TransactionStatus transactionstatus) {
286 try {
287 dbDialect.disableSyncTriggers();
288 if (incomingBatchService.acquireIncomingBatch(status)) {
289 dataLoader.load();
290 } else {
291 history.setStatus(IncomingBatchHistory.Status.SK);
292 dataLoader.skip();
293 }
294 history.setValues(dataLoader.getStatistics(), true);
295 fireBatchComplete(dataLoader, history);
296 incomingBatchService.insertIncomingBatchHistory(history);
297 } catch (IOException e) {
298 throw new TransportException(e);
299 } finally {
300 dbDialect.enableSyncTriggers();
301 }
302 }
303 });
304 }
305
306 private void fireBatchComplete(IDataLoader loader, IncomingBatchHistory history) {
307 if (batchListeners != null) {
308 long ts = System.currentTimeMillis();
309 for (IBatchListener listener : batchListeners) {
310 listener.batchComplete(loader, history);
311 }
312
313
314 history.setFilterMillis(history.getFilterMillis() + (System.currentTimeMillis() - ts));
315 }
316 }
317
318 protected void handleBatchError(final IncomingBatch status, final IncomingBatchHistory history) {
319 try {
320 if (!status.isRetry()) {
321 status.setStatus(IncomingBatch.Status.ER);
322 incomingBatchService.insertIncomingBatch(status);
323 }
324 } catch (Exception e) {
325 logger.error("Failed to record status of batch " + status.getNodeBatchId());
326 }
327 try {
328 history.setStatus(IncomingBatchHistory.Status.ER);
329 incomingBatchService.insertIncomingBatchHistory(history);
330 } catch (Exception e) {
331 logger.error("Failed to record history of batch " + status.getNodeBatchId());
332 }
333 }
334
335 /***
336 * Load database from input stream and write acknowledgment to output
337 * stream. This is used for a "push" request with a response of an
338 * acknowledgment.
339 *
340 * @param in
341 * @param out
342 * @throws IOException
343 */
344 public void loadData(InputStream in, OutputStream out) throws IOException {
345 List<IncomingBatchHistory> list = loadDataAndReturnBatches(new InternalIncomingTransport(in));
346 transportManager.writeAcknowledgement(out, list);
347 }
348
349 public void setDataLoaderFilters(List<IDataLoaderFilter> filters) {
350 this.filters = filters;
351 }
352
353 public void addDataLoaderFilter(IDataLoaderFilter filter) {
354 if (filters == null) {
355 filters = new ArrayList<IDataLoaderFilter>();
356 }
357 filters.add(filter);
358 }
359
360 public void removeDataLoaderFilter(IDataLoaderFilter filter) {
361 filters.remove(filter);
362 }
363
364 public void setTransportManager(ITransportManager remoteService) {
365 this.transportManager = remoteService;
366 }
367
368 public void setTransactionTemplate(TransactionTemplate transactionTemplate) {
369 this.transactionTemplate = transactionTemplate;
370 }
371
372 public void setIncomingBatchService(IIncomingBatchService incomingBatchService) {
373 this.incomingBatchService = incomingBatchService;
374 }
375
376 public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
377 this.beanFactory = beanFactory;
378 }
379
380 public void setDbDialect(IDbDialect dbDialect) {
381 this.dbDialect = dbDialect;
382 }
383
384 public void addColumnFilter(String tableName, IColumnFilter filter) {
385 this.columnFilters.put(tableName, filter);
386 }
387
388 public void setStatisticManager(IStatisticManager statisticManager) {
389 this.statisticManager = statisticManager;
390 }
391
392 public void addBatchListener(IBatchListener batchListener) {
393 if (this.batchListeners == null) {
394 this.batchListeners = new ArrayList<IBatchListener>();
395 }
396 this.batchListeners.add(batchListener);
397 }
398
399 public void setBatchListeners(List<IBatchListener> batchListeners) {
400 this.batchListeners = batchListeners;
401 }
402
403 public void setNodeService(INodeService nodeService) {
404 this.nodeService = nodeService;
405 }
406
407 }