1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.jumpmind.symmetric.service.impl;
23
24 import java.sql.ResultSet;
25 import java.sql.SQLException;
26 import java.util.ArrayList;
27 import java.util.Date;
28 import java.util.List;
29
30 import org.jumpmind.symmetric.common.Constants;
31 import org.jumpmind.symmetric.model.BatchInfo;
32 import org.jumpmind.symmetric.model.Data;
33 import org.jumpmind.symmetric.model.DataEvent;
34 import org.jumpmind.symmetric.model.DataEventType;
35 import org.jumpmind.symmetric.model.OutgoingBatch;
36 import org.jumpmind.symmetric.model.OutgoingBatchHistory;
37 import org.jumpmind.symmetric.model.TriggerHistory;
38 import org.jumpmind.symmetric.service.IAcknowledgeService;
39 import org.jumpmind.symmetric.service.IDataService;
40 import org.jumpmind.symmetric.service.IOutgoingBatchService;
41 import org.jumpmind.symmetric.test.AbstractDatabaseTest;
42 import org.jumpmind.symmetric.test.TestConstants;
43 import org.junit.Assert;
44 import org.junit.Before;
45 import org.junit.Test;
46 import org.springframework.jdbc.core.RowMapper;
47
48
49 public class AcknowledgeServiceTest extends AbstractDatabaseTest {
50
51 protected IAcknowledgeService ackService;
52
53 protected IOutgoingBatchService outgoingBatchService;
54
55 protected IDataService dataService;
56
57 public AcknowledgeServiceTest() throws Exception {
58 super();
59 }
60
61 public AcknowledgeServiceTest(String dbName) {
62 super(dbName);
63 }
64
65 @Before
66 public void setUp() {
67 ackService = (IAcknowledgeService)find(Constants.ACKNOWLEDGE_SERVICE);
68 outgoingBatchService = (IOutgoingBatchService)find(Constants.OUTGOING_BATCH_SERVICE);
69 dataService = (IDataService)find(Constants.DATA_SERVICE);
70 }
71
72 @Test
73 public void okTest() {
74 cleanSlate();
75 ackService.ack(new BatchInfo(1));
76
77 List<OutgoingBatchHistory> history = getOutgoingBatchHistory(1);
78 Assert.assertEquals(history.size(), 1);
79 OutgoingBatchHistory hist = history.get(0);
80 Assert.assertEquals(hist.getBatchId(), 1);
81 Assert.assertEquals(hist.getStatus(), OutgoingBatchHistory.Status.OK);
82 }
83
84 private void cleanSlate() {
85 cleanSlate(TestConstants.TEST_PREFIX + "data_event", TestConstants.TEST_PREFIX + "data",
86 TestConstants.TEST_PREFIX + "outgoing_batch_hist", TestConstants.TEST_PREFIX + "outgoing_batch");
87 }
88
89 @Test
90 public void unspecifiedErrorTest() {
91 cleanSlate();
92 OutgoingBatch batch = createOutgoingBatch();
93 createDataEvents(batch, 5);
94 errorTestCore(batch.getBatchId(), -1, -1);
95 }
96
97 @Test
98 public void errorTest() {
99 cleanSlate();
100 OutgoingBatch batch = createOutgoingBatch();
101 long dataId[] = createDataEvents(batch, 5);
102 errorTestCore(batch.getBatchId(), 3, dataId[2]);
103 }
104
105 @Test
106 public void errorTestBoundary1() {
107 cleanSlate();
108 OutgoingBatch batch = createOutgoingBatch();
109 long dataId[] = createDataEvents(batch, 5);
110 errorTestCore(batch.getBatchId(), 1, dataId[0]);
111 }
112
113 @Test
114 public void errorTestBoundary2() {
115 cleanSlate();
116 OutgoingBatch batch = createOutgoingBatch();
117 long dataId[] = createDataEvents(batch, 5);
118 errorTestCore(batch.getBatchId(), 5, dataId[dataId.length - 1]);
119 }
120
121 @Test
122 public void errorErrorTest() {
123 cleanSlate();
124 OutgoingBatch batch = createOutgoingBatch();
125 createDataEvents(batch, 5);
126 errorTestCore(batch.getBatchId(), 7, -1);
127 }
128
129 protected void errorTestCore(long batchId, int errorLine, long expectedResults) {
130 ackService.ack(new BatchInfo(batchId, errorLine));
131 List<OutgoingBatchHistory> history = getOutgoingBatchHistory(batchId);
132 Assert.assertEquals(history.size(), 1);
133 OutgoingBatchHistory hist = history.get(0);
134 Assert.assertEquals(hist.getBatchId(), batchId);
135 Assert.assertEquals(hist.getStatus(), OutgoingBatchHistory.Status.ER);
136 Assert.assertEquals(hist.getFailedDataId(), expectedResults);
137 }
138
139 @SuppressWarnings("unchecked")
140 protected List<OutgoingBatchHistory> getOutgoingBatchHistory(long batchId) {
141 final String sql = "select batch_id, status, data_event_count, start_time, " + "failed_data_id from "
142 + TestConstants.TEST_PREFIX + "outgoing_batch_hist where batch_id = ?";
143 final List<OutgoingBatchHistory> list = new ArrayList<OutgoingBatchHistory>();
144 getJdbcTemplate().query(sql, new Object[] { batchId }, new RowMapper() {
145 public Object[] mapRow(ResultSet rs, int row) throws SQLException {
146 OutgoingBatchHistory item = new OutgoingBatchHistory();
147 item.setBatchId(rs.getLong(1));
148 item.setStatus(OutgoingBatchHistory.Status.valueOf(rs.getString(2)));
149 item.setDataEventCount(rs.getLong(3));
150 item.setStartTime(rs.getTimestamp(4));
151 item.setFailedDataId(rs.getLong(5));
152 list.add(item);
153 return null;
154 }
155 });
156 return list;
157 }
158
159 protected OutgoingBatch createOutgoingBatch() {
160 OutgoingBatch batch = new OutgoingBatch();
161 batch.setNodeId(TestConstants.TEST_CLIENT_EXTERNAL_ID);
162 batch.setChannelId(TestConstants.TEST_CHANNEL_ID);
163 batch.setBatchType("EV");
164 batch.setStatus("SE");
165 batch.setCreateTime(new Date());
166 outgoingBatchService.insertOutgoingBatch(batch);
167 return batch;
168 }
169
170 protected long[] createDataEvents(OutgoingBatch batch, int size) {
171 TriggerHistory audit = new TriggerHistory();
172 audit.setTriggerHistoryId(TestConstants.TEST_AUDIT_ID);
173 final long[] id = new long[size];
174 for (int i = 0; i < size; i++) {
175 Data data = new Data("table1", DataEventType.INSERT, "some data", "some data", audit);
176 id[i] = dataService.insertData(data);
177 DataEvent dataEvent = new DataEvent(id[i], TestConstants.TEST_CLIENT_EXTERNAL_ID,
178 TestConstants.TEST_CHANNEL_ID);
179 dataEvent.setBatchId(Long.valueOf(batch.getBatchId()));
180 dataEvent.setBatched(true);
181 dataService.insertDataEvent(dataEvent);
182 }
183 return id;
184 }
185
186 }