diff --git a/docs/DESIGN_AND_IMPLEMENTATION.md b/docs/DESIGN_AND_IMPLEMENTATION.md index 4bd4655..2ec4a67 100644 --- a/docs/DESIGN_AND_IMPLEMENTATION.md +++ b/docs/DESIGN_AND_IMPLEMENTATION.md @@ -161,7 +161,6 @@ Invalid values are ignored and the default `header_with_nodetime_fallback` is us `GreenwaveMonitor::has_header_from_type()`; unknown types fall back to no-header behavior. - `publishDiagnostics()` marks status as `STALE` if no fresh `updateDiagnostics()` happened since the previous publish. - `setExpectedDt()` requires `expected_hz > 0`; zero disables useful timing checks. -- Do not add externally monitored topics (those already using inline `GreenwaveDiagnostics`) via `manage_topic` or the terminal UI, as this will create a duplicate diagnostic subscription for the same topic. ## Where To Look In Code diff --git a/greenwave_monitor/include/greenwave_monitor.hpp b/greenwave_monitor/include/greenwave_monitor.hpp index 4302888..5d3389e 100644 --- a/greenwave_monitor/include/greenwave_monitor.hpp +++ b/greenwave_monitor/include/greenwave_monitor.hpp @@ -20,7 +20,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -48,6 +50,9 @@ class GreenwaveMonitor : public rclcpp::Node if (init_timer_) { init_timer_->cancel(); } + // Reset diagnostics subscription before clearing internal state to prevent + // callbacks from firing after greenwave_diagnostics_ is destroyed + diagnostics_subscription_.reset(); // Clear diagnostics before base Node destructor runs to avoid accessing invalid node state greenwave_diagnostics_.clear(); subscriptions_.clear(); @@ -88,6 +93,8 @@ class GreenwaveMonitor : public rclcpp::Node void add_topics_from_parameters(); + void diagnostics_callback(const diagnostic_msgs::msg::DiagnosticArray::SharedPtr msg); + std::map> greenwave_diagnostics_; std::vector> subscriptions_; @@ -98,4 +105,8 @@ class GreenwaveMonitor : public rclcpp::Node rclcpp::Service::SharedPtr set_expected_frequency_service_; greenwave_diagnostics::TimeCheckPreset time_check_preset_; + rclcpp::Subscription::SharedPtr + diagnostics_subscription_; + std::set externally_diagnosed_topics_; + std::mutex externally_diagnosed_topics_mutex_; }; diff --git a/greenwave_monitor/src/greenwave_monitor.cpp b/greenwave_monitor/src/greenwave_monitor.cpp index 48b26ef..28941f0 100644 --- a/greenwave_monitor/src/greenwave_monitor.cpp +++ b/greenwave_monitor/src/greenwave_monitor.cpp @@ -99,6 +99,14 @@ GreenwaveMonitor::GreenwaveMonitor(const rclcpp::NodeOptions & options) timer_ = this->create_wall_timer( 1s, std::bind(&GreenwaveMonitor::timer_callback, this)); + // Subscribe to /diagnostics early so we can detect external publishers before + // deferred_init() adds topics. This gives us the best chance of catching + // externally-published diagnostics before add_topic() is called. + diagnostics_subscription_ = + this->create_subscription( + "/diagnostics", 10, + std::bind(&GreenwaveMonitor::diagnostics_callback, this, std::placeholders::_1)); + // Defer topic discovery to allow the ROS graph to settle before querying other nodes init_timer_ = this->create_wall_timer( 100ms, [this]() { @@ -170,6 +178,19 @@ void GreenwaveMonitor::timer_callback() RCLCPP_INFO(this->get_logger(), "===================================================="); } +void GreenwaveMonitor::diagnostics_callback( + const diagnostic_msgs::msg::DiagnosticArray::SharedPtr msg) +{ + std::lock_guard lock(externally_diagnosed_topics_mutex_); + for (const auto & status : msg->status) { + // Only track topic names that are not already monitored by us. This prevents + // our own published diagnostics from blocking re-adds after a remove_topic(). + if (greenwave_diagnostics_.find(status.name) == greenwave_diagnostics_.end()) { + externally_diagnosed_topics_.insert(status.name); + } + } +} + void GreenwaveMonitor::handle_manage_topic( const std::shared_ptr request, std::shared_ptr response) @@ -319,6 +340,20 @@ bool GreenwaveMonitor::has_header_from_type(const std::string & type_name) bool GreenwaveMonitor::add_topic( const std::string & topic, std::string & message, int max_retries, double retry_wait_s) { + // Check if an external node is already publishing diagnostics for this topic. + // Adding a duplicate would create redundant and potentially conflicting diagnostics. + { + std::lock_guard lock(externally_diagnosed_topics_mutex_); + if (externally_diagnosed_topics_.count(topic) > 0) { + message = "Topic is externally monitored"; + RCLCPP_WARN( + this->get_logger(), + "Refusing to add topic '%s': topic is externally monitored", + topic.c_str()); + return false; + } + } + // Check if topic already exists if (greenwave_diagnostics_.find(topic) != greenwave_diagnostics_.end()) { message = "Topic already being monitored"; @@ -361,6 +396,18 @@ bool GreenwaveMonitor::add_topic( bool GreenwaveMonitor::remove_topic(const std::string & topic, std::string & message) { + { + std::lock_guard lock(externally_diagnosed_topics_mutex_); + if (externally_diagnosed_topics_.count(topic) > 0) { + message = "Topic is externally monitored"; + RCLCPP_WARN( + this->get_logger(), + "Refusing to remove topic '%s': topic is externally monitored", + topic.c_str()); + return false; + } + } + auto diag_it = greenwave_diagnostics_.find(topic); if (diag_it == greenwave_diagnostics_.end()) { message = "Topic not found"; diff --git a/greenwave_monitor/test/test_greenwave_monitor.py b/greenwave_monitor/test/test_greenwave_monitor.py index 4fd10c4..2c8c8dd 100644 --- a/greenwave_monitor/test/test_greenwave_monitor.py +++ b/greenwave_monitor/test/test_greenwave_monitor.py @@ -23,6 +23,8 @@ import time import unittest +from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus + from greenwave_monitor.test_utils import ( call_manage_topic_service, collect_diagnostics_for_topic, @@ -415,6 +417,78 @@ def test_yaml_parameter_loading(self, expected_frequency, message_type, toleranc f'Topic {NONEXISTENT_TOPIC} should not be monitored' ) + def test_reject_add_externally_diagnosed_topic( + self, expected_frequency, message_type, tolerance_hz): + """Test that add_topic() fails when the topic already has external diagnostics.""" + if (message_type, expected_frequency, tolerance_hz) != MANAGE_TOPIC_TEST_CONFIG: + self.skipTest('Only running external diagnostics test once') + + service_client = self.check_node_launches_successfully() + external_topic = '/external_diag_topic' + diag_pub = self.test_node.create_publisher(DiagnosticArray, '/diagnostics', 10) + try: + end_time = time.time() + 3.0 + while time.time() < end_time: + msg = DiagnosticArray() + status = DiagnosticStatus() + status.name = external_topic + status.level = DiagnosticStatus.OK + status.message = 'External publisher' + msg.status = [status] + diag_pub.publish(msg) + rclpy.spin_once(self.test_node, timeout_sec=0.1) + + # Greenwave Monitor should refuse to add the topic because an external + # node is already publishing diagnostics for it. + response = self.call_manage_topic( + add=True, topic=external_topic, service_client=service_client) + self.assertFalse( + response.success, + 'add_topic() should fail when external diagnostics already exist for the topic' + ) + self.assertIn( + 'externally monitored', response.message.lower(), + 'Error message should mention topic is externally monitored' + ) + finally: + self.test_node.destroy_publisher(diag_pub) + + def test_reject_remove_externally_diagnosed_topic( + self, expected_frequency, message_type, tolerance_hz): + """Test that remove_topic() fails when the topic is externally monitored.""" + if (message_type, expected_frequency, tolerance_hz) != MANAGE_TOPIC_TEST_CONFIG: + self.skipTest('Only running external diagnostics removal test once') + + service_client = self.check_node_launches_successfully() + + external_topic = '/external_remove_topic' + + diag_pub = self.test_node.create_publisher(DiagnosticArray, '/diagnostics', 10) + try: + end_time = time.time() + 3.0 + while time.time() < end_time: + msg = DiagnosticArray() + status = DiagnosticStatus() + status.name = external_topic + status.level = DiagnosticStatus.OK + status.message = 'External publisher' + msg.status = [status] + diag_pub.publish(msg) + rclpy.spin_once(self.test_node, timeout_sec=0.1) + + response = self.call_manage_topic( + add=False, topic=external_topic, service_client=service_client) + self.assertFalse( + response.success, + 'remove_topic() should fail when topic is externally monitored' + ) + self.assertIn( + 'externally monitored', response.message.lower(), + 'Error message should mention topic is externally monitored' + ) + finally: + self.test_node.destroy_publisher(diag_pub) + if __name__ == '__main__': unittest.main()