1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.jumpmind.symmetric.service.impl;
23
24 import java.sql.ResultSet;
25 import java.sql.SQLException;
26 import java.sql.Types;
27 import java.util.List;
28
29 import org.apache.commons.lang.StringUtils;
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.jumpmind.symmetric.common.ParameterConstants;
33 import org.jumpmind.symmetric.db.IDbDialect;
34 import org.jumpmind.symmetric.model.IncomingBatch;
35 import org.jumpmind.symmetric.model.IncomingBatchHistory;
36 import org.jumpmind.symmetric.service.IIncomingBatchService;
37 import org.jumpmind.symmetric.util.MaxRowsStatementCreator;
38 import org.springframework.dao.DataIntegrityViolationException;
39 import org.springframework.dao.EmptyResultDataAccessException;
40 import org.springframework.jdbc.core.RowMapper;
41
42 public class IncomingBatchService extends AbstractService implements IIncomingBatchService {
43
44 private static final Log logger = LogFactory.getLog(IncomingBatchService.class);
45
46 private IDbDialect dbDialect;
47
48 public IncomingBatch findIncomingBatch(long batchId, String nodeId) {
49 try {
50 return (IncomingBatch) jdbcTemplate.queryForObject(getSql("findIncomingBatchSql"), new Object[] { batchId,
51 nodeId }, new IncomingBatchMapper());
52 } catch (EmptyResultDataAccessException e) {
53 return null;
54 }
55 }
56
57 @SuppressWarnings("unchecked")
58 public List<IncomingBatch> findIncomingBatchErrors(int maxRows) {
59 return (List<IncomingBatch>) jdbcTemplate.query(new MaxRowsStatementCreator(
60 getSql("findIncomingBatchErrorsSql"), maxRows), new IncomingBatchMapper());
61 }
62
63 @SuppressWarnings("unchecked")
64 public List<IncomingBatchHistory> findIncomingBatchHistory(long batchId, String nodeId) {
65 return (List<IncomingBatchHistory>) jdbcTemplate.query(getSql("findIncomingBatchHistorySql"), new Object[] {
66 batchId, nodeId }, new IncomingBatchHistoryMapper());
67 }
68
69 public boolean acquireIncomingBatch(final IncomingBatch status) {
70 Object savepoint = dbDialect.createSavepointForFallback();
71 boolean okayToProcess = true;
72 try {
73 insertIncomingBatch(status);
74 dbDialect.releaseSavepoint(savepoint);
75 } catch (DataIntegrityViolationException e) {
76 dbDialect.rollbackToSavepoint(savepoint);
77 status.setRetry(true);
78 okayToProcess = updateIncomingBatch(status) > 0
79 || (!parameterService.is(ParameterConstants.INCOMING_BATCH_SKIP_DUPLICATE_BATCHES_ENABLED));
80 if (okayToProcess) {
81 logger.warn("Retrying batch " + status.getNodeBatchId());
82 } else {
83 logger.warn("Skipping batch " + status.getNodeBatchId());
84 }
85 }
86 return okayToProcess;
87 }
88
89 public void insertIncomingBatch(IncomingBatch status) {
90 jdbcTemplate.update(getSql("insertIncomingBatchSql"), new Object[] { Long.valueOf(status.getBatchId()),
91 status.getNodeId(), status.getStatus().toString() });
92 }
93
94 public int updateIncomingBatch(IncomingBatch status) {
95 return jdbcTemplate.update(getSql("updateIncomingBatchSql"), new Object[] { status.getStatus().toString(),
96 Long.valueOf(status.getBatchId()), status.getNodeId(), IncomingBatch.Status.ER.toString() });
97 }
98
99 public void insertIncomingBatchHistory(IncomingBatchHistory history) {
100 jdbcTemplate.update(getSql("insertIncomingBatchHistorySql"), new Object[] { Long.valueOf(history.getBatchId()),
101 history.getNodeId(), history.getStatus().toString(), history.getNetworkMillis(),
102 history.getFilterMillis(), history.getDatabaseMillis(), history.getHostName(), history.getByteCount(),
103 history.getStatementCount(), history.getFallbackInsertCount(), history.getFallbackUpdateCount(),
104 history.getMissingDeleteCount(), history.getFailedRowNumber(), history.getStartTime(),
105 history.getEndTime(), history.getSqlState(), history.getSqlCode(),
106 StringUtils.abbreviate(history.getSqlMessage(), 50) }, new int[] { Types.INTEGER, Types.VARCHAR,
107 Types.CHAR, Types.INTEGER, Types.INTEGER, Types.INTEGER, Types.VARCHAR, Types.INTEGER, Types.INTEGER,
108 Types.INTEGER, Types.INTEGER, Types.INTEGER, Types.INTEGER, Types.TIMESTAMP, Types.TIMESTAMP,
109 Types.VARCHAR, Types.INTEGER, Types.VARCHAR });
110 }
111
112 class IncomingBatchMapper implements RowMapper {
113 public Object mapRow(ResultSet rs, int num) throws SQLException {
114 IncomingBatch batch = new IncomingBatch();
115 batch.setBatchId(rs.getLong(1));
116 batch.setNodeId(rs.getString(2));
117 batch.setStatus(IncomingBatch.Status.valueOf(rs.getString(3)));
118 batch.setCreateTime(rs.getTimestamp(4));
119 return batch;
120 }
121 }
122
123 class IncomingBatchHistoryMapper implements RowMapper {
124 public Object mapRow(ResultSet rs, int num) throws SQLException {
125 IncomingBatchHistory history = new IncomingBatchHistory();
126 history.setBatchId(rs.getLong(1));
127 history.setNodeId(rs.getString(2));
128 history.setStatus(IncomingBatchHistory.Status.valueOf(rs.getString(3)));
129 history.setStartTime(rs.getTime(4));
130 history.setEndTime(rs.getTime(5));
131 history.setFailedRowNumber(rs.getLong(6));
132 history.setByteCount(rs.getLong(7));
133 history.setNetworkMillis(rs.getLong(8));
134 history.setFilterMillis(rs.getLong(9));
135 history.setDatabaseMillis(rs.getLong(10));
136 history.setStatementCount(rs.getLong(11));
137 history.setFallbackInsertCount(rs.getLong(12));
138 history.setFallbackUpdateCount(rs.getLong(13));
139 history.setMissingDeleteCount(rs.getLong(14));
140 history.setSqlState(rs.getString(15));
141 history.setSqlCode(rs.getInt(16));
142 history.setSqlMessage(rs.getString(17));
143 return history;
144 }
145 }
146
147 public void setDbDialect(IDbDialect dbDialect) {
148 this.dbDialect = dbDialect;
149 }
150
151 }