1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.jumpmind.symmetric.ext;
21
22 import java.util.Map;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.jumpmind.symmetric.load.IDataLoader;
27 import org.jumpmind.symmetric.load.IDataLoaderContext;
28 import org.jumpmind.symmetric.model.IncomingBatchHistory;
29 import org.springframework.beans.factory.BeanNameAware;
30
31 /***
32 * An abstract convenience class meant to be implemented by classes that need to
33 * publish text messages
34 */
35 abstract public class AbstractTextPublisherFilter implements IPublisherFilter, INodeGroupExtensionPoint, BeanNameAware {
36
37 private static final Log logger = LogFactory.getLog(AbstractTextPublisherFilter.class);
38
39 private final String MSG_CACHE = "msg_CACHE" + hashCode();
40
41 protected IPublisher publisher;
42
43 private boolean loadDataInTargetDatabase = true;
44
45 protected String tableName;
46
47 private boolean autoRegister = true;
48
49 private String[] nodeGroupIdsToApplyTo;
50
51 private int messagesSinceLastLogOutput = 0;
52
53 private long minTimeInMsBetweenLogOutput = 30000;
54
55 private long lastTimeInMsOutputLogged = System.currentTimeMillis();
56
57 private String beanName;
58
59 protected abstract String addTextHeader(IDataLoaderContext ctx);
60
61 protected abstract String addTextElementForDelete(IDataLoaderContext ctx, String[] keys);
62
63 protected abstract String addTextElementForUpdate(IDataLoaderContext ctx, String[] data, String[] keys);
64
65 protected abstract String addTextElementForInsert(IDataLoaderContext ctx, String[] data);
66
67 protected abstract String addTextFooter(IDataLoaderContext ctx);
68
69 public void setBeanName(String name) {
70 this.beanName = name;
71 }
72
73 public boolean filterDelete(IDataLoaderContext ctx, String[] keys) {
74 if (tableName != null && tableName.equals(ctx.getTableName())) {
75 String msg = addTextElementForDelete(ctx, keys);
76 if (msg != null) {
77 getFromCache(ctx).append(msg);
78 }
79 }
80 return loadDataInTargetDatabase;
81 }
82
83 public boolean filterUpdate(IDataLoaderContext ctx, String[] data, String[] keys) {
84 if (tableName != null && tableName.equals(ctx.getTableName())) {
85 String msg = addTextElementForUpdate(ctx, data, keys);
86 if (msg != null) {
87 getFromCache(ctx).append(msg);
88 }
89 }
90 return loadDataInTargetDatabase;
91 }
92
93 public boolean filterInsert(IDataLoaderContext ctx, String[] data) {
94 if (tableName != null && tableName.equals(ctx.getTableName())) {
95 String msg = addTextElementForInsert(ctx, data);
96 if (msg != null) {
97 getFromCache(ctx).append(msg);
98 }
99 }
100 return loadDataInTargetDatabase;
101 }
102
103 protected StringBuilder getFromCache(IDataLoaderContext ctx) {
104 Map<String, Object> cache = ctx.getContextCache();
105 StringBuilder msgCache = (StringBuilder) cache.get(MSG_CACHE);
106 if (msgCache == null) {
107 msgCache = new StringBuilder(addTextHeader(ctx));
108 cache.put(MSG_CACHE, msgCache);
109 }
110 return msgCache;
111 }
112
113 @SuppressWarnings("unchecked")
114 protected boolean doesTextExistToPublish(IDataLoaderContext ctx) {
115 Map<String, Object> cache = ctx.getContextCache();
116 StringBuilder msgCache = (StringBuilder) cache.get(MSG_CACHE);
117 return msgCache != null && msgCache.length() > 0;
118 }
119
120 private void finalizeAndPublish(IDataLoaderContext ctx) {
121 StringBuilder msg = getFromCache(ctx);
122 if (msg.length() > 0) {
123 msg.append(addTextFooter(ctx));
124 if (logger.isDebugEnabled()) {
125 logger.debug("publishing text message -> " + msg);
126 }
127 ctx.getContextCache().remove(MSG_CACHE);
128 publisher.publish(ctx, msg.toString());
129 }
130 }
131
132 public void batchComplete(IDataLoader loader, IncomingBatchHistory hist) {
133 IDataLoaderContext ctx = loader.getContext();
134 if (doesTextExistToPublish(ctx)) {
135 finalizeAndPublish(ctx);
136 logCount();
137 }
138 }
139
140 protected void logCount() {
141 messagesSinceLastLogOutput++;
142 long timeInMsSinceLastLogOutput = System.currentTimeMillis() - lastTimeInMsOutputLogged;
143 if (timeInMsSinceLastLogOutput > minTimeInMsBetweenLogOutput) {
144 if (logger.isInfoEnabled()) {
145 logger.info(beanName + " published " + messagesSinceLastLogOutput + " messages in the last "
146 + timeInMsSinceLastLogOutput + "ms");
147 }
148 lastTimeInMsOutputLogged = System.currentTimeMillis();
149 messagesSinceLastLogOutput = 0;
150 }
151 }
152
153 public void setLoadDataInTargetDatabase(boolean loadDataInTargetDatabase) {
154 this.loadDataInTargetDatabase = loadDataInTargetDatabase;
155 }
156
157 public void setPublisher(IPublisher publisher) {
158 this.publisher = publisher;
159 }
160
161 public void setAutoRegister(boolean autoRegister) {
162 this.autoRegister = autoRegister;
163 }
164
165 public boolean isAutoRegister() {
166 return autoRegister;
167 }
168
169 public String[] getNodeGroupIdsToApplyTo() {
170 return nodeGroupIdsToApplyTo;
171 }
172
173 public void setNodeGroupIdToApplyTo(String nodeGroupdId) {
174 this.nodeGroupIdsToApplyTo = new String[] { nodeGroupdId };
175 }
176
177 public void setMessagesSinceLastLogOutput(int messagesSinceLastLogOutput) {
178 this.messagesSinceLastLogOutput = messagesSinceLastLogOutput;
179 }
180
181 public void setMinTimeInMsBetweenLogOutput(long timeInMsBetweenLogOutput) {
182 this.minTimeInMsBetweenLogOutput = timeInMsBetweenLogOutput;
183 }
184
185 public void setTableName(String tableName) {
186 this.tableName = tableName;
187 }
188
189 }