
Data Loss During Load Testing with METADATA Enabled and Autoscale Flink #12738

Open · maheshguptags opened this issue Jan 30, 2025 · 25 comments

Labels: data-loss (loss of data only, use data-consistency label for inconsistent view), flink (Issues related to flink), priority:critical (production down; pipelines stalled; Need help asap.)

Comments

@maheshguptags commented Jan 30, 2025

Issue

While performing load testing with METADATA (MDT) enabled, I encountered a data-loss issue. The issue occurs when deploying the job with autoscaling enabled. Specifically, if checkpointing fails for reasons such as a TaskManager being added by the autoscaler or heap memory issues, all in-flight data is discarded, and no further data is processed after that failure.

Checkpointing failures lead to data loss.
After a failed checkpoint due to lack of resources, a new checkpoint is triggered but no data is processed.
I tried to replicate this behavior on Hudi 1.0, and the same issue persists.

Hudi Properties

#Updated at 2025-01-20T07:41:05.654545Z
#Mon Jan 20 07:41:05 UTC 2025
hoodie.table.keygenerator.type=COMPLEX_AVRO
hoodie.table.type=COPY_ON_WRITE
hoodie.table.precombine.field=updated_date
hoodie.table.create.schema={}
hoodie.timeline.layout.version=2
hoodie.timeline.history.path=history
hoodie.table.checksum=1292384652
hoodie.datasource.write.drop.partition.columns=false
hoodie.record.merge.strategy.id=00000000-0000-0000-0000-000000000000
hoodie.datasource.write.hive_style_partitioning=false
hoodie.table.metadata.partitions.inflight=
hoodie.database.name=default_database
hoodie.datasource.write.partitionpath.urlencode=false
hoodie.record.merge.mode=CUSTOM
hoodie.table.version=8
hoodie.compaction.payload.class=com.gupshup.cdp.PartialUpdate
hoodie.table.initial.version=8
hoodie.table.metadata.partitions=files
hoodie.table.partition.fields=xyz
hoodie.table.cdc.enabled=false
hoodie.archivelog.folder=history
hoodie.table.name=customer_temp
hoodie.table.recordkey.fields=xyz.abc 
hoodie.timeline.path=timeline

Steps to reproduce the behavior:

  1. Create a table with Flink Hudi with MDT enabled (a minimal sketch follows this list)
  2. Ingest some load
  3. Delete one of the TMs, or ingest a heavy enough load to cause a memory issue
  4. Once the checkpoint fails, all data since the last successful checkpoint is discarded
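For reference, a minimal Flink SQL sketch of this kind of pipeline (a hedged illustration only: the column names, load size, and path below are placeholders, not the exact production job):

-- Checkpoint frequently so a TM kill can land mid-checkpoint
SET 'execution.checkpointing.interval' = '1min';

-- Synthetic source standing in for the real ingestion feed
CREATE TABLE src (
  xyz STRING,
  abc STRING,
  updated_date TIMESTAMP(3)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '100000',
  'number-of-rows' = '10000000'  -- roughly the 10M-record test load
);

-- COPY_ON_WRITE Hudi table with the metadata table (MDT) enabled
CREATE TABLE customer_temp (
  xyz STRING,
  abc STRING,
  updated_date TIMESTAMP(3)
) PARTITIONED BY (xyz) WITH (
  'connector' = 'hudi',
  'path' = 's3a://<bucket>/customer_temp/',  -- placeholder path
  'table.type' = 'COPY_ON_WRITE',
  'metadata.enabled' = 'true',
  'hoodie.datasource.write.recordkey.field' = 'xyz,abc',
  'precombine.field' = 'updated_date'
);

INSERT INTO customer_temp SELECT * FROM src;

Killing a TaskManager while a checkpoint of this INSERT job is in flight is the trigger described in step 3.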

Expected behavior

After checkpoint failure due to resource issues, the system should continue processing data once resources are available, without losing previously processed data.

Environment Description

  • Hudi version : 1.0.0

  • Flink version: 1.18

  • Hive version : NO

  • Hadoop version :

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : Yes

  • Table Type: COPY_ON_WRITE

Additional context

Can the Hudi team assist with troubleshooting this issue? Is this expected behavior with METADATA enabled, or is there a bug in the Flink writer under resource-constrained scenarios?

[screenshot attached]

cc: @codope @bhasudha @danny0405 @xushiyan @ad1happy2go @yihua

@danny0405 (Contributor)

Does the job work well without auto-scale? What is the state of the pipeline after the checkpoint fails? Does the writer still handle inputs?

@maheshguptags (Author)

Yes, the job works both with and without auto-scale if we don't enable MDT.

@danny0405 (Contributor)

Are there any special logs in the JM logging?

@maheshguptags (Author)

I haven't seen any special log for this. Usually the checkpoint fails either when autoscaling spins up a new TM or when I kill a TM manually, and it discards the data after that.

Thanks
Mahesh

@danny0405 (Contributor)

> it discards the data post that.

Are you saying the pipeline just hangs there and does nothing?

@maheshguptags (Author) commented Feb 5, 2025

No, it simply moves on to the next checkpoint and processes nothing. If you look at the 4th checkpoint, it is processing millions of records (in progress, as it has not completed) and taking approximately 4 minutes. However, once it fails, the job moves to the next checkpoint, which processes nothing and completes in milliseconds (the same goes for checkpoints 2 and 3).

Thanks
Mahesh

[screenshot: Flink checkpoint UI]

@ad1happy2go ad1happy2go added priority:critical production down; pipelines stalled; Need help asap. flink Issues related to flink data-loss loss of data only, use data-consistency label for inconsistent view labels Feb 7, 2025
@github-project-automation github-project-automation bot moved this to ⏳ Awaiting Triage in Hudi Issue Support Feb 7, 2025
@maheshguptags (Author)

@danny0405 any findings?

@danny0405 (Contributor)

No, without more detailed logs I cannot help more, and I have no knowledge of Flink auto-scaling.

@cshuo (Contributor) commented Feb 12, 2025

> 1. once it fails it will discard all the data after that checkpointing

@maheshguptags do you mean that records ingested after the checkpoint failure are lost? If that happens, there are usually two kinds of problems:

  1. The job is stuck somewhere before the hudi write operator; you can check the hudi write operator's metrics in the dashboard, e.g. "records received", to make sure new records keep arriving after the ckp failure.
  2. Records are not committed correctly; you should check the jobmanager logs for exception messages.

btw, could you also paste the exception trace from the flink dashboard?

@maheshguptags (Author)

Hi,

> do you mean after the checkpoint failed, records ingested after that will loss?

@cshuo Let's assume 10 million records are ingested into the job. While these records are being processed, if the job manager triggers the creation of a new Task Manager (TM) due to auto-scaling, or if a TM is manually removed (to test the scenario without auto-scaling), a checkpoint failure can occur, causing all the previously ingested data (the 10 million records) to be discarded.

If new data (e.g., 1 million records) is ingested after the checkpoint failure, the new data will be successfully processed and ingested to Hudi, provided the next checkpoint succeeds.

To summarize:

Ingest 10M records → checkpoint failure (due to TM change) → discard all data
Ingest 1M new records → checkpoint success → successfully ingested into Hudi(only 1M).

Thanks
Mahesh

@danny0405 (Contributor)

> Ingest 10M records → checkpoint failure (due to TM change) → discard all data

So these records do not even have a complete checkpoint lifecycle, and no commits occur.

@maheshguptags (Author)

I used an example to illustrate the issue.
The process successfully ingested and checkpointed part of the data (5.5M out of 10M). However, whenever the job was interrupted (either manually or due to autoscaling), the remaining 4.5M records were discarded.

Example:
Ingest 10M records:

chkpnt1 → succeeded → ingested 2.5M (out of 10M)
chkpnt2 → succeeded → ingested 3M (remaining of 7.5M)
chkpnt3 → failed (either manually or due to autoscaling) → No data written to Hudi table, and the remaining 4.5M records will be discarded after this point

Attempts the next checkpoint

chkpnt4 → succeeded → no data is written, due to the failure at chkpnt3, and the checkpoint completes within milliseconds.

Thanks
Mahesh

@maheshguptags (Author)

@danny0405 @cshuo, any progress on this?
Please let me know if you need further information.

> No, without more detailed logs I cannot help more, and I have no knowledge of Flink auto-scaling.

You can try it without auto-scaling by forcefully deleting the TM while checkpointing is in progress (causing the checkpoint to fail) with MDT enabled.

We can also connect over a call.

Thanks
Mahesh

@cshuo (Contributor) commented Feb 24, 2025

@maheshguptags The checkpoint failure can happen in different phases, e.g., during the state snapshot in the TM or the checkpoint-complete notification in the JM. Can this problem be reproduced reliably? If yes, could you provide a jobmanager.log that contains the exception log for the ckp failure?

@maheshguptags (Author)

Sure @cshuo.
I manually deleted the TM with MDT enabled (the 4th checkpoint in the UI; the 5th completes within ms). Although I am not seeing any unusual message (regarding discarding the data when the chkpnt fails), I am attaching both the JM stack trace and a screenshot of the checkpointing UI.

2025-02-24 12:36:23,155 INFO  org.apache.hudi.sink.StreamWriteOperatorCoordinator          [] - Executor executes action [handle write metadata event for instant 20250224123109474] success!
2025-02-24 12:36:23,155 INFO  org.apache.hudi.sink.StreamWriteOperatorCoordinator          [] - Executor executes action [handle write metadata event for instant 20250224123109474] success!
2025-02-24 12:36:41,196 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Closing TaskExecutor connection cdp-customer-profile-flink-hudi-temp-taskmanager-1-8 because: The TaskExecutor is shutting down.
2025-02-24 12:36:41,196 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager [] - Unregistering task executor 0ab3cda26ee84c4d17a07170cc3d6512 from the slot manager.
2025-02-24 12:36:41,196 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotStatusSyncer [] - Freeing slot 1b034d142f1d190f7a90a2b6d1ed559e.
2025-02-24 12:36:41,196 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotStatusSyncer [] - Freeing slot 2750e82c8c6933e02f6430036ad374fb.
2025-02-24 12:36:41,196 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotStatusSyncer [] - Freeing slot 1b6d0825dc15af289115be621bfc4d24.
2025-02-24 12:36:41,196 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotStatusSyncer [] - Freeing slot daac4949d81d51b41651522e10ad4382.
2025-02-24 12:36:41,196 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotStatusSyncer [] - Freeing slot 65daa7e56ca17a98127fee67481debcf.
2025-02-24 12:36:41,196 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Stopping worker cdp-customer-profile-flink-hudi-temp-taskmanager-1-8.
2025-02-24 12:36:41,196 INFO  org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Stopping TaskManager pod cdp-customer-profile-flink-hudi-temp-taskmanager-1-8.
2025-02-24 12:36:41,261 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager [] - Matching resource requirements against available resources.
Missing resources:
	 Job 5c60fca92dab0e8ab0795dd9e706a4c2
		ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=5}
Current resources:
	TaskManager cdp-customer-profile-flink-hudi-temp-taskmanager-1-6
		Available: ResourceProfile{cpuCores=0, taskHeapMemory=0 bytes, taskOffHeapMemory=0 bytes, managedMemory=3 bytes, networkMemory=2 bytes}
		Total:     ResourceProfile{cpuCores=2.5, taskHeapMemory=741.000mb (776994800 bytes), taskOffHeapMemory=0 bytes, managedMemory=797.600mb (836344228 bytes), networkMemory=199.400mb (209086057 bytes)}
	TaskManager cdp-customer-profile-flink-hudi-temp-taskmanager-1-7
		Available: ResourceProfile{cpuCores=0, taskHeapMemory=0 bytes, taskOffHeapMemory=0 bytes, managedMemory=3 bytes, networkMemory=2 bytes}
		Total:     ResourceProfile{cpuCores=2.5, taskHeapMemory=741.000mb (776994800 bytes), taskOffHeapMemory=0 bytes, managedMemory=797.600mb (836344228 bytes), networkMemory=199.400mb (209086057 bytes)}
	TaskManager cdp-customer-profile-flink-hudi-temp-taskmanager-1-4
		Available: ResourceProfile{cpuCores=0, taskHeapMemory=0 bytes, taskOffHeapMemory=0 bytes, managedMemory=3 bytes, networkMemory=2 bytes}
		Total:     ResourceProfile{cpuCores=2.5, taskHeapMemory=741.000mb (776994800 bytes), taskOffHeapMemory=0 bytes, managedMemory=797.600mb (836344228 bytes), networkMemory=199.400mb (209086057 bytes)}
	TaskManager cdp-customer-profile-flink-hudi-temp-taskmanager-1-5
		Available: ResourceProfile{cpuCores=0, taskHeapMemory=0 bytes, taskOffHeapMemory=0 bytes, managedMemory=3 bytes, networkMemory=2 bytes}
		Total:     ResourceProfile{cpuCores=2.5, taskHeapMemory=741.000mb (776994800 bytes), taskOffHeapMemory=0 bytes, managedMemory=797.600mb (836344228 bytes), networkMemory=199.400mb (209086057 bytes)}
2025-02-24 12:36:41,331 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - need request 1 new workers, current worker number 4, declared worker number 5
2025-02-24 12:36:41,331 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Requesting new worker with resource spec WorkerResourceSpec {cpuCores=2.5, taskHeapSize=741.000mb (776994800 bytes), taskOffHeapSize=0 bytes, networkMemSize=199.400mb (209086057 bytes), managedMemSize=797.600mb (836344228 bytes), numSlots=5}, current pending count: 1.
2025-02-24 12:36:41,332 INFO  org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - Enabled external resources: []
2025-02-24 12:36:41,332 INFO  org.apache.flink.configuration.Configuration                 [] - Config uses fallback configuration key 'kubernetes.service-account' instead of key 'kubernetes.taskmanager.service-account'
2025-02-24 12:36:41,332 INFO  org.apache.flink.configuration.Configuration                 [] - Config uses fallback configuration key 'kubernetes.service-account' instead of key 'kubernetes.taskmanager.service-account'
2025-02-24 12:36:41,332 INFO  org.apache.flink.kubernetes.utils.KubernetesUtils            [] - The service account configured in pod template will be overwritten to 'qbm-cdp-ingestion-flink-service-account' because of explicitly configured options.
2025-02-24 12:36:41,334 INFO  org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Creating new TaskManager pod with name cdp-customer-profile-flink-hudi-temp-taskmanager-1-9 and resource <2500,2.5>.
2025-02-24 12:36:41,377 INFO  org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Pod cdp-customer-profile-flink-hudi-temp-taskmanager-1-9 is created.
2025-02-24 12:36:41,538 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Disconnect TaskExecutor cdp-customer-profile-flink-hudi-temp-taskmanager-1-8 because: The TaskExecutor is shutting down.
2025-02-24 12:36:41,539 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - bucket_write: default_database.customer_profile (21/25) (3af50fb5a53008686151e6064afe51a7_3f6cfb4bf36a2f36a133726411a32a74_20_0) switched from RUNNING to FAILED on cdp-customer-profile-flink-hudi-temp-taskmanager-1-8 @ ip-10-232-117-102.ap-south-1.compute.internal (dataPort=39121).
org.apache.flink.util.FlinkExpectedException: The TaskExecutor is shutting down.
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:481) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:239) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor$StartedState.lambda$terminate$0(PekkoRpcActor.java:574) ~[flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor$StartedState.terminate(PekkoRpcActor.java:573) ~[flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleControlMessage(PekkoRpcActor.java:196) ~[flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) ~[flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) ~[flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) ~[flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) ~[flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) ~[flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
	at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
	at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
	at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
	at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
2025-02-24 12:36:41,547 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Discarding the results produced by task execution 3af50fb5a53008686151e6064afe51a7_3f6cfb4bf36a2f36a133726411a32a74_20_0.
2025-02-24 12:36:41,547 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Discarding the results produced by task execution 3af50fb5a53008686151e6064afe51a7_3f6cfb4bf36a2f36a133726411a32a74_20_0.
2025-02-24 12:36:41,547 INFO  org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Restarting job.
org.apache.flink.util.FlinkExpectedException: The TaskExecutor is shutting down.
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:481) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:239) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor$StartedState.lambda$terminate$0(PekkoRpcActor.java:574) ~[flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor$StartedState.terminate(PekkoRpcActor.java:573) ~[flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleControlMessage(PekkoRpcActor.java:196) ~[flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) ~[flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) ~[flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) ~[flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) ~[flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) ~[flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
	at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
	at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
	at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
	at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
2025-02-24 12:36:41,548 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job cdp-customer-profile-flink-hudi-temp (5c60fca92dab0e8ab0795dd9e706a4c2) switched from state RUNNING to CANCELLING.
2025-02-24 12:36:41,548 WARN  org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 5 for job 5c60fca92dab0e8ab0795dd9e706a4c2. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1976) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifyJobStatusChange(DefaultExecutionGraph.java:1589) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1155) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1127) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.cancel(DefaultExecutionGraph.java:955) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.scheduler.adaptive.Restarting.<init>(Restarting.java:65) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.scheduler.adaptive.Restarting$Factory.getState(Restarting.java:160) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.scheduler.adaptive.Restarting$Factory.getState(Restarting.java:125) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.transitionToState(AdaptiveScheduler.java:1295) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.goToRestarting(AdaptiveScheduler.java:977) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.scheduler.adaptive.FailureResultUtil.restartOrFail(FailureResultUtil.java:28) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.scheduler.adaptive.Executing.onFailure(Executing.java:93) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.scheduler.adaptive.StateWithExecutionGraph.updateTaskExecutionState(StateWithExecutionGraph.java:387) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.lambda$updateTaskExecutionState$4(AdaptiveScheduler.java:560) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.scheduler.adaptive.State.tryCall(State.java:136) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.updateTaskExecutionState(AdaptiveScheduler.java:557) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1645) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1144) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1084) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:785) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:129) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot.release(SharedSlot.java:144) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:482) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeAndReleaseSlots(DefaultDeclarativeSlotPool.java:474) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:445) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:275) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:231) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:549) ~[flink-dist-1.18.1.jar:1.18.1]
	at jdk.internal.reflect.GeneratedMethodAccessor404.invoke(Unknown Source) ~[?:?]
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
	at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309) ~[flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307) ~[flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222) ~[flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85) ~[flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) ~[flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) [flink-rpc-akkab03b76dc-1f97-48c6-9a76-a2d0a4b7bea8.jar:1.18.1]
	at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
	at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
	at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
	at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
	at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
2025-02-24 12:36:41,550 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: cdp_identify_temp_perf -> Map -> *anonymous_datastream_source$1*[1] -> Calc[2] -> row_data_to_hoodie_record (1/20) (3af50fb5a53008686151e6064afe51a7_cbc357ccb763df2852fee8c4fc7d55f2_0_4) switched from RUNNING to CANCELING.
2025-02-24 12:36:41,550 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: cdp_identify_temp_
[screenshot: Flink checkpoint UI]

Thanks
Mahesh

@cshuo (Contributor) commented Feb 25, 2025

@maheshguptags Based on the log you provided, the issue still cannot be pinpointed. I'm trying to reproduce the problem locally by:

  • Using a Datagen source to produce a certain amount of data and writing it to a hudi table.
  • Using MiniCluster to simulate deleting a taskmanager.

However, I cannot reproduce the problem. The demo code is here; can you try reproducing the problem based on that code? It would be great if you can reproduce the problem with the demo code, so we can dig into it further.

@maheshguptags (Author)

@cshuo I will try to run the demo code locally. However, upon reviewing the code, it looks like MDT hasn't been enabled for the Hudi table during its creation. I'm assuming you're using Hudi 1.0.0.

The configuration for MDT that I've been using when creating the Flink Hudi table is:
'metadata.enabled'='true', 'hoodie.write.concurrency.mode'='SINGLE_WRITER', 'hoodie.write.lock.provider'='org.apache.hudi.client.transaction.lock.InProcessLockProvider'

Could you please try adding this configuration on your end?
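In DDL form, a minimal sketch of where these options go (a single illustrative column and a placeholder path, not the full production schema):

CREATE TABLE hudi_table (
  id STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 'file:///tmp/hudi_table',  -- placeholder path
  'table.type' = 'COPY_ON_WRITE',
  'metadata.enabled' = 'true',
  'hoodie.write.concurrency.mode' = 'SINGLE_WRITER',
  'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.InProcessLockProvider'
);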

@cshuo (Contributor) commented Feb 25, 2025

> @cshuo I will try to run the demo code locally. However, upon reviewing the code, it looks like MDT hasn't been enabled for the Hudi table during its creation. I'm assuming you're using Hudi 1.0.0.
>
> The configuration for MDT that I've been using when creating the Flink Hudi table is: 'metadata.enabled'='true', 'hoodie.write.concurrency.mode'='SINGLE_WRITER', 'hoodie.write.lock.provider'='org.apache.hudi.client.transaction.lock.InProcessLockProvider'.
>
> Could you please try adding this configuration on your end?

Yes, I'm using 1.0.0. Actually, MDT is enabled by default, and the concurrency mode is 'SINGLE_WRITER' by default.

@maheshguptags (Author)

@cshuo is it possible to get on a call? We can discuss the issue and the reproduction steps.
Please let me know your availability so I can send you an invite.

thanks
Mahesh

@maheshguptags (Author) commented Feb 25, 2025

@cshuo, I ran the sample code you provided, and it works as expected. However, it doesn't replicate the issue I'm encountering. In your code, the Task Manager terminates after checkpointing is completed, but in our case we manually delete the Task Manager while a checkpoint is still in progress and hasn't been completed yet.

Additionally, I am providing the MDT configuration when creating the DDL. Do you think this could have an impact? From my understanding, it shouldn't.

Could you try deleting the Task Manager while checkpointing is ongoing?

Thanks!

@maheshguptags (Author)

Thanks to @cshuo for connecting over the call and offering suggestions.

I spoke with @cshuo, and he recommended removing hoodie.write.lock.provider while keeping the other two configurations as they are ('metadata.enabled'='true' and 'hoodie.write.concurrency.mode'='SINGLE_WRITER') on a fresh table. However, it seems that hoodie.write.lock.provider is still required if we include those two configurations when creating the table.

@cshuo, one question arises: if metadata.enabled and the concurrency mode are enabled by default, why isn't this error occurring in your example?

Another approach I could try is to completely remove the MDT-related config, as you did (keep the defaults), and then do the testing.

Also attaching the table creation DDL as discussed:

app.streamingDdlQuery=CREATE TABLE IF NOT EXISTS hudi_table (
    address STRING, x STRING, company_name STRING, first_active_date TIMESTAMP(3),
    last_active_date TIMESTAMP(3), y STRING, ts TIMESTAMP(3), user_id STRING
  ) PARTITIONED BY (client_id) WITH (
    'connector' = 'hudi',
    'write.task.max.size' = '2048',
    'write.merge.max_memory' = '1024',
    'path' = 's3a:/tmp/hudi_table/',
    'table.type' = 'COPY_ON_WRITE',
    'hoodie.datasource.write.recordkey.field' = 'x,y',
    'payload.class' = 'com.gupshup.cdp.poc',
    'precombine.field' = 'ts',
    'hoodie.clean.async' = 'true',
    'hoodie.cleaner.policy' = 'KEEP_LATEST_COMMITS',
    'hoodie.clean.automatic' = 'true',
    'hoodie.clean.max.commits' = '8',
    'hoodie.clean.trigger.strategy' = 'NUM_COMMITS',
    'hoodie.cleaner.parallelism' = '100',
    'hoodie.cleaner.commits.retained' = '6',
    'hoodie.index.type' = 'BUCKET',
    'hoodie.index.bucket.engine' = 'SIMPLE',
    'hoodie.bucket.index.num.buckets' = '16',
    'hoodie.bucket.index.hash.field' = 'y',
    'hoodie.parquet.small.file.limit' = '104857600',
    'hoodie.parquet.compression.codec' = 'snappy',
    'hoodie.schema.on.read.enable' = 'true',
    'hoodie.archive.automatic' = 'true',
    'hoodie.keep.max.commits' = '45',
    'hoodie.keep.min.commits' = '30',
    'metadata.enabled' = 'true',
    'hoodie.write.concurrency.mode' = 'SINGLE_WRITER'
  )

Error after enabling this configuration:

 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error occurred in the cluster entrypoint.
org.apache.flink.util.FlinkRuntimeException: Failed to start the operator coordinators
	at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:170) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:82) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.scheduler.adaptive.CreatingExecutionGraph.handleExecutionGraphCreation(CreatingExecutionGraph.java:127) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.scheduler.adaptive.CreatingExecutionGraph.lambda$null$0(CreatingExecutionGraph.java:84) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.runIfState(AdaptiveScheduler.java:1246) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.lambda$runIfState$29(AdaptiveScheduler.java:1261) ~[flink-dist-1.18.1.jar:1.18.1]
	at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
	at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451) ~[flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
	at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451) ~[flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218) ~[flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
	at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85) ~[flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) ~[flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) [flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) [flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
	at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) [flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
	at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) [flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
	at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) [flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
	at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) [flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
	at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) [flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
	at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) [flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
	at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) [flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
	at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) [flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
	at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) [flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
	at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
	at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
	at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
	at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
	at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
Caused by: org.apache.hudi.exception.HoodieException: Unable to instantiate class org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider
	at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:75) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
	at org.apache.hudi.client.transaction.lock.LockManager.getLockProvider(LockManager.java:111) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
	at org.apache.hudi.client.transaction.lock.LockManager.unlock(LockManager.java:98) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
	at org.apache.hudi.client.HoodieFlinkTableServiceClient.initMetadataTable(HoodieFlinkTableServiceClient.java:211) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
	at org.apache.hudi.client.HoodieFlinkWriteClient.initMetadataTable(HoodieFlinkWriteClient.java:319) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
	at org.apache.hudi.sink.StreamWriteOperatorCoordinator.initMetadataTable(StreamWriteOperatorCoordinator.java:352) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
	at org.apache.hudi.sink.StreamWriteOperatorCoordinator.start(StreamWriteOperatorCoordinator.java:197) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:185) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:165) ~[flink-dist-1.18.1.jar:1.18.1]
	... 34 more
Caused by: java.lang.reflect.InvocationTargetException
	at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
	at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
	at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
	at java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]
	at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:73) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
	at org.apache.hudi.client.transaction.lock.LockManager.getLockProvider(LockManager.java:111) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
	at org.apache.hudi.client.transaction.lock.LockManager.unlock(LockManager.java:98) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
	at org.apache.hudi.client.HoodieFlinkTableServiceClient.initMetadataTable(HoodieFlinkTableServiceClient.java:211) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
	at org.apache.hudi.client.HoodieFlinkWriteClient.initMetadataTable(HoodieFlinkWriteClient.java:319) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
	at org.apache.hudi.sink.StreamWriteOperatorCoordinator.initMetadataTable(StreamWriteOperatorCoordinator.java:352) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
	at org.apache.hudi.sink.StreamWriteOperatorCoordinator.start(StreamWriteOperatorCoordinator.java:197) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:185) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:165) ~[flink-dist-1.18.1.jar:1.18.1]
	... 34 more
Caused by: org.apache.hudi.exception.HoodieLockException: Unsupported scheme :s3a, since this fs can not support atomic creation
	at org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider.<init>(FileSystemBasedLockProvider.java:89) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
	at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
	at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
	at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
	at java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]
	at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:73) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
	at org.apache.hudi.client.transaction.lock.LockManager.getLockProvider(LockManager.java:111) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
	at org.apache.hudi.client.transaction.lock.LockManager.unlock(LockManager.java:98) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
	at org.apache.hudi.client.HoodieFlinkTableServiceClient.initMetadataTable(HoodieFlinkTableServiceClient.java:211) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
	at org.apache.hudi.client.HoodieFlinkWriteClient.initMetadataTable(HoodieFlinkWriteClient.java:319) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
	at org.apache.hudi.sink.StreamWriteOperatorCoordinator.initMetadataTable(StreamWriteOperatorCoordinator.java:352) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
	at org.apache.hudi.sink.StreamWriteOperatorCoordinator.start(StreamWriteOperatorCoordinator.java:197) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:185) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:165) ~[flink-dist-1.18.1.jar:1.18.1]
	... 34 more

@maheshguptags (Author)

@cshuo Since MDT is the default in Hudi 1.0, it requires a lock. The filesystem lock is not recommended for production and S3, and org.apache.hudi.client.transaction.lock.InProcessLockProvider is not working as expected, so the remaining options are ZooKeeper and DynamoDB.

> Another approach I could try is to completely remove the MDT-related config, as you did (keep the defaults), and then do the testing.

Yes, it requires a lock even if no MDT config is provided:

Caused by: org.apache.hudi.exception.HoodieLockException: Unsupported scheme: s3a, since this fs can not support atomic creation
	at org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider.<init>(FileSystemBasedLockProvider.java:89) ~[blob_p-eb02881ebbe30d32963c608fce24f92c3ebe2ecd-ab4882c7a4628598f7d5e2561bd74c02:?]

Will keep you posted.
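For reference, a hedged sketch of the ZooKeeper-based alternative mentioned above (the host, port, paths, and table/column names are placeholders; check the Hudi lock-configuration docs for your version before relying on these keys):

CREATE TABLE hudi_table_zk (
  id STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 's3a://<bucket>/hudi_table_zk/',  -- placeholder path
  'table.type' = 'COPY_ON_WRITE',
  'metadata.enabled' = 'true',
  'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider',
  'hoodie.write.lock.zookeeper.url' = 'zk-host',  -- placeholder
  'hoodie.write.lock.zookeeper.port' = '2181',
  'hoodie.write.lock.zookeeper.base_path' = '/hudi/locks',
  'hoodie.write.lock.zookeeper.lock_key' = 'hudi_table_zk'
);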

@cshuo (Contributor) commented Feb 27, 2025

> @cshuo Since MDT is the default in Hudi 1.0, it requires a lock. The filesystem lock is not recommended for production and S3, and org.apache.hudi.client.transaction.lock.InProcessLockProvider is not working as expected, so the remaining options are ZooKeeper and DynamoDB.
>
> Yes, it requires a lock even if no MDT config is provided.

@maheshguptags thanks for the update. As discussed on the call, there are two directions you can try:

  • Validate whether the source can be reset to the last successful ckp properly, i.e., the source should emit records after failover (maybe you can substitute the kafka source with a datagen source and check whether the problem still exists; a sketch of that substitution follows this list);
  • If the problem still exists, try to reproduce it with a smaller flink parallelism, and provide all the taskmanager.log and jobmanager.log files here (the whole files, including logs before and after the ckp failover).
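A hedged sketch of that datagen substitution using Flink's CREATE TABLE ... LIKE (kafka_src and hudi_sink are placeholder names for the real source and sink tables):

-- Reuse the source table's schema but swap the connector for datagen
CREATE TABLE src_datagen WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10000'
) LIKE kafka_src (EXCLUDING OPTIONS);

INSERT INTO hudi_sink SELECT * FROM src_datagen;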

> Could you try deleting the Task Manager while checkpointing is ongoing?

btw, I tried this, and unfortunately the problem cannot be reproduced either.

[screenshot attached]

@maheshguptags (Author)

@cshuo Sure, I will give it a try with lower parallelism.
I have a question: the same configuration works with Hudi 0.15, where MDT is not enabled by default (not sure why, but it seems the issue is with MDT).

@maheshguptags (Author)

I tried the same configuration with Hudi 0.15, and it gives the expected result, unlike Hudi 1.0.0.
Here is a screenshot, along with the table creation schema and hoodie.properties.

#Properties saved on 2025-02-27T07:54:16.016244Z
#Thu Feb 27 07:54:16 UTC 2025
hoodie.table.type=COPY_ON_WRITE
hoodie.table.precombine.field=ts
hoodie.table.partition.fields=x
hoodie.archivelog.folder=archived
hoodie.table.cdc.enabled=false
hoodie.timeline.layout.version=1
hoodie.table.checksum=1292384652
hoodie.datasource.write.drop.partition.columns=false
hoodie.table.recordkey.fields=x,y
hoodie.table.name=customer_profile
hoodie.datasource.write.hive_style_partitioning=false
hoodie.table.keygenerator.class=org.apache.hudi.keygen.ComplexAvroKeyGenerator
hoodie.database.name=default_database
hoodie.datasource.write.partitionpath.urlencode=false
hoodie.table.version=6

app.streamingDdlQuery=CREATE TABLE IF NOT EXISTS customer_profile (
    x STRING, y STRING, created_date TIMESTAMP(3), ts STRING
  ) PARTITIONED BY (x) WITH (
    'connector' = 'hudi',
    'write.task.max.size' = '2048',
    'write.merge.max_memory' = '1024',
    'path' = '${app.cdp.base.path}/customer_profile_temp_6/',
    'table.type' = 'COPY_ON_WRITE',
    'hoodie.datasource.write.recordkey.field' = 'x,y',
    'payload.class' = 'com.gupshup.cdp.poc',
    'precombine.field' = 'ts',
    'hoodie.clean.async' = 'true',
    'hoodie.cleaner.policy' = 'KEEP_LATEST_COMMITS',
    'hoodie.clean.automatic' = 'true',
    'hoodie.clean.max.commits' = '8',
    'hoodie.clean.trigger.strategy' = 'NUM_COMMITS',
    'hoodie.cleaner.parallelism' = '100',
    'hoodie.cleaner.commits.retained' = '6',
    'hoodie.index.type' = 'BUCKET',
    'hoodie.index.bucket.engine' = 'SIMPLE',
    'hoodie.bucket.index.num.buckets' = '16',
    'hoodie.bucket.index.hash.field' = 'y',
    'hoodie.parquet.small.file.limit' = '104857600',
    'hoodie.parquet.compression.codec' = 'snappy',
    'hoodie.schema.on.read.enable' = 'true',
    'hoodie.archive.automatic' = 'true',
    'hoodie.keep.max.commits' = '45',
    'hoodie.keep.min.commits' = '30'
  )

Screenshot: I ingested 2.5M records and manually killed the TM during the 2nd checkpoint; no data is lost, and the data is successfully written to the hudi table in the 3rd checkpoint.
After the 3rd chkpnt no data was ingested, so subsequent checkpoints complete within ms.

[screenshot: Flink checkpoint UI]

Not sure, but it seems this is an issue with MDT.
