1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.jumpmind.symmetric.service.impl;
22
23 import java.sql.ResultSet;
24 import java.sql.SQLException;
25 import java.util.Calendar;
26 import java.util.List;
27
28 import org.apache.commons.lang.time.DateUtils;
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.jumpmind.symmetric.common.ParameterConstants;
32 import org.jumpmind.symmetric.service.IClusterService;
33 import org.jumpmind.symmetric.service.IPurgeService;
34 import org.jumpmind.symmetric.service.LockAction;
35 import org.springframework.jdbc.core.RowMapper;
36
37 public class PurgeService extends AbstractService implements IPurgeService {
38
39 private final static Log logger = LogFactory.getLog(PurgeService.class);
40
41 private IClusterService clusterService;
42
43 @SuppressWarnings("unchecked")
44 public void purge() {
45 Calendar retentionCutoff = Calendar.getInstance();
46 retentionCutoff.add(Calendar.MINUTE, -parameterService
47 .getInt(ParameterConstants.PURGE_RETENTION_MINUTES));
48 purgeOutgoing(retentionCutoff);
49 purgeIncoming(retentionCutoff);
50 purgeStatistic(retentionCutoff);
51 }
52
53 private void purgeStatistic(Calendar retentionCutoff) {
54 try {
55 if (clusterService.lock(LockAction.PURGE_STATISTICS)) {
56 try {
57 logger.info("The statistic purge process is about to run.");
58 int count = jdbcTemplate.update(getSql("deleteFromStatisticSql"),
59 new Object[] { retentionCutoff.getTime() });
60 logger.info("Purged " + count + " statistic rows.");
61 } finally {
62 clusterService.unlock(LockAction.PURGE_STATISTICS);
63 logger.info("The statistic purge process has completed.");
64 }
65
66 } else {
67 logger.info("Could not get a lock to run an statistic purge.");
68 }
69 } catch (Exception ex) {
70 logger.error(ex, ex);
71 }
72 }
73
74 private void purgeOutgoing(Calendar retentionCutoff) {
75 try {
76 if (clusterService.lock(LockAction.PURGE_OUTGOING)) {
77 try {
78 logger.info("The outgoing purge process is about to run.");
79
80 purgeOutgoingBatch(retentionCutoff);
81 purgeDataRows(retentionCutoff);
82 } finally {
83 clusterService.unlock(LockAction.PURGE_OUTGOING);
84 logger.info("The outgoing purge process has completed.");
85 }
86 } else {
87 logger.info("Could not get a lock to run an outgoing purge.");
88 }
89 } catch (Exception ex) {
90 logger.error(ex, ex);
91 }
92 }
93
94 private void purgeOutgoingBatch(final Calendar time) {
95 logger.info("Getting range for outgoing batch");
96 int[] minMax = queryForMinMax(getSql("selectOutgoingBatchRangeSql"), new Object[] { time.getTime() });
97 int maxNumOfDataIdsToPurgeInTx = parameterService
98 .getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_DATA_IDS);
99 int maxNumOfBatchIdsToPurgeInTx = parameterService
100 .getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_BATCH_IDS);
101 purgeByMinMax(minMax, getSql("deleteDataEventSql"), true, maxNumOfBatchIdsToPurgeInTx);
102 purgeByMinMax(minMax, getSql("deleteOutgoingBatchSql"), false, maxNumOfDataIdsToPurgeInTx);
103 purgeByMinMax(minMax, getSql("deleteOutgoingBatchHistSql"), true, maxNumOfBatchIdsToPurgeInTx);
104 }
105
106 private void purgeDataRows(final Calendar time) {
107 logger.info("Getting range for data");
108 int[] minMax = queryForMinMax(getSql("selectDataRangeSql"), new Object[] { time.getTime() });
109 int maxNumOfDataIdsToPurgeInTx = parameterService
110 .getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_DATA_IDS);
111 purgeByMinMax(minMax, getSql("deleteDataSql"), true, maxNumOfDataIdsToPurgeInTx);
112 }
113
114 private int[] queryForMinMax(String sql, Object[] params) {
115 int[] minMax = (int[]) jdbcTemplate.queryForObject(sql, params, new RowMapper() {
116 public Object mapRow(ResultSet rs, int row) throws SQLException {
117 return new int[] { rs.getInt(1), rs.getInt(2) };
118 }
119 });
120 return minMax;
121 }
122
123 private void purgeByMinMax(int[] minMax, String deleteSql, boolean useRangeTwice, int maxNumtoPurgeinTx) {
124 int minId = minMax[0];
125 int purgeUpToId = minMax[1];
126 long ts = System.currentTimeMillis();
127 int totalCount = 0;
128 String tableName = deleteSql.trim().split("//s")[2];
129 logger.info("About to purge " + tableName);
130
131 while (minId <= purgeUpToId) {
132 int maxId = minId + maxNumtoPurgeinTx;
133 if (maxId > purgeUpToId) {
134 maxId = purgeUpToId;
135 }
136
137 Object[] param = null;
138 if (useRangeTwice) {
139 param = new Object[] { minId, maxId, minId, maxId };
140 } else {
141 param = new Object[] { minId, maxId };
142 }
143
144 totalCount += jdbcTemplate.update(deleteSql, param);
145
146 if (totalCount > 0 && (System.currentTimeMillis() - ts > DateUtils.MILLIS_PER_MINUTE * 5)) {
147 logger.info("Purged " + totalCount + " of " + tableName + " rows so far.");
148 ts = System.currentTimeMillis();
149 }
150 minId = maxId + 1;
151 }
152 logger.info("Done purging " + totalCount + " of " + tableName + " rows.");
153 }
154
155 private void purgeIncoming(Calendar retentionCutoff) {
156 try {
157 if (clusterService.lock(LockAction.PURGE_INCOMING)) {
158 try {
159 logger.info("The incoming purge process is about to run.");
160
161 purgeIncomingBatch(retentionCutoff);
162 } finally {
163 clusterService.unlock(LockAction.PURGE_INCOMING);
164 logger.info("The incoming purge process has completed.");
165 }
166 } else {
167 logger.info("Could not get a lock to run an incoming purge.");
168 }
169 } catch (Exception ex) {
170 logger.error(ex, ex);
171 }
172 }
173
174 @SuppressWarnings( { "unchecked" })
175 private void purgeIncomingBatch(final Calendar time) {
176 logger.info("Getting range for incoming batch");
177 List<NodeBatchRange> nodeBatchRangeList = jdbcTemplate.query(getSql("selectIncomingBatchRangeSql"),
178 new Object[] { time.getTime() }, new RowMapper() {
179 public Object mapRow(ResultSet rs, int rowNum) throws SQLException {
180 return new NodeBatchRange(rs.getString(1), rs.getInt(2), rs.getInt(3));
181 }
182 });
183 purgeByNodeBatchRangeList(getSql("deleteIncomingBatchSql"), nodeBatchRangeList);
184 purgeByNodeBatchRangeList(getSql("deleteIncomingBatchHistSql"), nodeBatchRangeList);
185 }
186
187 private void purgeByNodeBatchRangeList(String deleteSql, List<NodeBatchRange> nodeBatchRangeList) {
188 long ts = System.currentTimeMillis();
189 int totalCount = 0;
190 String tableName = deleteSql.trim().split("//s")[2];
191 logger.info("About to purge " + tableName);
192
193 for (NodeBatchRange nodeBatchRange : nodeBatchRangeList) {
194 totalCount += purgeByNodeBatchRange(deleteSql, nodeBatchRange);
195 if (totalCount > 0 && (System.currentTimeMillis() - ts > DateUtils.MILLIS_PER_MINUTE * 5)) {
196 logger.info("Purged " + totalCount + " of " + tableName + " rows so far.");
197 ts = System.currentTimeMillis();
198 }
199 }
200 logger.info("Done purging " + totalCount + " of " + tableName + " rows.");
201 }
202
203 private int purgeByNodeBatchRange(String deleteSql, NodeBatchRange nodeBatchRange) {
204 int maxNumOfDataIdsToPurgeInTx = parameterService
205 .getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_DATA_IDS);
206 int minBatchId = nodeBatchRange.getMinBatchId();
207 int purgeUpToBatchId = nodeBatchRange.getMaxBatchId();
208 int totalCount = 0;
209
210 while (minBatchId <= purgeUpToBatchId) {
211 int maxBatchId = minBatchId + maxNumOfDataIdsToPurgeInTx;
212 if (maxBatchId > purgeUpToBatchId) {
213 maxBatchId = purgeUpToBatchId;
214 }
215 totalCount += jdbcTemplate.update(deleteSql, new Object[] { minBatchId, maxBatchId,
216 nodeBatchRange.getNodeId() });
217 minBatchId = maxBatchId + 1;
218 }
219
220 return totalCount;
221 }
222
223 class NodeBatchRange {
224 private String nodeId;
225
226 private int minBatchId;
227
228 private int maxBatchId;
229
230 public NodeBatchRange(String nodeId, int minBatchId, int maxBatchId) {
231 this.nodeId = nodeId;
232 this.minBatchId = minBatchId;
233 this.maxBatchId = maxBatchId;
234 }
235
236 public String getNodeId() {
237 return nodeId;
238 }
239
240 public int getMaxBatchId() {
241 return maxBatchId;
242 }
243
244 public int getMinBatchId() {
245 return minBatchId;
246 }
247 }
248
249 public void purgeAllIncomingEventsForNode(String nodeId) {
250 int count = jdbcTemplate.update(getSql("deleteIncomingBatchByNodeSql"), new Object[] { nodeId });
251 logger.info("Purged all " + count + " incoming batch for node " + nodeId);
252 count = jdbcTemplate.update(getSql("deleteIncomingBatchHistByNodeSql"), new Object[] { nodeId });
253 logger.info("Purged all " + count + " incoming batch hist for node " + nodeId);
254 }
255
256 public void purgeAllOutgoingEventsForNode(String nodeId) {
257 long ts = System.currentTimeMillis();
258 int count = jdbcTemplate.update(getSql("deleteOutgoingEventsByNodeSql"), new Object[] {nodeId});
259 logger.info("Deleted " + count + " data events for node " + nodeId + " in " + (System.currentTimeMillis()-ts) + "ms.");
260 }
261
262 public void setClusterService(IClusterService clusterService) {
263 this.clusterService = clusterService;
264 }
265
266 }