View Issue Details
ID | Project | Category | View Status | Date Submitted | Last Update |
---|---|---|---|---|---|
0001485 | SymmetricDS | New Feature | public | 2013-12-11 04:02 | 2014-06-17 00:27 |
Reporter | nswendal | Assigned To | chenson | ||
Priority | normal | ||||
Status | closed | Resolution | fixed | ||
Product Version | 3.5.11 | ||||
Target Version | 3.6.0 | Fixed in Version | 3.6.0 | ||
Summary | 0001485: Create a delete_after_sync function for file sync | ||||
Description | This feature deletes the source file after its sync batch has been successfully acknowledged. I added a new flag, delete_after_sync, to the file_trigger table to enable it. | ||||
Additional Information | The attached "delete_after_sync.patch" also contains the patch code for issue 0001441 (sync_on_ctl_file). This change must be applied after 0001441, because the code also deletes the matching control file when needed. | ||||
Tags | No tags attached. | ||||
|
delete_after_sync.patch (32,251 bytes)
### Eclipse Workspace Patch 1.0 #P symmetric-core Index: src/main/java/org/jumpmind/symmetric/file/FileTriggerTracker.java =================================================================== --- src/main/java/org/jumpmind/symmetric/file/FileTriggerTracker.java (revision 7757) +++ src/main/java/org/jumpmind/symmetric/file/FileTriggerTracker.java (working copy) @@ -115,12 +115,27 @@ } public void onFileCreate(File file) { - if (populateAll || snapshot.getFileTriggerRouter().getFileTrigger().isSyncOnCreate()) { - log.debug("File create detected: {}", file.getAbsolutePath()); - this.snapshot.add(new FileSnapshot(snapshot.getFileTriggerRouter(), file, - LastEventType.CREATE)); + if (snapshot.getFileTriggerRouter().getFileTrigger().isSyncOnCtlFile()){ + onCtlFile(file); + } else { + if (populateAll || snapshot.getFileTriggerRouter().getFileTrigger().isSyncOnCreate()) { + log.debug("File create detected: {}", file.getAbsolutePath()); + this.snapshot.add(new FileSnapshot(snapshot.getFileTriggerRouter(), file, + LastEventType.CREATE)); + } } } + + public void onCtlFile(File file) { + if (snapshot.getFileTriggerRouter().getFileTrigger().isSyncOnCtlFile()){ + File ctlFile = new File(file.getAbsolutePath() + ".ctl"); + if (ctlFile.exists()) { + log.debug("Control file detected: {}", file.getAbsolutePath()); + this.snapshot.add(new FileSnapshot(snapshot.getFileTriggerRouter(), file, + LastEventType.CREATE)); + } + } + } public void onFileChange(File file) { if (populateAll || snapshot.getFileTriggerRouter().getFileTrigger().isSyncOnModified()) { Index: src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java =================================================================== --- src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java (revision 7757) +++ src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java (working copy) @@ -298,7 +298,7 @@ nodeService, dataExtractorService, dataService, dataLoaderService, transportManager, statisticManager, 
configurationService); this.acknowledgeService = new AcknowledgeService(parameterService, symmetricDialect, - outgoingBatchService, registrationService, stagingManager); + outgoingBatchService, registrationService, stagingManager, this); this.pushService = new PushService(parameterService, symmetricDialect, dataExtractorService, acknowledgeService, transportManager, nodeService, clusterService, nodeCommunicationService, statisticManager); Index: src/main/java/org/jumpmind/symmetric/service/IDataService.java =================================================================== --- src/main/java/org/jumpmind/symmetric/service/IDataService.java (revision 7757) +++ src/main/java/org/jumpmind/symmetric/service/IDataService.java (working copy) @@ -147,6 +147,8 @@ public ISqlReadCursor<Data> selectDataFor(Batch batch); + public ISqlReadCursor<Data> selectDataFor(Long batchId, String channelId); + public List<IReloadListener> getReloadListeners(); } \ No newline at end of file Index: src/main/java/org/jumpmind/symmetric/service/IFileSyncService.java =================================================================== --- src/main/java/org/jumpmind/symmetric/service/IFileSyncService.java (revision 7757) +++ src/main/java/org/jumpmind/symmetric/service/IFileSyncService.java (working copy) @@ -69,5 +69,7 @@ public RemoteNodeStatuses pushFilesToNodes(boolean force); public List<OutgoingBatch> sendFiles(ProcessInfo processInfo, Node node, IOutgoingTransport outgoingTransport); + + public void acknowledgeFiles(OutgoingBatch outgoingBatch); } Index: src/main/java/org/jumpmind/symmetric/service/impl/FileSyncServiceSqlMap.java =================================================================== --- src/main/java/org/jumpmind/symmetric/service/impl/FileSyncServiceSqlMap.java (revision 7757) +++ src/main/java/org/jumpmind/symmetric/service/impl/FileSyncServiceSqlMap.java (working copy) @@ -32,12 +32,13 @@ // @formatter:off putSql("selectFileTriggersSql", - " select trigger_id, base_dir, 
recurse, " + + " select trigger_id, base_dir, recurse, " + " includes_files, excludes_files, " + " sync_on_create, sync_on_modified, sync_on_delete, " + + " sync_on_ctl_file, delete_after_sync, " + " before_copy_script, " + " after_copy_script, " + - " create_time, last_update_by, " + + " create_time, last_update_by, " + " last_update_time " + " from $(file_trigger) "); @@ -46,16 +47,18 @@ putSql("updateFileTriggerSql", " update $(file_trigger) set base_dir=?, recurse=?, includes_files=?, " + - " excludes_files=?, sync_on_create=?, sync_on_modified=?, sync_on_delete=?, " + - " before_copy_script=?, after_copy_script=?, " + - " last_update_by=?, last_update_time=? where trigger_id=? "); + " excludes_files=?, sync_on_create=?, sync_on_modified=?, sync_on_delete=?, " + + " sync_on_ctl_file=?, delete_after_sync=?, " + + " before_copy_script=?, after_copy_script=?, " + + " last_update_by=?, last_update_time=? where trigger_id=? "); putSql("insertFileTriggerSql", " insert into $(file_trigger) (base_dir, recurse, includes_files, " + - " excludes_files, sync_on_create, sync_on_modified, sync_on_delete, " + - " before_copy_script, after_copy_script, " + - " last_update_by, last_update_time, trigger_id, create_time) " + - " values(?,?,?,?,?,?,?,?,?,?,?,?,?) "); + " excludes_files, sync_on_create, sync_on_modified, sync_on_delete, " + + " sync_on_ctl_file, delete_after_sync, " + + " before_copy_script, after_copy_script, " + + " last_update_by, last_update_time, trigger_id, create_time) " + + " values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) 
"); putSql("selectFileSnapshotSql", " select trigger_id, router_id, relative_dir, file_name, last_event_type, crc32_checksum, " + Index: src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java =================================================================== --- src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java (revision 7757) +++ src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java (working copy) @@ -44,7 +44,14 @@ + " d.create_time, d.trigger_hist_id, d.channel_id, d.transaction_id, d.source_node_id, d.external_data, d.node_list, e.router_id from $(data) d inner join " + " $(data_event) e on d.data_id = e.data_id inner join $(outgoing_batch) o on o.batch_id=e.batch_id " + " where o.batch_id = ? and o.node_id = ? "); - + + putSql("selectEventDataByBatchIdSql", + "" + + "select d.data_id, d.table_name, d.event_type, d.row_data, d.pk_data, d.old_data, " + + " d.create_time, d.trigger_hist_id, d.channel_id, d.transaction_id, d.source_node_id, d.external_data, d.node_list, e.router_id from $(data) d inner join " + + " $(data_event) e on d.data_id = e.data_id inner join $(outgoing_batch) o on o.batch_id=e.batch_id " + + " where o.batch_id = ? 
"); + putSql("selectEventDataIdsSql", "" + "select d.data_id from $(data) d inner join " Index: src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java =================================================================== --- src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java (revision 7757) +++ src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java (working copy) @@ -393,7 +393,7 @@ trigger.setSyncOnIncomingBatch(false); boolean syncEnabled = parameterService.is(ParameterConstants.FILE_SYNC_ENABLE); trigger.setSyncOnInsert(syncEnabled); - trigger.setSyncOnUpdate(syncEnabled); + trigger.setSyncOnUpdate(false); // Changed to false because of issues with the traffic file trigger.setSyncOnDelete(false); } else { trigger.setChannelId(Constants.CHANNEL_CONFIG); Index: src/main/java/org/jumpmind/symmetric/service/impl/DataService.java =================================================================== --- src/main/java/org/jumpmind/symmetric/service/impl/DataService.java (revision 7784) +++ src/main/java/org/jumpmind/symmetric/service/impl/DataService.java (working copy) @@ -1359,7 +1359,21 @@ dataMapper, new Object[] { batch.getBatchId(), batch.getTargetNodeId() }, new int[] { Types.NUMERIC }); } - + + public ISqlReadCursor<Data> selectDataFor(Long batchId, String channelId) { + return sqlTemplate + .queryForCursor(getDataSelectByBatchSql(batchId, -1l, channelId), + dataMapper, new Object[] { batchId }, + new int[] { Types.NUMERIC }); + } + + protected String getDataSelectByBatchSql(long batchId, long startDataId, String channelId) { + String startAtDataIdSql = startDataId >= 0l ? " and d.data_id >= ? 
" : ""; + return symmetricDialect.massageDataExtractionSql( + getSql("selectEventDataByBatchIdSql", startAtDataIdSql, " order by d.data_id asc"), + engine.getConfigurationService().getNodeChannel(channelId, false).getChannel()); + } + protected String getDataSelectSql(long batchId, long startDataId, String channelId) { String startAtDataIdSql = startDataId >= 0l ? " and d.data_id >= ? " : ""; return symmetricDialect.massageDataExtractionSql( Index: src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java =================================================================== --- src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java (revision 7757) +++ src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java (working copy) @@ -24,6 +24,7 @@ import java.util.List; import org.jumpmind.db.sql.mapper.NumberMapper; +import org.jumpmind.symmetric.ISymmetricEngine; import org.jumpmind.symmetric.common.Constants; import org.jumpmind.symmetric.db.ISymmetricDialect; import org.jumpmind.symmetric.io.stage.IStagedResource; @@ -51,14 +52,17 @@ private IRegistrationService registrationService; private IStagingManager stagingManger; + + private ISymmetricEngine engine; public AcknowledgeService(IParameterService parameterService, ISymmetricDialect symmetricDialect, IOutgoingBatchService outgoingBatchService, - IRegistrationService registrationService, IStagingManager stagingManager) { + IRegistrationService registrationService, IStagingManager stagingManager, ISymmetricEngine engine) { super(parameterService, symmetricDialect); this.outgoingBatchService = outgoingBatchService; this.registrationService = registrationService; this.stagingManger = stagingManager; + this.engine = engine; setSqlMap(new AcknowledgeServiceSqlMap(symmetricDialect.getPlatform(), createSqlReplacementTokens())); } @@ -128,6 +132,12 @@ //TODO: I should really be able to catch errors here, but can't do to how this is coded 
outgoingBatchService.updateOutgoingBatch(outgoingBatch); + if (status == Status.OK) { + if (outgoingBatch.getChannelId().equals(Constants.CHANNEL_FILESYNC)){ + //Acknowledge the file_sync in case the file needs deleted. + engine.getFileSyncService().acknowledgeFiles(outgoingBatch); + } + } } else { log.error("Could not find batch {}-{} to acknowledge as {}", new Object[] {batch.getNodeId(), batch.getBatchId(), status.name()}); Index: src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java =================================================================== --- src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java (revision 7757) +++ src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java (working copy) @@ -36,6 +36,7 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.io.filefilter.DirectoryFileFilter; import org.apache.commons.lang.StringUtils; +import org.jumpmind.db.sql.ISqlReadCursor; import org.jumpmind.db.sql.ISqlRowMapper; import org.jumpmind.db.sql.ISqlTransaction; import org.jumpmind.db.sql.Row; @@ -53,6 +54,7 @@ import org.jumpmind.symmetric.model.FileConflictStrategy; import org.jumpmind.symmetric.model.FileSnapshot; import org.jumpmind.symmetric.model.FileSnapshot.LastEventType; +import org.jumpmind.symmetric.model.Data; import org.jumpmind.symmetric.model.FileTrigger; import org.jumpmind.symmetric.model.FileTriggerRouter; import org.jumpmind.symmetric.model.IncomingBatch; @@ -88,7 +90,7 @@ private Object trackerLock = new Object(); private ISymmetricEngine engine; - + // TODO cache trigger routers public FileSyncService(ISymmetricEngine engine) { @@ -182,7 +184,10 @@ fileTrigger.getIncludesFiles(), fileTrigger.getExcludesFiles(), fileTrigger.isSyncOnCreate() ? 1 : 0, fileTrigger.isSyncOnModified() ? 1 : 0, - fileTrigger.isSyncOnDelete() ? 1 : 0, fileTrigger.getBeforeCopyScript(), + fileTrigger.isSyncOnDelete() ? 1 : 0, + fileTrigger.isSyncOnCtlFile() ? 
1 : 0, + fileTrigger.isDeleteAfterSync() ? 1 : 0, + fileTrigger.getBeforeCopyScript(), fileTrigger.getAfterCopyScript(), fileTrigger.getLastUpdateBy(), fileTrigger.getLastUpdateTime(), fileTrigger.getTriggerId() }, new int[] { Types.VARCHAR, Types.SMALLINT, Types.VARCHAR, Types.VARCHAR, @@ -195,6 +200,8 @@ fileTrigger.isSyncOnCreate() ? 1 : 0, fileTrigger.isSyncOnModified() ? 1 : 0, fileTrigger.isSyncOnDelete() ? 1 : 0, + fileTrigger.isSyncOnCtlFile() ? 1 : 0, + fileTrigger.isDeleteAfterSync() ? 1 : 0, fileTrigger.getBeforeCopyScript(), fileTrigger.getAfterCopyScript(), fileTrigger.getLastUpdateBy(), fileTrigger.getLastUpdateTime(), fileTrigger.getTriggerId(), fileTrigger.getCreateTime() }, new int[] { @@ -437,6 +444,53 @@ } } } + + public void acknowledgeFiles(OutgoingBatch outgoingBatch){ + log.debug("Acknowledging file_sync outgoing batch."); + ISqlReadCursor<Data> cursor = engine.getDataService().selectDataFor(outgoingBatch.getBatchId(), outgoingBatch.getChannelId()); + Data data = null; + List <File> filesToDelete = new ArrayList<File>(); + for (int i = 0; i < outgoingBatch.getInsertEventCount(); i++) { + data = cursor.next(); + if (data != null) { + String[] rowData = data.toParsedRowData(); + //TODO: this method of getting the FileSnapshot data is not ideal. 
It would be better to map it to the columns in the table + FileSnapshot fileSnapshot = new FileSnapshot(); + fileSnapshot.setTriggerId(rowData[0]); + fileSnapshot.setRouterId(rowData[1]); + fileSnapshot.setRelativeDir(rowData[2]); + fileSnapshot.setFileName(rowData[3]); + fileSnapshot.setLastEventType(LastEventType.fromCode(rowData[4])); + + FileTriggerRouter triggerRouter = this.getFileTriggerRouter( + fileSnapshot.getTriggerId(), fileSnapshot.getRouterId()); + if (triggerRouter != null) { + FileTrigger fileTrigger = triggerRouter.getFileTrigger(); + + if(fileTrigger.isDeleteAfterSync()) { + File file = fileTrigger.createSourceFile(fileSnapshot); + if (!file.isDirectory()) { + filesToDelete.add(file); + if(fileTrigger.isSyncOnCtlFile()) { + filesToDelete.add(new File(file.getAbsolutePath() + ".ctl")); + } + } + } + } + } + } + + if (filesToDelete != null && filesToDelete.size() > 0) { + for (File file : filesToDelete) { + if (file != null && file.exists()) { + log.debug("Deleting file: " + file.getAbsolutePath()); + file.delete(); + } + file = null; + } + filesToDelete = null; + } + } public void loadFilesFromPush(String nodeId, InputStream in, OutputStream out) { INodeService nodeService = engine.getNodeService(); @@ -721,6 +775,8 @@ fileTrigger.setAfterCopyScript(rs.getString("after_copy_script")); fileTrigger.setBeforeCopyScript(rs.getString("before_copy_script")); fileTrigger.setSyncOnModified(rs.getBoolean("sync_on_modified")); + fileTrigger.setSyncOnCtlFile(rs.getBoolean("sync_on_ctl_file")); + fileTrigger.setDeleteAfterSync(rs.getBoolean("delete_after_sync")); fileTrigger.setTriggerId(rs.getString("trigger_id")); return fileTrigger; } @@ -761,7 +817,6 @@ fileSnapshot.setTriggerId(rs.getString("trigger_id")); fileSnapshot.setRouterId(rs.getString("router_id")); return fileSnapshot; - } + } } - } Index: src/main/java/org/jumpmind/symmetric/model/FileTrigger.java =================================================================== --- 
src/main/java/org/jumpmind/symmetric/model/FileTrigger.java (revision 7757) +++ src/main/java/org/jumpmind/symmetric/model/FileTrigger.java (working copy) @@ -46,6 +46,8 @@ private boolean syncOnCreate = true; private boolean syncOnModified = true; private boolean syncOnDelete = true; + private boolean syncOnCtlFile = false; + private boolean deleteAfterSync = false; private String beforeCopyScript; private String afterCopyScript; private Date createTime = new Date(); @@ -128,6 +130,22 @@ this.syncOnDelete = syncOnDelete; } + public boolean isSyncOnCtlFile() { + return syncOnCtlFile; + } + + public void setSyncOnCtlFile(boolean syncOnCtlFile) { + this.syncOnCtlFile = syncOnCtlFile; + } + + public boolean isDeleteAfterSync() { + return deleteAfterSync; + } + + public void setDeleteAfterSync(boolean deleteAfterSync) { + this.deleteAfterSync = deleteAfterSync; + } + public Date getCreateTime() { return createTime; } Index: src/main/java/org/jumpmind/symmetric/model/FileConflictStrategy.java =================================================================== --- src/main/java/org/jumpmind/symmetric/model/FileConflictStrategy.java (revision 7757) +++ src/main/java/org/jumpmind/symmetric/model/FileConflictStrategy.java (working copy) @@ -23,6 +23,6 @@ public enum FileConflictStrategy { - SOURCE_WINS, TARGET_WINS, MANUAL + SOURCE_WINS, TARGET_WINS, REPORT_ERROR, MANUAL } Index: src/main/resources/symmetric-schema.xml =================================================================== --- src/main/resources/symmetric-schema.xml (revision 7757) +++ src/main/resources/symmetric-schema.xml (working copy) @@ -696,6 +696,8 @@ <column name="sync_on_create" type="BOOLEANINT" size="1" required="true" default="1" description="Whether to capture and send files when they are created." /> <column name="sync_on_modified" type="BOOLEANINT" size="1" required="true" default="1" description="Whether to capture and send files when they are modified." 
/> <column name="sync_on_delete" type="BOOLEANINT" size="1" required="true" default="1" description="Whether to capture and remove files when they are deleted." /> + <column name="sync_on_ctl_file" type="BOOLEANINT" size="1" required="true" default="0" description="Combined with sync_on_create, determines whether to capture and send files when a matching control file exists." /> + <column name="delete_after_sync" type="BOOLEANINT" size="1" required="true" default="0" description="Determines whether to delete the file after it has synced successfully." /> <column name="before_copy_script" type="LONGVARCHAR" description="A bsh script that is run right before the file copy." /> <column name="after_copy_script" type="LONGVARCHAR" description="A bsh script that is run right after the file copy." /> <column name="create_time" type="TIMESTAMP" required="true" description="Timestamp of when this entry was created." /> #P symmetric-server Index: src/test/resources/org/jumpmind/symmetric/test/WildcardTest.csv =================================================================== --- src/test/resources/org/jumpmind/symmetric/test/WildcardTest.csv (revision 7748) +++ src/test/resources/org/jumpmind/symmetric/test/WildcardTest.csv (working copy) @@ -12,7 +12,7 @@ schema, table,SYM_FILE_TRIGGER keys,TRIGGER_ID -columns,TRIGGER_ID,BASE_DIR,RECURSIVE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME +columns,TRIGGER_ID,BASE_DIR,RECURSIVE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,SYNC_ON_CTL_FILE,DELETE_AFTER_SYNC,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME sql,delete from sym_file_trigger catalog, schema, Index: src/test/resources/org/jumpmind/symmetric/test/RestServiceTest.csv =================================================================== --- 
src/test/resources/org/jumpmind/symmetric/test/RestServiceTest.csv (revision 7748) +++ src/test/resources/org/jumpmind/symmetric/test/RestServiceTest.csv (working copy) @@ -12,7 +12,7 @@ schema, table,SYM_FILE_TRIGGER keys,TRIGGER_ID -columns,TRIGGER_ID,BASE_DIR,RECURSE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME +columns,TRIGGER_ID,BASE_DIR,RECURSE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,SYNC_ON_CTL_FILE,DELETE_AFTER_SYNC,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME sql,delete from sym_file_trigger catalog, schema, Index: src/test/resources/org/jumpmind/symmetric/test/ConflictResolutionTest.csv =================================================================== --- src/test/resources/org/jumpmind/symmetric/test/ConflictResolutionTest.csv (revision 7748) +++ src/test/resources/org/jumpmind/symmetric/test/ConflictResolutionTest.csv (working copy) @@ -12,7 +12,7 @@ schema, table,sym_file_trigger keys,TRIGGER_ID -columns,TRIGGER_ID,BASE_DIR,RECURSE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME +columns,TRIGGER_ID,BASE_DIR,RECURSE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,SYNC_ON_CTL_FILE,DELETE_AFTER_SYNC,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME sql,delete from sym_file_trigger catalog, schema, Index: src/test/resources/org/jumpmind/symmetric/test/FileSyncTest.csv =================================================================== --- src/test/resources/org/jumpmind/symmetric/test/FileSyncTest.csv (revision 7748) +++ src/test/resources/org/jumpmind/symmetric/test/FileSyncTest.csv (working copy) @@ -12,7 +12,7 @@ schema, table,SYM_FILE_TRIGGER keys,TRIGGER_ID 
-columns,TRIGGER_ID,BASE_DIR,RECURSE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME +columns,TRIGGER_ID,BASE_DIR,RECURSE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,SYNC_ON_CTL_FILE,DELETE_AFTER_SYNC,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME sql,delete from sym_file_trigger catalog, schema, @@ -127,11 +127,11 @@ catalog, schema, table,SYM_FILE_TRIGGER -insert,"all","target/fs_svr/all","1",,,"1","1","1",,,"2013-05-19 10:13:26.966","unit_test","2013-05-19 10:13:26.966" -insert,"create_only","target/fs_svr/create_only","1",,,"1","0","0",,,"2013-05-19 10:13:26.966","unit_test","2013-05-19 10:13:26.966" -insert,"all_recursive","target/fs_svr/all_recursive","1",,,"1","1","1",,,"2013-05-19 10:13:26.966","unit_test","2013-05-19 10:13:26.966" -insert,"csv_only","target/fs_svr/csv_only","1","*.txt",,"1","1","1",,,"2013-05-19 10:13:26.966","unit_test","2013-05-19 10:13:26.966" -insert,"choose_target","target/fs_svr/choose_target","1",,,"1","1","1"," +insert,"all","target/fs_svr/all","1",,,"1","1","1","0",,,"2013-05-19 10:13:26.966","unit_test","2013-05-19 10:13:26.966" +insert,"create_only","target/fs_svr/create_only","1",,,"1","0","0","0",,,"2013-05-19 10:13:26.966","unit_test","2013-05-19 10:13:26.966" +insert,"all_recursive","target/fs_svr/all_recursive","1",,,"1","1","1","0",,,"2013-05-19 10:13:26.966","unit_test","2013-05-19 10:13:26.966" +insert,"csv_only","target/fs_svr/csv_only","1","*.txt",,"1","1","1","0",,,"2013-05-19 10:13:26.966","unit_test","2013-05-19 10:13:26.966" +insert,"choose_target","target/fs_svr/choose_target","1",,,"1","1","1","0"," a = new java.io.File(\"target/fs_clnt/choose_target/a\"); b = new java.io.File(\"target/fs_clnt/choose_target/b\"); if (org.apache.commons.io.FileUtils.sizeOfDirectory(a) > org.apache.commons.io.FileUtils.sizeOfDirectory(b)) { @@ -140,10 
+140,10 @@ targetBaseDir = a.getAbsolutePath(); } ",,"2013-05-19 10:14:04.830","unit_test","2013-05-19 10:14:04.830" -insert,"client_src","target/fs_clnt/client_src","1",,,"1","1","1",,,"2013-05-19 13:56:18.456","unit_test","2013-05-19 13:56:18.456" -insert,"ping_back_clnt","target/fs_clnt/ping_back","1",,,"1","1","1",,,"2013-05-19 13:58:24.779","unit_test","2013-05-19 13:58:24.779" -insert,"ping_back_svr","target/fs_svr/ping_back","1",,,"1","1","1",,,"2013-05-19 13:59:05.366","unit_test","2013-05-19 13:59:05.366" -insert,"test_change_filename","target/fs_svr/change_name","0","source.txt",,"1","1","1","targetFileName = \"target.txt\";",,"2013-07-11 14:25:27.690","admin","2013-07-11 14:25:27.690" +insert,"client_src","target/fs_clnt/client_src","1",,,"1","1","1","0",,,"2013-05-19 13:56:18.456","unit_test","2013-05-19 13:56:18.456" +insert,"ping_back_clnt","target/fs_clnt/ping_back","1",,,"1","1","1","0",,,"2013-05-19 13:58:24.779","unit_test","2013-05-19 13:58:24.779" +insert,"ping_back_svr","target/fs_svr/ping_back","1",,,"1","1","1","0",,,"2013-05-19 13:59:05.366","unit_test","2013-05-19 13:59:05.366" +insert,"test_change_filename","target/fs_svr/change_name","0","source.txt",,"1","1","1","0","targetFileName = \"target.txt\";",,"2013-07-11 14:25:27.690","admin","2013-07-11 14:25:27.690" catalog, schema, table,SYM_FILE_TRIGGER_ROUTER Index: src/test/resources/org/jumpmind/symmetric/test/NonDmlEventsTest.csv =================================================================== --- src/test/resources/org/jumpmind/symmetric/test/NonDmlEventsTest.csv (revision 7748) +++ src/test/resources/org/jumpmind/symmetric/test/NonDmlEventsTest.csv (working copy) @@ -12,7 +12,7 @@ schema, table,SYM_FILE_TRIGGER keys,TRIGGER_ID -columns,TRIGGER_ID,BASE_DIR,RECURSE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME 
+columns,TRIGGER_ID,BASE_DIR,RECURSE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,SYNC_ON_CTL_FILE,DELETE_AFTER_SYNC,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME sql,delete from sym_file_trigger catalog, schema, #P symmetric-assemble Index: src/docbook/file-sync.xml =================================================================== --- src/docbook/file-sync.xml (revision 7764) +++ src/docbook/file-sync.xml (working copy) @@ -228,9 +228,9 @@ <programlisting>INSERT INTO sym_file_trigger (trigger_id,base_dir,recurse,includes_files,excludes_files,sync_on_create, - sync_on_modified,sync_on_delete,before_copy_script,after_copy_script, + sync_on_modified,sync_on_delete,sync_on_ctl_file,delete_after_sync,before_copy_script,after_copy_script, create_time,last_update_by,last_update_time) -VALUES ('sync_directory','/filesync/server/all',1,'*.txt',null,1,1,1, +VALUES ('sync_directory','/filesync/server/all',1,'*.txt',null,1,1,1,0,0, 'targetBaseDir = "/filesync/clients/" + engine.getParameterService().getExternalId();',null,current_timestamp,'example', current_timestamp); @@ -272,10 +272,10 @@ <![CDATA[ INSERT INTO sym_file_trigger (trigger_id,base_dir,recurse,includes_files,excludes_files,sync_on_create, - sync_on_modified,sync_on_delete,before_copy_script,after_copy_script,create_time, + sync_on_modified,sync_on_delete,sync_on_ctl_file,delete_after_sync,before_copy_script,after_copy_script,create_time, last_update_by,last_update_time) VALUES - ('node_specific','/filesync/server/nodes',1,null,null,1,1,1,'',null, + ('node_specific','/filesync/server/nodes',1,null,null,1,1,1,0,0,'',null, current_timestamp,'example',current_timestamp); INSERT INTO sym_file_trigger_router |
|
delete_after_sync_v2.patch (30,925 bytes)
### Eclipse Workspace Patch 1.0 #P symmetric-core Index: src/main/java/org/jumpmind/symmetric/service/impl/DataService.java =================================================================== --- src/main/java/org/jumpmind/symmetric/service/impl/DataService.java (revision 7784) +++ src/main/java/org/jumpmind/symmetric/service/impl/DataService.java (working copy) @@ -1359,7 +1359,21 @@ dataMapper, new Object[] { batch.getBatchId(), batch.getTargetNodeId() }, new int[] { Types.NUMERIC }); } - + + public ISqlReadCursor<Data> selectDataFor(Long batchId, String channelId) { + return sqlTemplate + .queryForCursor(getDataSelectByBatchSql(batchId, -1l, channelId), + dataMapper, new Object[] { batchId }, + new int[] { Types.NUMERIC }); + } + + protected String getDataSelectByBatchSql(long batchId, long startDataId, String channelId) { + String startAtDataIdSql = startDataId >= 0l ? " and d.data_id >= ? " : ""; + return symmetricDialect.massageDataExtractionSql( + getSql("selectEventDataByBatchIdSql", startAtDataIdSql, " order by d.data_id asc"), + engine.getConfigurationService().getNodeChannel(channelId, false).getChannel()); + } + protected String getDataSelectSql(long batchId, long startDataId, String channelId) { String startAtDataIdSql = startDataId >= 0l ? " and d.data_id >= ? 
" : ""; return symmetricDialect.massageDataExtractionSql( Index: src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java =================================================================== --- src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java (revision 7757) +++ src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java (working copy) @@ -298,7 +298,7 @@ nodeService, dataExtractorService, dataService, dataLoaderService, transportManager, statisticManager, configurationService); this.acknowledgeService = new AcknowledgeService(parameterService, symmetricDialect, - outgoingBatchService, registrationService, stagingManager); + outgoingBatchService, registrationService, stagingManager, this); this.pushService = new PushService(parameterService, symmetricDialect, dataExtractorService, acknowledgeService, transportManager, nodeService, clusterService, nodeCommunicationService, statisticManager); Index: src/main/java/org/jumpmind/symmetric/service/IDataService.java =================================================================== --- src/main/java/org/jumpmind/symmetric/service/IDataService.java (revision 7757) +++ src/main/java/org/jumpmind/symmetric/service/IDataService.java (working copy) @@ -147,6 +147,8 @@ public ISqlReadCursor<Data> selectDataFor(Batch batch); + public ISqlReadCursor<Data> selectDataFor(Long batchId, String channelId); + public List<IReloadListener> getReloadListeners(); } \ No newline at end of file Index: src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java =================================================================== --- src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java (revision 7757) +++ src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java (working copy) @@ -44,7 +44,14 @@ + " d.create_time, d.trigger_hist_id, d.channel_id, d.transaction_id, d.source_node_id, d.external_data, d.node_list, e.router_id from $(data) d inner join " + " 
$(data_event) e on d.data_id = e.data_id inner join $(outgoing_batch) o on o.batch_id=e.batch_id " + " where o.batch_id = ? and o.node_id = ? "); - + + putSql("selectEventDataByBatchIdSql", + "" + + "select d.data_id, d.table_name, d.event_type, d.row_data, d.pk_data, d.old_data, " + + " d.create_time, d.trigger_hist_id, d.channel_id, d.transaction_id, d.source_node_id, d.external_data, d.node_list, e.router_id from $(data) d inner join " + + " $(data_event) e on d.data_id = e.data_id inner join $(outgoing_batch) o on o.batch_id=e.batch_id " + + " where o.batch_id = ? "); + putSql("selectEventDataIdsSql", "" + "select d.data_id from $(data) d inner join " Index: src/main/java/org/jumpmind/symmetric/model/FileTrigger.java =================================================================== --- src/main/java/org/jumpmind/symmetric/model/FileTrigger.java (revision 7757) +++ src/main/java/org/jumpmind/symmetric/model/FileTrigger.java (working copy) @@ -46,6 +46,8 @@ private boolean syncOnCreate = true; private boolean syncOnModified = true; private boolean syncOnDelete = true; + private boolean syncOnCtlFile = false; + private boolean deleteAfterSync = false; private String beforeCopyScript; private String afterCopyScript; private Date createTime = new Date(); @@ -128,6 +130,22 @@ this.syncOnDelete = syncOnDelete; } + public boolean isSyncOnCtlFile() { + return syncOnCtlFile; + } + + public void setSyncOnCtlFile(boolean syncOnCtlFile) { + this.syncOnCtlFile = syncOnCtlFile; + } + + public boolean isDeleteAfterSync() { + return deleteAfterSync; + } + + public void setDeleteAfterSync(boolean deleteAfterSync) { + this.deleteAfterSync = deleteAfterSync; + } + public Date getCreateTime() { return createTime; } Index: src/main/java/org/jumpmind/symmetric/service/IFileSyncService.java =================================================================== --- src/main/java/org/jumpmind/symmetric/service/IFileSyncService.java (revision 7757) +++ 
src/main/java/org/jumpmind/symmetric/service/IFileSyncService.java (working copy) @@ -69,5 +69,7 @@ public RemoteNodeStatuses pushFilesToNodes(boolean force); public List<OutgoingBatch> sendFiles(ProcessInfo processInfo, Node node, IOutgoingTransport outgoingTransport); + + public void acknowledgeFiles(OutgoingBatch outgoingBatch); } Index: src/main/java/org/jumpmind/symmetric/service/impl/FileSyncServiceSqlMap.java =================================================================== --- src/main/java/org/jumpmind/symmetric/service/impl/FileSyncServiceSqlMap.java (revision 7757) +++ src/main/java/org/jumpmind/symmetric/service/impl/FileSyncServiceSqlMap.java (working copy) @@ -32,12 +32,13 @@ // @formatter:off putSql("selectFileTriggersSql", - " select trigger_id, base_dir, recurse, " + + " select trigger_id, base_dir, recurse, " + " includes_files, excludes_files, " + " sync_on_create, sync_on_modified, sync_on_delete, " + + " sync_on_ctl_file, delete_after_sync, " + " before_copy_script, " + " after_copy_script, " + - " create_time, last_update_by, " + + " create_time, last_update_by, " + " last_update_time " + " from $(file_trigger) "); @@ -46,16 +47,18 @@ putSql("updateFileTriggerSql", " update $(file_trigger) set base_dir=?, recurse=?, includes_files=?, " + - " excludes_files=?, sync_on_create=?, sync_on_modified=?, sync_on_delete=?, " + - " before_copy_script=?, after_copy_script=?, " + - " last_update_by=?, last_update_time=? where trigger_id=? "); + " excludes_files=?, sync_on_create=?, sync_on_modified=?, sync_on_delete=?, " + + " sync_on_ctl_file=?, delete_after_sync=?, " + + " before_copy_script=?, after_copy_script=?, " + + " last_update_by=?, last_update_time=? where trigger_id=? 
"); putSql("insertFileTriggerSql", " insert into $(file_trigger) (base_dir, recurse, includes_files, " + - " excludes_files, sync_on_create, sync_on_modified, sync_on_delete, " + - " before_copy_script, after_copy_script, " + - " last_update_by, last_update_time, trigger_id, create_time) " + - " values(?,?,?,?,?,?,?,?,?,?,?,?,?) "); + " excludes_files, sync_on_create, sync_on_modified, sync_on_delete, " + + " sync_on_ctl_file, delete_after_sync, " + + " before_copy_script, after_copy_script, " + + " last_update_by, last_update_time, trigger_id, create_time) " + + " values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) "); putSql("selectFileSnapshotSql", " select trigger_id, router_id, relative_dir, file_name, last_event_type, crc32_checksum, " + Index: src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java =================================================================== --- src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java (revision 7757) +++ src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java (working copy) @@ -36,6 +36,7 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.io.filefilter.DirectoryFileFilter; import org.apache.commons.lang.StringUtils; +import org.jumpmind.db.sql.ISqlReadCursor; import org.jumpmind.db.sql.ISqlRowMapper; import org.jumpmind.db.sql.ISqlTransaction; import org.jumpmind.db.sql.Row; @@ -53,6 +54,7 @@ import org.jumpmind.symmetric.model.FileConflictStrategy; import org.jumpmind.symmetric.model.FileSnapshot; import org.jumpmind.symmetric.model.FileSnapshot.LastEventType; +import org.jumpmind.symmetric.model.Data; import org.jumpmind.symmetric.model.FileTrigger; import org.jumpmind.symmetric.model.FileTriggerRouter; import org.jumpmind.symmetric.model.IncomingBatch; @@ -88,7 +90,7 @@ private Object trackerLock = new Object(); private ISymmetricEngine engine; - + // TODO cache trigger routers public FileSyncService(ISymmetricEngine engine) { @@ -182,7 +184,10 @@ 
fileTrigger.getIncludesFiles(), fileTrigger.getExcludesFiles(), fileTrigger.isSyncOnCreate() ? 1 : 0, fileTrigger.isSyncOnModified() ? 1 : 0, - fileTrigger.isSyncOnDelete() ? 1 : 0, fileTrigger.getBeforeCopyScript(), + fileTrigger.isSyncOnDelete() ? 1 : 0, + fileTrigger.isSyncOnCtlFile() ? 1 : 0, + fileTrigger.isDeleteAfterSync() ? 1 : 0, + fileTrigger.getBeforeCopyScript(), fileTrigger.getAfterCopyScript(), fileTrigger.getLastUpdateBy(), fileTrigger.getLastUpdateTime(), fileTrigger.getTriggerId() }, new int[] { Types.VARCHAR, Types.SMALLINT, Types.VARCHAR, Types.VARCHAR, @@ -195,6 +200,8 @@ fileTrigger.isSyncOnCreate() ? 1 : 0, fileTrigger.isSyncOnModified() ? 1 : 0, fileTrigger.isSyncOnDelete() ? 1 : 0, + fileTrigger.isSyncOnCtlFile() ? 1 : 0, + fileTrigger.isDeleteAfterSync() ? 1 : 0, fileTrigger.getBeforeCopyScript(), fileTrigger.getAfterCopyScript(), fileTrigger.getLastUpdateBy(), fileTrigger.getLastUpdateTime(), fileTrigger.getTriggerId(), fileTrigger.getCreateTime() }, new int[] { @@ -437,6 +444,53 @@ } } } + + public void acknowledgeFiles(OutgoingBatch outgoingBatch){ + log.debug("Acknowledging file_sync outgoing batch."); + ISqlReadCursor<Data> cursor = engine.getDataService().selectDataFor(outgoingBatch.getBatchId(), outgoingBatch.getChannelId()); + Data data = null; + List <File> filesToDelete = new ArrayList<File>(); + for (int i = 0; i < outgoingBatch.getInsertEventCount(); i++) { + data = cursor.next(); + if (data != null) { + String[] rowData = data.toParsedRowData(); + //TODO: this method of getting the FileSnapshot data is not ideal. 
It would be better to map it to the columns in the table + FileSnapshot fileSnapshot = new FileSnapshot(); + fileSnapshot.setTriggerId(rowData[0]); + fileSnapshot.setRouterId(rowData[1]); + fileSnapshot.setRelativeDir(rowData[2]); + fileSnapshot.setFileName(rowData[3]); + fileSnapshot.setLastEventType(LastEventType.fromCode(rowData[4])); + + FileTriggerRouter triggerRouter = this.getFileTriggerRouter( + fileSnapshot.getTriggerId(), fileSnapshot.getRouterId()); + if (triggerRouter != null) { + FileTrigger fileTrigger = triggerRouter.getFileTrigger(); + + if(fileTrigger.isDeleteAfterSync()) { + File file = fileTrigger.createSourceFile(fileSnapshot); + if (!file.isDirectory()) { + filesToDelete.add(file); + if(fileTrigger.isSyncOnCtlFile()) { + filesToDelete.add(new File(file.getAbsolutePath() + ".ctl")); + } + } + } + } + } + } + + if (filesToDelete != null && filesToDelete.size() > 0) { + for (File file : filesToDelete) { + if (file != null && file.exists()) { + log.debug("Deleting file: " + file.getAbsolutePath()); + file.delete(); + } + file = null; + } + filesToDelete = null; + } + } public void loadFilesFromPush(String nodeId, InputStream in, OutputStream out) { INodeService nodeService = engine.getNodeService(); @@ -721,6 +775,8 @@ fileTrigger.setAfterCopyScript(rs.getString("after_copy_script")); fileTrigger.setBeforeCopyScript(rs.getString("before_copy_script")); fileTrigger.setSyncOnModified(rs.getBoolean("sync_on_modified")); + fileTrigger.setSyncOnCtlFile(rs.getBoolean("sync_on_ctl_file")); + fileTrigger.setDeleteAfterSync(rs.getBoolean("delete_after_sync")); fileTrigger.setTriggerId(rs.getString("trigger_id")); return fileTrigger; } @@ -761,7 +817,6 @@ fileSnapshot.setTriggerId(rs.getString("trigger_id")); fileSnapshot.setRouterId(rs.getString("router_id")); return fileSnapshot; - } + } } - } Index: src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java =================================================================== --- 
src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java (revision 7757) +++ src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java (working copy) @@ -24,6 +24,7 @@ import java.util.List; import org.jumpmind.db.sql.mapper.NumberMapper; +import org.jumpmind.symmetric.ISymmetricEngine; import org.jumpmind.symmetric.common.Constants; import org.jumpmind.symmetric.db.ISymmetricDialect; import org.jumpmind.symmetric.io.stage.IStagedResource; @@ -51,14 +52,17 @@ private IRegistrationService registrationService; private IStagingManager stagingManger; + + private ISymmetricEngine engine; public AcknowledgeService(IParameterService parameterService, ISymmetricDialect symmetricDialect, IOutgoingBatchService outgoingBatchService, - IRegistrationService registrationService, IStagingManager stagingManager) { + IRegistrationService registrationService, IStagingManager stagingManager, ISymmetricEngine engine) { super(parameterService, symmetricDialect); this.outgoingBatchService = outgoingBatchService; this.registrationService = registrationService; this.stagingManger = stagingManager; + this.engine = engine; setSqlMap(new AcknowledgeServiceSqlMap(symmetricDialect.getPlatform(), createSqlReplacementTokens())); } @@ -128,6 +132,12 @@ //TODO: I should really be able to catch errors here, but can't do to how this is coded outgoingBatchService.updateOutgoingBatch(outgoingBatch); + if (status == Status.OK) { + if (outgoingBatch.getChannelId().equals(Constants.CHANNEL_FILESYNC)){ + //Acknowledge the file_sync in case the file needs deleted. 
+ engine.getFileSyncService().acknowledgeFiles(outgoingBatch); + } + } } else { log.error("Could not find batch {}-{} to acknowledge as {}", new Object[] {batch.getNodeId(), batch.getBatchId(), status.name()}); Index: src/main/resources/symmetric-schema.xml =================================================================== --- src/main/resources/symmetric-schema.xml (revision 7757) +++ src/main/resources/symmetric-schema.xml (working copy) @@ -696,6 +696,8 @@ <column name="sync_on_create" type="BOOLEANINT" size="1" required="true" default="1" description="Whether to capture and send files when they are created." /> <column name="sync_on_modified" type="BOOLEANINT" size="1" required="true" default="1" description="Whether to capture and send files when they are modified." /> <column name="sync_on_delete" type="BOOLEANINT" size="1" required="true" default="1" description="Whether to capture and remove files when they are deleted." /> + <column name="sync_on_ctl_file" type="BOOLEANINT" size="1" required="true" default="0" description="Combined with sync_on_create, determines whether to capture and send files when a matching control file exists." /> + <column name="delete_after_sync" type="BOOLEANINT" size="1" required="true" default="0" description="Determines whether to delete the file after it has synced successfully." /> <column name="before_copy_script" type="LONGVARCHAR" description="A bsh script that is run right before the file copy." /> <column name="after_copy_script" type="LONGVARCHAR" description="A bsh script that is run right after the file copy." /> <column name="create_time" type="TIMESTAMP" required="true" description="Timestamp of when this entry was created." 
/> Index: src/main/java/org/jumpmind/symmetric/file/FileTriggerTracker.java =================================================================== --- src/main/java/org/jumpmind/symmetric/file/FileTriggerTracker.java (revision 7757) +++ src/main/java/org/jumpmind/symmetric/file/FileTriggerTracker.java (working copy) @@ -115,12 +115,27 @@ } public void onFileCreate(File file) { - if (populateAll || snapshot.getFileTriggerRouter().getFileTrigger().isSyncOnCreate()) { - log.debug("File create detected: {}", file.getAbsolutePath()); - this.snapshot.add(new FileSnapshot(snapshot.getFileTriggerRouter(), file, - LastEventType.CREATE)); + if (snapshot.getFileTriggerRouter().getFileTrigger().isSyncOnCtlFile()){ + onCtlFile(file); + } else { + if (populateAll || snapshot.getFileTriggerRouter().getFileTrigger().isSyncOnCreate()) { + log.debug("File create detected: {}", file.getAbsolutePath()); + this.snapshot.add(new FileSnapshot(snapshot.getFileTriggerRouter(), file, + LastEventType.CREATE)); + } } } + + public void onCtlFile(File file) { + if (snapshot.getFileTriggerRouter().getFileTrigger().isSyncOnCtlFile()){ + File ctlFile = new File(file.getAbsolutePath() + ".ctl"); + if (ctlFile.exists()) { + log.debug("Control file detected: {}", file.getAbsolutePath()); + this.snapshot.add(new FileSnapshot(snapshot.getFileTriggerRouter(), file, + LastEventType.CREATE)); + } + } + } public void onFileChange(File file) { if (populateAll || snapshot.getFileTriggerRouter().getFileTrigger().isSyncOnModified()) { #P symmetric-server Index: src/test/resources/org/jumpmind/symmetric/test/RestServiceTest.csv =================================================================== --- src/test/resources/org/jumpmind/symmetric/test/RestServiceTest.csv (revision 7748) +++ src/test/resources/org/jumpmind/symmetric/test/RestServiceTest.csv (working copy) @@ -12,7 +12,7 @@ schema, table,SYM_FILE_TRIGGER keys,TRIGGER_ID 
-columns,TRIGGER_ID,BASE_DIR,RECURSE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME +columns,TRIGGER_ID,BASE_DIR,RECURSE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,SYNC_ON_CTL_FILE,DELETE_AFTER_SYNC,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME sql,delete from sym_file_trigger catalog, schema, Index: src/test/resources/org/jumpmind/symmetric/test/FileSyncTest.csv =================================================================== --- src/test/resources/org/jumpmind/symmetric/test/FileSyncTest.csv (revision 7748) +++ src/test/resources/org/jumpmind/symmetric/test/FileSyncTest.csv (working copy) @@ -12,7 +12,7 @@ schema, table,SYM_FILE_TRIGGER keys,TRIGGER_ID -columns,TRIGGER_ID,BASE_DIR,RECURSE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME +columns,TRIGGER_ID,BASE_DIR,RECURSE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,SYNC_ON_CTL_FILE,DELETE_AFTER_SYNC,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME sql,delete from sym_file_trigger catalog, schema, @@ -127,11 +127,11 @@ catalog, schema, table,SYM_FILE_TRIGGER -insert,"all","target/fs_svr/all","1",,,"1","1","1",,,"2013-05-19 10:13:26.966","unit_test","2013-05-19 10:13:26.966" -insert,"create_only","target/fs_svr/create_only","1",,,"1","0","0",,,"2013-05-19 10:13:26.966","unit_test","2013-05-19 10:13:26.966" -insert,"all_recursive","target/fs_svr/all_recursive","1",,,"1","1","1",,,"2013-05-19 10:13:26.966","unit_test","2013-05-19 10:13:26.966" -insert,"csv_only","target/fs_svr/csv_only","1","*.txt",,"1","1","1",,,"2013-05-19 10:13:26.966","unit_test","2013-05-19 10:13:26.966" -insert,"choose_target","target/fs_svr/choose_target","1",,,"1","1","1"," 
+insert,"all","target/fs_svr/all","1",,,"1","1","1","0",,,"2013-05-19 10:13:26.966","unit_test","2013-05-19 10:13:26.966" +insert,"create_only","target/fs_svr/create_only","1",,,"1","0","0","0",,,"2013-05-19 10:13:26.966","unit_test","2013-05-19 10:13:26.966" +insert,"all_recursive","target/fs_svr/all_recursive","1",,,"1","1","1","0",,,"2013-05-19 10:13:26.966","unit_test","2013-05-19 10:13:26.966" +insert,"csv_only","target/fs_svr/csv_only","1","*.txt",,"1","1","1","0",,,"2013-05-19 10:13:26.966","unit_test","2013-05-19 10:13:26.966" +insert,"choose_target","target/fs_svr/choose_target","1",,,"1","1","1","0"," a = new java.io.File(\"target/fs_clnt/choose_target/a\"); b = new java.io.File(\"target/fs_clnt/choose_target/b\"); if (org.apache.commons.io.FileUtils.sizeOfDirectory(a) > org.apache.commons.io.FileUtils.sizeOfDirectory(b)) { @@ -140,10 +140,10 @@ targetBaseDir = a.getAbsolutePath(); } ",,"2013-05-19 10:14:04.830","unit_test","2013-05-19 10:14:04.830" -insert,"client_src","target/fs_clnt/client_src","1",,,"1","1","1",,,"2013-05-19 13:56:18.456","unit_test","2013-05-19 13:56:18.456" -insert,"ping_back_clnt","target/fs_clnt/ping_back","1",,,"1","1","1",,,"2013-05-19 13:58:24.779","unit_test","2013-05-19 13:58:24.779" -insert,"ping_back_svr","target/fs_svr/ping_back","1",,,"1","1","1",,,"2013-05-19 13:59:05.366","unit_test","2013-05-19 13:59:05.366" -insert,"test_change_filename","target/fs_svr/change_name","0","source.txt",,"1","1","1","targetFileName = \"target.txt\";",,"2013-07-11 14:25:27.690","admin","2013-07-11 14:25:27.690" +insert,"client_src","target/fs_clnt/client_src","1",,,"1","1","1","0",,,"2013-05-19 13:56:18.456","unit_test","2013-05-19 13:56:18.456" +insert,"ping_back_clnt","target/fs_clnt/ping_back","1",,,"1","1","1","0",,,"2013-05-19 13:58:24.779","unit_test","2013-05-19 13:58:24.779" +insert,"ping_back_svr","target/fs_svr/ping_back","1",,,"1","1","1","0",,,"2013-05-19 13:59:05.366","unit_test","2013-05-19 13:59:05.366" 
+insert,"test_change_filename","target/fs_svr/change_name","0","source.txt",,"1","1","1","0","targetFileName = \"target.txt\";",,"2013-07-11 14:25:27.690","admin","2013-07-11 14:25:27.690" catalog, schema, table,SYM_FILE_TRIGGER_ROUTER Index: src/test/resources/org/jumpmind/symmetric/test/NonDmlEventsTest.csv =================================================================== --- src/test/resources/org/jumpmind/symmetric/test/NonDmlEventsTest.csv (revision 7748) +++ src/test/resources/org/jumpmind/symmetric/test/NonDmlEventsTest.csv (working copy) @@ -12,7 +12,7 @@ schema, table,SYM_FILE_TRIGGER keys,TRIGGER_ID -columns,TRIGGER_ID,BASE_DIR,RECURSE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME +columns,TRIGGER_ID,BASE_DIR,RECURSE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,SYNC_ON_CTL_FILE,DELETE_AFTER_SYNC,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME sql,delete from sym_file_trigger catalog, schema, Index: src/test/resources/org/jumpmind/symmetric/test/WildcardTest.csv =================================================================== --- src/test/resources/org/jumpmind/symmetric/test/WildcardTest.csv (revision 7748) +++ src/test/resources/org/jumpmind/symmetric/test/WildcardTest.csv (working copy) @@ -12,7 +12,7 @@ schema, table,SYM_FILE_TRIGGER keys,TRIGGER_ID -columns,TRIGGER_ID,BASE_DIR,RECURSIVE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME +columns,TRIGGER_ID,BASE_DIR,RECURSIVE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,SYNC_ON_CTL_FILE,DELETE_AFTER_SYNC,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME sql,delete from sym_file_trigger catalog, schema, Index: 
src/test/resources/org/jumpmind/symmetric/test/ConflictResolutionTest.csv =================================================================== --- src/test/resources/org/jumpmind/symmetric/test/ConflictResolutionTest.csv (revision 7748) +++ src/test/resources/org/jumpmind/symmetric/test/ConflictResolutionTest.csv (working copy) @@ -12,7 +12,7 @@ schema, table,sym_file_trigger keys,TRIGGER_ID -columns,TRIGGER_ID,BASE_DIR,RECURSE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME +columns,TRIGGER_ID,BASE_DIR,RECURSE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,SYNC_ON_CTL_FILE,DELETE_AFTER_SYNC,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME sql,delete from sym_file_trigger catalog, schema, #P symmetric-assemble Index: src/docbook/file-sync.xml =================================================================== --- src/docbook/file-sync.xml (revision 7764) +++ src/docbook/file-sync.xml (working copy) @@ -228,9 +228,9 @@ <programlisting>INSERT INTO sym_file_trigger (trigger_id,base_dir,recurse,includes_files,excludes_files,sync_on_create, - sync_on_modified,sync_on_delete,before_copy_script,after_copy_script, + sync_on_modified,sync_on_delete,sync_on_ctl_file,delete_after_sync,before_copy_script,after_copy_script, create_time,last_update_by,last_update_time) -VALUES ('sync_directory','/filesync/server/all',1,'*.txt',null,1,1,1, +VALUES ('sync_directory','/filesync/server/all',1,'*.txt',null,1,1,1,0,0, 'targetBaseDir = "/filesync/clients/" + engine.getParameterService().getExternalId();',null,current_timestamp,'example', current_timestamp); @@ -272,10 +272,10 @@ <![CDATA[ INSERT INTO sym_file_trigger (trigger_id,base_dir,recurse,includes_files,excludes_files,sync_on_create, - sync_on_modified,sync_on_delete,before_copy_script,after_copy_script,create_time, + 
sync_on_modified,sync_on_delete,sync_on_ctl_file,delete_after_sync,before_copy_script,after_copy_script,create_time, last_update_by,last_update_time) VALUES - ('node_specific','/filesync/server/nodes',1,null,null,1,1,1,'',null, + ('node_specific','/filesync/server/nodes',1,null,null,1,1,1,0,0,'',null, current_timestamp,'example',current_timestamp); INSERT INTO sym_file_trigger_router |
SymmetricDS: master a8bcad15 2014-02-17 21:11:10 Details Diff |
0001485: Create a delete_after_sync function for file sync; 0001441: Add ability to sync on control file |
Affected Issues 0001485 |
|
mod - symmetric-assemble/src/docbook/file-sync.xml | Diff File | ||
mod - symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java | Diff File | ||
mod - symmetric-core/src/main/java/org/jumpmind/symmetric/file/FileTriggerTracker.java | Diff File | ||
mod - symmetric-core/src/main/java/org/jumpmind/symmetric/model/FileTrigger.java | Diff File | ||
mod - symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java | Diff File | ||
mod - symmetric-core/src/main/java/org/jumpmind/symmetric/service/IFileSyncService.java | Diff File | ||
mod - symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java | Diff File | ||
mod - symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java | Diff File | ||
mod - symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java | Diff File | ||
mod - symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java | Diff File | ||
mod - symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncServiceSqlMap.java | Diff File | ||
mod - symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java | Diff File | ||
mod - symmetric-core/src/main/resources/symmetric-schema.xml | Diff File | ||
mod - symmetric-server/src/test/resources/org/jumpmind/symmetric/test/ConflictResolutionTest.csv | Diff File | ||
mod - symmetric-server/src/test/resources/org/jumpmind/symmetric/test/FileSyncTest.csv | Diff File | ||
mod - symmetric-server/src/test/resources/org/jumpmind/symmetric/test/NonDmlEventsTest.csv | Diff File | ||
mod - symmetric-server/src/test/resources/org/jumpmind/symmetric/test/RestServiceTest.csv | Diff File | ||
mod - symmetric-server/src/test/resources/org/jumpmind/symmetric/test/WildcardTest.csv | Diff File |
Date Modified | Username | Field | Change |
---|---|---|---|
2013-12-11 04:02 | nswendal | New Issue | |
2013-12-11 04:02 | nswendal | File Added: delete_after_sync.patch | |
2013-12-11 13:57 | nswendal | File Added: delete_after_sync_v2.patch | |
2014-02-18 01:38 | chenson | Assigned To | => chenson |
2014-02-18 01:38 | chenson | Status | new => assigned |
2014-02-18 01:38 | chenson | Target Version | => 3.6.0 |
2014-02-18 02:12 | chenson | Status | assigned => resolved |
2014-02-18 02:12 | chenson | Fixed in Version | => 3.6.0 |
2014-02-18 02:12 | chenson | Resolution | open => fixed |
2014-02-18 03:00 | Changeset attached | => SymmetricDS trunk r7958 | |
2014-06-17 00:27 | chenson | Status | resolved => closed |
2015-07-31 01:49 | chenson | Changeset attached | => SymmetricDS master a8bcad15 |