diff --git a/rcljava/include/org_ros2_rcljava_executors_BaseExecutor.h b/rcljava/include/org_ros2_rcljava_executors_BaseExecutor.h index 08b95c08..33b7ed67 100644 --- a/rcljava/include/org_ros2_rcljava_executors_BaseExecutor.h +++ b/rcljava/include/org_ros2_rcljava_executors_BaseExecutor.h @@ -47,6 +47,15 @@ JNICALL Java_org_ros2_rcljava_executors_BaseExecutor_nativeWaitSetInit( JNIEXPORT void JNICALL Java_org_ros2_rcljava_executors_BaseExecutor_nativeDisposeWaitSet(JNIEnv *, jclass, jlong); +/* + * Class: org_ros2_rcljava_executors_BaseExecutor + * Method: nativeWaitSetResize + * Signature: (JIIIIII)V + */ +JNIEXPORT void +JNICALL Java_org_ros2_rcljava_executors_BaseExecutor_nativeWaitSetResize( + JNIEnv *, jclass, jlong, jint, jint, jint, jint, jint, jint); + /* * Class: org_ros2_rcljava_executors_BaseExecutor * Method: nativeWaitSetClear diff --git a/rcljava/src/main/cpp/org_ros2_rcljava_executors_BaseExecutor.cpp b/rcljava/src/main/cpp/org_ros2_rcljava_executors_BaseExecutor.cpp index 7cbdb6cc..ee6c1ef0 100644 --- a/rcljava/src/main/cpp/org_ros2_rcljava_executors_BaseExecutor.cpp +++ b/rcljava/src/main/cpp/org_ros2_rcljava_executors_BaseExecutor.cpp @@ -80,6 +80,24 @@ Java_org_ros2_rcljava_executors_BaseExecutor_nativeDisposeWaitSet( } } +JNIEXPORT void +JNICALL Java_org_ros2_rcljava_executors_BaseExecutor_nativeWaitSetResize( + JNIEnv * env, jclass, jlong wait_set_handle, jint number_of_subscriptions, + jint number_of_guard_conditions, jint number_of_timers, jint number_of_clients, + jint number_of_services, jint number_of_events) +{ + rcl_wait_set_t * wait_set = reinterpret_cast(wait_set_handle); + + rcl_ret_t ret = rcl_wait_set_resize( + wait_set, number_of_subscriptions, number_of_guard_conditions, number_of_timers, + number_of_clients, number_of_services, number_of_events); + if (ret != RCL_RET_OK) { + std::string msg = "Failed to resize wait set: " + std::string(rcl_get_error_string().str); + rcl_reset_error(); + rcljava_throw_rclexception(env, ret, msg); + } +} + JNIEXPORT void JNICALL Java_org_ros2_rcljava_executors_BaseExecutor_nativeWaitSetClear( JNIEnv * env, jclass, jlong wait_set_handle) diff --git a/rcljava/src/main/java/org/ros2/rcljava/executors/BaseExecutor.java b/rcljava/src/main/java/org/ros2/rcljava/executors/BaseExecutor.java index e500f9ac..f1552b2f 100644 --- a/rcljava/src/main/java/org/ros2/rcljava/executors/BaseExecutor.java +++ b/rcljava/src/main/java/org/ros2/rcljava/executors/BaseExecutor.java @@ -28,6 +28,7 @@ import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import javax.swing.Action; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,6 +77,20 @@ public class BaseExecutor { private List> actionServerHandles = new ArrayList>(); + private long waitSetHandle = 0; + + public BaseExecutor() { + this.waitSetHandle = nativeGetZeroInitializedWaitSet(); + long contextHandle = RCLJava.getDefaultContext().getHandle(); + nativeWaitSetInit( + this.waitSetHandle, contextHandle, 0, 0, + 0, 0, 0, 0); + } + + public void dispose() { + nativeDisposeWaitSet(this.waitSetHandle); + } + protected void addNode(ComposableNode node) { this.nodes.add(node); } @@ -225,38 +240,28 @@ protected void waitForWork(long timeout) { } } - int subscriptionsSize = 0; - int timersSize = 0; - int clientsSize = 0; - int servicesSize = 0; + int subscriptionsSize = this.subscriptionHandles.size(); + int timersSize = this.timerHandles.size(); + int clientsSize = this.clientHandles.size(); + int servicesSize = this.serviceHandles.size(); int eventsSize = this.eventHandles.size(); - for (ComposableNode node : this.nodes) { - subscriptionsSize += node.getNode().getSubscriptions().size(); - timersSize += node.getNode().getTimers().size(); - clientsSize += node.getNode().getClients().size(); - servicesSize += node.getNode().getServices().size(); - - for (ActionServer actionServer : node.getNode().getActionServers()) { - subscriptionsSize += actionServer.getNumberOfSubscriptions(); - timersSize += actionServer.getNumberOfTimers(); - clientsSize += actionServer.getNumberOfClients(); - servicesSize += actionServer.getNumberOfServices(); - } + for (Map.Entry entry : this.actionServerHandles) { + ActionServer actionServer = entry.getValue(); + subscriptionsSize += actionServer.getNumberOfSubscriptions(); + timersSize += actionServer.getNumberOfTimers(); + clientsSize += actionServer.getNumberOfClients(); + servicesSize += actionServer.getNumberOfServices(); } if (subscriptionsSize == 0 && timersSize == 0 && clientsSize == 0 && servicesSize == 0) { return; } - - long waitSetHandle = nativeGetZeroInitializedWaitSet(); - long contextHandle = RCLJava.getDefaultContext().getHandle(); - nativeWaitSetInit( - waitSetHandle, contextHandle, subscriptionsSize, 0, + long waitSetHandle = this.waitSetHandle; + nativeWaitSetResize( + waitSetHandle, subscriptionsSize, 0, timersSize, clientsSize, servicesSize, eventsSize); - nativeWaitSetClear(waitSetHandle); - for (Map.Entry entry : this.subscriptionHandles) { nativeWaitSetAddSubscription(waitSetHandle, entry.getKey()); } @@ -282,93 +287,70 @@ protected void waitForWork(long timeout) { } nativeWait(waitSetHandle, timeout); - - for (int i = 0; i < this.subscriptionHandles.size(); ++i) { - if (!nativeWaitSetSubscriptionIsReady(waitSetHandle, i)) { - this.subscriptionHandles.get(i).setValue(null); - } - } - - for (int i = 0; i < this.timerHandles.size(); ++i) { - if (!nativeWaitSetTimerIsReady(waitSetHandle, i)) { - this.timerHandles.get(i).setValue(null); - } - } - - for (int i = 0; i < this.serviceHandles.size(); ++i) { - if (!nativeWaitSetServiceIsReady(waitSetHandle, i)) { - this.serviceHandles.get(i).setValue(null); - } - } - - for (int i = 0; i < this.clientHandles.size(); ++i) { - if (!nativeWaitSetClientIsReady(waitSetHandle, i)) { - this.clientHandles.get(i).setValue(null); - } - } - - for (int i = 0; i < this.eventHandles.size(); ++i) { - if (!nativeWaitSetEventIsReady(waitSetHandle, i)) { - this.eventHandles.get(i).setValue(null); - } - } - - for (Map.Entry entry : this.actionServerHandles) { - if (!entry.getValue().isReady(waitSetHandle)) { - entry.setValue(null); - } - } - - Iterator> subscriptionIterator = - this.subscriptionHandles.iterator(); - while (subscriptionIterator.hasNext()) { - Map.Entry entry = subscriptionIterator.next(); - if (entry.getValue() == null) { - subscriptionIterator.remove(); + { + int waitSetIndex = 0; + Iterator> it = this.subscriptionHandles.iterator(); + while (it.hasNext()) { + it.next(); + if (!nativeWaitSetSubscriptionIsReady(waitSetHandle, waitSetIndex)) { + it.remove(); + } + ++waitSetIndex; } } - - Iterator> timerIterator = this.timerHandles.iterator(); - while (timerIterator.hasNext()) { - Map.Entry entry = timerIterator.next(); - if (entry.getValue() == null) { - timerIterator.remove(); + { + int waitSetIndex = 0; + Iterator> it = this.timerHandles.iterator(); + while (it.hasNext()) { + it.next(); + if (!nativeWaitSetTimerIsReady(waitSetHandle, waitSetIndex)) { + it.remove(); + } + ++waitSetIndex; } } - - Iterator> serviceIterator = this.serviceHandles.iterator(); - while (serviceIterator.hasNext()) { - Map.Entry entry = serviceIterator.next(); - if (entry.getValue() == null) { - serviceIterator.remove(); + { + int waitSetIndex = 0; + Iterator> it = this.serviceHandles.iterator(); + while (it.hasNext()) { + it.next(); + if (!nativeWaitSetServiceIsReady(waitSetHandle, waitSetIndex)) { + it.remove(); + } + ++waitSetIndex; } } - - Iterator> clientIterator = this.clientHandles.iterator(); - while (clientIterator.hasNext()) { - Map.Entry entry = clientIterator.next(); - if (entry.getValue() == null) { - clientIterator.remove(); + { + int waitSetIndex = 0; + Iterator> it = this.clientHandles.iterator(); + while (it.hasNext()) { + it.next(); + if (!nativeWaitSetClientIsReady(waitSetHandle, waitSetIndex)) { + it.remove(); + } + ++waitSetIndex; } } - - Iterator> eventIterator = this.eventHandles.iterator(); - while (eventIterator.hasNext()) { - Map.Entry entry = eventIterator.next(); - if (entry.getValue() == null) { - eventIterator.remove(); + { + int waitSetIndex = 0; + Iterator> it = this.eventHandles.iterator(); + while (it.hasNext()) { + it.next(); + if (!nativeWaitSetEventIsReady(waitSetHandle, waitSetIndex)) { + it.remove(); + } + ++waitSetIndex; } } - - Iterator> actionServerIterator = this.actionServerHandles.iterator(); - while (actionServerIterator.hasNext()) { - Map.Entry entry = actionServerIterator.next(); - if (entry.getValue() == null) { - actionServerIterator.remove(); + { + Iterator> it = this.actionServerHandles.iterator(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + if (!entry.getValue().isReady(waitSetHandle)) { + it.remove(); + } } } - - nativeDisposeWaitSet(waitSetHandle); } protected AnyExecutable getNextExecutable() { @@ -463,6 +445,9 @@ public void spinUntilComplete(Future future, long maxDurationNs) { anyExecutable = getNextExecutable(); } } + if (!RCLJava.ok()) { + this.dispose(); + } } private void spinSomeImpl(long maxDurationNs, boolean exhaustive) { @@ -483,6 +468,9 @@ private void spinSomeImpl(long maxDurationNs, boolean exhaustive) { workAvailable = false; } } + if (!RCLJava.ok()) { + this.dispose(); + } } protected void spinSome(long maxDurationNs) { @@ -513,6 +501,11 @@ private static native void nativeWaitSetInit( long waitSetHandle, long contextHandle, int numberOfSubscriptions, int numberOfGuardConditions, int numberOfTimers, int numberOfClients, int numberOfServices, int numberOfEvents); + + private static native void nativeWaitSetResize( + long waitSetHandle, int numberOfSubscriptions, + int numberOfGuardConditions, int numberOfTimers, int numberOfClients, + int numberOfServices, int numberOfEvents); private static native void nativeWaitSetClear(long waitSetHandle); diff --git a/rcljava/src/main/java/org/ros2/rcljava/executors/MultiThreadedExecutor.java b/rcljava/src/main/java/org/ros2/rcljava/executors/MultiThreadedExecutor.java index 5e813701..56c8ad92 100644 --- a/rcljava/src/main/java/org/ros2/rcljava/executors/MultiThreadedExecutor.java +++ b/rcljava/src/main/java/org/ros2/rcljava/executors/MultiThreadedExecutor.java @@ -95,5 +95,8 @@ private void run() { this.spinOnce(); } } + if (!RCLJava.ok()) { + this.baseExecutor.dispose(); + } } } diff --git a/rcljava/src/main/java/org/ros2/rcljava/executors/SingleThreadedExecutor.java b/rcljava/src/main/java/org/ros2/rcljava/executors/SingleThreadedExecutor.java index a55259d4..f0251f9a 100644 --- a/rcljava/src/main/java/org/ros2/rcljava/executors/SingleThreadedExecutor.java +++ b/rcljava/src/main/java/org/ros2/rcljava/executors/SingleThreadedExecutor.java @@ -64,5 +64,8 @@ public void spin() { while (RCLJava.ok()) { this.spinOnce(); } + if (!RCLJava.ok()) { + this.baseExecutor.dispose(); + } } }