View Issue Details

IDProjectCategoryView StatusLast Update
0002375SymmetricDSNew Featurepublic2015-10-02 10:23
ReportermscAssigned Tochenson 
PrioritynormalSeverityminorReproducibilityalways
Status closedResolutionfixed 
Product Version3.7.20 
Target Version3.7.22Fixed in Version3.7.22 
Summary0002375: LoadFilter with SQL-Script support
Descriptionadditional to BSH and JAVA LoadFilter script you can execute a sql-statement with access to :COLUMN and :OLD_COLUMN values (for BEFORE/AFTER/ERROR-scripts only).
TagsNo tags attached.

Activities

msc

2015-08-25 04:49

reporter  

0001-LoadFilter-with-SQL-Script-support-currently-only-SQ.patch (10,088 bytes)
From 4a7c2f0307d2f29ec775b85a41aa69c3bf1c0780 Mon Sep 17 00:00:00 2001
From: Markus Schulz <msc@onesty-tech.de>
Date: Tue, 25 Aug 2015 10:40:41 +0200
Subject: [PATCH] LoadFilter with SQL-Script support (currently only
 SQL-PreparedStatement with COLUMN/OLD_COLUMN variables)

---
 .../asciidoc/configuration/load-filters/scripts.ad |   18 +--
 .../load/DynamicDatabaseWriterFilter.java          |    2 +
 .../symmetric/load/SQLDatabaseWriterFilter.java    |  163 ++++++++++++++++++++
 .../org/jumpmind/symmetric/model/LoadFilter.java   |    2 +-
 4 files changed, 175 insertions(+), 10 deletions(-)
 create mode 100644 symmetric-core/src/main/java/org/jumpmind/symmetric/load/SQLDatabaseWriterFilter.java

diff --git a/symmetric-assemble/src/asciidoc/configuration/load-filters/scripts.ad b/symmetric-assemble/src/asciidoc/configuration/load-filters/scripts.ad
index 84430fb..b76b023 100644
--- a/symmetric-assemble/src/asciidoc/configuration/load-filters/scripts.ad
+++ b/symmetric-assemble/src/asciidoc/configuration/load-filters/scripts.ad
@@ -29,15 +29,15 @@ Handle Error Script:: A script to execute if data cannot be processed.
 .Variables available within scripts
 [cols="3,^1,^1,5"]
 |===
-|Variable|BSH|JAVA|Description
-
-|engine|X||The Symmetric engine object.
-|COLUMN_NAME|X||The source values for the row being inserted, updated or deleted.
-|OLD_COLUMN_NAME|X||The old values for the row being inserted, updated or deleted.
-|context|X|X|The data context object for the data being inserted, updated or deleted. .
-|table|X|X|The table object for the table being inserted, updated or deleted.
-|data|X|X|The `CsvData` object for the data change.
-|error|X|X|`java.lang.Exception`
+|Variable|BSH|SQL|JAVA|Description
+
+|engine|X|||The Symmetric engine object.
+|COLUMN_NAME|X|X||The source values for the row being inserted, updated or deleted.
+|OLD_COLUMN_NAME|X|X||The old values for the row being inserted, updated or deleted.
+|context|X||X|The data context object for the data being inserted, updated or deleted. .
+|table|X||X|The table object for the table being inserted, updated or deleted.
+|data|X||X|The `CsvData` object for the data change.
+|error|X||X|`java.lang.Exception`
 
 |===
 
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DynamicDatabaseWriterFilter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DynamicDatabaseWriterFilter.java
index 97684f9..bc7e269 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DynamicDatabaseWriterFilter.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DynamicDatabaseWriterFilter.java
@@ -77,6 +77,8 @@ public abstract class DynamicDatabaseWriterFilter implements IDatabaseWriterFilt
                     databaseWriterFilters.add(new BshDatabaseWriterFilter(engine, entry.getValue()));
                 } else if (entry.getKey().equals(LoadFilterType.JAVA)) {
                     databaseWriterFilters.add(new JavaDatabaseWriterFilter(engine, entry.getValue()));
+                } else if (entry.getKey().equals(LoadFilterType.SQL)) {
+                    databaseWriterFilters.add(new SQLDatabaseWriterFilter(engine, entry.getValue()));
                 }
             }
         }
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/SQLDatabaseWriterFilter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/SQLDatabaseWriterFilter.java
new file mode 100644
index 0000000..33053c4
--- /dev/null
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/SQLDatabaseWriterFilter.java
@@ -0,0 +1,163 @@
+package org.jumpmind.symmetric.load;
+
+import bsh.TargetError;
+import org.jumpmind.db.model.Table;
+import org.jumpmind.db.sql.*;
+import org.jumpmind.symmetric.*;
+import org.jumpmind.symmetric.common.Constants;
+import org.jumpmind.symmetric.io.data.*;
+import org.jumpmind.symmetric.model.*;
+import org.jumpmind.util.*;
+
+import java.util.*;
+
+import static org.apache.commons.lang.StringUtils.isNotBlank;
+
+/**
+ * User: Markus Schulz <msc@antzsystem.de>
+ * Date: 24.08.15
+ * Time: 10:53
+ */
+public class SQLDatabaseWriterFilter extends DynamicDatabaseWriterFilter {
+
+	protected static final ISqlRowMapper<Boolean> lookupColumnRowMapper = new ISqlRowMapper<Boolean>() {
+		@Override
+		public Boolean mapRow(Row row) {
+			return Boolean.TRUE.equals(row.values().iterator().next());
+		}
+	};
+
+	private static final String OLD_ = "OLD_";
+
+	public SQLDatabaseWriterFilter(ISymmetricEngine engine, Map<String, List<LoadFilter>> loadFilters) {
+		super(engine, loadFilters);
+	}
+
+	@Override
+	protected boolean processLoadFilters(DataContext context, Table table, CsvData data, Exception error,
+		WriteMethod writeMethod, List<LoadFilter> loadFiltersForTable) {
+
+		boolean writeRow = true;
+		LoadFilter currentFilter = null;
+		List<Boolean> values = null;
+		try {
+			LinkedCaseInsensitiveMap<Object> namedParams = null;
+			for (LoadFilter filter : loadFiltersForTable) {
+				currentFilter = filter;
+				values = null;
+				if (filter.isFilterOnDelete() && data.getDataEventType().equals(DataEventType.DELETE)
+					|| filter.isFilterOnInsert() && data.getDataEventType().equals(DataEventType.INSERT)
+					|| filter.isFilterOnUpdate() && data.getDataEventType().equals(DataEventType.UPDATE)) {
+					String sql = null;
+					if (writeMethod.equals(WriteMethod.BEFORE_WRITE) && filter.getBeforeWriteScript() != null) {
+						sql = doTokenReplacementOnSql(context, filter.getBeforeWriteScript());
+					}
+					else if (writeMethod.equals(WriteMethod.AFTER_WRITE) && filter.getAfterWriteScript() != null) {
+						sql = doTokenReplacementOnSql(context, filter.getAfterWriteScript());
+					}
+					else if (writeMethod.equals(WriteMethod.HANDLE_ERROR) && filter.getHandleErrorScript() != null) {
+						sql = doTokenReplacementOnSql(context, filter.getHandleErrorScript());
+					}
+					if (sql != null && !sql.trim().isEmpty()) {
+						if (namedParams == null) {
+							namedParams = getVariablesMap(table, data);
+						}
+						ISqlTransaction transaction = context.findTransaction();
+						values = transaction.query(sql, lookupColumnRowMapper, namedParams);
+					}
+
+					if (values != null && values.size() > 0) {
+						writeRow = values.get(0);
+					}
+				}
+			}
+		}
+		catch (Exception ex) {
+			processError(currentFilter, table, ex);
+		}
+		return writeRow;
+	}
+
+	private LinkedCaseInsensitiveMap<Object> getVariablesMap(Table table, CsvData data) {
+		LinkedCaseInsensitiveMap<Object> namedParams = new LinkedCaseInsensitiveMap<Object>();
+		if (data != null) {
+			Map<String, String> sourceValues = data.toColumnNameValuePairs(table.getColumnNames(), CsvData.ROW_DATA);
+			if (sourceValues.size() > 0) {
+				for (String columnName : sourceValues.keySet()) {
+					namedParams.put(columnName, sourceValues.get(columnName));
+					namedParams.put(columnName.toUpperCase(), sourceValues.get(columnName));
+				}
+			}
+			else {
+				Map<String, String> pkValues = data.toColumnNameValuePairs(
+					table.getPrimaryKeyColumnNames(), CsvData.PK_DATA);
+				for (String columnName : pkValues.keySet()) {
+					namedParams.put(columnName, pkValues.get(columnName));
+					namedParams.put(columnName.toUpperCase(), pkValues.get(columnName));
+				}
+			}
+
+			Map<String, String> oldValues = data.toColumnNameValuePairs(table.getColumnNames(),
+				CsvData.OLD_DATA);
+			for (String columnName : oldValues.keySet()) {
+				namedParams.put(OLD_ + columnName, sourceValues.get(columnName));
+				namedParams.put(OLD_ + columnName.toUpperCase(), sourceValues.get(columnName));
+			}
+		}
+		return namedParams;
+	}
+
+	@Override
+	protected void executeScripts(DataContext context, String key, Set<String> scripts, boolean isFailOnError) {
+		if (scripts != null) {
+			try {
+				ISqlTransaction transaction = context.findTransaction();
+				for (String script : scripts) {
+					String sql = doTokenReplacementOnSql(context, script);
+					transaction.query(sql, lookupColumnRowMapper, null);
+				}
+			}
+			catch (Exception e) {
+				if (isFailOnError) {
+					throw (RuntimeException) e;
+				}
+				else {
+					log.error("Failed while executing sql script", e);
+				}
+			}
+		}
+	}
+
+	protected String doTokenReplacementOnSql(DataContext context, String sql) {
+		if (isNotBlank(sql)) {
+			Data csvData = (Data) context.get(Constants.DATA_CONTEXT_CURRENT_CSV_DATA);
+
+			if (csvData != null && csvData.getTriggerHistory() != null) {
+				sql = FormatUtils
+					.replaceToken(sql, "sourceCatalogName", csvData.getTriggerHistory().getSourceCatalogName(), true);
+			}
+
+			if (csvData != null && csvData.getTriggerHistory() != null) {
+				sql = FormatUtils
+					.replaceToken(sql, "sourceSchemaName", csvData.getTriggerHistory().getSourceSchemaName(), true);
+			}
+		}
+		return sql;
+	}
+
+
+	protected void processError(LoadFilter currentFilter, Table table, Throwable ex) {
+		if (ex instanceof TargetError) {
+			ex = ((TargetError) ex).getTarget();
+		}
+		String formattedMessage = String
+			.format("Error executing sql script for load filter %s on table %s. The error was: %s",
+				new Object[]{currentFilter != null ? currentFilter.getLoadFilterId() : "N/A", table.getName(),
+								 ex.getMessage()});
+		log.error(formattedMessage);
+		if (currentFilter.isFailOnError()) {
+			throw new SymmetricException(formattedMessage, ex);
+		}
+	}
+
+}
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/LoadFilter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/LoadFilter.java
index 0647c24..be9940b 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/LoadFilter.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/LoadFilter.java
@@ -32,7 +32,7 @@ public class LoadFilter implements Serializable {
 
     static final Logger logger = LoggerFactory.getLogger(LoadFilter.class);
     
-    public enum LoadFilterType { BSH, JAVA };       
+    public enum LoadFilterType { BSH, JAVA, SQL };
 
     private String loadFilterId;
     
-- 
1.7.10.4

msc

2015-08-25 05:20

reporter   ~0000713

GitHub Patch: https://github.com/NiasSt90/symmetric-ds/commit/716853ce8fb83d317112ba4c016a9c15d57b1f64

Related Changesets

SymmetricDS: 3.7 716853ce

2015-08-25 04:40:41

msc

Details Diff
0002375: LoadFilter with SQL-Script support
currently only SQL-PreparedStatement with COLUMN/OLD_COLUMN variables support

0002375
mod - symmetric-assemble/src/asciidoc/configuration/load-filters/scripts.ad Diff File
mod - symmetric-core/src/main/java/org/jumpmind/symmetric/load/DynamicDatabaseWriterFilter.java Diff File
add - symmetric-core/src/main/java/org/jumpmind/symmetric/load/SQLDatabaseWriterFilter.java Diff File
mod - symmetric-core/src/main/java/org/jumpmind/symmetric/model/LoadFilter.java Diff File

SymmetricDS: 3.7 00246c88

2015-09-25 10:46:56

chenson

Details Diff
0002375: LoadFilter with SQL-Script support
0002375
mod - symmetric-assemble/src/asciidoc/configuration/load-filters.ad Diff File
mod - symmetric-core/src/main/java/org/jumpmind/symmetric/load/SQLDatabaseWriterFilter.java Diff File
mod - symmetric-core/src/main/resources/symmetric-schema.xml Diff File

Issue History

Date Modified Username Field Change
2015-08-25 04:49 msc New Issue
2015-08-25 04:49 msc File Added: 0001-LoadFilter-with-SQL-Script-support-currently-only-SQ.patch
2015-08-25 05:20 msc Note Added: 0000713
2015-09-11 16:14 chenson Target Version => 3.7.22
2015-09-24 11:34 chenson Fixed in Version => 3.7.22
2015-09-24 15:00 msc Changeset attached => SymmetricDS 3.7 716853ce
2015-09-25 10:47 chenson Status new => resolved
2015-09-25 10:47 chenson Resolution open => fixed
2015-09-25 10:47 chenson Assigned To => chenson
2015-09-25 11:00 chenson Changeset attached => SymmetricDS 3.7 00246c88
2015-10-02 10:23 chenson Status resolved => closed