View Javadoc

1   /*
2    * SymmetricDS is an open source database synchronization solution.
3    *   
4    * Copyright (C) Chris Henson <chenson42@users.sourceforge.net>
5    * Copyright (C) Andrew Wilcox <andrewbwilcox@users.sourceforge.net>
6    *
7    * This library is free software; you can redistribute it and/or
8    * modify it under the terms of the GNU Lesser General Public
9    * License as published by the Free Software Foundation; either
10   * version 3 of the License, or (at your option) any later version.
11   *
12   * This library is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this library; if not, see
19   * <http://www.gnu.org/licenses/>.
20   */
21  
22  package org.jumpmind.symmetric.extract.csv;
23  
24  import java.io.BufferedWriter;
25  import java.io.IOException;
26  import java.util.Map;
27  
28  import org.jumpmind.symmetric.common.ParameterConstants;
29  import org.jumpmind.symmetric.common.csv.CsvConstants;
30  import org.jumpmind.symmetric.db.IDbDialect;
31  import org.jumpmind.symmetric.extract.DataExtractorContext;
32  import org.jumpmind.symmetric.extract.IDataExtractor;
33  import org.jumpmind.symmetric.model.Data;
34  import org.jumpmind.symmetric.model.DataEventType;
35  import org.jumpmind.symmetric.model.Node;
36  import org.jumpmind.symmetric.model.OutgoingBatch;
37  import org.jumpmind.symmetric.service.INodeService;
38  import org.jumpmind.symmetric.service.IParameterService;
39  
40  public class CsvExtractor implements IDataExtractor {
41  
42      private Map<String, IStreamDataCommand> dictionary = null;
43  
44      private IParameterService parameterService;
45  
46      private IDbDialect dbDialect;
47  
48      private INodeService nodeService;
49  
50      public void init(BufferedWriter writer, DataExtractorContext context) throws IOException {
51          Node nodeIdentity = nodeService.findIdentity();
52          String nodeId = (nodeIdentity == null) ? parameterService.getString(ParameterConstants.EXTERNAL_ID)
53                  : nodeIdentity.getNodeId();
54          Util.write(writer, CsvConstants.NODEID, Util.DELIMITER, nodeId);
55          writer.newLine();
56      }
57  
58      public void begin(OutgoingBatch batch, BufferedWriter writer) throws IOException {
59          Util.write(writer, CsvConstants.BATCH, Util.DELIMITER, Long.toString(batch.getBatchId()));
60          writer.newLine();
61          Util.write(writer, CsvConstants.BINARY, Util.DELIMITER, dbDialect.getBinaryEncoding()
62                  .name());
63          writer.newLine();
64      }
65  
66      public void commit(OutgoingBatch batch, BufferedWriter writer) throws IOException {
67          Util.write(writer, CsvConstants.COMMIT, Util.DELIMITER, Long.toString(batch.getBatchId()));
68          writer.newLine();
69      }
70  
71      public void write(BufferedWriter writer, Data data, DataExtractorContext context) throws IOException {
72          preprocessTable(data, writer, context);
73          dictionary.get(data.getEventType().getCode()).execute(writer, data, context);
74      }
75  
76      /***
77       * Writes the table metadata out to a stream only if it hasn't already been
78       * written out before
79       * 
80       * @param tableName
81       * @param out
82       */
83      public void preprocessTable(Data data, BufferedWriter out, DataExtractorContext context) throws IOException {
84          if (data.getAudit() != null) {
85              String auditKey = Integer.toString(
86                      data.getAudit().getTriggerHistoryId()).intern();
87              if (!context.getAuditRecordsWritten().contains(auditKey)) {
88                  Util.write(out, CsvConstants.TABLE, ", ", data.getTableName());
89                  out.newLine();
90                  Util.write(out, CsvConstants.KEYS, ", ", data.getAudit()
91                          .getPkColumnNames());
92                  out.newLine();
93                  Util.write(out, CsvConstants.COLUMNS, ", ", data.getAudit()
94                          .getColumnNames());
95                  out.newLine();
96                  context.getAuditRecordsWritten().add(auditKey);
97              } else if (!context.isLastTable(data.getTableName())) {
98                  Util.write(out, CsvConstants.TABLE, ", ", data.getTableName());
99                  out.newLine();
100             }
101             if (data.getEventType() == DataEventType.UPDATE
102                     && data.getOldData() != null) {
103                 Util.write(out, CsvConstants.OLD, ", ", data.getOldData());
104                 out.newLine();
105             }
106 
107         }
108         context.setLastTableName(data.getTableName());
109     }
110 
111     public void setDictionary(Map<String, IStreamDataCommand> dictionary) {
112         this.dictionary = dictionary;
113     }
114 
115     public void setDbDialect(IDbDialect dbDialect) {
116         this.dbDialect = dbDialect;
117     }
118 
119     public void setParameterService(IParameterService parameterService) {
120         this.parameterService = parameterService;
121     }
122 
123     public void setNodeService(INodeService nodeService) {
124         this.nodeService = nodeService;
125     }
126 
127 }