View Javadoc

1   /*
2    * SymmetricDS is an open source database synchronization solution.
3    *   
4    * Copyright (C) Chris Henson <chenson42@users.sourceforge.net>
5    *               Eric Long <erilong@users.sourceforge.net>
6    *
7    * This library is free software; you can redistribute it and/or
8    * modify it under the terms of the GNU Lesser General Public
9    * License as published by the Free Software Foundation; either
10   * version 3 of the License, or (at your option) any later version.
11   *
12   * This library is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this library; if not, see
19   * <http://www.gnu.org/licenses/>.
20   */
21  
22  package org.jumpmind.symmetric.service.impl;
23  
24  import java.sql.ResultSet;
25  import java.sql.SQLException;
26  import java.util.ArrayList;
27  import java.util.Date;
28  import java.util.List;
29  
30  import org.jumpmind.symmetric.common.Constants;
31  import org.jumpmind.symmetric.model.BatchInfo;
32  import org.jumpmind.symmetric.model.Data;
33  import org.jumpmind.symmetric.model.DataEvent;
34  import org.jumpmind.symmetric.model.DataEventType;
35  import org.jumpmind.symmetric.model.OutgoingBatch;
36  import org.jumpmind.symmetric.model.OutgoingBatchHistory;
37  import org.jumpmind.symmetric.model.TriggerHistory;
38  import org.jumpmind.symmetric.service.IAcknowledgeService;
39  import org.jumpmind.symmetric.service.IDataService;
40  import org.jumpmind.symmetric.service.IOutgoingBatchService;
41  import org.jumpmind.symmetric.test.AbstractDatabaseTest;
42  import org.jumpmind.symmetric.test.TestConstants;
43  import org.junit.Assert;
44  import org.junit.Before;
45  import org.junit.Test;
46  import org.springframework.jdbc.core.RowMapper;
47  
48  
49  public class AcknowledgeServiceTest extends AbstractDatabaseTest {
50  
51      protected IAcknowledgeService ackService;
52  
53      protected IOutgoingBatchService outgoingBatchService;
54  
55      protected IDataService dataService;
56  
57      public AcknowledgeServiceTest() throws Exception {
58          super();
59      }
60  
61      public AcknowledgeServiceTest(String dbName) {
62          super(dbName);
63      }
64  
65      @Before
66      public void setUp() {
67          ackService = (IAcknowledgeService)find(Constants.ACKNOWLEDGE_SERVICE);
68          outgoingBatchService = (IOutgoingBatchService)find(Constants.OUTGOING_BATCH_SERVICE);
69          dataService = (IDataService)find(Constants.DATA_SERVICE);
70      }
71  
72      @Test
73      public void okTest() {
74          cleanSlate();
75          ackService.ack(new BatchInfo(1));
76  
77          List<OutgoingBatchHistory> history = getOutgoingBatchHistory(1);
78          Assert.assertEquals(history.size(), 1);
79          OutgoingBatchHistory hist = history.get(0);
80          Assert.assertEquals(hist.getBatchId(), 1);
81          Assert.assertEquals(hist.getStatus(), OutgoingBatchHistory.Status.OK);
82      }
83  
84      private void cleanSlate() {
85          cleanSlate(TestConstants.TEST_PREFIX + "data_event", TestConstants.TEST_PREFIX + "data",
86                  TestConstants.TEST_PREFIX + "outgoing_batch_hist", TestConstants.TEST_PREFIX + "outgoing_batch");
87      }
88  
89      @Test
90      public void unspecifiedErrorTest() {
91          cleanSlate();
92          OutgoingBatch batch = createOutgoingBatch();
93          createDataEvents(batch, 5);
94          errorTestCore(batch.getBatchId(), -1, -1);
95      }
96  
97      @Test
98      public void errorTest() {
99          cleanSlate();
100         OutgoingBatch batch = createOutgoingBatch();
101         long dataId[] = createDataEvents(batch, 5);
102         errorTestCore(batch.getBatchId(), 3, dataId[2]);
103     }
104 
105     @Test
106     public void errorTestBoundary1() {
107         cleanSlate();
108         OutgoingBatch batch = createOutgoingBatch();
109         long dataId[] = createDataEvents(batch, 5);
110         errorTestCore(batch.getBatchId(), 1, dataId[0]);
111     }
112 
113     @Test
114     public void errorTestBoundary2() {
115         cleanSlate();
116         OutgoingBatch batch = createOutgoingBatch();
117         long dataId[] = createDataEvents(batch, 5);
118         errorTestCore(batch.getBatchId(), 5, dataId[dataId.length - 1]);
119     }
120 
121     @Test
122     public void errorErrorTest() {
123         cleanSlate();
124         OutgoingBatch batch = createOutgoingBatch();
125         createDataEvents(batch, 5);
126         errorTestCore(batch.getBatchId(), 7, -1);
127     }
128 
129     protected void errorTestCore(long batchId, int errorLine, long expectedResults) {
130         ackService.ack(new BatchInfo(batchId, errorLine));
131         List<OutgoingBatchHistory> history = getOutgoingBatchHistory(batchId);
132         Assert.assertEquals(history.size(), 1);
133         OutgoingBatchHistory hist = history.get(0);
134         Assert.assertEquals(hist.getBatchId(), batchId);
135         Assert.assertEquals(hist.getStatus(), OutgoingBatchHistory.Status.ER);
136         Assert.assertEquals(hist.getFailedDataId(), expectedResults);
137     }
138 
139     @SuppressWarnings("unchecked")
140     protected List<OutgoingBatchHistory> getOutgoingBatchHistory(long batchId) {
141         final String sql = "select batch_id, status, data_event_count, start_time, " + "failed_data_id from "
142                 + TestConstants.TEST_PREFIX + "outgoing_batch_hist where batch_id = ?";
143         final List<OutgoingBatchHistory> list = new ArrayList<OutgoingBatchHistory>();
144         getJdbcTemplate().query(sql, new Object[] { batchId }, new RowMapper() {
145             public Object[] mapRow(ResultSet rs, int row) throws SQLException {
146                 OutgoingBatchHistory item = new OutgoingBatchHistory();
147                 item.setBatchId(rs.getLong(1));
148                 item.setStatus(OutgoingBatchHistory.Status.valueOf(rs.getString(2)));
149                 item.setDataEventCount(rs.getLong(3));
150                 item.setStartTime(rs.getTimestamp(4));
151                 item.setFailedDataId(rs.getLong(5));
152                 list.add(item);
153                 return null;
154             }
155         });
156         return list;
157     }
158 
159     protected OutgoingBatch createOutgoingBatch() {
160         OutgoingBatch batch = new OutgoingBatch();
161         batch.setNodeId(TestConstants.TEST_CLIENT_EXTERNAL_ID);
162         batch.setChannelId(TestConstants.TEST_CHANNEL_ID);
163         batch.setBatchType("EV");
164         batch.setStatus("SE");
165         batch.setCreateTime(new Date());
166         outgoingBatchService.insertOutgoingBatch(batch);
167         return batch;
168     }
169 
170     protected long[] createDataEvents(OutgoingBatch batch, int size) {
171         TriggerHistory audit = new TriggerHistory();
172         audit.setTriggerHistoryId(TestConstants.TEST_AUDIT_ID);
173         final long[] id = new long[size];
174         for (int i = 0; i < size; i++) {
175             Data data = new Data("table1", DataEventType.INSERT, "some data", "some data", audit);
176             id[i] = dataService.insertData(data);
177             DataEvent dataEvent = new DataEvent(id[i], TestConstants.TEST_CLIENT_EXTERNAL_ID,
178                     TestConstants.TEST_CHANNEL_ID);
179             dataEvent.setBatchId(Long.valueOf(batch.getBatchId()));
180             dataEvent.setBatched(true);
181             dataService.insertDataEvent(dataEvent);
182         }
183         return id;
184     }
185 
186 }