View Javadoc

1   /*
2    * SymmetricDS is an open source database synchronization solution.
3    *   
4    * Copyright (C) Chris Henson <chenson42@users.sourceforge.net>
5    *
6    * This library is free software; you can redistribute it and/or
7    * modify it under the terms of the GNU Lesser General Public
8    * License as published by the Free Software Foundation; either
9    * version 3 of the License, or (at your option) any later version.
10   *
11   * This library is distributed in the hope that it will be useful,
12   * but WITHOUT ANY WARRANTY; without even the implied warranty of
13   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14   * Lesser General Public License for more details.
15   *
16   * You should have received a copy of the GNU Lesser General Public
17   * License along with this library; if not, see
18   * <http://www.gnu.org/licenses/>.
19   */
20  
21  package org.jumpmind.symmetric.transport.http;
22  
23  import java.io.BufferedReader;
24  import java.io.IOException;
25  import java.io.InputStream;
26  import java.io.OutputStream;
27  import java.io.OutputStreamWriter;
28  import java.io.PrintWriter;
29  import java.net.HttpURLConnection;
30  import java.net.URL;
31  import java.util.List;
32  import java.util.zip.GZIPInputStream;
33  
34  import org.apache.commons.lang.StringUtils;
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.jumpmind.symmetric.common.Constants;
38  import org.jumpmind.symmetric.common.ParameterConstants;
39  import org.jumpmind.symmetric.model.IncomingBatchHistory;
40  import org.jumpmind.symmetric.model.Node;
41  import org.jumpmind.symmetric.model.NodeSecurity;
42  import org.jumpmind.symmetric.service.INodeService;
43  import org.jumpmind.symmetric.service.IParameterService;
44  import org.jumpmind.symmetric.transport.AbstractTransportManager;
45  import org.jumpmind.symmetric.transport.IIncomingTransport;
46  import org.jumpmind.symmetric.transport.IOutgoingWithResponseTransport;
47  import org.jumpmind.symmetric.transport.ITransportManager;
48  import org.jumpmind.symmetric.transport.TransportUtils;
49  import org.jumpmind.symmetric.web.WebConstants;
50  
51  /***
52   * Allow remote communication to nodes, in order to push data, pull data, and
53   * send messages.
54   */
55  public class HttpTransportManager extends AbstractTransportManager implements ITransportManager {
56  
57      protected static final Log logger = LogFactory.getLog(HttpTransportManager.class);
58  
59      private INodeService nodeService;
60  
61      private IParameterService parameterService;
62  
63      public HttpTransportManager(INodeService nodeService, IParameterService paramService) {
64          this.parameterService = paramService;
65          this.nodeService = nodeService;
66      }
67  
68      public boolean sendAcknowledgement(Node remote, List<IncomingBatchHistory> list, Node local) throws IOException {
69          if (list != null && list.size() > 0) {
70              String data = getAcknowledgementData(local.getNodeId(), list);
71              return sendMessage("ack", remote, local, data);
72          }
73          return true;
74      }
75  
76      public void writeAcknowledgement(OutputStream out, List<IncomingBatchHistory> list) throws IOException {        
77          writeMessage(out, getAcknowledgementData(nodeService.findIdentity().getNodeId(), list));
78      }
79  
80      public boolean sendMessage(String action, Node remote, Node local, String data) throws IOException {
81          HttpURLConnection conn = sendMessage(new URL(buildURL(action, remote, local)), data);
82          return conn.getResponseCode() == HttpURLConnection.HTTP_OK;
83      }
84  
85      protected HttpURLConnection sendMessage(URL url, String data) throws IOException {
86          HttpURLConnection conn = (HttpURLConnection) url.openConnection();
87          conn.setRequestMethod("POST");
88          conn.setAllowUserInteraction(false);
89          conn.setDoOutput(true);
90          int timeout = parameterService.getInt(ParameterConstants.TRANSPORT_HTTP_TIMEOUT);
91          conn.setConnectTimeout(timeout);
92          conn.setReadTimeout(timeout);
93          conn.setRequestProperty("Content-Length", Integer.toString(data.length()));
94          writeMessage(conn.getOutputStream(), data);
95          return conn;
96      }
97  
98      public void writeMessage(OutputStream out, String data) throws IOException {
99          PrintWriter pw = new PrintWriter(new OutputStreamWriter(out, Constants.ENCODING), true);
100         pw.println(data);
101         pw.close();
102     }
103 
104     public IIncomingTransport getPullTransport(Node remote, Node local) throws IOException {
105         return new HttpIncomingTransport(createGetConnectionFor(new URL(buildURL("pull", remote, local))));
106     }
107 
108     public IOutgoingWithResponseTransport getPushTransport(Node remote, Node local) throws IOException {
109         URL url = new URL(buildURL("push", remote, local));
110         int httpTimeout = parameterService.getInt(ParameterConstants.TRANSPORT_HTTP_TIMEOUT);
111         boolean useCompression = parameterService.is(ParameterConstants.TRANSPORT_HTTP_USE_COMPRESSION_CLIENT);
112         return new HttpOutgoingTransport(url, httpTimeout, useCompression);
113     }
114 
115     public IIncomingTransport getRegisterTransport(Node node) throws IOException {
116         StringBuilder builder = new StringBuilder(parameterService.getRegistrationUrl() + "/registration?");
117         append(builder, WebConstants.NODE_GROUP_ID, node.getNodeGroupId());
118         append(builder, WebConstants.EXTERNAL_ID, node.getExternalId());
119         append(builder, WebConstants.SYNC_URL, node.getSyncURL());
120         append(builder, WebConstants.SCHEMA_VERSION, node.getSchemaVersion());
121         append(builder, WebConstants.DATABASE_TYPE, node.getDatabaseType());
122         append(builder, WebConstants.DATABASE_VERSION, node.getDatabaseVersion());
123         append(builder, WebConstants.SYMMETRIC_VERSION, node.getSymmetricVersion());
124         return new HttpIncomingTransport(createGetConnectionFor(new URL(builder.toString())));
125     }
126 
127     private HttpURLConnection createGetConnectionFor(URL url) throws IOException {
128         HttpURLConnection conn = (HttpURLConnection) url.openConnection();
129         conn.setRequestProperty("accept-encoding", "gzip");
130         int httpTimeout = parameterService.getInt(ParameterConstants.TRANSPORT_HTTP_TIMEOUT);
131         conn.setConnectTimeout(httpTimeout);
132         conn.setReadTimeout(httpTimeout);
133         conn.setRequestMethod("GET");
134         return conn;
135     }
136 
137     /***
138      * If the content is gzip'd, then uncompress.
139      */
140     public static BufferedReader getReaderFrom(HttpURLConnection connection) throws IOException {
141         String type = connection.getContentEncoding();
142         InputStream in = connection.getInputStream();
143         if (!StringUtils.isBlank(type) && type.equals("gzip")) {
144             in = new GZIPInputStream(in);
145         }
146         return TransportUtils.toReader(in);
147     }
148 
149     /***
150      * Build a url for an action. Include the nodeid and the security token.
151      */
152     protected String buildURL(String action, Node remote, Node local) throws IOException {
153         return addSecurityToken((chooseURL(remote) + "/" + action), "&");
154     }
155 
156     /***
157      * Build the url for remote node communication. Use the remote sync_url
158      * first, if it is null or blank, then use the registration url instead.
159      */
160     private String chooseURL(Node remote) {
161         if (StringUtils.isBlank(remote.getSyncURL()) || remote.getSyncURL().startsWith(Constants.PROTOCOL_NONE)) {
162             logger
163                     .debug("Using the registration URL to contact the remote node because the syncURL for the node is blank.");
164             return parameterService.getRegistrationUrl();
165         } else {
166             return remote.getSyncURL();
167         }
168 
169     }
170 
171     private String addSecurityToken(String base, String connector) {
172         String nodeId = nodeService.findIdentity().getNodeId();
173         StringBuilder sb = new StringBuilder(addNodeId(base, nodeId, "?"));
174         sb.append(connector);
175         sb.append(WebConstants.SECURITY_TOKEN);
176         sb.append("=");
177         NodeSecurity security = nodeService.findNodeSecurity(nodeId);
178         String securityToken = "none";
179         if (security != null) {
180             securityToken = security.getPassword();
181         }
182         sb.append(securityToken);
183         return sb.toString();
184     }
185 
186     private String addNodeId(String base, String nodeId, String connector) {
187         StringBuilder sb = new StringBuilder(base);
188         sb.append(connector);
189         sb.append(WebConstants.NODE_ID);
190         sb.append("=");
191         sb.append(nodeId);
192         return sb.toString();
193     }
194 
195 }