View Issue Details

IDProjectCategoryView StatusLast Update
0003015SymmetricDSImprovementpublic2019-04-24 10:46
Reporterpavan kumarAssigned To 
Prioritylow 
Status closedResolutionno change required 
Product Version3.8.17 
Target VersionFixed in Version 
Summary0003015: SymmetricDS with KafkaConsumer.
Description: We are trying to set up database replication using SymmetricDS with Kafka.
We followed the sample below to read data from Oracle and push it to Kafka.
Now we want code that reads from Kafka and inserts the records into Postgres.
Although we have written a custom KafkaConsumer to do this, we would like to implement it using the SymmetricDS API in the format below.
It would be helpful if you could provide the code for a KafkaConsumer with SymmetricDS.


insert into SYM_EXTENSION (EXTENSION_ID, EXTENSION_TYPE, INTERFACE_NAME, NODE_GROUP_ID, ENABLED, EXTENSION_ORDER, EXTENSION_TEXT, CREATE_TIME, LAST_UPDATE_BY, LAST_UPDATE_TIME) values ('KafkaDataWriter','java','org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter','client',1,1,'
import java.io.File;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.jumpmind.db.model.Table;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.DataContext;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaWriterFilter implements IDatabaseWriterFilter {
    protected final String KAKFA_TEXT_CACHE = "KAKFA_TEXT_CACHE" + this.hashCode();

    private final Logger log = LoggerFactory.getLogger(getClass());

    public boolean beforeWrite(DataContext context, Table table, CsvData data) {
        if (table.getName().toUpperCase().startsWith("SYM_")) {
        return true;
        }
        else {
            log.info("Processing table " + table + " for Kafka");

            String[] rowData = data.getParsedData(CsvData.ROW_DATA);
            if (data.getDataEventType() == DataEventType.DELETE) {
                rowData = data.getParsedData(CsvData.OLD_DATA);
            }

            StringBuffer kafkaText = new StringBuffer();
            if (context.get(KAKFA_TEXT_CACHE) != null) {
                kafkaText = (StringBuffer) context.get(KAKFA_TEXT_CACHE);
            }

            boolean useJson = false;

            if (useJson) {
                kafkaText.append("{\"")
                    .append(table.getName())
                    .append("\": {")
                    .append("\"eventType\": \"" + data.getDataEventType() + "\",")
                    .append("\"data\": { ");
                for (int i = 0; i < table.getColumnNames().length; i++) {
                    kafkaText.append("\"" + table.getColumnNames()[i] + "\": \"" + rowData[i]);
                    if (i + 1 < table.getColumnNames().length) {
                        kafkaText.append("\",");
                    }
                }
                kafkaText.append(" } } }");
            }
            else {
                kafkaText.append("\nTABLE")
                    .append(",")
                    .append(table.getName())
                    .append(",")
                    .append("EVENT")
                    .append(",")
                    .append(data.getDataEventType())
                    .append(",");

                for (int i = 0; i < table.getColumnNames().length; i++) {
                    kafkaText.append(table.getColumnNames()[i])
                        .append(",")
                        .append(rowData[i]);
                    if (i + 1 < table.getColumnNames().length) {
                        kafkaText.append(",");
                    }
                }
            }
            context.put(KAKFA_TEXT_CACHE, kafkaText);
        }
        return false;
    }

    public void afterWrite(DataContext context, Table table, CsvData data) {
    }

    public boolean handlesMissingTable(DataContext context, Table table) {
        return true;
    }

    public void earlyCommit(DataContext context) {
    }

    public void batchComplete(DataContext context) {
        if (!context.getBatch().getChannelId().equals("heartbeat") && !context.getBatch().getChannelId().equals("config")) {
            String batchFileName = "batch-" + context.getBatch().getSourceNodeId() + "-" + context.getBatch().getBatchId();
            log.info("Processing batch " + batchFileName + " for Kafka");
            try {
                File batchesDir = new File("batches");
                if (!batchesDir.exists()) {
                    batchesDir.mkdir();
                }
                File batchFile = new File(batchesDir.getAbsoluteFile() + "/" + batchFileName);

                if (context.get(KAKFA_TEXT_CACHE) != null) {
                    String kafkaText = ((StringBuffer) context.get(KAKFA_TEXT_CACHE)).toString();
                    FileUtils.writeStringToFile(batchFile, KAKFA_TEXT_CACHE);
                    sendKafkaMessage(kafkaText);
                } else {
                    log.info("No text found to write to kafka queue");
                }
            }
            catch (Exception e) {
                log.warn("Unable to write batch to Kafka " + batchFileName, e);
                e.printStackTrace();
            }
        }
    }

    public void batchCommitted(DataContext context) {
    }

    public void batchRolledback(DataContext context) {
    }

    public void sendKafkaMessage(String kafkaText) {
        Map<String,Object> configs = new HashMap<String, Object>();

        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        configs.put(ProducerConfig.CLIENT_ID_CONFIG, "symmetricds-producer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);

        producer.send(new ProducerRecord<String, String>("test", kafkaText));
        log.debug("Data to be sent to Kafka-" + kafkaText);

        producer.close();
    }
}

',{ts '2017-01-09 10:58:17.981'},'admin',{ts '2017-01-09 13:04:37.490'});
TagsNo tags attached.

Relationships

related to 0003609 closedjosh-a-hicks Kafka support as a load only node 

Activities

elong

2019-04-24 10:46

developer   ~0001429

Use the Kafka load-only node in 3.9 or 3.10.

Issue History

Date Modified Username Field Change
2017-03-14 11:41 pavan kumar New Issue
2017-03-15 14:48 chenson Priority @60@ => low
2017-03-15 14:49 chenson Assigned To => chenson
2017-03-15 14:49 chenson Status new => acknowledged
2019-04-24 10:46 elong Relationship added related to 0003609
2019-04-24 10:46 elong Assigned To chenson =>
2019-04-24 10:46 elong Status acknowledged => closed
2019-04-24 10:46 elong Resolution open => no change required
2019-04-24 10:46 elong Note Added: 0001429