View Javadoc

1   /*
2    * SymmetricDS is an open source database synchronization solution.
3    *   
4    * Copyright (C) Chris Henson <chenson42@users.sourceforge.net>
5    * Copyright (C) Andrew Wilcox <andrewbwilcox@users.sourceforge.net>
6    *
7    * This library is free software; you can redistribute it and/or
8    * modify it under the terms of the GNU Lesser General Public
9    * License as published by the Free Software Foundation; either
10   * version 3 of the License, or (at your option) any later version.
11   *
12   * This library is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this library; if not, see
19   * <http://www.gnu.org/licenses/>.
20   */
21  
22  package org.jumpmind.symmetric.extract.csv;
23  
24  import java.io.BufferedWriter;
25  import java.io.IOException;
26  import java.util.Map;
27  
28  import org.jumpmind.symmetric.common.ParameterConstants;
29  import org.jumpmind.symmetric.common.csv.CsvConstants;
30  import org.jumpmind.symmetric.db.IDbDialect;
31  import org.jumpmind.symmetric.extract.DataExtractorContext;
32  import org.jumpmind.symmetric.extract.IDataExtractor;
33  import org.jumpmind.symmetric.model.Data;
34  import org.jumpmind.symmetric.model.Node;
35  import org.jumpmind.symmetric.model.OutgoingBatch;
36  import org.jumpmind.symmetric.service.INodeService;
37  import org.jumpmind.symmetric.service.IParameterService;
38  
39  public class CsvExtractor14 implements IDataExtractor {
40  
41      private Map<String, IStreamDataCommand> dictionary = null;
42  
43      private IParameterService parameterService;
44  
45      private IDbDialect dbDialect;
46  
47      private INodeService nodeService;
48  
49      public void init(BufferedWriter writer, DataExtractorContext context) throws IOException {
50          Node nodeIdentity = nodeService.findIdentity();
51          String nodeId = (nodeIdentity == null) ? parameterService.getString(ParameterConstants.EXTERNAL_ID)
52                  : nodeIdentity.getNodeId();
53          Util.write(writer, CsvConstants.NODEID, Util.DELIMITER, nodeId);
54          writer.newLine();
55      }
56  
57      public void begin(OutgoingBatch batch, BufferedWriter writer) throws IOException {
58          Util.write(writer, CsvConstants.BATCH, Util.DELIMITER, Long.toString(batch.getBatchId()));
59          writer.newLine();
60          Util.write(writer, CsvConstants.BINARY, Util.DELIMITER, dbDialect.getBinaryEncoding()
61                  .name());
62          writer.newLine();
63      }
64  
65      public void commit(OutgoingBatch batch, BufferedWriter writer) throws IOException {
66          Util.write(writer, CsvConstants.COMMIT, Util.DELIMITER, Long.toString(batch.getBatchId()));
67          writer.newLine();
68      }
69  
70      public void write(BufferedWriter writer, Data data, DataExtractorContext context) throws IOException {
71          preprocessTable(data, writer, context);
72          dictionary.get(data.getEventType().getCode()).execute(writer, data, context);
73      }
74  
75      /***
76       * Writes the table metadata out to a stream only if it hasn't already been
77       * written out before
78       * 
79       * @param tableName
80       * @param out
81       */
82      public void preprocessTable(Data data, BufferedWriter out, DataExtractorContext context) throws IOException {
83  
84          if (data.getAudit() == null) {
85              throw new RuntimeException("Missing trigger_hist for table " + data.getTableName()
86                      + ": try running syncTriggers() or restarting SymmetricDS");
87          }
88          String auditKey = Integer.toString(data.getAudit().getTriggerHistoryId()).intern();
89          if (!context.getAuditRecordsWritten().contains(auditKey)) {
90              Util.write(out, CsvConstants.TABLE, ", ", data.getTableName());
91              out.newLine();
92              Util.write(out, CsvConstants.KEYS, ", ", data.getAudit().getPkColumnNames());
93              out.newLine();
94              Util.write(out, CsvConstants.COLUMNS, ", ", data.getAudit().getColumnNames());
95              out.newLine();
96              context.getAuditRecordsWritten().add(auditKey);
97          } else if (!context.isLastTable(data.getTableName())) {
98              Util.write(out, CsvConstants.TABLE, ", ", data.getTableName());
99              out.newLine();
100         }
101 
102         context.setLastTableName(data.getTableName());
103     }
104 
105     public void setDictionary(Map<String, IStreamDataCommand> dictionary) {
106         this.dictionary = dictionary;
107     }
108 
109     public void setDbDialect(IDbDialect dbDialect) {
110         this.dbDialect = dbDialect;
111     }
112 
113     public void setParameterService(IParameterService parameterService) {
114         this.parameterService = parameterService;
115     }
116 
117     public void setNodeService(INodeService nodeService) {
118         this.nodeService = nodeService;
119     }
120 
121 }