1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.jumpmind.symmetric.service.impl;
22
23 import java.util.Calendar;
24 import java.util.Date;
25 import java.util.List;
26
27 import org.apache.commons.lang.time.DateUtils;
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.jumpmind.symmetric.common.ParameterConstants;
31 import org.jumpmind.symmetric.model.Node;
32 import org.jumpmind.symmetric.service.IClusterService;
33 import org.jumpmind.symmetric.service.INodeService;
34 import org.jumpmind.symmetric.service.LockAction;
35 import org.jumpmind.symmetric.util.AppUtils;
36 import org.springframework.dao.DataIntegrityViolationException;
37 import org.springframework.transaction.annotation.Propagation;
38 import org.springframework.transaction.annotation.Transactional;
39
40 public class ClusterService extends AbstractService implements IClusterService {
41
42 protected static final Log logger = LogFactory.getLog(ClusterService.class);
43
44 protected static final String COMMON_LOCK_ID = "common";
45
46 private INodeService nodeService;
47
48 public void initLockTable() {
49 initLockTableForNodes(nodeService.findNodesToPull());
50 initLockTableForNodes(nodeService.findNodesToPushTo());
51 initLockTable(LockAction.PURGE_INCOMING, COMMON_LOCK_ID);
52 initLockTable(LockAction.PURGE_OUTGOING, COMMON_LOCK_ID);
53 initLockTable(LockAction.PURGE_STATISTICS, COMMON_LOCK_ID);
54 initLockTable(LockAction.SYNCTRIGGERS, COMMON_LOCK_ID);
55 }
56
57 private void initLockTableForNodes(final List<Node> nodes) {
58 for (final Node node : nodes) {
59 initLockTableForNode(node);
60 }
61 }
62
63 public void initLockTableForNode(final Node node) {
64 initLockTable(LockAction.PULL, node.getNodeId());
65 initLockTable(LockAction.PUSH, node.getNodeId());
66 initLockTable(LockAction.HEARTBEAT, node.getNodeId());
67 }
68
69 public void initLockTable(final LockAction action, final String lockId) {
70 try {
71 jdbcTemplate.update(getSql("insertLockSql"), new Object[] { lockId, action.name() });
72 logger.debug("Inserted into the node_lock table for " + lockId + ".");
73 } catch (final DataIntegrityViolationException ex) {
74 logger.debug("Failed to insert to the node_lock table for " + lockId + ". Must be intialized already.");
75 }
76 }
77
78 public void clearAllLocks() {
79 jdbcTemplate.update(getSql("clearAllLocksSql"));
80 }
81
82 @Transactional(propagation = Propagation.REQUIRES_NEW)
83 public boolean lock(final LockAction action, final Node node) {
84 return lock(action, node.getNodeId());
85 }
86
87 @Transactional(propagation = Propagation.REQUIRES_NEW)
88 public boolean lock(final LockAction action) {
89 return lock(action, COMMON_LOCK_ID);
90 }
91
92 @Transactional(propagation = Propagation.REQUIRES_NEW)
93 public void unlock(final LockAction action) {
94 unlock(action, COMMON_LOCK_ID);
95 }
96
97 @Transactional(propagation = Propagation.REQUIRES_NEW)
98 public void unlock(final LockAction action, final Node node) {
99 unlock(action, node.getNodeId());
100 }
101
102 private boolean lock(final LockAction action, final String id) {
103 if (isClusteringEnabled(action)) {
104 final Date timeout = DateUtils.add(new Date(), Calendar.MILLISECOND, (int) -parameterService
105 .getLong(ParameterConstants.CLUSTER_LOCK_TIMEOUT_MS));
106 return jdbcTemplate.update(getSql("aquireLockSql"), new Object[] { getLockingServerId(), id, action.name(),
107 timeout }) == 1;
108 } else {
109 return true;
110 }
111 }
112
113 private String getLockingServerId() {
114 return AppUtils.getServerId();
115 }
116
117 private void unlock(final LockAction action, final String id) {
118 if (isClusteringEnabled(action)) {
119 jdbcTemplate.update(getSql("releaseLockSql"), new Object[] { id, action.name(), getLockingServerId() });
120 }
121 }
122
123 private boolean isClusteringEnabled(final LockAction action) {
124 switch (action) {
125 case PULL:
126 return parameterService.is(ParameterConstants.CLUSTER_LOCK_DURING_PULL);
127 case PUSH:
128 return parameterService.is(ParameterConstants.CLUSTER_LOCK_DURING_PUSH);
129 case PURGE_INCOMING:
130 case PURGE_OUTGOING:
131 case PURGE_STATISTICS:
132 return parameterService.is(ParameterConstants.CLUSTER_LOCK_DURING_PURGE);
133 case HEARTBEAT:
134 return parameterService.is(ParameterConstants.CLUSTER_LOCK_DURING_HEARTBEAT);
135 case SYNCTRIGGERS:
136 return parameterService.is(ParameterConstants.CLUSTER_LOCK_DURING_SYNC_TRIGGERS);
137 case OTHER:
138 return true;
139 default:
140 return false;
141 }
142 }
143
144 public void setNodeService(INodeService nodeService) {
145 this.nodeService = nodeService;
146 }
147
148 }