View Javadoc

1   /*
2    * SymmetricDS is an open source database synchronization solution.
3    *   
4    * Copyright (C) Chris Henson <chenson42@users.sourceforge.net>
5    *
6    * This library is free software; you can redistribute it and/or
7    * modify it under the terms of the GNU Lesser General Public
8    * License as published by the Free Software Foundation; either
9    * version 3 of the License, or (at your option) any later version.
10   *
11   * This library is distributed in the hope that it will be useful,
12   * but WITHOUT ANY WARRANTY; without even the implied warranty of
13   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14   * Lesser General Public License for more details.
15   *
16   * You should have received a copy of the GNU Lesser General Public
17   * License along with this library; if not, see
18   * <http://www.gnu.org/licenses/>.
19   */
20  
21  package org.jumpmind.symmetric;
22  
23  import java.sql.SQLException;
24  import java.util.HashMap;
25  import java.util.HashSet;
26  import java.util.Map;
27  import java.util.Properties;
28  import java.util.Set;
29  import java.util.Timer;
30  
31  import javax.sql.DataSource;
32  
33  import org.apache.commons.dbcp.BasicDataSource;
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.jumpmind.symmetric.common.Constants;
37  import org.jumpmind.symmetric.common.ParameterConstants;
38  import org.jumpmind.symmetric.db.IDbDialect;
39  import org.jumpmind.symmetric.job.PullJob;
40  import org.jumpmind.symmetric.job.PurgeJob;
41  import org.jumpmind.symmetric.job.PushJob;
42  import org.jumpmind.symmetric.model.Node;
43  import org.jumpmind.symmetric.service.IBootstrapService;
44  import org.jumpmind.symmetric.service.IDataService;
45  import org.jumpmind.symmetric.service.INodeService;
46  import org.jumpmind.symmetric.service.IParameterService;
47  import org.jumpmind.symmetric.service.IPullService;
48  import org.jumpmind.symmetric.service.IPurgeService;
49  import org.jumpmind.symmetric.service.IPushService;
50  import org.jumpmind.symmetric.service.IRegistrationService;
51  import org.jumpmind.symmetric.util.AppUtils;
52  import org.springframework.context.ApplicationContext;
53  import org.springframework.context.support.ClassPathXmlApplicationContext;
54  
55  /***
56   * This is the preferred way to create, configure, start and manage an instance
57   * of Symmetric. The engine will bootstrap the symmetric.xml Spring context.
58   * <p/> The Symmetric instance is configured by properties configuration files.
59   * By default the engine will look for and override existing properties with
60   * ones found in the properties files. Symmetric looks for: symmetric.properties
61   * in the classpath (it will use the first one it finds), and then for a
62   * symmetric.properties found in the user.home system property location. Next,
63   * if provided, in the constructor of the SymmetricEngine, it will locate and
64   * use the properties file passed to the engine. <p/> When the engine is ready
65   * to be started, the {@link #start()} method should be called. It should only
66   * be called once.
67   */
68  public class SymmetricEngine {
69  
70      protected static final Log logger = LogFactory.getLog(SymmetricEngine.class);
71  
72      private ApplicationContext applicationContext;
73  
74      private IBootstrapService bootstrapService;
75  
76      private IParameterService parameterService;
77  
78      private INodeService nodeService;
79  
80      private IRegistrationService registrationService;
81  
82      private IPurgeService purgeService;
83  
84      private IDataService dataService;
85  
86      private boolean started = false;
87  
88      private boolean starting = false;
89  
90      private boolean setup = false;
91  
92      private IDbDialect dbDialect;
93  
94      private Set<Timer> jobs;
95  
96      private static Map<String, SymmetricEngine> registeredEnginesByUrl = new HashMap<String, SymmetricEngine>();
97  
98      private static Map<String, SymmetricEngine> registeredEnginesByName = new HashMap<String, SymmetricEngine>();
99  
100     public SymmetricEngine(String... overridePropertiesResources) {
101         String one = null;
102         String two = null;
103         if (overridePropertiesResources.length > 0) {
104             one = overridePropertiesResources[0];
105             if (overridePropertiesResources.length > 1) {
106                 two = overridePropertiesResources[1];
107             }
108         }
109         init(one, two);
110     }
111 
112     /***
113      * Create a symmetric node
114      */
115     public SymmetricEngine() {
116         init(createContext());
117     }
118 
119     /***
120      * Pass in the Spring context to be used. This had better include the Spring
121      * configuration for required Symmetric services.
122      * 
123      * @param ctx
124      *                A Spring framework context
125      */
126     protected SymmetricEngine(ApplicationContext ctx) {
127         init(ctx);
128     }
129 
130     public void stop() {
131         logger.info("Closing SymmetricDS externalId=" + parameterService.getExternalId() + " version="
132                 + Version.version() + " database=" + dbDialect.getName());
133         stopJobs();
134         removeMeFromMap(registeredEnginesByName);
135         removeMeFromMap(registeredEnginesByUrl);
136         DataSource ds = dbDialect.getJdbcTemplate().getDataSource();
137         if (ds instanceof BasicDataSource) {
138             try {
139                 ((BasicDataSource) ds).close();
140             } catch (SQLException ex) {
141                 logger.error(ex, ex);
142             }
143         }
144         applicationContext = null;
145         bootstrapService = null;
146         parameterService = null;
147         nodeService = null;
148         registrationService = null;
149         purgeService = null;
150         dataService = null;
151         dbDialect = null;
152         started = false;
153         starting = false;
154     }
155 
156     private void removeMeFromMap(Map<String, SymmetricEngine> map) {
157         Set<String> keys = new HashSet<String>(map.keySet());
158         for (String key : keys) {
159             if (map.get(key).equals(this)) {
160                 map.remove(key);
161             }
162         }
163     }
164 
165     private ApplicationContext createContext() {
166         return new ClassPathXmlApplicationContext("classpath:/symmetric.xml");
167     }
168 
169     /***
170      * @param overridePropertiesResource1
171      *                Provide a Spring resource path to a properties file to be
172      *                used for configuration
173      * @param overridePropertiesResource2
174      *                Provide a Spring resource path to a properties file to be
175      *                used for configuration
176      */
177     private void init(String overridePropertiesResource1, String overridePropertiesResource2) {
178         // Setting system properties is probably not the best way to accomplish
179         // this setup.
180         // Synchronizing on the class so creating multiple engines is thread
181         // safe.
182         synchronized (SymmetricEngine.class) {
183             System.setProperty(Constants.OVERRIDE_PROPERTIES_FILE_1, overridePropertiesResource1 == null ? ""
184                     : overridePropertiesResource1);
185             System.setProperty(Constants.OVERRIDE_PROPERTIES_FILE_2, overridePropertiesResource2 == null ? ""
186                     : overridePropertiesResource2);
187             this.init(createContext());
188         }
189     }
190 
191     private void init(ApplicationContext applicationContext) {
192         this.applicationContext = applicationContext;
193         bootstrapService = (IBootstrapService) applicationContext.getBean(Constants.BOOTSTRAP_SERVICE);
194         parameterService = (IParameterService) applicationContext.getBean(Constants.PARAMETER_SERVICE);
195         nodeService = (INodeService) applicationContext.getBean(Constants.NODE_SERVICE);
196         registrationService = (IRegistrationService) applicationContext.getBean(Constants.REGISTRATION_SERVICE);
197         purgeService = (IPurgeService) applicationContext.getBean(Constants.PURGE_SERVICE);
198         dataService = (IDataService) applicationContext.getBean(Constants.DATA_SERVICE);
199         dbDialect = (IDbDialect) applicationContext.getBean(Constants.DB_DIALECT);
200     }
201 
202     /***
203      * Register this instance of the engine so it can be found by other
204      * processes in the JVM.
205      * 
206      * @see #findEngineByUrl(String)
207      */
208     private void registerEngine() {
209         registeredEnginesByUrl.put(parameterService.getMyUrl(), this);
210         registeredEnginesByName.put(getEngineName(), this);
211     }
212 
213     /***
214      * This is done dynamically because some application servers do not allow
215      * the default MBeanServer to be accessed for security reasons (OC4J).
216      */
217     private void startDefaultServerJMXExport() {
218         if (parameterService.is(ParameterConstants.JMX_LEGACY_BEANS_ENABLED)) {
219             try {
220                 getApplicationContext().getBean(Constants.DEFAULT_JMX_SERVER_EXPORTER);
221             } catch (Exception ex) {
222                 logger.warn("Unable to register JMX beans with the default MBeanServer. " + ex.getMessage());
223             }
224         }
225     }
226 
227     private void startJob(String name) {
228         if (jobs == null) {
229             jobs = new HashSet<Timer>();
230         }
231         logger.info("Starting " + name);
232         Timer timer = AppUtils.find(name, this);
233         jobs.add(timer);
234     }
235 
236     /***
237      * Start the jobs if they are configured to be started in
238      * symmetric.properties
239      */
240     private void startJobs() {
241         if (Boolean.TRUE.toString().equalsIgnoreCase(parameterService.getString(ParameterConstants.START_PUSH_JOB))) {
242             startJob(Constants.PUSH_JOB_TIMER);
243         }
244 
245         if (Boolean.TRUE.toString().equalsIgnoreCase(parameterService.getString(ParameterConstants.START_PULL_JOB))) {
246             startJob(Constants.PULL_JOB_TIMER);
247         }
248 
249         if (Boolean.TRUE.toString().equalsIgnoreCase(parameterService.getString(ParameterConstants.START_PURGE_JOB))) {
250             startJob(Constants.PURGE_JOB_TIMER);
251         }
252 
253         if (Boolean.TRUE.toString()
254                 .equalsIgnoreCase(parameterService.getString(ParameterConstants.START_HEARTBEAT_JOB))) {
255             startJob(Constants.HEARTBEAT_JOB_TIMER);
256         }
257 
258         if (Boolean.TRUE.toString().equalsIgnoreCase(
259                 parameterService.getString(ParameterConstants.START_SYNCTRIGGERS_JOB))) {
260             startJob(Constants.SYNC_TRIGGERS_JOB_TIMER);
261         }
262 
263         if (Boolean.TRUE.toString().equalsIgnoreCase(
264                 parameterService.getString(ParameterConstants.START_STATISTIC_FLUSH_JOB))) {
265             startJob(Constants.STATISTIC_FLUSH_JOB_TIMER);
266         }
267 
268     }
269 
270     private void stopJobs() {
271         if (jobs != null) {
272             for (Timer job : jobs) {
273                 try {
274                     job.cancel();
275                 } catch (RuntimeException e) {
276                     logger.error(e, e);
277                 }
278             }
279         }
280     }
281 
282     /***
283      * Get a list of configured properties for Symmetric. Read-only.
284      */
285     public Properties getProperties() {
286         Properties p = new Properties();
287         p.putAll(parameterService.getAllParameters());
288         return p;
289     }
290 
291     public String getEngineName() {
292         return dbDialect.getEngineName();
293     }
294 
295     /***
296      * Will setup the symmetric tables, if not already setup and if the engine
297      * is configured to do so.
298      */
299     public synchronized void setup() {
300         if (!setup) {
301             bootstrapService.setupDatabase();
302             setup = true;
303         }
304     }
305 
306     /***
307      * Must be called to start symmetric.
308      */
309     public synchronized void start() {
310         if (!starting && !started) {
311             starting = true;
312             setup();
313             registerEngine();
314             startDefaultServerJMXExport();
315             Node node = nodeService.findIdentity();
316             if (node != null) {
317                 logger.info("Starting registered node [group=" + node.getNodeGroupId() + ", id=" + node.getNodeId()
318                         + ", externalId=" + node.getExternalId() + "]");
319             } else {
320                 logger.info("Starting unregistered node [group=" + parameterService.getNodeGroupId() + ", externalId="
321                         + parameterService.getExternalId() + "]");
322             }
323             bootstrapService.validateConfiguration();
324             bootstrapService.syncTriggers();
325             startJobs();
326             started = true;
327             logger.info("Started SymmetricDS externalId=" + parameterService.getExternalId() + " version="
328                     + Version.version() + " database=" + dbDialect.getName());
329             starting = false;
330 
331         }
332     }
333 
334     /***
335      * Queue up an initial load or a reload to a node.
336      */
337     public String reloadNode(String nodeId) {
338         return dataService.reloadNode(nodeId);
339     }
340 
341     public String sendSQL(String nodeId, String tableName, String sql) {
342         return dataService.sendSQL(nodeId, tableName, sql);
343     }
344 
345     /***
346      * This can be called if the push job has not been enabled. It will perform
347      * a push the same way the {@link PushJob} would have.
348      * 
349      * @see IPushService#pushData()
350      */
351     public void push() {
352         if (!Boolean.TRUE.toString().equalsIgnoreCase(parameterService.getString(ParameterConstants.START_PUSH_JOB))) {
353             ((IPushService) applicationContext.getBean(Constants.PUSH_SERVICE)).pushData();
354         } else {
355             throw new UnsupportedOperationException("Cannot actuate a push if it is already scheduled.");
356         }
357     }
358 
359     /***
360      * Call this to resync triggers
361      * 
362      * @see IBootstrapService#syncTriggers()
363      */
364     public void syncTriggers() {
365         bootstrapService.syncTriggers();
366     }
367 
368     /***
369      * This can be called if the pull job has not been enabled. It will perform
370      * a pull the same way the {@link PullJob} would have.
371      * 
372      * @see IPullService#pullData()
373      */
374     public void pull() {
375         if (!Boolean.TRUE.toString().equalsIgnoreCase(parameterService.getString(ParameterConstants.START_PULL_JOB))) {
376             ((IPullService) applicationContext.getBean(Constants.PULL_SERVICE)).pullData();
377         } else {
378             throw new UnsupportedOperationException("Cannot actuate a push if it is already scheduled.");
379         }
380     }
381 
382     /***
383      * This can be called to do a purge. It may be called only if the
384      * {@link PurgeJob} has not been enabled.
385      * 
386      * @see IPurgeService#purge()
387      */
388     public void purge() {
389         if (!Boolean.TRUE.toString().equalsIgnoreCase(parameterService.getString(ParameterConstants.START_PURGE_JOB))) {
390             purgeService.purge();
391         } else {
392             throw new UnsupportedOperationException("Cannot actuate a purge if it is already scheduled.");
393         }
394     }
395 
396     /***
397      * Push a copy of the node onto the push queue so the symmetric node
398      * 'checks' in with it's root node.
399      * 
400      * @see IBootstrapService#heartbeat()
401      */
402     public void heartbeat() {
403         bootstrapService.heartbeat();
404     }
405 
406     /***
407      * Open up registration for node to attach.
408      * 
409      * @see IRegistrationService#openRegistration(String, String)
410      */
411     public void openRegistration(String groupId, String externalId) {
412         registrationService.openRegistration(groupId, externalId);
413     }
414 
415     public void reOpenRegistration(String nodeId) {
416         registrationService.reOpenRegistration(nodeId);
417     }
418     
419     /***
420      * Check to see if this node has been registered.
421      * 
422      * @return true if the node is registered
423      */
424     public boolean isRegistered() {
425         return nodeService.findIdentity() != null;
426     }
427 
428     /***
429      * Check to see if this node has been started.
430      * 
431      * @return true if the node is started
432      */
433     public boolean isStarted() {
434         return started;
435     }
436 
437     /***
438      * Check to see if this node is starting.
439      * 
440      * @return true if the node is starting
441      */
442 
443     public boolean isStarting() {
444         return starting;
445     }
446 
447     /***
448      * Expose access to the Spring context. This is for advanced use only.
449      * 
450      * @return
451      */
452     public ApplicationContext getApplicationContext() {
453         return applicationContext;
454     }
455 
456     /***
457      * Locate a {@link SymmetricEngine} in the same JVM
458      */
459     public static SymmetricEngine findEngineByUrl(String url) {
460         return registeredEnginesByUrl.get(url);
461     }
462 
463     public static SymmetricEngine findEngineByName(String name) {
464         return registeredEnginesByName.get(name);
465     }
466 
467 }