How could I catch all exceptions while starting or running a Spring Cloud Stream app?

java
Ethan Jackson

This application is for DBAs, who can't modify the source code. So I must handle every failure the application can hit from startup to shutdown and report it (wrong configuration, unreachable network, unreachable database, malformed message...). The application does incremental data synchronization from one database (db1) to other databases (db2): a program reads redo logs from db1 and sends them to Kafka, then my app pulls these logs, converts them to SQL, and executes the statements over JDBC. When the application starts, it can't bind the topic named in the configuration, and the app stops because of an exception. I want to catch this exception in the main method:

    public static void main(String[] args) {
        try {
            SpringApplication.run(ConsumerApplication.class, args);
        } catch (Exception e) {
            // Here actually I can't catch the ProvisioningException
            // Check for Kafka provisioning exception in the cause chain
            Throwable rootCause = getRootCause(e);
            if (rootCause instanceof org.springframework.cloud.stream.provisioning.ProvisioningException) {
                logger.error("Failed to start ConsumerApplication: {} - The Kafka topic cannot be found or created. "
                        + "Please check if Kafka is running and the topic exists or can be auto-created.",
                        KAFKA_PROVISIONING_ERROR, e);
            } else {
                logger.error("Failed to start ConsumerApplication: {}", e.getMessage(), e);
            }
            System.exit(1); // Exit with error code
        }
    }
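A side note on the check itself: getRootCause is not shown, so assuming it walks to the deepest cause, it would return the TimeoutException that the log further down lists under "Caused by", and the instanceof ProvisioningException test would not match even if the exception did reach main. Here is a minimal sketch of searching the whole cause chain instead; CauseChains and findCause are hypothetical names, not part of Spring:

```java
import java.util.Optional;

/** Hypothetical helper: searches an exception's cause chain for a given type. */
final class CauseChains {

    private CauseChains() {
    }

    /**
     * Returns the first throwable in the chain (starting with the error
     * itself) that is an instance of the given type, or empty if none matches.
     */
    static <T extends Throwable> Optional<T> findCause(Throwable error, Class<T> type) {
        Throwable current = error;
        // Bound the walk so a cyclic cause chain cannot loop forever.
        for (int depth = 0; current != null && depth < 64; depth++) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
            current = current.getCause();
        }
        return Optional.empty();
    }
}
```

In main, the check would then read CauseChains.findCause(e, ProvisioningException.class).isPresent() instead of comparing only the root cause.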

But the catch block in main never catches this exception. Here is the terminal output:

2025-07-20 11:53:06 [kafka-admin-client-thread | adminclient-1] WARN o.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-1] Connection to node -1 (/172.32.230.85:9092) could not be established. Broker may not be available.
2025-07-20 11:53:28 [kafka-admin-client-thread | adminclient-1] WARN o.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-1] Connection to node -1 (/172.32.230.85:9092) could not be established. Broker may not be available.
2025-07-20 11:53:35 [main] ERROR o.s.c.stream.binding.BindingService - Failed to create consumer binding; retrying in 30 seconds
org.springframework.cloud.stream.provisioning.ProvisioningException: Provisioning exception encountered for testtbl.testtbl
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:377)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.doProvisionConsumerDestination(KafkaTopicProvisioner.java:246)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:211)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:96)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:523)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:102)
    at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:144)
    at org.springframework.cloud.stream.binding.BindingService.doBindConsumer(BindingService.java:186)
    at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:139)
    at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindInputs(AbstractBindableProxyFactory.java:98)
    at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58)
    at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:833)
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:59)
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:284)
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:467)
    at java.base/java.lang.Iterable.forEach(Iterable.java:75)
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:256)
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:201)
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:965)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:619)
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:146)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:753)
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:455)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:323)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1342)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1331)
    at sync.ConsumerApplication.main(ConsumerApplication.java:53)
Caused by: java.util.concurrent.TimeoutException: null
    at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:413)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:387)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:364)
    ... 26 common frames omitted
2025-07-20 11:53:45 [kafka-binder-health-1] WARN o.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-null-1, groupId=null] Bootstrap broker 172.32.230.85:9092 (id: -1 rack: null) disconnected
2025-07-20 11:54:01 [kafka-binder-health-1] WARN o.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-null-1, groupId=null] Bootstrap broker 172.32.230.85:9092 (id: -1 rack: null) disconnected
2025-07-20 11:54:23 [kafka-binder-health-1] WARN o.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-null-1, groupId=null] Connection to node -1 (/172.32.230.85:9092) could not be established. Broker may not be available.
2025-07-20 11:54:23 [kafka-binder-health-1] WARN o.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-null-1, groupId=null] Bootstrap broker 172.32.230.85:9092 (id: -1 rack: null) disconnected
2025-07-20 11:54:36 [RMI TCP Connection(1)-192.168.10.104] WARN o.s.b.a.health.HealthEndpointSupport - Health contributor org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator (binders/kafka) took 60010ms to respond
2025-07-20 11:54:55 [kafka-admin-client-thread | adminclient-2] WARN o.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-2] Connection to node -1 (/172.32.230.85:9092) could not be established. Broker may not be available.
2025-07-20 11:55:16 [kafka-admin-client-thread | adminclient-2] WARN o.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-2] Connection to node -1 (/172.32.230.85:9092) could not be established. Broker may not be available.
2025-07-20 11:55:16 [scheduling-1] ERROR o.s.c.stream.binding.BindingService - Failed to create consumer binding; retrying in 30 seconds
org.springframework.cloud.stream.provisioning.ProvisioningException: Provisioning exception encountered for testtbl.testtbl
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:377)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.doProvisionConsumerDestination(KafkaTopicProvisioner.java:246)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:211)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:96)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:523)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:102)
    at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:144)
    at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleConsumerBinding$1(BindingService.java:211)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.util.concurrent.TimeoutException: null
    at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:413)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:387)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:364)
    ... 14 common frames omitted
2025-07-20 11:56:38 [kafka-admin-client-thread | adminclient-3] WARN o.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-3] Connection to node -1 (/172.32.230.85:9092) could not be established. Broker may not be available.
2025-07-20 11:56:46 [scheduling-1] ERROR o.s.c.stream.binding.BindingService - Failed to create consumer binding; retrying in 30 seconds
org.springframework.cloud.stream.provisioning.ProvisioningException: Provisioning exception encountered for testtbl.testtbl
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:377)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.doProvisionConsumerDestination(KafkaTopicProvisioner.java:246)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:211)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:96)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:523)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:102)
    at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:144)
    at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleConsumerBinding$1(BindingService.java:211)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.util.concurrent.TimeoutException: null
    at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:413)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:387)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:364)
    ... 14 common frames omitted
2025-07-20 11:57:46 [kafka-admin-client-thread | adminclient-4] WARN o.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-4] Connection to node -1 (/172.32.230.85:9092) could not be established. Broker may not be available.
2025-07-20 11:58:07 [kafka-admin-client-thread | adminclient-4] WARN o.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-4] Connection to node -1 (/172.32.230.85:9092) could not be established. Broker may not be available.
2025-07-20 11:58:16 [scheduling-1] ERROR o.s.c.stream.binding.BindingService - Failed to create consumer binding; retrying in 30 seconds
org.springframework.cloud.stream.provisioning.ProvisioningException: Provisioning exception encountered for testtbl.testtbl
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:377)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.doProvisionConsumerDestination(KafkaTopicProvisioner.java:246)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:211)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:96)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:523)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:102)
    at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:144)
    at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleConsumerBinding$1(BindingService.java:211)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.util.concurrent.TimeoutException: null
    at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:413)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:387)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:364)
    ... 14 common frames omitted
2025-07-20 11:59:34 [kafka-admin-client-thread | adminclient-5] WARN o.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-5] Connection to node -1 (/172.32.230.85:9092) could not be established. Broker may not be available.

I tried to catch the ProvisioningException in main, but the app's output is not what I expected.

Answer

  1. Use @ServiceActivator on the binding's error channel (named <destination>.<group>.errors) to catch exceptions thrown while processing messages at runtime.

  2. Use try-catch blocks inside your consumer functions for fine-grained handling.

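  3. Wrap your main method in a try-catch to handle startup exceptions. This only catches exceptions that propagate out of SpringApplication.run(). In your log, BindingService catches the ProvisioningException itself, logs "Failed to create consumer binding; retrying in 30 seconds" and schedules another attempt, so nothing ever reaches main. To make an unreachable broker fail startup instead, set spring.cloud.stream.bindingRetryInterval to 0, which treats such conditions as fatal.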
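
Point 2 can be sketched without any Spring types. This is a minimal sketch, assuming the consumer function is a plain java.util.function.Consumer and that a failure should be reported together with the offending payload rather than rethrown; SafeConsumers and guarded are hypothetical names:

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Hypothetical helper: wraps a message handler so failures are reported instead of thrown. */
final class SafeConsumers {

    private SafeConsumers() {
    }

    /**
     * Returns a consumer that runs the handler and passes any exception,
     * together with the payload that caused it, to onError.
     */
    static <T> Consumer<T> guarded(Consumer<T> handler, BiConsumer<T, Exception> onError) {
        return payload -> {
            try {
                handler.accept(payload);
            } catch (Exception e) {
                // Report and swallow, so one bad message does not stop the stream.
                onError.accept(payload, e);
            }
        };
    }
}
```

A Consumer<String> bean could then return SafeConsumers.guarded(this::applySql, (payload, e) -> logger.error("Bad message: {}", payload, e)), where applySql is a placeholder for the JDBC step. Swallowing the exception means the framework sees the message as handled, so rethrow instead if the binder's own retry or error channel should take over.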
