View Javadoc

1   /*
2    * SymmetricDS is an open source database synchronization solution.
3    *   
4    * Copyright (C) Chris Henson <chenson42@users.sourceforge.net>
5    *
6    * This library is free software; you can redistribute it and/or
7    * modify it under the terms of the GNU Lesser General Public
8    * License as published by the Free Software Foundation; either
9    * version 3 of the License, or (at your option) any later version.
10   *
11   * This library is distributed in the hope that it will be useful,
12   * but WITHOUT ANY WARRANTY; without even the implied warranty of
13   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14   * Lesser General Public License for more details.
15   *
16   * You should have received a copy of the GNU Lesser General Public
17   * License along with this library; if not, see
18   * <http://www.gnu.org/licenses/>.
19   */
20  package org.jumpmind.symmetric.db;
21  
22  import java.util.ArrayList;
23  import java.util.Date;
24  import java.util.HashSet;
25  import java.util.List;
26  import java.util.Set;
27  
28  import org.apache.commons.codec.binary.Base64;
29  import org.apache.commons.lang.StringUtils;
30  import org.apache.commons.lang.time.FastDateFormat;
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.ddlutils.model.Column;
34  import org.apache.ddlutils.model.Table;
35  import org.hsqldb.types.Binary;
36  import org.jumpmind.symmetric.SymmetricEngine;
37  import org.jumpmind.symmetric.common.Constants;
38  import org.jumpmind.symmetric.model.Data;
39  import org.jumpmind.symmetric.model.DataEventType;
40  import org.jumpmind.symmetric.model.Trigger;
41  import org.jumpmind.symmetric.model.TriggerHistory;
42  import org.jumpmind.symmetric.service.IBootstrapService;
43  import org.jumpmind.symmetric.service.IConfigurationService;
44  import org.jumpmind.symmetric.service.IDataService;
45  import org.jumpmind.symmetric.service.INodeService;
46  import org.jumpmind.symmetric.util.AppUtils;
47  
48  /***
49   * This class implements the functionality needed by (most) java-based symmetric
50   * triggers.
51   */
52  public abstract class AbstractEmbeddedTrigger {
53  
54      protected static final Log logger = LogFactory.getLog(AbstractEmbeddedTrigger.class);
55  
56      protected static final FastDateFormat dateFormatter = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.S");
57  
58      protected IDataService dataService;
59  
60      protected IConfigurationService configurationService;
61      
62      protected IBootstrapService bootstrapService;
63  
64      protected INodeService nodeService;
65  
66      protected IDbDialect dbDialect;
67  
68      protected Table table;
69  
70      protected TriggerHistory triggerHistory;
71  
72      protected Trigger trigger;
73  
74      protected DataEventType triggerType;
75  
76      protected String tableName;
77  
78      protected Set<String> excludedColumns;
79  
80      protected List<String> includedColumns;
81  
82      protected boolean initialize(DataEventType triggerType, String tableName) {
83          this.triggerType = triggerType;
84          this.tableName = tableName;
85          SymmetricEngine engine = SymmetricEngine.findEngineByName(getEngineName().toLowerCase());
86          this.dataService = getDataService(engine);        
87          this.bootstrapService = getBootstrapService(engine);
88          this.configurationService = getConfigurationService(engine);
89          this.nodeService = getNodeService(engine);
90          this.dbDialect = getDbDialect(engine);
91          this.triggerHistory = configurationService.getHistoryRecordFor(getTriggerHistId());
92          this.trigger = bootstrapService.getCachedTriggers(true).get(triggerHistory.getTriggerId());
93          if (trigger == null) {
94              logger.warn(String.format("Could not find an %s trigger in the cache for table %s and a hist id of %s.", triggerType.name(), tableName, getTriggerHistId()));
95              return false;
96          }
97          this.table = dbDialect.getMetaDataFor(null, trigger.getSourceSchemaName(), tableName, true);
98          initColumnNames(trigger);
99          return true;
100     }
101 
102     protected abstract String getEngineName();
103 
104     protected abstract int getTriggerHistId();
105 
106     protected abstract String getTransactionId(Object[] oldRow, Object[] newRow);
107 
108     protected String formatRowData(Object[] oldRow, Object[] newRow) {
109         if (triggerType == DataEventType.UPDATE || triggerType == DataEventType.INSERT) {
110             return formatAsCsv(getOrderedColumnValues(newRow));
111         } else {
112             return null;
113         }
114     }
115 
116     protected String formatPkRowData(Object[] oldRow, Object[] newRow) {
117         if (triggerType == DataEventType.UPDATE || triggerType == DataEventType.DELETE) {
118             return formatAsCsv(getPrimaryKeys(oldRow));
119         } else {
120             return null;
121         }
122     }
123 
124     protected String formatAsCsv(Object[] data) {
125         StringBuilder b = new StringBuilder();
126         if (data != null) {
127             for (Object object : data) {
128                 if (object != null) {
129                     if (object instanceof String) {
130                         b.append("\"");
131                         b.append(StringUtils
132                                 .replace(StringUtils.replace(object.toString(), "//", "////"), "\"", "//\""));
133                         b.append("\"");
134                     } else if (object instanceof Number) {
135                         b.append("\"");
136                         b.append(object);
137                         b.append("\"");
138                     } else if (object instanceof Date) {
139                         b.append(dateFormatter.format((Date) object));
140                     } else if (object instanceof byte[]) {
141                         b.append("\"");
142                         b.append(Base64.encodeBase64((byte[]) object));
143                         b.append("\"");
144                     } else if (object instanceof Binary) {
145                         b.append("\"");
146                         Binary d = (Binary) object;
147                         b.append(new String(Base64.encodeBase64(d.getBytes())));
148                         b.append("\"");
149                     } else if (object instanceof Boolean) {
150                         b.append(((Boolean) object) ? "\"1\"" : "\"0\"");
151                     } else {
152                         throw new IllegalStateException("Could not format " + object + " which is of type "
153                                 + object.getClass().getName());
154                     }
155                 }
156                 b.append(",");
157             }
158             b.deleteCharAt(b.length() - 1);
159         }
160         return b.toString();
161     }
162 
163     protected Data createData(Object[] oldRow, Object[] newRow) {
164         Data data = new Data(StringUtils.isBlank(trigger.getTargetTableName()) ? tableName : trigger
165                 .getTargetTableName(), triggerType, formatRowData(oldRow, newRow), formatPkRowData(oldRow, newRow),
166                 triggerHistory);
167         if (triggerType == DataEventType.UPDATE && trigger.isSyncColumnLevel()) {
168             data.setOldData(formatAsCsv(getOrderedColumnValues(oldRow)));
169         }
170         return data;
171     }
172 
173     protected Object[] getPrimaryKeys(Object[] allValues) {
174         Column[] keys = table.getPrimaryKeyColumns();
175         if (keys == null) {
176             keys = table.getColumns();
177         }
178         Object[] keyValues = new Object[keys.length];
179         for (int i = 0; i < keys.length; i++) {
180             keyValues[i] = allValues[table.getColumnIndex(keys[i])];
181         }
182         return keyValues;
183     }
184 
185     protected Object[] getOrderedColumnValues(Object[] allValues) {
186         Column[] columns = table.getColumns();
187         Object[] values = new Object[columns.length - excludedColumns.size()];
188         int x = 0;
189         for (int i = 0; i < columns.length; i++) {
190             if (!excludedColumns.contains(columns[i].getName().toLowerCase())) {
191                 values[x++] = allValues[i];
192             }
193         }
194         return values;
195     }
196 
197     private void initColumnNames(Trigger trigger) {
198         excludedColumns = new HashSet<String>();
199         String nameString = trigger.getExcludedColumnNames();
200         if (!StringUtils.isBlank(nameString)) {
201             String[] values = nameString.split(",");
202             for (String string : values) {
203                 excludedColumns.add(string.toLowerCase());
204             }
205         }
206 
207         includedColumns = new ArrayList<String>();
208         Column[] columns = table.getColumns();
209         for (int i = 0; i < columns.length; i++) {
210             String name = columns[i].getName().toLowerCase();
211             if (!excludedColumns.contains(name)) {
212                 includedColumns.add(name);
213             }
214         }
215     }
216 
217     private IDbDialect getDbDialect(SymmetricEngine engine) {
218         return AppUtils.find(Constants.DB_DIALECT, engine);
219     }
220 
221     private IConfigurationService getConfigurationService(SymmetricEngine engine) {
222         return AppUtils.find(Constants.CONFIG_SERVICE, engine);
223     }
224 
225     private INodeService getNodeService(SymmetricEngine engine) {
226         return AppUtils.find(Constants.NODE_SERVICE, engine);
227     }
228     
229     private IBootstrapService getBootstrapService(SymmetricEngine engine) {
230         return AppUtils.find(Constants.BOOTSTRAP_SERVICE, engine);
231     }
232 
233     private IDataService getDataService(SymmetricEngine engine) {
234         return AppUtils.find(Constants.DATA_SERVICE, engine);
235     }
236 
237 }