1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.jumpmind.symmetric.transport.http;
22
23 import java.io.BufferedReader;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.io.OutputStream;
27 import java.io.OutputStreamWriter;
28 import java.io.PrintWriter;
29 import java.net.HttpURLConnection;
30 import java.net.URL;
31 import java.util.List;
32 import java.util.zip.GZIPInputStream;
33
34 import org.apache.commons.lang.StringUtils;
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.jumpmind.symmetric.common.Constants;
38 import org.jumpmind.symmetric.common.ParameterConstants;
39 import org.jumpmind.symmetric.model.IncomingBatchHistory;
40 import org.jumpmind.symmetric.model.Node;
41 import org.jumpmind.symmetric.model.NodeSecurity;
42 import org.jumpmind.symmetric.service.INodeService;
43 import org.jumpmind.symmetric.service.IParameterService;
44 import org.jumpmind.symmetric.transport.AbstractTransportManager;
45 import org.jumpmind.symmetric.transport.IIncomingTransport;
46 import org.jumpmind.symmetric.transport.IOutgoingWithResponseTransport;
47 import org.jumpmind.symmetric.transport.ITransportManager;
48 import org.jumpmind.symmetric.transport.TransportUtils;
49 import org.jumpmind.symmetric.web.WebConstants;
50
51 /***
52 * Allow remote communication to nodes, in order to push data, pull data, and
53 * send messages.
54 */
55 public class HttpTransportManager extends AbstractTransportManager implements ITransportManager {
56
57 protected static final Log logger = LogFactory.getLog(HttpTransportManager.class);
58
59 private INodeService nodeService;
60
61 private IParameterService parameterService;
62
63 public HttpTransportManager(INodeService nodeService, IParameterService paramService) {
64 this.parameterService = paramService;
65 this.nodeService = nodeService;
66 }
67
68 public boolean sendAcknowledgement(Node remote, List<IncomingBatchHistory> list, Node local) throws IOException {
69 if (list != null && list.size() > 0) {
70 String data = getAcknowledgementData(local.getNodeId(), list);
71 return sendMessage("ack", remote, local, data);
72 }
73 return true;
74 }
75
76 public void writeAcknowledgement(OutputStream out, List<IncomingBatchHistory> list) throws IOException {
77 writeMessage(out, getAcknowledgementData(nodeService.findIdentity().getNodeId(), list));
78 }
79
80 public boolean sendMessage(String action, Node remote, Node local, String data) throws IOException {
81 HttpURLConnection conn = sendMessage(new URL(buildURL(action, remote, local)), data);
82 return conn.getResponseCode() == HttpURLConnection.HTTP_OK;
83 }
84
85 protected HttpURLConnection sendMessage(URL url, String data) throws IOException {
86 HttpURLConnection conn = (HttpURLConnection) url.openConnection();
87 conn.setRequestMethod("POST");
88 conn.setAllowUserInteraction(false);
89 conn.setDoOutput(true);
90 int timeout = parameterService.getInt(ParameterConstants.TRANSPORT_HTTP_TIMEOUT);
91 conn.setConnectTimeout(timeout);
92 conn.setReadTimeout(timeout);
93 conn.setRequestProperty("Content-Length", Integer.toString(data.length()));
94 writeMessage(conn.getOutputStream(), data);
95 return conn;
96 }
97
98 public void writeMessage(OutputStream out, String data) throws IOException {
99 PrintWriter pw = new PrintWriter(new OutputStreamWriter(out, Constants.ENCODING), true);
100 pw.println(data);
101 pw.close();
102 }
103
104 public IIncomingTransport getPullTransport(Node remote, Node local) throws IOException {
105 return new HttpIncomingTransport(createGetConnectionFor(new URL(buildURL("pull", remote, local))));
106 }
107
108 public IOutgoingWithResponseTransport getPushTransport(Node remote, Node local) throws IOException {
109 URL url = new URL(buildURL("push", remote, local));
110 int httpTimeout = parameterService.getInt(ParameterConstants.TRANSPORT_HTTP_TIMEOUT);
111 boolean useCompression = parameterService.is(ParameterConstants.TRANSPORT_HTTP_USE_COMPRESSION_CLIENT);
112 return new HttpOutgoingTransport(url, httpTimeout, useCompression);
113 }
114
115 public IIncomingTransport getRegisterTransport(Node node) throws IOException {
116 StringBuilder builder = new StringBuilder(parameterService.getRegistrationUrl() + "/registration?");
117 append(builder, WebConstants.NODE_GROUP_ID, node.getNodeGroupId());
118 append(builder, WebConstants.EXTERNAL_ID, node.getExternalId());
119 append(builder, WebConstants.SYNC_URL, node.getSyncURL());
120 append(builder, WebConstants.SCHEMA_VERSION, node.getSchemaVersion());
121 append(builder, WebConstants.DATABASE_TYPE, node.getDatabaseType());
122 append(builder, WebConstants.DATABASE_VERSION, node.getDatabaseVersion());
123 append(builder, WebConstants.SYMMETRIC_VERSION, node.getSymmetricVersion());
124 return new HttpIncomingTransport(createGetConnectionFor(new URL(builder.toString())));
125 }
126
127 private HttpURLConnection createGetConnectionFor(URL url) throws IOException {
128 HttpURLConnection conn = (HttpURLConnection) url.openConnection();
129 conn.setRequestProperty("accept-encoding", "gzip");
130 int httpTimeout = parameterService.getInt(ParameterConstants.TRANSPORT_HTTP_TIMEOUT);
131 conn.setConnectTimeout(httpTimeout);
132 conn.setReadTimeout(httpTimeout);
133 conn.setRequestMethod("GET");
134 return conn;
135 }
136
137 /***
138 * If the content is gzip'd, then uncompress.
139 */
140 public static BufferedReader getReaderFrom(HttpURLConnection connection) throws IOException {
141 String type = connection.getContentEncoding();
142 InputStream in = connection.getInputStream();
143 if (!StringUtils.isBlank(type) && type.equals("gzip")) {
144 in = new GZIPInputStream(in);
145 }
146 return TransportUtils.toReader(in);
147 }
148
149 /***
150 * Build a url for an action. Include the nodeid and the security token.
151 */
152 protected String buildURL(String action, Node remote, Node local) throws IOException {
153 return addSecurityToken((chooseURL(remote) + "/" + action), "&");
154 }
155
156 /***
157 * Build the url for remote node communication. Use the remote sync_url
158 * first, if it is null or blank, then use the registration url instead.
159 */
160 private String chooseURL(Node remote) {
161 if (StringUtils.isBlank(remote.getSyncURL()) || remote.getSyncURL().startsWith(Constants.PROTOCOL_NONE)) {
162 logger
163 .debug("Using the registration URL to contact the remote node because the syncURL for the node is blank.");
164 return parameterService.getRegistrationUrl();
165 } else {
166 return remote.getSyncURL();
167 }
168
169 }
170
171 private String addSecurityToken(String base, String connector) {
172 String nodeId = nodeService.findIdentity().getNodeId();
173 StringBuilder sb = new StringBuilder(addNodeId(base, nodeId, "?"));
174 sb.append(connector);
175 sb.append(WebConstants.SECURITY_TOKEN);
176 sb.append("=");
177 NodeSecurity security = nodeService.findNodeSecurity(nodeId);
178 String securityToken = "none";
179 if (security != null) {
180 securityToken = security.getPassword();
181 }
182 sb.append(securityToken);
183 return sb.toString();
184 }
185
186 private String addNodeId(String base, String nodeId, String connector) {
187 StringBuilder sb = new StringBuilder(base);
188 sb.append(connector);
189 sb.append(WebConstants.NODE_ID);
190 sb.append("=");
191 sb.append(nodeId);
192 return sb.toString();
193 }
194
195 }