Users of Apache Flink are familiar with creating a savepoint and restarting a job from a savepoint.
The issue with savepoints is how to move one to a different location and still be able to start a Flink job from the new location. The problem lies in the _metadata file of the savepoint, which contains absolute URIs (see the Flink documentation on moving a savepoint).
In this article, we go step by step through moving a Flink savepoint from one S3 bucket to another and safely (without corruption) altering the _metadata file in the destination, so that the Flink job starts smoothly from the new savepoint location. The setup is tested with S3 and the filesystem state backend.
Goal
Suppose we have a Flink cluster up and running, and its flink-conf.yaml
contains these options configuring S3 for savepoints and checkpoints:
s3.access-key: ouraccesskey
s3.secret-key: thisisverysecret
s3.endpoint: https://ourcloudprovider.org
s3.path-style-access: true
state.savepoints.dir: s3://old-bucket/old-savepoints
state.checkpoints.dir: s3://old-bucket/old-checkpoints
state.backend: filesystem
We want to change the bucket name and also the location within the bucket to:
state.savepoints.dir: s3://new-bucket/new-savepoints
state.checkpoints.dir: s3://new-bucket/new-checkpoints
We will do this in these steps:
- Create a savepoint in the current location
- Move the savepoint to the new location
- Download the _metadata file to our local machine
- Alter the _metadata file so it contains absolute URIs pointing to the new location
- Upload _metadata to the new location
- Start the job from the savepoint in the new location
Requirements
We will need:
- An S3 CLI. Various options are available: the AWS S3 CLI, or the MinIO client, which has the advantage that you can configure multiple S3 API providers in its config file, handy if you are changing providers, too.
- A Java 8 JRE. Various options: Zulu, OpenJDK, or Oracle.
Create a savepoint and move it to the new location
At your Flink cluster location, issue a standard savepoint command:
./bin/flink savepoint <job_id>
Savepoint completed. Path: s3://old-bucket/old-savepoints/savepoint-b9888f-a23df1784fa3
Move the savepoint to the new location:
aws s3 --endpoint-url https://ourcloudprovider.org mv s3://old-bucket/old-savepoints/savepoint-b9888f-a23df1784fa3 s3://new-bucket/new-savepoints/savepoint-b9888f-a23df1784fa3 --recursive
Download the _metadata file to your workstation:
aws s3 --endpoint-url https://ourcloudprovider.org cp s3://new-bucket/new-savepoints/savepoint-b9888f-a23df1784fa3/_metadata ./old_metadata
We call the file old_metadata
on purpose, so that we can later generate the new _metadata file from it.
Alter the metadata file to accommodate the new location
One option is to open the _metadata file in a text editor of your choice and find and replace all occurrences of the old URI with the new URI.
We will go with the much safer option of programmatically altering the contents of this file.
We have prepared a small Java app to do this job. You can download state-metadata-1.0.0.jar. The source code of this project is available in the flink-state-metadata GitHub repository.
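To illustrate the idea, here is a minimal sketch of such a byte-level replacement (illustrative only, not the actual tool code; the class and method names are made up for this article). It insists that the old and new URIs have the same byte length, because the savepoint metadata is a binary format in which a length-changing edit can corrupt the surrounding structure. Note that in our example both URIs are indeed the same length.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

public class MetadataUriReplace {

    // Replace every occurrence of oldUri with newUri in the raw bytes.
    // Only same-length replacements are allowed: changing the file length
    // risks corrupting the binary metadata structure around the strings.
    static byte[] replaceUri(byte[] data, String oldUri, String newUri) {
        byte[] from = oldUri.getBytes(StandardCharsets.UTF_8);
        byte[] to = newUri.getBytes(StandardCharsets.UTF_8);
        if (from.length != to.length) {
            throw new IllegalArgumentException(
                "old and new URI must have the same byte length");
        }
        byte[] out = data.clone();
        for (int i = 0; i + from.length <= out.length; i++) {
            boolean match = true;
            for (int j = 0; j < from.length; j++) {
                if (out[i + j] != from[j]) { match = false; break; }
            }
            if (match) {
                System.arraycopy(to, 0, out, i, to.length);
                i += from.length - 1; // skip past the replaced region
            }
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 4) {
            System.err.println("usage: MetadataUriReplace <input> <output> <oldUri> <newUri>");
            return;
        }
        byte[] in = Files.readAllBytes(Paths.get(args[0]));
        Files.write(Paths.get(args[1]), replaceUri(in, args[2], args[3]));
    }
}
```

The real tool goes further than this sketch (handling the cases a plain byte swap cannot), but the sketch shows why a same-length substitution leaves the rest of the binary file untouched.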
Given the old savepoint location s3://old-bucket/old-savepoints
and the new location s3://new-bucket/new-savepoints
, we execute the program as follows:
java -jar state-metadata-1.0.0.jar --input.file old_metadata --output.file _metadata \
--input.uri s3://old-bucket/old-savepoints/savepoint-b9888f-a23df1784fa3 \
--output.uri s3://new-bucket/new-savepoints/savepoint-b9888f-a23df1784fa3
The file _metadata
now has the URIs updated, which you can verify by inspecting it:
head _metadata
<some binary garbage>
s3://new-bucket/new-savepoints/savepoint-b9888f-a23df1784fa3/44c90045-8c15-41d0-8bfd-26389b951243 ...
<some binary garbage>
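If you prefer a programmatic check over eyeballing the binary dump, a small helper can count raw occurrences of a URI in the file. This is a hypothetical snippet written for this article, not part of the tool; after the rewrite you would expect zero hits for the old URI.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

public class MetadataUriCheck {

    // Count non-overlapping occurrences of a URI in raw metadata bytes.
    static int countOccurrences(byte[] data, String uri) {
        byte[] needle = uri.getBytes(StandardCharsets.UTF_8);
        int count = 0;
        for (int i = 0; i + needle.length <= data.length; i++) {
            boolean match = true;
            for (int j = 0; j < needle.length; j++) {
                if (data[i + j] != needle[j]) { match = false; break; }
            }
            if (match) {
                count++;
                i += needle.length - 1; // jump past this occurrence
            }
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: MetadataUriCheck <file> <uri>");
            return;
        }
        byte[] data = Files.readAllBytes(Paths.get(args[0]));
        System.out.println(args[1] + ": " + countOccurrences(data, args[1]));
    }
}
```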
Upload the _metadata and start from the relocated savepoint
Upload the altered _metadata file:
aws s3 --endpoint-url https://ourcloudprovider.org cp ./_metadata s3://new-bucket/new-savepoints/savepoint-b9888f-a23df1784fa3/_metadata
Remember to update flink-conf.yaml:
state.savepoints.dir: s3://new-bucket/new-savepoints
state.checkpoints.dir: s3://new-bucket/new-checkpoints
Now we can safely start from the relocated savepoint on our Flink cluster:
./bin/flink ... --fromSavepoint s3://new-bucket/new-savepoints/savepoint-b9888f-a23df1784fa3
It should load the savepoint and start checkpointing to the new bucket.
Conclusion
All the source code is available in the flink-state-metadata GitHub repository.
Let me know if you have any questions.