View Javadoc

1   /*
2    * SymmetricDS is an open source database synchronization solution.
3    *   
4    * Copyright (C) Chris Henson <chenson42@users.sourceforge.net>
5    *
6    * This library is free software; you can redistribute it and/or
7    * modify it under the terms of the GNU Lesser General Public
8    * License as published by the Free Software Foundation; either
9    * version 3 of the License, or (at your option) any later version.
10   *
11   * This library is distributed in the hope that it will be useful,
12   * but WITHOUT ANY WARRANTY; without even the implied warranty of
13   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14   * Lesser General Public License for more details.
15   *
16   * You should have received a copy of the GNU Lesser General Public
17   * License along with this library; if not, see
18   * <http://www.gnu.org/licenses/>.
19   */
20  
21  package org.jumpmind.symmetric.service.impl;
22  
23  import java.io.BufferedReader;
24  import java.net.ConnectException;
25  import java.net.SocketException;
26  import java.util.List;
27  
28  import org.apache.commons.lang.StringUtils;
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.jumpmind.symmetric.common.ErrorConstants;
32  import org.jumpmind.symmetric.model.BatchInfo;
33  import org.jumpmind.symmetric.model.Node;
34  import org.jumpmind.symmetric.model.NodeSecurity;
35  import org.jumpmind.symmetric.service.IAcknowledgeService;
36  import org.jumpmind.symmetric.service.IDataExtractorService;
37  import org.jumpmind.symmetric.service.IDataService;
38  import org.jumpmind.symmetric.service.INodeService;
39  import org.jumpmind.symmetric.service.IPushService;
40  import org.jumpmind.symmetric.transport.AuthenticationException;
41  import org.jumpmind.symmetric.transport.ConnectionRejectedException;
42  import org.jumpmind.symmetric.transport.IOutgoingWithResponseTransport;
43  import org.jumpmind.symmetric.transport.ITransportManager;
44  import org.jumpmind.symmetric.transport.TransportException;
45  
46  public class PushService extends AbstractService implements IPushService {
47  
48      private static final Log logger = LogFactory.getLog(PushService.class);
49  
50      private IDataExtractorService extractor;
51  
52      private IAcknowledgeService ackService;
53  
54      private ITransportManager transportManager;
55  
56      private INodeService nodeService;
57  
58      private IDataService dataService;
59  
60      public void pushData() {
61          List<Node> nodes = nodeService.findNodesToPushTo();
62          if (nodes != null && nodes.size() > 0) {
63              for (Node node : nodes) {
64                  logger.info("Push requested for " + node);
65                  if (pushToNode(node)) {
66                      logger.info("Push completed for " + node);
67                  } else {
68                      logger.info("Push unsuccessful for " + node);
69                  }
70              }
71          }
72      }
73  
74      private boolean pushToNode(Node remote) {
75          IOutgoingWithResponseTransport transport = null;
76          boolean success = false;
77          try {
78              NodeSecurity nodeSecurity = nodeService.findNodeSecurity(remote.getNodeId());
79              if (nodeSecurity != null) {
80                  if (nodeSecurity.isInitialLoadEnabled()) {
81                      dataService.insertReloadEvent(remote);
82                  }
83              }
84  
85              transport = transportManager.getPushTransport(remote, nodeService.findIdentity());
86  
87              if (extractor.extract(remote, transport)) {
88                  logger.info("Push data sent to " + remote);
89                  BufferedReader reader = transport.readResponse();
90                  String ackString = reader.readLine();
91                  String ackExtendedString = reader.readLine();
92  
93                  if (logger.isDebugEnabled()) {
94                      logger.debug("Reading ack: " + ackString);
95                      logger.debug("Reading extended ack: " + ackExtendedString);
96                  }
97                  
98                  if (StringUtils.isBlank(ackString)) {
99                      logger.error("Did not receive an acknowledgement for the batches sent.");
100                 }
101 
102                 List<BatchInfo> batches = transportManager.readAcknowledgement(ackString, ackExtendedString);
103 
104                 for (BatchInfo batchInfo : batches) {
105                     if (logger.isDebugEnabled()) {
106                         logger.debug("Saving ack: " + batchInfo.getBatchId() + ", "
107                                 + (batchInfo.isOk() ? "OK" : "error"));
108                     }
109                     ackService.ack(batchInfo);
110                 }
111             }
112             success = true;
113         } catch (ConnectException ex) {
114             logger.warn(ErrorConstants.COULD_NOT_CONNECT_TO_TRANSPORT + " url=" + remote.getSyncURL());
115         } catch (ConnectionRejectedException ex) {
116             logger.warn(ErrorConstants.TRANSPORT_REJECTED_CONNECTION);
117         } catch (SocketException ex) {
118             logger.warn(ex.getMessage());
119         } catch (TransportException ex) {
120             logger.warn(ex.getMessage());
121         } catch (AuthenticationException ex) {
122             logger.warn(ErrorConstants.NOT_AUTHENTICATED);
123         } catch (Exception e) {
124             // just report the error because we want to push to other nodes
125             // in our list
126             logger.error(e, e);
127         } finally {
128             try {
129                 transport.close();
130             } catch (Exception e) {
131             }
132         }
133         return success;
134     }
135 
136     public void setExtractor(IDataExtractorService extractor) {
137         this.extractor = extractor;
138     }
139 
140     public void setTransportManager(ITransportManager tm) {
141         this.transportManager = tm;
142     }
143 
144     public void setNodeService(INodeService nodeService) {
145         this.nodeService = nodeService;
146     }
147 
148     public void setAckService(IAcknowledgeService ackService) {
149         this.ackService = ackService;
150     }
151 
152     public void setDataService(IDataService dataService) {
153         this.dataService = dataService;
154     }
155 }