Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ target_sources(dsr_api
dsr_inner_eigen_api.cpp
dsr_rt_api.cpp
dsr_utils.cpp
dsr_signal_emitter.cpp
GHistorySaver.cpp
${GEOM_API_SOURCES}
${headers_to_moc}
Expand Down
95 changes: 52 additions & 43 deletions api/dsr_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "dsr/api/dsr_graph_settings.h"
#include "dsr/core/topics/IDLGraph.hpp"
#include "dsr/core/types/translator.h"
#include "include/dsr/api/dsr_signal_emitter.h"
#include <chrono>
#include <ctime>
#include <dsr/api/dsr_api.h>
Expand Down Expand Up @@ -44,7 +45,11 @@ DSRGraph::DSRGraph(GraphSettings settings) :

qDebug() << "Agent name: " << QString::fromStdString(agent_name);
utils = std::make_unique<Utilities>(this);

if (settings.signal_mode == SignalMode::QT) {
set_qt_signals();
} else {
set_queued_signals();
}
// RTPS Create participant
auto[suc, participant_handle] = dsrparticipant.init(agent_id, agent_name, settings.same_host,
ParticipantChangeFunctor(this, [&](DSR::DSRGraph *graph,
Expand Down Expand Up @@ -122,8 +127,8 @@ DSRGraph::DSRGraph(GraphSettings settings) :
qDebug() << __FUNCTION__ << "Constructor finished OK";
}

DSRGraph::DSRGraph(std::string name, uint32_t id, const std::string &dsr_input_file, bool all_same_host, int8_t domain_id)
: DSR::DSRGraph(GraphSettings {id, 5, 1, name, dsr_input_file, "", all_same_host, GraphSettings::LOGLEVEL::INFOL, domain_id})
DSRGraph::DSRGraph(std::string name, uint32_t id, const std::string &dsr_input_file, bool all_same_host, int8_t domain_id, SignalMode mode)
: DSR::DSRGraph(GraphSettings {id, 5, 1, name, dsr_input_file, "", all_same_host, GraphSettings::LOGLEVEL::INFOL, domain_id, mode})
{}


Expand Down Expand Up @@ -202,10 +207,10 @@ std::optional<uint64_t> DSRGraph::insert_node(No &&node)
if (delta.has_value())
{
dsrpub_node.write(&delta.value());
emit update_node_signal(node.id(), node.type(), SignalInfo{agent_id});
emitter.update_node_signal(node.id(), node.type(), SignalInfo{agent_id});
for (const auto &[k, v]: node.fano())
{
emit update_edge_signal(node.id(), k.first, k.second, SignalInfo{agent_id});
emitter.update_edge_signal(node.id(), k.first, k.second, SignalInfo{agent_id});
}
}
}
Expand Down Expand Up @@ -291,13 +296,13 @@ requires (std::is_same_v<std::remove_cvref_t<No>, DSR::Node>)
if (!copy) {
if (vec_node_attr.has_value()) {
dsrpub_node_attrs.write(&vec_node_attr.value());
emit update_node_signal(node.id(), node.type(), SignalInfo{agent_id});
emitter.update_node_signal(node.id(), node.type(), SignalInfo{agent_id});
std::vector<std::string> atts_names(vec_node_attr->size());
std::transform(std::make_move_iterator(vec_node_attr->begin()),
std::make_move_iterator(vec_node_attr->end()),
atts_names.begin(),
[](auto &&x) { return x.attr_name(); });
emit update_node_attr_signal(node.id(), atts_names, SignalInfo{agent_id});
emitter.update_node_attr_signal(node.id(), atts_names, SignalInfo{agent_id});

}
}
Expand Down Expand Up @@ -383,16 +388,16 @@ bool DSRGraph::delete_node(const std::string &name)

if (result) {
if (!copy) {
emit del_node_signal(id.value(), SignalInfo{agent_id});
if (node_signal) emit deleted_node_signal(*node_signal, SignalInfo{agent_id});
emitter.del_node_signal(id.value(), SignalInfo{agent_id});
if (node_signal) emitter.deleted_node_signal(*node_signal, SignalInfo{agent_id});
dsrpub_node.write(&deleted_node.value());

for (auto &a : delta_vec) {
dsrpub_edge.write(&a);
}
for (auto &edge : deleted_edges) {
emit del_edge_signal(edge.from(), edge.to(), edge.type(), SignalInfo{ agent_id });
emit deleted_edge_signal(edge);
emitter.del_edge_signal(edge.from(), edge.to(), edge.type(), SignalInfo{ agent_id });
emitter.deleted_edge_signal(edge, SignalInfo{ agent_id });
}
}
return true;
Expand All @@ -416,16 +421,16 @@ bool DSRGraph::delete_node(uint64_t id)

if (result) {
if (!copy) {
emit del_node_signal(id, SignalInfo{ agent_id });
if (node_signal) emit deleted_node_signal(*node_signal, SignalInfo{agent_id});
emitter.del_node_signal(id, SignalInfo{ agent_id });
if (node_signal) emitter.deleted_node_signal(*node_signal, SignalInfo{agent_id});
dsrpub_node.write(&deleted_node.value());

for (auto &a : delta_vec) {
dsrpub_edge.write(&a);
}
for (auto &edge : deleted_edges) {
emit del_edge_signal(edge.from(), edge.to(), edge.type(), SignalInfo{ agent_id });
emit deleted_edge_signal(edge);
emitter.del_edge_signal(edge.from(), edge.to(), edge.type(), SignalInfo{ agent_id });
emitter.deleted_edge_signal(edge, SignalInfo{ agent_id });
}
}
return true;
Expand Down Expand Up @@ -632,7 +637,7 @@ requires (std::is_same_v<std::remove_cvref_t<Ed>, DSR::Edge>)
}
if (result) {
if (!copy) {
emit update_edge_signal(attrs.from(), attrs.to(), attrs.type(), SignalInfo{ agent_id });
emitter.update_edge_signal(attrs.from(), attrs.to(), attrs.type(), SignalInfo{ agent_id });

if (delta_edge.has_value()) { //Insert
dsrpub_edge.write(&delta_edge.value());
Expand All @@ -645,7 +650,7 @@ requires (std::is_same_v<std::remove_cvref_t<Ed>, DSR::Edge>)
atts_names.begin(),
[](auto &&x) { return x.attr_name(); });

emit update_edge_attr_signal(attrs.from(), attrs.to(), attrs.type(), atts_names, SignalInfo{ agent_id });
emitter.update_edge_attr_signal(attrs.from(), attrs.to(), attrs.type(), atts_names, SignalInfo{ agent_id });

}
}
Expand Down Expand Up @@ -687,9 +692,9 @@ bool DSRGraph::delete_edge(uint64_t from, uint64_t to, const std::string &key)
if (delta.has_value())
{
if (!copy) {
emit del_edge_signal(from, to, key, SignalInfo{ agent_id });
emitter.del_edge_signal(from, to, key, SignalInfo{ agent_id });
if (deleted_edge.has_value()) {
emit deleted_edge_signal(*deleted_edge);
emitter.deleted_edge_signal(*deleted_edge, SignalInfo{ agent_id });
}
dsrpub_edge.write(&delta.value());
}
Expand Down Expand Up @@ -718,9 +723,9 @@ bool DSRGraph::delete_edge(const std::string &from, const std::string &to, const
if (delta.has_value())
{
if (!copy) {
emit del_edge_signal(id_from.value(), id_to.value(), key, SignalInfo{ agent_id });
emitter.del_edge_signal(id_from.value(), id_to.value(), key, SignalInfo{ agent_id });
if (deleted_edge.has_value()) {
emit deleted_edge_signal(*deleted_edge);
emitter.deleted_edge_signal(*deleted_edge, SignalInfo{ agent_id });
}
dsrpub_edge.write(&delta.value());
}
Expand Down Expand Up @@ -1069,34 +1074,36 @@ void DSRGraph::join_delta_node(IDL::MvregNode &&mvreg)

if (joined) {
if (signal) {
emit update_node_signal(id, nodes.at(id).read_reg().type(), SignalInfo{ mvreg.agent_id() });
emitter.update_node_signal(id, nodes.at(id).read_reg().type(), SignalInfo{ mvreg.agent_id() });
for (const auto &[k, v] : nodes.at(id).read_reg().fano()) {
//std::cout << "[JOIN NODE] add edge FROM: "<< id << ", " << k.first << ", " << k.second << std::endl;
emit update_edge_signal(id, k.first, k.second, SignalInfo{ mvreg.agent_id() });
emitter.update_edge_signal(id, k.first, k.second, SignalInfo{ mvreg.agent_id() });
}

for (const auto &[k, v]: map_new_to_edges)
{
//std::cout << "[JOIN NODE] add edge TO: "<< k << ", " << id << ", " << v << std::endl;
emit update_edge_signal(k, id, v, SignalInfo{ mvreg.agent_id() });
emitter.update_edge_signal(k, id, v, SignalInfo{ mvreg.agent_id() });
}
} else {
emit del_node_signal(id, SignalInfo{ mvreg.agent_id() });
emitter.del_node_signal(id, SignalInfo{ mvreg.agent_id() });
if (maybe_deleted_node.has_value()) {
emit deleted_node_signal(Node(*maybe_deleted_node));
Node tmp_node(*maybe_deleted_node);
emitter.deleted_node_signal(tmp_node, SignalInfo{ agent_id });
for (const auto &node: maybe_deleted_node->fano()) {
//std::cout << "[JOIN NODE] delete edge FROM: "<< node.second.read_reg().from() << ", " << node.second.read_reg().to() << ", " << node.second.read_reg().type() << std::endl;
emit del_edge_signal(node.second.read_reg().from(), node.second.read_reg().to(),
emitter.del_edge_signal(node.second.read_reg().from(), node.second.read_reg().to(),
node.second.read_reg().type(), SignalInfo{ mvreg.agent_id() });
emit deleted_edge_signal(Edge(node.second.read_reg()));
Edge tmp_edge(node.second.read_reg());
emitter.deleted_edge_signal(tmp_edge, SignalInfo{ agent_id });
}
}

//TODO: deleted_edge_signal. update_maps_node_delete was called before so the maps are probably wrong...
for (const auto &[from, type] : cache_map_to_edges.value()) {
//std::cout << "[JOIN NODE] delete edge TO: "<< from << ", " << id << ", " << type << std::endl;
emit del_edge_signal(from, id, type, SignalInfo{ mvreg.agent_id() });
//emit deleted_edge_signal(Edge(node.second.read_reg())); TODO: fix this
emitter.del_edge_signal(from, id, type, SignalInfo{ mvreg.agent_id() });
//emitter.deleted_edge_signal(Edge(node.second.read_reg())); TODO: fix this
}

}
Expand Down Expand Up @@ -1238,12 +1245,12 @@ void DSRGraph::join_delta_edge(IDL::MvregEdge &&mvreg)
if (joined) {
if (signal) {
//std::cout << "[JOIN EDGE] add edge: "<< from << ", " << to << ", " << type << std::endl;
emit update_edge_signal(from, to, type, SignalInfo{ mvreg.agent_id() });
emitter.update_edge_signal(from, to, type, SignalInfo{ mvreg.agent_id() });
} else {
//std::cout << "[JOIN EDGE] delete edge: "<< from << ", " << to << ", " << type << std::endl;
emit del_edge_signal(from, to, type, SignalInfo{ mvreg.agent_id() });
emitter.del_edge_signal(from, to, type, SignalInfo{ mvreg.agent_id() });
if (deleted_edge.has_value()) {
emit deleted_edge_signal(*deleted_edge);
emitter.deleted_edge_signal(*deleted_edge, SignalInfo{ agent_id });
}
}
}
Expand Down Expand Up @@ -1491,26 +1498,28 @@ void DSRGraph::join_full_graph(IDL::OrMap &&full_graph)
if (signal) {
//check what change is joined
if (!nd.has_value() || nd->attrs() != nodes[id].read_reg().attrs()) {
emit update_node_signal(id, nodes[id].read_reg().type(), SignalInfo{ agent_id_ch });
emitter.update_node_signal(id, nodes[id].read_reg().type(), SignalInfo{ agent_id_ch });
} else if (nd.value() != nodes[id].read_reg()) {
auto iter = nodes[id].read_reg().fano();
for (const auto &[k, v] : nd->fano()) {
if (!iter.contains(k)) {
emit del_edge_signal(id, k.first, k.second, SignalInfo{ agent_id_ch });
emitter.del_edge_signal(id, k.first, k.second, SignalInfo{ agent_id_ch });
if (v.dk.ds.size() > 0) {
emit deleted_edge_signal(Edge(v.read_reg()));
Edge tmp_edge(v.read_reg());
emitter.deleted_edge_signal(tmp_edge, SignalInfo{ agent_id });
}
}
}
for (const auto &[k, v] : iter) {
if (auto it = nd->fano().find(k); it == nd->fano().end() or it->second != v)
emit update_edge_signal(id, k.first, k.second, SignalInfo{ agent_id_ch });
emitter.update_edge_signal(id, k.first, k.second, SignalInfo{ agent_id_ch });
}
}
} else {
emit del_node_signal(id, SignalInfo{ agent_id_ch });
emitter.del_node_signal(id, SignalInfo{ agent_id_ch });
if (nd.has_value()) {
emit deleted_node_signal(Node(*nd));
Node tmp_node(*nd);
emitter.deleted_node_signal(tmp_node, SignalInfo{ agent_id_ch });
}
}

Expand Down Expand Up @@ -1689,8 +1698,8 @@ void DSRGraph::edge_attrs_subscription_thread()
}


emit update_edge_attr_signal(from, to, type, sig, SignalInfo{samples.vec().at(0).agent_id()});
emit update_edge_signal(from, to, type, SignalInfo{samples.vec().at(0).agent_id()});
emitter.update_edge_attr_signal(from, to, type, sig, SignalInfo{samples.vec().at(0).agent_id()});
emitter.update_edge_signal(from, to, type, SignalInfo{samples.vec().at(0).agent_id()});

});
}
Expand Down Expand Up @@ -1759,8 +1768,8 @@ void DSRGraph::node_attrs_subscription_thread()
sig.emplace_back(std::move(opt_str.value()));
}

emit update_node_attr_signal(id, sig, SignalInfo{samples.vec().at(0).agent_id()});
emit update_node_signal(id, type, SignalInfo{samples.vec().at(0).agent_id()});
emitter.update_node_attr_signal(id, sig, SignalInfo{samples.vec().at(0).agent_id()});
emitter.update_node_signal(id, type, SignalInfo{samples.vec().at(0).agent_id()});
});
}
}
Expand Down
16 changes: 8 additions & 8 deletions api/dsr_rt_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,12 +341,12 @@ void RT_API::insert_or_assign_edge_RT(Node &n, uint64_t to, const std::vector<fl

if (!no_send and node2.has_value()) G->dsrpub_node_attrs.write(&node2.value());

emit G->update_edge_attr_signal(n.id(), to, "RT" ,{"rt_rotation_euler_xyz", "rt_translation"}, SignalInfo{ G->agent_id });
emit G->update_edge_signal(n.id(), to, "RT", SignalInfo{ G->agent_id });
G->emitter.update_edge_attr_signal(n.id(), to, "RT" ,{"rt_rotation_euler_xyz", "rt_translation"}, SignalInfo{ G->agent_id });
G->emitter.update_edge_signal(n.id(), to, "RT", SignalInfo{ G->agent_id });
if (!no_send)
{
emit G->update_node_signal(to_n->id(), to_n->type(), SignalInfo{ G->agent_id });
emit G->update_node_attr_signal(to_n->id(), {"level", "parent"}, SignalInfo{ G->agent_id });
G->emitter.update_node_signal(to_n->id(), to_n->type(), SignalInfo{ G->agent_id });
G->emitter.update_node_attr_signal(to_n->id(), {"level", "parent"}, SignalInfo{ G->agent_id });
}
}
}
Expand Down Expand Up @@ -528,12 +528,12 @@ void RT_API::insert_or_assign_edge_RT(Node &n, uint64_t to, std::vector<float> &

if (!no_send and node2.has_value()) G->dsrpub_node_attrs.write(&node2.value());

emit G->update_edge_attr_signal(n.id(), to, "RT",{"rt_rotation_euler_xyz", "rt_translation"}, SignalInfo{ G->agent_id });
emit G->update_edge_signal(n.id(), to, "RT", SignalInfo{ G->agent_id });
G->emitter.update_edge_attr_signal(n.id(), to, "RT",{"rt_rotation_euler_xyz", "rt_translation"}, SignalInfo{ G->agent_id });
G->emitter.update_edge_signal(n.id(), to, "RT", SignalInfo{ G->agent_id });
if (!no_send)
{
emit G->update_node_signal(to_n->id(), to_n->type(), SignalInfo{ G->agent_id });
emit G->update_node_attr_signal(to_n->id(), {"level", "parent"}, SignalInfo{ G->agent_id });
G->emitter.update_node_signal(to_n->id(), to_n->type(), SignalInfo{ G->agent_id });
G->emitter.update_node_attr_signal(to_n->id(), {"level", "parent"}, SignalInfo{ G->agent_id });
}
}
}
86 changes: 86 additions & 0 deletions api/dsr_signal_emitter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#include <dsr/api/dsr_api.h>
#include <dsr/api/dsr_signal_emitter.h>
#include <dsr/api/dsr_signal_info.h>

void DSR::QueuedSignalRunner::run_update_node_signal(std::uint64_t a,
const std::string &b,
SignalInfo c) {

tp.spawn_task([=, this] {
for (auto fn : uns_fns) {
if (fn)
fn(a, b);
}
});
}
void DSR::QueuedSignalRunner::run_update_node_attr_signal(
std::uint64_t a, const std::vector<std::string> &b, SignalInfo c) {
tp.spawn_task([=, this] {
for (auto fn : unas_fns) {
if (fn)
fn(a, b);
}
});
}
void DSR::QueuedSignalRunner::run_update_edge_signal(std::uint64_t a,
std::uint64_t b,
const std::string &c,
SignalInfo d) {
tp.spawn_task([=, this] {
for (auto fn : ues_fns) {
if (fn)
fn(a, b, c);
}
});
}

void DSR::QueuedSignalRunner::run_update_edge_attr_signal(
std::uint64_t a, std::uint64_t b, const std::string &c,
const std::vector<std::string> &d, SignalInfo e) {
tp.spawn_task([=, this] {
for (auto fn : ueas_fns) {
if (fn)
fn(a, b, c, d);
}
});
}

void DSR::QueuedSignalRunner::run_del_edge_signal(std::uint64_t a,
std::uint64_t b,
const std::string &c,
SignalInfo d) {
tp.spawn_task([=, this] {
for (auto fn : des_fns) {
if (fn)
fn(a, b, c);
}
});
}
void DSR::QueuedSignalRunner::run_del_node_signal(std::uint64_t a,
SignalInfo b) {
tp.spawn_task([=, this] {
for (auto fn : den_fns) {
if (fn)
fn(a);
}
});
}

void DSR::QueuedSignalRunner::run_deleted_node_signal(const Node &a,
SignalInfo b) {
tp.spawn_task([=, this] {
for (auto fn : dn_fns) {
if (fn)
fn(a);
}
});
}
void DSR::QueuedSignalRunner::run_deleted_edge_signal(const Edge &a,
SignalInfo b) {
tp.spawn_task([=, this] {
for (auto fn : de_fns) {
if (fn)
fn(a);
}
});
}
Loading
Loading