1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.jumpmind.symmetric.extract.csv;
23
24 import java.io.BufferedWriter;
25 import java.io.IOException;
26 import java.util.Map;
27
28 import org.jumpmind.symmetric.common.ParameterConstants;
29 import org.jumpmind.symmetric.common.csv.CsvConstants;
30 import org.jumpmind.symmetric.db.IDbDialect;
31 import org.jumpmind.symmetric.extract.DataExtractorContext;
32 import org.jumpmind.symmetric.extract.IDataExtractor;
33 import org.jumpmind.symmetric.model.Data;
34 import org.jumpmind.symmetric.model.DataEventType;
35 import org.jumpmind.symmetric.model.Node;
36 import org.jumpmind.symmetric.model.OutgoingBatch;
37 import org.jumpmind.symmetric.service.INodeService;
38 import org.jumpmind.symmetric.service.IParameterService;
39
40 public class CsvExtractor implements IDataExtractor {
41
42 private Map<String, IStreamDataCommand> dictionary = null;
43
44 private IParameterService parameterService;
45
46 private IDbDialect dbDialect;
47
48 private INodeService nodeService;
49
50 public void init(BufferedWriter writer, DataExtractorContext context) throws IOException {
51 Node nodeIdentity = nodeService.findIdentity();
52 String nodeId = (nodeIdentity == null) ? parameterService.getString(ParameterConstants.EXTERNAL_ID)
53 : nodeIdentity.getNodeId();
54 Util.write(writer, CsvConstants.NODEID, Util.DELIMITER, nodeId);
55 writer.newLine();
56 }
57
58 public void begin(OutgoingBatch batch, BufferedWriter writer) throws IOException {
59 Util.write(writer, CsvConstants.BATCH, Util.DELIMITER, Long.toString(batch.getBatchId()));
60 writer.newLine();
61 Util.write(writer, CsvConstants.BINARY, Util.DELIMITER, dbDialect.getBinaryEncoding()
62 .name());
63 writer.newLine();
64 }
65
66 public void commit(OutgoingBatch batch, BufferedWriter writer) throws IOException {
67 Util.write(writer, CsvConstants.COMMIT, Util.DELIMITER, Long.toString(batch.getBatchId()));
68 writer.newLine();
69 }
70
71 public void write(BufferedWriter writer, Data data, DataExtractorContext context) throws IOException {
72 preprocessTable(data, writer, context);
73 dictionary.get(data.getEventType().getCode()).execute(writer, data, context);
74 }
75
76 /***
77 * Writes the table metadata out to a stream only if it hasn't already been
78 * written out before
79 *
80 * @param tableName
81 * @param out
82 */
83 public void preprocessTable(Data data, BufferedWriter out, DataExtractorContext context) throws IOException {
84 if (data.getAudit() != null) {
85 String auditKey = Integer.toString(
86 data.getAudit().getTriggerHistoryId()).intern();
87 if (!context.getAuditRecordsWritten().contains(auditKey)) {
88 Util.write(out, CsvConstants.TABLE, ", ", data.getTableName());
89 out.newLine();
90 Util.write(out, CsvConstants.KEYS, ", ", data.getAudit()
91 .getPkColumnNames());
92 out.newLine();
93 Util.write(out, CsvConstants.COLUMNS, ", ", data.getAudit()
94 .getColumnNames());
95 out.newLine();
96 context.getAuditRecordsWritten().add(auditKey);
97 } else if (!context.isLastTable(data.getTableName())) {
98 Util.write(out, CsvConstants.TABLE, ", ", data.getTableName());
99 out.newLine();
100 }
101 if (data.getEventType() == DataEventType.UPDATE
102 && data.getOldData() != null) {
103 Util.write(out, CsvConstants.OLD, ", ", data.getOldData());
104 out.newLine();
105 }
106
107 }
108 context.setLastTableName(data.getTableName());
109 }
110
111 public void setDictionary(Map<String, IStreamDataCommand> dictionary) {
112 this.dictionary = dictionary;
113 }
114
115 public void setDbDialect(IDbDialect dbDialect) {
116 this.dbDialect = dbDialect;
117 }
118
119 public void setParameterService(IParameterService parameterService) {
120 this.parameterService = parameterService;
121 }
122
123 public void setNodeService(INodeService nodeService) {
124 this.nodeService = nodeService;
125 }
126
127 }