diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/impl/MQTTNamespaceBundleOwnershipListener.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/impl/MQTTNamespaceBundleOwnershipListener.java index 22a771183..17510354c 100644 --- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/impl/MQTTNamespaceBundleOwnershipListener.java +++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/impl/MQTTNamespaceBundleOwnershipListener.java @@ -17,6 +17,7 @@ import java.util.List; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.common.naming.NamespaceBundle; @@ -54,11 +55,13 @@ public void unLoad(NamespaceBundle bundle) { .collect(Collectors.toList())) .thenAccept(topics -> { log.info("unload namespace bundle : {}, topics : {}", bundle, topics); - listeners.forEach(listener -> { - if (listener.test(bundle.getNamespaceObject())) { - topics.forEach(topic -> listener.unload(TopicName.get(topic))); - } - }); + if (CollectionUtils.isNotEmpty(listeners)) { + listeners.forEach(listener -> { + if (listener.test(bundle.getNamespaceObject())) { + topics.forEach(topic -> listener.unload(TopicName.get(topic))); + } + }); + } }).exceptionally(ex -> { log.error("unload namespace bundle :{} error", bundle, ex); return null;