View Javadoc

1   /*
2    * SymmetricDS is an open source database synchronization solution.
3    *   
4    * Copyright (C) Chris Henson <chenson42@users.sourceforge.net>
5    *
6    * This library is free software; you can redistribute it and/or
7    * modify it under the terms of the GNU Lesser General Public
8    * License as published by the Free Software Foundation; either
9    * version 3 of the License, or (at your option) any later version.
10   *
11   * This library is distributed in the hope that it will be useful,
12   * but WITHOUT ANY WARRANTY; without even the implied warranty of
13   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14   * Lesser General Public License for more details.
15   *
16   * You should have received a copy of the GNU Lesser General Public
17   * License along with this library; if not, see
18   * <http://www.gnu.org/licenses/>.
19   */
20  package org.jumpmind.symmetric.ext;
21  
22  import java.util.Map;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.jumpmind.symmetric.load.IDataLoader;
27  import org.jumpmind.symmetric.load.IDataLoaderContext;
28  import org.jumpmind.symmetric.model.IncomingBatchHistory;
29  import org.springframework.beans.factory.BeanNameAware;
30  
31  /***
32   * An abstract convenience class meant to be implemented by classes that need to
33   * publish text messages
34   */
35  abstract public class AbstractTextPublisherFilter implements IPublisherFilter, INodeGroupExtensionPoint, BeanNameAware {
36  
37      private static final Log logger = LogFactory.getLog(AbstractTextPublisherFilter.class);
38  
39      private final String MSG_CACHE = "msg_CACHE" + hashCode();
40  
41      protected IPublisher publisher;
42  
43      private boolean loadDataInTargetDatabase = true;
44  
45      protected String tableName;
46  
47      private boolean autoRegister = true;
48  
49      private String[] nodeGroupIdsToApplyTo;
50  
51      private int messagesSinceLastLogOutput = 0;
52  
53      private long minTimeInMsBetweenLogOutput = 30000;
54  
55      private long lastTimeInMsOutputLogged = System.currentTimeMillis();
56  
57      private String beanName;
58  
59      protected abstract String addTextHeader(IDataLoaderContext ctx);
60  
61      protected abstract String addTextElementForDelete(IDataLoaderContext ctx, String[] keys);
62  
63      protected abstract String addTextElementForUpdate(IDataLoaderContext ctx, String[] data, String[] keys);
64  
65      protected abstract String addTextElementForInsert(IDataLoaderContext ctx, String[] data);
66  
67      protected abstract String addTextFooter(IDataLoaderContext ctx);
68  
69      public void setBeanName(String name) {
70          this.beanName = name;
71      }
72  
73      public boolean filterDelete(IDataLoaderContext ctx, String[] keys) {
74          if (tableName != null && tableName.equals(ctx.getTableName())) {
75              String msg = addTextElementForDelete(ctx, keys);
76              if (msg != null) {
77                  getFromCache(ctx).append(msg);
78              }
79          }
80          return loadDataInTargetDatabase;
81      }
82  
83      public boolean filterUpdate(IDataLoaderContext ctx, String[] data, String[] keys) {
84          if (tableName != null && tableName.equals(ctx.getTableName())) {
85              String msg = addTextElementForUpdate(ctx, data, keys);
86              if (msg != null) {
87                  getFromCache(ctx).append(msg);
88              }
89          }
90          return loadDataInTargetDatabase;
91      }
92  
93      public boolean filterInsert(IDataLoaderContext ctx, String[] data) {
94          if (tableName != null && tableName.equals(ctx.getTableName())) {
95              String msg = addTextElementForInsert(ctx, data);
96              if (msg != null) {
97                  getFromCache(ctx).append(msg);
98              }
99          }
100         return loadDataInTargetDatabase;
101     }
102 
103     protected StringBuilder getFromCache(IDataLoaderContext ctx) {
104         Map<String, Object> cache = ctx.getContextCache();
105         StringBuilder msgCache = (StringBuilder) cache.get(MSG_CACHE);
106         if (msgCache == null) {
107             msgCache = new StringBuilder(addTextHeader(ctx));
108             cache.put(MSG_CACHE, msgCache);
109         }
110         return msgCache;
111     }
112 
113     @SuppressWarnings("unchecked")
114     protected boolean doesTextExistToPublish(IDataLoaderContext ctx) {
115         Map<String, Object> cache = ctx.getContextCache();
116         StringBuilder msgCache = (StringBuilder) cache.get(MSG_CACHE);
117         return msgCache != null && msgCache.length() > 0;
118     }
119 
120     private void finalizeAndPublish(IDataLoaderContext ctx) {
121         StringBuilder msg = getFromCache(ctx);
122         if (msg.length() > 0) {
123             msg.append(addTextFooter(ctx));
124             if (logger.isDebugEnabled()) {
125                 logger.debug("publishing text message -> " + msg);
126             }
127             ctx.getContextCache().remove(MSG_CACHE);
128             publisher.publish(ctx, msg.toString());
129         }
130     }
131 
132     public void batchComplete(IDataLoader loader, IncomingBatchHistory hist) {
133         IDataLoaderContext ctx = loader.getContext();
134         if (doesTextExistToPublish(ctx)) {
135             finalizeAndPublish(ctx);
136             logCount();
137         }
138     }
139 
140     protected void logCount() {
141         messagesSinceLastLogOutput++;
142         long timeInMsSinceLastLogOutput = System.currentTimeMillis() - lastTimeInMsOutputLogged;
143         if (timeInMsSinceLastLogOutput > minTimeInMsBetweenLogOutput) {
144             if (logger.isInfoEnabled()) {
145                 logger.info(beanName + " published " + messagesSinceLastLogOutput + " messages in the last "
146                         + timeInMsSinceLastLogOutput + "ms");
147             }
148             lastTimeInMsOutputLogged = System.currentTimeMillis();
149             messagesSinceLastLogOutput = 0;
150         }
151     }
152 
153     public void setLoadDataInTargetDatabase(boolean loadDataInTargetDatabase) {
154         this.loadDataInTargetDatabase = loadDataInTargetDatabase;
155     }
156 
157     public void setPublisher(IPublisher publisher) {
158         this.publisher = publisher;
159     }
160 
161     public void setAutoRegister(boolean autoRegister) {
162         this.autoRegister = autoRegister;
163     }
164 
165     public boolean isAutoRegister() {
166         return autoRegister;
167     }
168 
169     public String[] getNodeGroupIdsToApplyTo() {
170         return nodeGroupIdsToApplyTo;
171     }
172 
173     public void setNodeGroupIdToApplyTo(String nodeGroupdId) {
174         this.nodeGroupIdsToApplyTo = new String[] { nodeGroupdId };
175     }
176 
177     public void setMessagesSinceLastLogOutput(int messagesSinceLastLogOutput) {
178         this.messagesSinceLastLogOutput = messagesSinceLastLogOutput;
179     }
180 
181     public void setMinTimeInMsBetweenLogOutput(long timeInMsBetweenLogOutput) {
182         this.minTimeInMsBetweenLogOutput = timeInMsBetweenLogOutput;
183     }
184 
185     public void setTableName(String tableName) {
186         this.tableName = tableName;
187     }
188 
189 }