Issue
This Content is from Stack Overflow. Question asked by Tartartartaglia
Hi, I am new to Flink and trying to figure out some best practices for the following scenario:
I am playing around with a Flink job that reads unique data from multiple CSV files. Each row in the CSV is composed of three columns: userId, appId, name. I have to do some processing on each of these records (capitalize the name) and post the record to a Kafka Topic.
The goal is to filter out any duplicate records that exist so we do not have duplicate messages in the output Kafka Topic.
I am doing a keyBy(userId, appId) on the stream and keeping a boolean value state “Processed” to filter out duplicate records.
The issue is that when I kill the Task Manager in the middle of processing a file, to simulate a failure, the job starts processing the file from the beginning once it restarts.
This is a problem because the “Processed” State in the Flink job is also wiped clean after the Task Manager fails!
This leads to duplicate messages in the output Kafka topic.
How can I prevent this?
I need to restore the “Processed” Flink state to what it was prior to the Task Manager failing. What is the best practice to do this?
Things to consider:
- Checkpointing is already enabled.
- The Task Manager runs in a Kubernetes pod (can be scaled very fast) and parallelism is always > 1.
- Files can have millions of rows and need to be processed in parallel.
Thank you for the help!
Solution
I recommend reading up on Flink’s fault tolerance mechanisms, checkpointing, and savepoints. https://nightlies.apache.org/flink/flink-docs-master/docs/learn-flink/fault_tolerance/ and https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/ are good places to start.
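To make the idea behind those pages concrete, here is a toy model in plain Java. It is not Flink code and uses no Flink API. The class `CheckpointDedupSketch`, its `Snapshot` type, and all method names are hypothetical, invented for illustration. It assumes two things that may not match your job: that a checkpoint captures the source read position together with the per-key "Processed" state, and that output only becomes visible when a checkpoint completes (loosely what a transactional sink does). Under those assumptions, restoring the snapshot after a failure rewinds both the file position and the dedup state, so replayed rows do not produce duplicate output.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

// Toy model only: all names here are hypothetical, none of this is Flink API.
class CheckpointDedupSketch {

    /** Snapshot of the read position plus a copy of the dedup state. */
    static final class Snapshot {
        final int offset;
        final Set<String> seenKeys;

        Snapshot(int offset, Set<String> seenKeys) {
            this.offset = offset;
            this.seenKeys = seenKeys;
        }
    }

    private final List<String[]> rows;                       // each row: userId, appId, name
    private int offset = 0;                                  // source read position
    private Set<String> seenKeys = new HashSet<>();          // the "Processed" state
    private final List<String> pending = new ArrayList<>();  // emitted since last checkpoint
    final List<String> committed = new ArrayList<>();        // output visible downstream

    CheckpointDedupSketch(List<String[]> rows) {
        this.rows = rows;
    }

    boolean hasNext() {
        return offset < rows.size();
    }

    /** Reads one row and emits it only if its (userId, appId) key is new. */
    void processNext() {
        String[] row = rows.get(offset++);
        String key = row[0] + "|" + row[1];
        if (seenKeys.add(key)) {
            // Assumption: "capitalize" is modeled as upper-casing the whole name.
            pending.add(key + "|" + row[2].toUpperCase(Locale.ROOT));
        }
    }

    /** Commits pending output and captures offset and state together. */
    Snapshot checkpoint() {
        committed.addAll(pending);
        pending.clear();
        return new Snapshot(offset, new HashSet<>(seenKeys));
    }

    /** Simulates recovery: uncommitted output is dropped, offset and state rewind. */
    void restore(Snapshot snapshot) {
        offset = snapshot.offset;
        seenKeys = new HashSet<>(snapshot.seenKeys);
        pending.clear();
    }

    public static void main(String[] args) {
        List<String[]> rows = List.of(
                new String[] {"u1", "a1", "alice"},
                new String[] {"u2", "a1", "bob"},
                new String[] {"u1", "a1", "alice"},   // duplicate key
                new String[] {"u3", "a2", "carol"});

        CheckpointDedupSketch job = new CheckpointDedupSketch(rows);
        job.processNext();
        job.processNext();
        Snapshot snapshot = job.checkpoint();   // checkpoint after two rows

        job.processNext();
        job.processNext();
        job.restore(snapshot);                  // failure before the next checkpoint

        while (job.hasNext()) {
            job.processNext();
        }
        job.checkpoint();

        System.out.println(job.committed);      // prints [u1|a1|ALICE, u2|a1|BOB, u3|a2|CAROL]
    }
}
```

If the state were reset to empty while the source restarted from the first row, and output had already been published, the first two records would be emitted a second time. That is the symptom described in the question, and it suggests the job is not actually restoring from its latest checkpoint after the failure.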
I think you could also achieve your deduplication more easily by using the Table API/SQL. See https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/deduplication/
This question was asked on Stack Overflow by Tartartartaglia and answered by Martijn Visser. It is licensed under the terms of CC BY-SA 2.5 - CC BY-SA 3.0 - CC BY-SA 4.0.