View Issue Details
ID | Project | Category | View Status | Date Submitted | Last Update |
---|---|---|---|---|---|
0002375 | SymmetricDS | New Feature | public | 2015-08-25 08:49 | 2015-10-02 14:23 |
Reporter | msc | Assigned To | chenson | ||
Priority | normal | ||||
Status | closed | Resolution | fixed | ||
Product Version | 3.7.20 | ||||
Target Version | 3.7.22 | Fixed in Version | 3.7.22 | ||
Summary | 0002375: LoadFilter with SQL-Script support | ||||
Description | additional to BSH and JAVA LoadFilter script you can execute a sql-statement with access to :COLUMN and :OLD_COLUMN values (for BEFORE/AFTER/ERROR-scripts only). | ||||
Tags | No tags attached. | ||||
|
0001-LoadFilter-with-SQL-Script-support-currently-only-SQ.patch (10,088 bytes)
From 4a7c2f0307d2f29ec775b85a41aa69c3bf1c0780 Mon Sep 17 00:00:00 2001 From: Markus Schulz <msc@onesty-tech.de> Date: Tue, 25 Aug 2015 10:40:41 +0200 Subject: [PATCH] LoadFilter with SQL-Script support (currently only SQL-PreparedStatement with COLUMN/OLD_COLUMN variables) --- .../asciidoc/configuration/load-filters/scripts.ad | 18 +-- .../load/DynamicDatabaseWriterFilter.java | 2 + .../symmetric/load/SQLDatabaseWriterFilter.java | 163 ++++++++++++++++++++ .../org/jumpmind/symmetric/model/LoadFilter.java | 2 +- 4 files changed, 175 insertions(+), 10 deletions(-) create mode 100644 symmetric-core/src/main/java/org/jumpmind/symmetric/load/SQLDatabaseWriterFilter.java diff --git a/symmetric-assemble/src/asciidoc/configuration/load-filters/scripts.ad b/symmetric-assemble/src/asciidoc/configuration/load-filters/scripts.ad index 84430fb..b76b023 100644 --- a/symmetric-assemble/src/asciidoc/configuration/load-filters/scripts.ad +++ b/symmetric-assemble/src/asciidoc/configuration/load-filters/scripts.ad @@ -29,15 +29,15 @@ Handle Error Script:: A script to execute if data cannot be processed. .Variables available within scripts [cols="3,^1,^1,5"] |=== -|Variable|BSH|JAVA|Description - -|engine|X||The Symmetric engine object. -|COLUMN_NAME|X||The source values for the row being inserted, updated or deleted. -|OLD_COLUMN_NAME|X||The old values for the row being inserted, updated or deleted. -|context|X|X|The data context object for the data being inserted, updated or deleted. . -|table|X|X|The table object for the table being inserted, updated or deleted. -|data|X|X|The `CsvData` object for the data change. -|error|X|X|`java.lang.Exception` +|Variable|BSH|SQL|JAVA|Description + +|engine|X|||The Symmetric engine object. +|COLUMN_NAME|X|X||The source values for the row being inserted, updated or deleted. +|OLD_COLUMN_NAME|X|X||The old values for the row being inserted, updated or deleted. +|context|X||X|The data context object for the data being inserted, updated or deleted. . +|table|X||X|The table object for the table being inserted, updated or deleted. +|data|X||X|The `CsvData` object for the data change. +|error|X||X|`java.lang.Exception` |=== diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DynamicDatabaseWriterFilter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DynamicDatabaseWriterFilter.java index 97684f9..bc7e269 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DynamicDatabaseWriterFilter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DynamicDatabaseWriterFilter.java @@ -77,6 +77,8 @@ public abstract class DynamicDatabaseWriterFilter implements IDatabaseWriterFilt databaseWriterFilters.add(new BshDatabaseWriterFilter(engine, entry.getValue())); } else if (entry.getKey().equals(LoadFilterType.JAVA)) { databaseWriterFilters.add(new JavaDatabaseWriterFilter(engine, entry.getValue())); + } else if (entry.getKey().equals(LoadFilterType.SQL)) { + databaseWriterFilters.add(new SQLDatabaseWriterFilter(engine, entry.getValue())); } } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/SQLDatabaseWriterFilter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/SQLDatabaseWriterFilter.java new file mode 100644 index 0000000..33053c4 --- /dev/null +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/SQLDatabaseWriterFilter.java @@ -0,0 +1,163 @@ +package org.jumpmind.symmetric.load; + +import bsh.TargetError; +import org.jumpmind.db.model.Table; +import org.jumpmind.db.sql.*; +import org.jumpmind.symmetric.*; +import org.jumpmind.symmetric.common.Constants; +import org.jumpmind.symmetric.io.data.*; +import org.jumpmind.symmetric.model.*; +import org.jumpmind.util.*; + +import java.util.*; + +import static org.apache.commons.lang.StringUtils.isNotBlank; + +/** + * User: Markus Schulz <msc@antzsystem.de> + * Date: 24.08.15 + * Time: 10:53 + */ +public class SQLDatabaseWriterFilter extends DynamicDatabaseWriterFilter { + + protected static final ISqlRowMapper<Boolean> lookupColumnRowMapper = new ISqlRowMapper<Boolean>() { + @Override + public Boolean mapRow(Row row) { + return Boolean.TRUE.equals(row.values().iterator().next()); + } + }; + + private static final String OLD_ = "OLD_"; + + public SQLDatabaseWriterFilter(ISymmetricEngine engine, Map<String, List<LoadFilter>> loadFilters) { + super(engine, loadFilters); + } + + @Override + protected boolean processLoadFilters(DataContext context, Table table, CsvData data, Exception error, + WriteMethod writeMethod, List<LoadFilter> loadFiltersForTable) { + + boolean writeRow = true; + LoadFilter currentFilter = null; + List<Boolean> values = null; + try { + LinkedCaseInsensitiveMap<Object> namedParams = null; + for (LoadFilter filter : loadFiltersForTable) { + currentFilter = filter; + values = null; + if (filter.isFilterOnDelete() && data.getDataEventType().equals(DataEventType.DELETE) + || filter.isFilterOnInsert() && data.getDataEventType().equals(DataEventType.INSERT) + || filter.isFilterOnUpdate() && data.getDataEventType().equals(DataEventType.UPDATE)) { + String sql = null; + if (writeMethod.equals(WriteMethod.BEFORE_WRITE) && filter.getBeforeWriteScript() != null) { + sql = doTokenReplacementOnSql(context, filter.getBeforeWriteScript()); + } + else if (writeMethod.equals(WriteMethod.AFTER_WRITE) && filter.getAfterWriteScript() != null) { + sql = doTokenReplacementOnSql(context, filter.getAfterWriteScript()); + } + else if (writeMethod.equals(WriteMethod.HANDLE_ERROR) && filter.getHandleErrorScript() != null) { + sql = doTokenReplacementOnSql(context, filter.getHandleErrorScript()); + } + if (sql != null && !sql.trim().isEmpty()) { + if (namedParams == null) { + namedParams = getVariablesMap(table, data); + } + ISqlTransaction transaction = context.findTransaction(); + values = transaction.query(sql, lookupColumnRowMapper, namedParams); + } + + if (values != null && values.size() > 0) { + writeRow = values.get(0); + } + } + } + } + catch (Exception ex) { + processError(currentFilter, table, ex); + } + return writeRow; + } + + private LinkedCaseInsensitiveMap<Object> getVariablesMap(Table table, CsvData data) { + LinkedCaseInsensitiveMap<Object> namedParams = new LinkedCaseInsensitiveMap<Object>(); + if (data != null) { + Map<String, String> sourceValues = data.toColumnNameValuePairs(table.getColumnNames(), CsvData.ROW_DATA); + if (sourceValues.size() > 0) { + for (String columnName : sourceValues.keySet()) { + namedParams.put(columnName, sourceValues.get(columnName)); + namedParams.put(columnName.toUpperCase(), sourceValues.get(columnName)); + } + } + else { + Map<String, String> pkValues = data.toColumnNameValuePairs( + table.getPrimaryKeyColumnNames(), CsvData.PK_DATA); + for (String columnName : pkValues.keySet()) { + namedParams.put(columnName, pkValues.get(columnName)); + namedParams.put(columnName.toUpperCase(), pkValues.get(columnName)); + } + } + + Map<String, String> oldValues = data.toColumnNameValuePairs(table.getColumnNames(), + CsvData.OLD_DATA); + for (String columnName : oldValues.keySet()) { + namedParams.put(OLD_ + columnName, sourceValues.get(columnName)); + namedParams.put(OLD_ + columnName.toUpperCase(), sourceValues.get(columnName)); + } + } + return namedParams; + } + + @Override + protected void executeScripts(DataContext context, String key, Set<String> scripts, boolean isFailOnError) { + if (scripts != null) { + try { + ISqlTransaction transaction = context.findTransaction(); + for (String script : scripts) { + String sql = doTokenReplacementOnSql(context, script); + transaction.query(sql, lookupColumnRowMapper, null); + } + } + catch (Exception e) { + if (isFailOnError) { + throw (RuntimeException) e; + } + else { + log.error("Failed while executing sql script", e); + } + } + } + } + + protected String doTokenReplacementOnSql(DataContext context, String sql) { + if (isNotBlank(sql)) { + Data csvData = (Data) context.get(Constants.DATA_CONTEXT_CURRENT_CSV_DATA); + + if (csvData != null && csvData.getTriggerHistory() != null) { + sql = FormatUtils + .replaceToken(sql, "sourceCatalogName", csvData.getTriggerHistory().getSourceCatalogName(), true); + } + + if (csvData != null && csvData.getTriggerHistory() != null) { + sql = FormatUtils + .replaceToken(sql, "sourceSchemaName", csvData.getTriggerHistory().getSourceSchemaName(), true); + } + } + return sql; + } + + + protected void processError(LoadFilter currentFilter, Table table, Throwable ex) { + if (ex instanceof TargetError) { + ex = ((TargetError) ex).getTarget(); + } + String formattedMessage = String + .format("Error executing sql script for load filter %s on table %s. The error was: %s", + new Object[]{currentFilter != null ? currentFilter.getLoadFilterId() : "N/A", table.getName(), + ex.getMessage()}); + log.error(formattedMessage); + if (currentFilter.isFailOnError()) { + throw new SymmetricException(formattedMessage, ex); + } + } + +} diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/LoadFilter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/LoadFilter.java index 0647c24..be9940b 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/LoadFilter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/LoadFilter.java @@ -32,7 +32,7 @@ public class LoadFilter implements Serializable { static final Logger logger = LoggerFactory.getLogger(LoadFilter.class); - public enum LoadFilterType { BSH, JAVA }; + public enum LoadFilterType { BSH, JAVA, SQL }; private String loadFilterId; -- 1.7.10.4 |
|
GitHub Patch: https://github.com/NiasSt90/symmetric-ds/commit/716853ce8fb83d317112ba4c016a9c15d57b1f64 |
SymmetricDS: 3.7 716853ce 2015-08-25 04:40:41 Details Diff |
0002375: LoadFilter with SQL-Script support currently only SQL-PreparedStatement with COLUMN/OLD_COLUMN variables support |
Affected Issues 0002375 |
|
mod - symmetric-assemble/src/asciidoc/configuration/load-filters/scripts.ad | Diff File | ||
mod - symmetric-core/src/main/java/org/jumpmind/symmetric/load/DynamicDatabaseWriterFilter.java | Diff File | ||
add - symmetric-core/src/main/java/org/jumpmind/symmetric/load/SQLDatabaseWriterFilter.java | Diff File | ||
mod - symmetric-core/src/main/java/org/jumpmind/symmetric/model/LoadFilter.java | Diff File | ||
SymmetricDS: 3.7 00246c88 2015-09-25 10:46:56 Details Diff |
0002375: LoadFilter with SQL-Script support |
Affected Issues 0002375 |
|
mod - symmetric-assemble/src/asciidoc/configuration/load-filters.ad | Diff File | ||
mod - symmetric-core/src/main/java/org/jumpmind/symmetric/load/SQLDatabaseWriterFilter.java | Diff File | ||
mod - symmetric-core/src/main/resources/symmetric-schema.xml | Diff File |
Date Modified | Username | Field | Change |
---|---|---|---|
2015-08-25 08:49 | msc | New Issue | |
2015-08-25 08:49 | msc | File Added: 0001-LoadFilter-with-SQL-Script-support-currently-only-SQ.patch | |
2015-08-25 09:20 | msc | Note Added: 0000713 | |
2015-09-11 20:14 | chenson | Target Version | => 3.7.22 |
2015-09-24 15:34 | chenson | Fixed in Version | => 3.7.22 |
2015-09-24 19:00 | msc | Changeset attached | => SymmetricDS 3.7 716853ce |
2015-09-25 14:47 | chenson | Status | new => resolved |
2015-09-25 14:47 | chenson | Resolution | open => fixed |
2015-09-25 14:47 | chenson | Assigned To | => chenson |
2015-09-25 15:00 | chenson | Changeset attached | => SymmetricDS 3.7 00246c88 |
2015-10-02 14:23 | chenson | Status | resolved => closed |