[SOLVED] How to do a join in a Flink job in batch mode

Issue


The use case I have is to join streams in Flink and save the output to S3. I am reading Kafka data using Flink. The Kafka messages contain two types of data, so I need to join both types and write the result to S3.

 // Full outer join of the two message types on their shared id.
 Table tr = tableEnv.sqlQuery("select " +
            " a.x," +
            " a.y," +
            " b.z," +
            " b.z1" +
            " from " +
            " (select * from table1" +
            " where type='machine1') a" +
            " full outer join " +
            " (select * from table2" +
            " where type='machine12') b" +
            " on a.id=b.id ");

In streaming mode, Flink's output is continuous and the join state is kept in memory. My goal is to run the program in batch mode: every minute, whatever events have arrived are put through the join and the result is saved to S3, whether or not each row finds a match. This is easy to do with Spark Streaming, but I'm not sure how to do it with Flink.



Solution

If you want the Flink Kafka SQL connector to run in batch mode, you'll need to specify the offsets between which Flink should consume events. As documented at https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#connector-options, you can provide the scan.startup.specific-offsets option with the right values. The only thing you'll need to determine is the offset from which you started to consume and the last offset that you've read.
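
As a concrete illustration (the topic, broker, field names, offsets, and S3 path below are placeholders, not values given in the question or answer), the source table can be declared as a bounded Kafka source. The same docs page also lists the scan.bounded.* options, which tell the connector at which offset to stop reading so the batch job can finish:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    // Batch mode: the job reads a bounded slice of the topic and then finishes.
    TableEnvironment tableEnv =
            TableEnvironment.create(EnvironmentSettings.inBatchMode());

    // Hypothetical bounded Kafka source: the startup and bounded offsets mark
    // the slice of the topic that this run should process.
    tableEnv.executeSql(
            "CREATE TABLE table1 (" +
            "  id STRING, x STRING, y STRING, type STRING" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'events'," +
            "  'properties.bootstrap.servers' = 'broker:9092'," +
            "  'format' = 'json'," +
            "  'scan.startup.mode' = 'specific-offsets'," +
            "  'scan.startup.specific-offsets' = 'partition:0,offset:100'," +
            "  'scan.bounded.mode' = 'specific-offsets'," +
            "  'scan.bounded.specific-offsets' = 'partition:0,offset:200'" +
            ")");

    // Hypothetical S3 sink via the filesystem connector (requires the S3
    // filesystem plugin on the classpath). The full outer join from the
    // question can then be run as INSERT INTO s3_sink SELECT ...
    tableEnv.executeSql(
            "CREATE TABLE s3_sink (" +
            "  x STRING, y STRING, z STRING, z1 STRING" +
            ") WITH (" +
            "  'connector' = 'filesystem'," +
            "  'path' = 's3://my-bucket/output/'," +
            "  'format' = 'json'" +
            ")");

Each scheduled run would compute its own start and end offsets (for example, from the offsets committed by the previous run), so every minute a new bounded job joins exactly the events from that window.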


This question was asked on Stack Overflow by lucy and answered by Martijn Visser. It is licensed under the terms of CC BY-SA 2.5, CC BY-SA 3.0, or CC BY-SA 4.0.
