1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.jumpmind.symmetric.extract.csv;
23
24 import java.io.BufferedWriter;
25 import java.io.IOException;
26 import java.util.Map;
27
28 import org.jumpmind.symmetric.common.ParameterConstants;
29 import org.jumpmind.symmetric.common.csv.CsvConstants;
30 import org.jumpmind.symmetric.db.IDbDialect;
31 import org.jumpmind.symmetric.extract.DataExtractorContext;
32 import org.jumpmind.symmetric.extract.IDataExtractor;
33 import org.jumpmind.symmetric.model.Data;
34 import org.jumpmind.symmetric.model.Node;
35 import org.jumpmind.symmetric.model.OutgoingBatch;
36 import org.jumpmind.symmetric.service.INodeService;
37 import org.jumpmind.symmetric.service.IParameterService;
38
39 public class CsvExtractor14 implements IDataExtractor {
40
41 private Map<String, IStreamDataCommand> dictionary = null;
42
43 private IParameterService parameterService;
44
45 private IDbDialect dbDialect;
46
47 private INodeService nodeService;
48
49 public void init(BufferedWriter writer, DataExtractorContext context) throws IOException {
50 Node nodeIdentity = nodeService.findIdentity();
51 String nodeId = (nodeIdentity == null) ? parameterService.getString(ParameterConstants.EXTERNAL_ID)
52 : nodeIdentity.getNodeId();
53 Util.write(writer, CsvConstants.NODEID, Util.DELIMITER, nodeId);
54 writer.newLine();
55 }
56
57 public void begin(OutgoingBatch batch, BufferedWriter writer) throws IOException {
58 Util.write(writer, CsvConstants.BATCH, Util.DELIMITER, Long.toString(batch.getBatchId()));
59 writer.newLine();
60 Util.write(writer, CsvConstants.BINARY, Util.DELIMITER, dbDialect.getBinaryEncoding()
61 .name());
62 writer.newLine();
63 }
64
65 public void commit(OutgoingBatch batch, BufferedWriter writer) throws IOException {
66 Util.write(writer, CsvConstants.COMMIT, Util.DELIMITER, Long.toString(batch.getBatchId()));
67 writer.newLine();
68 }
69
70 public void write(BufferedWriter writer, Data data, DataExtractorContext context) throws IOException {
71 preprocessTable(data, writer, context);
72 dictionary.get(data.getEventType().getCode()).execute(writer, data, context);
73 }
74
75 /***
76 * Writes the table metadata out to a stream only if it hasn't already been
77 * written out before
78 *
79 * @param tableName
80 * @param out
81 */
82 public void preprocessTable(Data data, BufferedWriter out, DataExtractorContext context) throws IOException {
83
84 if (data.getAudit() == null) {
85 throw new RuntimeException("Missing trigger_hist for table " + data.getTableName()
86 + ": try running syncTriggers() or restarting SymmetricDS");
87 }
88 String auditKey = Integer.toString(data.getAudit().getTriggerHistoryId()).intern();
89 if (!context.getAuditRecordsWritten().contains(auditKey)) {
90 Util.write(out, CsvConstants.TABLE, ", ", data.getTableName());
91 out.newLine();
92 Util.write(out, CsvConstants.KEYS, ", ", data.getAudit().getPkColumnNames());
93 out.newLine();
94 Util.write(out, CsvConstants.COLUMNS, ", ", data.getAudit().getColumnNames());
95 out.newLine();
96 context.getAuditRecordsWritten().add(auditKey);
97 } else if (!context.isLastTable(data.getTableName())) {
98 Util.write(out, CsvConstants.TABLE, ", ", data.getTableName());
99 out.newLine();
100 }
101
102 context.setLastTableName(data.getTableName());
103 }
104
105 public void setDictionary(Map<String, IStreamDataCommand> dictionary) {
106 this.dictionary = dictionary;
107 }
108
109 public void setDbDialect(IDbDialect dbDialect) {
110 this.dbDialect = dbDialect;
111 }
112
113 public void setParameterService(IParameterService parameterService) {
114 this.parameterService = parameterService;
115 }
116
117 public void setNodeService(INodeService nodeService) {
118 this.nodeService = nodeService;
119 }
120
121 }