View Javadoc

1   /*
2    * SymmetricDS is an open source database synchronization solution.
3    *   
4    * Copyright (C) Chris Henson <chenson42@users.sourceforge.net>,
5    *               Eric Long <erilong@users.sourceforge.net>
6    *
7    * This library is free software; you can redistribute it and/or
8    * modify it under the terms of the GNU Lesser General Public
9    * License as published by the Free Software Foundation; either
10   * version 3 of the License, or (at your option) any later version.
11   *
12   * This library is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this library; if not, see
19   * <http://www.gnu.org/licenses/>.
20   */
21  package org.jumpmind.symmetric.service.jmx;
22  
23  import java.io.FileOutputStream;
24  import java.text.DateFormat;
25  import java.text.NumberFormat;
26  import java.util.Date;
27  import java.util.Map;
28  
29  import javax.sql.DataSource;
30  
31  import org.apache.commons.dbcp.BasicDataSource;
32  import org.apache.commons.lang.NotImplementedException;
33  import org.apache.commons.lang.StringUtils;
34  import org.jumpmind.symmetric.common.ParameterConstants;
35  import org.jumpmind.symmetric.model.Node;
36  import org.jumpmind.symmetric.service.IBootstrapService;
37  import org.jumpmind.symmetric.service.IClusterService;
38  import org.jumpmind.symmetric.service.IDataExtractorService;
39  import org.jumpmind.symmetric.service.IDataService;
40  import org.jumpmind.symmetric.service.INodeService;
41  import org.jumpmind.symmetric.service.IOutgoingBatchService;
42  import org.jumpmind.symmetric.service.IParameterService;
43  import org.jumpmind.symmetric.service.IPurgeService;
44  import org.jumpmind.symmetric.service.IRegistrationService;
45  import org.jumpmind.symmetric.statistic.IStatisticManager;
46  import org.jumpmind.symmetric.statistic.StatisticName;
47  import org.jumpmind.symmetric.transport.IConcurrentConnectionManager;
48  import org.jumpmind.symmetric.transport.IOutgoingTransport;
49  import org.jumpmind.symmetric.transport.ConcurrentConnectionManager.NodeConnectionStatistics;
50  import org.jumpmind.symmetric.transport.internal.InternalOutgoingTransport;
51  import org.springframework.jmx.export.annotation.ManagedAttribute;
52  import org.springframework.jmx.export.annotation.ManagedOperation;
53  import org.springframework.jmx.export.annotation.ManagedOperationParameter;
54  import org.springframework.jmx.export.annotation.ManagedOperationParameters;
55  import org.springframework.jmx.export.annotation.ManagedResource;
56  
57  @ManagedResource(description = "The management interface for a node")
58  public class NodeManagementService {
59  
60      private IBootstrapService bootstrapService;
61  
62      private IPurgeService purgeService;
63  
64      private INodeService nodeService;
65  
66      private IDataService dataService;
67  
68      private IOutgoingBatchService outgoingBatchService;
69  
70      private IRegistrationService registrationService;
71  
72      private IDataExtractorService dataExtractorService;
73  
74      private IClusterService clusterService;
75  
76      private IParameterService parameterService;
77  
78      private IConcurrentConnectionManager concurrentConnectionManager;
79  
80      private DataSource dataSource;
81  
82      IStatisticManager statisticManager;
83  
84      public void setStatisticManager(IStatisticManager statisticManager) {
85          this.statisticManager = statisticManager;
86      }
87  
88      @ManagedOperation(description = "Run the purge process")
89      public void purge() {
90          purgeService.purge();
91      }
92  
93      @ManagedOperation(description = "Synchronize the triggers")
94      public void syncTriggers() {
95          bootstrapService.syncTriggers();
96      }
97  
98      @ManagedAttribute(description = "Get the number of current connections allowed to this "
99              + "instance of the node via HTTP.  If this value is 20, then 20 concurrent push"
100             + " clients and 20 concurrent pull clients will be allowed")
101     public int getNumfNodeConnectionsPerInstance() {
102         return parameterService.getInt(ParameterConstants.CONCURRENT_WORKERS);
103     }
104 
105     @ManagedAttribute(description = "Get connection statistics about indivdual nodes")
106     public String getNodeConcurrencyStatisticsAsText() {
107         String lineFeed = "\n";
108         if (parameterService.getString(ParameterConstants.JMX_LINE_FEED).equals("html")) {
109             lineFeed = "</br>";
110         }
111         Map<String, Map<String, NodeConnectionStatistics>> stats = concurrentConnectionManager
112                 .getNodeConnectionStatisticsByPoolByNodeId();
113         StringBuilder out = new StringBuilder();
114         for (String pool : stats.keySet()) {
115             out
116             .append("-------------------------------------------------------------------------------------------------------------------------------");            out.append(lineFeed);
117             out.append("  CONNECTION TYPE: ");
118             out.append(pool);
119             out.append(lineFeed);
120             out
121                     .append("-------------------------------------------------------------------------------------------------------------------------------");
122             out.append(lineFeed);
123             out
124                     .append("             NODE ID             LAST CONNECT TIME      NUMBER OF CONNECTIONS     NUMBER OF REJECTIONS       AVG CONNECTED TIME");
125             out.append(lineFeed);
126             out
127             .append("-------------------------------------------------------------------------------------------------------------------------------");            out.append(lineFeed);
128             Map<String, NodeConnectionStatistics> nodeStats = stats.get(pool);
129             for (String nodeId : nodeStats.keySet()) {
130                 NodeConnectionStatistics nodeStat = nodeStats.get(nodeId);
131                 out.append(StringUtils.leftPad(nodeId, 20));
132                 out.append(StringUtils.leftPad(DateFormat.getDateTimeInstance(DateFormat.MEDIUM, DateFormat.MEDIUM).format(
133                         new Date(nodeStat.getLastConnectionTimeMs())), 30));
134                 out.append(StringUtils.leftPad(Long.toString(nodeStat.getTotalConnectionCount()), 27));
135                 out.append(StringUtils.leftPad(Integer.toString(nodeStat.getNumOfRejections()), 25));
136                 out.append(StringUtils.leftPad(NumberFormat.getIntegerInstance().format(
137                         nodeStat.getTotalConnectionTimeMs() / nodeStat.getTotalConnectionCount()), 25));
138             }
139             out.append(lineFeed);
140         }
141         return out.toString();
142     }
143 
144     public String getCurrentNodeConcurrencyReservationsAsText() {
145         throw new NotImplementedException();
146     }
147 
148     @ManagedAttribute(description = "Get a list of nodes that have been added to the white list, a list of node ids that always get through the concurrency manager.")
149     public String getNodesInWhiteList() {
150         StringBuilder ret = new StringBuilder();
151         String[] list = concurrentConnectionManager.getWhiteList();
152         for (String string : list) {
153             ret.append(string);
154             ret.append(",");
155         }
156         return ret.length() > 0 ? ret.substring(0, ret.length() - 1) : "";
157     }
158 
159     @ManagedOperation(description = "Add a node id to the list of nodes that will always get through the concurrency manager")
160     @ManagedOperationParameters( { @ManagedOperationParameter(name = "nodeId", description = "The node id to add to the white list") })
161     public void addNodeToWhiteList(String nodeId) {
162         concurrentConnectionManager.addToWhitelist(nodeId);
163     }
164 
165     @ManagedOperation(description = "Remove a node id to the list of nodes that will always get through the concurrency manager")
166     @ManagedOperationParameters( { @ManagedOperationParameter(name = "nodeId", description = "The node id to remove from the white list") })
167     public void removeNodeFromWhiteList(String nodeId) {
168         concurrentConnectionManager.removeFromWhiteList(nodeId);
169     }
170 
171     @ManagedAttribute(description = "Configure the number of connections allowed to this node."
172             + "  If the value is set to zero you are effectively disabling your transport"
173             + " (wihch can be useful for maintainance")
174     public void setNumOfNodeConnectionsPerInstance(int value) {
175         parameterService.saveParameter(ParameterConstants.CONCURRENT_WORKERS, value);
176     }
177 
178     @ManagedAttribute(description = "This is a count of nodes who connected to push or pull data and were rejected because the server was too busy")
179     public long getNumOfNodesWhoConnectedAndWereRejectedForInstanceLifetime() {
180         return statisticManager.getStatistic(StatisticName.NODE_CONCURRENCY_TOO_BUSY_COUNT).getLifetimeCount();
181     }
182 
183     @ManagedAttribute(description = "This is a count of the number of reservations that were handled by this instance")
184     public long getNumOfNodesWhoConnectedForInstanceLifetime() {
185         return statisticManager.getStatistic(StatisticName.NODE_CONCURRENCY_RESERVATION_REQUESTED).getLifetimeCount();
186     }
187 
188     @ManagedAttribute(description = "This is a count of the number of reservations that handed out by this instance")
189     public long getNumOfNodesWhoReservedConnectionsForInstanceLifetime() {
190         return statisticManager.getStatistic(StatisticName.NODE_CONCURRENCY_CONNECTION_RESERVED).getLifetimeCount();
191     }
192 
193     @ManagedAttribute(description = "The group this node belongs to")
194     public String getNodeGroupId() {
195         return parameterService.getNodeGroupId();
196     }
197 
198     @ManagedAttribute(description = "An external name given to this SymmetricDS node")
199     public String getExternalId() {
200         return parameterService.getExternalId();
201     }
202 
203     @ManagedAttribute(description = "The node id given to this SymmetricDS node")
204     public String getNodeId() {
205         Node node = nodeService.findIdentity();
206         if (node != null) {
207             return node.getNodeId();
208         } else {
209             return "?";
210         }
211     }
212 
213     @ManagedAttribute(description = "Whether the basic DataSource is being used as the default datasource.")
214     public boolean isBasicDataSource() {
215         return dataSource instanceof BasicDataSource;
216     }
217 
218     @ManagedAttribute(description = "If a BasicDataSource, then show the number of active connections")
219     public int getNumberOfActiveConnections() {
220         if (isBasicDataSource()) {
221             return ((BasicDataSource) dataSource).getNumActive();
222         } else {
223             return -1;
224         }
225     }
226 
227     @ManagedOperation(description = "Check to see if the external id is registered")
228     @ManagedOperationParameters( {
229             @ManagedOperationParameter(name = "nodeGroupId", description = "The node group id for a node"),
230             @ManagedOperationParameter(name = "externalId", description = "The external id for a node") })
231     public boolean isExternalIdRegistered(String nodeGroupdId, String externalId) {
232         return nodeService.isExternalIdRegistered(nodeGroupdId, externalId);
233     }
234 
235     @ManagedOperation(description = "Emergency remove all locks (if left abandoned on a cluster)")
236     public void clearAllLocks() {
237         clusterService.clearAllLocks();
238     }
239 
240     @ManagedOperation(description = "Check to see if the initial load for a node id is complete.  This method will throw an exception if the load error'd out or was never started.")
241     @ManagedOperationParameters( { @ManagedOperationParameter(name = "nodeId", description = "The node id") })
242     public boolean isInitialLoadComplete(String nodeId) {
243         return outgoingBatchService.isInitialLoadComplete(nodeId);
244     }
245 
246     @ManagedOperation(description = "Enable or disable synchronization completely for a node")
247     @ManagedOperationParameters( {
248             @ManagedOperationParameter(name = "nodeId", description = "The node to enable or disable"),
249             @ManagedOperationParameter(name = "syncEnabled", description = "true is enabled, false is disabled") })
250     public boolean setSyncEnabledForNode(String nodeId, boolean syncEnabled) {
251         Node node = nodeService.findNode(nodeId);
252         if (node != null) {
253             node.setSyncEnabled(syncEnabled);
254             nodeService.updateNode(node);
255             return true;
256         } else {
257             return false;
258         }
259     }
260 
261     @ManagedOperation(description = "Enable or disable a channel for a specific external id")
262     @ManagedOperationParameters( {
263             @ManagedOperationParameter(name = "ignore", description = "Set to true to enable and false to disable"),
264             @ManagedOperationParameter(name = "channelId", description = "The channel id to enable or disable"),
265             @ManagedOperationParameter(name = "nodeGroupId", description = "The node group id for a node"),
266             @ManagedOperationParameter(name = "externalId", description = "The external id for a node") })
267     public void ignoreNodeChannelForExternalId(boolean ignore, String channelId, String nodeGroupId, String externalId) {
268         nodeService.ignoreNodeChannelForExternalId(ignore, channelId, nodeGroupId, externalId);
269     }
270 
271     @ManagedOperation(description = "Open the registration for a node with the specified external id")
272     @ManagedOperationParameters( {
273             @ManagedOperationParameter(name = "nodeGroup", description = "The node group id this node will belong to"),
274             @ManagedOperationParameter(name = "externalId", description = "The external id for the node") })
275     public void openRegistration(String nodeGroupId, String externalId) {
276         Node node = nodeService.findNodeByExternalId(nodeGroupId, externalId);
277         if (node != null) {
278             registrationService.reOpenRegistration(node.getExternalId());
279         } else {
280             registrationService.openRegistration(nodeGroupId, externalId);
281         }
282     }
283 
284     @ManagedOperation(description = "Send an initial load of data to a node.")
285     @ManagedOperationParameters( { @ManagedOperationParameter(name = "nodeId", description = "The node id to reload.") })
286     public String reloadNode(String nodeId) {
287         return dataService.reloadNode(nodeId);
288     }
289 
290     @ManagedOperation(description = "Send a SQL event to a node.")
291     @ManagedOperationParameters( {
292             @ManagedOperationParameter(name = "nodeId", description = "The node id to sent the event to."),
293             @ManagedOperationParameter(name = "tableName", description = "The table name the SQL is for."),
294             @ManagedOperationParameter(name = "sql", description = "The SQL statement to send.") })
295     public String sendSQL(String nodeId, String tableName, String sql) {
296         return dataService.sendSQL(nodeId, tableName, sql);
297     }
298 
299     @ManagedOperation(description = "Send a delete and reload of a table to a node.")
300     @ManagedOperationParameters( { @ManagedOperationParameter(name = "nodeId", description = "The node id to reload."),
301             @ManagedOperationParameter(name = "tableName", description = "The table name to reload.") })
302     public String reloadTable(String nodeId, String tableName) {
303         return dataService.reloadTable(nodeId, tableName);
304     }
305 
306     @ManagedOperation(description = "Send a delete and reload of a table to a node.")
307     @ManagedOperationParameters( {
308             @ManagedOperationParameter(name = "nodeId", description = "The node id to reload."),
309             @ManagedOperationParameter(name = "tableName", description = "The table name to reload."),
310             @ManagedOperationParameter(name = "overrideInitialLoadSelect", description = "Override initial load select where-clause.") })
311     public String reloadTable(String nodeId, String tableName, String overrideInitialLoadSelect) {
312         return dataService.reloadTable(nodeId, tableName, overrideInitialLoadSelect);
313     }
314 
315     @ManagedOperation(description = "Write a range of batches to a file in SymmetricDS Data Format.")
316     @ManagedOperationParameters( {
317             @ManagedOperationParameter(name = "startBatchId", description = "Starting batch ID of range"),
318             @ManagedOperationParameter(name = "endBatchId", description = "Ending batch ID of range"),
319             @ManagedOperationParameter(name = "fileName", description = "File name to write batches") })
320     public void writeBatchRangeToFile(String startBatchId, String endBatchId, String fileName) throws Exception {
321         FileOutputStream out = new FileOutputStream(fileName);
322         IOutgoingTransport transport = new InternalOutgoingTransport(out);
323         dataExtractorService.extractBatchRange(transport, startBatchId, endBatchId);
324         transport.close();
325         out.close();
326     }
327 
328     public void setBootstrapService(IBootstrapService bootstrapService) {
329         this.bootstrapService = bootstrapService;
330     }
331 
332     public void setPurgeService(IPurgeService purgeService) {
333         this.purgeService = purgeService;
334     }
335 
336     public void setDataSource(DataSource dataSource) {
337         this.dataSource = dataSource;
338     }
339 
340     public void setDataService(IDataService dataService) {
341         this.dataService = dataService;
342     }
343 
344     public void setNodeService(INodeService nodeService) {
345         this.nodeService = nodeService;
346     }
347 
348     public void setRegistrationService(IRegistrationService registrationService) {
349         this.registrationService = registrationService;
350     }
351 
352     public void setOutgoingBatchService(IOutgoingBatchService outgoingBatchService) {
353         this.outgoingBatchService = outgoingBatchService;
354     }
355 
356     public void setDataExtractorService(IDataExtractorService dataExtractorService) {
357         this.dataExtractorService = dataExtractorService;
358     }
359 
360     public void setClusterService(IClusterService clusterService) {
361         this.clusterService = clusterService;
362     }
363 
364     public void setParameterService(IParameterService parameterService) {
365         this.parameterService = parameterService;
366     }
367 
368     public void setConcurrentConnectionManager(IConcurrentConnectionManager concurrentConnectionManager) {
369         this.concurrentConnectionManager = concurrentConnectionManager;
370     }
371 
372 }