1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.jumpmind.symmetric;
22
23 import java.sql.SQLException;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.Map;
27 import java.util.Properties;
28 import java.util.Set;
29 import java.util.Timer;
30
31 import javax.sql.DataSource;
32
33 import org.apache.commons.dbcp.BasicDataSource;
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.jumpmind.symmetric.common.Constants;
37 import org.jumpmind.symmetric.common.ParameterConstants;
38 import org.jumpmind.symmetric.db.IDbDialect;
39 import org.jumpmind.symmetric.job.PullJob;
40 import org.jumpmind.symmetric.job.PurgeJob;
41 import org.jumpmind.symmetric.job.PushJob;
42 import org.jumpmind.symmetric.model.Node;
43 import org.jumpmind.symmetric.service.IBootstrapService;
44 import org.jumpmind.symmetric.service.IDataService;
45 import org.jumpmind.symmetric.service.INodeService;
46 import org.jumpmind.symmetric.service.IParameterService;
47 import org.jumpmind.symmetric.service.IPullService;
48 import org.jumpmind.symmetric.service.IPurgeService;
49 import org.jumpmind.symmetric.service.IPushService;
50 import org.jumpmind.symmetric.service.IRegistrationService;
51 import org.jumpmind.symmetric.util.AppUtils;
52 import org.springframework.context.ApplicationContext;
53 import org.springframework.context.support.ClassPathXmlApplicationContext;
54
55 /***
56 * This is the preferred way to create, configure, start and manage an instance
57 * of Symmetric. The engine will bootstrap the symmetric.xml Spring context.
58 * <p/> The Symmetric instance is configured by properties configuration files.
59 * By default the engine will look for and override existing properties with
60 * ones found in the properties files. Symmetric looks for: symmetric.properties
61 * in the classpath (it will use the first one it finds), and then for a
62 * symmetric.properties found in the user.home system property location. Next,
63 * if provided, in the constructor of the SymmetricEngine, it will locate and
64 * use the properties file passed to the engine. <p/> When the engine is ready
65 * to be started, the {@link #start()} method should be called. It should only
66 * be called once.
67 */
68 public class SymmetricEngine {
69
70 protected static final Log logger = LogFactory.getLog(SymmetricEngine.class);
71
72 private ApplicationContext applicationContext;
73
74 private IBootstrapService bootstrapService;
75
76 private IParameterService parameterService;
77
78 private INodeService nodeService;
79
80 private IRegistrationService registrationService;
81
82 private IPurgeService purgeService;
83
84 private IDataService dataService;
85
86 private boolean started = false;
87
88 private boolean starting = false;
89
90 private boolean setup = false;
91
92 private IDbDialect dbDialect;
93
94 private Set<Timer> jobs;
95
96 private static Map<String, SymmetricEngine> registeredEnginesByUrl = new HashMap<String, SymmetricEngine>();
97
98 private static Map<String, SymmetricEngine> registeredEnginesByName = new HashMap<String, SymmetricEngine>();
99
100 public SymmetricEngine(String... overridePropertiesResources) {
101 String one = null;
102 String two = null;
103 if (overridePropertiesResources.length > 0) {
104 one = overridePropertiesResources[0];
105 if (overridePropertiesResources.length > 1) {
106 two = overridePropertiesResources[1];
107 }
108 }
109 init(one, two);
110 }
111
112 /***
113 * Create a symmetric node
114 */
115 public SymmetricEngine() {
116 init(createContext());
117 }
118
119 /***
120 * Pass in the Spring context to be used. This had better include the Spring
121 * configuration for required Symmetric services.
122 *
123 * @param ctx
124 * A Spring framework context
125 */
126 protected SymmetricEngine(ApplicationContext ctx) {
127 init(ctx);
128 }
129
130 public void stop() {
131 logger.info("Closing SymmetricDS externalId=" + parameterService.getExternalId() + " version="
132 + Version.version() + " database=" + dbDialect.getName());
133 stopJobs();
134 removeMeFromMap(registeredEnginesByName);
135 removeMeFromMap(registeredEnginesByUrl);
136 DataSource ds = dbDialect.getJdbcTemplate().getDataSource();
137 if (ds instanceof BasicDataSource) {
138 try {
139 ((BasicDataSource) ds).close();
140 } catch (SQLException ex) {
141 logger.error(ex, ex);
142 }
143 }
144 applicationContext = null;
145 bootstrapService = null;
146 parameterService = null;
147 nodeService = null;
148 registrationService = null;
149 purgeService = null;
150 dataService = null;
151 dbDialect = null;
152 started = false;
153 starting = false;
154 }
155
156 private void removeMeFromMap(Map<String, SymmetricEngine> map) {
157 Set<String> keys = new HashSet<String>(map.keySet());
158 for (String key : keys) {
159 if (map.get(key).equals(this)) {
160 map.remove(key);
161 }
162 }
163 }
164
165 private ApplicationContext createContext() {
166 return new ClassPathXmlApplicationContext("classpath:/symmetric.xml");
167 }
168
169 /***
170 * @param overridePropertiesResource1
171 * Provide a Spring resource path to a properties file to be
172 * used for configuration
173 * @param overridePropertiesResource2
174 * Provide a Spring resource path to a properties file to be
175 * used for configuration
176 */
177 private void init(String overridePropertiesResource1, String overridePropertiesResource2) {
178
179
180
181
182 synchronized (SymmetricEngine.class) {
183 System.setProperty(Constants.OVERRIDE_PROPERTIES_FILE_1, overridePropertiesResource1 == null ? ""
184 : overridePropertiesResource1);
185 System.setProperty(Constants.OVERRIDE_PROPERTIES_FILE_2, overridePropertiesResource2 == null ? ""
186 : overridePropertiesResource2);
187 this.init(createContext());
188 }
189 }
190
191 private void init(ApplicationContext applicationContext) {
192 this.applicationContext = applicationContext;
193 bootstrapService = (IBootstrapService) applicationContext.getBean(Constants.BOOTSTRAP_SERVICE);
194 parameterService = (IParameterService) applicationContext.getBean(Constants.PARAMETER_SERVICE);
195 nodeService = (INodeService) applicationContext.getBean(Constants.NODE_SERVICE);
196 registrationService = (IRegistrationService) applicationContext.getBean(Constants.REGISTRATION_SERVICE);
197 purgeService = (IPurgeService) applicationContext.getBean(Constants.PURGE_SERVICE);
198 dataService = (IDataService) applicationContext.getBean(Constants.DATA_SERVICE);
199 dbDialect = (IDbDialect) applicationContext.getBean(Constants.DB_DIALECT);
200 }
201
202 /***
203 * Register this instance of the engine so it can be found by other
204 * processes in the JVM.
205 *
206 * @see #findEngineByUrl(String)
207 */
208 private void registerEngine() {
209 registeredEnginesByUrl.put(parameterService.getMyUrl(), this);
210 registeredEnginesByName.put(getEngineName(), this);
211 }
212
213 /***
214 * This is done dynamically because some application servers do not allow
215 * the default MBeanServer to be accessed for security reasons (OC4J).
216 */
217 private void startDefaultServerJMXExport() {
218 if (parameterService.is(ParameterConstants.JMX_LEGACY_BEANS_ENABLED)) {
219 try {
220 getApplicationContext().getBean(Constants.DEFAULT_JMX_SERVER_EXPORTER);
221 } catch (Exception ex) {
222 logger.warn("Unable to register JMX beans with the default MBeanServer. " + ex.getMessage());
223 }
224 }
225 }
226
227 private void startJob(String name) {
228 if (jobs == null) {
229 jobs = new HashSet<Timer>();
230 }
231 logger.info("Starting " + name);
232 Timer timer = AppUtils.find(name, this);
233 jobs.add(timer);
234 }
235
236 /***
237 * Start the jobs if they are configured to be started in
238 * symmetric.properties
239 */
240 private void startJobs() {
241 if (Boolean.TRUE.toString().equalsIgnoreCase(parameterService.getString(ParameterConstants.START_PUSH_JOB))) {
242 startJob(Constants.PUSH_JOB_TIMER);
243 }
244
245 if (Boolean.TRUE.toString().equalsIgnoreCase(parameterService.getString(ParameterConstants.START_PULL_JOB))) {
246 startJob(Constants.PULL_JOB_TIMER);
247 }
248
249 if (Boolean.TRUE.toString().equalsIgnoreCase(parameterService.getString(ParameterConstants.START_PURGE_JOB))) {
250 startJob(Constants.PURGE_JOB_TIMER);
251 }
252
253 if (Boolean.TRUE.toString()
254 .equalsIgnoreCase(parameterService.getString(ParameterConstants.START_HEARTBEAT_JOB))) {
255 startJob(Constants.HEARTBEAT_JOB_TIMER);
256 }
257
258 if (Boolean.TRUE.toString().equalsIgnoreCase(
259 parameterService.getString(ParameterConstants.START_SYNCTRIGGERS_JOB))) {
260 startJob(Constants.SYNC_TRIGGERS_JOB_TIMER);
261 }
262
263 if (Boolean.TRUE.toString().equalsIgnoreCase(
264 parameterService.getString(ParameterConstants.START_STATISTIC_FLUSH_JOB))) {
265 startJob(Constants.STATISTIC_FLUSH_JOB_TIMER);
266 }
267
268 }
269
270 private void stopJobs() {
271 if (jobs != null) {
272 for (Timer job : jobs) {
273 try {
274 job.cancel();
275 } catch (RuntimeException e) {
276 logger.error(e, e);
277 }
278 }
279 }
280 }
281
282 /***
283 * Get a list of configured properties for Symmetric. Read-only.
284 */
285 public Properties getProperties() {
286 Properties p = new Properties();
287 p.putAll(parameterService.getAllParameters());
288 return p;
289 }
290
291 public String getEngineName() {
292 return dbDialect.getEngineName();
293 }
294
295 /***
296 * Will setup the symmetric tables, if not already setup and if the engine
297 * is configured to do so.
298 */
299 public synchronized void setup() {
300 if (!setup) {
301 bootstrapService.setupDatabase();
302 setup = true;
303 }
304 }
305
306 /***
307 * Must be called to start symmetric.
308 */
309 public synchronized void start() {
310 if (!starting && !started) {
311 starting = true;
312 setup();
313 registerEngine();
314 startDefaultServerJMXExport();
315 Node node = nodeService.findIdentity();
316 if (node != null) {
317 logger.info("Starting registered node [group=" + node.getNodeGroupId() + ", id=" + node.getNodeId()
318 + ", externalId=" + node.getExternalId() + "]");
319 } else {
320 logger.info("Starting unregistered node [group=" + parameterService.getNodeGroupId() + ", externalId="
321 + parameterService.getExternalId() + "]");
322 }
323 bootstrapService.validateConfiguration();
324 bootstrapService.syncTriggers();
325 startJobs();
326 started = true;
327 logger.info("Started SymmetricDS externalId=" + parameterService.getExternalId() + " version="
328 + Version.version() + " database=" + dbDialect.getName());
329 starting = false;
330
331 }
332 }
333
334 /***
335 * Queue up an initial load or a reload to a node.
336 */
337 public String reloadNode(String nodeId) {
338 return dataService.reloadNode(nodeId);
339 }
340
341 public String sendSQL(String nodeId, String tableName, String sql) {
342 return dataService.sendSQL(nodeId, tableName, sql);
343 }
344
345 /***
346 * This can be called if the push job has not been enabled. It will perform
347 * a push the same way the {@link PushJob} would have.
348 *
349 * @see IPushService#pushData()
350 */
351 public void push() {
352 if (!Boolean.TRUE.toString().equalsIgnoreCase(parameterService.getString(ParameterConstants.START_PUSH_JOB))) {
353 ((IPushService) applicationContext.getBean(Constants.PUSH_SERVICE)).pushData();
354 } else {
355 throw new UnsupportedOperationException("Cannot actuate a push if it is already scheduled.");
356 }
357 }
358
359 /***
360 * Call this to resync triggers
361 *
362 * @see IBootstrapService#syncTriggers()
363 */
364 public void syncTriggers() {
365 bootstrapService.syncTriggers();
366 }
367
368 /***
369 * This can be called if the pull job has not been enabled. It will perform
370 * a pull the same way the {@link PullJob} would have.
371 *
372 * @see IPullService#pullData()
373 */
374 public void pull() {
375 if (!Boolean.TRUE.toString().equalsIgnoreCase(parameterService.getString(ParameterConstants.START_PULL_JOB))) {
376 ((IPullService) applicationContext.getBean(Constants.PULL_SERVICE)).pullData();
377 } else {
378 throw new UnsupportedOperationException("Cannot actuate a push if it is already scheduled.");
379 }
380 }
381
382 /***
383 * This can be called to do a purge. It may be called only if the
384 * {@link PurgeJob} has not been enabled.
385 *
386 * @see IPurgeService#purge()
387 */
388 public void purge() {
389 if (!Boolean.TRUE.toString().equalsIgnoreCase(parameterService.getString(ParameterConstants.START_PURGE_JOB))) {
390 purgeService.purge();
391 } else {
392 throw new UnsupportedOperationException("Cannot actuate a purge if it is already scheduled.");
393 }
394 }
395
396 /***
397 * Push a copy of the node onto the push queue so the symmetric node
398 * 'checks' in with it's root node.
399 *
400 * @see IBootstrapService#heartbeat()
401 */
402 public void heartbeat() {
403 bootstrapService.heartbeat();
404 }
405
406 /***
407 * Open up registration for node to attach.
408 *
409 * @see IRegistrationService#openRegistration(String, String)
410 */
411 public void openRegistration(String groupId, String externalId) {
412 registrationService.openRegistration(groupId, externalId);
413 }
414
415 public void reOpenRegistration(String nodeId) {
416 registrationService.reOpenRegistration(nodeId);
417 }
418
419 /***
420 * Check to see if this node has been registered.
421 *
422 * @return true if the node is registered
423 */
424 public boolean isRegistered() {
425 return nodeService.findIdentity() != null;
426 }
427
428 /***
429 * Check to see if this node has been started.
430 *
431 * @return true if the node is started
432 */
433 public boolean isStarted() {
434 return started;
435 }
436
437 /***
438 * Check to see if this node is starting.
439 *
440 * @return true if the node is starting
441 */
442
443 public boolean isStarting() {
444 return starting;
445 }
446
447 /***
448 * Expose access to the Spring context. This is for advanced use only.
449 *
450 * @return
451 */
452 public ApplicationContext getApplicationContext() {
453 return applicationContext;
454 }
455
456 /***
457 * Locate a {@link SymmetricEngine} in the same JVM
458 */
459 public static SymmetricEngine findEngineByUrl(String url) {
460 return registeredEnginesByUrl.get(url);
461 }
462
463 public static SymmetricEngine findEngineByName(String name) {
464 return registeredEnginesByName.get(name);
465 }
466
467 }