View Javadoc

1   /*
2    * SymmetricDS is an open source database synchronization solution.
3    *   
4    * Copyright (C) Chris Henson <chenson42@users.sourceforge.net>
5    *
6    * This library is free software; you can redistribute it and/or
7    * modify it under the terms of the GNU Lesser General Public
8    * License as published by the Free Software Foundation; either
9    * version 3 of the License, or (at your option) any later version.
10   *
11   * This library is distributed in the hope that it will be useful,
12   * but WITHOUT ANY WARRANTY; without even the implied warranty of
13   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14   * Lesser General Public License for more details.
15   *
16   * You should have received a copy of the GNU Lesser General Public
17   * License along with this library; if not, see
18   * <http://www.gnu.org/licenses/>.
19   */
20  
21  package org.jumpmind.symmetric.service.impl;
22  
23  import java.util.Calendar;
24  import java.util.Date;
25  import java.util.List;
26  
27  import org.apache.commons.lang.time.DateUtils;
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.jumpmind.symmetric.common.ParameterConstants;
31  import org.jumpmind.symmetric.model.Node;
32  import org.jumpmind.symmetric.service.IClusterService;
33  import org.jumpmind.symmetric.service.INodeService;
34  import org.jumpmind.symmetric.service.LockAction;
35  import org.jumpmind.symmetric.util.AppUtils;
36  import org.springframework.dao.DataIntegrityViolationException;
37  import org.springframework.transaction.annotation.Propagation;
38  import org.springframework.transaction.annotation.Transactional;
39  
40  public class ClusterService extends AbstractService implements IClusterService {
41  
42      protected static final Log logger = LogFactory.getLog(ClusterService.class);
43  
44      protected static final String COMMON_LOCK_ID = "common";
45  
46      private INodeService nodeService;
47  
48      public void initLockTable() {
49          initLockTableForNodes(nodeService.findNodesToPull());
50          initLockTableForNodes(nodeService.findNodesToPushTo());
51          initLockTable(LockAction.PURGE_INCOMING, COMMON_LOCK_ID);
52          initLockTable(LockAction.PURGE_OUTGOING, COMMON_LOCK_ID);
53          initLockTable(LockAction.PURGE_STATISTICS, COMMON_LOCK_ID);
54          initLockTable(LockAction.SYNCTRIGGERS, COMMON_LOCK_ID);
55      }
56  
57      private void initLockTableForNodes(final List<Node> nodes) {
58          for (final Node node : nodes) {
59              initLockTableForNode(node);
60          }
61      }
62  
63      public void initLockTableForNode(final Node node) {
64          initLockTable(LockAction.PULL, node.getNodeId());
65          initLockTable(LockAction.PUSH, node.getNodeId());
66          initLockTable(LockAction.HEARTBEAT, node.getNodeId());
67      }
68  
69      public void initLockTable(final LockAction action, final String lockId) {
70          try {
71              jdbcTemplate.update(getSql("insertLockSql"), new Object[] { lockId, action.name() });
72              logger.debug("Inserted into the node_lock table for " + lockId + ".");
73          } catch (final DataIntegrityViolationException ex) {
74              logger.debug("Failed to insert to the node_lock table for " + lockId + ".  Must be intialized already.");
75          }
76      }
77  
78      public void clearAllLocks() {
79          jdbcTemplate.update(getSql("clearAllLocksSql"));
80      }
81  
82      @Transactional(propagation = Propagation.REQUIRES_NEW)
83      public boolean lock(final LockAction action, final Node node) {
84          return lock(action, node.getNodeId());
85      }
86  
87      @Transactional(propagation = Propagation.REQUIRES_NEW)
88      public boolean lock(final LockAction action) {
89          return lock(action, COMMON_LOCK_ID);
90      }
91  
92      @Transactional(propagation = Propagation.REQUIRES_NEW)
93      public void unlock(final LockAction action) {
94          unlock(action, COMMON_LOCK_ID);
95      }
96  
97      @Transactional(propagation = Propagation.REQUIRES_NEW)
98      public void unlock(final LockAction action, final Node node) {
99          unlock(action, node.getNodeId());
100     }
101 
102     private boolean lock(final LockAction action, final String id) {
103         if (isClusteringEnabled(action)) {
104             final Date timeout = DateUtils.add(new Date(), Calendar.MILLISECOND, (int) -parameterService
105                     .getLong(ParameterConstants.CLUSTER_LOCK_TIMEOUT_MS));
106             return jdbcTemplate.update(getSql("aquireLockSql"), new Object[] { getLockingServerId(), id, action.name(),
107                     timeout }) == 1;
108         } else {
109             return true;
110         }
111     }
112 
113     private String getLockingServerId() {
114         return AppUtils.getServerId();
115     }
116 
117     private void unlock(final LockAction action, final String id) {
118         if (isClusteringEnabled(action)) {
119             jdbcTemplate.update(getSql("releaseLockSql"), new Object[] { id, action.name(), getLockingServerId() });
120         }
121     }
122 
123     private boolean isClusteringEnabled(final LockAction action) {
124         switch (action) {
125         case PULL:
126             return parameterService.is(ParameterConstants.CLUSTER_LOCK_DURING_PULL);
127         case PUSH:
128             return parameterService.is(ParameterConstants.CLUSTER_LOCK_DURING_PUSH);
129         case PURGE_INCOMING:
130         case PURGE_OUTGOING:
131         case PURGE_STATISTICS:
132             return parameterService.is(ParameterConstants.CLUSTER_LOCK_DURING_PURGE);
133         case HEARTBEAT:
134             return parameterService.is(ParameterConstants.CLUSTER_LOCK_DURING_HEARTBEAT);
135         case SYNCTRIGGERS:
136             return parameterService.is(ParameterConstants.CLUSTER_LOCK_DURING_SYNC_TRIGGERS);
137         case OTHER:
138             return true;
139         default:
140             return false;
141         }
142     }
143 
144     public void setNodeService(INodeService nodeService) {
145         this.nodeService = nodeService;
146     }
147 
148 }