diff --git a/tactical-microgrid-standard/CMakeLists.txt b/tactical-microgrid-standard/CMakeLists.txt index 022c070..f1033b4 100644 --- a/tactical-microgrid-standard/CMakeLists.txt +++ b/tactical-microgrid-standard/CMakeLists.txt @@ -63,6 +63,7 @@ target_link_libraries(Controller PRIVATE Commands_Idl PowerSim_Idl) add_executable(CLI cli/main.cpp cli/CLIClient.cpp + cli/ActiveMicrogridControllerStateDataReaderListenerImpl.cpp ) target_include_directories(CLI PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) target_link_libraries(CLI PRIVATE Commands_Idl PowerSim_Idl) diff --git a/tactical-microgrid-standard/cli/ActiveMicrogridControllerStateDataReaderListenerImpl.cpp b/tactical-microgrid-standard/cli/ActiveMicrogridControllerStateDataReaderListenerImpl.cpp new file mode 100644 index 0000000..91fe6dc --- /dev/null +++ b/tactical-microgrid-standard/cli/ActiveMicrogridControllerStateDataReaderListenerImpl.cpp @@ -0,0 +1,23 @@ +#include "ActiveMicrogridControllerStateDataReaderListenerImpl.h" + +void ActiveMicrogridControllerStateDataReaderListenerImpl::on_data_available(DDS::DataReader_ptr reader) +{ + tms::ActiveMicrogridControllerStateSeq data; + DDS::SampleInfoSeq info_seq; + tms::ActiveMicrogridControllerStateDataReader_var typed_reader = tms::ActiveMicrogridControllerStateDataReader::_narrow(reader); + DDS::ReturnCode_t rc = typed_reader->take(data, info_seq, DDS::LENGTH_UNLIMITED, + DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE); + if (rc != DDS::RETCODE_OK) { + ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: ActiveMicrogridControllerStateDataReaderListenerImpl::on_data_available: " + "take data failed: %C\n", OpenDDS::DCPS::retcode_to_string(rc))); + return; + } + + for (CORBA::ULong i = 0; i < data.length(); ++i) { + if (info_seq[i].valid_data) { + const tms::Identity& device_id = data[i].deviceId(); + auto master_id = data[i].masterId(); + cli_client_.set_active_controller(device_id, master_id); + } + } +} diff --git a/tactical-microgrid-standard/cli/ActiveMicrogridControllerStateDataReaderListenerImpl.h b/tactical-microgrid-standard/cli/ActiveMicrogridControllerStateDataReaderListenerImpl.h new file mode 100644 index 0000000..fc65746 --- /dev/null +++ b/tactical-microgrid-standard/cli/ActiveMicrogridControllerStateDataReaderListenerImpl.h @@ -0,0 +1,21 @@ +#ifndef CLI_ACTIVE_MICROGRID_CONTROLLER_STATE_DATA_READER_LISTENER_IMPL_H +#define CLI_ACTIVE_MICROGRID_CONTROLLER_STATE_DATA_READER_LISTENER_IMPL_H + +#include "common/DataReaderListenerBase.h" +#include "CLIClient.h" + +class ActiveMicrogridControllerStateDataReaderListenerImpl : public DataReaderListenerBase { +public: + explicit ActiveMicrogridControllerStateDataReaderListenerImpl(CLIClient& cli_client) + : DataReaderListenerBase("tms::ActiveMicrogridControllerState - DataReaderListenerImpl") + , cli_client_(cli_client) {} + + virtual ~ActiveMicrogridControllerStateDataReaderListenerImpl() = default; + + void on_data_available(DDS::DataReader_ptr reader) final; + +private: + CLIClient& cli_client_; +}; + +#endif diff --git a/tactical-microgrid-standard/cli/CLIClient.cpp b/tactical-microgrid-standard/cli/CLIClient.cpp index 9a36cbc..628e1c6 100644 --- a/tactical-microgrid-standard/cli/CLIClient.cpp +++ b/tactical-microgrid-standard/cli/CLIClient.cpp @@ -1,6 +1,7 @@ #include "CLIClient.h" #include "common/QosHelper.h" #include "common/Utils.h" +#include "ActiveMicrogridControllerStateDataReaderListenerImpl.h" #include #include @@ -9,6 +10,7 @@ #include #include +#include CLIClient::CLIClient(const tms::Identity& id) : handshaking_(id) @@ -90,6 +92,46 @@ DDS::ReturnCode_t CLIClient::init_tms(DDS::DomainId_t domain_id, int argc, char* return DDS::RETCODE_ERROR; } + // Subscribe to the tms::ActiveMicrogridControllerState topic + tms::ActiveMicrogridControllerStateTypeSupport_var amcs_ts = new tms::ActiveMicrogridControllerStateTypeSupportImpl; + if (DDS::RETCODE_OK != amcs_ts->register_type(dp, "")) { + ACE_ERROR((LM_ERROR, "(%P|%t) CLIClient::init: register_type ActiveMicrogridControllerState failed\n")); + return DDS::RETCODE_ERROR; + } + + CORBA::String_var amcs_type_name = amcs_ts->get_type_name(); + DDS::Topic_var amcs_topic = dp->create_topic(tms::topic::TOPIC_ACTIVE_MICROGRID_CONTROLLER_STATE.c_str(), + amcs_type_name, + TOPIC_QOS_DEFAULT, + nullptr, + ::OpenDDS::DCPS::DEFAULT_STATUS_MASK); + if (!amcs_topic) { + ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: CLIClient::init: create_topic \"%C\" failed\n", + tms::topic::TOPIC_ACTIVE_MICROGRID_CONTROLLER_STATE.c_str())); + return DDS::RETCODE_ERROR; + } + + const DDS::SubscriberQos tms_sub_qos = Qos::Subscriber::get_qos(); + DDS::Subscriber_var tms_sub = dp->create_subscriber(tms_sub_qos, + nullptr, + ::OpenDDS::DCPS::DEFAULT_STATUS_MASK); + if (!tms_sub) { + ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: CLIClient::init: create_subscriber with TMS QoS failed\n")); + return DDS::RETCODE_ERROR; + } + + const DDS::DataReaderQos& amcs_dr_qos = Qos::DataReader::fn_map.at(tms::topic::TOPIC_ACTIVE_MICROGRID_CONTROLLER_STATE)(device_id); + DDS::DataReaderListener_var amcs_listener(new ActiveMicrogridControllerStateDataReaderListenerImpl(*this)); + DDS::DataReader_var amcs_dr_base = tms_sub->create_datareader(amcs_topic, + amcs_dr_qos, + amcs_listener, + ::OpenDDS::DCPS::DEFAULT_STATUS_MASK); + if (!amcs_dr_base) { + ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: CLIClient::init: create_datareader for topic \"%C\" failed\n", + tms::topic::TOPIC_ACTIVE_MICROGRID_CONTROLLER_STATE.c_str())); + return DDS::RETCODE_ERROR; + } + return DDS::RETCODE_OK; } @@ -328,6 +370,18 @@ void CLIClient::run() thr.join(); } +void CLIClient::set_active_controller(const tms::Identity& device_id, + const OPENDDS_OPTIONAL_NS::optional& master_id) +{ + std::lock_guard guard(active_controllers_m_); + if (master_id.has_value()) { + active_controllers_[device_id] = master_id.value(); + } else { + // The device has lost its active controller or hasn't selected one yet. + active_controllers_[device_id] = ""; + } +} + void CLIClient::tolower(std::string& s) const { for (size_t i = 0; i < s.size(); ++i) { @@ -415,9 +469,22 @@ void CLIClient::display_power_devices() const std::cout << "Number of Connected Power Devices: " << power_devices_.size() << std::endl; size_t i = 1; for (auto it = power_devices_.begin(); it != power_devices_.end(); ++it) { - std::cout << i << ". Device Id: " << it->first << - ". Type: " << Utils::device_role_to_string(it->second.device_info().role()) << - ". Energy Level: " << energy_level_to_string(it->second.essl()) << std::endl; + std::string selected_controller; + { + std::lock_guard guard(active_controllers_m_); + auto ac_it = active_controllers_.find(it->first); + if (ac_it != active_controllers_.end()) { + selected_controller = "\"" + ac_it->second + "\""; + } else { + selected_controller = "\"Undetermined\""; + } + } + const std::string formated_id = "\"" + it->first + "\""; + std::cout << std::setfill(' ') << std::setw(3) << i++ + << ". Id: " << std::left << std::setw(15) << formated_id + << "| Type: " << std::left << std::setw(18) << Utils::device_role_to_string(it->second.device_info().role()) + << "| Energy Level: " << std::left << std::setw(15) << energy_level_to_string(it->second.essl()) + << "| Active Controller: " << std::left << selected_controller << std::endl; } } diff --git a/tactical-microgrid-standard/cli/CLIClient.h b/tactical-microgrid-standard/cli/CLIClient.h index 337ddba..32f638a 100644 --- a/tactical-microgrid-standard/cli/CLIClient.h +++ b/tactical-microgrid-standard/cli/CLIClient.h @@ -1,5 +1,5 @@ -#ifndef CONTROLLER_CLI_CLIENT_H -#define CONTROLLER_CLI_CLIENT_H +#ifndef CLI_CLI_CLIENT_H +#define CLI_CLI_CLIENT_H #include "common/Handshaking.h" #include "controller/Common.h" @@ -36,8 +36,11 @@ class CLIClient : public TimerHandler { ~CLIClient() {} DDS::ReturnCode_t init(DDS::DomainId_t domain_id, int argc = 0, char* argv[] = nullptr); + void run(); + void set_active_controller(const tms::Identity& device_id, const OPENDDS_OPTIONAL_NS::optional& master_id); + private: // Initialize DDS entities in the TMS domain DDS::ReturnCode_t init_tms(DDS::DomainId_t tms_domain_id, int argc = 0, char* argv[] = nullptr); @@ -127,6 +130,12 @@ class CLIClient : public TimerHandler { // The current microgrid controller with which the CLI client is interacting tms::Identity curr_controller_; + + mutable std::mutex active_controllers_m_; + + // Active controller selected by each power device (power device => its controller). + // Can be used to check that all power devices will eventually select the same active controller. + std::map active_controllers_; }; #endif diff --git a/tactical-microgrid-standard/common/ControllerSelector.cpp b/tactical-microgrid-standard/common/ControllerSelector.cpp index dbab885..9e35a1c 100644 --- a/tactical-microgrid-standard/common/ControllerSelector.cpp +++ b/tactical-microgrid-standard/common/ControllerSelector.cpp @@ -1,5 +1,36 @@ #include "ControllerSelector.h" +#include +#include +#include + +// If caller passes a non-null reactor, use it unmodified. +// Otherwise, create another reactor for this class separated from the one for Handshaking. +ControllerSelector::ControllerSelector(const tms::Identity& device_id, ACE_Reactor* reactor) + : TimerHandler(reactor) + , device_id_(device_id) +{ + if (!reactor) { + reactor_ = new ACE_Reactor; + + // We had an issue with using ACE_Reactor's default timer queue, which is + // ACE_Timer_Heap, when the rate of timer creation and cancellation is high + // for detecting missed heartbeat deadline from microgrid controllers. + // ACE_Timer_Hash seems working okay. + timer_queue_ = new ACE_Timer_Hash; + reactor_->timer_queue(timer_queue_); + own_reactor_ = true; + } +} + +ControllerSelector::~ControllerSelector() +{ + if (own_reactor_) { + delete timer_queue_; + delete reactor_; + } +} + void ControllerSelector::got_heartbeat(const tms::Heartbeat& hb) { Guard g(lock_); @@ -10,15 +41,15 @@ void ControllerSelector::got_heartbeat(const tms::Heartbeat& hb) if (selected_.empty()) { if (!this->get_timer()->active()) { - schedule_once(NewController{hb.deviceId()}, new_controller_delay); + schedule_once(NewController{hb.deviceId()}, new_active_controller_delay); } } else if (is_selected(hb.deviceId())) { cancel(); - if (this->get_timer()->active()) { - reschedule(); + if (this->get_timer()->active()) { + reschedule(); } else { - // MissedController was triggered, so we need to schedule it again. - schedule_once(MissedController{}, missed_controller_delay); + // MissedHeartbeat was triggered, so we need to schedule it again. + schedule_once(MissedHeartbeat{}, heartbeat_deadline); } } } @@ -39,19 +70,49 @@ void ControllerSelector::got_device_info(const tms::DeviceInfo& di) void ControllerSelector::timer_fired(Timer& timer) { Guard g(lock_); - const auto& id = timer.arg.id; + const auto& mc_id = timer.arg.id; ACE_DEBUG((LM_INFO, "(%P|%t) INFO: ControllerSelector::timed_event(NewController): " - "\"%C\" -> \"%C\"\n", selected_.c_str(), id.c_str())); - select(id); + "\"%C\" -> \"%C\"\n", selected_.c_str(), mc_id.c_str())); + + // The TMS spec isn't clear to whether the device needs to verify that the last + // heartbeat of this controller was received less than 3s (i.e., heartbeat deadline) ago. + // This check makes sense since if its last heartbeat was more than 3s ago, that means + // the controller is not available and should not be selected as the active controller. + const TimePoint now = Clock::now(); + auto it = all_controllers_.find(mc_id); + if (it == all_controllers_.end()) { + ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ControllerSelector::timed_event(NewController): Controller \"%C\" not found!\n", + mc_id.c_str())); + return; + } + + if (now - it->second < heartbeat_deadline) { + selected_ = mc_id; + send_controller_state(); + } } -void ControllerSelector::timer_fired(Timer&) +void ControllerSelector::timer_fired(Timer& timer) { Guard g(lock_); - ACE_DEBUG((LM_INFO, "(%P|%t) INFO: ControllerSelector::timed_event(MissedController): " - "\"%C\"\n", selected_.c_str())); - schedule_once(LostController{}, lost_controller_delay); - schedule_once(NoControllers{}, no_controllers_delay); + const auto& timer_id = timer.id; + ACE_DEBUG((LM_INFO, "(%P|%t) INFO: ControllerSelector::timed_event(MissedHeartbeat): " + "\"%C\". Timer id: %d\n", selected_.c_str(), timer_id)); + schedule_once(LostController{}, lost_active_controller_delay); + + // Start a No MC timer if the device has missed heartbeats from all MCs + const TimePoint now = Clock::now(); + bool no_avail_mc = true; + for (const auto& pair : all_controllers_) { + if (now - pair.second < heartbeat_deadline) { + no_avail_mc = false; + break; + } + } + + if (no_avail_mc) { + schedule_once(NoControllers{}, no_controllers_delay); + } } void ControllerSelector::timer_fired(Timer&) @@ -66,7 +127,7 @@ void ControllerSelector::timer_fired(Timer&) const TimePoint now = Clock::now(); for (auto it = all_controllers_.begin(); it != all_controllers_.end(); ++it) { const auto last_hb = now - it->second; - if (last_hb < missed_controller_delay) { + if (last_hb < heartbeat_deadline) { select(it->first, std::chrono::duration_cast(last_hb)); break; } @@ -84,6 +145,20 @@ void ControllerSelector::select(const tms::Identity& id, Sec last_hb) { ACE_DEBUG((LM_INFO, "(%P|%t) INFO: ControllerSelector::select: \"%C\"\n", id.c_str())); selected_ = id; - schedule_once(MissedController{}, missed_controller_delay - last_hb); - // TODO: Send ActiveMicrogridControllerState + send_controller_state(); + schedule_once(MissedHeartbeat{}, heartbeat_deadline - last_hb); +} + +void ControllerSelector::send_controller_state() +{ + tms::ActiveMicrogridControllerState amcs; + amcs.deviceId() = device_id_; + if (!selected_.empty()) { + amcs.masterId() = selected_; + } + + const DDS::ReturnCode_t rc = amcs_dw_->write(amcs, DDS::HANDLE_NIL); + if (rc != DDS::RETCODE_OK) { + ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: ControllerSelector::send_controller_state: write ActiveMicrogridControllerState failed\n")); + } } diff --git a/tactical-microgrid-standard/common/ControllerSelector.h b/tactical-microgrid-standard/common/ControllerSelector.h index 362d25e..d21b747 100644 --- a/tactical-microgrid-standard/common/ControllerSelector.h +++ b/tactical-microgrid-standard/common/ControllerSelector.h @@ -11,7 +11,7 @@ struct NewController { tms::Identity id; }; -struct MissedController {}; +struct MissedHeartbeat {}; struct LostController {}; struct NoControllers {}; @@ -30,7 +30,7 @@ class PowerDevice; * | | [S] | * | [6s] | | * | | V V - * +-[A,R]->MissedController<-[3s]-select()->{ActiveMicrogridControllerState} + * +-[A,R]->MissedHeartbeat<-[3s]-select()->{ActiveMicrogridControllerState} * | | * | [10s] * | | @@ -45,8 +45,11 @@ class PowerDevice; * S: If there's a selectable controller with a recent heartbeat */ class OpenDDS_TMS_Export ControllerSelector : - public TimerHandler { + public TimerHandler { public: + explicit ControllerSelector(const tms::Identity& device_id, ACE_Reactor* reactor = nullptr); + ~ControllerSelector(); + void got_heartbeat(const tms::Heartbeat& hb); void got_device_info(const tms::DeviceInfo& di); @@ -62,14 +65,29 @@ class OpenDDS_TMS_Export ControllerSelector : return selected_ == id; } + ACE_Reactor* get_reactor() const + { + Guard g(lock_); + return reactor_; + } + + void set_ActiveMicrogridControllerState_writer(tms::ActiveMicrogridControllerStateDataWriter_var amcs_dw) + { + amcs_dw_ = amcs_dw; + } + private: - static constexpr Sec new_controller_delay = Sec(3); - static constexpr Sec missed_controller_delay = Sec(3); - static constexpr Sec lost_controller_delay = Sec(6); + // Allow using non-default timer queue + ACE_Timer_Queue* timer_queue_ = nullptr; + bool own_reactor_ = false; + + static constexpr Sec heartbeat_deadline = Sec(3); + static constexpr Sec new_active_controller_delay = Sec(3); + static constexpr Sec lost_active_controller_delay = Sec(6); static constexpr Sec no_controllers_delay = Sec(10); void timer_fired(Timer& timer); - void timer_fired(Timer&); + void timer_fired(Timer&); void timer_fired(Timer&); void timer_fired(Timer&); void any_timer_fired(AnyTimer timer) @@ -79,8 +97,15 @@ class OpenDDS_TMS_Export ControllerSelector : void select(const tms::Identity& id, Sec last_hb = Sec(0)); + void send_controller_state(); + tms::Identity selected_; std::map all_controllers_; + + // Device ID to which this controller selector belong. + tms::Identity device_id_; + + tms::ActiveMicrogridControllerStateDataWriter_var amcs_dw_; }; #endif diff --git a/tactical-microgrid-standard/common/QosHelper.cpp b/tactical-microgrid-standard/common/QosHelper.cpp index f3a2d81..32bf7cf 100644 --- a/tactical-microgrid-standard/common/QosHelper.cpp +++ b/tactical-microgrid-standard/common/QosHelper.cpp @@ -18,7 +18,8 @@ const FnMap fn_map = { {tms::topic::TOPIC_DEVICE_INFO, get_PublishLast}, {tms::topic::TOPIC_OPERATOR_INTENT_REQUEST, get_Command}, {tms::topic::TOPIC_ENERGY_START_STOP_REQUEST, get_Command}, - {tms::topic::TOPIC_REPLY, get_Reply}}; + {tms::topic::TOPIC_REPLY, get_Reply}, + {tms::topic::TOPIC_ACTIVE_MICROGRID_CONTROLLER_STATE, get_PublishLast} }; } namespace DataWriter { @@ -28,7 +29,8 @@ const FnMap fn_map = { {tms::topic::TOPIC_DEVICE_INFO, get_PublishLast}, {tms::topic::TOPIC_OPERATOR_INTENT_REQUEST, get_Command}, {tms::topic::TOPIC_ENERGY_START_STOP_REQUEST, get_Command}, - {tms::topic::TOPIC_REPLY, get_Reply}}; + {tms::topic::TOPIC_REPLY, get_Reply}, + {tms::topic::TOPIC_ACTIVE_MICROGRID_CONTROLLER_STATE, get_PublishLast} }; } } diff --git a/tactical-microgrid-standard/common/TimerHandler.h b/tactical-microgrid-standard/common/TimerHandler.h index 2b3bbec..746bed0 100644 --- a/tactical-microgrid-standard/common/TimerHandler.h +++ b/tactical-microgrid-standard/common/TimerHandler.h @@ -13,6 +13,7 @@ #include #include #include +#include using Sec = std::chrono::seconds; using Clock = std::chrono::system_clock; @@ -185,7 +186,12 @@ class TimerHandler : public ACE_Event_Handler, protected TimerHolder int handle_timeout(const ACE_Time_Value&, const void* arg) { Guard g(lock_); - auto timer = active_timers_[*reinterpret_cast(arg)]; + auto timer_id = *reinterpret_cast(arg); + if (active_timers_.count(timer_id) == 0) { + ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: TimerHandler::handle_timeout: timer id %d does NOT exist\n", + timer_id)); + } + auto timer = active_timers_[timer_id]; any_timer_fired(timer); bool exit_after = false; std::visit([&](auto&& value) { @@ -200,7 +206,7 @@ class TimerHandler : public ACE_Event_Handler, protected TimerHolder protected: mutable Mutex lock_; - ACE_Reactor* const reactor_; + ACE_Reactor* reactor_; int end_event_loop(bool yes = true) { diff --git a/tactical-microgrid-standard/power_devices/PowerDevice.cpp b/tactical-microgrid-standard/power_devices/PowerDevice.cpp index 2f16c8d..74f0e14 100644 --- a/tactical-microgrid-standard/power_devices/PowerDevice.cpp +++ b/tactical-microgrid-standard/power_devices/PowerDevice.cpp @@ -1,6 +1,7 @@ #include "PowerDevice.h" #include "PowerConnectionDataReaderListenerImpl.h" #include "common/Utils.h" +#include "common/QosHelper.h" #include @@ -23,6 +24,56 @@ DDS::ReturnCode_t PowerDevice::init(DDS::DomainId_t domain, int argc, char* argv return rc; } + DDS::DomainParticipant_var dp = get_domain_participant(); + + // Publish to the tms::ActiveMicrogridControllerState topic + tms::ActiveMicrogridControllerStateTypeSupport_var amcs_ts = new tms::ActiveMicrogridControllerStateTypeSupportImpl(); + rc = amcs_ts->register_type(participant_, ""); + if (DDS::RETCODE_OK != rc) { + ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: PowerDevice::init: register_type for ActiveMicrogridControllerState failed\n")); + return rc; + } + + CORBA::String_var amcs_type_name = amcs_ts->get_type_name(); + DDS::Topic_var amcs_topic = participant_->create_topic(tms::topic::TOPIC_ACTIVE_MICROGRID_CONTROLLER_STATE.c_str(), + amcs_type_name, + TOPIC_QOS_DEFAULT, + nullptr, + ::OpenDDS::DCPS::DEFAULT_STATUS_MASK); + if (!amcs_topic) { + ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: PowerDevice::init: create topic '%C' failed\n", + tms::topic::TOPIC_ACTIVE_MICROGRID_CONTROLLER_STATE.c_str())); + return DDS::RETCODE_ERROR; + } + + const DDS::PublisherQos tms_pub_qos = Qos::Publisher::get_qos(); + DDS::Publisher_var tms_pub = dp->create_publisher(tms_pub_qos, + nullptr, + ::OpenDDS::DCPS::DEFAULT_STATUS_MASK); + if (!tms_pub) { + ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: PowerDevice::init: create_publisher with TMS QoS failed\n")); + return DDS::RETCODE_ERROR; + } + + const DDS::DataWriterQos& amcs_dw_qos = Qos::DataWriter::fn_map.at(tms::topic::TOPIC_ACTIVE_MICROGRID_CONTROLLER_STATE)(device_id_); + DDS::DataWriter_var amcs_dw_base = tms_pub->create_datawriter(amcs_topic, + amcs_dw_qos, + nullptr, + ::OpenDDS::DCPS::DEFAULT_STATUS_MASK); + if (!amcs_dw_base) { + ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: PowerDevice::init: create_datawriter for topic \"%C\" failed\n", + tms::topic::TOPIC_ACTIVE_MICROGRID_CONTROLLER_STATE.c_str())); + return DDS::RETCODE_ERROR; + } + + tms::ActiveMicrogridControllerStateDataWriter_var amcs_dw = tms::ActiveMicrogridControllerStateDataWriter::_narrow(amcs_dw_base); + if (!amcs_dw) { + ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: PowerDevice::init: ActiveMicrogridControllerStateDataWriter narrow failed\n")); + return DDS::RETCODE_ERROR; + } + + controller_selector_.set_ActiveMicrogridControllerState_writer(amcs_dw); + // Subscribe to the PowerConnection topic const DDS::DomainId_t sim_domain_id = Utils::get_sim_domain_id(domain); sim_participant_ = get_participant_factory()->create_participant(sim_domain_id, diff --git a/tactical-microgrid-standard/power_devices/PowerDevice.h b/tactical-microgrid-standard/power_devices/PowerDevice.h index 59bb258..aa8b79e 100644 --- a/tactical-microgrid-standard/power_devices/PowerDevice.h +++ b/tactical-microgrid-standard/power_devices/PowerDevice.h @@ -7,12 +7,14 @@ #include "PowerSim_Idl_export.h" #include +#include class PowerSim_Idl_Export PowerDevice : public Handshaking { public: explicit PowerDevice(const tms::Identity& id, tms::DeviceRole role = tms::DeviceRole::ROLE_SOURCE, bool verbose = false) : Handshaking(id) , verbose_(verbose) + , controller_selector_(id) , role_(role) { } @@ -26,7 +28,7 @@ class PowerSim_Idl_Export PowerDevice : public Handshaking { virtual int run() { - return reactor_->run_reactor_event_loop() == 0 ? 0 : 1; + return run_i(); } powersim::ConnectedDeviceSeq connected_devices_in() const @@ -53,6 +55,19 @@ class PowerSim_Idl_Export PowerDevice : public Handshaking { } protected: + virtual int run_i() + { + if (controller_selector_.reactor() == reactor_) { + // Same reactor instance for both handshaking and controller selection + return reactor_->run_reactor_event_loop() == 0 ? 0 : 1; + } + + std::thread handshaking_thr([&] { reactor_->run_reactor_event_loop(); }); + const int ret = controller_selector_.get_reactor()->run_reactor_event_loop() == 0 ? 0 : 1; + handshaking_thr.join(); + return ret; + } + // Concrete power device should override this function depending on their role. virtual tms::DeviceInfo populate_device_info() const; diff --git a/tactical-microgrid-standard/power_devices/Source.cpp b/tactical-microgrid-standard/power_devices/Source.cpp index 79eb55c..97ca710 100644 --- a/tactical-microgrid-standard/power_devices/Source.cpp +++ b/tactical-microgrid-standard/power_devices/Source.cpp @@ -241,12 +241,10 @@ class SourceDevice : public PowerDevice { int run() override { - std::thread thr(&SourceDevice::simulate_power_flow, this); - if (reactor_->run_reactor_event_loop() != 0) { - return 1; - } - thr.join(); - return 0; + std::thread sim_thr(&SourceDevice::simulate_power_flow, this); + const int ret = run_i(); + sim_thr.join(); + return ret; } tms::ReplyDataWriter_var reply_dw() const