View Javadoc

1   /*
2    * SymmetricDS is an open source database synchronization solution.
3    *   
4    * Copyright (C) Eric Long <erilong@users.sourceforge.net>,
5    *               Chris Henson <chenson42@users.sourceforge.net>
6    *
7    * This library is free software; you can redistribute it and/or
8    * modify it under the terms of the GNU Lesser General Public
9    * License as published by the Free Software Foundation; either
10   * version 3 of the License, or (at your option) any later version.
11   *
12   * This library is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this library; if not, see
19   * <http://www.gnu.org/licenses/>.
20   */
21  
22  package org.jumpmind.symmetric.service.impl;
23  
24  import java.sql.ResultSet;
25  import java.sql.SQLException;
26  import java.sql.Types;
27  import java.util.List;
28  
29  import org.apache.commons.lang.StringUtils;
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.jumpmind.symmetric.common.ParameterConstants;
33  import org.jumpmind.symmetric.db.IDbDialect;
34  import org.jumpmind.symmetric.model.IncomingBatch;
35  import org.jumpmind.symmetric.model.IncomingBatchHistory;
36  import org.jumpmind.symmetric.service.IIncomingBatchService;
37  import org.jumpmind.symmetric.util.MaxRowsStatementCreator;
38  import org.springframework.dao.DataIntegrityViolationException;
39  import org.springframework.dao.EmptyResultDataAccessException;
40  import org.springframework.jdbc.core.RowMapper;
41  
42  public class IncomingBatchService extends AbstractService implements IIncomingBatchService {
43  
44      private static final Log logger = LogFactory.getLog(IncomingBatchService.class);
45  
46      private IDbDialect dbDialect;
47  
48      public IncomingBatch findIncomingBatch(long batchId, String nodeId) {
49          try {
50              return (IncomingBatch) jdbcTemplate.queryForObject(getSql("findIncomingBatchSql"), new Object[] { batchId,
51                      nodeId }, new IncomingBatchMapper());
52          } catch (EmptyResultDataAccessException e) {
53              return null;
54          }
55      }
56  
57      @SuppressWarnings("unchecked")
58      public List<IncomingBatch> findIncomingBatchErrors(int maxRows) {
59          return (List<IncomingBatch>) jdbcTemplate.query(new MaxRowsStatementCreator(
60                  getSql("findIncomingBatchErrorsSql"), maxRows), new IncomingBatchMapper());
61      }
62  
63      @SuppressWarnings("unchecked")
64      public List<IncomingBatchHistory> findIncomingBatchHistory(long batchId, String nodeId) {
65          return (List<IncomingBatchHistory>) jdbcTemplate.query(getSql("findIncomingBatchHistorySql"), new Object[] {
66                  batchId, nodeId }, new IncomingBatchHistoryMapper());
67      }
68  
69      public boolean acquireIncomingBatch(final IncomingBatch status) {
70          Object savepoint = dbDialect.createSavepointForFallback();
71          boolean okayToProcess = true;
72          try {
73              insertIncomingBatch(status);
74              dbDialect.releaseSavepoint(savepoint);
75          } catch (DataIntegrityViolationException e) {
76              dbDialect.rollbackToSavepoint(savepoint);
77              status.setRetry(true);
78              okayToProcess = updateIncomingBatch(status) > 0
79                      || (!parameterService.is(ParameterConstants.INCOMING_BATCH_SKIP_DUPLICATE_BATCHES_ENABLED));
80              if (okayToProcess) {
81                  logger.warn("Retrying batch " + status.getNodeBatchId());
82              } else {
83                  logger.warn("Skipping batch " + status.getNodeBatchId());
84              }
85          }
86          return okayToProcess;
87      }
88  
89      public void insertIncomingBatch(IncomingBatch status) {
90          jdbcTemplate.update(getSql("insertIncomingBatchSql"), new Object[] { Long.valueOf(status.getBatchId()),
91                  status.getNodeId(), status.getStatus().toString() });
92      }
93  
94      public int updateIncomingBatch(IncomingBatch status) {
95          return jdbcTemplate.update(getSql("updateIncomingBatchSql"), new Object[] { status.getStatus().toString(),
96                  Long.valueOf(status.getBatchId()), status.getNodeId(), IncomingBatch.Status.ER.toString() });
97      }
98  
99      public void insertIncomingBatchHistory(IncomingBatchHistory history) {
100         jdbcTemplate.update(getSql("insertIncomingBatchHistorySql"), new Object[] { Long.valueOf(history.getBatchId()),
101                 history.getNodeId(), history.getStatus().toString(), history.getNetworkMillis(),
102                 history.getFilterMillis(), history.getDatabaseMillis(), history.getHostName(), history.getByteCount(),
103                 history.getStatementCount(), history.getFallbackInsertCount(), history.getFallbackUpdateCount(),
104                 history.getMissingDeleteCount(), history.getFailedRowNumber(), history.getStartTime(),
105                 history.getEndTime(), history.getSqlState(), history.getSqlCode(),
106                 StringUtils.abbreviate(history.getSqlMessage(), 50) }, new int[] { Types.INTEGER, Types.VARCHAR,
107                 Types.CHAR, Types.INTEGER, Types.INTEGER, Types.INTEGER, Types.VARCHAR, Types.INTEGER, Types.INTEGER,
108                 Types.INTEGER, Types.INTEGER, Types.INTEGER, Types.INTEGER, Types.TIMESTAMP, Types.TIMESTAMP,
109                 Types.VARCHAR, Types.INTEGER, Types.VARCHAR });
110     }
111 
112     class IncomingBatchMapper implements RowMapper {
113         public Object mapRow(ResultSet rs, int num) throws SQLException {
114             IncomingBatch batch = new IncomingBatch();
115             batch.setBatchId(rs.getLong(1));
116             batch.setNodeId(rs.getString(2));
117             batch.setStatus(IncomingBatch.Status.valueOf(rs.getString(3)));
118             batch.setCreateTime(rs.getTimestamp(4));
119             return batch;
120         }
121     }
122 
123     class IncomingBatchHistoryMapper implements RowMapper {
124         public Object mapRow(ResultSet rs, int num) throws SQLException {
125             IncomingBatchHistory history = new IncomingBatchHistory();
126             history.setBatchId(rs.getLong(1));
127             history.setNodeId(rs.getString(2));
128             history.setStatus(IncomingBatchHistory.Status.valueOf(rs.getString(3)));
129             history.setStartTime(rs.getTime(4));
130             history.setEndTime(rs.getTime(5));
131             history.setFailedRowNumber(rs.getLong(6));
132             history.setByteCount(rs.getLong(7));
133             history.setNetworkMillis(rs.getLong(8));
134             history.setFilterMillis(rs.getLong(9));
135             history.setDatabaseMillis(rs.getLong(10));
136             history.setStatementCount(rs.getLong(11));
137             history.setFallbackInsertCount(rs.getLong(12));
138             history.setFallbackUpdateCount(rs.getLong(13));
139             history.setMissingDeleteCount(rs.getLong(14));
140             history.setSqlState(rs.getString(15));
141             history.setSqlCode(rs.getInt(16));
142             history.setSqlMessage(rs.getString(17));
143             return history;
144         }
145     }
146 
147     public void setDbDialect(IDbDialect dbDialect) {
148         this.dbDialect = dbDialect;
149     }
150 
151 }