View Javadoc

1   /*
2    * SymmetricDS is an open source database synchronization solution.
3    *   
4    * Copyright (C) Eric Long <erilong@users.sourceforge.net>,
5    *               Chris Henson <chenson42@users.sourceforge.net>
6    *
7    * This library is free software; you can redistribute it and/or
8    * modify it under the terms of the GNU Lesser General Public
9    * License as published by the Free Software Foundation; either
10   * version 3 of the License, or (at your option) any later version.
11   *
12   * This library is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this library; if not, see
19   * <http://www.gnu.org/licenses/>.
20   */
21  
22  package org.jumpmind.symmetric.service.impl;
23  
24  import java.io.BufferedReader;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.io.OutputStream;
28  import java.io.StringReader;
29  import java.net.ConnectException;
30  import java.net.MalformedURLException;
31  import java.net.UnknownHostException;
32  import java.sql.SQLException;
33  import java.util.ArrayList;
34  import java.util.HashMap;
35  import java.util.List;
36  import java.util.Map;
37  
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.jumpmind.symmetric.common.Constants;
41  import org.jumpmind.symmetric.common.ErrorConstants;
42  import org.jumpmind.symmetric.common.ParameterConstants;
43  import org.jumpmind.symmetric.db.IDbDialect;
44  import org.jumpmind.symmetric.load.IBatchListener;
45  import org.jumpmind.symmetric.load.IColumnFilter;
46  import org.jumpmind.symmetric.load.IDataLoader;
47  import org.jumpmind.symmetric.load.IDataLoaderFilter;
48  import org.jumpmind.symmetric.load.IDataLoaderStatistics;
49  import org.jumpmind.symmetric.model.IncomingBatch;
50  import org.jumpmind.symmetric.model.IncomingBatchHistory;
51  import org.jumpmind.symmetric.model.Node;
52  import org.jumpmind.symmetric.model.IncomingBatchHistory.Status;
53  import org.jumpmind.symmetric.service.IDataLoaderService;
54  import org.jumpmind.symmetric.service.IIncomingBatchService;
55  import org.jumpmind.symmetric.service.INodeService;
56  import org.jumpmind.symmetric.service.RegistrationNotOpenException;
57  import org.jumpmind.symmetric.service.RegistrationRequiredException;
58  import org.jumpmind.symmetric.statistic.IStatisticManager;
59  import org.jumpmind.symmetric.statistic.StatisticName;
60  import org.jumpmind.symmetric.transport.AuthenticationException;
61  import org.jumpmind.symmetric.transport.ConnectionRejectedException;
62  import org.jumpmind.symmetric.transport.IIncomingTransport;
63  import org.jumpmind.symmetric.transport.ITransportManager;
64  import org.jumpmind.symmetric.transport.TransportException;
65  import org.jumpmind.symmetric.transport.internal.InternalIncomingTransport;
66  import org.springframework.beans.BeansException;
67  import org.springframework.beans.factory.BeanFactory;
68  import org.springframework.beans.factory.BeanFactoryAware;
69  import org.springframework.transaction.TransactionStatus;
70  import org.springframework.transaction.support.TransactionCallbackWithoutResult;
71  import org.springframework.transaction.support.TransactionTemplate;
72  
73  public class DataLoaderService extends AbstractService implements IDataLoaderService, BeanFactoryAware {
74  
75      protected static final Log logger = LogFactory.getLog(DataLoaderService.class);
76  
77      protected IDbDialect dbDialect;
78  
79      protected IIncomingBatchService incomingBatchService;
80  
81      protected ITransportManager transportManager;
82  
83      protected TransactionTemplate transactionTemplate;
84  
85      protected BeanFactory beanFactory;
86  
87      protected List<IDataLoaderFilter> filters;
88  
89      protected IStatisticManager statisticManager;
90      
91      protected INodeService nodeService;
92  
93      protected Map<String, IColumnFilter> columnFilters = new HashMap<String, IColumnFilter>();
94  
95      protected List<IBatchListener> batchListeners;
96  
97      /***
98       * Connect to the remote node and pull data. The acknowledgment of
99       * commit/error status is sent separately after the data is processed.
100      */
101     public boolean loadData(Node remote, Node local) throws IOException {
102         boolean wasWorkDone = false;
103         try {
104             List<IncomingBatchHistory> list = loadDataAndReturnBatches(transportManager.getPullTransport(remote, local));
105             if (list.size() > 0) {
106                 sendAck(remote, local, list);
107                 wasWorkDone = true;
108             }
109         } catch (RegistrationRequiredException e) {
110             logger.warn("Registration was lost, attempting to re-register");
111             loadData(transportManager.getRegisterTransport(local));
112             nodeService.findIdentity(false);
113             wasWorkDone = true;
114         } catch (MalformedURLException e) {
115             logger.error("Could not connect to the " + remote + " node's transport because of a bad URL: "
116                     + e.getMessage());
117         }
118         return wasWorkDone;
119     }
120 
121     /***
122      * Try a configured number of times to get the ACK through.
123      */
124     private void sendAck(Node remote, Node local, List<IncomingBatchHistory> list) throws IOException {
125         Exception error = null;
126         boolean sendAck = false;
127         int numberOfStatusSendRetries = parameterService.getInt(ParameterConstants.DATA_LOADER_NUM_OF_ACK_RETRIES);
128         for (int i = 0; i < numberOfStatusSendRetries && !sendAck; i++) {
129             try {
130                 sendAck = transportManager.sendAcknowledgement(remote, list, local);
131             } catch (IOException ex) {
132                 logger.warn("Ack was not sent successfully on try number " + i + 1 + ". " + ex.getMessage());
133                 error = ex;
134             } catch (RuntimeException ex) {
135                 logger.warn("Ack was not sent successfully on try number " + i + 1 + ". " + ex.getMessage());
136                 error = ex;
137             }
138             if (!sendAck) {
139                 if (i < numberOfStatusSendRetries - 1) {
140                     sleepBetweenFailedAcks();
141                 } else if (error instanceof RuntimeException) {
142                     throw (RuntimeException) error;
143                 } else if (error instanceof IOException) {
144                     throw (IOException) error;
145                 }
146             }
147         }
148     }
149 
150     private final void sleepBetweenFailedAcks() {
151         try {
152             Thread.sleep(parameterService.getLong(ParameterConstants.DATA_LOADER_TIME_BETWEEN_ACK_RETRIES));
153         } catch (InterruptedException e) {
154         }
155     }
156 
157     public IDataLoader openDataLoader(BufferedReader reader) throws IOException {
158         IDataLoader dataLoader = (IDataLoader) beanFactory.getBean(Constants.DATALOADER);
159         dataLoader.open(reader, filters, columnFilters);
160         return dataLoader;
161     }
162 
163     public IDataLoaderStatistics loadDataBatch(String batchData) throws IOException {
164         BufferedReader reader = new BufferedReader(new StringReader(batchData));
165         IDataLoader dataLoader = openDataLoader(reader);
166         IDataLoaderStatistics stats = null;
167         try {
168             while (dataLoader.hasNext()) {
169                 dataLoader.load();
170                 IncomingBatchHistory history = new IncomingBatchHistory(dataLoader.getContext());
171                 history.setValues(dataLoader.getStatistics(), true);
172                 fireBatchComplete(dataLoader, history);                
173             }
174         } finally {
175             stats = dataLoader.getStatistics();
176             dataLoader.close();            
177         }
178         return stats;
179     }
180 
181     /***
182      * Load database from input stream and return a list of batch statuses. This
183      * is used for a pull request that responds with data, and the
184      * acknowledgment is sent later.
185      * 
186      * @param in
187      */
188     protected List<IncomingBatchHistory> loadDataAndReturnBatches(IIncomingTransport transport) {
189 
190         List<IncomingBatchHistory> list = new ArrayList<IncomingBatchHistory>();
191         IncomingBatch status = null;
192         IncomingBatchHistory history = null;
193         IDataLoader dataLoader = null;
194         try {
195             dataLoader = openDataLoader(transport.open());
196             while (dataLoader.hasNext()) {
197                 status = new IncomingBatch(dataLoader.getContext());
198                 history = new IncomingBatchHistory(dataLoader.getContext());
199                 list.add(history);
200                 loadBatch(dataLoader, status, history);
201                 status = null;
202             }
203         } catch (RegistrationRequiredException ex) {
204             throw ex;
205         } catch (ConnectException ex) {
206             logger.warn(ErrorConstants.COULD_NOT_CONNECT_TO_TRANSPORT);
207             statisticManager.getStatistic(StatisticName.INCOMING_TRANSPORT_CONNECT_ERROR_COUNT).increment();
208         } catch (UnknownHostException ex) {
209             logger.warn(ErrorConstants.COULD_NOT_CONNECT_TO_TRANSPORT + " Unknown host name of " + ex.getMessage());
210             statisticManager.getStatistic(StatisticName.INCOMING_TRANSPORT_CONNECT_ERROR_COUNT).increment();
211         } catch (RegistrationNotOpenException ex) {
212             logger.warn(ErrorConstants.REGISTRATION_NOT_OPEN);
213         } catch (ConnectionRejectedException ex) {
214             logger.warn(ErrorConstants.TRANSPORT_REJECTED_CONNECTION);
215             statisticManager.getStatistic(StatisticName.INCOMING_TRANSPORT_REJECTED_COUNT).increment();
216         } catch (AuthenticationException ex) {
217             logger.warn(ErrorConstants.NOT_AUTHENTICATED);
218         } catch (Throwable e) {
219             if (dataLoader != null && status != null) {
220                 if (e instanceof IOException || e instanceof TransportException) {
221                     logger.warn("Failed to load batch " + status.getNodeBatchId() + " because: " + e.getMessage());
222                     history.setSqlMessage(e.getMessage());
223                     statisticManager.getStatistic(StatisticName.INCOMING_TRANSPORT_ERROR_COUNT).increment();
224                 } else {
225                     logger.error("Failed to load batch " + status.getNodeBatchId(), e);
226                     SQLException se = unwrapSqlException(e);
227                     if (se != null) {
228                         statisticManager.getStatistic(StatisticName.INCOMING_DATABASE_ERROR_COUNT).increment();
229                         history.setSqlState(se.getSQLState());
230                         history.setSqlCode(se.getErrorCode());
231                         history.setSqlMessage(se.getMessage());
232                     } else {
233                         history.setSqlMessage(e.getMessage());
234                         statisticManager.getStatistic(StatisticName.INCOMING_OTHER_ERROR_COUNT).increment();
235                     }
236                 }
237                 history.setValues(dataLoader.getStatistics(), false);
238                 handleBatchError(status, history);
239             } else {
240                 if (e instanceof IOException) {
241                     logger.error("Failed while reading batch because: " + e.getMessage());
242                 } else {
243                     logger.error("Failed while parsing batch.", e);
244                 }
245             }
246         } finally {
247             if (dataLoader != null) {
248                 dataLoader.close();
249             }
250             recordStatistics(list);
251         }
252         return list;
253     }
254 
255     private void recordStatistics(List<IncomingBatchHistory> list) {
256         if (list != null) {
257             statisticManager.getStatistic(StatisticName.INCOMING_BATCH_COUNT).add(list.size());
258             for (IncomingBatchHistory incomingBatchHistory : list) {
259                 statisticManager.getStatistic(StatisticName.INCOMING_MS_PER_ROW).add(
260                         incomingBatchHistory.getDatabaseMillis(), incomingBatchHistory.getStatementCount());
261                 statisticManager.getStatistic(StatisticName.INCOMING_BATCH_COUNT).increment();
262                 if (IncomingBatchHistory.Status.SK.equals(incomingBatchHistory.getStatus())) {
263                     statisticManager.getStatistic(StatisticName.INCOMING_SKIP_BATCH_COUNT).increment();
264                 }
265             }
266         }
267     }
268 
269     public boolean loadData(IIncomingTransport transport) {
270         boolean inError = false;
271         List<IncomingBatchHistory> list = loadDataAndReturnBatches(transport);
272         if (list != null && list.size() > 0) {
273             for (IncomingBatchHistory incomingBatchHistory : list) {
274                 inError |= incomingBatchHistory.getStatus() != Status.OK;
275             }
276         } else {
277             inError = true;
278         }
279         return !inError;
280     }
281 
282     protected void loadBatch(final IDataLoader dataLoader, final IncomingBatch status,
283             final IncomingBatchHistory history) {
284         transactionTemplate.execute(new TransactionCallbackWithoutResult() {
285             public void doInTransactionWithoutResult(TransactionStatus transactionstatus) {
286                 try {
287                     dbDialect.disableSyncTriggers();
288                     if (incomingBatchService.acquireIncomingBatch(status)) {
289                         dataLoader.load();
290                     } else {
291                         history.setStatus(IncomingBatchHistory.Status.SK);
292                         dataLoader.skip();
293                     }
294                     history.setValues(dataLoader.getStatistics(), true);
295                     fireBatchComplete(dataLoader, history);
296                     incomingBatchService.insertIncomingBatchHistory(history);
297                 } catch (IOException e) {
298                     throw new TransportException(e);
299                 } finally {
300                     dbDialect.enableSyncTriggers();
301                 }
302             }
303         });
304     }
305 
306     private void fireBatchComplete(IDataLoader loader, IncomingBatchHistory history) {
307         if (batchListeners != null) {
308             long ts = System.currentTimeMillis();
309             for (IBatchListener listener : batchListeners) {
310                 listener.batchComplete(loader, history);
311             }
312             // update the filter milliseconds so batch listeners are also
313             // included
314             history.setFilterMillis(history.getFilterMillis() + (System.currentTimeMillis() - ts));
315         }
316     }
317 
318     protected void handleBatchError(final IncomingBatch status, final IncomingBatchHistory history) {
319         try {
320             if (!status.isRetry()) {
321                 status.setStatus(IncomingBatch.Status.ER);
322                 incomingBatchService.insertIncomingBatch(status);
323             }
324         } catch (Exception e) {
325             logger.error("Failed to record status of batch " + status.getNodeBatchId());
326         }
327         try {
328             history.setStatus(IncomingBatchHistory.Status.ER);
329             incomingBatchService.insertIncomingBatchHistory(history);
330         } catch (Exception e) {
331             logger.error("Failed to record history of batch " + status.getNodeBatchId());
332         }
333     }
334 
335     /***
336      * Load database from input stream and write acknowledgment to output
337      * stream. This is used for a "push" request with a response of an
338      * acknowledgment.
339      * 
340      * @param in
341      * @param out
342      * @throws IOException
343      */
344     public void loadData(InputStream in, OutputStream out) throws IOException {
345         List<IncomingBatchHistory> list = loadDataAndReturnBatches(new InternalIncomingTransport(in));
346         transportManager.writeAcknowledgement(out, list);
347     }
348 
349     public void setDataLoaderFilters(List<IDataLoaderFilter> filters) {
350         this.filters = filters;
351     }
352 
353     public void addDataLoaderFilter(IDataLoaderFilter filter) {
354         if (filters == null) {
355             filters = new ArrayList<IDataLoaderFilter>();
356         }
357         filters.add(filter);
358     }
359 
360     public void removeDataLoaderFilter(IDataLoaderFilter filter) {
361         filters.remove(filter);
362     }
363 
364     public void setTransportManager(ITransportManager remoteService) {
365         this.transportManager = remoteService;
366     }
367 
368     public void setTransactionTemplate(TransactionTemplate transactionTemplate) {
369         this.transactionTemplate = transactionTemplate;
370     }
371 
372     public void setIncomingBatchService(IIncomingBatchService incomingBatchService) {
373         this.incomingBatchService = incomingBatchService;
374     }
375 
376     public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
377         this.beanFactory = beanFactory;
378     }
379 
380     public void setDbDialect(IDbDialect dbDialect) {
381         this.dbDialect = dbDialect;
382     }
383 
384     public void addColumnFilter(String tableName, IColumnFilter filter) {
385         this.columnFilters.put(tableName, filter);
386     }
387 
388     public void setStatisticManager(IStatisticManager statisticManager) {
389         this.statisticManager = statisticManager;
390     }
391 
392     public void addBatchListener(IBatchListener batchListener) {
393         if (this.batchListeners == null) {
394             this.batchListeners = new ArrayList<IBatchListener>();
395         }
396         this.batchListeners.add(batchListener);
397     }
398 
399     public void setBatchListeners(List<IBatchListener> batchListeners) {
400         this.batchListeners = batchListeners;
401     }
402 
403     public void setNodeService(INodeService nodeService) {
404         this.nodeService = nodeService;
405     }
406 
407 }