View Javadoc

1   /*
2    * SymmetricDS is an open source database synchronization solution.
3    *   
4    * Copyright (C) Chris Henson <chenson42@users.sourceforge.net>
5    *
6    * This library is free software; you can redistribute it and/or
7    * modify it under the terms of the GNU Lesser General Public
8    * License as published by the Free Software Foundation; either
9    * version 3 of the License, or (at your option) any later version.
10   *
11   * This library is distributed in the hope that it will be useful,
12   * but WITHOUT ANY WARRANTY; without even the implied warranty of
13   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14   * Lesser General Public License for more details.
15   *
16   * You should have received a copy of the GNU Lesser General Public
17   * License along with this library; if not, see
18   * <http://www.gnu.org/licenses/>.
19   */
20  package org.jumpmind.symmetric.ext;
21  
22  import java.util.Collection;
23  import java.util.HashMap;
24  import java.util.HashSet;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Set;
29  
30  import org.apache.commons.lang.ArrayUtils;
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.jdom.Document;
34  import org.jdom.Element;
35  import org.jdom.Namespace;
36  import org.jdom.output.Format;
37  import org.jdom.output.XMLOutputter;
38  import org.jumpmind.symmetric.load.IDataLoader;
39  import org.jumpmind.symmetric.load.IDataLoaderContext;
40  import org.jumpmind.symmetric.model.DataEventType;
41  import org.jumpmind.symmetric.model.IncomingBatchHistory;
42  
43  /***
44   * This is an optional data loader filter/listener that is capable of
45   * translating table data to XML and publishing it to JMS for consumption by the
46   * enterprise. It uses JDOM internally to create an XML representation of
47   * SymmetricDS data.
48   * </p>
49   */
50  public class XmlPublisherFilter implements IPublisherFilter, INodeGroupExtensionPoint {
51  
52      private static final Log logger = LogFactory.getLog(XmlPublisherFilter.class);
53  
54      private final String XML_CACHE = "XML_CACHE_" + this.hashCode();
55  
56      protected IPublisher publisher;
57  
58      private Set<String> tableNamesToPublishAsGroup;
59  
60      private String xmlTagNameToUseForGroup = "batch";
61  
62      private List<String> groupByColumnNames;
63  
64      private String[] nodeGroups;
65  
66      private boolean loadDataInTargetDatabase = true;
67  
68      private boolean autoRegister = true;
69      
70      private Format xmlFormat;
71  
72      private ITimeGenerator timeStringGenerator = new ITimeGenerator() {
73          public String getTime() {
74              return Long.toString(System.currentTimeMillis());
75          }
76      };
77      
78      public XmlPublisherFilter() {
79          xmlFormat = Format.getCompactFormat();
80          xmlFormat.setOmitDeclaration(true);
81      }
82  
83      public boolean isAutoRegister() {
84          return autoRegister;
85      }
86  
87      public String[] getNodeGroupIdsToApplyTo() {
88          return nodeGroups;
89      }
90  
91      public boolean filterDelete(IDataLoaderContext ctx, String[] keys) {
92          if (tableNamesToPublishAsGroup == null || tableNamesToPublishAsGroup.contains(ctx.getTableName())) {
93              Element xml = getXmlFromCache(ctx, null, keys);
94              if (xml != null) {
95                  toXmlElement(DataEventType.UPDATE, xml, ctx, null, keys);
96              }
97          }
98          return loadDataInTargetDatabase;
99      }
100 
101     public boolean filterUpdate(IDataLoaderContext ctx, String[] data, String[] keys) {
102         if (tableNamesToPublishAsGroup == null || tableNamesToPublishAsGroup.contains(ctx.getTableName())) {
103             Element xml = getXmlFromCache(ctx, data, keys);
104             if (xml != null) {
105                 toXmlElement(DataEventType.UPDATE, xml, ctx, data, keys);
106             }
107         }
108         return loadDataInTargetDatabase;
109     }
110 
111     public boolean filterInsert(IDataLoaderContext ctx, String[] data) {
112         if (tableNamesToPublishAsGroup == null || tableNamesToPublishAsGroup.contains(ctx.getTableName())) {
113             Element xml = getXmlFromCache(ctx, data, null);
114             if (xml != null) {
115                 toXmlElement(DataEventType.INSERT, xml, ctx, data, null);
116             }
117         }
118         return loadDataInTargetDatabase;
119     }
120 
121     private Element getXmlFromCache(IDataLoaderContext ctx, String[] data, String[] keys) {
122         Element xml = null;
123         Map<String, Element> ctxCache = getXmlCache(ctx);
124         String txId = toXmlGroupId(ctx, data, keys);
125         if (txId != null) {
126             xml = ctxCache.get(txId);
127             if (xml == null) {
128                 xml = new Element(xmlTagNameToUseForGroup);
129                 xml.addNamespaceDeclaration(getXmlNamespace());
130                 xml.setAttribute("id", txId);
131                 addFormattedExtraGroupAttributes(ctx, xml);
132                 ctxCache.put(txId, xml);
133             }
134         }
135         return xml;
136     }
137 
138     private final static Namespace getXmlNamespace() {
139         return Namespace.getNamespace("xsi", "http://www.w3.org/2001/XMLSchema-instance");
140     }
141 
142     /***
143      * Give the opportunity for the user of this publisher to add in additional
144      * attributes. The default implementation adds in the nodeId from the
145      * {@link IDataLoaderContext}.
146      * 
147      * @param ctx
148      * @param xml
149      *                append XML attributes to this buffer
150      */
151     protected void addFormattedExtraGroupAttributes(IDataLoaderContext ctx, Element xml) {
152         xml.setAttribute("nodeid", ctx.getNodeId());
153         if (timeStringGenerator != null) {
154             xml.setAttribute("time", timeStringGenerator.getTime());
155         }
156     }
157 
158     @SuppressWarnings("unchecked")
159     protected Map<String, Element> getXmlCache(IDataLoaderContext ctx) {
160         Map<String, Object> cache = ctx.getContextCache();
161         Map<String, Element> xmlCache = (Map<String, Element>) cache.get(XML_CACHE);
162         if (xmlCache == null) {
163             xmlCache = new HashMap<String, Element>();
164             cache.put(XML_CACHE, xmlCache);
165         }
166         return xmlCache;
167     }
168 
169     @SuppressWarnings("unchecked")
170     protected boolean doesXmlExistToPublish(IDataLoaderContext ctx) {
171         Map<String, Object> cache = ctx.getContextCache();
172         Map<String, StringBuilder> xmlCache = (Map<String, StringBuilder>) cache.get(XML_CACHE);
173         return xmlCache != null && xmlCache.size() > 0;
174     }
175 
176     private void toXmlElement(DataEventType dml, Element xml, IDataLoaderContext ctx, String[] data, String[] keys) {
177         Element row = new Element("row");
178         xml.addContent(row);
179         row.setAttribute("entity", ctx.getTableName());
180         row.setAttribute("dml", dml.getCode());
181 
182         String[] colNames = null;
183 
184         if (data == null) {
185             colNames = ctx.getKeyNames();
186             data = keys;
187         } else {
188             colNames = ctx.getColumnNames();
189         }
190 
191         for (int i = 0; i < data.length; i++) {
192             String col = colNames[i];
193             Element dataElement = new Element("data");
194             row.addContent(dataElement);
195             dataElement.setAttribute("key", col);
196             if (data[i] != null) {
197                 dataElement.setText(data[i]);
198             } else {
199                 dataElement.setAttribute("nil", "true", getXmlNamespace());
200             }
201         }
202     }
203 
204     private String toXmlGroupId(IDataLoaderContext ctx, String[] data, String[] keys) {
205         if (groupByColumnNames != null) {
206             StringBuilder id = new StringBuilder();
207 
208             if (keys != null) {
209                 String[] columns = ctx.getKeyNames();
210                 for (String col : groupByColumnNames) {
211                     int index = ArrayUtils.indexOf(columns, col, 0);
212                     if (index >= 0) {
213                         id.append(data[index]);
214                     } else {
215                         id = new StringBuilder();
216                         break;
217                     }
218                 }
219             }
220 
221             if (id.length() == 0) {
222                 String[] columns = ctx.getColumnNames();
223                 for (String col : groupByColumnNames) {
224                     int index = ArrayUtils.indexOf(columns, col, 0);
225                     if (index >= 0) {
226                         id.append(data[index]);
227                     } else {
228                         return null;
229                     }
230                 }
231             }
232 
233             if (id.length() > 0) {
234                 return id.toString().replaceAll("-", "");
235             }
236         } else {
237             logger.warn("You did not specify 'groupByColumnNames'.  We cannot find any matches in the data to publish as XML if you don't.  You might as well turn off this filter!");
238         }
239         return null;
240     }
241 
242     private void finalizeXmlAndPublish(IDataLoaderContext ctx) {
243         Map<String, Element> ctxCache = getXmlCache(ctx);
244         Collection<Element> buffers = ctxCache.values();
245         for (Iterator<Element> iterator = buffers.iterator(); iterator.hasNext();) {
246             String xml = new XMLOutputter(xmlFormat).outputString(new Document(iterator.next()));
247             if (logger.isDebugEnabled()) {
248                 logger.debug("Sending XML to IPublisher -> " + xml);
249             }
250             iterator.remove();
251             publisher.publish(ctx, xml.toString());
252         }
253 
254     }
255 
256     public void batchComplete(IDataLoader loader, IncomingBatchHistory hist) {
257         IDataLoaderContext ctx = loader.getContext();
258         if (doesXmlExistToPublish(ctx)) {
259             finalizeXmlAndPublish(ctx);
260         }
261     }
262 
263     public void setTableNamesToPublishAsGroup(Set<String> tableNamesToPublishAsGroup) {
264         this.tableNamesToPublishAsGroup = tableNamesToPublishAsGroup;
265     }
266     
267     public void setTableNameToPublish(String tableName) {
268         this.tableNamesToPublishAsGroup = new HashSet<String>(1);
269         this.tableNamesToPublishAsGroup.add(tableName);
270     }
271 
272     public void setXmlTagNameToUseForGroup(String xmlTagNameToUseForGroup) {
273         this.xmlTagNameToUseForGroup = xmlTagNameToUseForGroup;
274     }
275 
276     /***
277      * This attribute is required.  It needs to identify the columns that will be used to key on
278      * rows in the specified tables that need to be grouped together in an 'XML batch.'
279      */
280     public void setGroupByColumnNames(List<String> groupByColumnNames) {
281         this.groupByColumnNames = groupByColumnNames;
282     }
283 
284     public void setLoadDataInTargetDatabase(boolean loadDataInTargetDatabase) {
285         this.loadDataInTargetDatabase = loadDataInTargetDatabase;
286     }
287 
288     public void setPublisher(IPublisher publisher) {
289         this.publisher = publisher;
290     }
291 
292     public void setNodeGroups(String[] nodeGroups) {
293         this.nodeGroups = nodeGroups;
294     }
295 
296     public void setAutoRegister(boolean autoRegister) {
297         this.autoRegister = autoRegister;
298     }
299 
300     interface ITimeGenerator {
301         public String getTime();
302     }
303 
304     /***
305      * Used to populate the time attribute of an XML message.
306      */
307     public void setTimeStringGenerator(ITimeGenerator timeStringGenerator) {
308         this.timeStringGenerator = timeStringGenerator;
309     }
310 
311     public void setXmlFormat(Format xmlFormat) {
312         this.xmlFormat = xmlFormat;
313     }
314 
315 }