1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.jumpmind.symmetric.service.impl;
22
23 import java.io.BufferedReader;
24 import java.net.ConnectException;
25 import java.net.SocketException;
26 import java.util.List;
27
28 import org.apache.commons.lang.StringUtils;
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.jumpmind.symmetric.common.ErrorConstants;
32 import org.jumpmind.symmetric.model.BatchInfo;
33 import org.jumpmind.symmetric.model.Node;
34 import org.jumpmind.symmetric.model.NodeSecurity;
35 import org.jumpmind.symmetric.service.IAcknowledgeService;
36 import org.jumpmind.symmetric.service.IDataExtractorService;
37 import org.jumpmind.symmetric.service.IDataService;
38 import org.jumpmind.symmetric.service.INodeService;
39 import org.jumpmind.symmetric.service.IPushService;
40 import org.jumpmind.symmetric.transport.AuthenticationException;
41 import org.jumpmind.symmetric.transport.ConnectionRejectedException;
42 import org.jumpmind.symmetric.transport.IOutgoingWithResponseTransport;
43 import org.jumpmind.symmetric.transport.ITransportManager;
44 import org.jumpmind.symmetric.transport.TransportException;
45
46 public class PushService extends AbstractService implements IPushService {
47
48 private static final Log logger = LogFactory.getLog(PushService.class);
49
50 private IDataExtractorService extractor;
51
52 private IAcknowledgeService ackService;
53
54 private ITransportManager transportManager;
55
56 private INodeService nodeService;
57
58 private IDataService dataService;
59
60 public void pushData() {
61 List<Node> nodes = nodeService.findNodesToPushTo();
62 if (nodes != null && nodes.size() > 0) {
63 for (Node node : nodes) {
64 logger.info("Push requested for " + node);
65 if (pushToNode(node)) {
66 logger.info("Push completed for " + node);
67 } else {
68 logger.info("Push unsuccessful for " + node);
69 }
70 }
71 }
72 }
73
74 private boolean pushToNode(Node remote) {
75 IOutgoingWithResponseTransport transport = null;
76 boolean success = false;
77 try {
78 NodeSecurity nodeSecurity = nodeService.findNodeSecurity(remote.getNodeId());
79 if (nodeSecurity != null) {
80 if (nodeSecurity.isInitialLoadEnabled()) {
81 dataService.insertReloadEvent(remote);
82 }
83 }
84
85 transport = transportManager.getPushTransport(remote, nodeService.findIdentity());
86
87 if (extractor.extract(remote, transport)) {
88 logger.info("Push data sent to " + remote);
89 BufferedReader reader = transport.readResponse();
90 String ackString = reader.readLine();
91 String ackExtendedString = reader.readLine();
92
93 if (logger.isDebugEnabled()) {
94 logger.debug("Reading ack: " + ackString);
95 logger.debug("Reading extended ack: " + ackExtendedString);
96 }
97
98 if (StringUtils.isBlank(ackString)) {
99 logger.error("Did not receive an acknowledgement for the batches sent.");
100 }
101
102 List<BatchInfo> batches = transportManager.readAcknowledgement(ackString, ackExtendedString);
103
104 for (BatchInfo batchInfo : batches) {
105 if (logger.isDebugEnabled()) {
106 logger.debug("Saving ack: " + batchInfo.getBatchId() + ", "
107 + (batchInfo.isOk() ? "OK" : "error"));
108 }
109 ackService.ack(batchInfo);
110 }
111 }
112 success = true;
113 } catch (ConnectException ex) {
114 logger.warn(ErrorConstants.COULD_NOT_CONNECT_TO_TRANSPORT + " url=" + remote.getSyncURL());
115 } catch (ConnectionRejectedException ex) {
116 logger.warn(ErrorConstants.TRANSPORT_REJECTED_CONNECTION);
117 } catch (SocketException ex) {
118 logger.warn(ex.getMessage());
119 } catch (TransportException ex) {
120 logger.warn(ex.getMessage());
121 } catch (AuthenticationException ex) {
122 logger.warn(ErrorConstants.NOT_AUTHENTICATED);
123 } catch (Exception e) {
124
125
126 logger.error(e, e);
127 } finally {
128 try {
129 transport.close();
130 } catch (Exception e) {
131 }
132 }
133 return success;
134 }
135
136 public void setExtractor(IDataExtractorService extractor) {
137 this.extractor = extractor;
138 }
139
140 public void setTransportManager(ITransportManager tm) {
141 this.transportManager = tm;
142 }
143
144 public void setNodeService(INodeService nodeService) {
145 this.nodeService = nodeService;
146 }
147
148 public void setAckService(IAcknowledgeService ackService) {
149 this.ackService = ackService;
150 }
151
152 public void setDataService(IDataService dataService) {
153 this.dataService = dataService;
154 }
155 }