View Javadoc

1   /*
2    * SymmetricDS is an open source database synchronization solution.
3    *   
4    * Copyright (C) Chris Henson <chenson42@users.sourceforge.net>
5    *
6    * This library is free software; you can redistribute it and/or
7    * modify it under the terms of the GNU Lesser General Public
8    * License as published by the Free Software Foundation; either
9    * version 3 of the License, or (at your option) any later version.
10   *
11   * This library is distributed in the hope that it will be useful,
12   * but WITHOUT ANY WARRANTY; without even the implied warranty of
13   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14   * Lesser General Public License for more details.
15   *
16   * You should have received a copy of the GNU Lesser General Public
17   * License along with this library; if not, see
18   * <http://www.gnu.org/licenses/>.
19   */
20  
21  package org.jumpmind.symmetric.service.impl;
22  
23  import java.sql.ResultSet;
24  import java.sql.SQLException;
25  import java.util.Calendar;
26  import java.util.List;
27  
28  import org.apache.commons.lang.time.DateUtils;
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.jumpmind.symmetric.common.ParameterConstants;
32  import org.jumpmind.symmetric.service.IClusterService;
33  import org.jumpmind.symmetric.service.IPurgeService;
34  import org.jumpmind.symmetric.service.LockAction;
35  import org.springframework.jdbc.core.RowMapper;
36  
37  public class PurgeService extends AbstractService implements IPurgeService {
38  
39      private final static Log logger = LogFactory.getLog(PurgeService.class);
40  
41      private IClusterService clusterService;
42  
43      @SuppressWarnings("unchecked")
44      public void purge() {
45          Calendar retentionCutoff = Calendar.getInstance();
46          retentionCutoff.add(Calendar.MINUTE, -parameterService
47                  .getInt(ParameterConstants.PURGE_RETENTION_MINUTES));
48          purgeOutgoing(retentionCutoff);
49          purgeIncoming(retentionCutoff);
50          purgeStatistic(retentionCutoff);
51      }
52  
53      private void purgeStatistic(Calendar retentionCutoff) {
54          try {
55              if (clusterService.lock(LockAction.PURGE_STATISTICS)) {
56                  try {
57                      logger.info("The statistic purge process is about to run.");
58                      int count = jdbcTemplate.update(getSql("deleteFromStatisticSql"),
59                              new Object[] { retentionCutoff.getTime() });
60                      logger.info("Purged " + count + " statistic rows.");
61                  } finally {
62                      clusterService.unlock(LockAction.PURGE_STATISTICS);
63                      logger.info("The statistic purge process has completed.");
64                  }
65  
66              } else {
67                  logger.info("Could not get a lock to run an statistic purge.");
68              }
69          } catch (Exception ex) {
70              logger.error(ex, ex);
71          }
72      }
73  
74      private void purgeOutgoing(Calendar retentionCutoff) {
75          try {
76              if (clusterService.lock(LockAction.PURGE_OUTGOING)) {
77                  try {
78                      logger.info("The outgoing purge process is about to run.");
79  
80                      purgeOutgoingBatch(retentionCutoff);
81                      purgeDataRows(retentionCutoff);
82                  } finally {
83                      clusterService.unlock(LockAction.PURGE_OUTGOING);
84                      logger.info("The outgoing purge process has completed.");
85                  }
86              } else {
87                  logger.info("Could not get a lock to run an outgoing purge.");
88              }
89          } catch (Exception ex) {
90              logger.error(ex, ex);
91          }
92      }
93  
94      private void purgeOutgoingBatch(final Calendar time) {
95          logger.info("Getting range for outgoing batch");
96          int[] minMax = queryForMinMax(getSql("selectOutgoingBatchRangeSql"), new Object[] { time.getTime() });
97          int maxNumOfDataIdsToPurgeInTx = parameterService
98                  .getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_DATA_IDS);
99          int maxNumOfBatchIdsToPurgeInTx = parameterService
100                 .getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_BATCH_IDS);
101         purgeByMinMax(minMax, getSql("deleteDataEventSql"), true, maxNumOfBatchIdsToPurgeInTx);
102         purgeByMinMax(minMax, getSql("deleteOutgoingBatchSql"), false, maxNumOfDataIdsToPurgeInTx);
103         purgeByMinMax(minMax, getSql("deleteOutgoingBatchHistSql"), true, maxNumOfBatchIdsToPurgeInTx);
104     }
105 
106     private void purgeDataRows(final Calendar time) {
107         logger.info("Getting range for data");
108         int[] minMax = queryForMinMax(getSql("selectDataRangeSql"), new Object[] { time.getTime() });
109         int maxNumOfDataIdsToPurgeInTx = parameterService
110                 .getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_DATA_IDS);
111         purgeByMinMax(minMax, getSql("deleteDataSql"), true, maxNumOfDataIdsToPurgeInTx);
112     }
113 
114     private int[] queryForMinMax(String sql, Object[] params) {
115         int[] minMax = (int[]) jdbcTemplate.queryForObject(sql, params, new RowMapper() {
116             public Object mapRow(ResultSet rs, int row) throws SQLException {
117                 return new int[] { rs.getInt(1), rs.getInt(2) };
118             }
119         });
120         return minMax;
121     }
122 
123     private void purgeByMinMax(int[] minMax, String deleteSql, boolean useRangeTwice, int maxNumtoPurgeinTx) {
124         int minId = minMax[0];
125         int purgeUpToId = minMax[1];
126         long ts = System.currentTimeMillis();
127         int totalCount = 0;
128         String tableName = deleteSql.trim().split("//s")[2];
129         logger.info("About to purge " + tableName);
130 
131         while (minId <= purgeUpToId) {
132             int maxId = minId + maxNumtoPurgeinTx;
133             if (maxId > purgeUpToId) {
134                 maxId = purgeUpToId;
135             }
136 
137             Object[] param = null;
138             if (useRangeTwice) {
139                 param = new Object[] { minId, maxId, minId, maxId };
140             } else {
141                 param = new Object[] { minId, maxId };
142             }
143 
144             totalCount += jdbcTemplate.update(deleteSql, param);
145 
146             if (totalCount > 0 && (System.currentTimeMillis() - ts > DateUtils.MILLIS_PER_MINUTE * 5)) {
147                 logger.info("Purged " + totalCount + " of " + tableName + " rows so far.");
148                 ts = System.currentTimeMillis();
149             }
150             minId = maxId + 1;
151         }
152         logger.info("Done purging " + totalCount + " of " + tableName + " rows.");
153     }
154 
155     private void purgeIncoming(Calendar retentionCutoff) {
156         try {
157             if (clusterService.lock(LockAction.PURGE_INCOMING)) {
158                 try {
159                     logger.info("The incoming purge process is about to run.");
160 
161                     purgeIncomingBatch(retentionCutoff);
162                 } finally {
163                     clusterService.unlock(LockAction.PURGE_INCOMING);
164                     logger.info("The incoming purge process has completed.");
165                 }
166             } else {
167                 logger.info("Could not get a lock to run an incoming purge.");
168             }
169         } catch (Exception ex) {
170             logger.error(ex, ex);
171         }
172     }
173 
174     @SuppressWarnings( { "unchecked" })
175     private void purgeIncomingBatch(final Calendar time) {
176         logger.info("Getting range for incoming batch");
177         List<NodeBatchRange> nodeBatchRangeList = jdbcTemplate.query(getSql("selectIncomingBatchRangeSql"),
178                 new Object[] { time.getTime() }, new RowMapper() {
179                     public Object mapRow(ResultSet rs, int rowNum) throws SQLException {
180                         return new NodeBatchRange(rs.getString(1), rs.getInt(2), rs.getInt(3));
181                     }
182                 });
183         purgeByNodeBatchRangeList(getSql("deleteIncomingBatchSql"), nodeBatchRangeList);
184         purgeByNodeBatchRangeList(getSql("deleteIncomingBatchHistSql"), nodeBatchRangeList);
185     }
186 
187     private void purgeByNodeBatchRangeList(String deleteSql, List<NodeBatchRange> nodeBatchRangeList) {
188         long ts = System.currentTimeMillis();
189         int totalCount = 0;
190         String tableName = deleteSql.trim().split("//s")[2];
191         logger.info("About to purge " + tableName);
192 
193         for (NodeBatchRange nodeBatchRange : nodeBatchRangeList) {
194             totalCount += purgeByNodeBatchRange(deleteSql, nodeBatchRange);
195             if (totalCount > 0 && (System.currentTimeMillis() - ts > DateUtils.MILLIS_PER_MINUTE * 5)) {
196                 logger.info("Purged " + totalCount + " of " + tableName + " rows so far.");
197                 ts = System.currentTimeMillis();
198             }
199         }
200         logger.info("Done purging " + totalCount + " of " + tableName + " rows.");
201     }
202 
203     private int purgeByNodeBatchRange(String deleteSql, NodeBatchRange nodeBatchRange) {
204         int maxNumOfDataIdsToPurgeInTx = parameterService
205                 .getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_DATA_IDS);
206         int minBatchId = nodeBatchRange.getMinBatchId();
207         int purgeUpToBatchId = nodeBatchRange.getMaxBatchId();
208         int totalCount = 0;
209 
210         while (minBatchId <= purgeUpToBatchId) {
211             int maxBatchId = minBatchId + maxNumOfDataIdsToPurgeInTx;
212             if (maxBatchId > purgeUpToBatchId) {
213                 maxBatchId = purgeUpToBatchId;
214             }
215             totalCount += jdbcTemplate.update(deleteSql, new Object[] { minBatchId, maxBatchId,
216                     nodeBatchRange.getNodeId() });
217             minBatchId = maxBatchId + 1;
218         }
219 
220         return totalCount;
221     }
222 
223     class NodeBatchRange {
224         private String nodeId;
225 
226         private int minBatchId;
227 
228         private int maxBatchId;
229 
230         public NodeBatchRange(String nodeId, int minBatchId, int maxBatchId) {
231             this.nodeId = nodeId;
232             this.minBatchId = minBatchId;
233             this.maxBatchId = maxBatchId;
234         }
235 
236         public String getNodeId() {
237             return nodeId;
238         }
239 
240         public int getMaxBatchId() {
241             return maxBatchId;
242         }
243 
244         public int getMinBatchId() {
245             return minBatchId;
246         }
247     }
248 
249     public void purgeAllIncomingEventsForNode(String nodeId) {
250         int count = jdbcTemplate.update(getSql("deleteIncomingBatchByNodeSql"), new Object[] { nodeId });
251         logger.info("Purged all " + count + " incoming batch for node " + nodeId);
252         count = jdbcTemplate.update(getSql("deleteIncomingBatchHistByNodeSql"), new Object[] { nodeId });
253         logger.info("Purged all " + count + " incoming batch hist for node " + nodeId);
254     }
255     
256     public void purgeAllOutgoingEventsForNode(String nodeId) {
257         long ts = System.currentTimeMillis();
258         int count = jdbcTemplate.update(getSql("deleteOutgoingEventsByNodeSql"), new Object[] {nodeId});
259         logger.info("Deleted " + count + " data events for node " + nodeId + " in " + (System.currentTimeMillis()-ts) + "ms.");
260     }
261 
262     public void setClusterService(IClusterService clusterService) {
263         this.clusterService = clusterService;
264     }
265 
266 }