Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docs/DESIGN_AND_IMPLEMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ Invalid values are ignored and the default `header_with_nodetime_fallback` is us
`GreenwaveMonitor::has_header_from_type()`; unknown types fall back to no-header behavior.
- `publishDiagnostics()` marks status as `STALE` if no fresh `updateDiagnostics()` happened since the previous publish.
- `setExpectedDt()` requires `expected_hz > 0`; zero disables useful timing checks.
- Do not add externally monitored topics (those already using inline `GreenwaveDiagnostics`) via `manage_topic` or the terminal UI, as this will create a duplicate diagnostic subscription for the same topic.

## Where To Look In Code

Expand Down
11 changes: 11 additions & 0 deletions greenwave_monitor/include/greenwave_monitor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
#include <chrono>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <set>
#include <string>
#include <thread>
#include <vector>
Expand Down Expand Up @@ -48,6 +50,9 @@ class GreenwaveMonitor : public rclcpp::Node
if (init_timer_) {
init_timer_->cancel();
}
// Reset diagnostics subscription before clearing internal state to prevent
// callbacks from firing after greenwave_diagnostics_ is destroyed
diagnostics_subscription_.reset();
// Clear diagnostics before base Node destructor runs to avoid accessing invalid node state
greenwave_diagnostics_.clear();
subscriptions_.clear();
Expand Down Expand Up @@ -88,6 +93,8 @@ class GreenwaveMonitor : public rclcpp::Node

void add_topics_from_parameters();

void diagnostics_callback(const diagnostic_msgs::msg::DiagnosticArray::SharedPtr msg);

std::map<std::string,
std::unique_ptr<greenwave_diagnostics::GreenwaveDiagnostics>> greenwave_diagnostics_;
std::vector<std::shared_ptr<rclcpp::GenericSubscription>> subscriptions_;
Expand All @@ -98,4 +105,8 @@ class GreenwaveMonitor : public rclcpp::Node
rclcpp::Service<greenwave_monitor_interfaces::srv::SetExpectedFrequency>::SharedPtr
set_expected_frequency_service_;
greenwave_diagnostics::TimeCheckPreset time_check_preset_;
rclcpp::Subscription<diagnostic_msgs::msg::DiagnosticArray>::SharedPtr
diagnostics_subscription_;
std::set<std::string> externally_diagnosed_topics_;
std::mutex externally_diagnosed_topics_mutex_;
};
47 changes: 47 additions & 0 deletions greenwave_monitor/src/greenwave_monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ GreenwaveMonitor::GreenwaveMonitor(const rclcpp::NodeOptions & options)
timer_ = this->create_wall_timer(
1s, std::bind(&GreenwaveMonitor::timer_callback, this));

// Subscribe to /diagnostics early so we can detect external publishers before
// deferred_init() adds topics. This gives us the best chance of catching
// externally-published diagnostics before add_topic() is called.
diagnostics_subscription_ =
this->create_subscription<diagnostic_msgs::msg::DiagnosticArray>(
"/diagnostics", 10,
std::bind(&GreenwaveMonitor::diagnostics_callback, this, std::placeholders::_1));

// Defer topic discovery to allow the ROS graph to settle before querying other nodes
init_timer_ = this->create_wall_timer(
100ms, [this]() {
Expand Down Expand Up @@ -170,6 +178,19 @@ void GreenwaveMonitor::timer_callback()
RCLCPP_INFO(this->get_logger(), "====================================================");
}

void GreenwaveMonitor::diagnostics_callback(
const diagnostic_msgs::msg::DiagnosticArray::SharedPtr msg)
{
std::lock_guard<std::mutex> lock(externally_diagnosed_topics_mutex_);
for (const auto & status : msg->status) {
// Only track topic names that are not already monitored by us. This prevents
// our own published diagnostics from blocking re-adds after a remove_topic().
if (greenwave_diagnostics_.find(status.name) == greenwave_diagnostics_.end()) {
externally_diagnosed_topics_.insert(status.name);
}
}
}

void GreenwaveMonitor::handle_manage_topic(
const std::shared_ptr<greenwave_monitor_interfaces::srv::ManageTopic::Request> request,
std::shared_ptr<greenwave_monitor_interfaces::srv::ManageTopic::Response> response)
Expand Down Expand Up @@ -319,6 +340,20 @@ bool GreenwaveMonitor::has_header_from_type(const std::string & type_name)
bool GreenwaveMonitor::add_topic(
const std::string & topic, std::string & message, int max_retries, double retry_wait_s)
{
// Check if an external node is already publishing diagnostics for this topic.
// Adding a duplicate would create redundant and potentially conflicting diagnostics.
{
std::lock_guard<std::mutex> lock(externally_diagnosed_topics_mutex_);
if (externally_diagnosed_topics_.count(topic) > 0) {
message = "Topic is externally monitored";
RCLCPP_WARN(
this->get_logger(),
"Refusing to add topic '%s': topic is externally monitored",
topic.c_str());
return false;
}
}

// Check if topic already exists
if (greenwave_diagnostics_.find(topic) != greenwave_diagnostics_.end()) {
message = "Topic already being monitored";
Expand Down Expand Up @@ -361,6 +396,18 @@ bool GreenwaveMonitor::add_topic(

bool GreenwaveMonitor::remove_topic(const std::string & topic, std::string & message)
{
{
std::lock_guard<std::mutex> lock(externally_diagnosed_topics_mutex_);
if (externally_diagnosed_topics_.count(topic) > 0) {
message = "Topic is externally monitored";
RCLCPP_WARN(
this->get_logger(),
"Refusing to remove topic '%s': topic is externally monitored",
topic.c_str());
return false;
}
}

auto diag_it = greenwave_diagnostics_.find(topic);
if (diag_it == greenwave_diagnostics_.end()) {
message = "Topic not found";
Expand Down
74 changes: 74 additions & 0 deletions greenwave_monitor/test/test_greenwave_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import time
import unittest

from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus

from greenwave_monitor.test_utils import (
call_manage_topic_service,
collect_diagnostics_for_topic,
Expand Down Expand Up @@ -415,6 +417,78 @@ def test_yaml_parameter_loading(self, expected_frequency, message_type, toleranc
f'Topic {NONEXISTENT_TOPIC} should not be monitored'
)

def test_reject_add_externally_diagnosed_topic(
self, expected_frequency, message_type, tolerance_hz):
"""Test that add_topic() fails when the topic already has external diagnostics."""
if (message_type, expected_frequency, tolerance_hz) != MANAGE_TOPIC_TEST_CONFIG:
self.skipTest('Only running external diagnostics test once')

service_client = self.check_node_launches_successfully()
external_topic = '/external_diag_topic'
diag_pub = self.test_node.create_publisher(DiagnosticArray, '/diagnostics', 10)
try:
end_time = time.time() + 3.0
while time.time() < end_time:
msg = DiagnosticArray()
status = DiagnosticStatus()
status.name = external_topic
status.level = DiagnosticStatus.OK
status.message = 'External publisher'
msg.status = [status]
diag_pub.publish(msg)
rclpy.spin_once(self.test_node, timeout_sec=0.1)

# Greenwave Monitor should refuse to add the topic because an external
# node is already publishing diagnostics for it.
response = self.call_manage_topic(
add=True, topic=external_topic, service_client=service_client)
self.assertFalse(
response.success,
'add_topic() should fail when external diagnostics already exist for the topic'
)
self.assertIn(
'externally monitored', response.message.lower(),
'Error message should mention topic is externally monitored'
)
finally:
self.test_node.destroy_publisher(diag_pub)

def test_reject_remove_externally_diagnosed_topic(
self, expected_frequency, message_type, tolerance_hz):
"""Test that remove_topic() fails when the topic is externally monitored."""
if (message_type, expected_frequency, tolerance_hz) != MANAGE_TOPIC_TEST_CONFIG:
self.skipTest('Only running external diagnostics removal test once')

service_client = self.check_node_launches_successfully()

external_topic = '/external_remove_topic'

diag_pub = self.test_node.create_publisher(DiagnosticArray, '/diagnostics', 10)
try:
end_time = time.time() + 3.0
while time.time() < end_time:
msg = DiagnosticArray()
status = DiagnosticStatus()
status.name = external_topic
status.level = DiagnosticStatus.OK
status.message = 'External publisher'
msg.status = [status]
diag_pub.publish(msg)
rclpy.spin_once(self.test_node, timeout_sec=0.1)

response = self.call_manage_topic(
add=False, topic=external_topic, service_client=service_client)
self.assertFalse(
response.success,
'remove_topic() should fail when topic is externally monitored'
)
self.assertIn(
'externally monitored', response.message.lower(),
'Error message should mention topic is externally monitored'
)
finally:
self.test_node.destroy_publisher(diag_pub)


if __name__ == '__main__':
unittest.main()