1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.jumpmind.symmetric.ext;
21
22 import java.util.Collection;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Set;
29
30 import org.apache.commons.lang.ArrayUtils;
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.jdom.Document;
34 import org.jdom.Element;
35 import org.jdom.Namespace;
36 import org.jdom.output.Format;
37 import org.jdom.output.XMLOutputter;
38 import org.jumpmind.symmetric.load.IDataLoader;
39 import org.jumpmind.symmetric.load.IDataLoaderContext;
40 import org.jumpmind.symmetric.model.DataEventType;
41 import org.jumpmind.symmetric.model.IncomingBatchHistory;
42
43 /***
44 * This is an optional data loader filter/listener that is capable of
45 * translating table data to XML and publishing it to JMS for consumption by the
46 * enterprise. It uses JDOM internally to create an XML representation of
47 * SymmetricDS data.
48 * </p>
49 */
50 public class XmlPublisherFilter implements IPublisherFilter, INodeGroupExtensionPoint {
51
52 private static final Log logger = LogFactory.getLog(XmlPublisherFilter.class);
53
54 private final String XML_CACHE = "XML_CACHE_" + this.hashCode();
55
56 protected IPublisher publisher;
57
58 private Set<String> tableNamesToPublishAsGroup;
59
60 private String xmlTagNameToUseForGroup = "batch";
61
62 private List<String> groupByColumnNames;
63
64 private String[] nodeGroups;
65
66 private boolean loadDataInTargetDatabase = true;
67
68 private boolean autoRegister = true;
69
70 private Format xmlFormat;
71
72 private ITimeGenerator timeStringGenerator = new ITimeGenerator() {
73 public String getTime() {
74 return Long.toString(System.currentTimeMillis());
75 }
76 };
77
78 public XmlPublisherFilter() {
79 xmlFormat = Format.getCompactFormat();
80 xmlFormat.setOmitDeclaration(true);
81 }
82
83 public boolean isAutoRegister() {
84 return autoRegister;
85 }
86
87 public String[] getNodeGroupIdsToApplyTo() {
88 return nodeGroups;
89 }
90
91 public boolean filterDelete(IDataLoaderContext ctx, String[] keys) {
92 if (tableNamesToPublishAsGroup == null || tableNamesToPublishAsGroup.contains(ctx.getTableName())) {
93 Element xml = getXmlFromCache(ctx, null, keys);
94 if (xml != null) {
95 toXmlElement(DataEventType.UPDATE, xml, ctx, null, keys);
96 }
97 }
98 return loadDataInTargetDatabase;
99 }
100
101 public boolean filterUpdate(IDataLoaderContext ctx, String[] data, String[] keys) {
102 if (tableNamesToPublishAsGroup == null || tableNamesToPublishAsGroup.contains(ctx.getTableName())) {
103 Element xml = getXmlFromCache(ctx, data, keys);
104 if (xml != null) {
105 toXmlElement(DataEventType.UPDATE, xml, ctx, data, keys);
106 }
107 }
108 return loadDataInTargetDatabase;
109 }
110
111 public boolean filterInsert(IDataLoaderContext ctx, String[] data) {
112 if (tableNamesToPublishAsGroup == null || tableNamesToPublishAsGroup.contains(ctx.getTableName())) {
113 Element xml = getXmlFromCache(ctx, data, null);
114 if (xml != null) {
115 toXmlElement(DataEventType.INSERT, xml, ctx, data, null);
116 }
117 }
118 return loadDataInTargetDatabase;
119 }
120
121 private Element getXmlFromCache(IDataLoaderContext ctx, String[] data, String[] keys) {
122 Element xml = null;
123 Map<String, Element> ctxCache = getXmlCache(ctx);
124 String txId = toXmlGroupId(ctx, data, keys);
125 if (txId != null) {
126 xml = ctxCache.get(txId);
127 if (xml == null) {
128 xml = new Element(xmlTagNameToUseForGroup);
129 xml.addNamespaceDeclaration(getXmlNamespace());
130 xml.setAttribute("id", txId);
131 addFormattedExtraGroupAttributes(ctx, xml);
132 ctxCache.put(txId, xml);
133 }
134 }
135 return xml;
136 }
137
138 private final static Namespace getXmlNamespace() {
139 return Namespace.getNamespace("xsi", "http://www.w3.org/2001/XMLSchema-instance");
140 }
141
142 /***
143 * Give the opportunity for the user of this publisher to add in additional
144 * attributes. The default implementation adds in the nodeId from the
145 * {@link IDataLoaderContext}.
146 *
147 * @param ctx
148 * @param xml
149 * append XML attributes to this buffer
150 */
151 protected void addFormattedExtraGroupAttributes(IDataLoaderContext ctx, Element xml) {
152 xml.setAttribute("nodeid", ctx.getNodeId());
153 if (timeStringGenerator != null) {
154 xml.setAttribute("time", timeStringGenerator.getTime());
155 }
156 }
157
158 @SuppressWarnings("unchecked")
159 protected Map<String, Element> getXmlCache(IDataLoaderContext ctx) {
160 Map<String, Object> cache = ctx.getContextCache();
161 Map<String, Element> xmlCache = (Map<String, Element>) cache.get(XML_CACHE);
162 if (xmlCache == null) {
163 xmlCache = new HashMap<String, Element>();
164 cache.put(XML_CACHE, xmlCache);
165 }
166 return xmlCache;
167 }
168
169 @SuppressWarnings("unchecked")
170 protected boolean doesXmlExistToPublish(IDataLoaderContext ctx) {
171 Map<String, Object> cache = ctx.getContextCache();
172 Map<String, StringBuilder> xmlCache = (Map<String, StringBuilder>) cache.get(XML_CACHE);
173 return xmlCache != null && xmlCache.size() > 0;
174 }
175
176 private void toXmlElement(DataEventType dml, Element xml, IDataLoaderContext ctx, String[] data, String[] keys) {
177 Element row = new Element("row");
178 xml.addContent(row);
179 row.setAttribute("entity", ctx.getTableName());
180 row.setAttribute("dml", dml.getCode());
181
182 String[] colNames = null;
183
184 if (data == null) {
185 colNames = ctx.getKeyNames();
186 data = keys;
187 } else {
188 colNames = ctx.getColumnNames();
189 }
190
191 for (int i = 0; i < data.length; i++) {
192 String col = colNames[i];
193 Element dataElement = new Element("data");
194 row.addContent(dataElement);
195 dataElement.setAttribute("key", col);
196 if (data[i] != null) {
197 dataElement.setText(data[i]);
198 } else {
199 dataElement.setAttribute("nil", "true", getXmlNamespace());
200 }
201 }
202 }
203
204 private String toXmlGroupId(IDataLoaderContext ctx, String[] data, String[] keys) {
205 if (groupByColumnNames != null) {
206 StringBuilder id = new StringBuilder();
207
208 if (keys != null) {
209 String[] columns = ctx.getKeyNames();
210 for (String col : groupByColumnNames) {
211 int index = ArrayUtils.indexOf(columns, col, 0);
212 if (index >= 0) {
213 id.append(data[index]);
214 } else {
215 id = new StringBuilder();
216 break;
217 }
218 }
219 }
220
221 if (id.length() == 0) {
222 String[] columns = ctx.getColumnNames();
223 for (String col : groupByColumnNames) {
224 int index = ArrayUtils.indexOf(columns, col, 0);
225 if (index >= 0) {
226 id.append(data[index]);
227 } else {
228 return null;
229 }
230 }
231 }
232
233 if (id.length() > 0) {
234 return id.toString().replaceAll("-", "");
235 }
236 } else {
237 logger.warn("You did not specify 'groupByColumnNames'. We cannot find any matches in the data to publish as XML if you don't. You might as well turn off this filter!");
238 }
239 return null;
240 }
241
242 private void finalizeXmlAndPublish(IDataLoaderContext ctx) {
243 Map<String, Element> ctxCache = getXmlCache(ctx);
244 Collection<Element> buffers = ctxCache.values();
245 for (Iterator<Element> iterator = buffers.iterator(); iterator.hasNext();) {
246 String xml = new XMLOutputter(xmlFormat).outputString(new Document(iterator.next()));
247 if (logger.isDebugEnabled()) {
248 logger.debug("Sending XML to IPublisher -> " + xml);
249 }
250 iterator.remove();
251 publisher.publish(ctx, xml.toString());
252 }
253
254 }
255
256 public void batchComplete(IDataLoader loader, IncomingBatchHistory hist) {
257 IDataLoaderContext ctx = loader.getContext();
258 if (doesXmlExistToPublish(ctx)) {
259 finalizeXmlAndPublish(ctx);
260 }
261 }
262
263 public void setTableNamesToPublishAsGroup(Set<String> tableNamesToPublishAsGroup) {
264 this.tableNamesToPublishAsGroup = tableNamesToPublishAsGroup;
265 }
266
267 public void setTableNameToPublish(String tableName) {
268 this.tableNamesToPublishAsGroup = new HashSet<String>(1);
269 this.tableNamesToPublishAsGroup.add(tableName);
270 }
271
272 public void setXmlTagNameToUseForGroup(String xmlTagNameToUseForGroup) {
273 this.xmlTagNameToUseForGroup = xmlTagNameToUseForGroup;
274 }
275
276 /***
277 * This attribute is required. It needs to identify the columns that will be used to key on
278 * rows in the specified tables that need to be grouped together in an 'XML batch.'
279 */
280 public void setGroupByColumnNames(List<String> groupByColumnNames) {
281 this.groupByColumnNames = groupByColumnNames;
282 }
283
284 public void setLoadDataInTargetDatabase(boolean loadDataInTargetDatabase) {
285 this.loadDataInTargetDatabase = loadDataInTargetDatabase;
286 }
287
288 public void setPublisher(IPublisher publisher) {
289 this.publisher = publisher;
290 }
291
292 public void setNodeGroups(String[] nodeGroups) {
293 this.nodeGroups = nodeGroups;
294 }
295
296 public void setAutoRegister(boolean autoRegister) {
297 this.autoRegister = autoRegister;
298 }
299
300 interface ITimeGenerator {
301 public String getTime();
302 }
303
304 /***
305 * Used to populate the time attribute of an XML message.
306 */
307 public void setTimeStringGenerator(ITimeGenerator timeStringGenerator) {
308 this.timeStringGenerator = timeStringGenerator;
309 }
310
311 public void setXmlFormat(Format xmlFormat) {
312 this.xmlFormat = xmlFormat;
313 }
314
315 }