1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.jumpmind.symmetric.db;
21
22 import java.util.ArrayList;
23 import java.util.Date;
24 import java.util.HashSet;
25 import java.util.List;
26 import java.util.Set;
27
28 import org.apache.commons.codec.binary.Base64;
29 import org.apache.commons.lang.StringUtils;
30 import org.apache.commons.lang.time.FastDateFormat;
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.ddlutils.model.Column;
34 import org.apache.ddlutils.model.Table;
35 import org.hsqldb.types.Binary;
36 import org.jumpmind.symmetric.SymmetricEngine;
37 import org.jumpmind.symmetric.common.Constants;
38 import org.jumpmind.symmetric.model.Data;
39 import org.jumpmind.symmetric.model.DataEventType;
40 import org.jumpmind.symmetric.model.Trigger;
41 import org.jumpmind.symmetric.model.TriggerHistory;
42 import org.jumpmind.symmetric.service.IBootstrapService;
43 import org.jumpmind.symmetric.service.IConfigurationService;
44 import org.jumpmind.symmetric.service.IDataService;
45 import org.jumpmind.symmetric.service.INodeService;
46 import org.jumpmind.symmetric.util.AppUtils;
47
/**
 * Implements the functionality needed by (most) Java-based symmetric
 * triggers.
 */
52 public abstract class AbstractEmbeddedTrigger {
53
54 protected static final Log logger = LogFactory.getLog(AbstractEmbeddedTrigger.class);
55
56 protected static final FastDateFormat dateFormatter = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.S");
57
58 protected IDataService dataService;
59
60 protected IConfigurationService configurationService;
61
62 protected IBootstrapService bootstrapService;
63
64 protected INodeService nodeService;
65
66 protected IDbDialect dbDialect;
67
68 protected Table table;
69
70 protected TriggerHistory triggerHistory;
71
72 protected Trigger trigger;
73
74 protected DataEventType triggerType;
75
76 protected String tableName;
77
78 protected Set<String> excludedColumns;
79
80 protected List<String> includedColumns;
81
82 protected boolean initialize(DataEventType triggerType, String tableName) {
83 this.triggerType = triggerType;
84 this.tableName = tableName;
85 SymmetricEngine engine = SymmetricEngine.findEngineByName(getEngineName().toLowerCase());
86 this.dataService = getDataService(engine);
87 this.bootstrapService = getBootstrapService(engine);
88 this.configurationService = getConfigurationService(engine);
89 this.nodeService = getNodeService(engine);
90 this.dbDialect = getDbDialect(engine);
91 this.triggerHistory = configurationService.getHistoryRecordFor(getTriggerHistId());
92 this.trigger = bootstrapService.getCachedTriggers(true).get(triggerHistory.getTriggerId());
93 if (trigger == null) {
94 logger.warn(String.format("Could not find an %s trigger in the cache for table %s and a hist id of %s.", triggerType.name(), tableName, getTriggerHistId()));
95 return false;
96 }
97 this.table = dbDialect.getMetaDataFor(null, trigger.getSourceSchemaName(), tableName, true);
98 initColumnNames(trigger);
99 return true;
100 }
101
102 protected abstract String getEngineName();
103
104 protected abstract int getTriggerHistId();
105
106 protected abstract String getTransactionId(Object[] oldRow, Object[] newRow);
107
108 protected String formatRowData(Object[] oldRow, Object[] newRow) {
109 if (triggerType == DataEventType.UPDATE || triggerType == DataEventType.INSERT) {
110 return formatAsCsv(getOrderedColumnValues(newRow));
111 } else {
112 return null;
113 }
114 }
115
116 protected String formatPkRowData(Object[] oldRow, Object[] newRow) {
117 if (triggerType == DataEventType.UPDATE || triggerType == DataEventType.DELETE) {
118 return formatAsCsv(getPrimaryKeys(oldRow));
119 } else {
120 return null;
121 }
122 }
123
124 protected String formatAsCsv(Object[] data) {
125 StringBuilder b = new StringBuilder();
126 if (data != null) {
127 for (Object object : data) {
128 if (object != null) {
129 if (object instanceof String) {
130 b.append("\"");
131 b.append(StringUtils
132 .replace(StringUtils.replace(object.toString(), "//", "////"), "\"", "//\""));
133 b.append("\"");
134 } else if (object instanceof Number) {
135 b.append("\"");
136 b.append(object);
137 b.append("\"");
138 } else if (object instanceof Date) {
139 b.append(dateFormatter.format((Date) object));
140 } else if (object instanceof byte[]) {
141 b.append("\"");
142 b.append(Base64.encodeBase64((byte[]) object));
143 b.append("\"");
144 } else if (object instanceof Binary) {
145 b.append("\"");
146 Binary d = (Binary) object;
147 b.append(new String(Base64.encodeBase64(d.getBytes())));
148 b.append("\"");
149 } else if (object instanceof Boolean) {
150 b.append(((Boolean) object) ? "\"1\"" : "\"0\"");
151 } else {
152 throw new IllegalStateException("Could not format " + object + " which is of type "
153 + object.getClass().getName());
154 }
155 }
156 b.append(",");
157 }
158 b.deleteCharAt(b.length() - 1);
159 }
160 return b.toString();
161 }
162
163 protected Data createData(Object[] oldRow, Object[] newRow) {
164 Data data = new Data(StringUtils.isBlank(trigger.getTargetTableName()) ? tableName : trigger
165 .getTargetTableName(), triggerType, formatRowData(oldRow, newRow), formatPkRowData(oldRow, newRow),
166 triggerHistory);
167 if (triggerType == DataEventType.UPDATE && trigger.isSyncColumnLevel()) {
168 data.setOldData(formatAsCsv(getOrderedColumnValues(oldRow)));
169 }
170 return data;
171 }
172
173 protected Object[] getPrimaryKeys(Object[] allValues) {
174 Column[] keys = table.getPrimaryKeyColumns();
175 if (keys == null) {
176 keys = table.getColumns();
177 }
178 Object[] keyValues = new Object[keys.length];
179 for (int i = 0; i < keys.length; i++) {
180 keyValues[i] = allValues[table.getColumnIndex(keys[i])];
181 }
182 return keyValues;
183 }
184
185 protected Object[] getOrderedColumnValues(Object[] allValues) {
186 Column[] columns = table.getColumns();
187 Object[] values = new Object[columns.length - excludedColumns.size()];
188 int x = 0;
189 for (int i = 0; i < columns.length; i++) {
190 if (!excludedColumns.contains(columns[i].getName().toLowerCase())) {
191 values[x++] = allValues[i];
192 }
193 }
194 return values;
195 }
196
197 private void initColumnNames(Trigger trigger) {
198 excludedColumns = new HashSet<String>();
199 String nameString = trigger.getExcludedColumnNames();
200 if (!StringUtils.isBlank(nameString)) {
201 String[] values = nameString.split(",");
202 for (String string : values) {
203 excludedColumns.add(string.toLowerCase());
204 }
205 }
206
207 includedColumns = new ArrayList<String>();
208 Column[] columns = table.getColumns();
209 for (int i = 0; i < columns.length; i++) {
210 String name = columns[i].getName().toLowerCase();
211 if (!excludedColumns.contains(name)) {
212 includedColumns.add(name);
213 }
214 }
215 }
216
217 private IDbDialect getDbDialect(SymmetricEngine engine) {
218 return AppUtils.find(Constants.DB_DIALECT, engine);
219 }
220
221 private IConfigurationService getConfigurationService(SymmetricEngine engine) {
222 return AppUtils.find(Constants.CONFIG_SERVICE, engine);
223 }
224
225 private INodeService getNodeService(SymmetricEngine engine) {
226 return AppUtils.find(Constants.NODE_SERVICE, engine);
227 }
228
229 private IBootstrapService getBootstrapService(SymmetricEngine engine) {
230 return AppUtils.find(Constants.BOOTSTRAP_SERVICE, engine);
231 }
232
233 private IDataService getDataService(SymmetricEngine engine) {
234 return AppUtils.find(Constants.DATA_SERVICE, engine);
235 }
236
237 }