Issue
My use case is to join a stream in Flink and save the output to S3. I am reading Kafka data using Flink. Messages from Kafka contain two types of data, so I need to join the two types and save the result to S3.
Table tr = tableEnv.sqlQuery("select " +
" a.x," +
" a.y," +
" b.z", +
" b.z1" +
" from " +
" (select * from table1" +
" where type='machine1') a" +
" full outer join " +
" (select * from table2" +
" where type='machine12') b" +
" on a.id=b.id ");
In Flink the output is continuous and the join state is kept in memory. My goal is to run the program in batch mode: every minute, whatever events have arrived should go through the join and be saved to S3, regardless of whether the join finds a match. This is easy to do with Spark streaming, but I'm not sure how to do it with Flink.
Solution
If you want the Flink Kafka SQL connector to run in batch mode, you need to specify the offsets from which Flink should consume events. As documented at https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#connector-options, you can set the scan.startup.specific-offsets option to the right values. The only thing you need to determine is the offset at which you started consuming and the last offset you have read.
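As a minimal sketch of how that option is wired up, the Kafka-backed table can be declared with a DDL statement whose scan.startup.mode is set to specific-offsets. The table name, schema, topic, broker address and offset values below are placeholders chosen for illustration, not part of the original question or answer:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaSpecificOffsetsExample {
    public static void main(String[] args) {
        // Table environment for the SQL job
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Kafka-backed table that starts reading from explicitly chosen offsets.
        // Topic, schema and the per-partition offsets are hypothetical placeholders.
        tableEnv.executeSql(
            "CREATE TABLE table1 (" +
            "  id STRING," +
            "  type STRING," +
            "  x STRING," +
            "  y STRING" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'machine-events'," +
            "  'properties.bootstrap.servers' = 'localhost:9092'," +
            "  'properties.group.id' = 'machine-join'," +
            "  'format' = 'json'," +
            "  'scan.startup.mode' = 'specific-offsets'," +
            "  'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'" +
            ")");
    }
}

The offset string lists, per partition, where consumption should begin. As the answer notes, you still have to track which offsets you have already processed so that the next run can start from the right position.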
This question was asked on Stack Overflow by lucy and answered by Martijn Visser. It is licensed under the terms of CC BY-SA 2.5, CC BY-SA 3.0, and CC BY-SA 4.0.