1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.jumpmind.symmetric.job;
22
23 import java.util.Timer;
24 import java.util.TimerTask;
25
26 import javax.sql.DataSource;
27
28 import org.apache.commons.dbcp.BasicDataSource;
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.jumpmind.symmetric.SymmetricEngine;
32 import org.jumpmind.symmetric.common.Constants;
33 import org.jumpmind.symmetric.common.ParameterConstants;
34 import org.jumpmind.symmetric.service.IParameterService;
35 import org.jumpmind.symmetric.service.IRegistrationService;
36 import org.springframework.beans.factory.BeanFactory;
37 import org.springframework.beans.factory.BeanFactoryAware;
38 import org.springframework.beans.factory.BeanNameAware;
39
40 abstract public class AbstractJob extends TimerTask implements BeanFactoryAware, BeanNameAware {
41 DataSource dataSource;
42
43 protected static final Log logger = LogFactory.getLog(AbstractJob.class);
44
45 private boolean needsRescheduled;
46
47 private String rescheduleDelayParameter;
48
49 private BeanFactory beanFactory;
50
51 protected IParameterService parameterService;
52
53 private String beanName;
54
55 private boolean requiresRegistration = true;
56
57 private SymmetricEngine engine;
58
59 @Override
60 public boolean cancel() {
61 logger.info("This job, " + beanName + ", has been cancelled.");
62 return super.cancel();
63 }
64
65 @Override
66 public void run() {
67 try {
68 if (engine == null) {
69 engine = SymmetricEngine.findEngineByName(parameterService.getString(ParameterConstants.ENGINE_NAME));
70 }
71
72 if (engine == null) {
73 logger.info("Could not find a reference to the SymmetricEngine from " + beanName);
74 } else if (engine.isStarted()) {
75 IRegistrationService service = (IRegistrationService) beanFactory
76 .getBean(Constants.REGISTRATION_SERVICE);
77 if (!requiresRegistration || (requiresRegistration && service.isRegisteredWithServer())) {
78 doJob();
79 } else {
80 getLogger().warn("Did not run job because the engine is not registered.");
81 }
82 } else {
83 logger.info("The engine is not currently started.");
84 }
85 } catch (final Throwable ex) {
86 getLogger().error(ex, ex);
87 } finally {
88 if (isNeedsRescheduled() && engine != null && (engine.isStarted() || engine.isStarting())) {
89 reschedule();
90 }
91 }
92 }
93
94 abstract void doJob() throws Exception;
95
96 abstract Log getLogger();
97
98 protected void reschedule() {
99 final Timer timer = new Timer();
100 timer.schedule((TimerTask) beanFactory.getBean(beanName), parameterService.getLong(rescheduleDelayParameter));
101 if (getLogger().isDebugEnabled()) {
102 getLogger().debug(
103 "Rescheduling " + beanName + " with " + parameterService.getLong(rescheduleDelayParameter)
104 + " ms delay.");
105 }
106 }
107
108 protected void printDatabaseStats() {
109 if (getLogger().isDebugEnabled() && dataSource instanceof BasicDataSource) {
110 final BasicDataSource ds = (BasicDataSource) dataSource;
111 getLogger().debug("There are currently " + ds.getNumActive() + " active database connections.");
112 }
113 }
114
115 public void setBeanFactory(final BeanFactory beanFactory) {
116 this.beanFactory = beanFactory;
117 }
118
119 public void setBeanName(final String beanName) {
120 this.beanName = beanName;
121 }
122
123 public void setDataSource(final DataSource dataSource) {
124 this.dataSource = dataSource;
125 }
126
127 public boolean isNeedsRescheduled() {
128 return needsRescheduled;
129 }
130
131 public void setNeedsRescheduled(boolean needsRescheduled) {
132 this.needsRescheduled = needsRescheduled;
133 }
134
135 public void setRescheduleDelayParameter(String rescheduleDelay) {
136 this.rescheduleDelayParameter = rescheduleDelay;
137 }
138
139 public void setParameterService(IParameterService parameterService) {
140 this.parameterService = parameterService;
141 }
142
143 public void setRequiresRegistration(boolean requiresRegistration) {
144 this.requiresRegistration = requiresRegistration;
145 }
146
147 }