1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.jumpmind.symmetric.service.jmx;
22
23 import java.io.FileOutputStream;
24 import java.text.DateFormat;
25 import java.text.NumberFormat;
26 import java.util.Date;
27 import java.util.Map;
28
29 import javax.sql.DataSource;
30
31 import org.apache.commons.dbcp.BasicDataSource;
32 import org.apache.commons.lang.NotImplementedException;
33 import org.apache.commons.lang.StringUtils;
34 import org.jumpmind.symmetric.common.ParameterConstants;
35 import org.jumpmind.symmetric.model.Node;
36 import org.jumpmind.symmetric.service.IBootstrapService;
37 import org.jumpmind.symmetric.service.IClusterService;
38 import org.jumpmind.symmetric.service.IDataExtractorService;
39 import org.jumpmind.symmetric.service.IDataService;
40 import org.jumpmind.symmetric.service.INodeService;
41 import org.jumpmind.symmetric.service.IOutgoingBatchService;
42 import org.jumpmind.symmetric.service.IParameterService;
43 import org.jumpmind.symmetric.service.IPurgeService;
44 import org.jumpmind.symmetric.service.IRegistrationService;
45 import org.jumpmind.symmetric.statistic.IStatisticManager;
46 import org.jumpmind.symmetric.statistic.StatisticName;
47 import org.jumpmind.symmetric.transport.IConcurrentConnectionManager;
48 import org.jumpmind.symmetric.transport.IOutgoingTransport;
49 import org.jumpmind.symmetric.transport.ConcurrentConnectionManager.NodeConnectionStatistics;
50 import org.jumpmind.symmetric.transport.internal.InternalOutgoingTransport;
51 import org.springframework.jmx.export.annotation.ManagedAttribute;
52 import org.springframework.jmx.export.annotation.ManagedOperation;
53 import org.springframework.jmx.export.annotation.ManagedOperationParameter;
54 import org.springframework.jmx.export.annotation.ManagedOperationParameters;
55 import org.springframework.jmx.export.annotation.ManagedResource;
56
57 @ManagedResource(description = "The management interface for a node")
58 public class NodeManagementService {
59
60 private IBootstrapService bootstrapService;
61
62 private IPurgeService purgeService;
63
64 private INodeService nodeService;
65
66 private IDataService dataService;
67
68 private IOutgoingBatchService outgoingBatchService;
69
70 private IRegistrationService registrationService;
71
72 private IDataExtractorService dataExtractorService;
73
74 private IClusterService clusterService;
75
76 private IParameterService parameterService;
77
78 private IConcurrentConnectionManager concurrentConnectionManager;
79
80 private DataSource dataSource;
81
82 IStatisticManager statisticManager;
83
84 public void setStatisticManager(IStatisticManager statisticManager) {
85 this.statisticManager = statisticManager;
86 }
87
88 @ManagedOperation(description = "Run the purge process")
89 public void purge() {
90 purgeService.purge();
91 }
92
93 @ManagedOperation(description = "Synchronize the triggers")
94 public void syncTriggers() {
95 bootstrapService.syncTriggers();
96 }
97
98 @ManagedAttribute(description = "Get the number of current connections allowed to this "
99 + "instance of the node via HTTP. If this value is 20, then 20 concurrent push"
100 + " clients and 20 concurrent pull clients will be allowed")
101 public int getNumfNodeConnectionsPerInstance() {
102 return parameterService.getInt(ParameterConstants.CONCURRENT_WORKERS);
103 }
104
105 @ManagedAttribute(description = "Get connection statistics about indivdual nodes")
106 public String getNodeConcurrencyStatisticsAsText() {
107 String lineFeed = "\n";
108 if (parameterService.getString(ParameterConstants.JMX_LINE_FEED).equals("html")) {
109 lineFeed = "</br>";
110 }
111 Map<String, Map<String, NodeConnectionStatistics>> stats = concurrentConnectionManager
112 .getNodeConnectionStatisticsByPoolByNodeId();
113 StringBuilder out = new StringBuilder();
114 for (String pool : stats.keySet()) {
115 out
116 .append("-------------------------------------------------------------------------------------------------------------------------------"); out.append(lineFeed);
117 out.append(" CONNECTION TYPE: ");
118 out.append(pool);
119 out.append(lineFeed);
120 out
121 .append("-------------------------------------------------------------------------------------------------------------------------------");
122 out.append(lineFeed);
123 out
124 .append(" NODE ID LAST CONNECT TIME NUMBER OF CONNECTIONS NUMBER OF REJECTIONS AVG CONNECTED TIME");
125 out.append(lineFeed);
126 out
127 .append("-------------------------------------------------------------------------------------------------------------------------------"); out.append(lineFeed);
128 Map<String, NodeConnectionStatistics> nodeStats = stats.get(pool);
129 for (String nodeId : nodeStats.keySet()) {
130 NodeConnectionStatistics nodeStat = nodeStats.get(nodeId);
131 out.append(StringUtils.leftPad(nodeId, 20));
132 out.append(StringUtils.leftPad(DateFormat.getDateTimeInstance(DateFormat.MEDIUM, DateFormat.MEDIUM).format(
133 new Date(nodeStat.getLastConnectionTimeMs())), 30));
134 out.append(StringUtils.leftPad(Long.toString(nodeStat.getTotalConnectionCount()), 27));
135 out.append(StringUtils.leftPad(Integer.toString(nodeStat.getNumOfRejections()), 25));
136 out.append(StringUtils.leftPad(NumberFormat.getIntegerInstance().format(
137 nodeStat.getTotalConnectionTimeMs() / nodeStat.getTotalConnectionCount()), 25));
138 }
139 out.append(lineFeed);
140 }
141 return out.toString();
142 }
143
144 public String getCurrentNodeConcurrencyReservationsAsText() {
145 throw new NotImplementedException();
146 }
147
148 @ManagedAttribute(description = "Get a list of nodes that have been added to the white list, a list of node ids that always get through the concurrency manager.")
149 public String getNodesInWhiteList() {
150 StringBuilder ret = new StringBuilder();
151 String[] list = concurrentConnectionManager.getWhiteList();
152 for (String string : list) {
153 ret.append(string);
154 ret.append(",");
155 }
156 return ret.length() > 0 ? ret.substring(0, ret.length() - 1) : "";
157 }
158
159 @ManagedOperation(description = "Add a node id to the list of nodes that will always get through the concurrency manager")
160 @ManagedOperationParameters( { @ManagedOperationParameter(name = "nodeId", description = "The node id to add to the white list") })
161 public void addNodeToWhiteList(String nodeId) {
162 concurrentConnectionManager.addToWhitelist(nodeId);
163 }
164
165 @ManagedOperation(description = "Remove a node id to the list of nodes that will always get through the concurrency manager")
166 @ManagedOperationParameters( { @ManagedOperationParameter(name = "nodeId", description = "The node id to remove from the white list") })
167 public void removeNodeFromWhiteList(String nodeId) {
168 concurrentConnectionManager.removeFromWhiteList(nodeId);
169 }
170
171 @ManagedAttribute(description = "Configure the number of connections allowed to this node."
172 + " If the value is set to zero you are effectively disabling your transport"
173 + " (wihch can be useful for maintainance")
174 public void setNumOfNodeConnectionsPerInstance(int value) {
175 parameterService.saveParameter(ParameterConstants.CONCURRENT_WORKERS, value);
176 }
177
178 @ManagedAttribute(description = "This is a count of nodes who connected to push or pull data and were rejected because the server was too busy")
179 public long getNumOfNodesWhoConnectedAndWereRejectedForInstanceLifetime() {
180 return statisticManager.getStatistic(StatisticName.NODE_CONCURRENCY_TOO_BUSY_COUNT).getLifetimeCount();
181 }
182
183 @ManagedAttribute(description = "This is a count of the number of reservations that were handled by this instance")
184 public long getNumOfNodesWhoConnectedForInstanceLifetime() {
185 return statisticManager.getStatistic(StatisticName.NODE_CONCURRENCY_RESERVATION_REQUESTED).getLifetimeCount();
186 }
187
188 @ManagedAttribute(description = "This is a count of the number of reservations that handed out by this instance")
189 public long getNumOfNodesWhoReservedConnectionsForInstanceLifetime() {
190 return statisticManager.getStatistic(StatisticName.NODE_CONCURRENCY_CONNECTION_RESERVED).getLifetimeCount();
191 }
192
193 @ManagedAttribute(description = "The group this node belongs to")
194 public String getNodeGroupId() {
195 return parameterService.getNodeGroupId();
196 }
197
198 @ManagedAttribute(description = "An external name given to this SymmetricDS node")
199 public String getExternalId() {
200 return parameterService.getExternalId();
201 }
202
203 @ManagedAttribute(description = "The node id given to this SymmetricDS node")
204 public String getNodeId() {
205 Node node = nodeService.findIdentity();
206 if (node != null) {
207 return node.getNodeId();
208 } else {
209 return "?";
210 }
211 }
212
213 @ManagedAttribute(description = "Whether the basic DataSource is being used as the default datasource.")
214 public boolean isBasicDataSource() {
215 return dataSource instanceof BasicDataSource;
216 }
217
218 @ManagedAttribute(description = "If a BasicDataSource, then show the number of active connections")
219 public int getNumberOfActiveConnections() {
220 if (isBasicDataSource()) {
221 return ((BasicDataSource) dataSource).getNumActive();
222 } else {
223 return -1;
224 }
225 }
226
227 @ManagedOperation(description = "Check to see if the external id is registered")
228 @ManagedOperationParameters( {
229 @ManagedOperationParameter(name = "nodeGroupId", description = "The node group id for a node"),
230 @ManagedOperationParameter(name = "externalId", description = "The external id for a node") })
231 public boolean isExternalIdRegistered(String nodeGroupdId, String externalId) {
232 return nodeService.isExternalIdRegistered(nodeGroupdId, externalId);
233 }
234
235 @ManagedOperation(description = "Emergency remove all locks (if left abandoned on a cluster)")
236 public void clearAllLocks() {
237 clusterService.clearAllLocks();
238 }
239
240 @ManagedOperation(description = "Check to see if the initial load for a node id is complete. This method will throw an exception if the load error'd out or was never started.")
241 @ManagedOperationParameters( { @ManagedOperationParameter(name = "nodeId", description = "The node id") })
242 public boolean isInitialLoadComplete(String nodeId) {
243 return outgoingBatchService.isInitialLoadComplete(nodeId);
244 }
245
246 @ManagedOperation(description = "Enable or disable synchronization completely for a node")
247 @ManagedOperationParameters( {
248 @ManagedOperationParameter(name = "nodeId", description = "The node to enable or disable"),
249 @ManagedOperationParameter(name = "syncEnabled", description = "true is enabled, false is disabled") })
250 public boolean setSyncEnabledForNode(String nodeId, boolean syncEnabled) {
251 Node node = nodeService.findNode(nodeId);
252 if (node != null) {
253 node.setSyncEnabled(syncEnabled);
254 nodeService.updateNode(node);
255 return true;
256 } else {
257 return false;
258 }
259 }
260
261 @ManagedOperation(description = "Enable or disable a channel for a specific external id")
262 @ManagedOperationParameters( {
263 @ManagedOperationParameter(name = "ignore", description = "Set to true to enable and false to disable"),
264 @ManagedOperationParameter(name = "channelId", description = "The channel id to enable or disable"),
265 @ManagedOperationParameter(name = "nodeGroupId", description = "The node group id for a node"),
266 @ManagedOperationParameter(name = "externalId", description = "The external id for a node") })
267 public void ignoreNodeChannelForExternalId(boolean ignore, String channelId, String nodeGroupId, String externalId) {
268 nodeService.ignoreNodeChannelForExternalId(ignore, channelId, nodeGroupId, externalId);
269 }
270
271 @ManagedOperation(description = "Open the registration for a node with the specified external id")
272 @ManagedOperationParameters( {
273 @ManagedOperationParameter(name = "nodeGroup", description = "The node group id this node will belong to"),
274 @ManagedOperationParameter(name = "externalId", description = "The external id for the node") })
275 public void openRegistration(String nodeGroupId, String externalId) {
276 Node node = nodeService.findNodeByExternalId(nodeGroupId, externalId);
277 if (node != null) {
278 registrationService.reOpenRegistration(node.getExternalId());
279 } else {
280 registrationService.openRegistration(nodeGroupId, externalId);
281 }
282 }
283
284 @ManagedOperation(description = "Send an initial load of data to a node.")
285 @ManagedOperationParameters( { @ManagedOperationParameter(name = "nodeId", description = "The node id to reload.") })
286 public String reloadNode(String nodeId) {
287 return dataService.reloadNode(nodeId);
288 }
289
290 @ManagedOperation(description = "Send a SQL event to a node.")
291 @ManagedOperationParameters( {
292 @ManagedOperationParameter(name = "nodeId", description = "The node id to sent the event to."),
293 @ManagedOperationParameter(name = "tableName", description = "The table name the SQL is for."),
294 @ManagedOperationParameter(name = "sql", description = "The SQL statement to send.") })
295 public String sendSQL(String nodeId, String tableName, String sql) {
296 return dataService.sendSQL(nodeId, tableName, sql);
297 }
298
299 @ManagedOperation(description = "Send a delete and reload of a table to a node.")
300 @ManagedOperationParameters( { @ManagedOperationParameter(name = "nodeId", description = "The node id to reload."),
301 @ManagedOperationParameter(name = "tableName", description = "The table name to reload.") })
302 public String reloadTable(String nodeId, String tableName) {
303 return dataService.reloadTable(nodeId, tableName);
304 }
305
306 @ManagedOperation(description = "Send a delete and reload of a table to a node.")
307 @ManagedOperationParameters( {
308 @ManagedOperationParameter(name = "nodeId", description = "The node id to reload."),
309 @ManagedOperationParameter(name = "tableName", description = "The table name to reload."),
310 @ManagedOperationParameter(name = "overrideInitialLoadSelect", description = "Override initial load select where-clause.") })
311 public String reloadTable(String nodeId, String tableName, String overrideInitialLoadSelect) {
312 return dataService.reloadTable(nodeId, tableName, overrideInitialLoadSelect);
313 }
314
315 @ManagedOperation(description = "Write a range of batches to a file in SymmetricDS Data Format.")
316 @ManagedOperationParameters( {
317 @ManagedOperationParameter(name = "startBatchId", description = "Starting batch ID of range"),
318 @ManagedOperationParameter(name = "endBatchId", description = "Ending batch ID of range"),
319 @ManagedOperationParameter(name = "fileName", description = "File name to write batches") })
320 public void writeBatchRangeToFile(String startBatchId, String endBatchId, String fileName) throws Exception {
321 FileOutputStream out = new FileOutputStream(fileName);
322 IOutgoingTransport transport = new InternalOutgoingTransport(out);
323 dataExtractorService.extractBatchRange(transport, startBatchId, endBatchId);
324 transport.close();
325 out.close();
326 }
327
328 public void setBootstrapService(IBootstrapService bootstrapService) {
329 this.bootstrapService = bootstrapService;
330 }
331
332 public void setPurgeService(IPurgeService purgeService) {
333 this.purgeService = purgeService;
334 }
335
336 public void setDataSource(DataSource dataSource) {
337 this.dataSource = dataSource;
338 }
339
340 public void setDataService(IDataService dataService) {
341 this.dataService = dataService;
342 }
343
344 public void setNodeService(INodeService nodeService) {
345 this.nodeService = nodeService;
346 }
347
348 public void setRegistrationService(IRegistrationService registrationService) {
349 this.registrationService = registrationService;
350 }
351
352 public void setOutgoingBatchService(IOutgoingBatchService outgoingBatchService) {
353 this.outgoingBatchService = outgoingBatchService;
354 }
355
356 public void setDataExtractorService(IDataExtractorService dataExtractorService) {
357 this.dataExtractorService = dataExtractorService;
358 }
359
360 public void setClusterService(IClusterService clusterService) {
361 this.clusterService = clusterService;
362 }
363
364 public void setParameterService(IParameterService parameterService) {
365 this.parameterService = parameterService;
366 }
367
368 public void setConcurrentConnectionManager(IConcurrentConnectionManager concurrentConnectionManager) {
369 this.concurrentConnectionManager = concurrentConnectionManager;
370 }
371
372 }