diff --git a/app/alarm/model/src/main/java/org/phoebus/applications/alarm/model/EnabledState.java b/app/alarm/model/src/main/java/org/phoebus/applications/alarm/model/EnabledState.java index ca4dab087b..eea91f440a 100644 --- a/app/alarm/model/src/main/java/org/phoebus/applications/alarm/model/EnabledState.java +++ b/app/alarm/model/src/main/java/org/phoebus/applications/alarm/model/EnabledState.java @@ -44,9 +44,9 @@ public EnabledState(final boolean enabled) { this.enabled = enabled; } - /** @return Time to (re-)enable */ + /** @return Time to (re-)enable, or an empty string if no date is set */ public String getDateString() { - return enabled_date.format(formatter); + return enabled_date != null ? enabled_date.format(formatter) : ""; } @Override diff --git a/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmCmdLogger.java b/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmCmdLogger.java index 9878516e7f..3a1ab27433 100644 --- a/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmCmdLogger.java +++ b/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmCmdLogger.java @@ -5,10 +5,8 @@ import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; -import java.time.temporal.TemporalUnit; import java.util.Properties; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.logging.Level; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -18,6 +16,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Transformer; @@ -48,6 +47,11 @@ public class AlarmCmdLogger implements Runnable { private IndexNameHelper indexNameHelper; + private volatile 
boolean shouldReconnect = true; + private volatile KafkaStreams currentStreams = null; + private final long reconnectDelayMs; + private Thread shutdownHook = null; + /** * Create a alarm command message logger for the given topic. * This runnable will create the kafka streams for the given alarm messages which match the format 'topicCommand' @@ -60,25 +64,90 @@ public AlarmCmdLogger(String topic) throws Exception { MessageParser messageParser = new MessageParser(AlarmCommandMessage.class); alarmCommandMessageSerde = Serdes.serdeFrom(messageParser, messageParser); + + // Read reconnect delay from system property, default to 30 seconds + this.reconnectDelayMs = Long.parseLong( + System.getProperty("kafka.reconnect.delay.ms", "30000") + ); } @Override public void run() { - logger.info("Starting the cmd stream consumer for " + topic); Properties props = new Properties(); props.putAll(PropertiesHelper.getProperties()); + final String indexDateSpanUnits = props.getProperty("date_span_units"); + final boolean useDatedIndexNames = Boolean.parseBoolean(props.getProperty("use_dated_index_names")); + + try { + indexNameHelper = new IndexNameHelper(topic + INDEX_FORMAT, useDatedIndexNames, indexDateSpanUnits); + } catch (Exception ex) { + logger.log(Level.SEVERE, "Time based index creation failed.", ex); + } + + // Register shutdown hook once before retry loop + shutdownHook = new Thread("streams-" + topic + "-alarm-cmd-shutdown-hook") { + @Override + public void run() { + logger.info("Shutdown hook triggered for topic " + topic); + shouldReconnect = false; + if (currentStreams != null) { + logger.info("Closing Kafka Streams for topic " + topic); + currentStreams.close(Duration.of(10, ChronoUnit.SECONDS)); + currentStreams = null; + } + logger.info("Shutting cmd streams down for topic " + topic); + } + }; + Runtime.getRuntime().addShutdownHook(shutdownHook); + + // Retry loop for handling missing topics + while (shouldReconnect) { + try { + startKafkaStreams(props); + // If we 
get here, streams shut down normally + break; + } catch (Exception e) { + logger.log(Level.SEVERE, "Failed to start Kafka Streams for topic " + topic + + ", will retry in " + reconnectDelayMs + "ms", e); + + if (!shouldReconnect) { + break; + } + + try { + Thread.sleep(reconnectDelayMs); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + logger.info("Reconnection loop interrupted for topic " + topic); + break; + } + } + } + + // Clean up shutdown hook when we're done + try { + if (shutdownHook != null) { + Runtime.getRuntime().removeShutdownHook(shutdownHook); + shutdownHook = null; + } + } catch (IllegalStateException e) { + // Ignore - shutdown already in progress + } + + logger.info("Alarm cmd logger for topic " + topic + " has shut down"); + } + + private void startKafkaStreams(Properties props) throws Exception { + logger.info("Attempting to start Kafka Streams for topic " + topic); + Properties kafkaProps = KafkaHelper.loadPropsFromFile(props.getProperty("kafka_properties","")); kafkaProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-" + topic + "-alarm-cmd"); - if (props.containsKey(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)){ - kafkaProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, - props.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)); - } else { - kafkaProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - } + kafkaProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, + props.getOrDefault(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")); StreamsBuilder builder = new StreamsBuilder(); KStream alarms = builder.stream(topic + "Command", Consumed @@ -91,14 +160,6 @@ public long extract(ConsumerRecord record, long previousTimestam } })); - final String indexDateSpanUnits = props.getProperty("date_span_units"); - final boolean useDatedIndexNames = Boolean.parseBoolean(props.getProperty("use_dated_index_names")); - - try { - indexNameHelper = new IndexNameHelper(topic + INDEX_FORMAT, useDatedIndexNames, 
indexDateSpanUnits); - } catch (Exception ex) { - logger.log(Level.SEVERE, "Time based index creation failed.", ex); - } KStream timeStampedAlarms = alarms.transform(new TransformerSupplier>() { @Override @@ -132,27 +193,58 @@ public void close() { String topic_name = indexNameHelper.getIndexName(v.getMessage_time()); ElasticClientHelper.getInstance().indexAlarmCmdDocument(topic_name, v); }); + final KafkaStreams streams = new KafkaStreams(builder.build(), kafkaProps); - final CountDownLatch latch = new CountDownLatch(1); - // attach shutdown handler to catch control-c - Runtime.getRuntime().addShutdownHook(new Thread("streams-" + topic + "-alarm-cmd-shutdown-hook") { - @Override - public void run() { - streams.close(Duration.of(10, ChronoUnit.SECONDS)); - System.out.println("\nShutting cmd streams Done."); + // Store reference for cleanup (volatile ensures visibility across threads) + currentStreams = streams; + + streams.setUncaughtExceptionHandler(exception -> { + logger.log(Level.SEVERE, "Stream exception encountered for topic " + topic + ": " + + exception.getMessage(), exception); + + // Check if it's a missing source topic exception + if (exception.getCause() instanceof org.apache.kafka.streams.errors.MissingSourceTopicException || + exception instanceof org.apache.kafka.streams.errors.MissingSourceTopicException) { + logger.log(Level.WARNING, "Missing source topic detected for " + topic + + ". 
Will retry connection in " + reconnectDelayMs + "ms"); + return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; + } + + // For other exceptions, stop retry + logger.log(Level.SEVERE, "Unrecoverable stream exception for topic " + topic, exception); + shouldReconnect = false; + return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; + }); + + // Simple latch to wait for streams to stop + final CountDownLatch latch = new CountDownLatch(1); + streams.setStateListener((newState, oldState) -> { + if (newState == KafkaStreams.State.NOT_RUNNING || newState == KafkaStreams.State.ERROR) { latch.countDown(); } }); try { streams.start(); + logger.info("Kafka Streams started for topic " + topic); + + // Wait for streams to stop (either due to exception or shutdown) latch.await(); - } catch (Throwable e) { - System.exit(1); - } - System.exit(0); + // If stopped due to error, throw to trigger retry + if (streams.state() == KafkaStreams.State.ERROR) { + throw new Exception("Streams stopped with ERROR state"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new Exception("Interrupted", e); + } finally { + if (currentStreams != null) { + currentStreams.close(Duration.of(10, ChronoUnit.SECONDS)); + currentStreams = null; + } + } } } diff --git a/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmMessageLogger.java b/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmMessageLogger.java index 1fb346636b..8e29dc51e5 100644 --- a/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmMessageLogger.java +++ b/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmMessageLogger.java @@ -11,7 +11,6 @@ import java.util.logging.Level; import java.util.regex.Matcher; import java.util.regex.Pattern; - import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.Serde; import 
org.apache.kafka.common.serialization.Serdes; @@ -19,6 +18,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; import org.apache.kafka.streams.kstream.Branched; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; @@ -48,6 +48,11 @@ public class AlarmMessageLogger implements Runnable { private static final String CONFIG_INDEX_FORMAT = "_alarms_config"; private static final String STATE_INDEX_FORMAT = "_alarms_state"; + private volatile boolean shouldReconnect = true; + private volatile KafkaStreams currentStreams = null; + private final long reconnectDelayMs; + private Thread shutdownHook = null; + /** * Create a alarm logger for the alarm messages (both state and configuration) * for a given alarm server topic. @@ -62,6 +67,10 @@ public AlarmMessageLogger(String topic) { MessageParser messageParser = new MessageParser<>(AlarmMessage.class); alarmMessageSerde = Serdes.serdeFrom(messageParser, messageParser); + // Read reconnect delay from system property, default to 30 seconds + this.reconnectDelayMs = Long.parseLong( + System.getProperty("kafka.reconnect.delay.ms", "30000") + ); } @Override @@ -71,6 +80,72 @@ public void run() { Properties props = new Properties(); props.putAll(PropertiesHelper.getProperties()); + final String indexDateSpanUnits = props.getProperty("date_span_units"); + final boolean useDatedIndexNames = Boolean.parseBoolean(props.getProperty("use_dated_index_names")); + + try { + stateIndexNameHelper = new IndexNameHelper(topic + STATE_INDEX_FORMAT, useDatedIndexNames, indexDateSpanUnits); + configIndexNameHelper = new IndexNameHelper(topic + CONFIG_INDEX_FORMAT , useDatedIndexNames, indexDateSpanUnits); + } catch (Exception ex) { + logger.log(Level.SEVERE, "Time based index creation failed.", ex); + } + + // Register shutdown hook once 
before retry loop + shutdownHook = new Thread("streams-"+topic+"-alarm-messages-shutdown-hook") { + @Override + public void run() { + logger.info("Shutdown hook triggered for topic " + topic); + shouldReconnect = false; + if (currentStreams != null) { + logger.info("Closing Kafka Streams for topic " + topic); + currentStreams.close(Duration.of(10, ChronoUnit.SECONDS)); + currentStreams = null; + } + logger.info("Shutting streams down for topic " + topic); + } + }; + Runtime.getRuntime().addShutdownHook(shutdownHook); + + // Retry loop for handling missing topics + while (shouldReconnect) { + try { + startKafkaStreams(props); + // If we get here, streams shut down normally + break; + } catch (Exception e) { + logger.log(Level.SEVERE, "Failed to start Kafka Streams for topic " + topic + + ", will retry in " + reconnectDelayMs + "ms", e); + + if (!shouldReconnect) { + break; + } + + try { + Thread.sleep(reconnectDelayMs); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + logger.info("Reconnection loop interrupted for topic " + topic); + break; + } + } + } + + // Clean up shutdown hook when we're done + try { + if (shutdownHook != null) { + Runtime.getRuntime().removeShutdownHook(shutdownHook); + shutdownHook = null; + } + } catch (IllegalStateException e) { + // Ignore - shutdown already in progress + } + + logger.info("Alarm message logger for topic " + topic + " has shut down"); + } + + private void startKafkaStreams(Properties props) throws Exception { + logger.info("Attempting to start Kafka Streams for topic " + topic); + Properties kafkaProps = KafkaHelper.loadPropsFromFile(props.getProperty("kafka_properties","")); kafkaProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-"+topic+"-alarm-messages"); @@ -84,19 +159,9 @@ public void run() { final String group_id = "Alarm-" + UUID.randomUUID(); kafkaProps.put("group.id", group_id);
AlarmSystemConstants.logger.fine(kafkaProps.getProperty("group.id") + " subscribes to " + kafkaProps.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG) + " for " + topic); - final String indexDateSpanUnits = props.getProperty("date_span_units"); - final boolean useDatedIndexNames = Boolean.parseBoolean(props.getProperty("use_dated_index_names")); - - try { - stateIndexNameHelper = new IndexNameHelper(topic + STATE_INDEX_FORMAT, useDatedIndexNames, indexDateSpanUnits); - configIndexNameHelper = new IndexNameHelper(topic + CONFIG_INDEX_FORMAT , useDatedIndexNames, indexDateSpanUnits); - } catch (Exception ex) { - logger.log(Level.SEVERE, "Time based index creation failed.", ex); - } - // Attach a message time stamp. StreamsBuilder builder = new StreamsBuilder(); @@ -131,25 +198,56 @@ public long extract(ConsumerRecord record, long previousTimestam })); final KafkaStreams streams = new KafkaStreams(builder.build(), kafkaProps); - final CountDownLatch latch = new CountDownLatch(1); - // attach shutdown handler to catch control-c - Runtime.getRuntime().addShutdownHook(new Thread("streams-"+topic+"-alarm-messages-shutdown-hook") { - @Override - public void run() { - streams.close(Duration.of(10, ChronoUnit.SECONDS)); - System.out.println("\nShutting streams Done."); + // Store reference for cleanup (volatile ensures visibility across threads) + currentStreams = streams; + + streams.setUncaughtExceptionHandler(exception -> { + logger.log(Level.SEVERE, "Stream exception encountered for topic " + topic + ": " + + exception.getMessage(), exception); + + // Check if it's a missing source topic exception + if (exception.getCause() instanceof org.apache.kafka.streams.errors.MissingSourceTopicException || + exception instanceof org.apache.kafka.streams.errors.MissingSourceTopicException) { + logger.log(Level.WARNING, "Missing source topic detected for " + topic + + ". 
Will retry connection in " + reconnectDelayMs + "ms"); + return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; + } + + // For other exceptions, stop retry + logger.log(Level.SEVERE, "Unrecoverable stream exception for topic " + topic, exception); + shouldReconnect = false; + return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; + }); + + // Simple latch to wait for streams to stop + final CountDownLatch latch = new CountDownLatch(1); + streams.setStateListener((newState, oldState) -> { + if (newState == KafkaStreams.State.NOT_RUNNING || newState == KafkaStreams.State.ERROR) { latch.countDown(); } }); try { streams.start(); + logger.info("Kafka Streams started for topic " + topic); + + // Wait for streams to stop (either due to exception or shutdown) latch.await(); - } catch (Throwable e) { - System.exit(1); + + // If stopped due to error, throw to trigger retry + if (streams.state() == KafkaStreams.State.ERROR) { + throw new Exception("Streams stopped with ERROR state"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new Exception("Interrupted", e); + } finally { + if (currentStreams != null) { + currentStreams.close(Duration.of(10, ChronoUnit.SECONDS)); + currentStreams = null; + } } - System.exit(0); } private void processAlarmStateStream(KStream alarmStateBranch) { diff --git a/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/rest/AlarmLogSearchUtil.java b/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/rest/AlarmLogSearchUtil.java index 42633da568..737701aba3 100644 --- a/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/rest/AlarmLogSearchUtil.java +++ b/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/rest/AlarmLogSearchUtil.java @@ -77,7 +77,7 @@ public class AlarmLogSearchUtil { */ public static List search(ElasticsearchClient client, Map searchParameters) { - logger.info("searching for alarm log entires 
: " + + logger.fine("searching for alarm log entires : " + searchParameters.entrySet().stream().map(e -> e.getKey() + ": " + e.getValue()).collect(Collectors.joining())); Instant fromInstant = Instant.EPOCH; diff --git a/services/alarm-logger/src/main/resources/application.properties b/services/alarm-logger/src/main/resources/application.properties index 466d8341d6..64079aeb76 100644 --- a/services/alarm-logger/src/main/resources/application.properties +++ b/services/alarm-logger/src/main/resources/application.properties @@ -67,3 +67,5 @@ retain_indices_count=0 # Incorrect syntax will fail service startup if retention_period_days >= 100. purge_cron_expr=0 0 0 * * SUN ############################################################################## + +logging.config=classpath:alarm_logger_logging.properties \ No newline at end of file