View Issue Details

ID | Project | Category | View Status | Last Update
0001485 | SymmetricDS | New Feature | public | 2014-06-17 00:27
Reporter: nswendal | Assigned To: chenson
Priority: normal
Status: closed | Resolution: fixed
Product Version: 3.5.11
Target Version: 3.6.0 | Fixed in Version: 3.6.0
Summary: 0001485: Create a delete_after_sync function for file sync
Description: This feature deletes the source file after its batch has been successfully acknowledged. I created a new flag (delete_after_sync) in the file_trigger table to enable it.
Additional Information: The attached "delete_after_sync.patch" also contains the patch code for issue 0001441 (sync_on_ctl_file). This change must be applied after 0001441, because it also deletes the matching control file when sync_on_ctl_file is enabled.
Tags: No tags attached.

Activities

nswendal

2013-12-11 04:02

reporter  

delete_after_sync.patch (32,251 bytes)   
### Eclipse Workspace Patch 1.0
#P symmetric-core
Index: src/main/java/org/jumpmind/symmetric/file/FileTriggerTracker.java
===================================================================
--- src/main/java/org/jumpmind/symmetric/file/FileTriggerTracker.java	(revision 7757)
+++ src/main/java/org/jumpmind/symmetric/file/FileTriggerTracker.java	(working copy)
@@ -115,12 +115,27 @@
         }
 
         public void onFileCreate(File file) {
-            if (populateAll || snapshot.getFileTriggerRouter().getFileTrigger().isSyncOnCreate()) {
-                log.debug("File create detected: {}", file.getAbsolutePath());
-                this.snapshot.add(new FileSnapshot(snapshot.getFileTriggerRouter(), file,
-                        LastEventType.CREATE));
+            if (snapshot.getFileTriggerRouter().getFileTrigger().isSyncOnCtlFile()){
+                onCtlFile(file);
+            } else {
+                if (populateAll || snapshot.getFileTriggerRouter().getFileTrigger().isSyncOnCreate()) {
+                    log.debug("File create detected: {}", file.getAbsolutePath());
+                    this.snapshot.add(new FileSnapshot(snapshot.getFileTriggerRouter(), file,
+                            LastEventType.CREATE));
+                }
             }
         }
+        
+        public void onCtlFile(File file) {
+          if (snapshot.getFileTriggerRouter().getFileTrigger().isSyncOnCtlFile()){
+              File ctlFile = new File(file.getAbsolutePath() + ".ctl");
+              if (ctlFile.exists()) {
+                  log.debug("Control file detected: {}", file.getAbsolutePath());
+                  this.snapshot.add(new FileSnapshot(snapshot.getFileTriggerRouter(), file,
+                          LastEventType.CREATE));
+              }
+          }
+      }
 
         public void onFileChange(File file) {
             if (populateAll || snapshot.getFileTriggerRouter().getFileTrigger().isSyncOnModified()) {
Index: src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java
===================================================================
--- src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java	(revision 7757)
+++ src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java	(working copy)
@@ -298,7 +298,7 @@
                 nodeService, dataExtractorService, dataService, dataLoaderService,
                 transportManager, statisticManager, configurationService);
         this.acknowledgeService = new AcknowledgeService(parameterService, symmetricDialect,
-                outgoingBatchService, registrationService, stagingManager);
+                outgoingBatchService, registrationService, stagingManager, this);
         this.pushService = new PushService(parameterService, symmetricDialect,
                 dataExtractorService, acknowledgeService, transportManager, nodeService,
                 clusterService, nodeCommunicationService, statisticManager);
Index: src/main/java/org/jumpmind/symmetric/service/IDataService.java
===================================================================
--- src/main/java/org/jumpmind/symmetric/service/IDataService.java	(revision 7757)
+++ src/main/java/org/jumpmind/symmetric/service/IDataService.java	(working copy)
@@ -147,6 +147,8 @@
     
     public ISqlReadCursor<Data> selectDataFor(Batch batch);
     
+    public ISqlReadCursor<Data> selectDataFor(Long batchId, String channelId);
+    
     public List<IReloadListener> getReloadListeners();
         
 }
\ No newline at end of file
Index: src/main/java/org/jumpmind/symmetric/service/IFileSyncService.java
===================================================================
--- src/main/java/org/jumpmind/symmetric/service/IFileSyncService.java	(revision 7757)
+++ src/main/java/org/jumpmind/symmetric/service/IFileSyncService.java	(working copy)
@@ -69,5 +69,7 @@
     public RemoteNodeStatuses pushFilesToNodes(boolean force);
 
     public List<OutgoingBatch> sendFiles(ProcessInfo processInfo, Node node, IOutgoingTransport outgoingTransport);
+    
+    public void acknowledgeFiles(OutgoingBatch outgoingBatch);
 
 }
Index: src/main/java/org/jumpmind/symmetric/service/impl/FileSyncServiceSqlMap.java
===================================================================
--- src/main/java/org/jumpmind/symmetric/service/impl/FileSyncServiceSqlMap.java	(revision 7757)
+++ src/main/java/org/jumpmind/symmetric/service/impl/FileSyncServiceSqlMap.java	(working copy)
@@ -32,12 +32,13 @@
         // @formatter:off
 
         putSql("selectFileTriggersSql",
-                " select trigger_id, base_dir, recurse,                                     " +
+                " select trigger_id, base_dir, recurse,                                       " +
                 "        includes_files, excludes_files,                                      " +
         		"        sync_on_create, sync_on_modified, sync_on_delete,                    " +
+        		"        sync_on_ctl_file, delete_after_sync,                                 " +
                 "        before_copy_script,                                                  " +
                 "        after_copy_script,                                                   " +
-        		"        create_time, last_update_by,                                          " +
+        		"        create_time, last_update_by,                                         " +
         		"        last_update_time                                                     " +
         		" from $(file_trigger)                                                        ");
 
@@ -46,16 +47,18 @@
 
         putSql("updateFileTriggerSql",
                 " update $(file_trigger) set base_dir=?, recurse=?, includes_files=?,         " +
-        		"  excludes_files=?, sync_on_create=?, sync_on_modified=?, sync_on_delete=?,    " +
-                "  before_copy_script=?, after_copy_script=?,                                   " +
-        		"  last_update_by=?, last_update_time=? where trigger_id=?                      ");
+        		"  excludes_files=?, sync_on_create=?, sync_on_modified=?, sync_on_delete=?,  " +
+        		"  sync_on_ctl_file=?, delete_after_sync=?,                                   " +
+                "  before_copy_script=?, after_copy_script=?,                                 " +
+        		"  last_update_by=?, last_update_time=? where trigger_id=?                    ");
 
         putSql("insertFileTriggerSql",
                 " insert into $(file_trigger) (base_dir, recurse, includes_files,             " +
-                "  excludes_files, sync_on_create, sync_on_modified, sync_on_delete,            " +
-                "  before_copy_script, after_copy_script,                                       " +
-                "  last_update_by, last_update_time, trigger_id, create_time)                   " +
-                " values(?,?,?,?,?,?,?,?,?,?,?,?,?)                                             ");
+                "  excludes_files, sync_on_create, sync_on_modified, sync_on_delete,          " +
+                "  sync_on_ctl_file, delete_after_sync,                                       " +
+                "  before_copy_script, after_copy_script,                                     " +
+                "  last_update_by, last_update_time, trigger_id, create_time)                 " +
+                " values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)                                       ");
 
         putSql("selectFileSnapshotSql",
                 " select trigger_id, router_id, relative_dir, file_name, last_event_type, crc32_checksum, " +
Index: src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java
===================================================================
--- src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java	(revision 7757)
+++ src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java	(working copy)
@@ -44,7 +44,14 @@
                         + "  d.create_time, d.trigger_hist_id, d.channel_id, d.transaction_id, d.source_node_id, d.external_data, d.node_list, e.router_id from $(data) d inner join   "
                         + "  $(data_event) e on d.data_id = e.data_id inner join $(outgoing_batch) o on o.batch_id=e.batch_id                                  "
                         + "  where o.batch_id = ? and o.node_id = ?                                                                                                                                    ");
-
+        
+        putSql("selectEventDataByBatchIdSql",
+                ""
+                        + "select d.data_id, d.table_name, d.event_type, d.row_data, d.pk_data, d.old_data,                                                                          "
+                        + "  d.create_time, d.trigger_hist_id, d.channel_id, d.transaction_id, d.source_node_id, d.external_data, d.node_list, e.router_id from $(data) d inner join   "
+                        + "  $(data_event) e on d.data_id = e.data_id inner join $(outgoing_batch) o on o.batch_id=e.batch_id                                  "
+                        + "  where o.batch_id = ?    ");
+        
         putSql("selectEventDataIdsSql",
                 ""
                         + "select d.data_id from $(data) d inner join                                                                 "
Index: src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java
===================================================================
--- src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java	(revision 7757)
+++ src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java	(working copy)
@@ -393,7 +393,7 @@
             trigger.setSyncOnIncomingBatch(false);
             boolean syncEnabled = parameterService.is(ParameterConstants.FILE_SYNC_ENABLE);
             trigger.setSyncOnInsert(syncEnabled);
-            trigger.setSyncOnUpdate(syncEnabled);
+            trigger.setSyncOnUpdate(false); // Changed to false because of issues with the traffic file
             trigger.setSyncOnDelete(false);
         } else {
             trigger.setChannelId(Constants.CHANNEL_CONFIG);
Index: src/main/java/org/jumpmind/symmetric/service/impl/DataService.java
===================================================================
--- src/main/java/org/jumpmind/symmetric/service/impl/DataService.java	(revision 7784)
+++ src/main/java/org/jumpmind/symmetric/service/impl/DataService.java	(working copy)
@@ -1359,7 +1359,21 @@
                         dataMapper, new Object[] { batch.getBatchId(), batch.getTargetNodeId() },
                         new int[] { Types.NUMERIC });
     }
-
+    
+    public ISqlReadCursor<Data> selectDataFor(Long batchId, String channelId) {
+        return sqlTemplate
+                .queryForCursor(getDataSelectByBatchSql(batchId, -1l, channelId),
+                        dataMapper, new Object[] { batchId },
+                        new int[] { Types.NUMERIC });
+    }
+    
+    protected String getDataSelectByBatchSql(long batchId, long startDataId, String channelId) {
+        String startAtDataIdSql = startDataId >= 0l ? " and d.data_id >= ? " : "";
+        return symmetricDialect.massageDataExtractionSql(
+                getSql("selectEventDataByBatchIdSql", startAtDataIdSql, " order by d.data_id asc"),
+                engine.getConfigurationService().getNodeChannel(channelId, false).getChannel());
+    }
+    
     protected String getDataSelectSql(long batchId, long startDataId, String channelId) {
         String startAtDataIdSql = startDataId >= 0l ? " and d.data_id >= ? " : "";
         return symmetricDialect.massageDataExtractionSql(
Index: src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java
===================================================================
--- src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java	(revision 7757)
+++ src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java	(working copy)
@@ -24,6 +24,7 @@
 import java.util.List;
 
 import org.jumpmind.db.sql.mapper.NumberMapper;
+import org.jumpmind.symmetric.ISymmetricEngine;
 import org.jumpmind.symmetric.common.Constants;
 import org.jumpmind.symmetric.db.ISymmetricDialect;
 import org.jumpmind.symmetric.io.stage.IStagedResource;
@@ -51,14 +52,17 @@
     private IRegistrationService registrationService;
 
     private IStagingManager stagingManger;
+    
+    private ISymmetricEngine engine;
 
     public AcknowledgeService(IParameterService parameterService,
             ISymmetricDialect symmetricDialect, IOutgoingBatchService outgoingBatchService,
-            IRegistrationService registrationService, IStagingManager stagingManager) {
+            IRegistrationService registrationService, IStagingManager stagingManager, ISymmetricEngine engine) {
         super(parameterService, symmetricDialect);
         this.outgoingBatchService = outgoingBatchService;
         this.registrationService = registrationService;
         this.stagingManger = stagingManager;
+        this.engine = engine;
         setSqlMap(new AcknowledgeServiceSqlMap(symmetricDialect.getPlatform(),
                 createSqlReplacementTokens()));
     }
@@ -128,6 +132,12 @@
 
                 //TODO: I should really be able to catch errors here, but can't do to how this is coded
                 outgoingBatchService.updateOutgoingBatch(outgoingBatch);
+                if (status == Status.OK) {
+                    if (outgoingBatch.getChannelId().equals(Constants.CHANNEL_FILESYNC)){
+                        //Acknowledge the file_sync in case the file needs deleted.
+                        engine.getFileSyncService().acknowledgeFiles(outgoingBatch);
+                    }
+                }
             } else {
                 log.error("Could not find batch {}-{} to acknowledge as {}", new Object[] {batch.getNodeId(), batch.getBatchId(),
                         status.name()});
Index: src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java
===================================================================
--- src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java	(revision 7757)
+++ src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java	(working copy)
@@ -36,6 +36,7 @@
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.filefilter.DirectoryFileFilter;
 import org.apache.commons.lang.StringUtils;
+import org.jumpmind.db.sql.ISqlReadCursor;
 import org.jumpmind.db.sql.ISqlRowMapper;
 import org.jumpmind.db.sql.ISqlTransaction;
 import org.jumpmind.db.sql.Row;
@@ -53,6 +54,7 @@
 import org.jumpmind.symmetric.model.FileConflictStrategy;
 import org.jumpmind.symmetric.model.FileSnapshot;
 import org.jumpmind.symmetric.model.FileSnapshot.LastEventType;
+import org.jumpmind.symmetric.model.Data;
 import org.jumpmind.symmetric.model.FileTrigger;
 import org.jumpmind.symmetric.model.FileTriggerRouter;
 import org.jumpmind.symmetric.model.IncomingBatch;
@@ -88,7 +90,7 @@
 
     private Object trackerLock = new Object();
     private ISymmetricEngine engine;
-
+    
     // TODO cache trigger routers
 
     public FileSyncService(ISymmetricEngine engine) {
@@ -182,7 +184,10 @@
                         fileTrigger.getIncludesFiles(), fileTrigger.getExcludesFiles(),
                         fileTrigger.isSyncOnCreate() ? 1 : 0,
                         fileTrigger.isSyncOnModified() ? 1 : 0,
-                        fileTrigger.isSyncOnDelete() ? 1 : 0, fileTrigger.getBeforeCopyScript(),
+                        fileTrigger.isSyncOnDelete() ? 1 : 0, 
+                        fileTrigger.isSyncOnCtlFile() ? 1 : 0,
+                        fileTrigger.isDeleteAfterSync() ? 1 : 0,
+                        fileTrigger.getBeforeCopyScript(),
                         fileTrigger.getAfterCopyScript(), fileTrigger.getLastUpdateBy(),
                         fileTrigger.getLastUpdateTime(), fileTrigger.getTriggerId() }, new int[] {
                         Types.VARCHAR, Types.SMALLINT, Types.VARCHAR, Types.VARCHAR,
@@ -195,6 +200,8 @@
                             fileTrigger.isSyncOnCreate() ? 1 : 0,
                             fileTrigger.isSyncOnModified() ? 1 : 0,
                             fileTrigger.isSyncOnDelete() ? 1 : 0,
+                            fileTrigger.isSyncOnCtlFile() ? 1 : 0,
+                            fileTrigger.isDeleteAfterSync() ? 1 : 0,        
                             fileTrigger.getBeforeCopyScript(), fileTrigger.getAfterCopyScript(),
                             fileTrigger.getLastUpdateBy(), fileTrigger.getLastUpdateTime(),
                             fileTrigger.getTriggerId(), fileTrigger.getCreateTime() }, new int[] {
@@ -437,6 +444,53 @@
             }
         }
     }
+    
+    public void acknowledgeFiles(OutgoingBatch outgoingBatch){
+        log.debug("Acknowledging file_sync outgoing batch.");
+        ISqlReadCursor<Data> cursor = engine.getDataService().selectDataFor(outgoingBatch.getBatchId(), outgoingBatch.getChannelId());
+        Data data = null;
+        List <File> filesToDelete = new ArrayList<File>();
+        for (int i = 0; i < outgoingBatch.getInsertEventCount(); i++) {
+            data = cursor.next();
+            if (data != null) {
+                String[] rowData = data.toParsedRowData();
+                //TODO: this method of getting the FileSnapshot data is not ideal. It would be better to map it to the columns in the table 
+                FileSnapshot fileSnapshot = new FileSnapshot();
+                fileSnapshot.setTriggerId(rowData[0]);
+                fileSnapshot.setRouterId(rowData[1]);
+                fileSnapshot.setRelativeDir(rowData[2]);
+                fileSnapshot.setFileName(rowData[3]);
+                fileSnapshot.setLastEventType(LastEventType.fromCode(rowData[4]));
+                
+                FileTriggerRouter triggerRouter = this.getFileTriggerRouter(
+                        fileSnapshot.getTriggerId(), fileSnapshot.getRouterId());
+                if (triggerRouter != null) {
+                    FileTrigger fileTrigger = triggerRouter.getFileTrigger();
+                    
+                    if(fileTrigger.isDeleteAfterSync()) {
+                        File file = fileTrigger.createSourceFile(fileSnapshot);
+                        if (!file.isDirectory()) {
+                            filesToDelete.add(file);
+                            if(fileTrigger.isSyncOnCtlFile()) {
+                                filesToDelete.add(new File(file.getAbsolutePath() + ".ctl"));
+                            }
+                        }
+                    }
+                }
+            }   
+        }
+        
+        if (filesToDelete != null && filesToDelete.size() > 0) {
+            for (File file : filesToDelete) {
+                if (file != null && file.exists()) {
+                    log.debug("Deleting file: " + file.getAbsolutePath());
+                    file.delete();
+                }
+                file = null;
+            }
+            filesToDelete = null;
+        }
+    }
 
     public void loadFilesFromPush(String nodeId, InputStream in, OutputStream out) {
         INodeService nodeService = engine.getNodeService();
@@ -721,6 +775,8 @@
             fileTrigger.setAfterCopyScript(rs.getString("after_copy_script"));
             fileTrigger.setBeforeCopyScript(rs.getString("before_copy_script"));
             fileTrigger.setSyncOnModified(rs.getBoolean("sync_on_modified"));
+            fileTrigger.setSyncOnCtlFile(rs.getBoolean("sync_on_ctl_file"));
+            fileTrigger.setDeleteAfterSync(rs.getBoolean("delete_after_sync"));
             fileTrigger.setTriggerId(rs.getString("trigger_id"));
             return fileTrigger;
         }
@@ -761,7 +817,6 @@
             fileSnapshot.setTriggerId(rs.getString("trigger_id"));
             fileSnapshot.setRouterId(rs.getString("router_id"));
             return fileSnapshot;
-        }
+        }        
     }
-
 }
Index: src/main/java/org/jumpmind/symmetric/model/FileTrigger.java
===================================================================
--- src/main/java/org/jumpmind/symmetric/model/FileTrigger.java	(revision 7757)
+++ src/main/java/org/jumpmind/symmetric/model/FileTrigger.java	(working copy)
@@ -46,6 +46,8 @@
     private boolean syncOnCreate = true;
     private boolean syncOnModified = true;
     private boolean syncOnDelete = true;
+    private boolean syncOnCtlFile = false;
+    private boolean deleteAfterSync = false;
     private String beforeCopyScript;
     private String afterCopyScript;
     private Date createTime = new Date();
@@ -128,6 +130,22 @@
         this.syncOnDelete = syncOnDelete;
     }
 
+    public boolean isSyncOnCtlFile() {
+        return syncOnCtlFile;
+    }
+
+    public void setSyncOnCtlFile(boolean syncOnCtlFile) {
+        this.syncOnCtlFile = syncOnCtlFile;
+    }
+    
+    public boolean isDeleteAfterSync() {
+        return deleteAfterSync;
+    }
+
+    public void setDeleteAfterSync(boolean deleteAfterSync) {
+        this.deleteAfterSync = deleteAfterSync;
+    }
+    
     public Date getCreateTime() {
         return createTime;
     }
Index: src/main/java/org/jumpmind/symmetric/model/FileConflictStrategy.java
===================================================================
--- src/main/java/org/jumpmind/symmetric/model/FileConflictStrategy.java	(revision 7757)
+++ src/main/java/org/jumpmind/symmetric/model/FileConflictStrategy.java	(working copy)
@@ -23,6 +23,6 @@
 
 public enum FileConflictStrategy {
 
-    SOURCE_WINS, TARGET_WINS, MANUAL
+    SOURCE_WINS, TARGET_WINS, REPORT_ERROR, MANUAL
     
 }
Index: src/main/resources/symmetric-schema.xml
===================================================================
--- src/main/resources/symmetric-schema.xml	(revision 7757)
+++ src/main/resources/symmetric-schema.xml	(working copy)
@@ -696,6 +696,8 @@
         <column name="sync_on_create" type="BOOLEANINT" size="1" required="true" default="1"  description="Whether to capture and send files when they are created." />
         <column name="sync_on_modified" type="BOOLEANINT" size="1" required="true" default="1"  description="Whether to capture and send files when they are modified." />
         <column name="sync_on_delete" type="BOOLEANINT" size="1" required="true" default="1"  description="Whether to capture and remove files when they are deleted." />
+        <column name="sync_on_ctl_file" type="BOOLEANINT" size="1" required="true" default="0"  description="Combined with sync_on_create, determines whether to capture and send files when a matching control file exists." />
+        <column name="delete_after_sync" type="BOOLEANINT" size="1" required="true" default="0"  description="Determines whether to delete the file after it has synced successfully." />
         <column name="before_copy_script" type="LONGVARCHAR" description="A bsh script that is run right before the file copy." />
         <column name="after_copy_script" type="LONGVARCHAR" description="A bsh script that is run right after the file copy." />
         <column name="create_time" type="TIMESTAMP" required="true"  description="Timestamp of when this entry was created." />
#P symmetric-server
Index: src/test/resources/org/jumpmind/symmetric/test/WildcardTest.csv
===================================================================
--- src/test/resources/org/jumpmind/symmetric/test/WildcardTest.csv	(revision 7748)
+++ src/test/resources/org/jumpmind/symmetric/test/WildcardTest.csv	(working copy)
@@ -12,7 +12,7 @@
 schema,
 table,SYM_FILE_TRIGGER
 keys,TRIGGER_ID
-columns,TRIGGER_ID,BASE_DIR,RECURSIVE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME
+columns,TRIGGER_ID,BASE_DIR,RECURSIVE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,SYNC_ON_CTL_FILE,DELETE_AFTER_SYNC,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME
 sql,delete from sym_file_trigger
 catalog,
 schema,
Index: src/test/resources/org/jumpmind/symmetric/test/RestServiceTest.csv
===================================================================
--- src/test/resources/org/jumpmind/symmetric/test/RestServiceTest.csv	(revision 7748)
+++ src/test/resources/org/jumpmind/symmetric/test/RestServiceTest.csv	(working copy)
@@ -12,7 +12,7 @@
 schema,
 table,SYM_FILE_TRIGGER
 keys,TRIGGER_ID
-columns,TRIGGER_ID,BASE_DIR,RECURSE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME
+columns,TRIGGER_ID,BASE_DIR,RECURSE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,SYNC_ON_CTL_FILE,DELETE_AFTER_SYNC,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME
 sql,delete from sym_file_trigger
 catalog,
 schema,
Index: src/test/resources/org/jumpmind/symmetric/test/ConflictResolutionTest.csv
===================================================================
--- src/test/resources/org/jumpmind/symmetric/test/ConflictResolutionTest.csv	(revision 7748)
+++ src/test/resources/org/jumpmind/symmetric/test/ConflictResolutionTest.csv	(working copy)
@@ -12,7 +12,7 @@
 schema,
 table,sym_file_trigger
 keys,TRIGGER_ID
-columns,TRIGGER_ID,BASE_DIR,RECURSE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME
+columns,TRIGGER_ID,BASE_DIR,RECURSE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,SYNC_ON_CTL_FILE,DELETE_AFTER_SYNC,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME
 sql,delete from sym_file_trigger
 catalog,
 schema,
Index: src/test/resources/org/jumpmind/symmetric/test/FileSyncTest.csv
===================================================================
--- src/test/resources/org/jumpmind/symmetric/test/FileSyncTest.csv	(revision 7748)
+++ src/test/resources/org/jumpmind/symmetric/test/FileSyncTest.csv	(working copy)
@@ -12,7 +12,7 @@
 schema,
 table,SYM_FILE_TRIGGER
 keys,TRIGGER_ID
-columns,TRIGGER_ID,BASE_DIR,RECURSE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME
+columns,TRIGGER_ID,BASE_DIR,RECURSE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,SYNC_ON_CTL_FILE,DELETE_AFTER_SYNC,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME
 sql,delete from sym_file_trigger
 catalog,
 schema,
@@ -127,11 +127,11 @@
 catalog,
 schema,
 table,SYM_FILE_TRIGGER
-insert,"all","target/fs_svr/all","1",,,"1","1","1",,,"2013-05-19 10:13:26.966","unit_test","2013-05-19 10:13:26.966"
-insert,"create_only","target/fs_svr/create_only","1",,,"1","0","0",,,"2013-05-19 10:13:26.966","unit_test","2013-05-19 10:13:26.966"
-insert,"all_recursive","target/fs_svr/all_recursive","1",,,"1","1","1",,,"2013-05-19 10:13:26.966","unit_test","2013-05-19 10:13:26.966"
-insert,"csv_only","target/fs_svr/csv_only","1","*.txt",,"1","1","1",,,"2013-05-19 10:13:26.966","unit_test","2013-05-19 10:13:26.966"
-insert,"choose_target","target/fs_svr/choose_target","1",,,"1","1","1","
+insert,"all","target/fs_svr/all","1",,,"1","1","1","0",,,"2013-05-19 10:13:26.966","unit_test","2013-05-19 10:13:26.966"
+insert,"create_only","target/fs_svr/create_only","1",,,"1","0","0","0",,,"2013-05-19 10:13:26.966","unit_test","2013-05-19 10:13:26.966"
+insert,"all_recursive","target/fs_svr/all_recursive","1",,,"1","1","1","0",,,"2013-05-19 10:13:26.966","unit_test","2013-05-19 10:13:26.966"
+insert,"csv_only","target/fs_svr/csv_only","1","*.txt",,"1","1","1","0",,,"2013-05-19 10:13:26.966","unit_test","2013-05-19 10:13:26.966"
+insert,"choose_target","target/fs_svr/choose_target","1",,,"1","1","1","0","
 a = new java.io.File(\"target/fs_clnt/choose_target/a\");
 b = new java.io.File(\"target/fs_clnt/choose_target/b\");
 if (org.apache.commons.io.FileUtils.sizeOfDirectory(a) > org.apache.commons.io.FileUtils.sizeOfDirectory(b)) {
@@ -140,10 +140,10 @@
    targetBaseDir = a.getAbsolutePath();
 }
 ",,"2013-05-19 10:14:04.830","unit_test","2013-05-19 10:14:04.830"
-insert,"client_src","target/fs_clnt/client_src","1",,,"1","1","1",,,"2013-05-19 13:56:18.456","unit_test","2013-05-19 13:56:18.456"
-insert,"ping_back_clnt","target/fs_clnt/ping_back","1",,,"1","1","1",,,"2013-05-19 13:58:24.779","unit_test","2013-05-19 13:58:24.779"
-insert,"ping_back_svr","target/fs_svr/ping_back","1",,,"1","1","1",,,"2013-05-19 13:59:05.366","unit_test","2013-05-19 13:59:05.366"
-insert,"test_change_filename","target/fs_svr/change_name","0","source.txt",,"1","1","1","targetFileName = \"target.txt\";",,"2013-07-11 14:25:27.690","admin","2013-07-11 14:25:27.690"
+insert,"client_src","target/fs_clnt/client_src","1",,,"1","1","1","0",,,"2013-05-19 13:56:18.456","unit_test","2013-05-19 13:56:18.456"
+insert,"ping_back_clnt","target/fs_clnt/ping_back","1",,,"1","1","1","0",,,"2013-05-19 13:58:24.779","unit_test","2013-05-19 13:58:24.779"
+insert,"ping_back_svr","target/fs_svr/ping_back","1",,,"1","1","1","0",,,"2013-05-19 13:59:05.366","unit_test","2013-05-19 13:59:05.366"
+insert,"test_change_filename","target/fs_svr/change_name","0","source.txt",,"1","1","1","0","targetFileName = \"target.txt\";",,"2013-07-11 14:25:27.690","admin","2013-07-11 14:25:27.690"
 catalog,
 schema,
 table,SYM_FILE_TRIGGER_ROUTER
Index: src/test/resources/org/jumpmind/symmetric/test/NonDmlEventsTest.csv
===================================================================
--- src/test/resources/org/jumpmind/symmetric/test/NonDmlEventsTest.csv	(revision 7748)
+++ src/test/resources/org/jumpmind/symmetric/test/NonDmlEventsTest.csv	(working copy)
@@ -12,7 +12,7 @@
 schema,
 table,SYM_FILE_TRIGGER
 keys,TRIGGER_ID
-columns,TRIGGER_ID,BASE_DIR,RECURSE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME
+columns,TRIGGER_ID,BASE_DIR,RECURSE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,SYNC_ON_CTL_FILE,DELETE_AFTER_SYNC,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME
 sql,delete from sym_file_trigger
 catalog,
 schema,
#P symmetric-assemble
Index: src/docbook/file-sync.xml
===================================================================
--- src/docbook/file-sync.xml	(revision 7764)
+++ src/docbook/file-sync.xml	(working copy)
@@ -228,9 +228,9 @@
 
 <programlisting>INSERT INTO sym_file_trigger
   (trigger_id,base_dir,recurse,includes_files,excludes_files,sync_on_create,
-   sync_on_modified,sync_on_delete,before_copy_script,after_copy_script,
+   sync_on_modified,sync_on_delete,sync_on_ctl_file,delete_after_sync,before_copy_script,after_copy_script,
    create_time,last_update_by,last_update_time)
-VALUES ('sync_directory','/filesync/server/all',1,'*.txt',null,1,1,1,
+VALUES ('sync_directory','/filesync/server/all',1,'*.txt',null,1,1,1,0,0,
   'targetBaseDir = "/filesync/clients/" +
   engine.getParameterService().getExternalId();',null,current_timestamp,'example',
   current_timestamp);
@@ -272,10 +272,10 @@
 <![CDATA[
 INSERT INTO sym_file_trigger
   (trigger_id,base_dir,recurse,includes_files,excludes_files,sync_on_create,
-  sync_on_modified,sync_on_delete,before_copy_script,after_copy_script,create_time,
+  sync_on_modified,sync_on_delete,sync_on_ctl_file,delete_after_sync,before_copy_script,after_copy_script,create_time,
   last_update_by,last_update_time)
 VALUES
-  ('node_specific','/filesync/server/nodes',1,null,null,1,1,1,'',null,
+  ('node_specific','/filesync/server/nodes',1,null,null,1,1,1,0,0,'',null,
   current_timestamp,'example',current_timestamp);
 
 INSERT INTO sym_file_trigger_router
delete_after_sync.patch (32,251 bytes)   

nswendal

2013-12-11 13:57

reporter  

delete_after_sync_v2.patch (30,925 bytes)   
### Eclipse Workspace Patch 1.0
#P symmetric-core
Index: src/main/java/org/jumpmind/symmetric/service/impl/DataService.java
===================================================================
--- src/main/java/org/jumpmind/symmetric/service/impl/DataService.java	(revision 7784)
+++ src/main/java/org/jumpmind/symmetric/service/impl/DataService.java	(working copy)
@@ -1359,7 +1359,21 @@
                         dataMapper, new Object[] { batch.getBatchId(), batch.getTargetNodeId() },
                         new int[] { Types.NUMERIC });
     }
-
+    
+    public ISqlReadCursor<Data> selectDataFor(Long batchId, String channelId) {
+        return sqlTemplate
+                .queryForCursor(getDataSelectByBatchSql(batchId, -1l, channelId),
+                        dataMapper, new Object[] { batchId },
+                        new int[] { Types.NUMERIC });
+    }
+    
+    protected String getDataSelectByBatchSql(long batchId, long startDataId, String channelId) {
+        String startAtDataIdSql = startDataId >= 0l ? " and d.data_id >= ? " : "";
+        return symmetricDialect.massageDataExtractionSql(
+                getSql("selectEventDataByBatchIdSql", startAtDataIdSql, " order by d.data_id asc"),
+                engine.getConfigurationService().getNodeChannel(channelId, false).getChannel());
+    }
+    
     protected String getDataSelectSql(long batchId, long startDataId, String channelId) {
         String startAtDataIdSql = startDataId >= 0l ? " and d.data_id >= ? " : "";
         return symmetricDialect.massageDataExtractionSql(
Index: src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java
===================================================================
--- src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java	(revision 7757)
+++ src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java	(working copy)
@@ -298,7 +298,7 @@
                 nodeService, dataExtractorService, dataService, dataLoaderService,
                 transportManager, statisticManager, configurationService);
         this.acknowledgeService = new AcknowledgeService(parameterService, symmetricDialect,
-                outgoingBatchService, registrationService, stagingManager);
+                outgoingBatchService, registrationService, stagingManager, this);
         this.pushService = new PushService(parameterService, symmetricDialect,
                 dataExtractorService, acknowledgeService, transportManager, nodeService,
                 clusterService, nodeCommunicationService, statisticManager);
Index: src/main/java/org/jumpmind/symmetric/service/IDataService.java
===================================================================
--- src/main/java/org/jumpmind/symmetric/service/IDataService.java	(revision 7757)
+++ src/main/java/org/jumpmind/symmetric/service/IDataService.java	(working copy)
@@ -147,6 +147,8 @@
     
     public ISqlReadCursor<Data> selectDataFor(Batch batch);
     
+    public ISqlReadCursor<Data> selectDataFor(Long batchId, String channelId);
+    
     public List<IReloadListener> getReloadListeners();
         
 }
\ No newline at end of file
Index: src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java
===================================================================
--- src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java	(revision 7757)
+++ src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java	(working copy)
@@ -44,7 +44,14 @@
                         + "  d.create_time, d.trigger_hist_id, d.channel_id, d.transaction_id, d.source_node_id, d.external_data, d.node_list, e.router_id from $(data) d inner join   "
                         + "  $(data_event) e on d.data_id = e.data_id inner join $(outgoing_batch) o on o.batch_id=e.batch_id                                  "
                         + "  where o.batch_id = ? and o.node_id = ?                                                                                                                                    ");
-
+        
+        putSql("selectEventDataByBatchIdSql",
+                ""
+                        + "select d.data_id, d.table_name, d.event_type, d.row_data, d.pk_data, d.old_data,                                                                          "
+                        + "  d.create_time, d.trigger_hist_id, d.channel_id, d.transaction_id, d.source_node_id, d.external_data, d.node_list, e.router_id from $(data) d inner join   "
+                        + "  $(data_event) e on d.data_id = e.data_id inner join $(outgoing_batch) o on o.batch_id=e.batch_id                                  "
+                        + "  where o.batch_id = ?    ");
+        
         putSql("selectEventDataIdsSql",
                 ""
                         + "select d.data_id from $(data) d inner join                                                                 "
Index: src/main/java/org/jumpmind/symmetric/model/FileTrigger.java
===================================================================
--- src/main/java/org/jumpmind/symmetric/model/FileTrigger.java	(revision 7757)
+++ src/main/java/org/jumpmind/symmetric/model/FileTrigger.java	(working copy)
@@ -46,6 +46,8 @@
     private boolean syncOnCreate = true;
     private boolean syncOnModified = true;
     private boolean syncOnDelete = true;
+    private boolean syncOnCtlFile = false;
+    private boolean deleteAfterSync = false;
     private String beforeCopyScript;
     private String afterCopyScript;
     private Date createTime = new Date();
@@ -128,6 +130,22 @@
         this.syncOnDelete = syncOnDelete;
     }
 
+    public boolean isSyncOnCtlFile() {
+        return syncOnCtlFile;
+    }
+
+    public void setSyncOnCtlFile(boolean syncOnCtlFile) {
+        this.syncOnCtlFile = syncOnCtlFile;
+    }
+    
+    public boolean isDeleteAfterSync() {
+        return deleteAfterSync;
+    }
+
+    public void setDeleteAfterSync(boolean deleteAfterSync) {
+        this.deleteAfterSync = deleteAfterSync;
+    }
+    
     public Date getCreateTime() {
         return createTime;
     }
Index: src/main/java/org/jumpmind/symmetric/service/IFileSyncService.java
===================================================================
--- src/main/java/org/jumpmind/symmetric/service/IFileSyncService.java	(revision 7757)
+++ src/main/java/org/jumpmind/symmetric/service/IFileSyncService.java	(working copy)
@@ -69,5 +69,7 @@
     public RemoteNodeStatuses pushFilesToNodes(boolean force);
 
     public List<OutgoingBatch> sendFiles(ProcessInfo processInfo, Node node, IOutgoingTransport outgoingTransport);
+    
+    public void acknowledgeFiles(OutgoingBatch outgoingBatch);
 
 }
Index: src/main/java/org/jumpmind/symmetric/service/impl/FileSyncServiceSqlMap.java
===================================================================
--- src/main/java/org/jumpmind/symmetric/service/impl/FileSyncServiceSqlMap.java	(revision 7757)
+++ src/main/java/org/jumpmind/symmetric/service/impl/FileSyncServiceSqlMap.java	(working copy)
@@ -32,12 +32,13 @@
         // @formatter:off
 
         putSql("selectFileTriggersSql",
-                " select trigger_id, base_dir, recurse,                                     " +
+                " select trigger_id, base_dir, recurse,                                       " +
                 "        includes_files, excludes_files,                                      " +
         		"        sync_on_create, sync_on_modified, sync_on_delete,                    " +
+        		"        sync_on_ctl_file, delete_after_sync,                                 " +
                 "        before_copy_script,                                                  " +
                 "        after_copy_script,                                                   " +
-        		"        create_time, last_update_by,                                          " +
+        		"        create_time, last_update_by,                                         " +
         		"        last_update_time                                                     " +
         		" from $(file_trigger)                                                        ");
 
@@ -46,16 +47,18 @@
 
         putSql("updateFileTriggerSql",
                 " update $(file_trigger) set base_dir=?, recurse=?, includes_files=?,         " +
-        		"  excludes_files=?, sync_on_create=?, sync_on_modified=?, sync_on_delete=?,    " +
-                "  before_copy_script=?, after_copy_script=?,                                   " +
-        		"  last_update_by=?, last_update_time=? where trigger_id=?                      ");
+        		"  excludes_files=?, sync_on_create=?, sync_on_modified=?, sync_on_delete=?,  " +
+        		"  sync_on_ctl_file=?, delete_after_sync=?,                                   " +
+                "  before_copy_script=?, after_copy_script=?,                                 " +
+        		"  last_update_by=?, last_update_time=? where trigger_id=?                    ");
 
         putSql("insertFileTriggerSql",
                 " insert into $(file_trigger) (base_dir, recurse, includes_files,             " +
-                "  excludes_files, sync_on_create, sync_on_modified, sync_on_delete,            " +
-                "  before_copy_script, after_copy_script,                                       " +
-                "  last_update_by, last_update_time, trigger_id, create_time)                   " +
-                " values(?,?,?,?,?,?,?,?,?,?,?,?,?)                                             ");
+                "  excludes_files, sync_on_create, sync_on_modified, sync_on_delete,          " +
+                "  sync_on_ctl_file, delete_after_sync,                                       " +
+                "  before_copy_script, after_copy_script,                                     " +
+                "  last_update_by, last_update_time, trigger_id, create_time)                 " +
+                " values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)                                       ");
 
         putSql("selectFileSnapshotSql",
                 " select trigger_id, router_id, relative_dir, file_name, last_event_type, crc32_checksum, " +
Index: src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java
===================================================================
--- src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java	(revision 7757)
+++ src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java	(working copy)
@@ -36,6 +36,7 @@
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.filefilter.DirectoryFileFilter;
 import org.apache.commons.lang.StringUtils;
+import org.jumpmind.db.sql.ISqlReadCursor;
 import org.jumpmind.db.sql.ISqlRowMapper;
 import org.jumpmind.db.sql.ISqlTransaction;
 import org.jumpmind.db.sql.Row;
@@ -53,6 +54,7 @@
 import org.jumpmind.symmetric.model.FileConflictStrategy;
 import org.jumpmind.symmetric.model.FileSnapshot;
 import org.jumpmind.symmetric.model.FileSnapshot.LastEventType;
+import org.jumpmind.symmetric.model.Data;
 import org.jumpmind.symmetric.model.FileTrigger;
 import org.jumpmind.symmetric.model.FileTriggerRouter;
 import org.jumpmind.symmetric.model.IncomingBatch;
@@ -88,7 +90,7 @@
 
     private Object trackerLock = new Object();
     private ISymmetricEngine engine;
-
+    
     // TODO cache trigger routers
 
     public FileSyncService(ISymmetricEngine engine) {
@@ -182,7 +184,10 @@
                         fileTrigger.getIncludesFiles(), fileTrigger.getExcludesFiles(),
                         fileTrigger.isSyncOnCreate() ? 1 : 0,
                         fileTrigger.isSyncOnModified() ? 1 : 0,
-                        fileTrigger.isSyncOnDelete() ? 1 : 0, fileTrigger.getBeforeCopyScript(),
+                        fileTrigger.isSyncOnDelete() ? 1 : 0, 
+                        fileTrigger.isSyncOnCtlFile() ? 1 : 0,
+                        fileTrigger.isDeleteAfterSync() ? 1 : 0,
+                        fileTrigger.getBeforeCopyScript(),
                         fileTrigger.getAfterCopyScript(), fileTrigger.getLastUpdateBy(),
                         fileTrigger.getLastUpdateTime(), fileTrigger.getTriggerId() }, new int[] {
                         Types.VARCHAR, Types.SMALLINT, Types.VARCHAR, Types.VARCHAR,
@@ -195,6 +200,8 @@
                             fileTrigger.isSyncOnCreate() ? 1 : 0,
                             fileTrigger.isSyncOnModified() ? 1 : 0,
                             fileTrigger.isSyncOnDelete() ? 1 : 0,
+                            fileTrigger.isSyncOnCtlFile() ? 1 : 0,
+                            fileTrigger.isDeleteAfterSync() ? 1 : 0,        
                             fileTrigger.getBeforeCopyScript(), fileTrigger.getAfterCopyScript(),
                             fileTrigger.getLastUpdateBy(), fileTrigger.getLastUpdateTime(),
                             fileTrigger.getTriggerId(), fileTrigger.getCreateTime() }, new int[] {
@@ -437,6 +444,53 @@
             }
         }
     }
+    
+    public void acknowledgeFiles(OutgoingBatch outgoingBatch){
+        log.debug("Acknowledging file_sync outgoing batch.");
+        ISqlReadCursor<Data> cursor = engine.getDataService().selectDataFor(outgoingBatch.getBatchId(), outgoingBatch.getChannelId());
+        Data data = null;
+        List <File> filesToDelete = new ArrayList<File>();
+        for (int i = 0; i < outgoingBatch.getInsertEventCount(); i++) {
+            data = cursor.next();
+            if (data != null) {
+                String[] rowData = data.toParsedRowData();
+                //TODO: this method of getting the FileSnapshot data is not ideal. It would be better to map it to the columns in the table 
+                FileSnapshot fileSnapshot = new FileSnapshot();
+                fileSnapshot.setTriggerId(rowData[0]);
+                fileSnapshot.setRouterId(rowData[1]);
+                fileSnapshot.setRelativeDir(rowData[2]);
+                fileSnapshot.setFileName(rowData[3]);
+                fileSnapshot.setLastEventType(LastEventType.fromCode(rowData[4]));
+                
+                FileTriggerRouter triggerRouter = this.getFileTriggerRouter(
+                        fileSnapshot.getTriggerId(), fileSnapshot.getRouterId());
+                if (triggerRouter != null) {
+                    FileTrigger fileTrigger = triggerRouter.getFileTrigger();
+                    
+                    if(fileTrigger.isDeleteAfterSync()) {
+                        File file = fileTrigger.createSourceFile(fileSnapshot);
+                        if (!file.isDirectory()) {
+                            filesToDelete.add(file);
+                            if(fileTrigger.isSyncOnCtlFile()) {
+                                filesToDelete.add(new File(file.getAbsolutePath() + ".ctl"));
+                            }
+                        }
+                    }
+                }
+            }   
+        }
+        
+        if (filesToDelete != null && filesToDelete.size() > 0) {
+            for (File file : filesToDelete) {
+                if (file != null && file.exists()) {
+                    log.debug("Deleting file: " + file.getAbsolutePath());
+                    file.delete();
+                }
+                file = null;
+            }
+            filesToDelete = null;
+        }
+    }
 
     public void loadFilesFromPush(String nodeId, InputStream in, OutputStream out) {
         INodeService nodeService = engine.getNodeService();
@@ -721,6 +775,8 @@
             fileTrigger.setAfterCopyScript(rs.getString("after_copy_script"));
             fileTrigger.setBeforeCopyScript(rs.getString("before_copy_script"));
             fileTrigger.setSyncOnModified(rs.getBoolean("sync_on_modified"));
+            fileTrigger.setSyncOnCtlFile(rs.getBoolean("sync_on_ctl_file"));
+            fileTrigger.setDeleteAfterSync(rs.getBoolean("delete_after_sync"));
             fileTrigger.setTriggerId(rs.getString("trigger_id"));
             return fileTrigger;
         }
@@ -761,7 +817,6 @@
             fileSnapshot.setTriggerId(rs.getString("trigger_id"));
             fileSnapshot.setRouterId(rs.getString("router_id"));
             return fileSnapshot;
-        }
+        }        
     }
-
 }
Index: src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java
===================================================================
--- src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java	(revision 7757)
+++ src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java	(working copy)
@@ -24,6 +24,7 @@
 import java.util.List;
 
 import org.jumpmind.db.sql.mapper.NumberMapper;
+import org.jumpmind.symmetric.ISymmetricEngine;
 import org.jumpmind.symmetric.common.Constants;
 import org.jumpmind.symmetric.db.ISymmetricDialect;
 import org.jumpmind.symmetric.io.stage.IStagedResource;
@@ -51,14 +52,17 @@
     private IRegistrationService registrationService;
 
     private IStagingManager stagingManger;
+    
+    private ISymmetricEngine engine;
 
     public AcknowledgeService(IParameterService parameterService,
             ISymmetricDialect symmetricDialect, IOutgoingBatchService outgoingBatchService,
-            IRegistrationService registrationService, IStagingManager stagingManager) {
+            IRegistrationService registrationService, IStagingManager stagingManager, ISymmetricEngine engine) {
         super(parameterService, symmetricDialect);
         this.outgoingBatchService = outgoingBatchService;
         this.registrationService = registrationService;
         this.stagingManger = stagingManager;
+        this.engine = engine;
         setSqlMap(new AcknowledgeServiceSqlMap(symmetricDialect.getPlatform(),
                 createSqlReplacementTokens()));
     }
@@ -128,6 +132,12 @@
 
                 //TODO: I should really be able to catch errors here, but can't do to how this is coded
                 outgoingBatchService.updateOutgoingBatch(outgoingBatch);
+                if (status == Status.OK) {
+                    if (outgoingBatch.getChannelId().equals(Constants.CHANNEL_FILESYNC)){
+                        //Acknowledge the file_sync in case the file needs to be deleted.
+                        engine.getFileSyncService().acknowledgeFiles(outgoingBatch);
+                    }
+                }
             } else {
                 log.error("Could not find batch {}-{} to acknowledge as {}", new Object[] {batch.getNodeId(), batch.getBatchId(),
                         status.name()});
Index: src/main/resources/symmetric-schema.xml
===================================================================
--- src/main/resources/symmetric-schema.xml	(revision 7757)
+++ src/main/resources/symmetric-schema.xml	(working copy)
@@ -696,6 +696,8 @@
         <column name="sync_on_create" type="BOOLEANINT" size="1" required="true" default="1"  description="Whether to capture and send files when they are created." />
         <column name="sync_on_modified" type="BOOLEANINT" size="1" required="true" default="1"  description="Whether to capture and send files when they are modified." />
         <column name="sync_on_delete" type="BOOLEANINT" size="1" required="true" default="1"  description="Whether to capture and remove files when they are deleted." />
+        <column name="sync_on_ctl_file" type="BOOLEANINT" size="1" required="true" default="0"  description="Combined with sync_on_create, determines whether to capture and send files when a matching control file exists." />
+        <column name="delete_after_sync" type="BOOLEANINT" size="1" required="true" default="0"  description="Determines whether to delete the file after it has synced successfully." />
         <column name="before_copy_script" type="LONGVARCHAR" description="A bsh script that is run right before the file copy." />
         <column name="after_copy_script" type="LONGVARCHAR" description="A bsh script that is run right after the file copy." />
         <column name="create_time" type="TIMESTAMP" required="true"  description="Timestamp of when this entry was created." />
Index: src/main/java/org/jumpmind/symmetric/file/FileTriggerTracker.java
===================================================================
--- src/main/java/org/jumpmind/symmetric/file/FileTriggerTracker.java	(revision 7757)
+++ src/main/java/org/jumpmind/symmetric/file/FileTriggerTracker.java	(working copy)
@@ -115,12 +115,27 @@
         }
 
         public void onFileCreate(File file) {
-            if (populateAll || snapshot.getFileTriggerRouter().getFileTrigger().isSyncOnCreate()) {
-                log.debug("File create detected: {}", file.getAbsolutePath());
-                this.snapshot.add(new FileSnapshot(snapshot.getFileTriggerRouter(), file,
-                        LastEventType.CREATE));
+            if (snapshot.getFileTriggerRouter().getFileTrigger().isSyncOnCtlFile()){
+                onCtlFile(file);
+            } else {
+                if (populateAll || snapshot.getFileTriggerRouter().getFileTrigger().isSyncOnCreate()) {
+                    log.debug("File create detected: {}", file.getAbsolutePath());
+                    this.snapshot.add(new FileSnapshot(snapshot.getFileTriggerRouter(), file,
+                            LastEventType.CREATE));
+                }
             }
         }
+        
+        public void onCtlFile(File file) {
+          if (snapshot.getFileTriggerRouter().getFileTrigger().isSyncOnCtlFile()){
+              File ctlFile = new File(file.getAbsolutePath() + ".ctl");
+              if (ctlFile.exists()) {
+                  log.debug("Control file detected: {}", file.getAbsolutePath());
+                  this.snapshot.add(new FileSnapshot(snapshot.getFileTriggerRouter(), file,
+                          LastEventType.CREATE));
+              }
+          }
+      }
 
         public void onFileChange(File file) {
             if (populateAll || snapshot.getFileTriggerRouter().getFileTrigger().isSyncOnModified()) {
#P symmetric-server
Index: src/test/resources/org/jumpmind/symmetric/test/RestServiceTest.csv
===================================================================
--- src/test/resources/org/jumpmind/symmetric/test/RestServiceTest.csv	(revision 7748)
+++ src/test/resources/org/jumpmind/symmetric/test/RestServiceTest.csv	(working copy)
@@ -12,7 +12,7 @@
 schema,
 table,SYM_FILE_TRIGGER
 keys,TRIGGER_ID
-columns,TRIGGER_ID,BASE_DIR,RECURSE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME
+columns,TRIGGER_ID,BASE_DIR,RECURSE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,SYNC_ON_CTL_FILE,DELETE_AFTER_SYNC,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME
 sql,delete from sym_file_trigger
 catalog,
 schema,
Index: src/test/resources/org/jumpmind/symmetric/test/FileSyncTest.csv
===================================================================
--- src/test/resources/org/jumpmind/symmetric/test/FileSyncTest.csv	(revision 7748)
+++ src/test/resources/org/jumpmind/symmetric/test/FileSyncTest.csv	(working copy)
@@ -12,7 +12,7 @@
 schema,
 table,SYM_FILE_TRIGGER
 keys,TRIGGER_ID
-columns,TRIGGER_ID,BASE_DIR,RECURSE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME
+columns,TRIGGER_ID,BASE_DIR,RECURSE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,SYNC_ON_CTL_FILE,DELETE_AFTER_SYNC,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME
 sql,delete from sym_file_trigger
 catalog,
 schema,
@@ -127,11 +127,11 @@
 catalog,
 schema,
 table,SYM_FILE_TRIGGER
-insert,"all","target/fs_svr/all","1",,,"1","1","1",,,"2013-05-19 10:13:26.966","unit_test","2013-05-19 10:13:26.966"
-insert,"create_only","target/fs_svr/create_only","1",,,"1","0","0",,,"2013-05-19 10:13:26.966","unit_test","2013-05-19 10:13:26.966"
-insert,"all_recursive","target/fs_svr/all_recursive","1",,,"1","1","1",,,"2013-05-19 10:13:26.966","unit_test","2013-05-19 10:13:26.966"
-insert,"csv_only","target/fs_svr/csv_only","1","*.txt",,"1","1","1",,,"2013-05-19 10:13:26.966","unit_test","2013-05-19 10:13:26.966"
-insert,"choose_target","target/fs_svr/choose_target","1",,,"1","1","1","
+insert,"all","target/fs_svr/all","1",,,"1","1","1","0",,,"2013-05-19 10:13:26.966","unit_test","2013-05-19 10:13:26.966"
+insert,"create_only","target/fs_svr/create_only","1",,,"1","0","0","0",,,"2013-05-19 10:13:26.966","unit_test","2013-05-19 10:13:26.966"
+insert,"all_recursive","target/fs_svr/all_recursive","1",,,"1","1","1","0",,,"2013-05-19 10:13:26.966","unit_test","2013-05-19 10:13:26.966"
+insert,"csv_only","target/fs_svr/csv_only","1","*.txt",,"1","1","1","0",,,"2013-05-19 10:13:26.966","unit_test","2013-05-19 10:13:26.966"
+insert,"choose_target","target/fs_svr/choose_target","1",,,"1","1","1","0","
 a = new java.io.File(\"target/fs_clnt/choose_target/a\");
 b = new java.io.File(\"target/fs_clnt/choose_target/b\");
 if (org.apache.commons.io.FileUtils.sizeOfDirectory(a) > org.apache.commons.io.FileUtils.sizeOfDirectory(b)) {
@@ -140,10 +140,10 @@
    targetBaseDir = a.getAbsolutePath();
 }
 ",,"2013-05-19 10:14:04.830","unit_test","2013-05-19 10:14:04.830"
-insert,"client_src","target/fs_clnt/client_src","1",,,"1","1","1",,,"2013-05-19 13:56:18.456","unit_test","2013-05-19 13:56:18.456"
-insert,"ping_back_clnt","target/fs_clnt/ping_back","1",,,"1","1","1",,,"2013-05-19 13:58:24.779","unit_test","2013-05-19 13:58:24.779"
-insert,"ping_back_svr","target/fs_svr/ping_back","1",,,"1","1","1",,,"2013-05-19 13:59:05.366","unit_test","2013-05-19 13:59:05.366"
-insert,"test_change_filename","target/fs_svr/change_name","0","source.txt",,"1","1","1","targetFileName = \"target.txt\";",,"2013-07-11 14:25:27.690","admin","2013-07-11 14:25:27.690"
+insert,"client_src","target/fs_clnt/client_src","1",,,"1","1","1","0",,,"2013-05-19 13:56:18.456","unit_test","2013-05-19 13:56:18.456"
+insert,"ping_back_clnt","target/fs_clnt/ping_back","1",,,"1","1","1","0",,,"2013-05-19 13:58:24.779","unit_test","2013-05-19 13:58:24.779"
+insert,"ping_back_svr","target/fs_svr/ping_back","1",,,"1","1","1","0",,,"2013-05-19 13:59:05.366","unit_test","2013-05-19 13:59:05.366"
+insert,"test_change_filename","target/fs_svr/change_name","0","source.txt",,"1","1","1","0","targetFileName = \"target.txt\";",,"2013-07-11 14:25:27.690","admin","2013-07-11 14:25:27.690"
 catalog,
 schema,
 table,SYM_FILE_TRIGGER_ROUTER
Index: src/test/resources/org/jumpmind/symmetric/test/NonDmlEventsTest.csv
===================================================================
--- src/test/resources/org/jumpmind/symmetric/test/NonDmlEventsTest.csv	(revision 7748)
+++ src/test/resources/org/jumpmind/symmetric/test/NonDmlEventsTest.csv	(working copy)
@@ -12,7 +12,7 @@
 schema,
 table,SYM_FILE_TRIGGER
 keys,TRIGGER_ID
-columns,TRIGGER_ID,BASE_DIR,RECURSE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME
+columns,TRIGGER_ID,BASE_DIR,RECURSE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,SYNC_ON_CTL_FILE,DELETE_AFTER_SYNC,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME
 sql,delete from sym_file_trigger
 catalog,
 schema,
Index: src/test/resources/org/jumpmind/symmetric/test/WildcardTest.csv
===================================================================
--- src/test/resources/org/jumpmind/symmetric/test/WildcardTest.csv	(revision 7748)
+++ src/test/resources/org/jumpmind/symmetric/test/WildcardTest.csv	(working copy)
@@ -12,7 +12,7 @@
 schema,
 table,SYM_FILE_TRIGGER
 keys,TRIGGER_ID
-columns,TRIGGER_ID,BASE_DIR,RECURSIVE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME
+columns,TRIGGER_ID,BASE_DIR,RECURSIVE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,SYNC_ON_CTL_FILE,DELETE_AFTER_SYNC,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME
 sql,delete from sym_file_trigger
 catalog,
 schema,
Index: src/test/resources/org/jumpmind/symmetric/test/ConflictResolutionTest.csv
===================================================================
--- src/test/resources/org/jumpmind/symmetric/test/ConflictResolutionTest.csv	(revision 7748)
+++ src/test/resources/org/jumpmind/symmetric/test/ConflictResolutionTest.csv	(working copy)
@@ -12,7 +12,7 @@
 schema,
 table,sym_file_trigger
 keys,TRIGGER_ID
-columns,TRIGGER_ID,BASE_DIR,RECURSE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME
+columns,TRIGGER_ID,BASE_DIR,RECURSE,INCLUDES_FILES,EXCLUDES_FILES,SYNC_ON_CREATE,SYNC_ON_MODIFIED,SYNC_ON_DELETE,SYNC_ON_CTL_FILE,DELETE_AFTER_SYNC,BEFORE_COPY_SCRIPT,AFTER_COPY_SCRIPT,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME
 sql,delete from sym_file_trigger
 catalog,
 schema,
#P symmetric-assemble
Index: src/docbook/file-sync.xml
===================================================================
--- src/docbook/file-sync.xml	(revision 7764)
+++ src/docbook/file-sync.xml	(working copy)
@@ -228,9 +228,9 @@
 
 <programlisting>INSERT INTO sym_file_trigger
   (trigger_id,base_dir,recurse,includes_files,excludes_files,sync_on_create,
-   sync_on_modified,sync_on_delete,before_copy_script,after_copy_script,
+   sync_on_modified,sync_on_delete,sync_on_ctl_file,delete_after_sync,before_copy_script,after_copy_script,
    create_time,last_update_by,last_update_time)
-VALUES ('sync_directory','/filesync/server/all',1,'*.txt',null,1,1,1,
+VALUES ('sync_directory','/filesync/server/all',1,'*.txt',null,1,1,1,0,0,
   'targetBaseDir = "/filesync/clients/" +
   engine.getParameterService().getExternalId();',null,current_timestamp,'example',
   current_timestamp);
@@ -272,10 +272,10 @@
 <![CDATA[
 INSERT INTO sym_file_trigger
   (trigger_id,base_dir,recurse,includes_files,excludes_files,sync_on_create,
-  sync_on_modified,sync_on_delete,before_copy_script,after_copy_script,create_time,
+  sync_on_modified,sync_on_delete,sync_on_ctl_file,delete_after_sync,before_copy_script,after_copy_script,create_time,
   last_update_by,last_update_time)
 VALUES
-  ('node_specific','/filesync/server/nodes',1,null,null,1,1,1,'',null,
+  ('node_specific','/filesync/server/nodes',1,null,null,1,1,1,0,0,'',null,
   current_timestamp,'example',current_timestamp);
 
 INSERT INTO sym_file_trigger_router
delete_after_sync_v2.patch (30,925 bytes)   

Related Changesets

SymmetricDS: master a8bcad15

2014-02-17 21:11:10

chenson

Details Diff
0001485: Create a delete_after_sync function for file sync
0001441: Add ability to sync on control file
Affected Issues
0001485
mod - symmetric-assemble/src/docbook/file-sync.xml Diff File
mod - symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java Diff File
mod - symmetric-core/src/main/java/org/jumpmind/symmetric/file/FileTriggerTracker.java Diff File
mod - symmetric-core/src/main/java/org/jumpmind/symmetric/model/FileTrigger.java Diff File
mod - symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java Diff File
mod - symmetric-core/src/main/java/org/jumpmind/symmetric/service/IFileSyncService.java Diff File
mod - symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java Diff File
mod - symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java Diff File
mod - symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java Diff File
mod - symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java Diff File
mod - symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncServiceSqlMap.java Diff File
mod - symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java Diff File
mod - symmetric-core/src/main/resources/symmetric-schema.xml Diff File
mod - symmetric-server/src/test/resources/org/jumpmind/symmetric/test/ConflictResolutionTest.csv Diff File
mod - symmetric-server/src/test/resources/org/jumpmind/symmetric/test/FileSyncTest.csv Diff File
mod - symmetric-server/src/test/resources/org/jumpmind/symmetric/test/NonDmlEventsTest.csv Diff File
mod - symmetric-server/src/test/resources/org/jumpmind/symmetric/test/RestServiceTest.csv Diff File
mod - symmetric-server/src/test/resources/org/jumpmind/symmetric/test/WildcardTest.csv Diff File

Issue History

Date Modified Username Field Change
2013-12-11 04:02 nswendal New Issue
2013-12-11 04:02 nswendal File Added: delete_after_sync.patch
2013-12-11 13:57 nswendal File Added: delete_after_sync_v2.patch
2014-02-18 01:38 chenson Assigned To => chenson
2014-02-18 01:38 chenson Status new => assigned
2014-02-18 01:38 chenson Target Version => 3.6.0
2014-02-18 02:12 chenson Status assigned => resolved
2014-02-18 02:12 chenson Fixed in Version => 3.6.0
2014-02-18 02:12 chenson Resolution open => fixed
2014-02-18 03:00 Changeset attached => SymmetricDS trunk r7958
2014-06-17 00:27 chenson Status resolved => closed
2015-07-31 01:49 chenson Changeset attached => SymmetricDS master a8bcad15