Teach sal-remoterpc-connector to route actions 83/82283/13
authorEmmettCox <emmett.cox@est.tech>
Thu, 4 Jul 2019 16:31:38 +0000 (16:31 +0000)
committerRobert Varga <nite@hq.sk>
Mon, 8 Jul 2019 13:41:01 +0000 (13:41 +0000)
sal-remoterpc-connector already handles routing of RPC registrations
and invocations across a cluster. Actions are very similar to RPCs,
hence it is natural to keep both in the same component.

This patch refactors common bits that go into tracking both, so that
we share common actors and concepts.

JIRA: CONTROLLER-1894
Change-Id: I0b9005bc3560b4dd5977a280d83eceebe132bec9
Signed-off-by: EmmettCox <emmett.cox@est.tech>
59 files changed:
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/AbstractRemoteFuture.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/AbstractRemoteImplementation.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/OpsInvoker.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/OpsListener.java [moved from opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java with 59% similarity]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/OpsManager.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/OpsRegistrar.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteActionImplementation.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteDOMActionException.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteDOMActionFuture.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteDOMRpcFuture.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteOpsProvider.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteOpsProviderConfig.java [moved from opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfig.java with 78% similarity]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteOpsProviderFactory.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcInvoker.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcRegistrar.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/TerminationMonitor.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/AbstractExecute.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/AbstractResponse.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ActionResponse.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteAction.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RpcResponse.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/AbstractRoutingTable.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/ActionRegistry.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/ActionRoutingTable.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreActor.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/AbstractRegistryMXBean.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteActionRegistryMXBean.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteActionRegistryMXBeanImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBean.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImpl.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/resources/OSGI-INF/blueprint/remote-rpc.xml
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractOpsTest.java [moved from opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java with 77% similarity]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/OpsBrokerTest.java [moved from opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java with 89% similarity]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/OpsListenerTest.java [moved from opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcListenerTest.java with 60% similarity]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/OpsRegistrarTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteOpsImplementationTest.java [moved from opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java with 58% similarity]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteOpsProviderConfigTest.java [moved from opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfigTest.java with 60% similarity]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteOpsProviderFactoryTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteOpsProviderTest.java [moved from opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java with 72% similarity]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactoryTest.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcErrorsExceptionTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcRegistrarTest.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/ExecuteOpsTest.java [moved from opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpcTest.java with 51% similarity]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/OpsResponseTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/RpcResponseTest.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/ActionRegistryTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossiperTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteActionRegistryMXBeanImplTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImplTest.java

diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/AbstractRemoteFuture.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/AbstractRemoteFuture.java
new file mode 100644 (file)
index 0000000..ae977d6
--- /dev/null
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.dispatch.OnComplete;
+import com.google.common.util.concurrent.AbstractFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+
+abstract class AbstractRemoteFuture<T, E extends Exception> extends AbstractFuture<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractRemoteFuture.class);
+
+    private final @NonNull SchemaPath type;
+
+    AbstractRemoteFuture(final @NonNull SchemaPath type, final Future<Object> requestFuture) {
+        this.type = requireNonNull(type);
+        requestFuture.onComplete(new FutureUpdater(), ExecutionContext.Implicits$.MODULE$.global());
+    }
+
+    @Override
+    public final T get() throws InterruptedException, ExecutionException {
+        try {
+            return super.get();
+        } catch (ExecutionException e) {
+            throw mapException(e);
+        }
+    }
+
+    @Override
+    public final T get(final long timeout, final TimeUnit unit)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        try {
+            return super.get(timeout, unit);
+        } catch (final ExecutionException e) {
+            throw mapException(e);
+        }
+    }
+
+    @Override
+    protected final boolean set(final T value) {
+        final boolean ret = super.set(value);
+        if (ret) {
+            LOG.debug("Future {} for action {} successfully completed", this, type);
+        }
+        return ret;
+    }
+
+    final void failNow(final Throwable error) {
+        LOG.debug("Failing future {} for operation {}", this, type, error);
+        setException(error);
+    }
+
+    abstract @Nullable T processReply(Object reply);
+
+    abstract @NonNull Class<E> exceptionClass();
+
+    abstract @NonNull E wrapCause(Throwable cause);
+
+    private ExecutionException mapException(final ExecutionException ex) {
+        final Throwable cause = ex.getCause();
+        return exceptionClass().isInstance(cause) ? ex : new ExecutionException(ex.getMessage(), wrapCause(cause));
+    }
+
+    private final class FutureUpdater extends OnComplete<Object> {
+        @Override
+        public void onComplete(final Throwable error, final Object reply) {
+            if (error == null) {
+                final T result = processReply(reply);
+                if (result != null) {
+                    LOG.debug("Received response for operation {}: result is {}", type, result);
+                    set(result);
+                } else {
+                    failNow(new IllegalStateException("Incorrect reply type " + reply + " from Akka"));
+                }
+            } else {
+                failNow(error);
+            }
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/AbstractRemoteImplementation.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/AbstractRemoteImplementation.java
new file mode 100644 (file)
index 0000000..6a8030f
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.ActorRef;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import org.opendaylight.controller.remote.rpc.messages.AbstractExecute;
+import scala.concurrent.Future;
+
+/**
+ * An abstract base class for remote RPC/action implementations.
+ */
+abstract class AbstractRemoteImplementation<T extends AbstractExecute<?>> {
+    // 0 for local, 1 for binding, 2 for remote
+    static final long COST = 2;
+
+    private final ActorRef remoteInvoker;
+    private final Timeout askDuration;
+
+    AbstractRemoteImplementation(final ActorRef remoteInvoker, final RemoteOpsProviderConfig config) {
+        this.remoteInvoker = requireNonNull(remoteInvoker);
+        this.askDuration = config.getAskDuration();
+    }
+
+    final Future<Object> ask(final T message) {
+        return Patterns.ask(remoteInvoker, requireNonNull(message), askDuration);
+    }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/OpsInvoker.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/OpsInvoker.java
new file mode 100644 (file)
index 0000000..2f1ac92
--- /dev/null
@@ -0,0 +1,182 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.Status.Failure;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.Collection;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.remote.rpc.messages.ActionResponse;
+import org.opendaylight.controller.remote.rpc.messages.ExecuteAction;
+import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
+import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
+import org.opendaylight.mdsal.dom.api.DOMActionResult;
+import org.opendaylight.mdsal.dom.api.DOMActionService;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMRpcResult;
+import org.opendaylight.mdsal.dom.api.DOMRpcService;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+/**
+ * Actor receiving invocation requests from remote nodes, routing them to
+ * {@link DOMRpcService#invokeRpc(SchemaPath, NormalizedNode)} and
+ * {@link DOMActionService#invokeAction(SchemaPath, DOMDataTreeIdentifier, ContainerNode)}.
+ *
+ * <p>
+ * Note that while the two interfaces are very similar, invocation strategies are slightly different due to historic
+ * behavior of RPCs:
+ * <ul>
+ *   <li>RPCs allow both null input and output, and this is passed to the infrastructure. Furthermore any invocation
+ *       which results in errors being reported drops the output content, even if it is present -- which is wrong, as
+ *       'errors' in this case can also be just warnings.</li>
+ *   <li>Actions do not allow null input, but allow null output. If the output is present, it is passed along with any
+ *       errors reported.</li>
+ * </ul>
+ */
+final class OpsInvoker extends AbstractUntypedActor {
+    private final DOMRpcService rpcService;
+    private final DOMActionService actionService;
+
+    private OpsInvoker(final DOMRpcService rpcService, final DOMActionService actionService) {
+        this.rpcService = requireNonNull(rpcService);
+        this.actionService = requireNonNull(actionService);
+    }
+
+    public static Props props(final DOMRpcService rpcService, final DOMActionService actionService) {
+        return Props.create(OpsInvoker.class,
+            requireNonNull(rpcService, "DOMRpcService can not be null"),
+            requireNonNull(actionService, "DOMActionService can not be null"));
+    }
+
+    @Override
+    protected void handleReceive(final Object message) {
+        if (message instanceof ExecuteRpc) {
+            LOG.debug("Handling ExecuteOps Message");
+            execute((ExecuteRpc) message);
+        } else if (message instanceof ExecuteAction) {
+            execute((ExecuteAction) message);
+        } else {
+            unknownMessage(message);
+        }
+    }
+
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private void execute(final ExecuteRpc msg) {
+        LOG.debug("Executing RPC {}", msg.getType());
+        final ActorRef sender = getSender();
+
+        final ListenableFuture<DOMRpcResult> future;
+        try {
+            future = rpcService.invokeRpc(msg.getType(), msg.getInput());
+        } catch (final RuntimeException e) {
+            LOG.debug("Failed to invoke RPC {}", msg.getType(), e);
+            sender.tell(new Failure(e), self());
+            return;
+        }
+
+        Futures.addCallback(future, new AbstractCallback<DOMRpcResult>(getSender(), msg.getType()) {
+            @Override
+            Object nullResponse(final SchemaPath type) {
+                LOG.warn("Execution of {} resulted in null result", type);
+                return new RpcResponse(null);
+            }
+
+            @Override
+            Object response(final SchemaPath type, final DOMRpcResult result) {
+                final Collection<? extends RpcError> errors = result.getErrors();
+                return errors.isEmpty() ? new RpcResponse(result.getResult())
+                        // This is legacy (wrong) behavior, which ignores the fact that errors may be just warnings,
+                        // discarding any output
+                        : new Failure(new RpcErrorsException(String.format("Execution of rpc %s failed", type),
+                            errors));
+            }
+        }, MoreExecutors.directExecutor());
+    }
+
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private void execute(final ExecuteAction msg) {
+        LOG.debug("Executing Action {}", msg.getType());
+
+        final ActorRef sender = getSender();
+
+        final ListenableFuture<? extends DOMActionResult> future;
+        try {
+            future = actionService.invokeAction(msg.getType(), msg.getPath(), msg.getInput());
+        } catch (final RuntimeException e) {
+            LOG.debug("Failed to invoke action {}", msg.getType(), e);
+            sender.tell(new Failure(e), self());
+            return;
+        }
+
+        Futures.addCallback(future, new AbstractCallback<DOMActionResult>(getSender(), msg.getType()) {
+            @Override
+            Object nullResponse(final SchemaPath type) {
+                throw new IllegalStateException("Null invocation result of action " + type);
+            }
+
+            @Override
+            Object response(final SchemaPath type, final DOMActionResult result) {
+                final Collection<? extends RpcError> errors = result.getErrors();
+                return errors.isEmpty() ? new ActionResponse(result.getOutput(), result.getErrors())
+                    // This is legacy (wrong) behavior, which ignores the fact that errors may be just warnings,
+                    // discarding any output
+                    : new Failure(new RpcErrorsException(String.format("Execution of action %s failed", type),
+                        errors));
+            }
+        }, MoreExecutors.directExecutor());
+    }
+
+    private abstract class AbstractCallback<T> implements FutureCallback<T> {
+        private final ActorRef replyTo;
+        private final SchemaPath type;
+
+        AbstractCallback(final ActorRef replyTo, final SchemaPath type) {
+            this.replyTo = requireNonNull(replyTo);
+            this.type = requireNonNull(type);
+        }
+
+        @Override
+        public final void onSuccess(final T result) {
+            final Object response;
+            if (result == null) {
+                // This shouldn't happen but the FutureCallback annotates the result param with Nullable so handle null
+                // here to avoid FindBugs warning.
+                response = nullResponse(type);
+            } else {
+                response = response(type, result);
+            }
+
+            LOG.debug("Sending response for execution of {} : {}", type, response);
+            replyTo.tell(response, self());
+        }
+
+        @Override
+        public final void onFailure(final Throwable failure) {
+            LOG.debug("Failed to execute operation {}", type, failure);
+            LOG.error("Failed to execute operation {} due to {}. More details are available on DEBUG level.", type,
+                Throwables.getRootCause(failure).getMessage());
+            replyTo.tell(new Failure(failure), self());
+        }
+
+        abstract @NonNull Object nullResponse(@NonNull SchemaPath type);
+
+        abstract @NonNull Object response(@NonNull SchemaPath type, @NonNull T result);
+    }
+}
@@ -12,9 +12,14 @@ import static java.util.Objects.requireNonNull;
 
 import akka.actor.ActorRef;
 import java.util.Collection;
+import java.util.Set;
+import org.opendaylight.controller.remote.rpc.registry.ActionRegistry;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
+import org.opendaylight.mdsal.dom.api.DOMActionAvailabilityExtension;
+import org.opendaylight.mdsal.dom.api.DOMActionImplementation;
+import org.opendaylight.mdsal.dom.api.DOMActionInstance;
 import org.opendaylight.mdsal.dom.api.DOMRpcAvailabilityListener;
 import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
 import org.opendaylight.mdsal.dom.api.DOMRpcImplementation;
@@ -22,17 +27,20 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A {@link DOMRpcAvailabilityListener} reacting to RPC implementations different than {@link RemoteRpcImplementation}.
- * The knowledge of such implementations is forwarded to {@link RpcRegistry}, which is responsible for advertising
- * their presence to other nodes.
+ * A {@link DOMRpcAvailabilityListener} reacting to RPC implementations different than
+ * {@link RemoteRpcImplementation} or {@link RemoteActionImplementation}.
+ * The knowledge of such implementations is forwarded to {@link RpcRegistry} and {@link ActionRegistry},
+ * which is responsible for advertising their presence to other nodes.
  */
-final class RpcListener implements DOMRpcAvailabilityListener {
-    private static final Logger LOG = LoggerFactory.getLogger(RpcListener.class);
+final class OpsListener implements DOMRpcAvailabilityListener, DOMActionAvailabilityExtension.AvailabilityListener {
+    private static final Logger LOG = LoggerFactory.getLogger(OpsListener.class);
 
     private final ActorRef rpcRegistry;
+    private final ActorRef actionRegistry;
 
-    RpcListener(final ActorRef rpcRegistry) {
+    OpsListener(final ActorRef rpcRegistry, final ActorRef actionRegistry) {
         this.rpcRegistry = requireNonNull(rpcRegistry);
+        this.actionRegistry = requireNonNull(actionRegistry);
     }
 
     @Override
@@ -55,4 +63,16 @@ final class RpcListener implements DOMRpcAvailabilityListener {
     public boolean acceptsImplementation(final DOMRpcImplementation impl) {
         return !(impl instanceof RemoteRpcImplementation);
     }
+
+    @Override
+    public boolean acceptsImplementation(final DOMActionImplementation impl) {
+        return !(impl instanceof RemoteActionImplementation);
+    }
+
+    @Override
+    public void onActionsChanged(final Set<DOMActionInstance> removed, final Set<DOMActionInstance> added) {
+        LOG.debug("adding registration for [{}]", added);
+        LOG.debug("removing registration for [{}]", removed);
+        actionRegistry.tell(new ActionRegistry.Messages.UpdateActions(added, removed), ActorRef.noSender());
+    }
 }
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/OpsManager.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/OpsManager.java
new file mode 100644 (file)
index 0000000..ea08565
--- /dev/null
@@ -0,0 +1,114 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.ActorRef;
+import akka.actor.OneForOneStrategy;
+import akka.actor.Props;
+import akka.actor.SupervisorStrategy;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
+import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
+import org.opendaylight.mdsal.dom.api.DOMActionService;
+import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
+import org.opendaylight.mdsal.dom.api.DOMRpcService;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown. It also registers
+ * {@link OpsListener} with the local {@link DOMRpcService}.
+ */
+public class OpsManager extends AbstractUntypedActor {
+    private final DOMRpcProviderService rpcProvisionRegistry;
+    private final RemoteOpsProviderConfig config;
+    private final DOMRpcService rpcServices;
+    private DOMActionProviderService actionProvisionRegistry;
+    private DOMActionService actionService;
+
+    private ListenerRegistration<OpsListener> listenerReg;
+    private ActorRef opsInvoker;
+    private ActorRef actionRegistry;
+    private ActorRef rpcRegistry;
+    private ActorRef opsRegistrar;
+
+    OpsManager(final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices,
+               final RemoteOpsProviderConfig config, final DOMActionProviderService actionProviderService,
+               final DOMActionService actionService) {
+        this.rpcProvisionRegistry = requireNonNull(rpcProvisionRegistry);
+        this.rpcServices = requireNonNull(rpcServices);
+        this.config = requireNonNull(config);
+        this.actionProvisionRegistry = requireNonNull(actionProviderService);
+        this.actionService = requireNonNull(actionService);
+    }
+
+    public static Props props(final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices,
+                              final RemoteOpsProviderConfig config,
+                              final DOMActionProviderService actionProviderService,
+                              final DOMActionService actionService) {
+        requireNonNull(rpcProvisionRegistry, "RpcProviderService can not be null!");
+        requireNonNull(rpcServices, "RpcService can not be null!");
+        requireNonNull(config, "RemoteOpsProviderConfig can not be null!");
+        requireNonNull(actionProviderService, "ActionProviderService can not be null!");
+        requireNonNull(actionService, "ActionService can not be null!");
+        return Props.create(OpsManager.class, rpcProvisionRegistry, rpcServices, config,
+                actionProviderService, actionService);
+    }
+
+    @Override
+    public void preStart() throws Exception {
+        super.preStart();
+
+        opsInvoker = getContext().actorOf(OpsInvoker.props(rpcServices, actionService)
+                .withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
+        LOG.debug("Listening for RPC invocation requests with {}", opsInvoker);
+
+        opsRegistrar = getContext().actorOf(OpsRegistrar.props(config, rpcProvisionRegistry, actionProvisionRegistry)
+                .withMailbox(config.getMailBoxName()), config.getRpcRegistrarName());
+        LOG.debug("Registering remote RPCs with {}", opsRegistrar);
+
+        rpcRegistry = getContext().actorOf(RpcRegistry.props(config, opsInvoker, opsRegistrar)
+                .withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
+        LOG.debug("Propagating RPC information with {}", rpcRegistry);
+
+        actionRegistry = getContext().actorOf(RpcRegistry.props(config, opsInvoker, opsRegistrar)
+                .withMailbox(config.getMailBoxName()), config.getActionRegistryName());
+        LOG.debug("Propagating RPC information with {}", actionRegistry);
+
+        final OpsListener opsListener = new OpsListener(rpcRegistry, actionRegistry);
+        LOG.debug("Registering local availability listener {}", opsListener);
+        listenerReg = rpcServices.registerRpcListener(opsListener);
+    }
+
+    @Override
+    public void postStop() throws Exception {
+        if (listenerReg != null) {
+            listenerReg.close();
+            listenerReg = null;
+        }
+
+        super.postStop();
+    }
+
+    @Override
+    protected void handleReceive(final Object message) {
+        unknownMessage(message);
+    }
+
+    @Override
+    public SupervisorStrategy supervisorStrategy() {
+        return new OneForOneStrategy(10, FiniteDuration.create(1, TimeUnit.MINUTES), t -> {
+            LOG.error("An exception happened actor will be resumed", t);
+            return SupervisorStrategy.resume();
+        });
+    }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/OpsRegistrar.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/OpsRegistrar.java
new file mode 100644 (file)
index 0000000..fdda197
--- /dev/null
@@ -0,0 +1,127 @@
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.Address;
+import akka.actor.Props;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.remote.rpc.registry.ActionRegistry;
+import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.Messages.UpdateRemoteActionEndpoints;
+import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.RemoteActionEndpoint;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.RemoteRpcEndpoint;
+import org.opendaylight.mdsal.dom.api.DOMActionImplementation;
+import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
+import org.opendaylight.mdsal.dom.api.DOMRpcImplementation;
+import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
+import org.opendaylight.yangtools.concepts.ObjectRegistration;
+
+/**
+ * Actor handling registration of RPCs and Actions available on remote nodes with the local
+ * {@link DOMRpcProviderService} and {@link DOMActionProviderService}.
+ */
+final class OpsRegistrar extends AbstractUntypedActor {
+    private final Map<Address, ObjectRegistration<DOMRpcImplementation>> rpcRegs = new HashMap<>();
+    private final Map<Address, ObjectRegistration<DOMActionImplementation>> actionRegs = new HashMap<>();
+    private final DOMRpcProviderService rpcProviderService;
+    private final RemoteOpsProviderConfig config;
+    private final DOMActionProviderService actionProviderService;
+
+    OpsRegistrar(final RemoteOpsProviderConfig config, final DOMRpcProviderService rpcProviderService,
+                 final DOMActionProviderService actionProviderService) {
+        this.config = requireNonNull(config);
+        this.rpcProviderService = requireNonNull(rpcProviderService);
+        this.actionProviderService = requireNonNull(actionProviderService);
+    }
+
+    public static Props props(final RemoteOpsProviderConfig config, final DOMRpcProviderService rpcProviderService,
+                              final DOMActionProviderService actionProviderService) {
+        return Props.create(OpsRegistrar.class, requireNonNull(config),
+            requireNonNull(rpcProviderService, "DOMRpcProviderService cannot be null"),
+            requireNonNull(actionProviderService, "DOMActionProviderService cannot be null"));
+    }
+
+    @Override
+    public void postStop() throws Exception {
+        rpcRegs.clear();
+        actionRegs.clear();
+
+        super.postStop();
+    }
+
+    @Override
+    protected void handleReceive(final Object message) {
+        if (message instanceof UpdateRemoteEndpoints) {
+            LOG.debug("Handling updateRemoteEndpoints message");
+            updateRemoteRpcEndpoints(((UpdateRemoteEndpoints) message).getRpcEndpoints());
+        } else if (message instanceof UpdateRemoteActionEndpoints) {
+            LOG.debug("Handling updateRemoteActionEndpoints message");
+            updateRemoteActionEndpoints(((UpdateRemoteActionEndpoints) message).getActionEndpoints());
+        } else {
+            unknownMessage(message);
+        }
+    }
+
+    private void updateRemoteRpcEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> rpcEndpoints) {
+        /*
+         * Updating RPC providers is a two-step process. We first add the newly-discovered RPCs and then close
+         * the old registration. This minimizes churn observed by listeners, as they will not observe RPC
+         * unavailability which would occur if we were to do it the other way around.
+         *
+         * Note that when an RPC moves from one remote node to another, we also do not want to expose the gap,
+         * hence we register all new implementations before closing all registrations.
+         */
+        for (Entry<Address, Optional<RemoteRpcEndpoint>> e : rpcEndpoints.entrySet()) {
+            LOG.debug("Updating RPC registrations for {}", e.getKey());
+
+            final Optional<RemoteRpcEndpoint> maybeEndpoint = e.getValue();
+            if (maybeEndpoint.isPresent()) {
+                final RemoteRpcEndpoint endpoint = maybeEndpoint.get();
+                final RemoteRpcImplementation impl = new RemoteRpcImplementation(endpoint.getRouter(), config);
+                rpcRegs.put(e.getKey(), rpcProviderService.registerRpcImplementation(impl,
+                        endpoint.getRpcs()));
+            } else {
+                rpcRegs.remove(e.getKey());
+            }
+        }
+    }
+
+    /**
+     * Updates the action endpoints, Adding new registrations first before removing previous registrations.
+     */
+    private void updateRemoteActionEndpoints(final Map<Address,
+            Optional<ActionRegistry.RemoteActionEndpoint>> actionEndpoints) {
+        /*
+         * Updating Action providers is a two-step process. We first add the newly-discovered RPCs and then close
+         * the old registration. This minimizes churn observed by listeners, as they will not observe RPC
+         * unavailability which would occur if we were to do it the other way around.
+         *
+         * Note that when an Action moves from one remote node to another, we also do not want to expose the gap,
+         * hence we register all new implementations before closing all registrations.
+         */
+        for (Entry<Address, Optional<RemoteActionEndpoint>> e : actionEndpoints.entrySet()) {
+            LOG.debug("Updating Action registrations for {}", e.getKey());
+
+            final Optional<RemoteActionEndpoint> maybeEndpoint = e.getValue();
+            if (maybeEndpoint.isPresent()) {
+                final RemoteActionEndpoint endpoint = maybeEndpoint.get();
+                final RemoteActionImplementation impl = new RemoteActionImplementation(endpoint.getRouter(), config);
+                actionRegs.put(e.getKey(),
+                        actionProviderService.registerActionImplementation(impl, endpoint.getActions()));
+            } else {
+                actionRegs.remove(e.getKey());
+            }
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteActionImplementation.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteActionImplementation.java
new file mode 100644 (file)
index 0000000..c3eeeed
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2019 Nordix Foundation.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import akka.actor.ActorRef;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.remote.rpc.messages.ExecuteAction;
+import org.opendaylight.mdsal.dom.api.DOMActionImplementation;
+import org.opendaylight.mdsal.dom.api.DOMActionResult;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link DOMActionImplementation} which routes invocation requests to a remote invoker actor.
+ */
+final class RemoteActionImplementation extends AbstractRemoteImplementation<ExecuteAction>
+        implements DOMActionImplementation {
+    private static final Logger LOG = LoggerFactory.getLogger(RemoteActionImplementation.class);
+
+    RemoteActionImplementation(final ActorRef remoteInvoker, final RemoteOpsProviderConfig config) {
+        super(remoteInvoker, config);
+    }
+
+    /**
+     * Routes action request to a remote invoker, which will execute the action and return with result.
+     */
+    @Override
+    public ListenableFuture<DOMActionResult> invokeAction(final SchemaPath type, final DOMDataTreeIdentifier path,
+                                                          final ContainerNode input) {
+        LOG.debug("invoking action {} with path {}", type, path);
+        return new RemoteDOMActionFuture(type, ask(ExecuteAction.from(type, path, input)));
+    }
+
+    @Override
+    public long invocationCost() {
+        return COST;
+    }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteDOMActionException.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteDOMActionException.java
new file mode 100644 (file)
index 0000000..b866499
--- /dev/null
@@ -0,0 +1,18 @@
+/*
+ * Copyright (c) 2019 Nordix Foundation.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import org.opendaylight.mdsal.dom.api.DOMActionException;
+
+public class RemoteDOMActionException extends DOMActionException {
+    private static final long serialVersionUID = 1L;
+
+    RemoteDOMActionException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteDOMActionFuture.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteDOMActionFuture.java
new file mode 100644 (file)
index 0000000..8dbbff5
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2019 Nordix Foundation.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.remote.rpc.messages.ActionResponse;
+import org.opendaylight.mdsal.dom.api.DOMActionException;
+import org.opendaylight.mdsal.dom.api.DOMActionResult;
+import org.opendaylight.mdsal.dom.spi.SimpleDOMActionResult;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import scala.concurrent.Future;
+
+final class RemoteDOMActionFuture extends AbstractRemoteFuture<DOMActionResult, DOMActionException> {
+    RemoteDOMActionFuture(final @NonNull SchemaPath type, final @NonNull Future<Object> requestFuture) {
+        super(type, requestFuture);
+    }
+
+    @Override
+    DOMActionResult processReply(final Object reply) {
+        if (reply instanceof ActionResponse) {
+            final ActionResponse actionReply = (ActionResponse) reply;
+            final ContainerNode output = actionReply.getOutput();
+            return output == null ? new SimpleDOMActionResult(actionReply.getErrors())
+                    : new SimpleDOMActionResult(output, actionReply.getErrors());
+        }
+
+        return null;
+    }
+
+    @Override
+    Class<DOMActionException> exceptionClass() {
+        return DOMActionException.class;
+    }
+
+    @Override
+    DOMActionException wrapCause(final Throwable cause) {
+        return new RemoteDOMActionException("Exception during invoking ACTION", cause);
+    }
+}
index d5e46a9..b858f80 100644 (file)
@@ -7,94 +7,31 @@
  */
 package org.opendaylight.controller.remote.rpc;
 
-import static java.util.Objects.requireNonNull;
-
-import akka.dispatch.OnComplete;
-import com.google.common.util.concurrent.AbstractFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
 import org.opendaylight.mdsal.dom.api.DOMRpcException;
 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.ExecutionContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 import scala.concurrent.Future;
 
-final class RemoteDOMRpcFuture extends AbstractFuture<DOMRpcResult> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(RemoteDOMRpcFuture.class);
-
-    private final QName rpcName;
-
-    private RemoteDOMRpcFuture(final QName rpcName) {
-        this.rpcName = requireNonNull(rpcName, "rpcName");
-    }
-
-    public static RemoteDOMRpcFuture create(final QName rpcName) {
-        return new RemoteDOMRpcFuture(rpcName);
-    }
-
-    protected void failNow(final Throwable error) {
-        LOG.debug("Failing future {} for rpc {}", this, rpcName, error);
-        setException(error);
-    }
-
-    protected void completeWith(final Future<Object> future) {
-        future.onComplete(new FutureUpdater(), ExecutionContext.Implicits$.MODULE$.global());
+final class RemoteDOMRpcFuture extends AbstractRemoteFuture<DOMRpcResult, DOMRpcException> {
+    RemoteDOMRpcFuture(final @NonNull SchemaPath type, final @NonNull Future<Object> requestFuture) {
+        super(type, requestFuture);
     }
 
     @Override
-    public DOMRpcResult get() throws InterruptedException, ExecutionException {
-        try {
-            return super.get();
-        } catch (ExecutionException e) {
-            throw mapException(e);
-        }
+    DOMRpcResult processReply(final Object reply) {
+        return reply instanceof RpcResponse ? new DefaultDOMRpcResult(((RpcResponse) reply).getOutput()) : null;
     }
 
     @Override
-    public DOMRpcResult get(final long timeout, final TimeUnit unit)
-            throws InterruptedException, ExecutionException, TimeoutException {
-        try {
-            return super.get(timeout, unit);
-        } catch (final ExecutionException e) {
-            throw mapException(e);
-        }
-    }
-
-    private static ExecutionException mapException(final ExecutionException ex) {
-        final Throwable cause = ex.getCause();
-        if (cause instanceof DOMRpcException) {
-            return ex;
-        }
-        return new ExecutionException(ex.getMessage(),
-                new RemoteDOMRpcException("Exception during invoking RPC", ex.getCause()));
+    Class<DOMRpcException> exceptionClass() {
+        return DOMRpcException.class;
     }
 
-    private final class FutureUpdater extends OnComplete<Object> {
-
-        @Override
-        public void onComplete(final Throwable error, final Object reply) {
-            if (error != null) {
-                RemoteDOMRpcFuture.this.failNow(error);
-            } else if (reply instanceof RpcResponse) {
-                final RpcResponse rpcReply = (RpcResponse) reply;
-                final NormalizedNode<?, ?> result = rpcReply.getResultNormalizedNode();
-
-                LOG.debug("Received response for rpc {}: result is {}", rpcName, result);
-
-                RemoteDOMRpcFuture.this.set(new DefaultDOMRpcResult(result));
-
-                LOG.debug("Future {} for rpc {} successfully completed", RemoteDOMRpcFuture.this, rpcName);
-            } else {
-                RemoteDOMRpcFuture.this.failNow(new IllegalStateException("Incorrect reply type " + reply
-                        + "from Akka"));
-            }
-        }
+    @Override
+    DOMRpcException wrapCause(final Throwable cause) {
+        return new RemoteDOMRpcException("Exception during invoking RPC", cause);
     }
 }
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteOpsProvider.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteOpsProvider.java
new file mode 100644 (file)
index 0000000..dabe5be
--- /dev/null
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
+import org.opendaylight.mdsal.dom.api.DOMActionService;
+import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
+import org.opendaylight.mdsal.dom.api.DOMRpcService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is the base class which initialize all the actors, listeners and
+ * default RPc implementation so remote invocation of rpcs.
+ */
+public class RemoteOpsProvider implements AutoCloseable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RemoteOpsProvider.class);
+
+    private final DOMRpcProviderService rpcProvisionRegistry;
+    private final RemoteOpsProviderConfig config;
+    private final ActorSystem actorSystem;
+    private final DOMRpcService rpcService;
+    private final DOMActionProviderService actionProvisionRegistry;
+    private final DOMActionService actionService;
+
+    private ActorRef opsManager;
+
+    public RemoteOpsProvider(final ActorSystem actorSystem, final DOMRpcProviderService rpcProvisionRegistry,
+                             final DOMRpcService rpcService, final RemoteOpsProviderConfig config,
+                             final DOMActionProviderService actionProviderService,
+                             final DOMActionService actionService) {
+        this.actorSystem = requireNonNull(actorSystem);
+        this.rpcProvisionRegistry = requireNonNull(rpcProvisionRegistry);
+        this.rpcService = requireNonNull(rpcService);
+        this.config = requireNonNull(config);
+        this.actionProvisionRegistry = requireNonNull(actionProviderService);
+        this.actionService = requireNonNull(actionService);
+    }
+
+    @Override
+    public void close() {
+        if (opsManager != null) {
+            LOG.info("Stopping Ops Manager at {}", opsManager);
+            opsManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            opsManager = null;
+        }
+    }
+
+    public void start() {
+        LOG.info("Starting Remote Ops service...");
+        opsManager = actorSystem.actorOf(OpsManager.props(rpcProvisionRegistry, rpcService, config,
+                actionProvisionRegistry, actionService), config.getRpcManagerName());
+        LOG.debug("Ops Manager started at {}", opsManager);
+    }
+}
@@ -14,25 +14,28 @@ import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import scala.concurrent.duration.FiniteDuration;
 
-public class RemoteRpcProviderConfig extends CommonConfig {
+public class RemoteOpsProviderConfig extends CommonConfig {
 
     protected static final String TAG_RPC_BROKER_NAME = "rpc-broker-name";
     protected static final String TAG_RPC_REGISTRAR_NAME = "rpc-registrar-name";
     protected static final String TAG_RPC_REGISTRY_NAME = "registry-name";
+    protected static final String TAG_ACTION_REGISTRY_NAME = "action-registry-name";
     protected static final String TAG_RPC_MGR_NAME = "rpc-manager-name";
     protected static final String TAG_RPC_BROKER_PATH = "rpc-broker-path";
     protected static final String TAG_RPC_REGISTRY_PATH = "rpc-registry-path";
+    protected static final String TAG_ACTION_REGISTRY_PATH = "action-registry-path";
     protected static final String TAG_RPC_MGR_PATH = "rpc-manager-path";
     protected static final String TAG_ASK_DURATION = "ask-duration";
 
     private static final String TAG_GOSSIP_TICK_INTERVAL = "gossip-tick-interval";
     private static final String TAG_RPC_REGISTRY_PERSISTENCE_ID = "rpc-registry-persistence-id";
+    private static final String TAG_ACTION_REGISTRY_PERSISTENCE_ID = "action-registry-persistence-id";
 
     //locally cached values
     private Timeout cachedAskDuration;
     private FiniteDuration cachedGossipTickInterval;
 
-    public RemoteRpcProviderConfig(final Config config) {
+    public RemoteOpsProviderConfig(final Config config) {
         super(config);
     }
 
@@ -48,6 +51,10 @@ public class RemoteRpcProviderConfig extends CommonConfig {
         return get().getString(TAG_RPC_REGISTRY_NAME);
     }
 
+    public String getActionRegistryName() {
+        return get().getString(TAG_ACTION_REGISTRY_NAME);
+    }
+
     public String getRpcManagerName() {
         return get().getString(TAG_RPC_MGR_NAME);
     }
@@ -64,6 +71,14 @@ public class RemoteRpcProviderConfig extends CommonConfig {
         return get().getString(TAG_RPC_REGISTRY_PERSISTENCE_ID);
     }
 
+    public String getActionRegistryPath() {
+        return get().getString(TAG_ACTION_REGISTRY_PATH);
+    }
+
+    public String getActionRegistryPersistenceId() {
+        return get().getString(TAG_ACTION_REGISTRY_PERSISTENCE_ID);
+    }
+
     public String getRpcManagerPath() {
         return get().getString(TAG_RPC_MGR_PATH);
     }
@@ -95,10 +110,10 @@ public class RemoteRpcProviderConfig extends CommonConfig {
      */
     @SuppressFBWarnings(value = "BC_UNCONFIRMED_CAST_OF_RETURN_VALUE",
             justification = "Findbugs flags this as an unconfirmed cast of return value but the build method clearly "
-                + "returns RemoteRpcProviderConfig. Perhaps it's confused b/c the build method is overloaded and "
+                + "returns RemoteOpsProviderConfig. Perhaps it's confused b/c the build method is overloaded and "
                 + "and differs in return type from the base class.")
-    public static RemoteRpcProviderConfig newInstance(final String actorSystemName, final boolean metricCaptureEnabled,
-            final int mailboxCapacity) {
+    public static RemoteOpsProviderConfig newInstance(final String actorSystemName, final boolean metricCaptureEnabled,
+                                                      final int mailboxCapacity) {
         return new Builder(actorSystemName).metricCaptureEnabled(metricCaptureEnabled)
                 .mailboxCapacity(mailboxCapacity).build();
     }
@@ -112,11 +127,13 @@ public class RemoteRpcProviderConfig extends CommonConfig {
             configHolder.put(TAG_RPC_BROKER_NAME, "broker");
             configHolder.put(TAG_RPC_REGISTRAR_NAME, "registrar");
             configHolder.put(TAG_RPC_REGISTRY_NAME, "registry");
+            configHolder.put(TAG_ACTION_REGISTRY_NAME, "action-registry");
             configHolder.put(TAG_RPC_MGR_NAME, "rpc");
 
             //Actor paths
             configHolder.put(TAG_RPC_BROKER_PATH, "/user/rpc/broker");
             configHolder.put(TAG_RPC_REGISTRY_PATH, "/user/rpc/registry");
+            configHolder.put(TAG_ACTION_REGISTRY_PATH, "/user/action/registry");
             configHolder.put(TAG_RPC_MGR_PATH, "/user/rpc");
 
             //durations
@@ -125,6 +142,7 @@ public class RemoteRpcProviderConfig extends CommonConfig {
 
             // persistence
             configHolder.put(TAG_RPC_REGISTRY_PERSISTENCE_ID, "remote-rpc-registry");
+            configHolder.put(TAG_ACTION_REGISTRY_PERSISTENCE_ID, "remote-action-registry");
         }
 
         public Builder gossipTickInterval(final String interval) {
@@ -133,8 +151,8 @@ public class RemoteRpcProviderConfig extends CommonConfig {
         }
 
         @Override
-        public RemoteRpcProviderConfig build() {
-            return new RemoteRpcProviderConfig(merge());
+        public RemoteOpsProviderConfig build() {
+            return new RemoteOpsProviderConfig(merge());
         }
     }
 }
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteOpsProviderFactory.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteOpsProviderFactory.java
new file mode 100644 (file)
index 0000000..35043c3
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import akka.actor.ActorSystem;
+import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
+import org.opendaylight.mdsal.dom.api.DOMActionService;
+import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
+import org.opendaylight.mdsal.dom.api.DOMRpcService;
+
+public final class RemoteOpsProviderFactory {
+    private RemoteOpsProviderFactory() {
+
+    }
+
+    public static RemoteOpsProvider createInstance(final DOMRpcProviderService rpcProviderService,
+                                                   final DOMRpcService rpcService, final ActorSystem actorSystem,
+                                                   final RemoteOpsProviderConfig config,
+                                                   final DOMActionProviderService actionProviderService,
+                                                   final DOMActionService actionService) {
+
+        return new RemoteOpsProvider(actorSystem, rpcProviderService, rpcService, config,
+                actionProviderService, actionService);
+    }
+}
index 71b2759..a737049 100644 (file)
@@ -7,11 +7,7 @@
  */
 package org.opendaylight.controller.remote.rpc;
 
-import static java.util.Objects.requireNonNull;
-
 import akka.actor.ActorRef;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
 import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
@@ -24,24 +20,15 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
  *
  * @author Robert Varga
  */
-final class RemoteRpcImplementation implements DOMRpcImplementation {
-    // 0 for local, 1 for binding, 2 for remote
-    private static final long COST = 2;
-
-    private final ActorRef remoteInvoker;
-    private final Timeout askDuration;
-
-    RemoteRpcImplementation(final ActorRef remoteInvoker, final RemoteRpcProviderConfig config) {
-        this.remoteInvoker = requireNonNull(remoteInvoker);
-        this.askDuration = config.getAskDuration();
+final class RemoteRpcImplementation extends AbstractRemoteImplementation<ExecuteRpc> implements DOMRpcImplementation {
+    RemoteRpcImplementation(final ActorRef remoteInvoker, final RemoteOpsProviderConfig config) {
+        super(remoteInvoker, config);
     }
 
     @Override
     public ListenableFuture<DOMRpcResult> invokeRpc(final DOMRpcIdentifier rpc,
             final NormalizedNode<?, ?> input) {
-        final RemoteDOMRpcFuture ret = RemoteDOMRpcFuture.create(rpc.getType().getLastComponent());
-        ret.completeWith(Patterns.ask(remoteInvoker, ExecuteRpc.from(rpc, input), askDuration));
-        return ret;
+        return new RemoteDOMRpcFuture(rpc.getType(), ask(ExecuteRpc.from(rpc, input)));
     }
 
     @Override
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java
deleted file mode 100644 (file)
index 6bf84a9..0000000
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.remote.rpc;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import com.google.common.base.Preconditions;
-import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
-import org.opendaylight.mdsal.dom.api.DOMRpcService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This is the base class which initialize all the actors, listeners and
- * default RPc implementation so remote invocation of rpcs.
- */
-public class RemoteRpcProvider implements AutoCloseable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcProvider.class);
-
-    private final DOMRpcProviderService rpcProvisionRegistry;
-    private final RemoteRpcProviderConfig config;
-    private final ActorSystem actorSystem;
-    private final DOMRpcService rpcService;
-
-    private ActorRef rpcManager;
-
-    public RemoteRpcProvider(final ActorSystem actorSystem, final DOMRpcProviderService rpcProvisionRegistry,
-            final DOMRpcService rpcService, final RemoteRpcProviderConfig config) {
-        this.actorSystem = Preconditions.checkNotNull(actorSystem);
-        this.rpcProvisionRegistry = Preconditions.checkNotNull(rpcProvisionRegistry);
-        this.rpcService = Preconditions.checkNotNull(rpcService);
-        this.config = Preconditions.checkNotNull(config);
-    }
-
-    @Override
-    public void close() {
-        if (rpcManager != null) {
-            LOG.info("Stopping RPC Manager at {}", rpcManager);
-            rpcManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
-            rpcManager = null;
-        }
-    }
-
-    public void start() {
-        LOG.info("Starting Remote RPC service...");
-        rpcManager = actorSystem.actorOf(RpcManager.props(rpcProvisionRegistry, rpcService, config),
-                config.getRpcManagerName());
-        LOG.debug("RPC Manager started at {}", rpcManager);
-    }
-}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java
deleted file mode 100644 (file)
index 58a7001..0000000
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.remote.rpc;
-
-import akka.actor.ActorSystem;
-import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
-import org.opendaylight.mdsal.dom.api.DOMRpcService;
-
-public final class RemoteRpcProviderFactory {
-    private RemoteRpcProviderFactory() {
-
-    }
-
-    public static RemoteRpcProvider createInstance(final DOMRpcProviderService rpcProviderService,
-            final DOMRpcService rpcService, final ActorSystem actorSystem, final RemoteRpcProviderConfig config) {
-
-        return new RemoteRpcProvider(actorSystem, rpcProviderService, rpcService, config);
-    }
-}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcInvoker.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcInvoker.java
deleted file mode 100644 (file)
index 4d3a66c..0000000
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.remote.rpc;
-
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
-import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
-import org.opendaylight.mdsal.dom.api.DOMRpcResult;
-import org.opendaylight.mdsal.dom.api.DOMRpcService;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaPath;
-
-/**
- * Actor receiving invocation requests from remote nodes, routing them to
- * {@link DOMRpcService#invokeRpc(SchemaPath, NormalizedNode)}.
- */
-final class RpcInvoker extends AbstractUntypedActor {
-    private final DOMRpcService rpcService;
-
-    private RpcInvoker(final DOMRpcService rpcService) {
-        this.rpcService = Preconditions.checkNotNull(rpcService);
-    }
-
-    public static Props props(final DOMRpcService rpcService) {
-        Preconditions.checkNotNull(rpcService, "DOMRpcService can not be null");
-        return Props.create(RpcInvoker.class, rpcService);
-    }
-
-    @Override
-    protected void handleReceive(final Object message) {
-        if (message instanceof ExecuteRpc) {
-            executeRpc((ExecuteRpc) message);
-        } else {
-            unknownMessage(message);
-        }
-    }
-
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    private void executeRpc(final ExecuteRpc msg) {
-        LOG.debug("Executing rpc {}", msg.getRpc());
-        final SchemaPath schemaPath = SchemaPath.create(true, msg.getRpc());
-        final ActorRef sender = getSender();
-        final ActorRef self = self();
-
-        final ListenableFuture<DOMRpcResult> future;
-        try {
-            future = rpcService.invokeRpc(schemaPath, msg.getInputNormalizedNode());
-        } catch (final RuntimeException e) {
-            LOG.debug("Failed to invoke RPC {}", msg.getRpc(), e);
-            sender.tell(new akka.actor.Status.Failure(e), sender);
-            return;
-        }
-
-        Futures.addCallback(future, new FutureCallback<DOMRpcResult>() {
-            @Override
-            public void onSuccess(final DOMRpcResult result) {
-                if (result == null) {
-                    // This shouldn't happen but the FutureCallback annotates the result param with Nullable so
-                    // handle null here to avoid FindBugs warning.
-                    LOG.debug("Got null DOMRpcResult - sending null response for execute rpc : {}", msg.getRpc());
-                    sender.tell(new RpcResponse(null), self);
-                    return;
-                }
-
-                if (!result.getErrors().isEmpty()) {
-                    final String message = String.format("Execution of RPC %s failed", msg.getRpc());
-                    sender.tell(new akka.actor.Status.Failure(new RpcErrorsException(message, result.getErrors())),
-                        self);
-                } else {
-                    LOG.debug("Sending response for execute rpc : {}", msg.getRpc());
-                    sender.tell(new RpcResponse(result.getResult()), self);
-                }
-            }
-
-            @Override
-            public void onFailure(final Throwable failure) {
-                LOG.debug("Failed to execute RPC {}", msg.getRpc(), failure);
-                LOG.error("Failed to execute RPC {} due to {}. More details are available on DEBUG level.",
-                    msg.getRpc(), Throwables.getRootCause(failure).getMessage());
-                sender.tell(new akka.actor.Status.Failure(failure), self);
-            }
-        }, MoreExecutors.directExecutor());
-    }
-}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java
deleted file mode 100644 (file)
index 5dbc1cd..0000000
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.remote.rpc;
-
-import akka.actor.ActorRef;
-import akka.actor.OneForOneStrategy;
-import akka.actor.Props;
-import akka.actor.SupervisorStrategy;
-import com.google.common.base.Preconditions;
-import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
-import org.opendaylight.mdsal.dom.api.DOMRpcService;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown. It also registers
- * {@link RpcListener} with the local {@link DOMRpcService}.
- */
-public class RpcManager extends AbstractUntypedActor {
-    private final DOMRpcProviderService rpcProvisionRegistry;
-    private final RemoteRpcProviderConfig config;
-    private final DOMRpcService rpcServices;
-
-    private ListenerRegistration<RpcListener> listenerReg;
-    private ActorRef rpcInvoker;
-    private ActorRef rpcRegistry;
-    private ActorRef rpcRegistrar;
-
-    RpcManager(final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices,
-            final RemoteRpcProviderConfig config) {
-        this.rpcProvisionRegistry = Preconditions.checkNotNull(rpcProvisionRegistry);
-        this.rpcServices = Preconditions.checkNotNull(rpcServices);
-        this.config = Preconditions.checkNotNull(config);
-    }
-
-    public static Props props(final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices,
-            final RemoteRpcProviderConfig config) {
-        Preconditions.checkNotNull(rpcProvisionRegistry, "RpcProviderService can not be null!");
-        Preconditions.checkNotNull(rpcServices, "RpcService can not be null!");
-        Preconditions.checkNotNull(config, "RemoteRpcProviderConfig can not be null!");
-        return Props.create(RpcManager.class, rpcProvisionRegistry, rpcServices, config);
-    }
-
-    @Override
-    public void preStart() throws Exception {
-        super.preStart();
-
-        rpcInvoker = getContext().actorOf(RpcInvoker.props(rpcServices)
-            .withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
-        LOG.debug("Listening for RPC invocation requests with {}", rpcInvoker);
-
-        rpcRegistrar = getContext().actorOf(RpcRegistrar.props(config, rpcProvisionRegistry)
-            .withMailbox(config.getMailBoxName()), config.getRpcRegistrarName());
-        LOG.debug("Registering remote RPCs with {}", rpcRegistrar);
-
-        rpcRegistry = getContext().actorOf(RpcRegistry.props(config, rpcInvoker, rpcRegistrar)
-                .withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
-        LOG.debug("Propagating RPC information with {}", rpcRegistry);
-
-        final RpcListener rpcListener = new RpcListener(rpcRegistry);
-        LOG.debug("Registering local availabitility listener {}", rpcListener);
-        listenerReg = rpcServices.registerRpcListener(rpcListener);
-    }
-
-    @Override
-    public void postStop() throws Exception {
-        if (listenerReg != null) {
-            listenerReg.close();
-            listenerReg = null;
-        }
-
-        super.postStop();
-    }
-
-    @Override
-    protected void handleReceive(final Object message) {
-        unknownMessage(message);
-    }
-
-    @Override
-    public SupervisorStrategy supervisorStrategy() {
-        return new OneForOneStrategy(10, FiniteDuration.create(1, TimeUnit.MINUTES), t -> {
-            LOG.error("An exception happened actor will be resumed", t);
-            return SupervisorStrategy.resume();
-        });
-    }
-}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcRegistrar.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcRegistrar.java
deleted file mode 100644 (file)
index 5fc088d..0000000
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.remote.rpc;
-
-import akka.actor.Address;
-import akka.actor.Props;
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.RemoteRpcEndpoint;
-import org.opendaylight.mdsal.dom.api.DOMRpcImplementationRegistration;
-import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
-
-/**
- * Actor handling registration of RPCs available on remote nodes with the local {@link DOMRpcProviderService}.
- *
- * @author Robert Varga
- */
-final class RpcRegistrar extends AbstractUntypedActor {
-    private final Map<Address, DOMRpcImplementationRegistration<?>> regs = new HashMap<>();
-    private final DOMRpcProviderService rpcProviderService;
-    private final RemoteRpcProviderConfig config;
-
-    RpcRegistrar(final RemoteRpcProviderConfig config, final DOMRpcProviderService rpcProviderService) {
-        this.config = Preconditions.checkNotNull(config);
-        this.rpcProviderService = Preconditions.checkNotNull(rpcProviderService);
-    }
-
-    public static Props props(final RemoteRpcProviderConfig config, final DOMRpcProviderService rpcProviderService) {
-        Preconditions.checkNotNull(rpcProviderService, "DOMRpcProviderService cannot be null");
-        return Props.create(RpcRegistrar.class, config, rpcProviderService);
-    }
-
-    @Override
-    public void postStop() throws Exception {
-        regs.values().forEach(DOMRpcImplementationRegistration::close);
-        regs.clear();
-
-        super.postStop();
-    }
-
-    @Override
-    protected void handleReceive(final Object message) {
-        if (message instanceof UpdateRemoteEndpoints) {
-            updateRemoteEndpoints(((UpdateRemoteEndpoints) message).getEndpoints());
-        } else {
-            unknownMessage(message);
-        }
-    }
-
-    private void updateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> endpoints) {
-        /*
-         * Updating RPC providers is a two-step process. We first add the newly-discovered RPCs and then close
-         * the old registration. This minimizes churn observed by listeners, as they will not observe RPC
-         * unavailability which would occur if we were to do it the other way around.
-         *
-         * Note that when an RPC moves from one remote node to another, we also do not want to expose the gap,
-         * hence we register all new implementations before closing all registrations.
-         */
-        final Collection<DOMRpcImplementationRegistration<?>> prevRegs = new ArrayList<>(endpoints.size());
-
-        for (Entry<Address, Optional<RemoteRpcEndpoint>> e : endpoints.entrySet()) {
-            LOG.debug("Updating RPC registrations for {}", e.getKey());
-
-            final DOMRpcImplementationRegistration<?> prevReg;
-            final Optional<RemoteRpcEndpoint> maybeEndpoint = e.getValue();
-            if (maybeEndpoint.isPresent()) {
-                final RemoteRpcEndpoint endpoint = maybeEndpoint.get();
-                final RemoteRpcImplementation impl = new RemoteRpcImplementation(endpoint.getRouter(), config);
-                prevReg = regs.put(e.getKey(), rpcProviderService.registerRpcImplementation(impl,
-                    endpoint.getRpcs()));
-            } else {
-                prevReg = regs.remove(e.getKey());
-            }
-
-            if (prevReg != null) {
-                prevRegs.add(prevReg);
-            }
-        }
-
-        for (DOMRpcImplementationRegistration<?> r : prevRegs) {
-            r.close();
-        }
-    }
-}
index 6590941..fb03430 100644 (file)
@@ -5,7 +5,6 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.remote.rpc;
 
 import akka.actor.Terminated;
@@ -21,7 +20,8 @@ public class TerminationMonitor extends UntypedAbstractActor {
         LOG.debug("Created TerminationMonitor");
     }
 
-    @Override public void onReceive(Object message) {
+    @Override
+    public void onReceive(final Object message) {
         if (message instanceof Terminated) {
             Terminated terminated = (Terminated) message;
             LOG.debug("Actor terminated : {}", terminated.actor());
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/AbstractExecute.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/AbstractExecute.java
new file mode 100644 (file)
index 0000000..a23bcb3
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2019 Nordix Foundation.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.messages;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.MoreObjects.ToStringHelper;
+import java.io.Serializable;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+/**
+ * An abstract base class for invocation requests. Specialized via {@link ExecuteAction} and {@link ExecuteRpc}.
+ */
+public abstract class AbstractExecute<T extends NormalizedNode<?, ?>> implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final transient @NonNull SchemaPath type;
+    private final transient T input;
+
+    AbstractExecute(final @NonNull SchemaPath type, final T input) {
+        this.type = requireNonNull(type);
+        this.input = input;
+    }
+
+    public final @NonNull SchemaPath getType() {
+        return type;
+    }
+
+    public final T getInput() {
+        return input;
+    }
+
+    @Override
+    public final String toString() {
+        // We want 'type' to be always first
+        return addToStringAttributes(MoreObjects.toStringHelper(this).omitNullValues().add("type", type)).toString();
+    }
+
+    ToStringHelper addToStringAttributes(final ToStringHelper helper) {
+        return helper.add("input", input);
+    }
+
+    abstract Object writeReplace();
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/AbstractResponse.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/AbstractResponse.java
new file mode 100644 (file)
index 0000000..1789753
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2019 Nordix Foundation.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.messages;
+
+import java.io.Serializable;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * An abstract base class for invocation responses. Specialized via {@link ActionResponse} and {@link RpcResponse}.
+ */
+public abstract class AbstractResponse<T extends NormalizedNode<?, ?>> implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final transient @Nullable T output;
+
+    public AbstractResponse(final @Nullable T output) {
+        this.output = output;
+    }
+
+    public final @Nullable T getOutput() {
+        return output;
+    }
+
+    abstract Object writeReplace();
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ActionResponse.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ActionResponse.java
new file mode 100644 (file)
index 0000000..d6a2358
--- /dev/null
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2019 Nordix Foundation.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.messages;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.collect.ImmutableList;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collection;
+import java.util.Optional;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+@SuppressFBWarnings({"SE_TRANSIENT_FIELD_NOT_RESTORED", "DMI_NONSERIALIZABLE_OBJECT_WRITTEN"})
+public class ActionResponse extends AbstractResponse<ContainerNode> {
+    private static final long serialVersionUID = 1L;
+
+    private final transient @NonNull ImmutableList<@NonNull RpcError> errors;
+
+    public ActionResponse(final @NonNull Optional<ContainerNode> output, @NonNull final Collection<RpcError> errors) {
+        super(output.orElse(null));
+        this.errors = ImmutableList.copyOf(errors);
+    }
+
+    public @NonNull ImmutableList<@NonNull RpcError> getErrors() {
+        return errors;
+    }
+
+    @Override
+    Object writeReplace() {
+        return new Proxy(this);
+    }
+
+    private static class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private ActionResponse actionResponse;
+
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy() {
+        }
+
+        Proxy(final ActionResponse actionResponse) {
+            this.actionResponse = requireNonNull(actionResponse);
+        }
+
+        @Override
+        public void writeExternal(final ObjectOutput out) throws IOException {
+            out.writeObject(actionResponse.getErrors());
+            SerializationUtils.writeNormalizedNode(out, actionResponse.getOutput());
+        }
+
+        @Override
+        public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+            @SuppressWarnings("unchecked")
+            final ImmutableList<RpcError> errors = (ImmutableList<RpcError>) in.readObject();
+            final Optional<NormalizedNode<?, ?>> output = SerializationUtils.readNormalizedNode(in);
+            actionResponse = new ActionResponse(output.map(ContainerNode.class::cast), errors);
+        }
+
+        private Object readResolve() {
+            return actionResponse;
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteAction.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteAction.java
new file mode 100644 (file)
index 0000000..5f9d99f
--- /dev/null
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2019 Nordix Foundation.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.messages;
+
+import static com.google.common.base.Verify.verifyNotNull;
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.MoreObjects.ToStringHelper;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputOutput;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+public final class ExecuteAction extends AbstractExecute<@NonNull ContainerNode> {
+    private static final long serialVersionUID = 1128904894827335676L;
+
+    private final @NonNull DOMDataTreeIdentifier path;
+
+    private ExecuteAction(final @NonNull SchemaPath type, final @NonNull DOMDataTreeIdentifier path,
+            final @NonNull ContainerNode input) {
+        super(type, requireNonNull(input));
+        this.path = requireNonNull(path);
+    }
+
+    public static @NonNull ExecuteAction from(final @NonNull SchemaPath type, @NonNull final DOMDataTreeIdentifier path,
+            final @NonNull ContainerNode input) {
+        return new ExecuteAction(type, path, input);
+    }
+
+    public @NonNull DOMDataTreeIdentifier getPath() {
+        return path;
+    }
+
+    @Override
+    ToStringHelper addToStringAttributes(final ToStringHelper helper) {
+        return super.addToStringAttributes(helper.add("path", path));
+    }
+
+    @Override
+    Object writeReplace() {
+        return new Proxy(this);
+    }
+
+    private static final class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private ExecuteAction executeAction;
+
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy() {
+
+        }
+
+        Proxy(final ExecuteAction executeAction) {
+            this.executeAction = requireNonNull(executeAction);
+        }
+
+        @Override
+        public void writeExternal(final ObjectOutput out) throws IOException {
+            try (NormalizedNodeDataOutput stream = NormalizedNodeInputOutput.newDataOutput(out)) {
+                stream.writeSchemaPath(executeAction.getType());
+                // FIXME: deal with data store types?
+                stream.writeYangInstanceIdentifier(executeAction.getPath().getRootIdentifier());
+                stream.writeOptionalNormalizedNode(executeAction.getInput());
+            }
+        }
+
+        @Override
+        public void readExternal(final ObjectInput in) throws IOException {
+            final NormalizedNodeDataInput stream = NormalizedNodeInputOutput.newDataInput(in);
+            final SchemaPath name = stream.readSchemaPath();
+            final YangInstanceIdentifier path = stream.readYangInstanceIdentifier();
+            final ContainerNode input = (ContainerNode) stream.readOptionalNormalizedNode().orElse(null);
+
+            executeAction = new ExecuteAction(name, new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, path),
+                input);
+        }
+
+        private Object readResolve() {
+            return verifyNotNull(executeAction);
+        }
+    }
+}
index c9fb52b..cfa0c89 100644 (file)
@@ -9,61 +9,37 @@ package org.opendaylight.controller.remote.rpc.messages;
 
 import static java.util.Objects.requireNonNull;
 
-import com.google.common.base.MoreObjects;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-import java.io.Serializable;
 import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputOutput;
 import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
-import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 
-public final class ExecuteRpc implements Serializable {
+public final class ExecuteRpc extends AbstractExecute<@Nullable NormalizedNode<?, ?>> {
     private static final long serialVersionUID = 1128904894827335676L;
 
-    @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "This field is not Serializable but this class "
-            + "implements writeReplace to delegate serialization to a Proxy class and thus instances of this class "
-            + "aren't serialized. FindBugs does not recognize this.")
-    private final NormalizedNode<?, ?> inputNormalizedNode;
-    private final QName rpc;
-
-    private ExecuteRpc(final @Nullable NormalizedNode<?, ?> inputNormalizedNode, final @NonNull QName rpc) {
-        this.rpc = requireNonNull(rpc, "rpc Qname should not be null");
-        this.inputNormalizedNode = inputNormalizedNode;
-    }
-
-    public static ExecuteRpc from(final @NonNull DOMRpcIdentifier rpc, final @Nullable NormalizedNode<?, ?> input) {
-        return new ExecuteRpc(input, rpc.getType().getLastComponent());
-    }
-
-    public @Nullable NormalizedNode<?, ?> getInputNormalizedNode() {
-        return inputNormalizedNode;
+    private ExecuteRpc(final @NonNull SchemaPath type, final @Nullable NormalizedNode<?, ?> input) {
+        super(type, input);
     }
 
-    public @NonNull QName getRpc() {
-        return rpc;
-    }
-
-    private Object writeReplace() {
-        return new Proxy(this);
+    public static @NonNull ExecuteRpc from(final @NonNull DOMRpcIdentifier rpc,
+            final @Nullable NormalizedNode<?, ?> input) {
+        return new ExecuteRpc(rpc.getType(), input);
     }
 
     @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(this)
-                .add("rpc", rpc)
-                .add("normalizedNode", inputNormalizedNode)
-                .toString();
+    Object writeReplace() {
+        return new Proxy(this);
     }
 
-    private static class Proxy implements Externalizable {
+    private static final class Proxy implements Externalizable {
         private static final long serialVersionUID = 1L;
 
         private ExecuteRpc executeRpc;
@@ -72,25 +48,27 @@ public final class ExecuteRpc implements Serializable {
         // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
         @SuppressWarnings("checkstyle:RedundantModifier")
         public Proxy() {
+
         }
 
         Proxy(final ExecuteRpc executeRpc) {
-            this.executeRpc = executeRpc;
+            this.executeRpc = requireNonNull(executeRpc);
         }
 
         @Override
         public void writeExternal(final ObjectOutput out) throws IOException {
             try (NormalizedNodeDataOutput stream = NormalizedNodeInputOutput.newDataOutput(out)) {
-                stream.writeQName(executeRpc.getRpc());
-                stream.writeOptionalNormalizedNode(executeRpc.getInputNormalizedNode());
+                stream.writeQName(executeRpc.getType().getLastComponent());
+                stream.writeOptionalNormalizedNode(executeRpc.getInput());
             }
         }
 
         @Override
-        public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+        public void readExternal(final ObjectInput in) throws IOException {
             final NormalizedNodeDataInput stream = NormalizedNodeInputOutput.newDataInput(in);
-            final QName qname = stream.readQName();
-            executeRpc = new ExecuteRpc(stream.readOptionalNormalizedNode().orElse(null), qname);
+            final SchemaPath type = SchemaPath.ROOT.createChild(stream.readQName());
+            final NormalizedNode<?, ?> input = stream.readOptionalNormalizedNode().orElse(null);
+            executeRpc = new ExecuteRpc(type, input);
         }
 
         private Object readResolve() {
index 6f1b44b..f141f09 100644 (file)
@@ -7,33 +7,23 @@
  */
 package org.opendaylight.controller.remote.rpc.messages;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-import java.io.Serializable;
 import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
-public class RpcResponse implements Serializable {
+public class RpcResponse extends AbstractResponse<NormalizedNode<?, ?>> {
     private static final long serialVersionUID = -4211279498688989245L;
 
-    @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "This field is not Serializable but this class "
-            + "implements writeReplace to delegate serialization to a Proxy class and thus instances of this class "
-            + "aren't serialized. FindBugs does not recognize this.")
-    private final NormalizedNode<?, ?> resultNormalizedNode;
-
-    public RpcResponse(final @Nullable NormalizedNode<?, ?> inputNormalizedNode) {
-        resultNormalizedNode = inputNormalizedNode;
-    }
-
-    public @Nullable NormalizedNode<?, ?> getResultNormalizedNode() {
-        return resultNormalizedNode;
+    public RpcResponse(final @Nullable NormalizedNode<?, ?> output) {
+        super(output);
     }
 
-    private Object writeReplace() {
+    @Override
+    Object writeReplace() {
         return new Proxy(this);
     }
 
@@ -54,7 +44,7 @@ public class RpcResponse implements Serializable {
 
         @Override
         public void writeExternal(final ObjectOutput out) throws IOException {
-            SerializationUtils.writeNormalizedNode(out, rpcResponse.getResultNormalizedNode());
+            SerializationUtils.writeNormalizedNode(out, rpcResponse.getOutput());
         }
 
         @Override
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/AbstractRoutingTable.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/AbstractRoutingTable.java
new file mode 100644 (file)
index 0000000..c25ee44
--- /dev/null
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableSet;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Optional;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketData;
+
+/**
+ * Common class for routing tables.
+ *
+ * @param <T> Table type
+ * @param <I> Item type
+ */
+public abstract class AbstractRoutingTable<T extends AbstractRoutingTable<T, I>, I> implements BucketData<T>,
+        Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final @NonNull ActorRef invoker;
+    private final @NonNull ImmutableSet<I> items;
+
+    AbstractRoutingTable(final ActorRef invoker, final Collection<I> items) {
+        this.invoker = requireNonNull(invoker);
+        this.items = ImmutableSet.copyOf(items);
+    }
+
+    @Override
+    public final Optional<ActorRef> getWatchActor() {
+        return Optional.of(invoker);
+    }
+
+    public final @NonNull ImmutableSet<I> getItems() {
+        return items;
+    }
+
+    final @NonNull ActorRef getInvoker() {
+        return invoker;
+    }
+
+    @VisibleForTesting
+    public final boolean contains(final I routeId) {
+        return items.contains(routeId);
+    }
+
+    @VisibleForTesting
+    public final int size() {
+        return items.size();
+    }
+
+    abstract Object writeReplace();
+
+    @Override
+    public final String toString() {
+        return MoreObjects.toStringHelper(this).add("invoker", invoker).add("items", items).toString();
+    }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/ActionRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/ActionRegistry.java
new file mode 100644 (file)
index 0000000..73a471c
--- /dev/null
@@ -0,0 +1,179 @@
+/*
+ * Copyright (c) 2019 Nordix Foundation.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry;
+
+import akka.actor.ActorRef;
+import akka.actor.Address;
+import akka.actor.Props;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor;
+import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteActionRegistryMXBeanImpl;
+import org.opendaylight.mdsal.dom.api.DOMActionInstance;
+
+/**
+ * Registry to look up cluster nodes that have registered for a given Action.
+ *
+ * <p>
+ * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor} to maintain this
+ * cluster wide information.
+ */
+public class ActionRegistry extends BucketStoreActor<ActionRoutingTable> {
+    private final ActorRef rpcRegistrar;
+    private final RemoteActionRegistryMXBeanImpl mxBean;
+
+    public ActionRegistry(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker,
+                          final ActorRef rpcRegistrar) {
+        super(config, config.getRpcRegistryPersistenceId(), new ActionRoutingTable(rpcInvoker, ImmutableSet.of()));
+        this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar);
+        this.mxBean = new RemoteActionRegistryMXBeanImpl(new BucketStoreAccess(self(), getContext().dispatcher(),
+                config.getAskDuration()), config.getAskDuration());
+    }
+
+    /**
+     * Create a new props instance for instantiating an ActionRegistry actor.
+     *
+     * @param config Provider configuration
+     * @param opsRegistrar Local RPC provider interface, used to register routers to remote nodes
+     * @param opsInvoker Actor handling RPC invocation requests from remote nodes
+     * @return A new {@link Props} instance
+     */
+    public static Props props(final RemoteOpsProviderConfig config, final ActorRef opsInvoker,
+                              final ActorRef opsRegistrar) {
+        return Props.create(ActionRegistry.class, config, opsInvoker, opsRegistrar);
+    }
+
+    @Override
+    public void postStop() {
+        super.postStop();
+        this.mxBean.unregister();
+    }
+
+    @Override
+    protected void handleCommand(final Object message) throws Exception {
+        if (message instanceof ActionRegistry.Messages.UpdateActions) {
+            LOG.debug("handling updatesActionRoutes message");
+            updatesActionRoutes((Messages.UpdateActions) message);
+        } else {
+            super.handleCommand(message);
+        }
+    }
+
+    private void updatesActionRoutes(final Messages.UpdateActions msg) {
+        LOG.debug("addedActions: {}", msg.getAddedActions());
+        LOG.debug("removedActions: {}", msg.getRemovedActions());
+        updateLocalBucket(getLocalData().updateActions(msg.getAddedActions(), msg.getRemovedActions()));
+    }
+
+    @Override
+    protected void onBucketRemoved(final Address address, final Bucket<ActionRoutingTable> bucket) {
+        rpcRegistrar.tell(new Messages.UpdateRemoteActionEndpoints(ImmutableMap.of(address, Optional.empty())),
+            ActorRef.noSender());
+    }
+
+    @Override
+    protected void onBucketsUpdated(final Map<Address, Bucket<ActionRoutingTable>> buckets) {
+        LOG.debug("Updating buckets for action registry");
+        final Map<Address, Optional<RemoteActionEndpoint>> endpoints = new HashMap<>(buckets.size());
+
+        for (Map.Entry<Address, Bucket<ActionRoutingTable>> e : buckets.entrySet()) {
+            final ActionRoutingTable table = e.getValue().getData();
+
+            final Collection<DOMActionInstance> actions = table.getItems();
+            endpoints.put(e.getKey(), actions.isEmpty() ? Optional.empty()
+                : Optional.of(new RemoteActionEndpoint(table.getInvoker(), actions)));
+        }
+
+        if (!endpoints.isEmpty()) {
+            rpcRegistrar.tell(new Messages.UpdateRemoteActionEndpoints(endpoints), ActorRef.noSender());
+        }
+    }
+
+    public static final class RemoteActionEndpoint {
+        private final Set<DOMActionInstance> actions;
+        private final ActorRef router;
+
+        @VisibleForTesting
+        public RemoteActionEndpoint(final ActorRef router, final Collection<DOMActionInstance> actions) {
+            this.router = Preconditions.checkNotNull(router);
+            this.actions = ImmutableSet.copyOf(actions);
+        }
+
+        public ActorRef getRouter() {
+            return router;
+        }
+
+        public Set<DOMActionInstance> getActions() {
+            return actions;
+        }
+    }
+
+        /**
+         * All messages used by the ActionRegistry.
+         */
+    public static class Messages {
+        abstract static class AbstractActionRouteMessage {
+            final Collection<DOMActionInstance> addedActions;
+            final Collection<DOMActionInstance> removedActions;
+
+            AbstractActionRouteMessage(final Collection<DOMActionInstance> addedActions,
+                                       final Collection<DOMActionInstance> removedActions) {
+                this.addedActions = ImmutableList.copyOf(addedActions);
+                this.removedActions = ImmutableList.copyOf(removedActions);
+            }
+
+            Collection<DOMActionInstance> getAddedActions() {
+                return this.addedActions;
+            }
+
+            Collection<DOMActionInstance> getRemovedActions() {
+                return this.removedActions;
+            }
+
+
+            @Override
+            public String toString() {
+                return "ContainsRoute{" + "addedActions=" + addedActions + " removedActions=" + removedActions + '}';
+            }
+        }
+
+
+        public static final class UpdateActions extends AbstractActionRouteMessage {
+            public UpdateActions(final Collection<DOMActionInstance> addedActions,
+                                 final Collection<DOMActionInstance> removedActions) {
+                super(addedActions, removedActions);
+            }
+
+        }
+
+        public static final class UpdateRemoteActionEndpoints {
+            private final Map<Address, Optional<RemoteActionEndpoint>> actionEndpoints;
+
+            @VisibleForTesting
+            public UpdateRemoteActionEndpoints(final Map<Address, Optional<RemoteActionEndpoint>>
+                                                                   actionEndpoints) {
+                this.actionEndpoints = ImmutableMap.copyOf(actionEndpoints);
+            }
+
+            public Map<Address, Optional<RemoteActionEndpoint>> getActionEndpoints() {
+                return actionEndpoints;
+            }
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/ActionRoutingTable.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/ActionRoutingTable.java
new file mode 100644 (file)
index 0000000..3c96859
--- /dev/null
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2019 Nordix Foundation.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry;
+
+import akka.actor.ActorRef;
+import akka.serialization.JavaSerializer;
+import akka.serialization.Serialization;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputOutput;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMActionInstance;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class ActionRoutingTable extends AbstractRoutingTable<ActionRoutingTable, DOMActionInstance> {
+    private static final class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+        private static final Logger LOG = LoggerFactory.getLogger(ActionRoutingTable.class);
+
+        @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "We deal with the field in serialization methods.")
+        private Collection<DOMActionInstance> actions;
+        private ActorRef opsInvoker;
+
+        // checkstyle flags the public modifier as redundant however it is explicitly needed for Java serialization to
+        // be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy() {
+            // For Externalizable
+        }
+
+        Proxy(final ActionRoutingTable table) {
+            actions = table.getItems();
+            opsInvoker = table.getInvoker();
+        }
+
+        @Override
+        public void writeExternal(final ObjectOutput out) throws IOException {
+            LOG.debug("serializing ActionRoutingTable.");
+            out.writeObject(Serialization.serializedActorPath(opsInvoker));
+
+            final NormalizedNodeDataOutput nnout = NormalizedNodeInputOutput.newDataOutput(out);
+            nnout.writeInt(actions.size());
+            for (DOMActionInstance id : actions) {
+                nnout.writeSchemaPath(id.getType());
+                YangInstanceIdentifier actionPath = YangInstanceIdentifier.create(
+                        new YangInstanceIdentifier.NodeIdentifier(id.getType().getLastComponent()));
+                nnout.writeYangInstanceIdentifier(actionPath);
+            }
+        }
+
+        @Override
+        public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+            LOG.debug("deserializing ActionRoutingTable");
+            opsInvoker = JavaSerializer.currentSystem().value().provider().resolveActorRef((String) in.readObject());
+
+            final NormalizedNodeDataInput nnin = NormalizedNodeInputOutput.newDataInput(in);
+            final int size = nnin.readInt();
+            actions = new ArrayList<>(size);
+            for (int i = 0; i < size; ++i) {
+                actions.add(DOMActionInstance.of(nnin.readSchemaPath(), LogicalDatastoreType.OPERATIONAL,
+                        nnin.readYangInstanceIdentifier()));
+            }
+        }
+
+        private Object readResolve() {
+            return new ActionRoutingTable(opsInvoker, actions);
+        }
+    }
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = LoggerFactory.getLogger(ActionRoutingTable.class);
+
+    ActionRoutingTable(final ActorRef invoker, final Collection<DOMActionInstance> actions) {
+        super(invoker, actions);
+    }
+
+    ActionRoutingTable updateActions(final Collection<DOMActionInstance> toAdd,
+                                     final Collection<DOMActionInstance> toRemove) {
+        LOG.debug("Updating actions in ActionRoutingTable");
+        final Set<DOMActionInstance> newActions = new HashSet<>(getItems());
+        newActions.addAll(toAdd);
+        newActions.removeAll(toRemove);
+        return new ActionRoutingTable(getInvoker(), newActions);
+    }
+
+    @Override
+    Object writeReplace() {
+        return new Proxy(this);
+    }
+}
index 5159b96..8d67b3b 100644 (file)
@@ -10,33 +10,27 @@ package org.opendaylight.controller.remote.rpc.registry;
 import akka.actor.ActorRef;
 import akka.serialization.JavaSerializer;
 import akka.serialization.Serialization;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.Optional;
 import java.util.Set;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputOutput;
-import org.opendaylight.controller.remote.rpc.registry.gossip.BucketData;
 import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
 
-public final class RoutingTable implements BucketData<RoutingTable>, Serializable {
+public final class RoutingTable extends AbstractRoutingTable<RoutingTable, DOMRpcIdentifier> {
     private static final class Proxy implements Externalizable {
         private static final long serialVersionUID = 1L;
 
         @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "We deal with the field in serialization methods.")
         private Collection<DOMRpcIdentifier> rpcs;
-        private ActorRef rpcInvoker;
+        private ActorRef opsInvoker;
 
         // checkstyle flags the public modifier as redundant however it is explicitly needed for Java serialization to
         // be able to create instances via reflection.
@@ -46,13 +40,13 @@ public final class RoutingTable implements BucketData<RoutingTable>, Serializabl
         }
 
         Proxy(final RoutingTable table) {
-            rpcs = table.getRoutes();
-            rpcInvoker = table.getRpcInvoker();
+            rpcs = table.getItems();
+            opsInvoker = table.getInvoker();
         }
 
         @Override
         public void writeExternal(final ObjectOutput out) throws IOException {
-            out.writeObject(Serialization.serializedActorPath(rpcInvoker));
+            out.writeObject(Serialization.serializedActorPath(opsInvoker));
 
             final NormalizedNodeDataOutput nnout = NormalizedNodeInputOutput.newDataOutput(out);
             nnout.writeInt(rpcs.size());
@@ -64,7 +58,7 @@ public final class RoutingTable implements BucketData<RoutingTable>, Serializabl
 
         @Override
         public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
-            rpcInvoker = JavaSerializer.currentSystem().value().provider().resolveActorRef((String) in.readObject());
+            opsInvoker = JavaSerializer.currentSystem().value().provider().resolveActorRef((String) in.readObject());
 
             final NormalizedNodeDataInput nnin = NormalizedNodeInputOutput.newDataInput(in);
             final int size = nnin.readInt();
@@ -75,62 +69,30 @@ public final class RoutingTable implements BucketData<RoutingTable>, Serializabl
         }
 
         private Object readResolve() {
-            return new RoutingTable(rpcInvoker, rpcs);
+            return new RoutingTable(opsInvoker, rpcs);
         }
     }
 
     private static final long serialVersionUID = 1L;
 
-    @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "We deal with the field in serialization methods.")
-    private final Set<DOMRpcIdentifier> rpcs;
-    private final ActorRef rpcInvoker;
-
-    RoutingTable(final ActorRef rpcInvoker, final Collection<DOMRpcIdentifier> table) {
-        this.rpcInvoker = Preconditions.checkNotNull(rpcInvoker);
-        this.rpcs = ImmutableSet.copyOf(table);
-    }
-
-    @Override
-    public Optional<ActorRef> getWatchActor() {
-        return Optional.of(rpcInvoker);
-    }
-
-    public Set<DOMRpcIdentifier> getRoutes() {
-        return rpcs;
-    }
-
-    ActorRef getRpcInvoker() {
-        return rpcInvoker;
+    RoutingTable(final ActorRef invoker, final Collection<DOMRpcIdentifier> table) {
+        super(invoker, table);
     }
 
     RoutingTable addRpcs(final Collection<DOMRpcIdentifier> toAdd) {
-        final Set<DOMRpcIdentifier> newRpcs = new HashSet<>(rpcs);
+        final Set<DOMRpcIdentifier> newRpcs = new HashSet<>(getItems());
         newRpcs.addAll(toAdd);
-        return new RoutingTable(rpcInvoker, newRpcs);
+        return new RoutingTable(getInvoker(), newRpcs);
     }
 
     RoutingTable removeRpcs(final Collection<DOMRpcIdentifier> toRemove) {
-        final Set<DOMRpcIdentifier> newRpcs = new HashSet<>(rpcs);
+        final Set<DOMRpcIdentifier> newRpcs = new HashSet<>(getItems());
         newRpcs.removeAll(toRemove);
-        return new RoutingTable(rpcInvoker, newRpcs);
-    }
-
-    private Object writeReplace() {
-        return new Proxy(this);
-    }
-
-    @VisibleForTesting
-    boolean contains(final DOMRpcIdentifier routeId) {
-        return rpcs.contains(routeId);
-    }
-
-    @VisibleForTesting
-    int size() {
-        return rpcs.size();
+        return new RoutingTable(getInvoker(), newRpcs);
     }
 
     @Override
-    public String toString() {
-        return "RoutingTable{" + "rpcs=" + rpcs + ", rpcInvoker=" + rpcInvoker + '}';
+    Object writeReplace() {
+        return new Proxy(this);
     }
 }
index 5ba97a3..68fead4 100644 (file)
@@ -22,10 +22,9 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess;
 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor;
@@ -43,7 +42,7 @@ public class RpcRegistry extends BucketStoreActor<RoutingTable> {
     private final ActorRef rpcRegistrar;
     private final RemoteRpcRegistryMXBeanImpl mxBean;
 
-    public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
+    public RpcRegistry(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
         super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker, ImmutableSet.of()));
         this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar);
         this.mxBean = new RemoteRpcRegistryMXBeanImpl(new BucketStoreAccess(self(), getContext().dispatcher(),
@@ -58,8 +57,8 @@ public class RpcRegistry extends BucketStoreActor<RoutingTable> {
      * @param rpcInvoker Actor handling RPC invocation requests from remote nodes
      * @return A new {@link Props} instance
      */
-    public static Props props(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker,
-            final ActorRef rpcRegistrar) {
+    public static Props props(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker,
+                              final ActorRef rpcRegistrar) {
         return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar);
     }
 
@@ -97,7 +96,8 @@ public class RpcRegistry extends BucketStoreActor<RoutingTable> {
 
     @Override
     protected void onBucketRemoved(final Address address, final Bucket<RoutingTable> bucket) {
-        rpcRegistrar.tell(new UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())), ActorRef.noSender());
+        rpcRegistrar.tell(new Messages.UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())),
+                ActorRef.noSender());
     }
 
     @Override
@@ -107,13 +107,13 @@ public class RpcRegistry extends BucketStoreActor<RoutingTable> {
         for (Entry<Address, Bucket<RoutingTable>> e : buckets.entrySet()) {
             final RoutingTable table = e.getValue().getData();
 
-            final Collection<DOMRpcIdentifier> rpcs = table.getRoutes();
+            final Collection<DOMRpcIdentifier> rpcs = table.getItems();
             endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty()
-                    : Optional.of(new RemoteRpcEndpoint(table.getRpcInvoker(), rpcs)));
+                    : Optional.of(new RemoteRpcEndpoint(table.getInvoker(), rpcs)));
         }
 
         if (!endpoints.isEmpty()) {
-            rpcRegistrar.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
+            rpcRegistrar.tell(new Messages.UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
         }
     }
 
@@ -141,46 +141,48 @@ public class RpcRegistry extends BucketStoreActor<RoutingTable> {
      */
     public static class Messages {
         abstract static class AbstractRouteMessage {
-            final List<DOMRpcIdentifier> routeIdentifiers;
+            final List<DOMRpcIdentifier> rpcRouteIdentifiers;
 
-            AbstractRouteMessage(final Collection<DOMRpcIdentifier> routeIdentifiers) {
-                Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(),
+            AbstractRouteMessage(final Collection<DOMRpcIdentifier> rpcRouteIdentifiers) {
+                Preconditions.checkArgument(rpcRouteIdentifiers != null && !rpcRouteIdentifiers.isEmpty(),
                         "Route Identifiers must be supplied");
-                this.routeIdentifiers = ImmutableList.copyOf(routeIdentifiers);
+                this.rpcRouteIdentifiers = ImmutableList.copyOf(rpcRouteIdentifiers);
             }
 
             List<DOMRpcIdentifier> getRouteIdentifiers() {
-                return this.routeIdentifiers;
+                return this.rpcRouteIdentifiers;
             }
 
             @Override
             public String toString() {
-                return "ContainsRoute{" + "routeIdentifiers=" + routeIdentifiers + '}';
+                return "ContainsRoute{" + "routeIdentifiers=" + rpcRouteIdentifiers + '}';
             }
         }
 
-        public static final class AddOrUpdateRoutes extends AbstractRouteMessage {
-            public AddOrUpdateRoutes(final Collection<DOMRpcIdentifier> routeIdentifiers) {
-                super(routeIdentifiers);
+        public static final class AddOrUpdateRoutes extends Messages.AbstractRouteMessage {
+            public AddOrUpdateRoutes(final Collection<DOMRpcIdentifier> rpcRouteIdentifiers) {
+                super(rpcRouteIdentifiers);
             }
+
         }
 
         public static final class RemoveRoutes extends AbstractRouteMessage {
-            public RemoveRoutes(final Collection<DOMRpcIdentifier> routeIdentifiers) {
-                super(routeIdentifiers);
+            public RemoveRoutes(final Collection<DOMRpcIdentifier> rpcRouteIdentifiers) {
+                super(rpcRouteIdentifiers);
             }
         }
 
         public static final class UpdateRemoteEndpoints {
-            private final Map<Address, Optional<RemoteRpcEndpoint>> endpoints;
+            private final Map<Address, Optional<RemoteRpcEndpoint>> rpcEndpoints;
+
 
             @VisibleForTesting
-            public UpdateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> endpoints) {
-                this.endpoints = ImmutableMap.copyOf(endpoints);
+            public UpdateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> rpcEndpoints) {
+                this.rpcEndpoints = ImmutableMap.copyOf(rpcEndpoints);
             }
 
-            public Map<Address, Optional<RemoteRpcEndpoint>> getEndpoints() {
-                return endpoints;
+            public Map<Address, Optional<RemoteRpcEndpoint>> getRpcEndpoints() {
+                return rpcEndpoints;
             }
         }
     }
index 9a84b91..2b07d1b 100644 (file)
@@ -37,7 +37,7 @@ import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.function.Consumer;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
 
 /**
  * A store that syncs its data across nodes in the cluster.
@@ -72,7 +72,7 @@ public abstract class BucketStoreActor<T extends BucketData<T>> extends
      */
     private final SetMultimap<ActorRef, Address> watchedActors = HashMultimap.create(1, 1);
 
-    private final RemoteRpcProviderConfig config;
+    private final RemoteOpsProviderConfig config;
     private final String persistenceId;
 
     /**
@@ -88,7 +88,7 @@ public abstract class BucketStoreActor<T extends BucketData<T>> extends
     private Integer incarnation;
     private boolean persisting;
 
-    protected BucketStoreActor(final RemoteRpcProviderConfig config, final String persistenceId, final T initialData) {
+    protected BucketStoreActor(final RemoteOpsProviderConfig config, final String persistenceId, final T initialData) {
         this.config = Preconditions.checkNotNull(config);
         this.initialData = Preconditions.checkNotNull(initialData);
         this.persistenceId = Preconditions.checkNotNull(persistenceId);
@@ -212,7 +212,7 @@ public abstract class BucketStoreActor<T extends BucketData<T>> extends
         }
     }
 
-    protected final RemoteRpcProviderConfig getConfig() {
+    protected final RemoteOpsProviderConfig getConfig() {
         return config;
     }
 
index 56060dd..e75bef2 100644 (file)
@@ -31,7 +31,7 @@ import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -61,7 +61,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
     };
 
     private final boolean autoStartGossipTicks;
-    private final RemoteRpcProviderConfig config;
+    private final RemoteOpsProviderConfig config;
 
     /**
      * All known cluster members.
@@ -84,20 +84,20 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
 
     private BucketStoreAccess bucketStore;
 
-    Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) {
+    Gossiper(final RemoteOpsProviderConfig config, final Boolean autoStartGossipTicks) {
         this.config = Preconditions.checkNotNull(config);
         this.autoStartGossipTicks = autoStartGossipTicks.booleanValue();
     }
 
-    Gossiper(final RemoteRpcProviderConfig config) {
+    Gossiper(final RemoteOpsProviderConfig config) {
         this(config, Boolean.TRUE);
     }
 
-    public static Props props(final RemoteRpcProviderConfig config) {
+    public static Props props(final RemoteOpsProviderConfig config) {
         return Props.create(Gossiper.class, config);
     }
 
-    static Props testProps(final RemoteRpcProviderConfig config) {
+    static Props testProps(final RemoteOpsProviderConfig config) {
         return Props.create(Gossiper.class, config, Boolean.FALSE);
     }
 
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/AbstractRegistryMXBean.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/AbstractRegistryMXBean.java
new file mode 100644 (file)
index 0000000..bfe60e5
--- /dev/null
@@ -0,0 +1,72 @@
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.mbeans;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.Address;
+import akka.util.Timeout;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.Map;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
+import org.opendaylight.controller.remote.rpc.registry.AbstractRoutingTable;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+abstract class AbstractRegistryMXBean<T extends AbstractRoutingTable<T, I>, I> extends AbstractMXBean {
+    static final String LOCAL_CONSTANT = "local";
+    static final String ROUTE_CONSTANT = "route:";
+    static final String NAME_CONSTANT = " | name:";
+
+    @SuppressFBWarnings("SLF4J_LOGGER_SHOULD_BE_PRIVATE")
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final BucketStoreAccess bucketAccess;
+    private final FiniteDuration timeout;
+
+    AbstractRegistryMXBean(final @NonNull String beanName, final @NonNull String beanType,
+            final @NonNull BucketStoreAccess bucketAccess, final @NonNull Timeout timeout) {
+        super(beanName, beanType, null);
+        this.bucketAccess = requireNonNull(bucketAccess);
+        this.timeout = timeout.duration();
+        registerMBean();
+    }
+
+    @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch", "rawtypes"})
+    final T localData() {
+        try {
+            return (T) Await.result((Future) bucketAccess.getLocalData(), timeout);
+        } catch (Exception e) {
+            throw new RuntimeException("getLocalData failed", e);
+        }
+    }
+
+    @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch", "rawtypes"})
+    final Map<Address, Bucket<T>> remoteBuckets() {
+        try {
+            return (Map<Address, Bucket<T>>) Await.result((Future)bucketAccess.getRemoteBuckets(), timeout);
+        } catch (Exception e) {
+            throw new RuntimeException("getRemoteBuckets failed", e);
+        }
+    }
+
+    @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch", "rawtypes"})
+    final String bucketVersions() {
+        try {
+            return Await.result((Future)bucketAccess.getBucketVersions(), timeout).toString();
+        } catch (Exception e) {
+            throw new RuntimeException("getVersions failed", e);
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteActionRegistryMXBean.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteActionRegistryMXBean.java
new file mode 100644 (file)
index 0000000..5826384
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2019 Nordix Foundation.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.mbeans;
+
+import java.util.Map;
+import java.util.Set;
+
+public interface RemoteActionRegistryMXBean {
+
+    String getBucketVersions();
+
+    Set<String> getLocalRegisteredAction();
+
+    Map<String, String> findActionByName(String name);
+
+    Map<String, String> findActionByRoute(String route);
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteActionRegistryMXBeanImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteActionRegistryMXBeanImpl.java
new file mode 100644 (file)
index 0000000..cc117bd
--- /dev/null
@@ -0,0 +1,129 @@
+/*
+ * Copyright (c) 2019 Nordix Foundation.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.mbeans;
+
+import akka.actor.Address;
+import akka.util.Timeout;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.opendaylight.controller.remote.rpc.registry.ActionRoutingTable;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess;
+import org.opendaylight.mdsal.dom.api.DOMActionInstance;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+
+public class RemoteActionRegistryMXBeanImpl extends AbstractRegistryMXBean<ActionRoutingTable, DOMActionInstance>
+        implements RemoteActionRegistryMXBean {
+
+    public RemoteActionRegistryMXBeanImpl(final BucketStoreAccess actionRegistryAccess, final Timeout timeout) {
+        super("RemoteActionRegistry", "RemoteActionBroker", actionRegistryAccess, timeout);
+    }
+
+    @Override
+    public Set<String> getLocalRegisteredAction() {
+        ActionRoutingTable table = localData();
+        Set<String> routedAction = new HashSet<>(table.getItems().size());
+        for (DOMActionInstance route : table.getItems()) {
+            if (route.getType().getLastComponent() != null) {
+                final YangInstanceIdentifier actionPath = YangInstanceIdentifier.create(
+                        new NodeIdentifier(route.getType().getLastComponent()));
+                if (!actionPath.isEmpty()) {
+                    routedAction.add(ROUTE_CONSTANT + actionPath + NAME_CONSTANT + route.getType());
+                }
+            }
+        }
+
+        log.debug("Locally registered routed RPCs {}", routedAction);
+        return routedAction;
+    }
+
+    @Override
+    public Map<String, String> findActionByName(final String name) {
+        ActionRoutingTable localTable = localData();
+        // Get all Actions from local bucket
+        Map<String, String> rpcMap = new HashMap<>(getActionMemberMapByName(localTable, name, LOCAL_CONSTANT));
+
+        // Get all Actions from remote bucket
+        Map<Address, Bucket<ActionRoutingTable>> buckets = remoteBuckets();
+        for (Map.Entry<Address, Bucket<ActionRoutingTable>> entry : buckets.entrySet()) {
+            ActionRoutingTable table = entry.getValue().getData();
+            rpcMap.putAll(getActionMemberMapByName(table, name, entry.getKey().toString()));
+        }
+
+        log.debug("list of Actions {} searched by name {}", rpcMap, name);
+        return rpcMap;
+    }
+
+    @Override
+    public Map<String, String> findActionByRoute(final String routeId) {
+        ActionRoutingTable localTable = localData();
+        Map<String, String> rpcMap = new HashMap<>(getActionMemberMapByAction(localTable, routeId, LOCAL_CONSTANT));
+
+        Map<Address, Bucket<ActionRoutingTable>> buckets = remoteBuckets();
+        for (Map.Entry<Address, Bucket<ActionRoutingTable>> entry : buckets.entrySet()) {
+            ActionRoutingTable table = entry.getValue().getData();
+            rpcMap.putAll(getActionMemberMapByAction(table, routeId, entry.getKey().toString()));
+        }
+
+        log.debug("list of Actions {} searched by route {}", rpcMap, routeId);
+        return rpcMap;
+    }
+
+    /**
+     * Search if the routing table route String contains routeName.
+     */
+    private static Map<String, String> getActionMemberMapByAction(final ActionRoutingTable table,
+                                                                  final String routeName, final String address) {
+        Collection<DOMActionInstance> routes = table.getItems();
+        Map<String, String> actionMap = new HashMap<>(routes.size());
+        for (DOMActionInstance route : routes) {
+            if (route.getType().getLastComponent() != null) {
+                final YangInstanceIdentifier actionPath = YangInstanceIdentifier.create(
+                        new NodeIdentifier(route.getType().getLastComponent()));
+                if (!actionPath.isEmpty()) {
+                    String routeString = actionPath.toString();
+                    if (routeString.contains(routeName)) {
+                        actionMap.put(ROUTE_CONSTANT + routeString + NAME_CONSTANT + route.getType(), address);
+                    }
+                }
+            }
+        }
+        return actionMap;
+    }
+
+    /**
+     * Search if the routing table route type contains name.
+     */
+    private static Map<String, String> getActionMemberMapByName(final ActionRoutingTable table, final String name,
+                                                                final String address) {
+        Collection<DOMActionInstance> routes = table.getItems();
+        Map<String, String> actionMap = new HashMap<>(routes.size());
+        for (DOMActionInstance route : routes) {
+            if (route.getType().getLastComponent() != null) {
+                final YangInstanceIdentifier actionPath = YangInstanceIdentifier.create(
+                        new NodeIdentifier(route.getType().getLastComponent()));
+                if (!actionPath.isEmpty()) {
+                    String type = route.getType().toString();
+                    if (type.contains(name)) {
+                        actionMap.put(ROUTE_CONSTANT + actionPath + NAME_CONSTANT + type, address);
+                    }
+                }
+            }
+        }
+        return actionMap;
+    }
+
+    @Override
+    public String getBucketVersions() {
+        return bucketVersions();
+    }
+}
index 47903bb..dc4ee8f 100644 (file)
@@ -5,7 +5,6 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.remote.rpc.registry.mbeans;
 
 import java.util.Map;
@@ -22,7 +21,7 @@ public interface RemoteRpcRegistryMXBean {
 
     Set<String> getLocalRegisteredRoutedRpc();
 
-    Map<String,String> findRpcByName(String name);
+    Map<String, String> findRpcByName(String name);
 
-    Map<String,String> findRpcByRoute(String route);
+    Map<String, String> findRpcByRoute(String route);
 }
index 87adef3..d5c72fc 100644 (file)
@@ -9,67 +9,27 @@ package org.opendaylight.controller.remote.rpc.registry.mbeans;
 
 import akka.actor.Address;
 import akka.util.Timeout;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
 import org.opendaylight.controller.remote.rpc.registry.RoutingTable;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess;
 import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-
-public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements RemoteRpcRegistryMXBean {
-
-    @SuppressFBWarnings("SLF4J_LOGGER_SHOULD_BE_PRIVATE")
-    protected final Logger log = LoggerFactory.getLogger(getClass());
-
-    private static final String LOCAL_CONSTANT = "local";
-
-    private static final String ROUTE_CONSTANT = "route:";
-
-    private static final String NAME_CONSTANT = " | name:";
-
-    private final BucketStoreAccess rpcRegistryAccess;
-    private final Timeout timeout;
 
+public class RemoteRpcRegistryMXBeanImpl extends AbstractRegistryMXBean<RoutingTable, DOMRpcIdentifier>
+        implements RemoteRpcRegistryMXBean {
     public RemoteRpcRegistryMXBeanImpl(final BucketStoreAccess rpcRegistryAccess, final Timeout timeout) {
-        super("RemoteRpcRegistry", "RemoteRpcBroker", null);
-        this.rpcRegistryAccess = rpcRegistryAccess;
-        this.timeout = timeout;
-        registerMBean();
-    }
-
-    @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch", "rawtypes"})
-    private RoutingTable getLocalData() {
-        try {
-            return (RoutingTable) Await.result((Future) rpcRegistryAccess.getLocalData(), timeout.duration());
-        } catch (Exception e) {
-            throw new RuntimeException("getLocalData failed", e);
-        }
-    }
-
-    @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch", "rawtypes"})
-    private Map<Address, Bucket<RoutingTable>> getRemoteBuckets() {
-        try {
-            return (Map<Address, Bucket<RoutingTable>>) Await.result((Future)rpcRegistryAccess.getRemoteBuckets(),
-                    timeout.duration());
-        } catch (Exception e) {
-            throw new RuntimeException("getRemoteBuckets failed", e);
-        }
+        super("RemoteRpcRegistry", "RemoteRpcBroker", rpcRegistryAccess, timeout);
     }
 
     @Override
     public Set<String> getGlobalRpc() {
-        RoutingTable table = getLocalData();
-        Set<String> globalRpc = new HashSet<>(table.getRoutes().size());
-        for (DOMRpcIdentifier route : table.getRoutes()) {
+        RoutingTable table = localData();
+        Set<String> globalRpc = new HashSet<>(table.getItems().size());
+        for (DOMRpcIdentifier route : table.getItems()) {
             if (route.getContextReference().isEmpty()) {
                 globalRpc.add(route.getType().toString());
             }
@@ -81,9 +41,9 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot
 
     @Override
     public Set<String> getLocalRegisteredRoutedRpc() {
-        RoutingTable table = getLocalData();
-        Set<String> routedRpc = new HashSet<>(table.getRoutes().size());
-        for (DOMRpcIdentifier route : table.getRoutes()) {
+        RoutingTable table = localData();
+        Set<String> routedRpc = new HashSet<>(table.getItems().size());
+        for (DOMRpcIdentifier route : table.getItems()) {
             if (!route.getContextReference().isEmpty()) {
                 routedRpc.add(ROUTE_CONSTANT + route.getContextReference() + NAME_CONSTANT + route.getType());
             }
@@ -95,12 +55,12 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot
 
     @Override
     public Map<String, String> findRpcByName(final String name) {
-        RoutingTable localTable = getLocalData();
+        RoutingTable localTable = localData();
         // Get all RPCs from local bucket
         Map<String, String> rpcMap = new HashMap<>(getRpcMemberMapByName(localTable, name, LOCAL_CONSTANT));
 
         // Get all RPCs from remote bucket
-        Map<Address, Bucket<RoutingTable>> buckets = getRemoteBuckets();
+        Map<Address, Bucket<RoutingTable>> buckets = remoteBuckets();
         for (Entry<Address, Bucket<RoutingTable>> entry : buckets.entrySet()) {
             RoutingTable table = entry.getValue().getData();
             rpcMap.putAll(getRpcMemberMapByName(table, name, entry.getKey().toString()));
@@ -112,10 +72,10 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot
 
     @Override
     public Map<String, String> findRpcByRoute(final String routeId) {
-        RoutingTable localTable = getLocalData();
+        RoutingTable localTable = localData();
         Map<String, String> rpcMap = new HashMap<>(getRpcMemberMapByRoute(localTable, routeId, LOCAL_CONSTANT));
 
-        Map<Address, Bucket<RoutingTable>> buckets = getRemoteBuckets();
+        Map<Address, Bucket<RoutingTable>> buckets = remoteBuckets();
         for (Entry<Address, Bucket<RoutingTable>> entry : buckets.entrySet()) {
             RoutingTable table = entry.getValue().getData();
             rpcMap.putAll(getRpcMemberMapByRoute(table, routeId, entry.getKey().toString()));
@@ -129,8 +89,8 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot
      * Search if the routing table route String contains routeName.
      */
     private static Map<String,String> getRpcMemberMapByRoute(final RoutingTable table, final String routeName,
-                                                      final String address) {
-        Set<DOMRpcIdentifier> routes = table.getRoutes();
+                                                             final String address) {
+        Set<DOMRpcIdentifier> routes = table.getItems();
         Map<String, String> rpcMap = new HashMap<>(routes.size());
         for (DOMRpcIdentifier route : routes) {
             if (!route.getContextReference().isEmpty()) {
@@ -146,9 +106,9 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot
     /**
      * Search if the routing table route type contains name.
      */
-    private static Map<String, String>  getRpcMemberMapByName(final RoutingTable table, final String name,
-                                                       final String address) {
-        Set<DOMRpcIdentifier> routes = table.getRoutes();
+    private static Map<String, String> getRpcMemberMapByName(final RoutingTable table, final String name,
+                                                             final String address) {
+        Set<DOMRpcIdentifier> routes = table.getItems();
         Map<String, String> rpcMap = new HashMap<>(routes.size());
         for (DOMRpcIdentifier route : routes) {
             if (!route.getContextReference().isEmpty()) {
@@ -162,12 +122,7 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot
     }
 
     @Override
-    @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch", "rawtypes"})
     public String getBucketVersions() {
-        try {
-            return Await.result((Future)rpcRegistryAccess.getBucketVersions(), timeout.duration()).toString();
-        } catch (Exception e) {
-            throw new RuntimeException("getVersions failed", e);
-        }
+        return bucketVersions();
     }
 }
index e6d19a1..c33659a 100644 (file)
   <reference id="actorSystemProvider" interface="org.opendaylight.controller.cluster.ActorSystemProvider" />
   <reference id="domRpcService" interface="org.opendaylight.mdsal.dom.api.DOMRpcService"/>
   <reference id="domRpcRegistry" interface="org.opendaylight.mdsal.dom.api.DOMRpcProviderService"/>
+  <reference id="domActionService" interface="org.opendaylight.mdsal.dom.api.DOMActionService"/>
+  <reference id="domActionRegistry" interface="org.opendaylight.mdsal.dom.api.DOMActionProviderService"/>
 
   <bean id="actorSystem" factory-ref="actorSystemProvider" factory-method="getActorSystem"/>
 
-  <bean id="remoteRpcProviderConfig" class="org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig"
+  <bean id="remoteOpsProviderConfig" class="org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig"
           factory-method="newInstance">
     <argument>
       <bean factory-ref="actorSystem" factory-method="name"/>
     <argument value="${bounded-mailbox-capacity}"/>
   </bean>
 
-  <bean id="remoteRpcProvider" class="org.opendaylight.controller.remote.rpc.RemoteRpcProviderFactory"
+  <bean id="remoteOpsProvider" class="org.opendaylight.controller.remote.rpc.RemoteOpsProviderFactory"
           factory-method="createInstance" init-method="start" destroy-method="close">
     <argument ref="domRpcRegistry"/>
     <argument ref="domRpcService"/>
     <argument ref="actorSystem"/>
-    <argument ref="remoteRpcProviderConfig"/>
+    <argument ref="remoteOpsProviderConfig"/>
+    <argument ref="domActionRegistry"/>
+    <argument ref="domActionService"/>
   </bean>
 
 </blueprint>
@@ -5,7 +5,6 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.remote.rpc;
 
 import static org.junit.Assert.assertEquals;
@@ -23,6 +22,9 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMActionService;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
 import org.opendaylight.mdsal.dom.api.DOMRpcService;
@@ -45,26 +47,27 @@ import org.opendaylight.yangtools.yang.test.util.YangParserTestUtils;
  *
  * @author Thomas Pantelis
  */
-public class AbstractRpcTest {
+public class AbstractOpsTest {
     static final String TEST_REV = "2014-08-28";
     static final String TEST_NS = "urn:test";
     static final URI TEST_URI = URI.create(TEST_NS);
-    static final QName TEST_RPC = QName.create(TEST_NS, TEST_REV, "test-rpc");
+    static final QName TEST_RPC = QName.create(TEST_NS, TEST_REV, "test-something");
     static final QName TEST_RPC_INPUT = QName.create(TEST_NS, TEST_REV, "input");
     static final QName TEST_RPC_INPUT_DATA = QName.create(TEST_NS, TEST_REV, "input-data");
     static final QName TEST_RPC_OUTPUT = QName.create(TEST_NS, TEST_REV, "output");
-    static final QName TEST_RPC_OUTPUT_DATA = QName.create(TEST_URI, "output-data");
 
 
     static final SchemaPath TEST_RPC_TYPE = SchemaPath.create(true, TEST_RPC);
     static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.create(
             new YangInstanceIdentifier.NodeIdentifier(TEST_RPC));
     public static final DOMRpcIdentifier TEST_RPC_ID = DOMRpcIdentifier.create(TEST_RPC_TYPE, TEST_PATH);
+    public static final DOMDataTreeIdentifier TEST_DATA_TREE_ID = new DOMDataTreeIdentifier(
+            LogicalDatastoreType.OPERATIONAL, TEST_PATH);
 
     static ActorSystem node1;
     static ActorSystem node2;
-    static RemoteRpcProviderConfig config1;
-    static RemoteRpcProviderConfig config2;
+    static RemoteOpsProviderConfig config1;
+    static RemoteOpsProviderConfig config2;
 
     protected ActorRef rpcInvoker1;
     protected TestKit rpcRegistry1Probe;
@@ -73,16 +76,22 @@ public class AbstractRpcTest {
     protected SchemaContext schemaContext;
     protected RemoteRpcImplementation remoteRpcImpl1;
     protected RemoteRpcImplementation remoteRpcImpl2;
+    protected RemoteActionImplementation remoteActionImpl1;
+    protected RemoteActionImplementation remoteActionImpl2;
 
     @Mock
     protected DOMRpcService domRpcService1;
     @Mock
+    protected DOMActionService domActionService1;
+    @Mock
     protected DOMRpcService domRpcService2;
+    @Mock
+    protected DOMActionService domActionService2;
 
     @BeforeClass
     public static void setup() {
-        config1 = new RemoteRpcProviderConfig.Builder("memberA").build();
-        config2 = new RemoteRpcProviderConfig.Builder("memberB").build();
+        config1 = new RemoteOpsProviderConfig.Builder("memberA").build();
+        config2 = new RemoteOpsProviderConfig.Builder("memberB").build();
         node1 = ActorSystem.create("opendaylight-rpc", config1.get());
         node2 = ActorSystem.create("opendaylight-rpc", config2.get());
     }
@@ -97,21 +106,23 @@ public class AbstractRpcTest {
 
     @Before
     public void setUp() {
-        schemaContext = YangParserTestUtils.parseYangResources(AbstractRpcTest.class, "/test-rpc.yang");
+        schemaContext = YangParserTestUtils.parseYangResources(AbstractOpsTest.class, "/test-rpc.yang");
 
         MockitoAnnotations.initMocks(this);
 
         rpcRegistry1Probe = new TestKit(node1);
-        rpcInvoker1 = node1.actorOf(RpcInvoker.props(domRpcService1));
+        rpcInvoker1 = node1.actorOf(OpsInvoker.props(domRpcService1, domActionService1));
         rpcRegistry2Probe = new TestKit(node2);
-        rpcInvoker2 = node2.actorOf(RpcInvoker.props(domRpcService2));
+        rpcInvoker2 = node2.actorOf(OpsInvoker.props(domRpcService2, domActionService2));
         remoteRpcImpl1 = new RemoteRpcImplementation(rpcInvoker2, config1);
         remoteRpcImpl2 = new RemoteRpcImplementation(rpcInvoker1, config2);
+        remoteActionImpl1 = new RemoteActionImplementation(rpcInvoker2, config1);
+        remoteActionImpl2 = new RemoteActionImplementation(rpcInvoker1, config2);
     }
 
     static void assertRpcErrorEquals(final RpcError rpcError, final ErrorSeverity severity,
-            final ErrorType errorType, final String tag, final String message, final String applicationTag,
-            final String info, final String causeMsg) {
+                                     final ErrorType errorType, final String tag, final String message,
+                                     final String applicationTag, final String info, final String causeMsg) {
         assertEquals("getSeverity", severity, rpcError.getSeverity());
         assertEquals("getErrorType", errorType, rpcError.getErrorType());
         assertEquals("getTag", tag, rpcError.getTag());
@@ -132,7 +143,7 @@ public class AbstractRpcTest {
 
     public static ContainerNode makeRPCInput(final String data) {
         return Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(TEST_RPC_INPUT))
-            .withChild(ImmutableNodes.leafNode(TEST_RPC_INPUT_DATA, data)).build();
+                .withChild(ImmutableNodes.leafNode(TEST_RPC_INPUT_DATA, data)).build();
 
     }
 
@@ -142,8 +153,8 @@ public class AbstractRpcTest {
     }
 
     static void assertFailedRpcResult(final DOMRpcResult rpcResult, final ErrorSeverity severity,
-            final ErrorType errorType, final String tag, final String message, final String applicationTag,
-            final String info, final String causeMsg) {
+                                      final ErrorType errorType, final String tag, final String message,
+                                      final String applicationTag, final String info, final String causeMsg) {
         assertNotNull("RpcResult was null", rpcResult);
         final Collection<? extends RpcError> rpcErrors = rpcResult.getErrors();
         assertEquals("RpcErrors count", 1, rpcErrors.size());
@@ -152,7 +163,7 @@ public class AbstractRpcTest {
     }
 
     static void assertSuccessfulRpcResult(final DOMRpcResult rpcResult,
-            final NormalizedNode<? , ?> expOutput) {
+                                          final NormalizedNode<? , ?> expOutput) {
         assertNotNull("RpcResult was null", rpcResult);
         assertCompositeNodeEquals(expOutput, rpcResult.getResult());
     }
@@ -25,7 +25,7 @@ import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 
-public class RpcBrokerTest extends AbstractRpcTest {
+public class OpsBrokerTest extends AbstractOpsTest {
 
     @Test
     public void testExecuteRpc() {
@@ -33,14 +33,13 @@ public class RpcBrokerTest extends AbstractRpcTest {
         final DOMRpcResult rpcResult = new DefaultDOMRpcResult(invokeRpcResult);
         when(domRpcService1.invokeRpc(eq(TEST_RPC_TYPE), any())).thenReturn(
             FluentFutures.immediateFluentFuture(rpcResult));
+        final ExecuteRpc executeRpc = ExecuteRpc.from(TEST_RPC_ID, null);
 
-        final ExecuteRpc executeMsg = ExecuteRpc.from(TEST_RPC_ID, null);
-
-        rpcInvoker1.tell(executeMsg, rpcRegistry1Probe.getRef());
+        rpcInvoker1.tell(executeRpc, rpcRegistry1Probe.getRef());
 
         final RpcResponse rpcResponse = rpcRegistry1Probe.expectMsgClass(Duration.ofSeconds(5), RpcResponse.class);
 
-        assertEquals(rpcResult.getResult(), rpcResponse.getResultNormalizedNode());
+        assertEquals(rpcResult.getResult(), rpcResponse.getOutput());
     }
 
     @Test
@@ -5,7 +5,6 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.remote.rpc;
 
 import akka.actor.ActorRef;
@@ -16,19 +15,24 @@ import java.util.Collections;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.opendaylight.controller.remote.rpc.registry.ActionRegistry;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMActionInstance;
 import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 
-public class RpcListenerTest {
+public class OpsListenerTest {
 
     private static final QName TEST_QNAME = QName.create("test", "2015-06-12", "test");
     private static final SchemaPath RPC_TYPE = SchemaPath.create(true, TEST_QNAME);
     private static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier
             .create(new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME));
     private static final DOMRpcIdentifier RPC_ID = DOMRpcIdentifier.create(RPC_TYPE, TEST_PATH);
+    private static final DOMActionInstance ACTION_INSTANCE = DOMActionInstance.of(RPC_TYPE,
+            LogicalDatastoreType.OPERATIONAL, TEST_PATH);
 
     private static ActorSystem SYSTEM;
 
@@ -49,19 +53,40 @@ public class RpcListenerTest {
         final TestKit probeReg = new TestKit(SYSTEM);
         final ActorRef rpcRegistry = probeReg.getRef();
 
-        final RpcListener rpcListener = new RpcListener(rpcRegistry);
-        rpcListener.onRpcAvailable(Collections.singleton(RPC_ID));
+        final OpsListener opsListener = new OpsListener(rpcRegistry, rpcRegistry);
+        opsListener.onRpcAvailable(Collections.singleton(RPC_ID));
         probeReg.expectMsgClass(RpcRegistry.Messages.AddOrUpdateRoutes.class);
     }
 
+    @Test
+    public void testActionRouteAdd() {
+        // Test announcements
+        final TestKit probeReg = new TestKit(SYSTEM);
+        final ActorRef actionRegistry = probeReg.getRef();
+
+        final OpsListener opsListener = new OpsListener(actionRegistry, actionRegistry);
+        opsListener.onActionsChanged(Collections.emptySet(),Collections.singleton(ACTION_INSTANCE));
+        probeReg.expectMsgClass(ActionRegistry.Messages.UpdateActions.class);
+    }
+
     @Test
     public void testRouteRemove() {
         // Test announcements
         final TestKit probeReg = new TestKit(SYSTEM);
         final ActorRef rpcRegistry = probeReg.getRef();
 
-        final RpcListener rpcListener = new RpcListener(rpcRegistry);
-        rpcListener.onRpcUnavailable(Collections.singleton(RPC_ID));
+        final OpsListener opsListener = new OpsListener(rpcRegistry, rpcRegistry);
+        opsListener.onRpcUnavailable(Collections.singleton(RPC_ID));
         probeReg.expectMsgClass(RpcRegistry.Messages.RemoveRoutes.class);
     }
+
+//    @Test
+//    public void testAcceptsImplementation() {
+//
+//        final TestKit probeReg = new TestKit(SYSTEM);
+//        final ActorRef opsRegistry = probeReg.getRef();
+//
+//        final OpsListener opsListener = new OpsListener(opsRegistry, opsRegistry);
+//        opsListener.acceptsImplementation()
+//    }
 }
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/OpsRegistrarTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/OpsRegistrarTest.java
new file mode 100644 (file)
index 0000000..59db730
--- /dev/null
@@ -0,0 +1,182 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.actor.Props;
+import akka.testkit.TestActorRef;
+import akka.testkit.javadsl.TestKit;
+import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.Messages.UpdateRemoteActionEndpoints;
+import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.RemoteActionEndpoint;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.RemoteRpcEndpoint;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMActionInstance;
+import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
+import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMRpcImplementationRegistration;
+import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
+import org.opendaylight.yangtools.concepts.ObjectRegistration;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+public class OpsRegistrarTest {
+    @Mock
+    private DOMRpcProviderService rpcService;
+    @Mock
+    private DOMActionProviderService actionService;
+    @Mock
+    private DOMRpcImplementationRegistration<RemoteRpcImplementation> oldReg;
+    @Mock
+    private DOMRpcImplementationRegistration<RemoteRpcImplementation> newReg;
+    @Mock
+    private ObjectRegistration<RemoteActionImplementation> oldActionReg;
+    @Mock
+    private ObjectRegistration<RemoteActionImplementation> newActionReg;
+
+    private ActorSystem system;
+    private TestActorRef<OpsRegistrar> testActorRef;
+    private Address endpointAddress;
+    private RemoteRpcEndpoint firstEndpoint;
+    private RemoteRpcEndpoint secondEndpoint;
+    private RemoteActionEndpoint firstActionEndpoint;
+    private RemoteActionEndpoint secondActionEndpoint;
+    private OpsRegistrar opsRegistrar;
+
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+        system = ActorSystem.create("test");
+
+        final TestKit testKit = new TestKit(system);
+        final RemoteOpsProviderConfig config = new RemoteOpsProviderConfig.Builder("system").build();
+        final Props props = OpsRegistrar.props(config, rpcService, actionService);
+        testActorRef = new TestActorRef<>(system, props, testKit.getRef(), "actorRef");
+        endpointAddress = new Address("http", "local");
+
+        final DOMRpcIdentifier firstEndpointId = DOMRpcIdentifier.create(
+                SchemaPath.create(true, QName.create("first:identifier", "foo")));
+        final DOMRpcIdentifier secondEndpointId = DOMRpcIdentifier.create(
+                SchemaPath.create(true, QName.create("second:identifier", "bar")));
+        final QName firstActionQName = QName.create("first:actionIdentifier", "fooAction");
+
+        final DOMActionInstance firstActionInstance = DOMActionInstance.of(
+                SchemaPath.create(true, firstActionQName), LogicalDatastoreType.OPERATIONAL,
+                YangInstanceIdentifier.create(new YangInstanceIdentifier.NodeIdentifier(firstActionQName)));
+
+        final DOMActionInstance secondActionInstance = DOMActionInstance.of(
+                SchemaPath.create(true, firstActionQName), LogicalDatastoreType.OPERATIONAL,
+                YangInstanceIdentifier.create(new YangInstanceIdentifier.NodeIdentifier(firstActionQName)));
+
+        final TestKit senderKit = new TestKit(system);
+        firstEndpoint = new RemoteRpcEndpoint(senderKit.getRef(), Collections.singletonList(firstEndpointId));
+        secondEndpoint = new RemoteRpcEndpoint(senderKit.getRef(), Collections.singletonList(secondEndpointId));
+        firstActionEndpoint = new RemoteActionEndpoint(senderKit.getRef(),
+                Collections.singletonList(firstActionInstance));
+        secondActionEndpoint = new RemoteActionEndpoint(senderKit.getRef(),
+                Collections.singletonList(secondActionInstance));
+
+        doReturn(oldReg).when(rpcService).registerRpcImplementation(any(RemoteRpcImplementation.class),
+            eq(firstEndpoint.getRpcs()));
+        doReturn(newReg).when(rpcService).registerRpcImplementation(any(RemoteRpcImplementation.class),
+            eq(secondEndpoint.getRpcs()));
+
+        doReturn(oldActionReg).when(actionService).registerActionImplementation(any(RemoteActionImplementation.class),
+            eq(secondActionEndpoint.getActions()));
+        doReturn(oldActionReg).when(actionService).registerActionImplementation(any(RemoteActionImplementation.class),
+                eq(secondActionEndpoint.getActions()));
+
+        opsRegistrar = testActorRef.underlyingActor();
+    }
+
+    @After
+    public void tearDown() {
+        TestKit.shutdownActorSystem(system, true);
+    }
+
+    @Test
+    public void testHandleReceiveAddEndpoint() {
+        final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = ImmutableMap.of(
+                endpointAddress, Optional.of(firstEndpoint));
+        testActorRef.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
+
+        verify(rpcService).registerRpcImplementation(any(RemoteRpcImplementation.class),
+            eq(firstEndpoint.getRpcs()));
+        verifyNoMoreInteractions(rpcService, oldReg, newReg);
+    }
+
+    @Test
+    public void testHandleReceiveRemoveEndpoint() {
+        final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = ImmutableMap.of(
+                endpointAddress, Optional.empty());
+        testActorRef.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
+        verifyNoMoreInteractions(rpcService, oldReg, newReg);
+    }
+
+    @Test
+    public void testHandleReceiveUpdateRpcEndpoint() {
+        final InOrder inOrder = inOrder(rpcService, oldReg, newReg);
+
+        testActorRef.tell(new UpdateRemoteEndpoints(ImmutableMap.of(endpointAddress, Optional.of(firstEndpoint))),
+                ActorRef.noSender());
+
+        inOrder.verify(rpcService).registerRpcImplementation(any(RemoteRpcImplementation.class),
+                eq(firstEndpoint.getRpcs()));
+
+        testActorRef.tell(new UpdateRemoteEndpoints(ImmutableMap.of(endpointAddress, Optional.of(secondEndpoint))),
+                ActorRef.noSender());
+
+        inOrder.verify(rpcService).registerRpcImplementation(any(RemoteRpcImplementation.class),
+                eq(secondEndpoint.getRpcs()));
+
+
+        verifyNoMoreInteractions(rpcService, oldReg, newReg);
+    }
+
+    @Test
+    public void testHandleReceiveUpdateActionEndpoint() {
+        final InOrder inOrder = inOrder(actionService, oldActionReg, newActionReg);
+
+        testActorRef.tell(new UpdateRemoteActionEndpoints(ImmutableMap.of(endpointAddress,
+                Optional.of(firstActionEndpoint))), ActorRef.noSender());
+
+        inOrder.verify(actionService).registerActionImplementation(any(RemoteActionImplementation.class),
+                eq(firstActionEndpoint.getActions()));
+
+        testActorRef.tell(new UpdateRemoteActionEndpoints(ImmutableMap.of(endpointAddress,
+                Optional.of(secondActionEndpoint))), ActorRef.noSender());
+
+        inOrder.verify(actionService).registerActionImplementation(any(RemoteActionImplementation.class),
+                eq(secondActionEndpoint.getActions()));
+
+        // verify first registration is closed
+//        inOrder.verify(oldReg).close();
+
+        verifyNoMoreInteractions(actionService, oldActionReg, newActionReg);
+    }
+}
@@ -13,18 +13,24 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.when;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Collections;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.opendaylight.mdsal.dom.api.DOMActionException;
+import org.opendaylight.mdsal.dom.api.DOMActionResult;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.dom.api.DOMRpcException;
 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
+import org.opendaylight.mdsal.dom.spi.SimpleDOMActionResult;
 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -35,7 +41,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaPath;
  *
  * @author Thomas Pantelis
  */
-public class RemoteRpcImplementationTest extends AbstractRpcTest {
+public class RemoteOpsImplementationTest extends AbstractOpsTest {
 
     /**
      * This test method invokes and executes the remote rpc.
@@ -45,10 +51,12 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest {
         final ContainerNode rpcOutput = makeRPCOutput("bar");
         final DOMRpcResult rpcResult = new DefaultDOMRpcResult(rpcOutput);
 
+//        Answer<FluentFuture<DOMRpcResult>> answer = FluentFutures.immediateFluentFuture(rpcResult);
+
         final NormalizedNode<?, ?> invokeRpcInput = makeRPCInput("foo");
         @SuppressWarnings({"unchecked", "rawtypes"})
         final ArgumentCaptor<NormalizedNode<?, ?>> inputCaptor =
-                (ArgumentCaptor) ArgumentCaptor.forClass(NormalizedNode.class);
+                ArgumentCaptor.forClass(NormalizedNode.class);
 
         when(domRpcService2.invokeRpc(eq(TEST_RPC_TYPE), inputCaptor.capture())).thenReturn(
                 FluentFutures.immediateFluentFuture(rpcResult));
@@ -60,6 +68,27 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest {
         assertEquals(rpcOutput, result.getResult());
     }
 
+    /**
+     * This test method invokes and executes the remote action.
+     */
+    @Test
+    public void testInvokeAction() throws Exception {
+        final ContainerNode actionOutput = makeRPCOutput("bar");
+        final DOMActionResult actionResult = new SimpleDOMActionResult(actionOutput, Collections.emptyList());
+        final NormalizedNode<?, ?> invokeActionInput = makeRPCInput("foo");
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        final ArgumentCaptor<ContainerNode> inputCaptor =
+                ArgumentCaptor.forClass(ContainerNode.class);
+        doReturn(FluentFutures.immediateFluentFuture(actionResult)).when(domActionService2).invokeAction(
+                eq(TEST_RPC_TYPE), eq(TEST_DATA_TREE_ID), inputCaptor.capture());
+        final ListenableFuture<DOMActionResult> frontEndFuture = remoteActionImpl1.invokeAction(TEST_RPC_TYPE,
+                TEST_DATA_TREE_ID, (ContainerNode) invokeActionInput);
+        assertTrue(frontEndFuture instanceof RemoteDOMActionFuture);
+        final DOMActionResult result = frontEndFuture.get(5, TimeUnit.SECONDS);
+        assertEquals(actionOutput, result.getOutput().get());
+
+    }
+
     /**
      * This test method invokes and executes the remote rpc.
      */
@@ -82,6 +111,28 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest {
         assertEquals(rpcOutput, result.getResult());
     }
 
+    /**
+     * This test method invokes and executes the remote action.
+     */
+    @Test
+    public void testInvokeActionWithNullInput() throws Exception {
+        final ContainerNode actionOutput = makeRPCOutput("bar");
+        final DOMActionResult actionResult = new SimpleDOMActionResult(actionOutput);
+
+        @SuppressWarnings({"unchecked", "rawtypes"})
+            final ArgumentCaptor<ContainerNode> inputCaptor =
+                  ArgumentCaptor.forClass(ContainerNode.class);
+        doReturn(FluentFutures.immediateFluentFuture(actionResult)).when(domActionService2).invokeAction(
+                eq(TEST_RPC_TYPE), eq(TEST_DATA_TREE_ID), inputCaptor.capture());
+
+        ListenableFuture<DOMActionResult> frontEndFuture = remoteActionImpl1.invokeAction(TEST_RPC_TYPE,
+                TEST_DATA_TREE_ID, actionOutput);
+        assertTrue(frontEndFuture instanceof RemoteDOMActionFuture);
+
+        final DOMActionResult result = frontEndFuture.get(5, TimeUnit.SECONDS);
+        assertEquals(actionOutput, result.getOutput().get());
+    }
+
     /**
      * This test method invokes and executes the remote rpc.
      */
@@ -129,6 +180,32 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest {
         }
     }
 
+    /**
+     * This test method invokes and executes the remote rpc.
+     */
+    @SuppressWarnings({"checkstyle:AvoidHidingCauseException", "checkstyle:IllegalThrows"})
+    @Test(expected = DOMActionException.class)
+    public void testInvokeActionWithRemoteFailedFuture() throws Throwable {
+        final ContainerNode invokeActionInput = makeRPCInput("foo");
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        final ArgumentCaptor<ContainerNode> inputCaptor =
+                ArgumentCaptor.forClass(ContainerNode.class);
+
+        when(domActionService2.invokeAction(eq(TEST_RPC_TYPE), eq(TEST_DATA_TREE_ID),
+                inputCaptor.capture())).thenReturn(FluentFutures.immediateFailedFluentFuture(
+                        new RemoteDOMRpcException("Test Exception", null)));
+
+        final ListenableFuture<DOMActionResult> frontEndFuture = remoteActionImpl1.invokeAction(TEST_RPC_TYPE,
+                TEST_DATA_TREE_ID, invokeActionInput);
+        assertTrue(frontEndFuture instanceof RemoteDOMActionFuture);
+
+        try {
+            frontEndFuture.get(5, TimeUnit.SECONDS);
+        } catch (ExecutionException e) {
+            throw e.getCause();
+        }
+    }
+
     /**
      * This test method invokes and tests exceptions when akka timeout occured
      * Currently ignored since this test with current config takes around 15 seconds to complete.
@@ -164,4 +241,27 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest {
             throw e.getCause();
         }
     }
+
+    /**
+     * This test method invokes remote rpc and lookup failed
+     * with runtime exception.
+     */
+    @Test(expected = DOMActionException.class)
+    @SuppressWarnings({"checkstyle:AvoidHidingCauseException", "checkstyle:IllegalThrows"})
+    public void testInvokeActionWithLookupException() throws Throwable {
+        final ContainerNode invokeRpcInput = makeRPCInput("foo");
+
+        doThrow(new RuntimeException("test")).when(domActionService2).invokeAction(any(SchemaPath.class),
+                any(DOMDataTreeIdentifier.class), any(ContainerNode.class));
+
+        final ListenableFuture<DOMActionResult> frontEndFuture = remoteActionImpl1.invokeAction(TEST_RPC_TYPE,
+                TEST_DATA_TREE_ID, invokeRpcInput);
+        assertTrue(frontEndFuture instanceof RemoteDOMActionFuture);
+
+        try {
+            frontEndFuture.get(5, TimeUnit.SECONDS);
+        } catch (ExecutionException e) {
+            throw e.getCause();
+        }
+    }
 }
@@ -7,6 +7,11 @@
  */
 package org.opendaylight.controller.remote.rpc;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
 import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.actor.UntypedAbstractActor;
@@ -14,33 +19,34 @@ import akka.testkit.TestActorRef;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import java.util.concurrent.TimeUnit;
-import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
 import scala.concurrent.duration.FiniteDuration;
 
-public class RemoteRpcProviderConfigTest {
+public class RemoteOpsProviderConfigTest {
 
     @Test
     public void testConfigDefaults() {
-        RemoteRpcProviderConfig config = new RemoteRpcProviderConfig.Builder("unit-test").build();
+        RemoteOpsProviderConfig config = new RemoteOpsProviderConfig.Builder("unit-test").build();
 
         //Assert on configurations from common config
-        Assert.assertFalse(config.isMetricCaptureEnabled()); //should be disabled by default
-        Assert.assertNotNull(config.getMailBoxCapacity());
-        Assert.assertNotNull(config.getMailBoxName());
-        Assert.assertNotNull(config.getMailBoxPushTimeout());
+        assertFalse(config.isMetricCaptureEnabled()); //should be disabled by default
+        assertNotNull(config.getMailBoxCapacity());
+        assertNotNull(config.getMailBoxName());
+        assertNotNull(config.getMailBoxPushTimeout());
 
         //rest of the configurations should be set
-        Assert.assertNotNull(config.getActorSystemName());
-        Assert.assertNotNull(config.getRpcBrokerName());
-        Assert.assertNotNull(config.getRpcBrokerPath());
-        Assert.assertNotNull(config.getRpcManagerName());
-        Assert.assertNotNull(config.getRpcManagerPath());
-        Assert.assertNotNull(config.getRpcRegistryName());
-        Assert.assertNotNull(config.getRpcRegistryPath());
-        Assert.assertNotNull(config.getAskDuration());
-        Assert.assertNotNull(config.getGossipTickInterval());
+        assertNotNull(config.getActorSystemName());
+        assertNotNull(config.getRpcBrokerName());
+        assertNotNull(config.getRpcBrokerPath());
+        assertNotNull(config.getRpcManagerName());
+        assertNotNull(config.getRpcManagerPath());
+        assertNotNull(config.getRpcRegistryName());
+        assertNotNull(config.getActionRegistryName());
+        assertNotNull(config.getRpcRegistryPath());
+        assertNotNull(config.getActionRegistryPath());
+        assertNotNull(config.getAskDuration());
+        assertNotNull(config.getGossipTickInterval());
     }
 
     @Test
@@ -52,16 +58,16 @@ public class RemoteRpcProviderConfigTest {
         String timeOutVal = "10ms";
         FiniteDuration expectedTimeout = FiniteDuration.create(10, TimeUnit.MILLISECONDS);
 
-        RemoteRpcProviderConfig config = new RemoteRpcProviderConfig.Builder("unit-test")
+        RemoteOpsProviderConfig config = new RemoteOpsProviderConfig.Builder("unit-test")
                 .metricCaptureEnabled(true)//enable metric capture
                 .mailboxCapacity(expectedCapacity)
                 .mailboxPushTimeout(timeOutVal)
                 .withConfigReader(reader)
                 .build();
 
-        Assert.assertTrue(config.isMetricCaptureEnabled());
-        Assert.assertEquals(expectedCapacity, config.getMailBoxCapacity().intValue());
-        Assert.assertEquals(expectedTimeout.toMillis(), config.getMailBoxPushTimeout().toMillis());
+        assertTrue(config.isMetricCaptureEnabled());
+        assertEquals(expectedCapacity, config.getMailBoxCapacity().intValue());
+        assertEquals(expectedTimeout.toMillis(), config.getMailBoxPushTimeout().toMillis());
 
         //Now check this config inside an actor
         ActorSystem system = ActorSystem.create("unit-test", config.get());
@@ -71,11 +77,11 @@ public class RemoteRpcProviderConfigTest {
         ConfigTestActor actor = configTestActorTestActorRef.underlyingActor();
         Config actorConfig = actor.getConfig();
 
-        config = new RemoteRpcProviderConfig(actorConfig);
+        config = new RemoteOpsProviderConfig(actorConfig);
 
-        Assert.assertTrue(config.isMetricCaptureEnabled());
-        Assert.assertEquals(expectedCapacity, config.getMailBoxCapacity().intValue());
-        Assert.assertEquals(expectedTimeout.toMillis(), config.getMailBoxPushTimeout().toMillis());
+        assertTrue(config.isMetricCaptureEnabled());
+        assertEquals(expectedCapacity, config.getMailBoxCapacity().intValue());
+        assertEquals(expectedTimeout.toMillis(), config.getMailBoxPushTimeout().toMillis());
     }
 
     public static class ConfigTestActor extends UntypedAbstractActor {
@@ -87,7 +93,7 @@ public class RemoteRpcProviderConfigTest {
         }
 
         @Override
-        public void onReceive(Object message) {
+        public void onReceive(final Object message) {
         }
 
         /**
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteOpsProviderFactoryTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteOpsProviderFactoryTest.java
new file mode 100644 (file)
index 0000000..851e37e
--- /dev/null
@@ -0,0 +1,84 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import akka.actor.ActorSystem;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
+import org.opendaylight.mdsal.dom.api.DOMActionService;
+import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
+import org.opendaylight.mdsal.dom.api.DOMRpcService;
+
+public class RemoteOpsProviderFactoryTest {
+
+    @Mock
+    private DOMRpcProviderService providerService;
+    @Mock
+    private DOMRpcService rpcService;
+    @Mock
+    private ActorSystem actorSystem;
+    @Mock
+    private RemoteOpsProviderConfig providerConfig;
+    @Mock
+    private DOMActionProviderService actionProviderService;
+    @Mock
+    private DOMActionService actionService;
+
+    @Before
+    public void setUp() {
+        initMocks(this);
+    }
+
+    @Test
+    public void testCreateInstance() {
+        Assert.assertNotNull(RemoteOpsProviderFactory
+                .createInstance(providerService, rpcService, actorSystem, providerConfig,
+                        actionProviderService, actionService));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testCreateInstanceMissingProvideService() {
+        RemoteOpsProviderFactory.createInstance(null, rpcService, actorSystem, providerConfig,
+                actionProviderService, actionService);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testCreateInstanceMissingRpcService() {
+        RemoteOpsProviderFactory.createInstance(providerService, null, actorSystem, providerConfig,
+                actionProviderService, actionService);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testCreateInstanceMissingActorSystem() {
+        RemoteOpsProviderFactory.createInstance(providerService, rpcService, null, providerConfig,
+                actionProviderService, actionService);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testCreateInstanceMissingProviderConfig() {
+        RemoteOpsProviderFactory.createInstance(providerService, rpcService, actorSystem, null,
+                actionProviderService, actionService);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testCreateInstanceMissingActionProvider() {
+        RemoteOpsProviderFactory.createInstance(providerService, rpcService, actorSystem, providerConfig,
+                null, actionService);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testCreateInstanceMissingActionService() {
+        RemoteOpsProviderFactory.createInstance(providerService, rpcService, actorSystem, providerConfig,
+                actionProviderService, null);
+    }
+}
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.remote.rpc;
 
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
 import akka.actor.ActorRef;
@@ -16,21 +17,22 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import java.util.concurrent.TimeUnit;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
+import org.opendaylight.mdsal.dom.api.DOMActionService;
 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
 import org.opendaylight.mdsal.dom.api.DOMRpcService;
 import scala.concurrent.Await;
 import scala.concurrent.duration.FiniteDuration;
 
-public class RemoteRpcProviderTest {
+public class RemoteOpsProviderTest {
     static ActorSystem system;
-    static RemoteRpcProviderConfig moduleConfig;
+    static RemoteOpsProviderConfig moduleConfig;
 
     @BeforeClass
     public static void setup() {
-        moduleConfig = new RemoteRpcProviderConfig.Builder("odl-cluster-rpc")
+        moduleConfig = new RemoteOpsProviderConfig.Builder("odl-cluster-rpc")
                 .withConfigReader(ConfigFactory::load).build();
         final Config config = moduleConfig.get();
         system = ActorSystem.create("odl-cluster-rpc", config);
@@ -45,16 +47,16 @@ public class RemoteRpcProviderTest {
 
     @Test
     public void testRemoteRpcProvider() throws Exception {
-        try (RemoteRpcProvider rpcProvider = new RemoteRpcProvider(system, mock(DOMRpcProviderService.class),
-            mock(DOMRpcService.class), new RemoteRpcProviderConfig(system.settings().config()))) {
+        try (RemoteOpsProvider rpcProvider = new RemoteOpsProvider(system, mock(DOMRpcProviderService.class),
+                mock(DOMRpcService.class), new RemoteOpsProviderConfig(system.settings().config()),
+                mock(DOMActionProviderService.class), mock(DOMActionService.class))) {
 
             rpcProvider.start();
-
             final ActorRef actorRef = Await.result(
                     system.actorSelection(moduleConfig.getRpcManagerPath()).resolveOne(
                             FiniteDuration.create(1, TimeUnit.SECONDS)), FiniteDuration.create(2, TimeUnit.SECONDS));
 
-            Assert.assertTrue(actorRef.path().toString().contains(moduleConfig.getRpcManagerPath()));
+            assertTrue(actorRef.path().toString().contains(moduleConfig.getRpcManagerPath()));
         }
     }
 }
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactoryTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactoryTest.java
deleted file mode 100644 (file)
index 3481d5b..0000000
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.remote.rpc;
-
-import static org.mockito.MockitoAnnotations.initMocks;
-
-import akka.actor.ActorSystem;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mock;
-import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
-import org.opendaylight.mdsal.dom.api.DOMRpcService;
-
-public class RemoteRpcProviderFactoryTest {
-
-    @Mock
-    private DOMRpcProviderService providerService;
-    @Mock
-    private DOMRpcService rpcService;
-    @Mock
-    private ActorSystem actorSystem;
-    @Mock
-    private RemoteRpcProviderConfig providerConfig;
-
-    @Before
-    public void setUp() {
-        initMocks(this);
-    }
-
-    @Test
-    public void testCreateInstance() {
-        Assert.assertNotNull(RemoteRpcProviderFactory
-                .createInstance(providerService, rpcService, actorSystem, providerConfig));
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void testCreateInstanceMissingProvideService() {
-        RemoteRpcProviderFactory.createInstance(null, rpcService, actorSystem, providerConfig);
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void testCreateInstanceMissingRpcService() {
-        RemoteRpcProviderFactory.createInstance(providerService, null, actorSystem, providerConfig);
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void testCreateInstanceMissingActorSystem() {
-        RemoteRpcProviderFactory.createInstance(providerService, rpcService, null, providerConfig);
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void testCreateInstanceMissingProviderConfig() {
-        RemoteRpcProviderFactory.createInstance(providerService, rpcService, actorSystem, null);
-    }
-}
index 941ae2b..6ced096 100644 (file)
@@ -7,9 +7,10 @@
  */
 package org.opendaylight.controller.remote.rpc;
 
+import static org.junit.Assert.assertEquals;
+
 import java.util.ArrayList;
 import java.util.List;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.yangtools.yang.common.RpcError;
@@ -37,22 +38,25 @@ public class RpcErrorsExceptionTest {
 
     @Test
     public void testGetMessage() {
-        Assert.assertEquals(ERROR_MESSAGE, exception.getMessage());
+        assertEquals(ERROR_MESSAGE, exception.getMessage());
     }
 
     @Test
     public void testGetRpcErrors() {
         final List<RpcError> actualErrors = (List<RpcError>) exception.getRpcErrors();
-        Assert.assertEquals(rpcErrors.size(), actualErrors.size());
+        assertEquals(rpcErrors.size(), actualErrors.size());
 
         for (int i = 0; i < actualErrors.size(); i++) {
-            Assert.assertEquals(rpcErrors.get(i).getApplicationTag(), actualErrors.get(i).getApplicationTag());
-            Assert.assertEquals(rpcErrors.get(i).getSeverity(), actualErrors.get(i).getSeverity());
-            Assert.assertEquals(rpcErrors.get(i).getMessage(), actualErrors.get(i).getMessage());
-            Assert.assertEquals(rpcErrors.get(i).getErrorType(), actualErrors.get(i).getErrorType());
-            Assert.assertEquals(rpcErrors.get(i).getCause(), actualErrors.get(i).getCause());
-            Assert.assertEquals(rpcErrors.get(i).getInfo(), actualErrors.get(i).getInfo());
-            Assert.assertEquals(rpcErrors.get(i).getTag(), actualErrors.get(i).getTag());
+            final RpcError expected = rpcErrors.get(i);
+            final RpcError actual = actualErrors.get(i);
+
+            assertEquals(expected.getApplicationTag(), actual.getApplicationTag());
+            assertEquals(expected.getSeverity(), actual.getSeverity());
+            assertEquals(expected.getMessage(), actual.getMessage());
+            assertEquals(expected.getErrorType(), actual.getErrorType());
+            assertEquals(expected.getCause(), actual.getCause());
+            assertEquals(expected.getInfo(), actual.getInfo());
+            assertEquals(expected.getTag(), actual.getTag());
         }
     }
-}
\ No newline at end of file
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcRegistrarTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcRegistrarTest.java
deleted file mode 100644 (file)
index 48b825a..0000000
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.remote.rpc;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Address;
-import akka.actor.Props;
-import akka.testkit.TestActorRef;
-import akka.testkit.javadsl.TestKit;
-import com.google.common.collect.ImmutableMap;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Optional;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.InOrder;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.RemoteRpcEndpoint;
-import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMRpcImplementationRegistration;
-import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.model.api.SchemaPath;
-
-public class RpcRegistrarTest {
-    @Mock
-    private DOMRpcProviderService service;
-    @Mock
-    private DOMRpcImplementationRegistration<RemoteRpcImplementation> oldReg;
-    @Mock
-    private DOMRpcImplementationRegistration<RemoteRpcImplementation> newReg;
-
-    private ActorSystem system;
-    private TestActorRef<RpcRegistrar> testActorRef;
-    private Address endpointAddress;
-    private RemoteRpcEndpoint firstEndpoint;
-    private RemoteRpcEndpoint secondEndpoint;
-    private RpcRegistrar rpcRegistrar;
-
-    @Before
-    public void setUp() {
-        MockitoAnnotations.initMocks(this);
-        system = ActorSystem.create("test");
-
-        final TestKit testKit = new TestKit(system);
-        final RemoteRpcProviderConfig config = new RemoteRpcProviderConfig.Builder("system").build();
-        final Props props = RpcRegistrar.props(config, service);
-        testActorRef = new TestActorRef<>(system, props, testKit.getRef(), "actorRef");
-        endpointAddress = new Address("http", "local");
-
-        final DOMRpcIdentifier firstEndpointId = DOMRpcIdentifier.create(
-                SchemaPath.create(true, QName.create("first:identifier", "foo")));
-        final DOMRpcIdentifier secondEndpointId = DOMRpcIdentifier.create(
-                SchemaPath.create(true, QName.create("second:identifier", "bar")));
-
-        final TestKit senderKit = new TestKit(system);
-        firstEndpoint = new RemoteRpcEndpoint(senderKit.getRef(), Collections.singletonList(firstEndpointId));
-        secondEndpoint = new RemoteRpcEndpoint(senderKit.getRef(), Collections.singletonList(secondEndpointId));
-
-        Mockito.doReturn(oldReg).when(service).registerRpcImplementation(
-                Mockito.any(RemoteRpcImplementation.class), Mockito.eq(firstEndpoint.getRpcs()));
-
-        Mockito.doReturn(newReg).when(service).registerRpcImplementation(
-                Mockito.any(RemoteRpcImplementation.class), Mockito.eq(secondEndpoint.getRpcs()));
-
-        rpcRegistrar = testActorRef.underlyingActor();
-    }
-
-    @After
-    public void tearDown() {
-        TestKit.shutdownActorSystem(system, true);
-    }
-
-    @Test
-    public void testPostStop() throws Exception {
-        testActorRef.tell(new UpdateRemoteEndpoints(ImmutableMap.of(endpointAddress, Optional.of(firstEndpoint))),
-                ActorRef.noSender());
-        testActorRef.tell(new UpdateRemoteEndpoints(ImmutableMap.of(endpointAddress, Optional.of(secondEndpoint))),
-                ActorRef.noSender());
-
-        rpcRegistrar.postStop();
-
-        Mockito.verify(oldReg).close();
-        Mockito.verify(newReg).close();
-    }
-
-    @Test
-    public void testHandleReceiveAddEndpoint() {
-        final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = ImmutableMap.of(
-                endpointAddress, Optional.of(firstEndpoint));
-        testActorRef.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
-
-        Mockito.verify(service).registerRpcImplementation(
-                Mockito.any(RemoteRpcImplementation.class), Mockito.eq(firstEndpoint.getRpcs()));
-        Mockito.verifyNoMoreInteractions(service, oldReg, newReg);
-    }
-
-    @Test
-    public void testHandleReceiveRemoveEndpoint() {
-        final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = ImmutableMap.of(
-                endpointAddress, Optional.empty());
-        testActorRef.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
-        Mockito.verifyNoMoreInteractions(service, oldReg, newReg);
-    }
-
-    @Test
-    public void testHandleReceiveUpdateEndpoint() {
-        final InOrder inOrder = Mockito.inOrder(service, oldReg, newReg);
-
-        testActorRef.tell(new UpdateRemoteEndpoints(ImmutableMap.of(endpointAddress, Optional.of(firstEndpoint))),
-                ActorRef.noSender());
-
-        // first registration
-        inOrder.verify(service).registerRpcImplementation(
-                Mockito.any(RemoteRpcImplementation.class), Mockito.eq(firstEndpoint.getRpcs()));
-
-        testActorRef.tell(new UpdateRemoteEndpoints(ImmutableMap.of(endpointAddress, Optional.of(secondEndpoint))),
-                ActorRef.noSender());
-
-        // second registration
-        inOrder.verify(service).registerRpcImplementation(
-                Mockito.any(RemoteRpcImplementation.class), Mockito.eq(secondEndpoint.getRpcs()));
-
-        // verify first registration is closed
-        inOrder.verify(oldReg).close();
-
-        Mockito.verifyNoMoreInteractions(service, oldReg, newReg);
-    }
-}
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2019 Nordix Foundation.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -11,23 +11,19 @@ import static org.junit.Assert.assertEquals;
 
 import org.apache.commons.lang.SerializationUtils;
 import org.junit.Test;
-import org.opendaylight.controller.remote.rpc.AbstractRpcTest;
+import org.opendaylight.controller.remote.rpc.AbstractOpsTest;
 
-/**
- * Unit tests for ExecuteRpc.
- *
- * @author Thomas Pantelis
- */
-public class ExecuteRpcTest {
+public class ExecuteOpsTest {
 
     @Test
-    public void testSerialization() {
-        ExecuteRpc expected = ExecuteRpc.from(AbstractRpcTest.TEST_RPC_ID,
-                AbstractRpcTest.makeRPCInput("serialization-test"));
+    public void testOpsSerialization() {
+        ExecuteRpc expected = ExecuteRpc.from(AbstractOpsTest.TEST_RPC_ID,
+                AbstractOpsTest.makeRPCInput("serialization-test"));
 
         ExecuteRpc actual = (ExecuteRpc) SerializationUtils.clone(expected);
 
-        assertEquals("getRpc", expected.getRpc(), actual.getRpc());
-        assertEquals("getInputNormalizedNode", expected.getInputNormalizedNode(), actual.getInputNormalizedNode());
+        assertEquals("getName", expected.getType(), actual.getType());
+        assertEquals("getInputNormalizedNode", expected.getInput(), actual.getInput());
+        assertEquals("getPath", expected.getType(), actual.getType());
     }
 }
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/OpsResponseTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/OpsResponseTest.java
new file mode 100644 (file)
index 0000000..938f148
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.messages;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Collections;
+import java.util.Optional;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.remote.rpc.AbstractOpsTest;
+
+/**
+ * Unit tests for RpcResponse.
+ *
+ * @author Thomas Pantelis
+ */
+public class OpsResponseTest {
+
+    @Test
+    public void testSerialization() {
+        RpcResponse expectedRpc = new RpcResponse(AbstractOpsTest.makeRPCOutput("serialization-test"));
+
+        ActionResponse expectedAction = new ActionResponse(
+            Optional.of(AbstractOpsTest.makeRPCOutput("serialization-test")), Collections.emptyList());
+
+        RpcResponse actualRpc = (RpcResponse) SerializationUtils.clone(expectedRpc);
+
+        ActionResponse actualAction = (ActionResponse) SerializationUtils.clone(expectedAction);
+
+        assertEquals("getResultNormalizedNode", expectedRpc.getOutput(),
+                actualRpc.getOutput());
+
+        assertEquals("getResultNormalizedNode", expectedAction.getOutput(),
+                actualAction.getOutput());
+    }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/RpcResponseTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/RpcResponseTest.java
deleted file mode 100644 (file)
index f4ec377..0000000
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.remote.rpc.messages;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.commons.lang.SerializationUtils;
-import org.junit.Test;
-import org.opendaylight.controller.remote.rpc.AbstractRpcTest;
-
-/**
- * Unit tests for RpcResponse.
- *
- * @author Thomas Pantelis
- */
-public class RpcResponseTest {
-
-    @Test
-    public void testSerialization() {
-        RpcResponse expected = new RpcResponse(AbstractRpcTest.makeRPCOutput("serialization-test"));
-
-        RpcResponse actual = (RpcResponse) SerializationUtils.clone(expected);
-
-        assertEquals("getResultNormalizedNode", expected.getResultNormalizedNode(), actual.getResultNormalizedNode());
-    }
-}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/ActionRegistryTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/ActionRegistryTest.java
new file mode 100644 (file)
index 0000000..82b2c79
--- /dev/null
@@ -0,0 +1,423 @@
+/*
+ * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_ALL_BUCKETS;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_BUCKET_VERSIONS;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent.CurrentClusterState;
+import akka.cluster.Member;
+import akka.cluster.MemberStatus;
+import akka.cluster.UniqueAddress;
+import akka.testkit.javadsl.TestKit;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.ConfigFactory;
+import java.net.URI;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
+import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
+import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.Messages.UpdateActions;
+import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.Messages.UpdateRemoteActionEndpoints;
+import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.RemoteActionEndpoint;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMActionInstance;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ActionRegistryTest {
+    private static final Logger LOG = LoggerFactory.getLogger(ActionRegistryTest.class);
+
+    private static ActorSystem node1;
+    private static ActorSystem node2;
+    private static ActorSystem node3;
+
+    private TestKit invoker1;
+    private TestKit invoker2;
+    private TestKit invoker3;
+    private TestKit registrar1;
+    private TestKit registrar2;
+    private TestKit registrar3;
+    private ActorRef registry1;
+    private ActorRef registry2;
+    private ActorRef registry3;
+
+    private int routeIdCounter = 1;
+
+    @BeforeClass
+    public static void staticSetup() {
+        AkkaConfigurationReader reader = ConfigFactory::load;
+
+        RemoteOpsProviderConfig config1 = new RemoteOpsProviderConfig.Builder("memberA").gossipTickInterval("200ms")
+                .withConfigReader(reader).build();
+        RemoteOpsProviderConfig config2 = new RemoteOpsProviderConfig.Builder("memberB").gossipTickInterval("200ms")
+                .withConfigReader(reader).build();
+        RemoteOpsProviderConfig config3 = new RemoteOpsProviderConfig.Builder("memberC").gossipTickInterval("200ms")
+                .withConfigReader(reader).build();
+        node1 = ActorSystem.create("opendaylight-rpc", config1.get());
+        node2 = ActorSystem.create("opendaylight-rpc", config2.get());
+        node3 = ActorSystem.create("opendaylight-rpc", config3.get());
+
+        waitForMembersUp(node1, Cluster.get(node2).selfUniqueAddress(), Cluster.get(node3).selfUniqueAddress());
+        waitForMembersUp(node2, Cluster.get(node1).selfUniqueAddress(), Cluster.get(node3).selfUniqueAddress());
+    }
+
+    static void waitForMembersUp(final ActorSystem node, final UniqueAddress... addresses) {
+        Set<UniqueAddress> otherMembersSet = Sets.newHashSet(addresses);
+        Stopwatch sw = Stopwatch.createStarted();
+        while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
+            CurrentClusterState state = Cluster.get(node).state();
+            for (Member m : state.getMembers()) {
+                if (m.status() == MemberStatus.up() && otherMembersSet.remove(m.uniqueAddress())
+                        && otherMembersSet.isEmpty()) {
+                    return;
+                }
+            }
+
+            Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+        }
+
+        fail("Member(s) " + otherMembersSet + " are not Up");
+    }
+
+    @AfterClass
+    public static void staticTeardown() {
+        TestKit.shutdownActorSystem(node1);
+        TestKit.shutdownActorSystem(node2);
+        TestKit.shutdownActorSystem(node3);
+    }
+
+    @Before
+    public void setup() {
+        invoker1 = new TestKit(node1);
+        registrar1 = new TestKit(node1);
+        registry1 = node1.actorOf(ActionRegistry.props(config(node1), invoker1.getRef(), registrar1.getRef()));
+        invoker2 = new TestKit(node2);
+        registrar2 = new TestKit(node2);
+        registry2 = node2.actorOf(ActionRegistry.props(config(node2), invoker2.getRef(), registrar2.getRef()));
+        invoker3 = new TestKit(node3);
+        registrar3 = new TestKit(node3);
+        registry3 = node3.actorOf(ActionRegistry.props(config(node3), invoker3.getRef(), registrar3.getRef()));
+    }
+
+    private static RemoteOpsProviderConfig config(final ActorSystem node) {
+        return new RemoteOpsProviderConfig(node.settings().config());
+    }
+
+    @After
+    public void teardown() {
+        if (registry1 != null) {
+            node1.stop(registry1);
+        }
+        if (registry2 != null) {
+            node2.stop(registry2);
+        }
+        if (registry3 != null) {
+            node3.stop(registry3);
+        }
+
+        if (invoker1 != null) {
+            node1.stop(invoker1.getRef());
+        }
+        if (invoker2 != null) {
+            node2.stop(invoker2.getRef());
+        }
+        if (invoker3 != null) {
+            node3.stop(invoker3.getRef());
+        }
+
+        if (registrar1 != null) {
+            node1.stop(registrar1.getRef());
+        }
+        if (registrar2 != null) {
+            node2.stop(registrar2.getRef());
+        }
+        if (registrar3 != null) {
+            node3.stop(registrar3.getRef());
+        }
+    }
+
+    /**
+     * One node cluster. 1. Register action, ensure router can be found 2. Then remove action, ensure its
+     * deleted
+     */
+    @Test
+    public void testAddRemoveActionOnSameNode() {
+        LOG.info("testAddRemoveActionOnSameNode starting");
+
+        Address nodeAddress = node1.provider().getDefaultAddress();
+
+        // Add action on node 1
+
+        List<DOMActionInstance> addedRouteIds = createRouteIds();
+
+        registry1.tell(new ActionRegistry.Messages.UpdateActions(addedRouteIds,
+                Collections.emptyList()), ActorRef.noSender());
+
+        // Bucket store should get an update bucket message. Updated bucket contains added action.
+        final TestKit testKit = new TestKit(node1);
+
+        Map<Address, Bucket<ActionRoutingTable>> buckets = retrieveBuckets(registry1, testKit, nodeAddress);
+        verifyBucket(buckets.get(nodeAddress), addedRouteIds);
+
+        Map<Address, Long> versions = retrieveVersions(registry1, testKit);
+        assertEquals("Version for bucket " + nodeAddress, (Long) buckets.get(nodeAddress).getVersion(),
+                versions.get(nodeAddress));
+
+        // Now remove action
+        registry1.tell(new UpdateActions(Collections.emptyList(),addedRouteIds), ActorRef.noSender());
+
+        // Bucket store should get an update bucket message. Action is removed in the updated bucket
+
+        verifyEmptyBucket(testKit, registry1, nodeAddress);
+
+        LOG.info("testAddRemoveActionOnSameNode ending");
+
+    }
+
+    /**
+     * Three node cluster. 1. Register action on 1 node, ensure 2nd node gets updated 2. Remove action on
+     * 1 node, ensure 2nd node gets updated
+     */
+    @Test
+    public void testActionAddRemoveInCluster() {
+
+        LOG.info("testActionAddRemoveInCluster starting");
+
+        List<DOMActionInstance> addedRouteIds = createRouteIds();
+
+        Address node1Address = node1.provider().getDefaultAddress();
+
+        // Add action on node 1
+        registry1.tell(new UpdateActions(addedRouteIds, Collections.emptyList()), ActorRef.noSender());
+
+        // Bucket store on node2 should get a message to update its local copy of remote buckets
+        final TestKit testKit = new TestKit(node2);
+
+        Map<Address, Bucket<ActionRoutingTable>> buckets = retrieveBuckets(registry2, testKit, node1Address);
+        verifyBucket(buckets.get(node1Address), addedRouteIds);
+
+        // Now remove
+        registry1.tell(new UpdateActions(Collections.emptyList(), addedRouteIds), ActorRef.noSender());
+
+        // Bucket store on node2 should get a message to update its local copy of remote buckets.
+        // Wait for the bucket for node1 to be empty.
+
+        verifyEmptyBucket(testKit, registry2, node1Address);
+
+        LOG.info("testActionAddRemoveInCluster ending");
+    }
+
+    private void verifyEmptyBucket(final TestKit testKit, final ActorRef registry, final Address address)
+            throws AssertionError {
+        Map<Address, Bucket<ActionRoutingTable>> buckets;
+        int numTries = 0;
+        while (true) {
+            buckets = retrieveBuckets(registry1, testKit, address);
+
+            try {
+                verifyBucket(buckets.get(address), Collections.emptyList());
+                break;
+            } catch (AssertionError e) {
+                if (++numTries >= 50) {
+                    throw e;
+                }
+            }
+
+            Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     * Three node cluster. Register action on 2 nodes. Ensure 3rd gets updated.
+     */
+    @Test
+    public void testActionAddedOnMultiNodes() {
+        final TestKit testKit = new TestKit(node3);
+
+        // Add action on node 1
+        List<DOMActionInstance> addedRouteIds1 = createRouteIds();
+        registry1.tell(new UpdateActions(addedRouteIds1, Collections.emptyList()), ActorRef.noSender());
+
+        final UpdateRemoteActionEndpoints req1 = registrar3.expectMsgClass(Duration.ofSeconds(3),
+                UpdateRemoteActionEndpoints.class);
+
+        // Add action on node 2
+        List<DOMActionInstance> addedRouteIds2 = createRouteIds();
+        registry2.tell(new UpdateActions(addedRouteIds2, Collections.emptyList()), ActorRef.noSender());
+
+        final UpdateRemoteActionEndpoints req2 = registrar3.expectMsgClass(Duration.ofSeconds(3),
+                UpdateRemoteActionEndpoints.class);
+        Address node2Address = node2.provider().getDefaultAddress();
+        Address node1Address = node1.provider().getDefaultAddress();
+
+        Map<Address, Bucket<ActionRoutingTable>> buckets = retrieveBuckets(registry3, testKit, node1Address,
+                node2Address);
+
+        verifyBucket(buckets.get(node1Address), addedRouteIds1);
+        verifyBucket(buckets.get(node2Address), addedRouteIds2);
+
+        Map<Address, Long> versions = retrieveVersions(registry3, testKit);
+        assertEquals("Version for bucket " + node1Address, (Long) buckets.get(node1Address).getVersion(),
+                versions.get(node1Address));
+        assertEquals("Version for bucket " + node2Address, (Long) buckets.get(node2Address).getVersion(),
+                versions.get(node2Address));
+
+        assertEndpoints(req1, node1Address, invoker1);
+        assertEndpoints(req2, node2Address, invoker2);
+
+    }
+
+    private static void assertEndpoints(final ActionRegistry.Messages.UpdateRemoteActionEndpoints msg,
+                                        final Address address, final TestKit invoker) {
+        final Map<Address, Optional<RemoteActionEndpoint>> endpoints = msg.getActionEndpoints();
+        assertEquals(1, endpoints.size());
+
+        final Optional<RemoteActionEndpoint> maybeEndpoint = endpoints.get(address);
+        assertNotNull(maybeEndpoint);
+        assertTrue(maybeEndpoint.isPresent());
+
+        final RemoteActionEndpoint endpoint = maybeEndpoint.get();
+        final ActorRef router = endpoint.getRouter();
+        assertNotNull(router);
+
+        router.tell("hello", ActorRef.noSender());
+        final String s = invoker.expectMsgClass(Duration.ofSeconds(3), String.class);
+        assertEquals("hello", s);
+    }
+
+    private static Map<Address, Long> retrieveVersions(final ActorRef bucketStore, final TestKit testKit) {
+        bucketStore.tell(GET_BUCKET_VERSIONS, testKit.getRef());
+        @SuppressWarnings("unchecked")
+        final Map<Address, Long> reply = testKit.expectMsgClass(Duration.ofSeconds(3), Map.class);
+        return reply;
+    }
+
+    private static void verifyBucket(final Bucket<ActionRoutingTable> bucket,
+                                     final List<DOMActionInstance> expRouteIds) {
+        ActionRoutingTable table = bucket.getData();
+        assertNotNull("Bucket ActionRoutingTable is null", table);
+        for (DOMActionInstance r : expRouteIds) {
+            if (!table.contains(r)) {
+                fail("ActionRoutingTable does not contain " + r + ". Actual: " + table);
+            }
+        }
+
+        assertEquals("ActionRoutingTable size", expRouteIds.size(), table.size());
+    }
+
+    private static Map<Address, Bucket<ActionRoutingTable>> retrieveBuckets(
+            final ActorRef bucketStore, final TestKit testKit, final Address... addresses) {
+        int numTries = 0;
+        while (true) {
+            bucketStore.tell(GET_ALL_BUCKETS, testKit.getRef());
+            @SuppressWarnings("unchecked")
+            Map<Address, Bucket<ActionRoutingTable>> buckets = testKit.expectMsgClass(Duration.ofSeconds(3),
+                    Map.class);
+
+            boolean foundAll = true;
+            for (Address addr : addresses) {
+                Bucket<ActionRoutingTable> bucket = buckets.get(addr);
+                if (bucket == null) {
+                    foundAll = false;
+                    break;
+                }
+            }
+
+            if (foundAll) {
+                return buckets;
+            }
+
+            if (++numTries >= 50) {
+                fail("Missing expected buckets for addresses: " + Arrays.toString(addresses)
+                        + ", Actual: " + buckets);
+            }
+
+            Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    @Test
+    public void testAddRoutesConcurrency() {
+        final TestKit testKit = new TestKit(node1);
+
+        final int nRoutes = 500;
+        final Collection<DOMActionInstance> added = new ArrayList<>(nRoutes);
+        for (int i = 0; i < nRoutes; i++) {
+            QName type = QName.create(URI.create("/mockaction"), "mockaction" + routeIdCounter++);
+            final DOMActionInstance routeId = DOMActionInstance.of(SchemaPath.create(true,
+                    type), LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.create(new
+                    YangInstanceIdentifier.NodeIdentifier(type)));
+            added.add(routeId);
+
+            //Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+            registry1.tell(new UpdateActions(Arrays.asList(routeId), Collections.emptyList()),
+                    ActorRef.noSender());
+        }
+
+        int numTries = 0;
+        while (true) {
+            registry1.tell(GET_ALL_BUCKETS, testKit.getRef());
+            @SuppressWarnings("unchecked")
+            Map<Address, Bucket<ActionRoutingTable>> buckets = testKit.expectMsgClass(Duration.ofSeconds(3),
+                    Map.class);
+
+            Bucket<ActionRoutingTable> localBucket = buckets.values().iterator().next();
+            ActionRoutingTable table = localBucket.getData();
+            if (table != null && table.size() == nRoutes) {
+                for (DOMActionInstance r : added) {
+                    assertTrue("ActionRoutingTable contains " + r, table.contains(r));
+                }
+
+                break;
+            }
+
+            if (++numTries >= 50) {
+                fail("Expected # routes: " + nRoutes + ", Actual: " + table.size());
+            }
+
+            Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    private List<DOMActionInstance> createRouteIds() {
+        QName type = QName.create(URI.create("/mockaction"), "mockaction" + routeIdCounter++);
+        List<DOMActionInstance> routeIds = new ArrayList<>(1);
+        routeIds.add(DOMActionInstance.of(SchemaPath.create(true, type),
+                LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.create(
+                        new YangInstanceIdentifier.NodeIdentifier(type))));
+        return routeIds;
+    }
+}
index d32e8d8..4ae6e1e 100644 (file)
@@ -44,7 +44,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
@@ -79,11 +79,11 @@ public class RpcRegistryTest {
     public static void staticSetup() {
         AkkaConfigurationReader reader = ConfigFactory::load;
 
-        RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").gossipTickInterval("200ms")
+        RemoteOpsProviderConfig config1 = new RemoteOpsProviderConfig.Builder("memberA").gossipTickInterval("200ms")
                 .withConfigReader(reader).build();
-        RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").gossipTickInterval("200ms")
+        RemoteOpsProviderConfig config2 = new RemoteOpsProviderConfig.Builder("memberB").gossipTickInterval("200ms")
                 .withConfigReader(reader).build();
-        RemoteRpcProviderConfig config3 = new RemoteRpcProviderConfig.Builder("memberC").gossipTickInterval("200ms")
+        RemoteOpsProviderConfig config3 = new RemoteOpsProviderConfig.Builder("memberC").gossipTickInterval("200ms")
                 .withConfigReader(reader).build();
         node1 = ActorSystem.create("opendaylight-rpc", config1.get());
         node2 = ActorSystem.create("opendaylight-rpc", config2.get());
@@ -131,8 +131,8 @@ public class RpcRegistryTest {
         registry3 = node3.actorOf(RpcRegistry.props(config(node3), invoker3.getRef(), registrar3.getRef()));
     }
 
-    private static RemoteRpcProviderConfig config(final ActorSystem node) {
-        return new RemoteRpcProviderConfig(node.settings().config());
+    private static RemoteOpsProviderConfig config(final ActorSystem node) {
+        return new RemoteOpsProviderConfig(node.settings().config());
     }
 
     @After
@@ -299,7 +299,7 @@ public class RpcRegistryTest {
     }
 
     private static void assertEndpoints(final UpdateRemoteEndpoints msg, final Address address, final TestKit invoker) {
-        final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = msg.getEndpoints();
+        final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = msg.getRpcEndpoints();
         assertEquals(1, endpoints.size());
 
         final Optional<RemoteRpcEndpoint> maybeEndpoint = endpoints.get(address);
index b9784ab..ed6d12c 100644 (file)
@@ -22,7 +22,7 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
 import org.opendaylight.controller.remote.rpc.TerminationMonitor;
 
 public class BucketStoreTest {
@@ -140,13 +140,13 @@ public class BucketStoreTest {
      */
     private static BucketStoreActor<T> createStore() {
         final Props props = Props.create(TestingBucketStoreActor.class,
-                new RemoteRpcProviderConfig(system.settings().config()), "testing-store",new T());
+                new RemoteOpsProviderConfig(system.settings().config()), "testing-store",new T());
         return TestActorRef.<BucketStoreActor<T>>create(system, props, "testStore").underlyingActor();
     }
 
     private static final class TestingBucketStoreActor extends BucketStoreActor<T> {
 
-        protected TestingBucketStoreActor(final RemoteRpcProviderConfig config,
+        protected TestingBucketStoreActor(final RemoteOpsProviderConfig config,
                                           final String persistenceId,
                                           final T initialData) {
             super(config, persistenceId, initialData);
index 6c826ac..896c024 100644 (file)
@@ -29,7 +29,7 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
 import org.opendaylight.controller.remote.rpc.TerminationMonitor;
 
 
@@ -106,8 +106,8 @@ public class GossiperTest {
      * @return instance of Gossiper class
      */
     private static Gossiper createGossiper() {
-        final RemoteRpcProviderConfig config =
-                new RemoteRpcProviderConfig.Builder("unit-test")
+        final RemoteOpsProviderConfig config =
+                new RemoteOpsProviderConfig.Builder("unit-test")
                         .withConfigReader(ConfigFactory::load).build();
         final Props props = Gossiper.testProps(config);
         final TestActorRef<Gossiper> testRef = TestActorRef.create(system, props, "testGossiper");
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteActionRegistryMXBeanImplTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteActionRegistryMXBeanImplTest.java
new file mode 100644 (file)
index 0000000..67aba15
--- /dev/null
@@ -0,0 +1,150 @@
+/*
+ * Copyright (c) 2019 Nordix Foundation.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.mbeans;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.dispatch.Dispatchers;
+import akka.testkit.TestActorRef;
+import akka.testkit.javadsl.TestKit;
+import akka.util.Timeout;
+import com.google.common.collect.Lists;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
+import org.opendaylight.controller.remote.rpc.registry.ActionRegistry;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMActionInstance;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+public class RemoteActionRegistryMXBeanImplTest {
+
+    private static final QName LOCAL_QNAME = QName.create("base", "local");
+    private static final SchemaPath EMPTY_SCHEMA_PATH = SchemaPath.ROOT;
+    private static final SchemaPath LOCAL_SCHEMA_PATH = SchemaPath.create(true, LOCAL_QNAME);
+
+    private ActorSystem system;
+    private TestActorRef<ActionRegistry> testActor;
+    private List<DOMActionInstance> buckets;
+    private RemoteActionRegistryMXBeanImpl mxBean;
+
+    @Before
+    public void setUp() {
+        system = ActorSystem.create("test");
+
+        final DOMActionInstance emptyActionIdentifier = DOMActionInstance.of(
+                EMPTY_SCHEMA_PATH, LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.EMPTY);
+        final DOMActionInstance localActionIdentifier = DOMActionInstance.of(
+                LOCAL_SCHEMA_PATH, LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.of(LOCAL_QNAME));
+
+        buckets = Lists.newArrayList(emptyActionIdentifier, localActionIdentifier);
+
+        final RemoteOpsProviderConfig config = new RemoteOpsProviderConfig.Builder("system").build();
+        final TestKit invoker = new TestKit(system);
+        final TestKit registrar = new TestKit(system);
+        final TestKit supervisor = new TestKit(system);
+        final Props props = ActionRegistry.props(config, invoker.getRef(), registrar.getRef())
+                .withDispatcher(Dispatchers.DefaultDispatcherId());
+        testActor = new TestActorRef<>(system, props, supervisor.getRef(), "testActor");
+
+        final Timeout timeout = Timeout.apply(10, TimeUnit.SECONDS);
+        mxBean = new RemoteActionRegistryMXBeanImpl(new BucketStoreAccess(testActor, system.dispatcher(), timeout),
+                timeout);
+    }
+
+    @After
+    public void tearDown() {
+        TestKit.shutdownActorSystem(system, Boolean.TRUE);
+    }
+
+    @Test
+    public void testGetLocalRegisteredRoutedActionEmptyBuckets() {
+        final Set<String> localRegisteredRoutedAction = mxBean.getLocalRegisteredAction();
+
+        Assert.assertNotNull(localRegisteredRoutedAction);
+        Assert.assertTrue(localRegisteredRoutedAction.isEmpty());
+    }
+
+    @Test
+    public void testGetLocalRegisteredRoutedAction() {
+        testActor.tell(new ActionRegistry.Messages.UpdateActions(Lists.newArrayList(buckets),
+                Collections.emptyList()), ActorRef.noSender());
+        final Set<String> localRegisteredRoutedAction = mxBean.getLocalRegisteredAction();
+
+        Assert.assertNotNull(localRegisteredRoutedAction);
+        Assert.assertEquals(1, localRegisteredRoutedAction.size());
+
+        final String localAction = localRegisteredRoutedAction.iterator().next();
+        Assert.assertTrue(localAction.contains(LOCAL_QNAME.toString()));
+        Assert.assertTrue(localAction.contains(LOCAL_SCHEMA_PATH.toString()));
+    }
+
+    @Test
+    public void testFindActionByNameEmptyBuckets() {
+        final Map<String, String> rpcByName = mxBean.findActionByName("");
+
+        Assert.assertNotNull(rpcByName);
+        Assert.assertTrue(rpcByName.isEmpty());
+    }
+
+    @Test
+    public void testFindActionByName() {
+        testActor.tell(new ActionRegistry.Messages.UpdateActions(Lists.newArrayList(buckets),
+                Collections.emptyList()), ActorRef.noSender());
+        final Map<String, String> rpcByName = mxBean.findActionByName("");
+
+        Assert.assertNotNull(rpcByName);
+        Assert.assertEquals(1, rpcByName.size());
+        Assert.assertTrue(rpcByName.containsValue(LOCAL_QNAME.getLocalName()));
+    }
+
+    @Test
+    public void testFindActionByRouteEmptyBuckets() {
+        final Map<String, String> rpcByRoute = mxBean.findActionByRoute("");
+
+        Assert.assertNotNull(rpcByRoute);
+        Assert.assertTrue(rpcByRoute.isEmpty());
+    }
+
+    @Test
+    public void testFindActionByRoute() {
+        testActor.tell(new ActionRegistry.Messages.UpdateActions(Lists.newArrayList(buckets),
+                Collections.emptyList()), ActorRef.noSender());
+        final Map<String, String> rpcByRoute = mxBean.findActionByRoute("");
+
+        Assert.assertNotNull(rpcByRoute);
+        Assert.assertEquals(1, rpcByRoute.size());
+        Assert.assertTrue(rpcByRoute.containsValue(LOCAL_QNAME.getLocalName()));
+    }
+
+    @Test
+    public void testGetBucketVersionsEmptyBuckets() {
+        final String bucketVersions = mxBean.getBucketVersions();
+        Assert.assertEquals(Collections.emptyMap().toString(), bucketVersions);
+    }
+
+    @Test
+    public void testGetBucketVersions() {
+        testActor.tell(new ActionRegistry.Messages.UpdateActions(Lists.newArrayList(buckets),
+                Collections.emptyList()), ActorRef.noSender());
+        final String bucketVersions = mxBean.getBucketVersions();
+
+        Assert.assertTrue(bucketVersions.contains(testActor.provider().getDefaultAddress().toString()));
+    }
+}
index d445ae1..dbeccb2 100644 (file)
@@ -24,7 +24,7 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess;
 import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
@@ -54,7 +54,7 @@ public class RemoteRpcRegistryMXBeanImplTest {
 
         buckets = Lists.newArrayList(emptyRpcIdentifier, localRpcIdentifier);
 
-        final RemoteRpcProviderConfig config = new RemoteRpcProviderConfig.Builder("system").build();
+        final RemoteOpsProviderConfig config = new RemoteOpsProviderConfig.Builder("system").build();
         final TestKit invoker = new TestKit(system);
         final TestKit registrar = new TestKit(system);
         final TestKit supervisor = new TestKit(system);

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.