1 package org.jumpmind.symmetric.db.derby;
2
3 import java.sql.Blob;
4 import java.sql.Clob;
5 import java.sql.Connection;
6 import java.sql.DriverManager;
7 import java.sql.PreparedStatement;
8 import java.sql.ResultSet;
9 import java.sql.SQLException;
10 import java.util.Hashtable;
11
12 import org.apache.commons.codec.binary.Base64;
13 import org.apache.derby.iapi.sql.conn.LanguageConnectionContext;
14 import org.apache.derby.impl.jdbc.EmbedConnection;
15
16 public class DerbyFunctions {
17
18 private static final String CURRENT_CONNECTION_URL = "jdbc:default:connection";
19
20 private static final int MAX_STRING_LENGTH = 32672;
21
22
23 private static final int MAX_BINARY_LENGTH = 23700;
24
25 private static Hashtable<String, Boolean> syncDisabledTable = new Hashtable<String, Boolean>();
26
27 private static Hashtable<String, String> syncNodeDisabledTable = new Hashtable<String, String>();
28
29 public static String getTransactionId() throws SQLException {
30 return getLanguageConnection().getTransactionExecute().getTransactionIdString();
31 }
32
33 public static String getSessionId() throws SQLException {
34 return Integer.toString(getLanguageConnection().getInstanceNumber());
35 }
36
37 public static int isSyncDisabled() throws SQLException {
38 return syncDisabledTable.get(getSessionId()) != null ? 1 : 0;
39 }
40
41 public static String getSyncNodeDisabled() throws SQLException {
42 return syncNodeDisabledTable.get(getSessionId());
43 }
44
45 public static String setSyncNodeDisabled(String nodeId) throws SQLException {
46 if (nodeId == null) {
47 return syncNodeDisabledTable.remove(getSessionId());
48 } else {
49 return syncNodeDisabledTable.put(getSessionId(), nodeId);
50 }
51 }
52
53 public static int setSyncDisabled(int disabledIndicator) throws SQLException {
54 if (disabledIndicator == 0) {
55 syncDisabledTable.remove(getSessionId());
56 return 0;
57 } else {
58 syncDisabledTable.put(getSessionId(), Boolean.TRUE);
59 return 1;
60 }
61 }
62
63 public static void insertData(String schemaName, String prefixName, String tableName, String channelName,
64 String dmlType, long triggerHistId, String transactionId, String targetGroupId, String nodeSelectWhere,
65 String pkData, String rowData, String oldRowData) throws SQLException {
66 if (((dmlType.equals("I") || dmlType.equals("U")) && rowData != null)
67 || (dmlType.equals("D") && pkData != null)) {
68 Connection conn = DriverManager.getConnection(CURRENT_CONNECTION_URL);
69 String sql = "insert into " + schemaName + prefixName + "_data "
70 + "(table_name, event_type, trigger_hist_id, pk_data, row_data, old_data, create_time) "
71 + "values (?, ?, ?, ?, ?, ?, current_timestamp)";
72 PreparedStatement ps = conn.prepareStatement(sql);
73 ps.setString(1, tableName);
74 ps.setString(2, dmlType);
75 ps.setLong(3, triggerHistId);
76 ps.setString(4, pkData);
77 ps.setString(5, rowData);
78 ps.setString(6, oldRowData);
79 ps.executeUpdate();
80 ps.close();
81 String where = "";
82 String disabledNodeId = getSyncNodeDisabled();
83 if (disabledNodeId != null) {
84 where = "and c.node_id != '" + disabledNodeId + "' ";
85 }
86 sql = "insert into " + schemaName + prefixName
87 + "_data_event (node_id, data_id, channel_id, transaction_id) "
88 + "select node_id, IDENTITY_VAL_LOCAL(),'" + channelName + "','" + transactionId + "' from "
89 + prefixName + "_node c where (c.node_group_id = ? and c.sync_enabled = 1) " + where + nodeSelectWhere;
90 ps = conn.prepareStatement(sql);
91 ps.setString(1, targetGroupId);
92 ps.executeUpdate();
93 ps.close();
94 conn.close();
95 }
96 }
97
98 public static String blobToString(String columnName, String tableName, String whereClause) throws SQLException {
99 Connection conn = DriverManager.getConnection(CURRENT_CONNECTION_URL);
100 String sql = "select " + columnName + " from " + tableName + " where " + whereClause;
101 PreparedStatement ps = conn.prepareStatement(sql);
102 ResultSet rs = ps.executeQuery();
103 String str = null;
104 if (rs.next()) {
105 Blob blob = rs.getBlob(1);
106 if (blob != null && blob.length() > 0) {
107 str = new String(Base64.encodeBase64(blob.getBytes(1, MAX_BINARY_LENGTH)));
108 }
109 }
110 ps.close();
111 conn.close();
112 return str == null ? "" : "\"" + str + "\"";
113 }
114
115 public static String clobToString(String columnName, String tableName, String whereClause) throws SQLException {
116 Connection conn = DriverManager.getConnection(CURRENT_CONNECTION_URL);
117 String sql = "select " + columnName + " from " + tableName + " where " + whereClause;
118 PreparedStatement ps = conn.prepareStatement(sql);
119 ResultSet rs = ps.executeQuery();
120 String str = null;
121 if (rs.next()) {
122 Clob clob = rs.getClob(1);
123 if (clob != null && clob.length() > 0) {
124 str = clob.getSubString(1, MAX_STRING_LENGTH);
125 }
126 }
127 ps.close();
128 conn.close();
129 return str == null ? "" : "\"" + str + "\"";
130 }
131
132 private static LanguageConnectionContext getLanguageConnection() throws SQLException {
133 EmbedConnection conn = (EmbedConnection) DriverManager.getConnection(CURRENT_CONNECTION_URL);
134 return conn.getLanguageConnection();
135 }
136 }