From: EmmettCox Date: Thu, 4 Jul 2019 16:31:38 +0000 (+0000) Subject: Teach sal-remoterpc-connector to route actions X-Git-Tag: release/sodium~36 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=927bce5688e4b9d33d3e5e9b769d8a0dba5ccdd4 Teach sal-remoterpc-connector to route actions sal-remoterpc-connector already handles routing of RPC registrations and invocations across a cluster. Actions are very similar to RPCs, hence it is natural to keep both in the same component. This patch refactors common bits that go into tracking both, so that we share common actors and concepts. JIRA: CONTROLLER-1894 Change-Id: I0b9005bc3560b4dd5977a280d83eceebe132bec9 Signed-off-by: EmmettCox --- diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/AbstractRemoteFuture.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/AbstractRemoteFuture.java new file mode 100644 index 0000000000..ae977d6d22 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/AbstractRemoteFuture.java @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc; + +import static java.util.Objects.requireNonNull; + +import akka.dispatch.OnComplete; +import com.google.common.util.concurrent.AbstractFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.ExecutionContext; +import scala.concurrent.Future; + +abstract class AbstractRemoteFuture extends AbstractFuture { + private static final Logger LOG = LoggerFactory.getLogger(AbstractRemoteFuture.class); + + private final @NonNull SchemaPath type; + + AbstractRemoteFuture(final @NonNull SchemaPath type, final Future requestFuture) { + this.type = requireNonNull(type); + requestFuture.onComplete(new FutureUpdater(), ExecutionContext.Implicits$.MODULE$.global()); + } + + @Override + public final T get() throws InterruptedException, ExecutionException { + try { + return super.get(); + } catch (ExecutionException e) { + throw mapException(e); + } + } + + @Override + public final T get(final long timeout, final TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + try { + return super.get(timeout, unit); + } catch (final ExecutionException e) { + throw mapException(e); + } + } + + @Override + protected final boolean set(final T value) { + final boolean ret = super.set(value); + if (ret) { + LOG.debug("Future {} for action {} successfully completed", this, type); + } + return ret; + } + + final void failNow(final Throwable error) { + LOG.debug("Failing future {} for operation {}", this, type, error); + setException(error); + } + + abstract @Nullable T processReply(Object reply); + + abstract @NonNull Class exceptionClass(); + + abstract @NonNull E wrapCause(Throwable cause); + + private ExecutionException mapException(final ExecutionException ex) { + final Throwable cause = ex.getCause(); + return exceptionClass().isInstance(cause) ? ex : new ExecutionException(ex.getMessage(), wrapCause(cause)); + } + + private final class FutureUpdater extends OnComplete { + @Override + public void onComplete(final Throwable error, final Object reply) { + if (error == null) { + final T result = processReply(reply); + if (result != null) { + LOG.debug("Received response for operation {}: result is {}", type, result); + set(result); + } else { + failNow(new IllegalStateException("Incorrect reply type " + reply + " from Akka")); + } + } else { + failNow(error); + } + } + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/AbstractRemoteImplementation.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/AbstractRemoteImplementation.java new file mode 100644 index 0000000000..6a8030f7c7 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/AbstractRemoteImplementation.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc; + +import static java.util.Objects.requireNonNull; + +import akka.actor.ActorRef; +import akka.pattern.Patterns; +import akka.util.Timeout; +import org.opendaylight.controller.remote.rpc.messages.AbstractExecute; +import scala.concurrent.Future; + +/** + * An abstract base class for remote RPC/action implementations. + */ +abstract class AbstractRemoteImplementation> { + // 0 for local, 1 for binding, 2 for remote + static final long COST = 2; + + private final ActorRef remoteInvoker; + private final Timeout askDuration; + + AbstractRemoteImplementation(final ActorRef remoteInvoker, final RemoteOpsProviderConfig config) { + this.remoteInvoker = requireNonNull(remoteInvoker); + this.askDuration = config.getAskDuration(); + } + + final Future ask(final T message) { + return Patterns.ask(remoteInvoker, requireNonNull(message), askDuration); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/OpsInvoker.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/OpsInvoker.java new file mode 100644 index 0000000000..2f1ac92adb --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/OpsInvoker.java @@ -0,0 +1,182 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc; + +import static java.util.Objects.requireNonNull; + +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.actor.Status.Failure; +import com.google.common.base.Throwables; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import java.util.Collection; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; +import org.opendaylight.controller.remote.rpc.messages.ActionResponse; +import org.opendaylight.controller.remote.rpc.messages.ExecuteAction; +import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc; +import org.opendaylight.controller.remote.rpc.messages.RpcResponse; +import org.opendaylight.mdsal.dom.api.DOMActionResult; +import org.opendaylight.mdsal.dom.api.DOMActionService; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.api.DOMRpcResult; +import org.opendaylight.mdsal.dom.api.DOMRpcService; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; + +/** + * Actor receiving invocation requests from remote nodes, routing them to + * {@link DOMRpcService#invokeRpc(SchemaPath, NormalizedNode)} and + * {@link DOMActionService#invokeAction(SchemaPath, DOMDataTreeIdentifier, ContainerNode)}. + * + *

+ * Note that while the two interfaces are very similar, invocation strategies are slightly different due to historic + * behavior of RPCs: + *

    + *
  • RPCs allow both null input and output, and this is passed to the infrastructure. Furthermore any invocation + * which results in errors being reported drops the output content, even if it is present -- which is wrong, as + * 'errors' in this case can also be just warnings.
  • + *
  • Actions do not allow null input, but allow null output. If the output is present, it is passed along with any + * errors reported.
  • + *
+ */ +final class OpsInvoker extends AbstractUntypedActor { + private final DOMRpcService rpcService; + private final DOMActionService actionService; + + private OpsInvoker(final DOMRpcService rpcService, final DOMActionService actionService) { + this.rpcService = requireNonNull(rpcService); + this.actionService = requireNonNull(actionService); + } + + public static Props props(final DOMRpcService rpcService, final DOMActionService actionService) { + return Props.create(OpsInvoker.class, + requireNonNull(rpcService, "DOMRpcService can not be null"), + requireNonNull(actionService, "DOMActionService can not be null")); + } + + @Override + protected void handleReceive(final Object message) { + if (message instanceof ExecuteRpc) { + LOG.debug("Handling ExecuteOps Message"); + execute((ExecuteRpc) message); + } else if (message instanceof ExecuteAction) { + execute((ExecuteAction) message); + } else { + unknownMessage(message); + } + } + + @SuppressWarnings("checkstyle:IllegalCatch") + private void execute(final ExecuteRpc msg) { + LOG.debug("Executing RPC {}", msg.getType()); + final ActorRef sender = getSender(); + + final ListenableFuture future; + try { + future = rpcService.invokeRpc(msg.getType(), msg.getInput()); + } catch (final RuntimeException e) { + LOG.debug("Failed to invoke RPC {}", msg.getType(), e); + sender.tell(new Failure(e), self()); + return; + } + + Futures.addCallback(future, new AbstractCallback(getSender(), msg.getType()) { + @Override + Object nullResponse(final SchemaPath type) { + LOG.warn("Execution of {} resulted in null result", type); + return new RpcResponse(null); + } + + @Override + Object response(final SchemaPath type, final DOMRpcResult result) { + final Collection errors = result.getErrors(); + return errors.isEmpty() ? new RpcResponse(result.getResult()) + // This is legacy (wrong) behavior, which ignores the fact that errors may be just warnings, + // discarding any output + : new Failure(new RpcErrorsException(String.format("Execution of rpc %s failed", type), + errors)); + } + }, MoreExecutors.directExecutor()); + } + + @SuppressWarnings("checkstyle:IllegalCatch") + private void execute(final ExecuteAction msg) { + LOG.debug("Executing Action {}", msg.getType()); + + final ActorRef sender = getSender(); + + final ListenableFuture future; + try { + future = actionService.invokeAction(msg.getType(), msg.getPath(), msg.getInput()); + } catch (final RuntimeException e) { + LOG.debug("Failed to invoke action {}", msg.getType(), e); + sender.tell(new Failure(e), self()); + return; + } + + Futures.addCallback(future, new AbstractCallback(getSender(), msg.getType()) { + @Override + Object nullResponse(final SchemaPath type) { + throw new IllegalStateException("Null invocation result of action " + type); + } + + @Override + Object response(final SchemaPath type, final DOMActionResult result) { + final Collection errors = result.getErrors(); + return errors.isEmpty() ? new ActionResponse(result.getOutput(), result.getErrors()) + // This is legacy (wrong) behavior, which ignores the fact that errors may be just warnings, + // discarding any output + : new Failure(new RpcErrorsException(String.format("Execution of action %s failed", type), + errors)); + } + }, MoreExecutors.directExecutor()); + } + + private abstract class AbstractCallback implements FutureCallback { + private final ActorRef replyTo; + private final SchemaPath type; + + AbstractCallback(final ActorRef replyTo, final SchemaPath type) { + this.replyTo = requireNonNull(replyTo); + this.type = requireNonNull(type); + } + + @Override + public final void onSuccess(final T result) { + final Object response; + if (result == null) { + // This shouldn't happen but the FutureCallback annotates the result param with Nullable so handle null + // here to avoid FindBugs warning. + response = nullResponse(type); + } else { + response = response(type, result); + } + + LOG.debug("Sending response for execution of {} : {}", type, response); + replyTo.tell(response, self()); + } + + @Override + public final void onFailure(final Throwable failure) { + LOG.debug("Failed to execute operation {}", type, failure); + LOG.error("Failed to execute operation {} due to {}. More details are available on DEBUG level.", type, + Throwables.getRootCause(failure).getMessage()); + replyTo.tell(new Failure(failure), self()); + } + + abstract @NonNull Object nullResponse(@NonNull SchemaPath type); + + abstract @NonNull Object response(@NonNull SchemaPath type, @NonNull T result); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/OpsListener.java similarity index 59% rename from opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java rename to opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/OpsListener.java index 20f32cb0da..8c5004a8d0 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/OpsListener.java @@ -12,9 +12,14 @@ import static java.util.Objects.requireNonNull; import akka.actor.ActorRef; import java.util.Collection; +import java.util.Set; +import org.opendaylight.controller.remote.rpc.registry.ActionRegistry; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; +import org.opendaylight.mdsal.dom.api.DOMActionAvailabilityExtension; +import org.opendaylight.mdsal.dom.api.DOMActionImplementation; +import org.opendaylight.mdsal.dom.api.DOMActionInstance; import org.opendaylight.mdsal.dom.api.DOMRpcAvailabilityListener; import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier; import org.opendaylight.mdsal.dom.api.DOMRpcImplementation; @@ -22,17 +27,20 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * A {@link DOMRpcAvailabilityListener} reacting to RPC implementations different than {@link RemoteRpcImplementation}. - * The knowledge of such implementations is forwarded to {@link RpcRegistry}, which is responsible for advertising - * their presence to other nodes. + * A {@link DOMRpcAvailabilityListener} reacting to RPC implementations different than + * {@link RemoteRpcImplementation} or {@link RemoteActionImplementation}. + * The knowledge of such implementations is forwarded to {@link RpcRegistry} and {@link ActionRegistry}, + * which is responsible for advertising their presence to other nodes. */ -final class RpcListener implements DOMRpcAvailabilityListener { - private static final Logger LOG = LoggerFactory.getLogger(RpcListener.class); +final class OpsListener implements DOMRpcAvailabilityListener, DOMActionAvailabilityExtension.AvailabilityListener { + private static final Logger LOG = LoggerFactory.getLogger(OpsListener.class); private final ActorRef rpcRegistry; + private final ActorRef actionRegistry; - RpcListener(final ActorRef rpcRegistry) { + OpsListener(final ActorRef rpcRegistry, final ActorRef actionRegistry) { this.rpcRegistry = requireNonNull(rpcRegistry); + this.actionRegistry = requireNonNull(actionRegistry); } @Override @@ -55,4 +63,16 @@ final class RpcListener implements DOMRpcAvailabilityListener { public boolean acceptsImplementation(final DOMRpcImplementation impl) { return !(impl instanceof RemoteRpcImplementation); } + + @Override + public boolean acceptsImplementation(final DOMActionImplementation impl) { + return !(impl instanceof RemoteActionImplementation); + } + + @Override + public void onActionsChanged(final Set removed, final Set added) { + LOG.debug("adding registration for [{}]", added); + LOG.debug("removing registration for [{}]", removed); + actionRegistry.tell(new ActionRegistry.Messages.UpdateActions(added, removed), ActorRef.noSender()); + } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/OpsManager.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/OpsManager.java new file mode 100644 index 0000000000..ea085651b4 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/OpsManager.java @@ -0,0 +1,114 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc; + +import static java.util.Objects.requireNonNull; + +import akka.actor.ActorRef; +import akka.actor.OneForOneStrategy; +import akka.actor.Props; +import akka.actor.SupervisorStrategy; +import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; +import org.opendaylight.mdsal.dom.api.DOMActionProviderService; +import org.opendaylight.mdsal.dom.api.DOMActionService; +import org.opendaylight.mdsal.dom.api.DOMRpcProviderService; +import org.opendaylight.mdsal.dom.api.DOMRpcService; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import scala.concurrent.duration.FiniteDuration; + +/** + * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown. It also registers + * {@link OpsListener} with the local {@link DOMRpcService}. + */ +public class OpsManager extends AbstractUntypedActor { + private final DOMRpcProviderService rpcProvisionRegistry; + private final RemoteOpsProviderConfig config; + private final DOMRpcService rpcServices; + private DOMActionProviderService actionProvisionRegistry; + private DOMActionService actionService; + + private ListenerRegistration listenerReg; + private ActorRef opsInvoker; + private ActorRef actionRegistry; + private ActorRef rpcRegistry; + private ActorRef opsRegistrar; + + OpsManager(final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices, + final RemoteOpsProviderConfig config, final DOMActionProviderService actionProviderService, + final DOMActionService actionService) { + this.rpcProvisionRegistry = requireNonNull(rpcProvisionRegistry); + this.rpcServices = requireNonNull(rpcServices); + this.config = requireNonNull(config); + this.actionProvisionRegistry = requireNonNull(actionProviderService); + this.actionService = requireNonNull(actionService); + } + + public static Props props(final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices, + final RemoteOpsProviderConfig config, + final DOMActionProviderService actionProviderService, + final DOMActionService actionService) { + requireNonNull(rpcProvisionRegistry, "RpcProviderService can not be null!"); + requireNonNull(rpcServices, "RpcService can not be null!"); + requireNonNull(config, "RemoteOpsProviderConfig can not be null!"); + requireNonNull(actionProviderService, "ActionProviderService can not be null!"); + requireNonNull(actionService, "ActionService can not be null!"); + return Props.create(OpsManager.class, rpcProvisionRegistry, rpcServices, config, + actionProviderService, actionService); + } + + @Override + public void preStart() throws Exception { + super.preStart(); + + opsInvoker = getContext().actorOf(OpsInvoker.props(rpcServices, actionService) + .withMailbox(config.getMailBoxName()), config.getRpcBrokerName()); + LOG.debug("Listening for RPC invocation requests with {}", opsInvoker); + + opsRegistrar = getContext().actorOf(OpsRegistrar.props(config, rpcProvisionRegistry, actionProvisionRegistry) + .withMailbox(config.getMailBoxName()), config.getRpcRegistrarName()); + LOG.debug("Registering remote RPCs with {}", opsRegistrar); + + rpcRegistry = getContext().actorOf(RpcRegistry.props(config, opsInvoker, opsRegistrar) + .withMailbox(config.getMailBoxName()), config.getRpcRegistryName()); + LOG.debug("Propagating RPC information with {}", rpcRegistry); + + actionRegistry = getContext().actorOf(RpcRegistry.props(config, opsInvoker, opsRegistrar) + .withMailbox(config.getMailBoxName()), config.getActionRegistryName()); + LOG.debug("Propagating RPC information with {}", actionRegistry); + + final OpsListener opsListener = new OpsListener(rpcRegistry, actionRegistry); + LOG.debug("Registering local availability listener {}", opsListener); + listenerReg = rpcServices.registerRpcListener(opsListener); + } + + @Override + public void postStop() throws Exception { + if (listenerReg != null) { + listenerReg.close(); + listenerReg = null; + } + + super.postStop(); + } + + @Override + protected void handleReceive(final Object message) { + unknownMessage(message); + } + + @Override + public SupervisorStrategy supervisorStrategy() { + return new OneForOneStrategy(10, FiniteDuration.create(1, TimeUnit.MINUTES), t -> { + LOG.error("An exception happened actor will be resumed", t); + return SupervisorStrategy.resume(); + }); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/OpsRegistrar.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/OpsRegistrar.java new file mode 100644 index 0000000000..fdda197c3b --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/OpsRegistrar.java @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc; + +import static java.util.Objects.requireNonNull; + +import akka.actor.Address; +import akka.actor.Props; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; +import org.opendaylight.controller.remote.rpc.registry.ActionRegistry; +import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.Messages.UpdateRemoteActionEndpoints; +import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.RemoteActionEndpoint; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.RemoteRpcEndpoint; +import org.opendaylight.mdsal.dom.api.DOMActionImplementation; +import org.opendaylight.mdsal.dom.api.DOMActionProviderService; +import org.opendaylight.mdsal.dom.api.DOMRpcImplementation; +import org.opendaylight.mdsal.dom.api.DOMRpcProviderService; +import org.opendaylight.yangtools.concepts.ObjectRegistration; + +/** + * Actor handling registration of RPCs and Actions available on remote nodes with the local + * {@link DOMRpcProviderService} and {@link DOMActionProviderService}. + */ +final class OpsRegistrar extends AbstractUntypedActor { + private final Map> rpcRegs = new HashMap<>(); + private final Map> actionRegs = new HashMap<>(); + private final DOMRpcProviderService rpcProviderService; + private final RemoteOpsProviderConfig config; + private final DOMActionProviderService actionProviderService; + + OpsRegistrar(final RemoteOpsProviderConfig config, final DOMRpcProviderService rpcProviderService, + final DOMActionProviderService actionProviderService) { + this.config = requireNonNull(config); + this.rpcProviderService = requireNonNull(rpcProviderService); + this.actionProviderService = requireNonNull(actionProviderService); + } + + public static Props props(final RemoteOpsProviderConfig config, final DOMRpcProviderService rpcProviderService, + final DOMActionProviderService actionProviderService) { + return Props.create(OpsRegistrar.class, requireNonNull(config), + requireNonNull(rpcProviderService, "DOMRpcProviderService cannot be null"), + requireNonNull(actionProviderService, "DOMActionProviderService cannot be null")); + } + + @Override + public void postStop() throws Exception { + rpcRegs.clear(); + actionRegs.clear(); + + super.postStop(); + } + + @Override + protected void handleReceive(final Object message) { + if (message instanceof UpdateRemoteEndpoints) { + LOG.debug("Handling updateRemoteEndpoints message"); + updateRemoteRpcEndpoints(((UpdateRemoteEndpoints) message).getRpcEndpoints()); + } else if (message instanceof UpdateRemoteActionEndpoints) { + LOG.debug("Handling updateRemoteActionEndpoints message"); + updateRemoteActionEndpoints(((UpdateRemoteActionEndpoints) message).getActionEndpoints()); + } else { + unknownMessage(message); + } + } + + private void updateRemoteRpcEndpoints(final Map> rpcEndpoints) { + /* + * Updating RPC providers is a two-step process. We first add the newly-discovered RPCs and then close + * the old registration. This minimizes churn observed by listeners, as they will not observe RPC + * unavailability which would occur if we were to do it the other way around. + * + * Note that when an RPC moves from one remote node to another, we also do not want to expose the gap, + * hence we register all new implementations before closing all registrations. + */ + for (Entry> e : rpcEndpoints.entrySet()) { + LOG.debug("Updating RPC registrations for {}", e.getKey()); + + final Optional maybeEndpoint = e.getValue(); + if (maybeEndpoint.isPresent()) { + final RemoteRpcEndpoint endpoint = maybeEndpoint.get(); + final RemoteRpcImplementation impl = new RemoteRpcImplementation(endpoint.getRouter(), config); + rpcRegs.put(e.getKey(), rpcProviderService.registerRpcImplementation(impl, + endpoint.getRpcs())); + } else { + rpcRegs.remove(e.getKey()); + } + } + } + + /** + * Updates the action endpoints, Adding new registrations first before removing previous registrations. + */ + private void updateRemoteActionEndpoints(final Map> actionEndpoints) { + /* + * Updating Action providers is a two-step process. We first add the newly-discovered RPCs and then close + * the old registration. This minimizes churn observed by listeners, as they will not observe RPC + * unavailability which would occur if we were to do it the other way around. + * + * Note that when an Action moves from one remote node to another, we also do not want to expose the gap, + * hence we register all new implementations before closing all registrations. + */ + for (Entry> e : actionEndpoints.entrySet()) { + LOG.debug("Updating Action registrations for {}", e.getKey()); + + final Optional maybeEndpoint = e.getValue(); + if (maybeEndpoint.isPresent()) { + final RemoteActionEndpoint endpoint = maybeEndpoint.get(); + final RemoteActionImplementation impl = new RemoteActionImplementation(endpoint.getRouter(), config); + actionRegs.put(e.getKey(), + actionProviderService.registerActionImplementation(impl, endpoint.getActions())); + } else { + actionRegs.remove(e.getKey()); + } + } + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteActionImplementation.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteActionImplementation.java new file mode 100644 index 0000000000..c3eeeedcd3 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteActionImplementation.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2019 Nordix Foundation. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc; + +import akka.actor.ActorRef; +import com.google.common.util.concurrent.ListenableFuture; +import org.opendaylight.controller.remote.rpc.messages.ExecuteAction; +import org.opendaylight.mdsal.dom.api.DOMActionImplementation; +import org.opendaylight.mdsal.dom.api.DOMActionResult; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link DOMActionImplementation} which routes invocation requests to a remote invoker actor. + */ +final class RemoteActionImplementation extends AbstractRemoteImplementation + implements DOMActionImplementation { + private static final Logger LOG = LoggerFactory.getLogger(RemoteActionImplementation.class); + + RemoteActionImplementation(final ActorRef remoteInvoker, final RemoteOpsProviderConfig config) { + super(remoteInvoker, config); + } + + /** + * Routes action request to a remote invoker, which will execute the action and return with result. + */ + @Override + public ListenableFuture invokeAction(final SchemaPath type, final DOMDataTreeIdentifier path, + final ContainerNode input) { + LOG.debug("invoking action {} with path {}", type, path); + return new RemoteDOMActionFuture(type, ask(ExecuteAction.from(type, path, input))); + } + + @Override + public long invocationCost() { + return COST; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteDOMActionException.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteDOMActionException.java new file mode 100644 index 0000000000..b8664996f3 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteDOMActionException.java @@ -0,0 +1,18 @@ +/* + * Copyright (c) 2019 Nordix Foundation. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc; + +import org.opendaylight.mdsal.dom.api.DOMActionException; + +public class RemoteDOMActionException extends DOMActionException { + private static final long serialVersionUID = 1L; + + RemoteDOMActionException(final String message, final Throwable cause) { + super(message, cause); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteDOMActionFuture.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteDOMActionFuture.java new file mode 100644 index 0000000000..8dbbff5bbe --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteDOMActionFuture.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2019 Nordix Foundation. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc; + +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.controller.remote.rpc.messages.ActionResponse; +import org.opendaylight.mdsal.dom.api.DOMActionException; +import org.opendaylight.mdsal.dom.api.DOMActionResult; +import org.opendaylight.mdsal.dom.spi.SimpleDOMActionResult; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; +import scala.concurrent.Future; + +final class RemoteDOMActionFuture extends AbstractRemoteFuture { + RemoteDOMActionFuture(final @NonNull SchemaPath type, final @NonNull Future requestFuture) { + super(type, requestFuture); + } + + @Override + DOMActionResult processReply(final Object reply) { + if (reply instanceof ActionResponse) { + final ActionResponse actionReply = (ActionResponse) reply; + final ContainerNode output = actionReply.getOutput(); + return output == null ? new SimpleDOMActionResult(actionReply.getErrors()) + : new SimpleDOMActionResult(output, actionReply.getErrors()); + } + + return null; + } + + @Override + Class exceptionClass() { + return DOMActionException.class; + } + + @Override + DOMActionException wrapCause(final Throwable cause) { + return new RemoteDOMActionException("Exception during invoking ACTION", cause); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteDOMRpcFuture.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteDOMRpcFuture.java index d5e46a9a84..b858f8056a 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteDOMRpcFuture.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteDOMRpcFuture.java @@ -7,94 +7,31 @@ */ package org.opendaylight.controller.remote.rpc; -import static java.util.Objects.requireNonNull; - -import akka.dispatch.OnComplete; -import com.google.common.util.concurrent.AbstractFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.remote.rpc.messages.RpcResponse; import org.opendaylight.mdsal.dom.api.DOMRpcException; import org.opendaylight.mdsal.dom.api.DOMRpcResult; import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.concurrent.ExecutionContext; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; import scala.concurrent.Future; -final class RemoteDOMRpcFuture extends AbstractFuture { - - private static final Logger LOG = LoggerFactory.getLogger(RemoteDOMRpcFuture.class); - - private final QName rpcName; - - private RemoteDOMRpcFuture(final QName rpcName) { - this.rpcName = requireNonNull(rpcName, "rpcName"); - } - - public static RemoteDOMRpcFuture create(final QName rpcName) { - return new RemoteDOMRpcFuture(rpcName); - } - - protected void failNow(final Throwable error) { - LOG.debug("Failing future {} for rpc {}", this, rpcName, error); - setException(error); - } - - protected void completeWith(final Future future) { - future.onComplete(new FutureUpdater(), ExecutionContext.Implicits$.MODULE$.global()); +final class RemoteDOMRpcFuture extends AbstractRemoteFuture { + RemoteDOMRpcFuture(final @NonNull SchemaPath type, final @NonNull Future requestFuture) { + super(type, requestFuture); } @Override - public DOMRpcResult get() throws InterruptedException, ExecutionException { - try { - return super.get(); - } catch (ExecutionException e) { - throw mapException(e); - } + DOMRpcResult processReply(final Object reply) { + return reply instanceof RpcResponse ? new DefaultDOMRpcResult(((RpcResponse) reply).getOutput()) : null; } @Override - public DOMRpcResult get(final long timeout, final TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - try { - return super.get(timeout, unit); - } catch (final ExecutionException e) { - throw mapException(e); - } - } - - private static ExecutionException mapException(final ExecutionException ex) { - final Throwable cause = ex.getCause(); - if (cause instanceof DOMRpcException) { - return ex; - } - return new ExecutionException(ex.getMessage(), - new RemoteDOMRpcException("Exception during invoking RPC", ex.getCause())); + Class exceptionClass() { + return DOMRpcException.class; } - private final class FutureUpdater extends OnComplete { - - @Override - public void onComplete(final Throwable error, final Object reply) { - if (error != null) { - RemoteDOMRpcFuture.this.failNow(error); - } else if (reply instanceof RpcResponse) { - final RpcResponse rpcReply = (RpcResponse) reply; - final NormalizedNode result = rpcReply.getResultNormalizedNode(); - - LOG.debug("Received response for rpc {}: result is {}", rpcName, result); - - RemoteDOMRpcFuture.this.set(new DefaultDOMRpcResult(result)); - - LOG.debug("Future {} for rpc {} successfully completed", RemoteDOMRpcFuture.this, rpcName); - } else { - RemoteDOMRpcFuture.this.failNow(new IllegalStateException("Incorrect reply type " + reply - + "from Akka")); - } - } + @Override + DOMRpcException wrapCause(final Throwable cause) { + return new RemoteDOMRpcException("Exception during invoking RPC", cause); } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteOpsProvider.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteOpsProvider.java new file mode 100644 index 0000000000..dabe5be4d5 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteOpsProvider.java @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc; + +import static java.util.Objects.requireNonNull; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.PoisonPill; +import org.opendaylight.mdsal.dom.api.DOMActionProviderService; +import org.opendaylight.mdsal.dom.api.DOMActionService; +import org.opendaylight.mdsal.dom.api.DOMRpcProviderService; +import org.opendaylight.mdsal.dom.api.DOMRpcService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is the base class which initialize all the actors, listeners and + * default RPc implementation so remote invocation of rpcs. + */ +public class RemoteOpsProvider implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(RemoteOpsProvider.class); + + private final DOMRpcProviderService rpcProvisionRegistry; + private final RemoteOpsProviderConfig config; + private final ActorSystem actorSystem; + private final DOMRpcService rpcService; + private final DOMActionProviderService actionProvisionRegistry; + private final DOMActionService actionService; + + private ActorRef opsManager; + + public RemoteOpsProvider(final ActorSystem actorSystem, final DOMRpcProviderService rpcProvisionRegistry, + final DOMRpcService rpcService, final RemoteOpsProviderConfig config, + final DOMActionProviderService actionProviderService, + final DOMActionService actionService) { + this.actorSystem = requireNonNull(actorSystem); + this.rpcProvisionRegistry = requireNonNull(rpcProvisionRegistry); + this.rpcService = requireNonNull(rpcService); + this.config = requireNonNull(config); + this.actionProvisionRegistry = requireNonNull(actionProviderService); + this.actionService = requireNonNull(actionService); + } + + @Override + public void close() { + if (opsManager != null) { + LOG.info("Stopping Ops Manager at {}", opsManager); + opsManager.tell(PoisonPill.getInstance(), ActorRef.noSender()); + opsManager = null; + } + } + + public void start() { + LOG.info("Starting Remote Ops service..."); + opsManager = actorSystem.actorOf(OpsManager.props(rpcProvisionRegistry, rpcService, config, + actionProvisionRegistry, actionService), config.getRpcManagerName()); + LOG.debug("Ops Manager started at {}", opsManager); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfig.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteOpsProviderConfig.java similarity index 78% rename from opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfig.java rename to opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteOpsProviderConfig.java index 1d663877c5..4fe130158f 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfig.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteOpsProviderConfig.java @@ -14,25 +14,28 @@ import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.common.actor.CommonConfig; import scala.concurrent.duration.FiniteDuration; -public class RemoteRpcProviderConfig extends CommonConfig { +public class RemoteOpsProviderConfig extends CommonConfig { protected static final String TAG_RPC_BROKER_NAME = "rpc-broker-name"; protected static final String TAG_RPC_REGISTRAR_NAME = "rpc-registrar-name"; protected static final String TAG_RPC_REGISTRY_NAME = "registry-name"; + protected static final String TAG_ACTION_REGISTRY_NAME = "action-registry-name"; protected static final String TAG_RPC_MGR_NAME = "rpc-manager-name"; protected static final String TAG_RPC_BROKER_PATH = "rpc-broker-path"; protected static final String TAG_RPC_REGISTRY_PATH = "rpc-registry-path"; + protected static final String TAG_ACTION_REGISTRY_PATH = "action-registry-path"; protected static final String TAG_RPC_MGR_PATH = "rpc-manager-path"; protected static final String TAG_ASK_DURATION = "ask-duration"; private static final String TAG_GOSSIP_TICK_INTERVAL = "gossip-tick-interval"; private static final String TAG_RPC_REGISTRY_PERSISTENCE_ID = "rpc-registry-persistence-id"; + private static final String TAG_ACTION_REGISTRY_PERSISTENCE_ID = "action-registry-persistence-id"; //locally cached values private Timeout cachedAskDuration; private FiniteDuration cachedGossipTickInterval; - public RemoteRpcProviderConfig(final Config config) { + public RemoteOpsProviderConfig(final Config config) { super(config); } @@ -48,6 +51,10 @@ public class RemoteRpcProviderConfig extends CommonConfig { return get().getString(TAG_RPC_REGISTRY_NAME); } + public String getActionRegistryName() { + return get().getString(TAG_ACTION_REGISTRY_NAME); + } + public String getRpcManagerName() { return get().getString(TAG_RPC_MGR_NAME); } @@ -64,6 +71,14 @@ public class RemoteRpcProviderConfig extends CommonConfig { return get().getString(TAG_RPC_REGISTRY_PERSISTENCE_ID); } + public String getActionRegistryPath() { + return get().getString(TAG_ACTION_REGISTRY_PATH); + } + + public String getActionRegistryPersistenceId() { + return get().getString(TAG_ACTION_REGISTRY_PERSISTENCE_ID); + } + public String getRpcManagerPath() { return get().getString(TAG_RPC_MGR_PATH); } @@ -95,10 +110,10 @@ public class RemoteRpcProviderConfig extends CommonConfig { */ @SuppressFBWarnings(value = "BC_UNCONFIRMED_CAST_OF_RETURN_VALUE", justification = "Findbugs flags this as an unconfirmed cast of return value but the build method clearly " - + "returns RemoteRpcProviderConfig. Perhaps it's confused b/c the build method is overloaded and " + + "returns RemoteOpsProviderConfig. Perhaps it's confused b/c the build method is overloaded and " + "and differs in return type from the base class.") - public static RemoteRpcProviderConfig newInstance(final String actorSystemName, final boolean metricCaptureEnabled, - final int mailboxCapacity) { + public static RemoteOpsProviderConfig newInstance(final String actorSystemName, final boolean metricCaptureEnabled, + final int mailboxCapacity) { return new Builder(actorSystemName).metricCaptureEnabled(metricCaptureEnabled) .mailboxCapacity(mailboxCapacity).build(); } @@ -112,11 +127,13 @@ public class RemoteRpcProviderConfig extends CommonConfig { configHolder.put(TAG_RPC_BROKER_NAME, "broker"); configHolder.put(TAG_RPC_REGISTRAR_NAME, "registrar"); configHolder.put(TAG_RPC_REGISTRY_NAME, "registry"); + configHolder.put(TAG_ACTION_REGISTRY_NAME, "action-registry"); configHolder.put(TAG_RPC_MGR_NAME, "rpc"); //Actor paths configHolder.put(TAG_RPC_BROKER_PATH, "/user/rpc/broker"); configHolder.put(TAG_RPC_REGISTRY_PATH, "/user/rpc/registry"); + configHolder.put(TAG_ACTION_REGISTRY_PATH, "/user/action/registry"); configHolder.put(TAG_RPC_MGR_PATH, "/user/rpc"); //durations @@ -125,6 +142,7 @@ public class RemoteRpcProviderConfig extends CommonConfig { // persistence configHolder.put(TAG_RPC_REGISTRY_PERSISTENCE_ID, "remote-rpc-registry"); + configHolder.put(TAG_ACTION_REGISTRY_PERSISTENCE_ID, "remote-action-registry"); } public Builder gossipTickInterval(final String interval) { @@ -133,8 +151,8 @@ public class RemoteRpcProviderConfig extends CommonConfig { } @Override - public RemoteRpcProviderConfig build() { - return new RemoteRpcProviderConfig(merge()); + public RemoteOpsProviderConfig build() { + return new RemoteOpsProviderConfig(merge()); } } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteOpsProviderFactory.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteOpsProviderFactory.java new file mode 100644 index 0000000000..35043c3e86 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteOpsProviderFactory.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc; + +import akka.actor.ActorSystem; +import org.opendaylight.mdsal.dom.api.DOMActionProviderService; +import org.opendaylight.mdsal.dom.api.DOMActionService; +import org.opendaylight.mdsal.dom.api.DOMRpcProviderService; +import org.opendaylight.mdsal.dom.api.DOMRpcService; + +public final class RemoteOpsProviderFactory { + private RemoteOpsProviderFactory() { + + } + + public static RemoteOpsProvider createInstance(final DOMRpcProviderService rpcProviderService, + final DOMRpcService rpcService, final ActorSystem actorSystem, + final RemoteOpsProviderConfig config, + final DOMActionProviderService actionProviderService, + final DOMActionService actionService) { + + return new RemoteOpsProvider(actorSystem, rpcProviderService, rpcService, config, + actionProviderService, actionService); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java index 71b2759991..a7370490dc 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java @@ -7,11 +7,7 @@ */ package org.opendaylight.controller.remote.rpc; -import static java.util.Objects.requireNonNull; - import akka.actor.ActorRef; -import akka.pattern.Patterns; -import akka.util.Timeout; import com.google.common.util.concurrent.ListenableFuture; import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc; import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier; @@ -24,24 +20,15 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; * * @author Robert Varga */ -final class RemoteRpcImplementation implements DOMRpcImplementation { - // 0 for local, 1 for binding, 2 for remote - private static final long COST = 2; - - private final ActorRef remoteInvoker; - private final Timeout askDuration; - - RemoteRpcImplementation(final ActorRef remoteInvoker, final RemoteRpcProviderConfig config) { - this.remoteInvoker = requireNonNull(remoteInvoker); - this.askDuration = config.getAskDuration(); +final class RemoteRpcImplementation extends AbstractRemoteImplementation implements DOMRpcImplementation { + RemoteRpcImplementation(final ActorRef remoteInvoker, final RemoteOpsProviderConfig config) { + super(remoteInvoker, config); } @Override public ListenableFuture invokeRpc(final DOMRpcIdentifier rpc, final NormalizedNode input) { - final RemoteDOMRpcFuture ret = RemoteDOMRpcFuture.create(rpc.getType().getLastComponent()); - ret.completeWith(Patterns.ask(remoteInvoker, ExecuteRpc.from(rpc, input), askDuration)); - return ret; + return new RemoteDOMRpcFuture(rpc.getType(), ask(ExecuteRpc.from(rpc, input))); } @Override diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java deleted file mode 100644 index 6bf84a9783..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.remote.rpc; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.PoisonPill; -import com.google.common.base.Preconditions; -import org.opendaylight.mdsal.dom.api.DOMRpcProviderService; -import org.opendaylight.mdsal.dom.api.DOMRpcService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This is the base class which initialize all the actors, listeners and - * default RPc implementation so remote invocation of rpcs. - */ -public class RemoteRpcProvider implements AutoCloseable { - - private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcProvider.class); - - private final DOMRpcProviderService rpcProvisionRegistry; - private final RemoteRpcProviderConfig config; - private final ActorSystem actorSystem; - private final DOMRpcService rpcService; - - private ActorRef rpcManager; - - public RemoteRpcProvider(final ActorSystem actorSystem, final DOMRpcProviderService rpcProvisionRegistry, - final DOMRpcService rpcService, final RemoteRpcProviderConfig config) { - this.actorSystem = Preconditions.checkNotNull(actorSystem); - this.rpcProvisionRegistry = Preconditions.checkNotNull(rpcProvisionRegistry); - this.rpcService = Preconditions.checkNotNull(rpcService); - this.config = Preconditions.checkNotNull(config); - } - - @Override - public void close() { - if (rpcManager != null) { - LOG.info("Stopping RPC Manager at {}", rpcManager); - rpcManager.tell(PoisonPill.getInstance(), ActorRef.noSender()); - rpcManager = null; - } - } - - public void start() { - LOG.info("Starting Remote RPC service..."); - rpcManager = actorSystem.actorOf(RpcManager.props(rpcProvisionRegistry, rpcService, config), - config.getRpcManagerName()); - LOG.debug("RPC Manager started at {}", rpcManager); - } -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java deleted file mode 100644 index 58a70016fa..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.remote.rpc; - -import akka.actor.ActorSystem; -import org.opendaylight.mdsal.dom.api.DOMRpcProviderService; -import org.opendaylight.mdsal.dom.api.DOMRpcService; - -public final class RemoteRpcProviderFactory { - private RemoteRpcProviderFactory() { - - } - - public static RemoteRpcProvider createInstance(final DOMRpcProviderService rpcProviderService, - final DOMRpcService rpcService, final ActorSystem actorSystem, final RemoteRpcProviderConfig config) { - - return new RemoteRpcProvider(actorSystem, rpcProviderService, rpcService, config); - } -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcInvoker.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcInvoker.java deleted file mode 100644 index 4d3a66c00e..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcInvoker.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.remote.rpc; - -import akka.actor.ActorRef; -import akka.actor.Props; -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; -import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; -import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc; -import org.opendaylight.controller.remote.rpc.messages.RpcResponse; -import org.opendaylight.mdsal.dom.api.DOMRpcResult; -import org.opendaylight.mdsal.dom.api.DOMRpcService; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.model.api.SchemaPath; - -/** - * Actor receiving invocation requests from remote nodes, routing them to - * {@link DOMRpcService#invokeRpc(SchemaPath, NormalizedNode)}. - */ -final class RpcInvoker extends AbstractUntypedActor { - private final DOMRpcService rpcService; - - private RpcInvoker(final DOMRpcService rpcService) { - this.rpcService = Preconditions.checkNotNull(rpcService); - } - - public static Props props(final DOMRpcService rpcService) { - Preconditions.checkNotNull(rpcService, "DOMRpcService can not be null"); - return Props.create(RpcInvoker.class, rpcService); - } - - @Override - protected void handleReceive(final Object message) { - if (message instanceof ExecuteRpc) { - executeRpc((ExecuteRpc) message); - } else { - unknownMessage(message); - } - } - - @SuppressWarnings("checkstyle:IllegalCatch") - private void executeRpc(final ExecuteRpc msg) { - LOG.debug("Executing rpc {}", msg.getRpc()); - final SchemaPath schemaPath = SchemaPath.create(true, msg.getRpc()); - final ActorRef sender = getSender(); - final ActorRef self = self(); - - final ListenableFuture future; - try { - future = rpcService.invokeRpc(schemaPath, msg.getInputNormalizedNode()); - } catch (final RuntimeException e) { - LOG.debug("Failed to invoke RPC {}", msg.getRpc(), e); - sender.tell(new akka.actor.Status.Failure(e), sender); - return; - } - - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(final DOMRpcResult result) { - if (result == null) { - // This shouldn't happen but the FutureCallback annotates the result param with Nullable so - // handle null here to avoid FindBugs warning. - LOG.debug("Got null DOMRpcResult - sending null response for execute rpc : {}", msg.getRpc()); - sender.tell(new RpcResponse(null), self); - return; - } - - if (!result.getErrors().isEmpty()) { - final String message = String.format("Execution of RPC %s failed", msg.getRpc()); - sender.tell(new akka.actor.Status.Failure(new RpcErrorsException(message, result.getErrors())), - self); - } else { - LOG.debug("Sending response for execute rpc : {}", msg.getRpc()); - sender.tell(new RpcResponse(result.getResult()), self); - } - } - - @Override - public void onFailure(final Throwable failure) { - LOG.debug("Failed to execute RPC {}", msg.getRpc(), failure); - LOG.error("Failed to execute RPC {} due to {}. More details are available on DEBUG level.", - msg.getRpc(), Throwables.getRootCause(failure).getMessage()); - sender.tell(new akka.actor.Status.Failure(failure), self); - } - }, MoreExecutors.directExecutor()); - } -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java deleted file mode 100644 index 5dbc1cdb39..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.remote.rpc; - -import akka.actor.ActorRef; -import akka.actor.OneForOneStrategy; -import akka.actor.Props; -import akka.actor.SupervisorStrategy; -import com.google.common.base.Preconditions; -import java.util.concurrent.TimeUnit; -import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; -import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; -import org.opendaylight.mdsal.dom.api.DOMRpcProviderService; -import org.opendaylight.mdsal.dom.api.DOMRpcService; -import org.opendaylight.yangtools.concepts.ListenerRegistration; -import scala.concurrent.duration.FiniteDuration; - -/** - * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown. It also registers - * {@link RpcListener} with the local {@link DOMRpcService}. - */ -public class RpcManager extends AbstractUntypedActor { - private final DOMRpcProviderService rpcProvisionRegistry; - private final RemoteRpcProviderConfig config; - private final DOMRpcService rpcServices; - - private ListenerRegistration listenerReg; - private ActorRef rpcInvoker; - private ActorRef rpcRegistry; - private ActorRef rpcRegistrar; - - RpcManager(final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices, - final RemoteRpcProviderConfig config) { - this.rpcProvisionRegistry = Preconditions.checkNotNull(rpcProvisionRegistry); - this.rpcServices = Preconditions.checkNotNull(rpcServices); - this.config = Preconditions.checkNotNull(config); - } - - public static Props props(final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices, - final RemoteRpcProviderConfig config) { - Preconditions.checkNotNull(rpcProvisionRegistry, "RpcProviderService can not be null!"); - Preconditions.checkNotNull(rpcServices, "RpcService can not be null!"); - Preconditions.checkNotNull(config, "RemoteRpcProviderConfig can not be null!"); - return Props.create(RpcManager.class, rpcProvisionRegistry, rpcServices, config); - } - - @Override - public void preStart() throws Exception { - super.preStart(); - - rpcInvoker = getContext().actorOf(RpcInvoker.props(rpcServices) - .withMailbox(config.getMailBoxName()), config.getRpcBrokerName()); - LOG.debug("Listening for RPC invocation requests with {}", rpcInvoker); - - rpcRegistrar = getContext().actorOf(RpcRegistrar.props(config, rpcProvisionRegistry) - .withMailbox(config.getMailBoxName()), config.getRpcRegistrarName()); - LOG.debug("Registering remote RPCs with {}", rpcRegistrar); - - rpcRegistry = getContext().actorOf(RpcRegistry.props(config, rpcInvoker, rpcRegistrar) - .withMailbox(config.getMailBoxName()), config.getRpcRegistryName()); - LOG.debug("Propagating RPC information with {}", rpcRegistry); - - final RpcListener rpcListener = new RpcListener(rpcRegistry); - LOG.debug("Registering local availabitility listener {}", rpcListener); - listenerReg = rpcServices.registerRpcListener(rpcListener); - } - - @Override - public void postStop() throws Exception { - if (listenerReg != null) { - listenerReg.close(); - listenerReg = null; - } - - super.postStop(); - } - - @Override - protected void handleReceive(final Object message) { - unknownMessage(message); - } - - @Override - public SupervisorStrategy supervisorStrategy() { - return new OneForOneStrategy(10, FiniteDuration.create(1, TimeUnit.MINUTES), t -> { - LOG.error("An exception happened actor will be resumed", t); - return SupervisorStrategy.resume(); - }); - } -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcRegistrar.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcRegistrar.java deleted file mode 100644 index 5fc088df38..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcRegistrar.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.remote.rpc; - -import akka.actor.Address; -import akka.actor.Props; -import com.google.common.base.Preconditions; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Optional; -import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; -import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints; -import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.RemoteRpcEndpoint; -import org.opendaylight.mdsal.dom.api.DOMRpcImplementationRegistration; -import org.opendaylight.mdsal.dom.api.DOMRpcProviderService; - -/** - * Actor handling registration of RPCs available on remote nodes with the local {@link DOMRpcProviderService}. - * - * @author Robert Varga - */ -final class RpcRegistrar extends AbstractUntypedActor { - private final Map> regs = new HashMap<>(); - private final DOMRpcProviderService rpcProviderService; - private final RemoteRpcProviderConfig config; - - RpcRegistrar(final RemoteRpcProviderConfig config, final DOMRpcProviderService rpcProviderService) { - this.config = Preconditions.checkNotNull(config); - this.rpcProviderService = Preconditions.checkNotNull(rpcProviderService); - } - - public static Props props(final RemoteRpcProviderConfig config, final DOMRpcProviderService rpcProviderService) { - Preconditions.checkNotNull(rpcProviderService, "DOMRpcProviderService cannot be null"); - return Props.create(RpcRegistrar.class, config, rpcProviderService); - } - - @Override - public void postStop() throws Exception { - regs.values().forEach(DOMRpcImplementationRegistration::close); - regs.clear(); - - super.postStop(); - } - - @Override - protected void handleReceive(final Object message) { - if (message instanceof UpdateRemoteEndpoints) { - updateRemoteEndpoints(((UpdateRemoteEndpoints) message).getEndpoints()); - } else { - unknownMessage(message); - } - } - - private void updateRemoteEndpoints(final Map> endpoints) { - /* - * Updating RPC providers is a two-step process. We first add the newly-discovered RPCs and then close - * the old registration. This minimizes churn observed by listeners, as they will not observe RPC - * unavailability which would occur if we were to do it the other way around. - * - * Note that when an RPC moves from one remote node to another, we also do not want to expose the gap, - * hence we register all new implementations before closing all registrations. - */ - final Collection> prevRegs = new ArrayList<>(endpoints.size()); - - for (Entry> e : endpoints.entrySet()) { - LOG.debug("Updating RPC registrations for {}", e.getKey()); - - final DOMRpcImplementationRegistration prevReg; - final Optional maybeEndpoint = e.getValue(); - if (maybeEndpoint.isPresent()) { - final RemoteRpcEndpoint endpoint = maybeEndpoint.get(); - final RemoteRpcImplementation impl = new RemoteRpcImplementation(endpoint.getRouter(), config); - prevReg = regs.put(e.getKey(), rpcProviderService.registerRpcImplementation(impl, - endpoint.getRpcs())); - } else { - prevReg = regs.remove(e.getKey()); - } - - if (prevReg != null) { - prevRegs.add(prevReg); - } - } - - for (DOMRpcImplementationRegistration r : prevRegs) { - r.close(); - } - } -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/TerminationMonitor.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/TerminationMonitor.java index 6590941e39..fb034300f9 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/TerminationMonitor.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/TerminationMonitor.java @@ -5,7 +5,6 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.remote.rpc; import akka.actor.Terminated; @@ -21,7 +20,8 @@ public class TerminationMonitor extends UntypedAbstractActor { LOG.debug("Created TerminationMonitor"); } - @Override public void onReceive(Object message) { + @Override + public void onReceive(final Object message) { if (message instanceof Terminated) { Terminated terminated = (Terminated) message; LOG.debug("Actor terminated : {}", terminated.actor()); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/AbstractExecute.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/AbstractExecute.java new file mode 100644 index 0000000000..a23bcb3a9f --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/AbstractExecute.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2019 Nordix Foundation. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.messages; + +import static java.util.Objects.requireNonNull; + +import com.google.common.base.MoreObjects; +import com.google.common.base.MoreObjects.ToStringHelper; +import java.io.Serializable; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; + +/** + * An abstract base class for invocation requests. Specialized via {@link ExecuteAction} and {@link ExecuteRpc}. + */ +public abstract class AbstractExecute> implements Serializable { + private static final long serialVersionUID = 1L; + + private final transient @NonNull SchemaPath type; + private final transient T input; + + AbstractExecute(final @NonNull SchemaPath type, final T input) { + this.type = requireNonNull(type); + this.input = input; + } + + public final @NonNull SchemaPath getType() { + return type; + } + + public final T getInput() { + return input; + } + + @Override + public final String toString() { + // We want 'type' to be always first + return addToStringAttributes(MoreObjects.toStringHelper(this).omitNullValues().add("type", type)).toString(); + } + + ToStringHelper addToStringAttributes(final ToStringHelper helper) { + return helper.add("input", input); + } + + abstract Object writeReplace(); +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/AbstractResponse.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/AbstractResponse.java new file mode 100644 index 0000000000..17897532ec --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/AbstractResponse.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2019 Nordix Foundation. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.messages; + +import java.io.Serializable; +import org.eclipse.jdt.annotation.Nullable; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +/** + * An abstract base class for invocation responses. Specialized via {@link ActionResponse} and {@link RpcResponse}. + */ +public abstract class AbstractResponse> implements Serializable { + private static final long serialVersionUID = 1L; + + private final transient @Nullable T output; + + public AbstractResponse(final @Nullable T output) { + this.output = output; + } + + public final @Nullable T getOutput() { + return output; + } + + abstract Object writeReplace(); +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ActionResponse.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ActionResponse.java new file mode 100644 index 0000000000..d6a23583aa --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ActionResponse.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2019 Nordix Foundation. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.messages; + +import static java.util.Objects.requireNonNull; + +import com.google.common.collect.ImmutableList; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Collection; +import java.util.Optional; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +@SuppressFBWarnings({"SE_TRANSIENT_FIELD_NOT_RESTORED", "DMI_NONSERIALIZABLE_OBJECT_WRITTEN"}) +public class ActionResponse extends AbstractResponse { + private static final long serialVersionUID = 1L; + + private final transient @NonNull ImmutableList<@NonNull RpcError> errors; + + public ActionResponse(final @NonNull Optional output, @NonNull final Collection errors) { + super(output.orElse(null)); + this.errors = ImmutableList.copyOf(errors); + } + + public @NonNull ImmutableList<@NonNull RpcError> getErrors() { + return errors; + } + + @Override + Object writeReplace() { + return new Proxy(this); + } + + private static class Proxy implements Externalizable { + private static final long serialVersionUID = 1L; + + private ActionResponse actionResponse; + + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public Proxy() { + } + + Proxy(final ActionResponse actionResponse) { + this.actionResponse = requireNonNull(actionResponse); + } + + @Override + public void writeExternal(final ObjectOutput out) throws IOException { + out.writeObject(actionResponse.getErrors()); + SerializationUtils.writeNormalizedNode(out, actionResponse.getOutput()); + } + + @Override + public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { + @SuppressWarnings("unchecked") + final ImmutableList errors = (ImmutableList) in.readObject(); + final Optional> output = SerializationUtils.readNormalizedNode(in); + actionResponse = new ActionResponse(output.map(ContainerNode.class::cast), errors); + } + + private Object readResolve() { + return actionResponse; + } + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteAction.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteAction.java new file mode 100644 index 0000000000..5f9d99f698 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteAction.java @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2019 Nordix Foundation. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.messages; + +import static com.google.common.base.Verify.verifyNotNull; +import static java.util.Objects.requireNonNull; + +import com.google.common.base.MoreObjects.ToStringHelper; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputOutput; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; + +public final class ExecuteAction extends AbstractExecute<@NonNull ContainerNode> { + private static final long serialVersionUID = 1128904894827335676L; + + private final @NonNull DOMDataTreeIdentifier path; + + private ExecuteAction(final @NonNull SchemaPath type, final @NonNull DOMDataTreeIdentifier path, + final @NonNull ContainerNode input) { + super(type, requireNonNull(input)); + this.path = requireNonNull(path); + } + + public static @NonNull ExecuteAction from(final @NonNull SchemaPath type, @NonNull final DOMDataTreeIdentifier path, + final @NonNull ContainerNode input) { + return new ExecuteAction(type, path, input); + } + + public @NonNull DOMDataTreeIdentifier getPath() { + return path; + } + + @Override + ToStringHelper addToStringAttributes(final ToStringHelper helper) { + return super.addToStringAttributes(helper.add("path", path)); + } + + @Override + Object writeReplace() { + return new Proxy(this); + } + + private static final class Proxy implements Externalizable { + private static final long serialVersionUID = 1L; + + private ExecuteAction executeAction; + + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public Proxy() { + + } + + Proxy(final ExecuteAction executeAction) { + this.executeAction = requireNonNull(executeAction); + } + + @Override + public void writeExternal(final ObjectOutput out) throws IOException { + try (NormalizedNodeDataOutput stream = NormalizedNodeInputOutput.newDataOutput(out)) { + stream.writeSchemaPath(executeAction.getType()); + // FIXME: deal with data store types? + stream.writeYangInstanceIdentifier(executeAction.getPath().getRootIdentifier()); + stream.writeOptionalNormalizedNode(executeAction.getInput()); + } + } + + @Override + public void readExternal(final ObjectInput in) throws IOException { + final NormalizedNodeDataInput stream = NormalizedNodeInputOutput.newDataInput(in); + final SchemaPath name = stream.readSchemaPath(); + final YangInstanceIdentifier path = stream.readYangInstanceIdentifier(); + final ContainerNode input = (ContainerNode) stream.readOptionalNormalizedNode().orElse(null); + + executeAction = new ExecuteAction(name, new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, path), + input); + } + + private Object readResolve() { + return verifyNotNull(executeAction); + } + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java index c9fb52b606..cfa0c8964f 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java @@ -9,61 +9,37 @@ package org.opendaylight.controller.remote.rpc.messages; import static java.util.Objects.requireNonNull; -import com.google.common.base.MoreObjects; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; -import java.io.Serializable; import org.eclipse.jdt.annotation.NonNull; import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput; import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput; import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputOutput; import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier; -import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; -public final class ExecuteRpc implements Serializable { +public final class ExecuteRpc extends AbstractExecute<@Nullable NormalizedNode> { private static final long serialVersionUID = 1128904894827335676L; - @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "This field is not Serializable but this class " - + "implements writeReplace to delegate serialization to a Proxy class and thus instances of this class " - + "aren't serialized. FindBugs does not recognize this.") - private final NormalizedNode inputNormalizedNode; - private final QName rpc; - - private ExecuteRpc(final @Nullable NormalizedNode inputNormalizedNode, final @NonNull QName rpc) { - this.rpc = requireNonNull(rpc, "rpc Qname should not be null"); - this.inputNormalizedNode = inputNormalizedNode; - } - - public static ExecuteRpc from(final @NonNull DOMRpcIdentifier rpc, final @Nullable NormalizedNode input) { - return new ExecuteRpc(input, rpc.getType().getLastComponent()); - } - - public @Nullable NormalizedNode getInputNormalizedNode() { - return inputNormalizedNode; + private ExecuteRpc(final @NonNull SchemaPath type, final @Nullable NormalizedNode input) { + super(type, input); } - public @NonNull QName getRpc() { - return rpc; - } - - private Object writeReplace() { - return new Proxy(this); + public static @NonNull ExecuteRpc from(final @NonNull DOMRpcIdentifier rpc, + final @Nullable NormalizedNode input) { + return new ExecuteRpc(rpc.getType(), input); } @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("rpc", rpc) - .add("normalizedNode", inputNormalizedNode) - .toString(); + Object writeReplace() { + return new Proxy(this); } - private static class Proxy implements Externalizable { + private static final class Proxy implements Externalizable { private static final long serialVersionUID = 1L; private ExecuteRpc executeRpc; @@ -72,25 +48,27 @@ public final class ExecuteRpc implements Serializable { // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. @SuppressWarnings("checkstyle:RedundantModifier") public Proxy() { + } Proxy(final ExecuteRpc executeRpc) { - this.executeRpc = executeRpc; + this.executeRpc = requireNonNull(executeRpc); } @Override public void writeExternal(final ObjectOutput out) throws IOException { try (NormalizedNodeDataOutput stream = NormalizedNodeInputOutput.newDataOutput(out)) { - stream.writeQName(executeRpc.getRpc()); - stream.writeOptionalNormalizedNode(executeRpc.getInputNormalizedNode()); + stream.writeQName(executeRpc.getType().getLastComponent()); + stream.writeOptionalNormalizedNode(executeRpc.getInput()); } } @Override - public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { + public void readExternal(final ObjectInput in) throws IOException { final NormalizedNodeDataInput stream = NormalizedNodeInputOutput.newDataInput(in); - final QName qname = stream.readQName(); - executeRpc = new ExecuteRpc(stream.readOptionalNormalizedNode().orElse(null), qname); + final SchemaPath type = SchemaPath.ROOT.createChild(stream.readQName()); + final NormalizedNode input = stream.readOptionalNormalizedNode().orElse(null); + executeRpc = new ExecuteRpc(type, input); } private Object readResolve() { diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RpcResponse.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RpcResponse.java index 6f1b44b8f8..f141f09b3c 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RpcResponse.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RpcResponse.java @@ -7,33 +7,23 @@ */ package org.opendaylight.controller.remote.rpc.messages; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; -import java.io.Serializable; import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -public class RpcResponse implements Serializable { +public class RpcResponse extends AbstractResponse> { private static final long serialVersionUID = -4211279498688989245L; - @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "This field is not Serializable but this class " - + "implements writeReplace to delegate serialization to a Proxy class and thus instances of this class " - + "aren't serialized. FindBugs does not recognize this.") - private final NormalizedNode resultNormalizedNode; - - public RpcResponse(final @Nullable NormalizedNode inputNormalizedNode) { - resultNormalizedNode = inputNormalizedNode; - } - - public @Nullable NormalizedNode getResultNormalizedNode() { - return resultNormalizedNode; + public RpcResponse(final @Nullable NormalizedNode output) { + super(output); } - private Object writeReplace() { + @Override + Object writeReplace() { return new Proxy(this); } @@ -54,7 +44,7 @@ public class RpcResponse implements Serializable { @Override public void writeExternal(final ObjectOutput out) throws IOException { - SerializationUtils.writeNormalizedNode(out, rpcResponse.getResultNormalizedNode()); + SerializationUtils.writeNormalizedNode(out, rpcResponse.getOutput()); } @Override diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/AbstractRoutingTable.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/AbstractRoutingTable.java new file mode 100644 index 0000000000..c25ee44b4a --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/AbstractRoutingTable.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.registry; + +import static java.util.Objects.requireNonNull; + +import akka.actor.ActorRef; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableSet; +import java.io.Serializable; +import java.util.Collection; +import java.util.Optional; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.controller.remote.rpc.registry.gossip.BucketData; + +/** + * Common class for routing tables. + * + * @param Table type + * @param Item type + */ +public abstract class AbstractRoutingTable, I> implements BucketData, + Serializable { + private static final long serialVersionUID = 1L; + + private final @NonNull ActorRef invoker; + private final @NonNull ImmutableSet items; + + AbstractRoutingTable(final ActorRef invoker, final Collection items) { + this.invoker = requireNonNull(invoker); + this.items = ImmutableSet.copyOf(items); + } + + @Override + public final Optional getWatchActor() { + return Optional.of(invoker); + } + + public final @NonNull ImmutableSet getItems() { + return items; + } + + final @NonNull ActorRef getInvoker() { + return invoker; + } + + @VisibleForTesting + public final boolean contains(final I routeId) { + return items.contains(routeId); + } + + @VisibleForTesting + public final int size() { + return items.size(); + } + + abstract Object writeReplace(); + + @Override + public final String toString() { + return MoreObjects.toStringHelper(this).add("invoker", invoker).add("items", items).toString(); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/ActionRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/ActionRegistry.java new file mode 100644 index 0000000000..73a471c5b2 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/ActionRegistry.java @@ -0,0 +1,179 @@ +/* + * Copyright (c) 2019 Nordix Foundation. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.registry; + +import akka.actor.ActorRef; +import akka.actor.Address; +import akka.actor.Props; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig; +import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; +import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess; +import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor; +import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteActionRegistryMXBeanImpl; +import org.opendaylight.mdsal.dom.api.DOMActionInstance; + +/** + * Registry to look up cluster nodes that have registered for a given Action. + * + *

+ * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor} to maintain this + * cluster wide information. + */ +public class ActionRegistry extends BucketStoreActor { + private final ActorRef rpcRegistrar; + private final RemoteActionRegistryMXBeanImpl mxBean; + + public ActionRegistry(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker, + final ActorRef rpcRegistrar) { + super(config, config.getRpcRegistryPersistenceId(), new ActionRoutingTable(rpcInvoker, ImmutableSet.of())); + this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar); + this.mxBean = new RemoteActionRegistryMXBeanImpl(new BucketStoreAccess(self(), getContext().dispatcher(), + config.getAskDuration()), config.getAskDuration()); + } + + /** + * Create a new props instance for instantiating an ActionRegistry actor. + * + * @param config Provider configuration + * @param opsRegistrar Local RPC provider interface, used to register routers to remote nodes + * @param opsInvoker Actor handling RPC invocation requests from remote nodes + * @return A new {@link Props} instance + */ + public static Props props(final RemoteOpsProviderConfig config, final ActorRef opsInvoker, + final ActorRef opsRegistrar) { + return Props.create(ActionRegistry.class, config, opsInvoker, opsRegistrar); + } + + @Override + public void postStop() { + super.postStop(); + this.mxBean.unregister(); + } + + @Override + protected void handleCommand(final Object message) throws Exception { + if (message instanceof ActionRegistry.Messages.UpdateActions) { + LOG.debug("handling updatesActionRoutes message"); + updatesActionRoutes((Messages.UpdateActions) message); + } else { + super.handleCommand(message); + } + } + + private void updatesActionRoutes(final Messages.UpdateActions msg) { + LOG.debug("addedActions: {}", msg.getAddedActions()); + LOG.debug("removedActions: {}", msg.getRemovedActions()); + updateLocalBucket(getLocalData().updateActions(msg.getAddedActions(), msg.getRemovedActions())); + } + + @Override + protected void onBucketRemoved(final Address address, final Bucket bucket) { + rpcRegistrar.tell(new Messages.UpdateRemoteActionEndpoints(ImmutableMap.of(address, Optional.empty())), + ActorRef.noSender()); + } + + @Override + protected void onBucketsUpdated(final Map> buckets) { + LOG.debug("Updating buckets for action registry"); + final Map> endpoints = new HashMap<>(buckets.size()); + + for (Map.Entry> e : buckets.entrySet()) { + final ActionRoutingTable table = e.getValue().getData(); + + final Collection actions = table.getItems(); + endpoints.put(e.getKey(), actions.isEmpty() ? Optional.empty() + : Optional.of(new RemoteActionEndpoint(table.getInvoker(), actions))); + } + + if (!endpoints.isEmpty()) { + rpcRegistrar.tell(new Messages.UpdateRemoteActionEndpoints(endpoints), ActorRef.noSender()); + } + } + + public static final class RemoteActionEndpoint { + private final Set actions; + private final ActorRef router; + + @VisibleForTesting + public RemoteActionEndpoint(final ActorRef router, final Collection actions) { + this.router = Preconditions.checkNotNull(router); + this.actions = ImmutableSet.copyOf(actions); + } + + public ActorRef getRouter() { + return router; + } + + public Set getActions() { + return actions; + } + } + + /** + * All messages used by the ActionRegistry. + */ + public static class Messages { + abstract static class AbstractActionRouteMessage { + final Collection addedActions; + final Collection removedActions; + + AbstractActionRouteMessage(final Collection addedActions, + final Collection removedActions) { + this.addedActions = ImmutableList.copyOf(addedActions); + this.removedActions = ImmutableList.copyOf(removedActions); + } + + Collection getAddedActions() { + return this.addedActions; + } + + Collection getRemovedActions() { + return this.removedActions; + } + + + @Override + public String toString() { + return "ContainsRoute{" + "addedActions=" + addedActions + " removedActions=" + removedActions + '}'; + } + } + + + public static final class UpdateActions extends AbstractActionRouteMessage { + public UpdateActions(final Collection addedActions, + final Collection removedActions) { + super(addedActions, removedActions); + } + + } + + public static final class UpdateRemoteActionEndpoints { + private final Map> actionEndpoints; + + @VisibleForTesting + public UpdateRemoteActionEndpoints(final Map> + actionEndpoints) { + this.actionEndpoints = ImmutableMap.copyOf(actionEndpoints); + } + + public Map> getActionEndpoints() { + return actionEndpoints; + } + } + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/ActionRoutingTable.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/ActionRoutingTable.java new file mode 100644 index 0000000000..3c968599ea --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/ActionRoutingTable.java @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2019 Nordix Foundation. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.registry; + +import akka.actor.ActorRef; +import akka.serialization.JavaSerializer; +import akka.serialization.Serialization; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputOutput; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMActionInstance; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class ActionRoutingTable extends AbstractRoutingTable { + private static final class Proxy implements Externalizable { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(ActionRoutingTable.class); + + @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "We deal with the field in serialization methods.") + private Collection actions; + private ActorRef opsInvoker; + + // checkstyle flags the public modifier as redundant however it is explicitly needed for Java serialization to + // be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public Proxy() { + // For Externalizable + } + + Proxy(final ActionRoutingTable table) { + actions = table.getItems(); + opsInvoker = table.getInvoker(); + } + + @Override + public void writeExternal(final ObjectOutput out) throws IOException { + LOG.debug("serializing ActionRoutingTable."); + out.writeObject(Serialization.serializedActorPath(opsInvoker)); + + final NormalizedNodeDataOutput nnout = NormalizedNodeInputOutput.newDataOutput(out); + nnout.writeInt(actions.size()); + for (DOMActionInstance id : actions) { + nnout.writeSchemaPath(id.getType()); + YangInstanceIdentifier actionPath = YangInstanceIdentifier.create( + new YangInstanceIdentifier.NodeIdentifier(id.getType().getLastComponent())); + nnout.writeYangInstanceIdentifier(actionPath); + } + } + + @Override + public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { + LOG.debug("deserializing ActionRoutingTable"); + opsInvoker = JavaSerializer.currentSystem().value().provider().resolveActorRef((String) in.readObject()); + + final NormalizedNodeDataInput nnin = NormalizedNodeInputOutput.newDataInput(in); + final int size = nnin.readInt(); + actions = new ArrayList<>(size); + for (int i = 0; i < size; ++i) { + actions.add(DOMActionInstance.of(nnin.readSchemaPath(), LogicalDatastoreType.OPERATIONAL, + nnin.readYangInstanceIdentifier())); + } + } + + private Object readResolve() { + return new ActionRoutingTable(opsInvoker, actions); + } + } + + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(ActionRoutingTable.class); + + ActionRoutingTable(final ActorRef invoker, final Collection actions) { + super(invoker, actions); + } + + ActionRoutingTable updateActions(final Collection toAdd, + final Collection toRemove) { + LOG.debug("Updating actions in ActionRoutingTable"); + final Set newActions = new HashSet<>(getItems()); + newActions.addAll(toAdd); + newActions.removeAll(toRemove); + return new ActionRoutingTable(getInvoker(), newActions); + } + + @Override + Object writeReplace() { + return new Proxy(this); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java index 5159b96876..8d67b3bdd3 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java @@ -10,33 +10,27 @@ package org.opendaylight.controller.remote.rpc.registry; import akka.actor.ActorRef; import akka.serialization.JavaSerializer; import akka.serialization.Serialization; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableSet; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; -import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; -import java.util.Optional; import java.util.Set; import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput; import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput; import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputOutput; -import org.opendaylight.controller.remote.rpc.registry.gossip.BucketData; import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier; -public final class RoutingTable implements BucketData, Serializable { +public final class RoutingTable extends AbstractRoutingTable { private static final class Proxy implements Externalizable { private static final long serialVersionUID = 1L; @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "We deal with the field in serialization methods.") private Collection rpcs; - private ActorRef rpcInvoker; + private ActorRef opsInvoker; // checkstyle flags the public modifier as redundant however it is explicitly needed for Java serialization to // be able to create instances via reflection. @@ -46,13 +40,13 @@ public final class RoutingTable implements BucketData, Serializabl } Proxy(final RoutingTable table) { - rpcs = table.getRoutes(); - rpcInvoker = table.getRpcInvoker(); + rpcs = table.getItems(); + opsInvoker = table.getInvoker(); } @Override public void writeExternal(final ObjectOutput out) throws IOException { - out.writeObject(Serialization.serializedActorPath(rpcInvoker)); + out.writeObject(Serialization.serializedActorPath(opsInvoker)); final NormalizedNodeDataOutput nnout = NormalizedNodeInputOutput.newDataOutput(out); nnout.writeInt(rpcs.size()); @@ -64,7 +58,7 @@ public final class RoutingTable implements BucketData, Serializabl @Override public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { - rpcInvoker = JavaSerializer.currentSystem().value().provider().resolveActorRef((String) in.readObject()); + opsInvoker = JavaSerializer.currentSystem().value().provider().resolveActorRef((String) in.readObject()); final NormalizedNodeDataInput nnin = NormalizedNodeInputOutput.newDataInput(in); final int size = nnin.readInt(); @@ -75,62 +69,30 @@ public final class RoutingTable implements BucketData, Serializabl } private Object readResolve() { - return new RoutingTable(rpcInvoker, rpcs); + return new RoutingTable(opsInvoker, rpcs); } } private static final long serialVersionUID = 1L; - @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "We deal with the field in serialization methods.") - private final Set rpcs; - private final ActorRef rpcInvoker; - - RoutingTable(final ActorRef rpcInvoker, final Collection table) { - this.rpcInvoker = Preconditions.checkNotNull(rpcInvoker); - this.rpcs = ImmutableSet.copyOf(table); - } - - @Override - public Optional getWatchActor() { - return Optional.of(rpcInvoker); - } - - public Set getRoutes() { - return rpcs; - } - - ActorRef getRpcInvoker() { - return rpcInvoker; + RoutingTable(final ActorRef invoker, final Collection table) { + super(invoker, table); } RoutingTable addRpcs(final Collection toAdd) { - final Set newRpcs = new HashSet<>(rpcs); + final Set newRpcs = new HashSet<>(getItems()); newRpcs.addAll(toAdd); - return new RoutingTable(rpcInvoker, newRpcs); + return new RoutingTable(getInvoker(), newRpcs); } RoutingTable removeRpcs(final Collection toRemove) { - final Set newRpcs = new HashSet<>(rpcs); + final Set newRpcs = new HashSet<>(getItems()); newRpcs.removeAll(toRemove); - return new RoutingTable(rpcInvoker, newRpcs); - } - - private Object writeReplace() { - return new Proxy(this); - } - - @VisibleForTesting - boolean contains(final DOMRpcIdentifier routeId) { - return rpcs.contains(routeId); - } - - @VisibleForTesting - int size() { - return rpcs.size(); + return new RoutingTable(getInvoker(), newRpcs); } @Override - public String toString() { - return "RoutingTable{" + "rpcs=" + rpcs + ", rpcInvoker=" + rpcInvoker + '}'; + Object writeReplace() { + return new Proxy(this); } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java index 5ba97a306f..68fead4407 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java @@ -22,10 +22,9 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Optional; import java.util.Set; -import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; +import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; -import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints; import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess; import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor; @@ -43,7 +42,7 @@ public class RpcRegistry extends BucketStoreActor { private final ActorRef rpcRegistrar; private final RemoteRpcRegistryMXBeanImpl mxBean; - public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) { + public RpcRegistry(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) { super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker, ImmutableSet.of())); this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar); this.mxBean = new RemoteRpcRegistryMXBeanImpl(new BucketStoreAccess(self(), getContext().dispatcher(), @@ -58,8 +57,8 @@ public class RpcRegistry extends BucketStoreActor { * @param rpcInvoker Actor handling RPC invocation requests from remote nodes * @return A new {@link Props} instance */ - public static Props props(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, - final ActorRef rpcRegistrar) { + public static Props props(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker, + final ActorRef rpcRegistrar) { return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar); } @@ -97,7 +96,8 @@ public class RpcRegistry extends BucketStoreActor { @Override protected void onBucketRemoved(final Address address, final Bucket bucket) { - rpcRegistrar.tell(new UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())), ActorRef.noSender()); + rpcRegistrar.tell(new Messages.UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())), + ActorRef.noSender()); } @Override @@ -107,13 +107,13 @@ public class RpcRegistry extends BucketStoreActor { for (Entry> e : buckets.entrySet()) { final RoutingTable table = e.getValue().getData(); - final Collection rpcs = table.getRoutes(); + final Collection rpcs = table.getItems(); endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty() - : Optional.of(new RemoteRpcEndpoint(table.getRpcInvoker(), rpcs))); + : Optional.of(new RemoteRpcEndpoint(table.getInvoker(), rpcs))); } if (!endpoints.isEmpty()) { - rpcRegistrar.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender()); + rpcRegistrar.tell(new Messages.UpdateRemoteEndpoints(endpoints), ActorRef.noSender()); } } @@ -141,46 +141,48 @@ public class RpcRegistry extends BucketStoreActor { */ public static class Messages { abstract static class AbstractRouteMessage { - final List routeIdentifiers; + final List rpcRouteIdentifiers; - AbstractRouteMessage(final Collection routeIdentifiers) { - Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(), + AbstractRouteMessage(final Collection rpcRouteIdentifiers) { + Preconditions.checkArgument(rpcRouteIdentifiers != null && !rpcRouteIdentifiers.isEmpty(), "Route Identifiers must be supplied"); - this.routeIdentifiers = ImmutableList.copyOf(routeIdentifiers); + this.rpcRouteIdentifiers = ImmutableList.copyOf(rpcRouteIdentifiers); } List getRouteIdentifiers() { - return this.routeIdentifiers; + return this.rpcRouteIdentifiers; } @Override public String toString() { - return "ContainsRoute{" + "routeIdentifiers=" + routeIdentifiers + '}'; + return "ContainsRoute{" + "routeIdentifiers=" + rpcRouteIdentifiers + '}'; } } - public static final class AddOrUpdateRoutes extends AbstractRouteMessage { - public AddOrUpdateRoutes(final Collection routeIdentifiers) { - super(routeIdentifiers); + public static final class AddOrUpdateRoutes extends Messages.AbstractRouteMessage { + public AddOrUpdateRoutes(final Collection rpcRouteIdentifiers) { + super(rpcRouteIdentifiers); } + } public static final class RemoveRoutes extends AbstractRouteMessage { - public RemoveRoutes(final Collection routeIdentifiers) { - super(routeIdentifiers); + public RemoveRoutes(final Collection rpcRouteIdentifiers) { + super(rpcRouteIdentifiers); } } public static final class UpdateRemoteEndpoints { - private final Map> endpoints; + private final Map> rpcEndpoints; + @VisibleForTesting - public UpdateRemoteEndpoints(final Map> endpoints) { - this.endpoints = ImmutableMap.copyOf(endpoints); + public UpdateRemoteEndpoints(final Map> rpcEndpoints) { + this.rpcEndpoints = ImmutableMap.copyOf(rpcEndpoints); } - public Map> getEndpoints() { - return endpoints; + public Map> getRpcEndpoints() { + return rpcEndpoints; } } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreActor.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreActor.java index 9a84b91300..2b07d1b7e4 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreActor.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreActor.java @@ -37,7 +37,7 @@ import java.util.Map.Entry; import java.util.Optional; import java.util.function.Consumer; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; -import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; +import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig; /** * A store that syncs its data across nodes in the cluster. @@ -72,7 +72,7 @@ public abstract class BucketStoreActor> extends */ private final SetMultimap watchedActors = HashMultimap.create(1, 1); - private final RemoteRpcProviderConfig config; + private final RemoteOpsProviderConfig config; private final String persistenceId; /** @@ -88,7 +88,7 @@ public abstract class BucketStoreActor> extends private Integer incarnation; private boolean persisting; - protected BucketStoreActor(final RemoteRpcProviderConfig config, final String persistenceId, final T initialData) { + protected BucketStoreActor(final RemoteOpsProviderConfig config, final String persistenceId, final T initialData) { this.config = Preconditions.checkNotNull(config); this.initialData = Preconditions.checkNotNull(initialData); this.persistenceId = Preconditions.checkNotNull(persistenceId); @@ -212,7 +212,7 @@ public abstract class BucketStoreActor> extends } } - protected final RemoteRpcProviderConfig getConfig() { + protected final RemoteOpsProviderConfig getConfig() { return config; } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java index 56060dd714..e75bef24c7 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java @@ -31,7 +31,7 @@ import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; -import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; +import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig; import scala.concurrent.duration.FiniteDuration; /** @@ -61,7 +61,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering { }; private final boolean autoStartGossipTicks; - private final RemoteRpcProviderConfig config; + private final RemoteOpsProviderConfig config; /** * All known cluster members. @@ -84,20 +84,20 @@ public class Gossiper extends AbstractUntypedActorWithMetering { private BucketStoreAccess bucketStore; - Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) { + Gossiper(final RemoteOpsProviderConfig config, final Boolean autoStartGossipTicks) { this.config = Preconditions.checkNotNull(config); this.autoStartGossipTicks = autoStartGossipTicks.booleanValue(); } - Gossiper(final RemoteRpcProviderConfig config) { + Gossiper(final RemoteOpsProviderConfig config) { this(config, Boolean.TRUE); } - public static Props props(final RemoteRpcProviderConfig config) { + public static Props props(final RemoteOpsProviderConfig config) { return Props.create(Gossiper.class, config); } - static Props testProps(final RemoteRpcProviderConfig config) { + static Props testProps(final RemoteOpsProviderConfig config) { return Props.create(Gossiper.class, config, Boolean.FALSE); } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/AbstractRegistryMXBean.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/AbstractRegistryMXBean.java new file mode 100644 index 0000000000..bfe60e5c82 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/AbstractRegistryMXBean.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.registry.mbeans; + +import static java.util.Objects.requireNonNull; + +import akka.actor.Address; +import akka.util.Timeout; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.Map; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean; +import org.opendaylight.controller.remote.rpc.registry.AbstractRoutingTable; +import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; +import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +abstract class AbstractRegistryMXBean, I> extends AbstractMXBean { + static final String LOCAL_CONSTANT = "local"; + static final String ROUTE_CONSTANT = "route:"; + static final String NAME_CONSTANT = " | name:"; + + @SuppressFBWarnings("SLF4J_LOGGER_SHOULD_BE_PRIVATE") + protected final Logger log = LoggerFactory.getLogger(getClass()); + + private final BucketStoreAccess bucketAccess; + private final FiniteDuration timeout; + + AbstractRegistryMXBean(final @NonNull String beanName, final @NonNull String beanType, + final @NonNull BucketStoreAccess bucketAccess, final @NonNull Timeout timeout) { + super(beanName, beanType, null); + this.bucketAccess = requireNonNull(bucketAccess); + this.timeout = timeout.duration(); + registerMBean(); + } + + @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch", "rawtypes"}) + final T localData() { + try { + return (T) Await.result((Future) bucketAccess.getLocalData(), timeout); + } catch (Exception e) { + throw new RuntimeException("getLocalData failed", e); + } + } + + @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch", "rawtypes"}) + final Map> remoteBuckets() { + try { + return (Map>) Await.result((Future)bucketAccess.getRemoteBuckets(), timeout); + } catch (Exception e) { + throw new RuntimeException("getRemoteBuckets failed", e); + } + } + + @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch", "rawtypes"}) + final String bucketVersions() { + try { + return Await.result((Future)bucketAccess.getBucketVersions(), timeout).toString(); + } catch (Exception e) { + throw new RuntimeException("getVersions failed", e); + } + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteActionRegistryMXBean.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteActionRegistryMXBean.java new file mode 100644 index 0000000000..582638433c --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteActionRegistryMXBean.java @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2019 Nordix Foundation. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.registry.mbeans; + +import java.util.Map; +import java.util.Set; + +public interface RemoteActionRegistryMXBean { + + String getBucketVersions(); + + Set getLocalRegisteredAction(); + + Map findActionByName(String name); + + Map findActionByRoute(String route); +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteActionRegistryMXBeanImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteActionRegistryMXBeanImpl.java new file mode 100644 index 0000000000..cc117bd167 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteActionRegistryMXBeanImpl.java @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2019 Nordix Foundation. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.registry.mbeans; + +import akka.actor.Address; +import akka.util.Timeout; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.opendaylight.controller.remote.rpc.registry.ActionRoutingTable; +import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; +import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess; +import org.opendaylight.mdsal.dom.api.DOMActionInstance; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; + +public class RemoteActionRegistryMXBeanImpl extends AbstractRegistryMXBean + implements RemoteActionRegistryMXBean { + + public RemoteActionRegistryMXBeanImpl(final BucketStoreAccess actionRegistryAccess, final Timeout timeout) { + super("RemoteActionRegistry", "RemoteActionBroker", actionRegistryAccess, timeout); + } + + @Override + public Set getLocalRegisteredAction() { + ActionRoutingTable table = localData(); + Set routedAction = new HashSet<>(table.getItems().size()); + for (DOMActionInstance route : table.getItems()) { + if (route.getType().getLastComponent() != null) { + final YangInstanceIdentifier actionPath = YangInstanceIdentifier.create( + new NodeIdentifier(route.getType().getLastComponent())); + if (!actionPath.isEmpty()) { + routedAction.add(ROUTE_CONSTANT + actionPath + NAME_CONSTANT + route.getType()); + } + } + } + + log.debug("Locally registered routed RPCs {}", routedAction); + return routedAction; + } + + @Override + public Map findActionByName(final String name) { + ActionRoutingTable localTable = localData(); + // Get all Actions from local bucket + Map rpcMap = new HashMap<>(getActionMemberMapByName(localTable, name, LOCAL_CONSTANT)); + + // Get all Actions from remote bucket + Map> buckets = remoteBuckets(); + for (Map.Entry> entry : buckets.entrySet()) { + ActionRoutingTable table = entry.getValue().getData(); + rpcMap.putAll(getActionMemberMapByName(table, name, entry.getKey().toString())); + } + + log.debug("list of Actions {} searched by name {}", rpcMap, name); + return rpcMap; + } + + @Override + public Map findActionByRoute(final String routeId) { + ActionRoutingTable localTable = localData(); + Map rpcMap = new HashMap<>(getActionMemberMapByAction(localTable, routeId, LOCAL_CONSTANT)); + + Map> buckets = remoteBuckets(); + for (Map.Entry> entry : buckets.entrySet()) { + ActionRoutingTable table = entry.getValue().getData(); + rpcMap.putAll(getActionMemberMapByAction(table, routeId, entry.getKey().toString())); + } + + log.debug("list of Actions {} searched by route {}", rpcMap, routeId); + return rpcMap; + } + + /** + * Search if the routing table route String contains routeName. + */ + private static Map getActionMemberMapByAction(final ActionRoutingTable table, + final String routeName, final String address) { + Collection routes = table.getItems(); + Map actionMap = new HashMap<>(routes.size()); + for (DOMActionInstance route : routes) { + if (route.getType().getLastComponent() != null) { + final YangInstanceIdentifier actionPath = YangInstanceIdentifier.create( + new NodeIdentifier(route.getType().getLastComponent())); + if (!actionPath.isEmpty()) { + String routeString = actionPath.toString(); + if (routeString.contains(routeName)) { + actionMap.put(ROUTE_CONSTANT + routeString + NAME_CONSTANT + route.getType(), address); + } + } + } + } + return actionMap; + } + + /** + * Search if the routing table route type contains name. + */ + private static Map getActionMemberMapByName(final ActionRoutingTable table, final String name, + final String address) { + Collection routes = table.getItems(); + Map actionMap = new HashMap<>(routes.size()); + for (DOMActionInstance route : routes) { + if (route.getType().getLastComponent() != null) { + final YangInstanceIdentifier actionPath = YangInstanceIdentifier.create( + new NodeIdentifier(route.getType().getLastComponent())); + if (!actionPath.isEmpty()) { + String type = route.getType().toString(); + if (type.contains(name)) { + actionMap.put(ROUTE_CONSTANT + actionPath + NAME_CONSTANT + type, address); + } + } + } + } + return actionMap; + } + + @Override + public String getBucketVersions() { + return bucketVersions(); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBean.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBean.java index 47903bb114..dc4ee8fd86 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBean.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBean.java @@ -5,7 +5,6 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.remote.rpc.registry.mbeans; import java.util.Map; @@ -22,7 +21,7 @@ public interface RemoteRpcRegistryMXBean { Set getLocalRegisteredRoutedRpc(); - Map findRpcByName(String name); + Map findRpcByName(String name); - Map findRpcByRoute(String route); + Map findRpcByRoute(String route); } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImpl.java index 87adef3d9c..d5c72fcea3 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImpl.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImpl.java @@ -9,67 +9,27 @@ package org.opendaylight.controller.remote.rpc.registry.mbeans; import akka.actor.Address; import akka.util.Timeout; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean; import org.opendaylight.controller.remote.rpc.registry.RoutingTable; import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess; import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.concurrent.Await; -import scala.concurrent.Future; - -public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements RemoteRpcRegistryMXBean { - - @SuppressFBWarnings("SLF4J_LOGGER_SHOULD_BE_PRIVATE") - protected final Logger log = LoggerFactory.getLogger(getClass()); - - private static final String LOCAL_CONSTANT = "local"; - - private static final String ROUTE_CONSTANT = "route:"; - - private static final String NAME_CONSTANT = " | name:"; - - private final BucketStoreAccess rpcRegistryAccess; - private final Timeout timeout; +public class RemoteRpcRegistryMXBeanImpl extends AbstractRegistryMXBean + implements RemoteRpcRegistryMXBean { public RemoteRpcRegistryMXBeanImpl(final BucketStoreAccess rpcRegistryAccess, final Timeout timeout) { - super("RemoteRpcRegistry", "RemoteRpcBroker", null); - this.rpcRegistryAccess = rpcRegistryAccess; - this.timeout = timeout; - registerMBean(); - } - - @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch", "rawtypes"}) - private RoutingTable getLocalData() { - try { - return (RoutingTable) Await.result((Future) rpcRegistryAccess.getLocalData(), timeout.duration()); - } catch (Exception e) { - throw new RuntimeException("getLocalData failed", e); - } - } - - @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch", "rawtypes"}) - private Map> getRemoteBuckets() { - try { - return (Map>) Await.result((Future)rpcRegistryAccess.getRemoteBuckets(), - timeout.duration()); - } catch (Exception e) { - throw new RuntimeException("getRemoteBuckets failed", e); - } + super("RemoteRpcRegistry", "RemoteRpcBroker", rpcRegistryAccess, timeout); } @Override public Set getGlobalRpc() { - RoutingTable table = getLocalData(); - Set globalRpc = new HashSet<>(table.getRoutes().size()); - for (DOMRpcIdentifier route : table.getRoutes()) { + RoutingTable table = localData(); + Set globalRpc = new HashSet<>(table.getItems().size()); + for (DOMRpcIdentifier route : table.getItems()) { if (route.getContextReference().isEmpty()) { globalRpc.add(route.getType().toString()); } @@ -81,9 +41,9 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot @Override public Set getLocalRegisteredRoutedRpc() { - RoutingTable table = getLocalData(); - Set routedRpc = new HashSet<>(table.getRoutes().size()); - for (DOMRpcIdentifier route : table.getRoutes()) { + RoutingTable table = localData(); + Set routedRpc = new HashSet<>(table.getItems().size()); + for (DOMRpcIdentifier route : table.getItems()) { if (!route.getContextReference().isEmpty()) { routedRpc.add(ROUTE_CONSTANT + route.getContextReference() + NAME_CONSTANT + route.getType()); } @@ -95,12 +55,12 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot @Override public Map findRpcByName(final String name) { - RoutingTable localTable = getLocalData(); + RoutingTable localTable = localData(); // Get all RPCs from local bucket Map rpcMap = new HashMap<>(getRpcMemberMapByName(localTable, name, LOCAL_CONSTANT)); // Get all RPCs from remote bucket - Map> buckets = getRemoteBuckets(); + Map> buckets = remoteBuckets(); for (Entry> entry : buckets.entrySet()) { RoutingTable table = entry.getValue().getData(); rpcMap.putAll(getRpcMemberMapByName(table, name, entry.getKey().toString())); @@ -112,10 +72,10 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot @Override public Map findRpcByRoute(final String routeId) { - RoutingTable localTable = getLocalData(); + RoutingTable localTable = localData(); Map rpcMap = new HashMap<>(getRpcMemberMapByRoute(localTable, routeId, LOCAL_CONSTANT)); - Map> buckets = getRemoteBuckets(); + Map> buckets = remoteBuckets(); for (Entry> entry : buckets.entrySet()) { RoutingTable table = entry.getValue().getData(); rpcMap.putAll(getRpcMemberMapByRoute(table, routeId, entry.getKey().toString())); @@ -129,8 +89,8 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot * Search if the routing table route String contains routeName. */ private static Map getRpcMemberMapByRoute(final RoutingTable table, final String routeName, - final String address) { - Set routes = table.getRoutes(); + final String address) { + Set routes = table.getItems(); Map rpcMap = new HashMap<>(routes.size()); for (DOMRpcIdentifier route : routes) { if (!route.getContextReference().isEmpty()) { @@ -146,9 +106,9 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot /** * Search if the routing table route type contains name. */ - private static Map getRpcMemberMapByName(final RoutingTable table, final String name, - final String address) { - Set routes = table.getRoutes(); + private static Map getRpcMemberMapByName(final RoutingTable table, final String name, + final String address) { + Set routes = table.getItems(); Map rpcMap = new HashMap<>(routes.size()); for (DOMRpcIdentifier route : routes) { if (!route.getContextReference().isEmpty()) { @@ -162,12 +122,7 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot } @Override - @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch", "rawtypes"}) public String getBucketVersions() { - try { - return Await.result((Future)rpcRegistryAccess.getBucketVersions(), timeout.duration()).toString(); - } catch (Exception e) { - throw new RuntimeException("getVersions failed", e); - } + return bucketVersions(); } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/resources/OSGI-INF/blueprint/remote-rpc.xml b/opendaylight/md-sal/sal-remoterpc-connector/src/main/resources/OSGI-INF/blueprint/remote-rpc.xml index e6d19a19c6..c33659ae1e 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/resources/OSGI-INF/blueprint/remote-rpc.xml +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/resources/OSGI-INF/blueprint/remote-rpc.xml @@ -14,10 +14,12 @@ + + - @@ -26,12 +28,14 @@ - - + + + diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractOpsTest.java similarity index 77% rename from opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java rename to opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractOpsTest.java index 0736fae6d7..4d96770a6a 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractOpsTest.java @@ -5,7 +5,6 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.remote.rpc; import static org.junit.Assert.assertEquals; @@ -23,6 +22,9 @@ import org.junit.Before; import org.junit.BeforeClass; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMActionService; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier; import org.opendaylight.mdsal.dom.api.DOMRpcResult; import org.opendaylight.mdsal.dom.api.DOMRpcService; @@ -45,26 +47,27 @@ import org.opendaylight.yangtools.yang.test.util.YangParserTestUtils; * * @author Thomas Pantelis */ -public class AbstractRpcTest { +public class AbstractOpsTest { static final String TEST_REV = "2014-08-28"; static final String TEST_NS = "urn:test"; static final URI TEST_URI = URI.create(TEST_NS); - static final QName TEST_RPC = QName.create(TEST_NS, TEST_REV, "test-rpc"); + static final QName TEST_RPC = QName.create(TEST_NS, TEST_REV, "test-something"); static final QName TEST_RPC_INPUT = QName.create(TEST_NS, TEST_REV, "input"); static final QName TEST_RPC_INPUT_DATA = QName.create(TEST_NS, TEST_REV, "input-data"); static final QName TEST_RPC_OUTPUT = QName.create(TEST_NS, TEST_REV, "output"); - static final QName TEST_RPC_OUTPUT_DATA = QName.create(TEST_URI, "output-data"); static final SchemaPath TEST_RPC_TYPE = SchemaPath.create(true, TEST_RPC); static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.create( new YangInstanceIdentifier.NodeIdentifier(TEST_RPC)); public static final DOMRpcIdentifier TEST_RPC_ID = DOMRpcIdentifier.create(TEST_RPC_TYPE, TEST_PATH); + public static final DOMDataTreeIdentifier TEST_DATA_TREE_ID = new DOMDataTreeIdentifier( + LogicalDatastoreType.OPERATIONAL, TEST_PATH); static ActorSystem node1; static ActorSystem node2; - static RemoteRpcProviderConfig config1; - static RemoteRpcProviderConfig config2; + static RemoteOpsProviderConfig config1; + static RemoteOpsProviderConfig config2; protected ActorRef rpcInvoker1; protected TestKit rpcRegistry1Probe; @@ -73,16 +76,22 @@ public class AbstractRpcTest { protected SchemaContext schemaContext; protected RemoteRpcImplementation remoteRpcImpl1; protected RemoteRpcImplementation remoteRpcImpl2; + protected RemoteActionImplementation remoteActionImpl1; + protected RemoteActionImplementation remoteActionImpl2; @Mock protected DOMRpcService domRpcService1; @Mock + protected DOMActionService domActionService1; + @Mock protected DOMRpcService domRpcService2; + @Mock + protected DOMActionService domActionService2; @BeforeClass public static void setup() { - config1 = new RemoteRpcProviderConfig.Builder("memberA").build(); - config2 = new RemoteRpcProviderConfig.Builder("memberB").build(); + config1 = new RemoteOpsProviderConfig.Builder("memberA").build(); + config2 = new RemoteOpsProviderConfig.Builder("memberB").build(); node1 = ActorSystem.create("opendaylight-rpc", config1.get()); node2 = ActorSystem.create("opendaylight-rpc", config2.get()); } @@ -97,21 +106,23 @@ public class AbstractRpcTest { @Before public void setUp() { - schemaContext = YangParserTestUtils.parseYangResources(AbstractRpcTest.class, "/test-rpc.yang"); + schemaContext = YangParserTestUtils.parseYangResources(AbstractOpsTest.class, "/test-rpc.yang"); MockitoAnnotations.initMocks(this); rpcRegistry1Probe = new TestKit(node1); - rpcInvoker1 = node1.actorOf(RpcInvoker.props(domRpcService1)); + rpcInvoker1 = node1.actorOf(OpsInvoker.props(domRpcService1, domActionService1)); rpcRegistry2Probe = new TestKit(node2); - rpcInvoker2 = node2.actorOf(RpcInvoker.props(domRpcService2)); + rpcInvoker2 = node2.actorOf(OpsInvoker.props(domRpcService2, domActionService2)); remoteRpcImpl1 = new RemoteRpcImplementation(rpcInvoker2, config1); remoteRpcImpl2 = new RemoteRpcImplementation(rpcInvoker1, config2); + remoteActionImpl1 = new RemoteActionImplementation(rpcInvoker2, config1); + remoteActionImpl2 = new RemoteActionImplementation(rpcInvoker1, config2); } static void assertRpcErrorEquals(final RpcError rpcError, final ErrorSeverity severity, - final ErrorType errorType, final String tag, final String message, final String applicationTag, - final String info, final String causeMsg) { + final ErrorType errorType, final String tag, final String message, + final String applicationTag, final String info, final String causeMsg) { assertEquals("getSeverity", severity, rpcError.getSeverity()); assertEquals("getErrorType", errorType, rpcError.getErrorType()); assertEquals("getTag", tag, rpcError.getTag()); @@ -132,7 +143,7 @@ public class AbstractRpcTest { public static ContainerNode makeRPCInput(final String data) { return Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(TEST_RPC_INPUT)) - .withChild(ImmutableNodes.leafNode(TEST_RPC_INPUT_DATA, data)).build(); + .withChild(ImmutableNodes.leafNode(TEST_RPC_INPUT_DATA, data)).build(); } @@ -142,8 +153,8 @@ public class AbstractRpcTest { } static void assertFailedRpcResult(final DOMRpcResult rpcResult, final ErrorSeverity severity, - final ErrorType errorType, final String tag, final String message, final String applicationTag, - final String info, final String causeMsg) { + final ErrorType errorType, final String tag, final String message, + final String applicationTag, final String info, final String causeMsg) { assertNotNull("RpcResult was null", rpcResult); final Collection rpcErrors = rpcResult.getErrors(); assertEquals("RpcErrors count", 1, rpcErrors.size()); @@ -152,7 +163,7 @@ public class AbstractRpcTest { } static void assertSuccessfulRpcResult(final DOMRpcResult rpcResult, - final NormalizedNode expOutput) { + final NormalizedNode expOutput) { assertNotNull("RpcResult was null", rpcResult); assertCompositeNodeEquals(expOutput, rpcResult.getResult()); } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/OpsBrokerTest.java similarity index 89% rename from opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java rename to opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/OpsBrokerTest.java index 7ea9191e7c..793740c72a 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/OpsBrokerTest.java @@ -25,7 +25,7 @@ import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult; import org.opendaylight.yangtools.util.concurrent.FluentFutures; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; -public class RpcBrokerTest extends AbstractRpcTest { +public class OpsBrokerTest extends AbstractOpsTest { @Test public void testExecuteRpc() { @@ -33,14 +33,13 @@ public class RpcBrokerTest extends AbstractRpcTest { final DOMRpcResult rpcResult = new DefaultDOMRpcResult(invokeRpcResult); when(domRpcService1.invokeRpc(eq(TEST_RPC_TYPE), any())).thenReturn( FluentFutures.immediateFluentFuture(rpcResult)); + final ExecuteRpc executeRpc = ExecuteRpc.from(TEST_RPC_ID, null); - final ExecuteRpc executeMsg = ExecuteRpc.from(TEST_RPC_ID, null); - - rpcInvoker1.tell(executeMsg, rpcRegistry1Probe.getRef()); + rpcInvoker1.tell(executeRpc, rpcRegistry1Probe.getRef()); final RpcResponse rpcResponse = rpcRegistry1Probe.expectMsgClass(Duration.ofSeconds(5), RpcResponse.class); - assertEquals(rpcResult.getResult(), rpcResponse.getResultNormalizedNode()); + assertEquals(rpcResult.getResult(), rpcResponse.getOutput()); } @Test diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcListenerTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/OpsListenerTest.java similarity index 60% rename from opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcListenerTest.java rename to opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/OpsListenerTest.java index 7ae8a0a57f..d5cfcc009f 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcListenerTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/OpsListenerTest.java @@ -5,7 +5,6 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.remote.rpc; import akka.actor.ActorRef; @@ -16,19 +15,24 @@ import java.util.Collections; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import org.opendaylight.controller.remote.rpc.registry.ActionRegistry; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMActionInstance; import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaPath; -public class RpcListenerTest { +public class OpsListenerTest { private static final QName TEST_QNAME = QName.create("test", "2015-06-12", "test"); private static final SchemaPath RPC_TYPE = SchemaPath.create(true, TEST_QNAME); private static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier .create(new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME)); private static final DOMRpcIdentifier RPC_ID = DOMRpcIdentifier.create(RPC_TYPE, TEST_PATH); + private static final DOMActionInstance ACTION_INSTANCE = DOMActionInstance.of(RPC_TYPE, + LogicalDatastoreType.OPERATIONAL, TEST_PATH); private static ActorSystem SYSTEM; @@ -49,19 +53,40 @@ public class RpcListenerTest { final TestKit probeReg = new TestKit(SYSTEM); final ActorRef rpcRegistry = probeReg.getRef(); - final RpcListener rpcListener = new RpcListener(rpcRegistry); - rpcListener.onRpcAvailable(Collections.singleton(RPC_ID)); + final OpsListener opsListener = new OpsListener(rpcRegistry, rpcRegistry); + opsListener.onRpcAvailable(Collections.singleton(RPC_ID)); probeReg.expectMsgClass(RpcRegistry.Messages.AddOrUpdateRoutes.class); } + @Test + public void testActionRouteAdd() { + // Test announcements + final TestKit probeReg = new TestKit(SYSTEM); + final ActorRef actionRegistry = probeReg.getRef(); + + final OpsListener opsListener = new OpsListener(actionRegistry, actionRegistry); + opsListener.onActionsChanged(Collections.emptySet(),Collections.singleton(ACTION_INSTANCE)); + probeReg.expectMsgClass(ActionRegistry.Messages.UpdateActions.class); + } + @Test public void testRouteRemove() { // Test announcements final TestKit probeReg = new TestKit(SYSTEM); final ActorRef rpcRegistry = probeReg.getRef(); - final RpcListener rpcListener = new RpcListener(rpcRegistry); - rpcListener.onRpcUnavailable(Collections.singleton(RPC_ID)); + final OpsListener opsListener = new OpsListener(rpcRegistry, rpcRegistry); + opsListener.onRpcUnavailable(Collections.singleton(RPC_ID)); probeReg.expectMsgClass(RpcRegistry.Messages.RemoveRoutes.class); } + +// @Test +// public void testAcceptsImplementation() { +// +// final TestKit probeReg = new TestKit(SYSTEM); +// final ActorRef opsRegistry = probeReg.getRef(); +// +// final OpsListener opsListener = new OpsListener(opsRegistry, opsRegistry); +// opsListener.acceptsImplementation() +// } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/OpsRegistrarTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/OpsRegistrarTest.java new file mode 100644 index 0000000000..59db730d01 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/OpsRegistrarTest.java @@ -0,0 +1,182 @@ +/* + * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Address; +import akka.actor.Props; +import akka.testkit.TestActorRef; +import akka.testkit.javadsl.TestKit; +import com.google.common.collect.ImmutableMap; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.Messages.UpdateRemoteActionEndpoints; +import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.RemoteActionEndpoint; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.RemoteRpcEndpoint; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMActionInstance; +import org.opendaylight.mdsal.dom.api.DOMActionProviderService; +import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier; +import org.opendaylight.mdsal.dom.api.DOMRpcImplementationRegistration; +import org.opendaylight.mdsal.dom.api.DOMRpcProviderService; +import org.opendaylight.yangtools.concepts.ObjectRegistration; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; + +public class OpsRegistrarTest { + @Mock + private DOMRpcProviderService rpcService; + @Mock + private DOMActionProviderService actionService; + @Mock + private DOMRpcImplementationRegistration oldReg; + @Mock + private DOMRpcImplementationRegistration newReg; + @Mock + private ObjectRegistration oldActionReg; + @Mock + private ObjectRegistration newActionReg; + + private ActorSystem system; + private TestActorRef testActorRef; + private Address endpointAddress; + private RemoteRpcEndpoint firstEndpoint; + private RemoteRpcEndpoint secondEndpoint; + private RemoteActionEndpoint firstActionEndpoint; + private RemoteActionEndpoint secondActionEndpoint; + private OpsRegistrar opsRegistrar; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + system = ActorSystem.create("test"); + + final TestKit testKit = new TestKit(system); + final RemoteOpsProviderConfig config = new RemoteOpsProviderConfig.Builder("system").build(); + final Props props = OpsRegistrar.props(config, rpcService, actionService); + testActorRef = new TestActorRef<>(system, props, testKit.getRef(), "actorRef"); + endpointAddress = new Address("http", "local"); + + final DOMRpcIdentifier firstEndpointId = DOMRpcIdentifier.create( + SchemaPath.create(true, QName.create("first:identifier", "foo"))); + final DOMRpcIdentifier secondEndpointId = DOMRpcIdentifier.create( + SchemaPath.create(true, QName.create("second:identifier", "bar"))); + final QName firstActionQName = QName.create("first:actionIdentifier", "fooAction"); + + final DOMActionInstance firstActionInstance = DOMActionInstance.of( + SchemaPath.create(true, firstActionQName), LogicalDatastoreType.OPERATIONAL, + YangInstanceIdentifier.create(new YangInstanceIdentifier.NodeIdentifier(firstActionQName))); + + final DOMActionInstance secondActionInstance = DOMActionInstance.of( + SchemaPath.create(true, firstActionQName), LogicalDatastoreType.OPERATIONAL, + YangInstanceIdentifier.create(new YangInstanceIdentifier.NodeIdentifier(firstActionQName))); + + final TestKit senderKit = new TestKit(system); + firstEndpoint = new RemoteRpcEndpoint(senderKit.getRef(), Collections.singletonList(firstEndpointId)); + secondEndpoint = new RemoteRpcEndpoint(senderKit.getRef(), Collections.singletonList(secondEndpointId)); + firstActionEndpoint = new RemoteActionEndpoint(senderKit.getRef(), + Collections.singletonList(firstActionInstance)); + secondActionEndpoint = new RemoteActionEndpoint(senderKit.getRef(), + Collections.singletonList(secondActionInstance)); + + doReturn(oldReg).when(rpcService).registerRpcImplementation(any(RemoteRpcImplementation.class), + eq(firstEndpoint.getRpcs())); + doReturn(newReg).when(rpcService).registerRpcImplementation(any(RemoteRpcImplementation.class), + eq(secondEndpoint.getRpcs())); + + doReturn(oldActionReg).when(actionService).registerActionImplementation(any(RemoteActionImplementation.class), + eq(secondActionEndpoint.getActions())); + doReturn(oldActionReg).when(actionService).registerActionImplementation(any(RemoteActionImplementation.class), + eq(secondActionEndpoint.getActions())); + + opsRegistrar = testActorRef.underlyingActor(); + } + + @After + public void tearDown() { + TestKit.shutdownActorSystem(system, true); + } + + @Test + public void testHandleReceiveAddEndpoint() { + final Map> endpoints = ImmutableMap.of( + endpointAddress, Optional.of(firstEndpoint)); + testActorRef.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender()); + + verify(rpcService).registerRpcImplementation(any(RemoteRpcImplementation.class), + eq(firstEndpoint.getRpcs())); + verifyNoMoreInteractions(rpcService, oldReg, newReg); + } + + @Test + public void testHandleReceiveRemoveEndpoint() { + final Map> endpoints = ImmutableMap.of( + endpointAddress, Optional.empty()); + testActorRef.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender()); + verifyNoMoreInteractions(rpcService, oldReg, newReg); + } + + @Test + public void testHandleReceiveUpdateRpcEndpoint() { + final InOrder inOrder = inOrder(rpcService, oldReg, newReg); + + testActorRef.tell(new UpdateRemoteEndpoints(ImmutableMap.of(endpointAddress, Optional.of(firstEndpoint))), + ActorRef.noSender()); + + inOrder.verify(rpcService).registerRpcImplementation(any(RemoteRpcImplementation.class), + eq(firstEndpoint.getRpcs())); + + testActorRef.tell(new UpdateRemoteEndpoints(ImmutableMap.of(endpointAddress, Optional.of(secondEndpoint))), + ActorRef.noSender()); + + inOrder.verify(rpcService).registerRpcImplementation(any(RemoteRpcImplementation.class), + eq(secondEndpoint.getRpcs())); + + + verifyNoMoreInteractions(rpcService, oldReg, newReg); + } + + @Test + public void testHandleReceiveUpdateActionEndpoint() { + final InOrder inOrder = inOrder(actionService, oldActionReg, newActionReg); + + testActorRef.tell(new UpdateRemoteActionEndpoints(ImmutableMap.of(endpointAddress, + Optional.of(firstActionEndpoint))), ActorRef.noSender()); + + inOrder.verify(actionService).registerActionImplementation(any(RemoteActionImplementation.class), + eq(firstActionEndpoint.getActions())); + + testActorRef.tell(new UpdateRemoteActionEndpoints(ImmutableMap.of(endpointAddress, + Optional.of(secondActionEndpoint))), ActorRef.noSender()); + + inOrder.verify(actionService).registerActionImplementation(any(RemoteActionImplementation.class), + eq(secondActionEndpoint.getActions())); + + // verify first registration is closed +// inOrder.verify(oldReg).close(); + + verifyNoMoreInteractions(actionService, oldActionReg, newActionReg); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteOpsImplementationTest.java similarity index 58% rename from opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java rename to opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteOpsImplementationTest.java index 5b3394eebd..6a7681223e 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteOpsImplementationTest.java @@ -13,18 +13,24 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.when; import com.google.common.util.concurrent.ListenableFuture; +import java.util.Collections; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.junit.Ignore; import org.junit.Test; import org.mockito.ArgumentCaptor; +import org.opendaylight.mdsal.dom.api.DOMActionException; +import org.opendaylight.mdsal.dom.api.DOMActionResult; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.dom.api.DOMRpcException; import org.opendaylight.mdsal.dom.api.DOMRpcResult; import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult; +import org.opendaylight.mdsal.dom.spi.SimpleDOMActionResult; import org.opendaylight.yangtools.util.concurrent.FluentFutures; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -35,7 +41,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaPath; * * @author Thomas Pantelis */ -public class RemoteRpcImplementationTest extends AbstractRpcTest { +public class RemoteOpsImplementationTest extends AbstractOpsTest { /** * This test method invokes and executes the remote rpc. @@ -45,10 +51,12 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest { final ContainerNode rpcOutput = makeRPCOutput("bar"); final DOMRpcResult rpcResult = new DefaultDOMRpcResult(rpcOutput); +// Answer> answer = FluentFutures.immediateFluentFuture(rpcResult); + final NormalizedNode invokeRpcInput = makeRPCInput("foo"); @SuppressWarnings({"unchecked", "rawtypes"}) final ArgumentCaptor> inputCaptor = - (ArgumentCaptor) ArgumentCaptor.forClass(NormalizedNode.class); + ArgumentCaptor.forClass(NormalizedNode.class); when(domRpcService2.invokeRpc(eq(TEST_RPC_TYPE), inputCaptor.capture())).thenReturn( FluentFutures.immediateFluentFuture(rpcResult)); @@ -60,6 +68,27 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest { assertEquals(rpcOutput, result.getResult()); } + /** + * This test method invokes and executes the remote action. + */ + @Test + public void testInvokeAction() throws Exception { + final ContainerNode actionOutput = makeRPCOutput("bar"); + final DOMActionResult actionResult = new SimpleDOMActionResult(actionOutput, Collections.emptyList()); + final NormalizedNode invokeActionInput = makeRPCInput("foo"); + @SuppressWarnings({"unchecked", "rawtypes"}) + final ArgumentCaptor inputCaptor = + ArgumentCaptor.forClass(ContainerNode.class); + doReturn(FluentFutures.immediateFluentFuture(actionResult)).when(domActionService2).invokeAction( + eq(TEST_RPC_TYPE), eq(TEST_DATA_TREE_ID), inputCaptor.capture()); + final ListenableFuture frontEndFuture = remoteActionImpl1.invokeAction(TEST_RPC_TYPE, + TEST_DATA_TREE_ID, (ContainerNode) invokeActionInput); + assertTrue(frontEndFuture instanceof RemoteDOMActionFuture); + final DOMActionResult result = frontEndFuture.get(5, TimeUnit.SECONDS); + assertEquals(actionOutput, result.getOutput().get()); + + } + /** * This test method invokes and executes the remote rpc. */ @@ -82,6 +111,28 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest { assertEquals(rpcOutput, result.getResult()); } + /** + * This test method invokes and executes the remote action. + */ + @Test + public void testInvokeActionWithNullInput() throws Exception { + final ContainerNode actionOutput = makeRPCOutput("bar"); + final DOMActionResult actionResult = new SimpleDOMActionResult(actionOutput); + + @SuppressWarnings({"unchecked", "rawtypes"}) + final ArgumentCaptor inputCaptor = + ArgumentCaptor.forClass(ContainerNode.class); + doReturn(FluentFutures.immediateFluentFuture(actionResult)).when(domActionService2).invokeAction( + eq(TEST_RPC_TYPE), eq(TEST_DATA_TREE_ID), inputCaptor.capture()); + + ListenableFuture frontEndFuture = remoteActionImpl1.invokeAction(TEST_RPC_TYPE, + TEST_DATA_TREE_ID, actionOutput); + assertTrue(frontEndFuture instanceof RemoteDOMActionFuture); + + final DOMActionResult result = frontEndFuture.get(5, TimeUnit.SECONDS); + assertEquals(actionOutput, result.getOutput().get()); + } + /** * This test method invokes and executes the remote rpc. */ @@ -129,6 +180,32 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest { } } + /** + * This test method invokes and executes the remote rpc. + */ + @SuppressWarnings({"checkstyle:AvoidHidingCauseException", "checkstyle:IllegalThrows"}) + @Test(expected = DOMActionException.class) + public void testInvokeActionWithRemoteFailedFuture() throws Throwable { + final ContainerNode invokeActionInput = makeRPCInput("foo"); + @SuppressWarnings({"unchecked", "rawtypes"}) + final ArgumentCaptor inputCaptor = + ArgumentCaptor.forClass(ContainerNode.class); + + when(domActionService2.invokeAction(eq(TEST_RPC_TYPE), eq(TEST_DATA_TREE_ID), + inputCaptor.capture())).thenReturn(FluentFutures.immediateFailedFluentFuture( + new RemoteDOMRpcException("Test Exception", null))); + + final ListenableFuture frontEndFuture = remoteActionImpl1.invokeAction(TEST_RPC_TYPE, + TEST_DATA_TREE_ID, invokeActionInput); + assertTrue(frontEndFuture instanceof RemoteDOMActionFuture); + + try { + frontEndFuture.get(5, TimeUnit.SECONDS); + } catch (ExecutionException e) { + throw e.getCause(); + } + } + /** * This test method invokes and tests exceptions when akka timeout occured * Currently ignored since this test with current config takes around 15 seconds to complete. @@ -164,4 +241,27 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest { throw e.getCause(); } } + + /** + * This test method invokes remote rpc and lookup failed + * with runtime exception. + */ + @Test(expected = DOMActionException.class) + @SuppressWarnings({"checkstyle:AvoidHidingCauseException", "checkstyle:IllegalThrows"}) + public void testInvokeActionWithLookupException() throws Throwable { + final ContainerNode invokeRpcInput = makeRPCInput("foo"); + + doThrow(new RuntimeException("test")).when(domActionService2).invokeAction(any(SchemaPath.class), + any(DOMDataTreeIdentifier.class), any(ContainerNode.class)); + + final ListenableFuture frontEndFuture = remoteActionImpl1.invokeAction(TEST_RPC_TYPE, + TEST_DATA_TREE_ID, invokeRpcInput); + assertTrue(frontEndFuture instanceof RemoteDOMActionFuture); + + try { + frontEndFuture.get(5, TimeUnit.SECONDS); + } catch (ExecutionException e) { + throw e.getCause(); + } + } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfigTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteOpsProviderConfigTest.java similarity index 60% rename from opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfigTest.java rename to opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteOpsProviderConfigTest.java index 41de6a48c7..322dbac781 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfigTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteOpsProviderConfigTest.java @@ -7,6 +7,11 @@ */ package org.opendaylight.controller.remote.rpc; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + import akka.actor.ActorSystem; import akka.actor.Props; import akka.actor.UntypedAbstractActor; @@ -14,33 +19,34 @@ import akka.testkit.TestActorRef; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import java.util.concurrent.TimeUnit; -import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader; import scala.concurrent.duration.FiniteDuration; -public class RemoteRpcProviderConfigTest { +public class RemoteOpsProviderConfigTest { @Test public void testConfigDefaults() { - RemoteRpcProviderConfig config = new RemoteRpcProviderConfig.Builder("unit-test").build(); + RemoteOpsProviderConfig config = new RemoteOpsProviderConfig.Builder("unit-test").build(); //Assert on configurations from common config - Assert.assertFalse(config.isMetricCaptureEnabled()); //should be disabled by default - Assert.assertNotNull(config.getMailBoxCapacity()); - Assert.assertNotNull(config.getMailBoxName()); - Assert.assertNotNull(config.getMailBoxPushTimeout()); + assertFalse(config.isMetricCaptureEnabled()); //should be disabled by default + assertNotNull(config.getMailBoxCapacity()); + assertNotNull(config.getMailBoxName()); + assertNotNull(config.getMailBoxPushTimeout()); //rest of the configurations should be set - Assert.assertNotNull(config.getActorSystemName()); - Assert.assertNotNull(config.getRpcBrokerName()); - Assert.assertNotNull(config.getRpcBrokerPath()); - Assert.assertNotNull(config.getRpcManagerName()); - Assert.assertNotNull(config.getRpcManagerPath()); - Assert.assertNotNull(config.getRpcRegistryName()); - Assert.assertNotNull(config.getRpcRegistryPath()); - Assert.assertNotNull(config.getAskDuration()); - Assert.assertNotNull(config.getGossipTickInterval()); + assertNotNull(config.getActorSystemName()); + assertNotNull(config.getRpcBrokerName()); + assertNotNull(config.getRpcBrokerPath()); + assertNotNull(config.getRpcManagerName()); + assertNotNull(config.getRpcManagerPath()); + assertNotNull(config.getRpcRegistryName()); + assertNotNull(config.getActionRegistryName()); + assertNotNull(config.getRpcRegistryPath()); + assertNotNull(config.getActionRegistryPath()); + assertNotNull(config.getAskDuration()); + assertNotNull(config.getGossipTickInterval()); } @Test @@ -52,16 +58,16 @@ public class RemoteRpcProviderConfigTest { String timeOutVal = "10ms"; FiniteDuration expectedTimeout = FiniteDuration.create(10, TimeUnit.MILLISECONDS); - RemoteRpcProviderConfig config = new RemoteRpcProviderConfig.Builder("unit-test") + RemoteOpsProviderConfig config = new RemoteOpsProviderConfig.Builder("unit-test") .metricCaptureEnabled(true)//enable metric capture .mailboxCapacity(expectedCapacity) .mailboxPushTimeout(timeOutVal) .withConfigReader(reader) .build(); - Assert.assertTrue(config.isMetricCaptureEnabled()); - Assert.assertEquals(expectedCapacity, config.getMailBoxCapacity().intValue()); - Assert.assertEquals(expectedTimeout.toMillis(), config.getMailBoxPushTimeout().toMillis()); + assertTrue(config.isMetricCaptureEnabled()); + assertEquals(expectedCapacity, config.getMailBoxCapacity().intValue()); + assertEquals(expectedTimeout.toMillis(), config.getMailBoxPushTimeout().toMillis()); //Now check this config inside an actor ActorSystem system = ActorSystem.create("unit-test", config.get()); @@ -71,11 +77,11 @@ public class RemoteRpcProviderConfigTest { ConfigTestActor actor = configTestActorTestActorRef.underlyingActor(); Config actorConfig = actor.getConfig(); - config = new RemoteRpcProviderConfig(actorConfig); + config = new RemoteOpsProviderConfig(actorConfig); - Assert.assertTrue(config.isMetricCaptureEnabled()); - Assert.assertEquals(expectedCapacity, config.getMailBoxCapacity().intValue()); - Assert.assertEquals(expectedTimeout.toMillis(), config.getMailBoxPushTimeout().toMillis()); + assertTrue(config.isMetricCaptureEnabled()); + assertEquals(expectedCapacity, config.getMailBoxCapacity().intValue()); + assertEquals(expectedTimeout.toMillis(), config.getMailBoxPushTimeout().toMillis()); } public static class ConfigTestActor extends UntypedAbstractActor { @@ -87,7 +93,7 @@ public class RemoteRpcProviderConfigTest { } @Override - public void onReceive(Object message) { + public void onReceive(final Object message) { } /** diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteOpsProviderFactoryTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteOpsProviderFactoryTest.java new file mode 100644 index 0000000000..851e37eed2 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteOpsProviderFactoryTest.java @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc; + +import static org.mockito.MockitoAnnotations.initMocks; + +import akka.actor.ActorSystem; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.opendaylight.mdsal.dom.api.DOMActionProviderService; +import org.opendaylight.mdsal.dom.api.DOMActionService; +import org.opendaylight.mdsal.dom.api.DOMRpcProviderService; +import org.opendaylight.mdsal.dom.api.DOMRpcService; + +public class RemoteOpsProviderFactoryTest { + + @Mock + private DOMRpcProviderService providerService; + @Mock + private DOMRpcService rpcService; + @Mock + private ActorSystem actorSystem; + @Mock + private RemoteOpsProviderConfig providerConfig; + @Mock + private DOMActionProviderService actionProviderService; + @Mock + private DOMActionService actionService; + + @Before + public void setUp() { + initMocks(this); + } + + @Test + public void testCreateInstance() { + Assert.assertNotNull(RemoteOpsProviderFactory + .createInstance(providerService, rpcService, actorSystem, providerConfig, + actionProviderService, actionService)); + } + + @Test(expected = NullPointerException.class) + public void testCreateInstanceMissingProvideService() { + RemoteOpsProviderFactory.createInstance(null, rpcService, actorSystem, providerConfig, + actionProviderService, actionService); + } + + @Test(expected = NullPointerException.class) + public void testCreateInstanceMissingRpcService() { + RemoteOpsProviderFactory.createInstance(providerService, null, actorSystem, providerConfig, + actionProviderService, actionService); + } + + @Test(expected = NullPointerException.class) + public void testCreateInstanceMissingActorSystem() { + RemoteOpsProviderFactory.createInstance(providerService, rpcService, null, providerConfig, + actionProviderService, actionService); + } + + @Test(expected = NullPointerException.class) + public void testCreateInstanceMissingProviderConfig() { + RemoteOpsProviderFactory.createInstance(providerService, rpcService, actorSystem, null, + actionProviderService, actionService); + } + + @Test(expected = NullPointerException.class) + public void testCreateInstanceMissingActionProvider() { + RemoteOpsProviderFactory.createInstance(providerService, rpcService, actorSystem, providerConfig, + null, actionService); + } + + @Test(expected = NullPointerException.class) + public void testCreateInstanceMissingActionService() { + RemoteOpsProviderFactory.createInstance(providerService, rpcService, actorSystem, providerConfig, + actionProviderService, null); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteOpsProviderTest.java similarity index 72% rename from opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java rename to opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteOpsProviderTest.java index c427044c60..df9f346f6b 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteOpsProviderTest.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.remote.rpc; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import akka.actor.ActorRef; @@ -16,21 +17,22 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import java.util.concurrent.TimeUnit; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import org.opendaylight.mdsal.dom.api.DOMActionProviderService; +import org.opendaylight.mdsal.dom.api.DOMActionService; import org.opendaylight.mdsal.dom.api.DOMRpcProviderService; import org.opendaylight.mdsal.dom.api.DOMRpcService; import scala.concurrent.Await; import scala.concurrent.duration.FiniteDuration; -public class RemoteRpcProviderTest { +public class RemoteOpsProviderTest { static ActorSystem system; - static RemoteRpcProviderConfig moduleConfig; + static RemoteOpsProviderConfig moduleConfig; @BeforeClass public static void setup() { - moduleConfig = new RemoteRpcProviderConfig.Builder("odl-cluster-rpc") + moduleConfig = new RemoteOpsProviderConfig.Builder("odl-cluster-rpc") .withConfigReader(ConfigFactory::load).build(); final Config config = moduleConfig.get(); system = ActorSystem.create("odl-cluster-rpc", config); @@ -45,16 +47,16 @@ public class RemoteRpcProviderTest { @Test public void testRemoteRpcProvider() throws Exception { - try (RemoteRpcProvider rpcProvider = new RemoteRpcProvider(system, mock(DOMRpcProviderService.class), - mock(DOMRpcService.class), new RemoteRpcProviderConfig(system.settings().config()))) { + try (RemoteOpsProvider rpcProvider = new RemoteOpsProvider(system, mock(DOMRpcProviderService.class), + mock(DOMRpcService.class), new RemoteOpsProviderConfig(system.settings().config()), + mock(DOMActionProviderService.class), mock(DOMActionService.class))) { rpcProvider.start(); - final ActorRef actorRef = Await.result( system.actorSelection(moduleConfig.getRpcManagerPath()).resolveOne( FiniteDuration.create(1, TimeUnit.SECONDS)), FiniteDuration.create(2, TimeUnit.SECONDS)); - Assert.assertTrue(actorRef.path().toString().contains(moduleConfig.getRpcManagerPath())); + assertTrue(actorRef.path().toString().contains(moduleConfig.getRpcManagerPath())); } } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactoryTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactoryTest.java deleted file mode 100644 index 3481d5b15f..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactoryTest.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.remote.rpc; - -import static org.mockito.MockitoAnnotations.initMocks; - -import akka.actor.ActorSystem; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mock; -import org.opendaylight.mdsal.dom.api.DOMRpcProviderService; -import org.opendaylight.mdsal.dom.api.DOMRpcService; - -public class RemoteRpcProviderFactoryTest { - - @Mock - private DOMRpcProviderService providerService; - @Mock - private DOMRpcService rpcService; - @Mock - private ActorSystem actorSystem; - @Mock - private RemoteRpcProviderConfig providerConfig; - - @Before - public void setUp() { - initMocks(this); - } - - @Test - public void testCreateInstance() { - Assert.assertNotNull(RemoteRpcProviderFactory - .createInstance(providerService, rpcService, actorSystem, providerConfig)); - } - - @Test(expected = NullPointerException.class) - public void testCreateInstanceMissingProvideService() { - RemoteRpcProviderFactory.createInstance(null, rpcService, actorSystem, providerConfig); - } - - @Test(expected = NullPointerException.class) - public void testCreateInstanceMissingRpcService() { - RemoteRpcProviderFactory.createInstance(providerService, null, actorSystem, providerConfig); - } - - @Test(expected = NullPointerException.class) - public void testCreateInstanceMissingActorSystem() { - RemoteRpcProviderFactory.createInstance(providerService, rpcService, null, providerConfig); - } - - @Test(expected = NullPointerException.class) - public void testCreateInstanceMissingProviderConfig() { - RemoteRpcProviderFactory.createInstance(providerService, rpcService, actorSystem, null); - } -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcErrorsExceptionTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcErrorsExceptionTest.java index 941ae2b330..6ced09682e 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcErrorsExceptionTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcErrorsExceptionTest.java @@ -7,9 +7,10 @@ */ package org.opendaylight.controller.remote.rpc; +import static org.junit.Assert.assertEquals; + import java.util.ArrayList; import java.util.List; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.opendaylight.yangtools.yang.common.RpcError; @@ -37,22 +38,25 @@ public class RpcErrorsExceptionTest { @Test public void testGetMessage() { - Assert.assertEquals(ERROR_MESSAGE, exception.getMessage()); + assertEquals(ERROR_MESSAGE, exception.getMessage()); } @Test public void testGetRpcErrors() { final List actualErrors = (List) exception.getRpcErrors(); - Assert.assertEquals(rpcErrors.size(), actualErrors.size()); + assertEquals(rpcErrors.size(), actualErrors.size()); for (int i = 0; i < actualErrors.size(); i++) { - Assert.assertEquals(rpcErrors.get(i).getApplicationTag(), actualErrors.get(i).getApplicationTag()); - Assert.assertEquals(rpcErrors.get(i).getSeverity(), actualErrors.get(i).getSeverity()); - Assert.assertEquals(rpcErrors.get(i).getMessage(), actualErrors.get(i).getMessage()); - Assert.assertEquals(rpcErrors.get(i).getErrorType(), actualErrors.get(i).getErrorType()); - Assert.assertEquals(rpcErrors.get(i).getCause(), actualErrors.get(i).getCause()); - Assert.assertEquals(rpcErrors.get(i).getInfo(), actualErrors.get(i).getInfo()); - Assert.assertEquals(rpcErrors.get(i).getTag(), actualErrors.get(i).getTag()); + final RpcError expected = rpcErrors.get(i); + final RpcError actual = actualErrors.get(i); + + assertEquals(expected.getApplicationTag(), actual.getApplicationTag()); + assertEquals(expected.getSeverity(), actual.getSeverity()); + assertEquals(expected.getMessage(), actual.getMessage()); + assertEquals(expected.getErrorType(), actual.getErrorType()); + assertEquals(expected.getCause(), actual.getCause()); + assertEquals(expected.getInfo(), actual.getInfo()); + assertEquals(expected.getTag(), actual.getTag()); } } -} \ No newline at end of file +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcRegistrarTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcRegistrarTest.java deleted file mode 100644 index 48b825ad39..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcRegistrarTest.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.remote.rpc; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Address; -import akka.actor.Props; -import akka.testkit.TestActorRef; -import akka.testkit.javadsl.TestKit; -import com.google.common.collect.ImmutableMap; -import java.util.Collections; -import java.util.Map; -import java.util.Optional; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.mockito.InOrder; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; -import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints; -import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.RemoteRpcEndpoint; -import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier; -import org.opendaylight.mdsal.dom.api.DOMRpcImplementationRegistration; -import org.opendaylight.mdsal.dom.api.DOMRpcProviderService; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.model.api.SchemaPath; - -public class RpcRegistrarTest { - @Mock - private DOMRpcProviderService service; - @Mock - private DOMRpcImplementationRegistration oldReg; - @Mock - private DOMRpcImplementationRegistration newReg; - - private ActorSystem system; - private TestActorRef testActorRef; - private Address endpointAddress; - private RemoteRpcEndpoint firstEndpoint; - private RemoteRpcEndpoint secondEndpoint; - private RpcRegistrar rpcRegistrar; - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - system = ActorSystem.create("test"); - - final TestKit testKit = new TestKit(system); - final RemoteRpcProviderConfig config = new RemoteRpcProviderConfig.Builder("system").build(); - final Props props = RpcRegistrar.props(config, service); - testActorRef = new TestActorRef<>(system, props, testKit.getRef(), "actorRef"); - endpointAddress = new Address("http", "local"); - - final DOMRpcIdentifier firstEndpointId = DOMRpcIdentifier.create( - SchemaPath.create(true, QName.create("first:identifier", "foo"))); - final DOMRpcIdentifier secondEndpointId = DOMRpcIdentifier.create( - SchemaPath.create(true, QName.create("second:identifier", "bar"))); - - final TestKit senderKit = new TestKit(system); - firstEndpoint = new RemoteRpcEndpoint(senderKit.getRef(), Collections.singletonList(firstEndpointId)); - secondEndpoint = new RemoteRpcEndpoint(senderKit.getRef(), Collections.singletonList(secondEndpointId)); - - Mockito.doReturn(oldReg).when(service).registerRpcImplementation( - Mockito.any(RemoteRpcImplementation.class), Mockito.eq(firstEndpoint.getRpcs())); - - Mockito.doReturn(newReg).when(service).registerRpcImplementation( - Mockito.any(RemoteRpcImplementation.class), Mockito.eq(secondEndpoint.getRpcs())); - - rpcRegistrar = testActorRef.underlyingActor(); - } - - @After - public void tearDown() { - TestKit.shutdownActorSystem(system, true); - } - - @Test - public void testPostStop() throws Exception { - testActorRef.tell(new UpdateRemoteEndpoints(ImmutableMap.of(endpointAddress, Optional.of(firstEndpoint))), - ActorRef.noSender()); - testActorRef.tell(new UpdateRemoteEndpoints(ImmutableMap.of(endpointAddress, Optional.of(secondEndpoint))), - ActorRef.noSender()); - - rpcRegistrar.postStop(); - - Mockito.verify(oldReg).close(); - Mockito.verify(newReg).close(); - } - - @Test - public void testHandleReceiveAddEndpoint() { - final Map> endpoints = ImmutableMap.of( - endpointAddress, Optional.of(firstEndpoint)); - testActorRef.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender()); - - Mockito.verify(service).registerRpcImplementation( - Mockito.any(RemoteRpcImplementation.class), Mockito.eq(firstEndpoint.getRpcs())); - Mockito.verifyNoMoreInteractions(service, oldReg, newReg); - } - - @Test - public void testHandleReceiveRemoveEndpoint() { - final Map> endpoints = ImmutableMap.of( - endpointAddress, Optional.empty()); - testActorRef.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender()); - Mockito.verifyNoMoreInteractions(service, oldReg, newReg); - } - - @Test - public void testHandleReceiveUpdateEndpoint() { - final InOrder inOrder = Mockito.inOrder(service, oldReg, newReg); - - testActorRef.tell(new UpdateRemoteEndpoints(ImmutableMap.of(endpointAddress, Optional.of(firstEndpoint))), - ActorRef.noSender()); - - // first registration - inOrder.verify(service).registerRpcImplementation( - Mockito.any(RemoteRpcImplementation.class), Mockito.eq(firstEndpoint.getRpcs())); - - testActorRef.tell(new UpdateRemoteEndpoints(ImmutableMap.of(endpointAddress, Optional.of(secondEndpoint))), - ActorRef.noSender()); - - // second registration - inOrder.verify(service).registerRpcImplementation( - Mockito.any(RemoteRpcImplementation.class), Mockito.eq(secondEndpoint.getRpcs())); - - // verify first registration is closed - inOrder.verify(oldReg).close(); - - Mockito.verifyNoMoreInteractions(service, oldReg, newReg); - } -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpcTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/ExecuteOpsTest.java similarity index 51% rename from opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpcTest.java rename to opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/ExecuteOpsTest.java index 2aeed83744..eafbd0de0b 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpcTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/ExecuteOpsTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * Copyright (c) 2019 Nordix Foundation. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -11,23 +11,19 @@ import static org.junit.Assert.assertEquals; import org.apache.commons.lang.SerializationUtils; import org.junit.Test; -import org.opendaylight.controller.remote.rpc.AbstractRpcTest; +import org.opendaylight.controller.remote.rpc.AbstractOpsTest; -/** - * Unit tests for ExecuteRpc. - * - * @author Thomas Pantelis - */ -public class ExecuteRpcTest { +public class ExecuteOpsTest { @Test - public void testSerialization() { - ExecuteRpc expected = ExecuteRpc.from(AbstractRpcTest.TEST_RPC_ID, - AbstractRpcTest.makeRPCInput("serialization-test")); + public void testOpsSerialization() { + ExecuteRpc expected = ExecuteRpc.from(AbstractOpsTest.TEST_RPC_ID, + AbstractOpsTest.makeRPCInput("serialization-test")); ExecuteRpc actual = (ExecuteRpc) SerializationUtils.clone(expected); - assertEquals("getRpc", expected.getRpc(), actual.getRpc()); - assertEquals("getInputNormalizedNode", expected.getInputNormalizedNode(), actual.getInputNormalizedNode()); + assertEquals("getName", expected.getType(), actual.getType()); + assertEquals("getInputNormalizedNode", expected.getInput(), actual.getInput()); + assertEquals("getPath", expected.getType(), actual.getType()); } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/OpsResponseTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/OpsResponseTest.java new file mode 100644 index 0000000000..938f148042 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/OpsResponseTest.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.messages; + +import static org.junit.Assert.assertEquals; + +import java.util.Collections; +import java.util.Optional; +import org.apache.commons.lang.SerializationUtils; +import org.junit.Test; +import org.opendaylight.controller.remote.rpc.AbstractOpsTest; + +/** + * Unit tests for RpcResponse. + * + * @author Thomas Pantelis + */ +public class OpsResponseTest { + + @Test + public void testSerialization() { + RpcResponse expectedRpc = new RpcResponse(AbstractOpsTest.makeRPCOutput("serialization-test")); + + ActionResponse expectedAction = new ActionResponse( + Optional.of(AbstractOpsTest.makeRPCOutput("serialization-test")), Collections.emptyList()); + + RpcResponse actualRpc = (RpcResponse) SerializationUtils.clone(expectedRpc); + + ActionResponse actualAction = (ActionResponse) SerializationUtils.clone(expectedAction); + + assertEquals("getResultNormalizedNode", expectedRpc.getOutput(), + actualRpc.getOutput()); + + assertEquals("getResultNormalizedNode", expectedAction.getOutput(), + actualAction.getOutput()); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/RpcResponseTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/RpcResponseTest.java deleted file mode 100644 index f4ec377bb7..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/RpcResponseTest.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.remote.rpc.messages; - -import static org.junit.Assert.assertEquals; - -import org.apache.commons.lang.SerializationUtils; -import org.junit.Test; -import org.opendaylight.controller.remote.rpc.AbstractRpcTest; - -/** - * Unit tests for RpcResponse. - * - * @author Thomas Pantelis - */ -public class RpcResponseTest { - - @Test - public void testSerialization() { - RpcResponse expected = new RpcResponse(AbstractRpcTest.makeRPCOutput("serialization-test")); - - RpcResponse actual = (RpcResponse) SerializationUtils.clone(expected); - - assertEquals("getResultNormalizedNode", expected.getResultNormalizedNode(), actual.getResultNormalizedNode()); - } -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/ActionRegistryTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/ActionRegistryTest.java new file mode 100644 index 0000000000..82b2c79d73 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/ActionRegistryTest.java @@ -0,0 +1,423 @@ +/* + * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.registry; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_ALL_BUCKETS; +import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_BUCKET_VERSIONS; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Address; +import akka.cluster.Cluster; +import akka.cluster.ClusterEvent.CurrentClusterState; +import akka.cluster.Member; +import akka.cluster.MemberStatus; +import akka.cluster.UniqueAddress; +import akka.testkit.javadsl.TestKit; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Uninterruptibles; +import com.typesafe.config.ConfigFactory; +import java.net.URI; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader; +import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig; +import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.Messages.UpdateActions; +import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.Messages.UpdateRemoteActionEndpoints; +import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.RemoteActionEndpoint; +import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMActionInstance; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ActionRegistryTest { + private static final Logger LOG = LoggerFactory.getLogger(ActionRegistryTest.class); + + private static ActorSystem node1; + private static ActorSystem node2; + private static ActorSystem node3; + + private TestKit invoker1; + private TestKit invoker2; + private TestKit invoker3; + private TestKit registrar1; + private TestKit registrar2; + private TestKit registrar3; + private ActorRef registry1; + private ActorRef registry2; + private ActorRef registry3; + + private int routeIdCounter = 1; + + @BeforeClass + public static void staticSetup() { + AkkaConfigurationReader reader = ConfigFactory::load; + + RemoteOpsProviderConfig config1 = new RemoteOpsProviderConfig.Builder("memberA").gossipTickInterval("200ms") + .withConfigReader(reader).build(); + RemoteOpsProviderConfig config2 = new RemoteOpsProviderConfig.Builder("memberB").gossipTickInterval("200ms") + .withConfigReader(reader).build(); + RemoteOpsProviderConfig config3 = new RemoteOpsProviderConfig.Builder("memberC").gossipTickInterval("200ms") + .withConfigReader(reader).build(); + node1 = ActorSystem.create("opendaylight-rpc", config1.get()); + node2 = ActorSystem.create("opendaylight-rpc", config2.get()); + node3 = ActorSystem.create("opendaylight-rpc", config3.get()); + + waitForMembersUp(node1, Cluster.get(node2).selfUniqueAddress(), Cluster.get(node3).selfUniqueAddress()); + waitForMembersUp(node2, Cluster.get(node1).selfUniqueAddress(), Cluster.get(node3).selfUniqueAddress()); + } + + static void waitForMembersUp(final ActorSystem node, final UniqueAddress... addresses) { + Set otherMembersSet = Sets.newHashSet(addresses); + Stopwatch sw = Stopwatch.createStarted(); + while (sw.elapsed(TimeUnit.SECONDS) <= 10) { + CurrentClusterState state = Cluster.get(node).state(); + for (Member m : state.getMembers()) { + if (m.status() == MemberStatus.up() && otherMembersSet.remove(m.uniqueAddress()) + && otherMembersSet.isEmpty()) { + return; + } + } + + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + } + + fail("Member(s) " + otherMembersSet + " are not Up"); + } + + @AfterClass + public static void staticTeardown() { + TestKit.shutdownActorSystem(node1); + TestKit.shutdownActorSystem(node2); + TestKit.shutdownActorSystem(node3); + } + + @Before + public void setup() { + invoker1 = new TestKit(node1); + registrar1 = new TestKit(node1); + registry1 = node1.actorOf(ActionRegistry.props(config(node1), invoker1.getRef(), registrar1.getRef())); + invoker2 = new TestKit(node2); + registrar2 = new TestKit(node2); + registry2 = node2.actorOf(ActionRegistry.props(config(node2), invoker2.getRef(), registrar2.getRef())); + invoker3 = new TestKit(node3); + registrar3 = new TestKit(node3); + registry3 = node3.actorOf(ActionRegistry.props(config(node3), invoker3.getRef(), registrar3.getRef())); + } + + private static RemoteOpsProviderConfig config(final ActorSystem node) { + return new RemoteOpsProviderConfig(node.settings().config()); + } + + @After + public void teardown() { + if (registry1 != null) { + node1.stop(registry1); + } + if (registry2 != null) { + node2.stop(registry2); + } + if (registry3 != null) { + node3.stop(registry3); + } + + if (invoker1 != null) { + node1.stop(invoker1.getRef()); + } + if (invoker2 != null) { + node2.stop(invoker2.getRef()); + } + if (invoker3 != null) { + node3.stop(invoker3.getRef()); + } + + if (registrar1 != null) { + node1.stop(registrar1.getRef()); + } + if (registrar2 != null) { + node2.stop(registrar2.getRef()); + } + if (registrar3 != null) { + node3.stop(registrar3.getRef()); + } + } + + /** + * One node cluster. 1. Register action, ensure router can be found 2. Then remove action, ensure its + * deleted + */ + @Test + public void testAddRemoveActionOnSameNode() { + LOG.info("testAddRemoveActionOnSameNode starting"); + + Address nodeAddress = node1.provider().getDefaultAddress(); + + // Add action on node 1 + + List addedRouteIds = createRouteIds(); + + registry1.tell(new ActionRegistry.Messages.UpdateActions(addedRouteIds, + Collections.emptyList()), ActorRef.noSender()); + + // Bucket store should get an update bucket message. Updated bucket contains added action. + final TestKit testKit = new TestKit(node1); + + Map> buckets = retrieveBuckets(registry1, testKit, nodeAddress); + verifyBucket(buckets.get(nodeAddress), addedRouteIds); + + Map versions = retrieveVersions(registry1, testKit); + assertEquals("Version for bucket " + nodeAddress, (Long) buckets.get(nodeAddress).getVersion(), + versions.get(nodeAddress)); + + // Now remove action + registry1.tell(new UpdateActions(Collections.emptyList(),addedRouteIds), ActorRef.noSender()); + + // Bucket store should get an update bucket message. Action is removed in the updated bucket + + verifyEmptyBucket(testKit, registry1, nodeAddress); + + LOG.info("testAddRemoveActionOnSameNode ending"); + + } + + /** + * Three node cluster. 1. Register action on 1 node, ensure 2nd node gets updated 2. Remove action on + * 1 node, ensure 2nd node gets updated + */ + @Test + public void testActionAddRemoveInCluster() { + + LOG.info("testActionAddRemoveInCluster starting"); + + List addedRouteIds = createRouteIds(); + + Address node1Address = node1.provider().getDefaultAddress(); + + // Add action on node 1 + registry1.tell(new UpdateActions(addedRouteIds, Collections.emptyList()), ActorRef.noSender()); + + // Bucket store on node2 should get a message to update its local copy of remote buckets + final TestKit testKit = new TestKit(node2); + + Map> buckets = retrieveBuckets(registry2, testKit, node1Address); + verifyBucket(buckets.get(node1Address), addedRouteIds); + + // Now remove + registry1.tell(new UpdateActions(Collections.emptyList(), addedRouteIds), ActorRef.noSender()); + + // Bucket store on node2 should get a message to update its local copy of remote buckets. + // Wait for the bucket for node1 to be empty. + + verifyEmptyBucket(testKit, registry2, node1Address); + + LOG.info("testActionAddRemoveInCluster ending"); + } + + private void verifyEmptyBucket(final TestKit testKit, final ActorRef registry, final Address address) + throws AssertionError { + Map> buckets; + int numTries = 0; + while (true) { + buckets = retrieveBuckets(registry1, testKit, address); + + try { + verifyBucket(buckets.get(address), Collections.emptyList()); + break; + } catch (AssertionError e) { + if (++numTries >= 50) { + throw e; + } + } + + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + } + } + + /** + * Three node cluster. Register action on 2 nodes. Ensure 3rd gets updated. + */ + @Test + public void testActionAddedOnMultiNodes() { + final TestKit testKit = new TestKit(node3); + + // Add action on node 1 + List addedRouteIds1 = createRouteIds(); + registry1.tell(new UpdateActions(addedRouteIds1, Collections.emptyList()), ActorRef.noSender()); + + final UpdateRemoteActionEndpoints req1 = registrar3.expectMsgClass(Duration.ofSeconds(3), + UpdateRemoteActionEndpoints.class); + + // Add action on node 2 + List addedRouteIds2 = createRouteIds(); + registry2.tell(new UpdateActions(addedRouteIds2, Collections.emptyList()), ActorRef.noSender()); + + final UpdateRemoteActionEndpoints req2 = registrar3.expectMsgClass(Duration.ofSeconds(3), + UpdateRemoteActionEndpoints.class); + Address node2Address = node2.provider().getDefaultAddress(); + Address node1Address = node1.provider().getDefaultAddress(); + + Map> buckets = retrieveBuckets(registry3, testKit, node1Address, + node2Address); + + verifyBucket(buckets.get(node1Address), addedRouteIds1); + verifyBucket(buckets.get(node2Address), addedRouteIds2); + + Map versions = retrieveVersions(registry3, testKit); + assertEquals("Version for bucket " + node1Address, (Long) buckets.get(node1Address).getVersion(), + versions.get(node1Address)); + assertEquals("Version for bucket " + node2Address, (Long) buckets.get(node2Address).getVersion(), + versions.get(node2Address)); + + assertEndpoints(req1, node1Address, invoker1); + assertEndpoints(req2, node2Address, invoker2); + + } + + private static void assertEndpoints(final ActionRegistry.Messages.UpdateRemoteActionEndpoints msg, + final Address address, final TestKit invoker) { + final Map> endpoints = msg.getActionEndpoints(); + assertEquals(1, endpoints.size()); + + final Optional maybeEndpoint = endpoints.get(address); + assertNotNull(maybeEndpoint); + assertTrue(maybeEndpoint.isPresent()); + + final RemoteActionEndpoint endpoint = maybeEndpoint.get(); + final ActorRef router = endpoint.getRouter(); + assertNotNull(router); + + router.tell("hello", ActorRef.noSender()); + final String s = invoker.expectMsgClass(Duration.ofSeconds(3), String.class); + assertEquals("hello", s); + } + + private static Map retrieveVersions(final ActorRef bucketStore, final TestKit testKit) { + bucketStore.tell(GET_BUCKET_VERSIONS, testKit.getRef()); + @SuppressWarnings("unchecked") + final Map reply = testKit.expectMsgClass(Duration.ofSeconds(3), Map.class); + return reply; + } + + private static void verifyBucket(final Bucket bucket, + final List expRouteIds) { + ActionRoutingTable table = bucket.getData(); + assertNotNull("Bucket ActionRoutingTable is null", table); + for (DOMActionInstance r : expRouteIds) { + if (!table.contains(r)) { + fail("ActionRoutingTable does not contain " + r + ". Actual: " + table); + } + } + + assertEquals("ActionRoutingTable size", expRouteIds.size(), table.size()); + } + + private static Map> retrieveBuckets( + final ActorRef bucketStore, final TestKit testKit, final Address... addresses) { + int numTries = 0; + while (true) { + bucketStore.tell(GET_ALL_BUCKETS, testKit.getRef()); + @SuppressWarnings("unchecked") + Map> buckets = testKit.expectMsgClass(Duration.ofSeconds(3), + Map.class); + + boolean foundAll = true; + for (Address addr : addresses) { + Bucket bucket = buckets.get(addr); + if (bucket == null) { + foundAll = false; + break; + } + } + + if (foundAll) { + return buckets; + } + + if (++numTries >= 50) { + fail("Missing expected buckets for addresses: " + Arrays.toString(addresses) + + ", Actual: " + buckets); + } + + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + } + } + + @Test + public void testAddRoutesConcurrency() { + final TestKit testKit = new TestKit(node1); + + final int nRoutes = 500; + final Collection added = new ArrayList<>(nRoutes); + for (int i = 0; i < nRoutes; i++) { + QName type = QName.create(URI.create("/mockaction"), "mockaction" + routeIdCounter++); + final DOMActionInstance routeId = DOMActionInstance.of(SchemaPath.create(true, + type), LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.create(new + YangInstanceIdentifier.NodeIdentifier(type))); + added.add(routeId); + + //Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + registry1.tell(new UpdateActions(Arrays.asList(routeId), Collections.emptyList()), + ActorRef.noSender()); + } + + int numTries = 0; + while (true) { + registry1.tell(GET_ALL_BUCKETS, testKit.getRef()); + @SuppressWarnings("unchecked") + Map> buckets = testKit.expectMsgClass(Duration.ofSeconds(3), + Map.class); + + Bucket localBucket = buckets.values().iterator().next(); + ActionRoutingTable table = localBucket.getData(); + if (table != null && table.size() == nRoutes) { + for (DOMActionInstance r : added) { + assertTrue("ActionRoutingTable contains " + r, table.contains(r)); + } + + break; + } + + if (++numTries >= 50) { + fail("Expected # routes: " + nRoutes + ", Actual: " + table.size()); + } + + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + } + } + + private List createRouteIds() { + QName type = QName.create(URI.create("/mockaction"), "mockaction" + routeIdCounter++); + List routeIds = new ArrayList<>(1); + routeIds.add(DOMActionInstance.of(SchemaPath.create(true, type), + LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.create( + new YangInstanceIdentifier.NodeIdentifier(type)))); + return routeIds; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java index d32e8d8a6e..4ae6e1e57f 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java @@ -44,7 +44,7 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader; -import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; +import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints; @@ -79,11 +79,11 @@ public class RpcRegistryTest { public static void staticSetup() { AkkaConfigurationReader reader = ConfigFactory::load; - RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").gossipTickInterval("200ms") + RemoteOpsProviderConfig config1 = new RemoteOpsProviderConfig.Builder("memberA").gossipTickInterval("200ms") .withConfigReader(reader).build(); - RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").gossipTickInterval("200ms") + RemoteOpsProviderConfig config2 = new RemoteOpsProviderConfig.Builder("memberB").gossipTickInterval("200ms") .withConfigReader(reader).build(); - RemoteRpcProviderConfig config3 = new RemoteRpcProviderConfig.Builder("memberC").gossipTickInterval("200ms") + RemoteOpsProviderConfig config3 = new RemoteOpsProviderConfig.Builder("memberC").gossipTickInterval("200ms") .withConfigReader(reader).build(); node1 = ActorSystem.create("opendaylight-rpc", config1.get()); node2 = ActorSystem.create("opendaylight-rpc", config2.get()); @@ -131,8 +131,8 @@ public class RpcRegistryTest { registry3 = node3.actorOf(RpcRegistry.props(config(node3), invoker3.getRef(), registrar3.getRef())); } - private static RemoteRpcProviderConfig config(final ActorSystem node) { - return new RemoteRpcProviderConfig(node.settings().config()); + private static RemoteOpsProviderConfig config(final ActorSystem node) { + return new RemoteOpsProviderConfig(node.settings().config()); } @After @@ -299,7 +299,7 @@ public class RpcRegistryTest { } private static void assertEndpoints(final UpdateRemoteEndpoints msg, final Address address, final TestKit invoker) { - final Map> endpoints = msg.getEndpoints(); + final Map> endpoints = msg.getRpcEndpoints(); assertEquals(1, endpoints.size()); final Optional maybeEndpoint = endpoints.get(address); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java index b9784ab390..ed6d12c34d 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java @@ -22,7 +22,7 @@ import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; +import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig; import org.opendaylight.controller.remote.rpc.TerminationMonitor; public class BucketStoreTest { @@ -140,13 +140,13 @@ public class BucketStoreTest { */ private static BucketStoreActor createStore() { final Props props = Props.create(TestingBucketStoreActor.class, - new RemoteRpcProviderConfig(system.settings().config()), "testing-store",new T()); + new RemoteOpsProviderConfig(system.settings().config()), "testing-store",new T()); return TestActorRef.>create(system, props, "testStore").underlyingActor(); } private static final class TestingBucketStoreActor extends BucketStoreActor { - protected TestingBucketStoreActor(final RemoteRpcProviderConfig config, + protected TestingBucketStoreActor(final RemoteOpsProviderConfig config, final String persistenceId, final T initialData) { super(config, persistenceId, initialData); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossiperTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossiperTest.java index 6c826ac7bf..896c024539 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossiperTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossiperTest.java @@ -29,7 +29,7 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; +import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig; import org.opendaylight.controller.remote.rpc.TerminationMonitor; @@ -106,8 +106,8 @@ public class GossiperTest { * @return instance of Gossiper class */ private static Gossiper createGossiper() { - final RemoteRpcProviderConfig config = - new RemoteRpcProviderConfig.Builder("unit-test") + final RemoteOpsProviderConfig config = + new RemoteOpsProviderConfig.Builder("unit-test") .withConfigReader(ConfigFactory::load).build(); final Props props = Gossiper.testProps(config); final TestActorRef testRef = TestActorRef.create(system, props, "testGossiper"); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteActionRegistryMXBeanImplTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteActionRegistryMXBeanImplTest.java new file mode 100644 index 0000000000..67aba156ef --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteActionRegistryMXBeanImplTest.java @@ -0,0 +1,150 @@ +/* + * Copyright (c) 2019 Nordix Foundation. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.registry.mbeans; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.dispatch.Dispatchers; +import akka.testkit.TestActorRef; +import akka.testkit.javadsl.TestKit; +import akka.util.Timeout; +import com.google.common.collect.Lists; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig; +import org.opendaylight.controller.remote.rpc.registry.ActionRegistry; +import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMActionInstance; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; + +public class RemoteActionRegistryMXBeanImplTest { + + private static final QName LOCAL_QNAME = QName.create("base", "local"); + private static final SchemaPath EMPTY_SCHEMA_PATH = SchemaPath.ROOT; + private static final SchemaPath LOCAL_SCHEMA_PATH = SchemaPath.create(true, LOCAL_QNAME); + + private ActorSystem system; + private TestActorRef testActor; + private List buckets; + private RemoteActionRegistryMXBeanImpl mxBean; + + @Before + public void setUp() { + system = ActorSystem.create("test"); + + final DOMActionInstance emptyActionIdentifier = DOMActionInstance.of( + EMPTY_SCHEMA_PATH, LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.EMPTY); + final DOMActionInstance localActionIdentifier = DOMActionInstance.of( + LOCAL_SCHEMA_PATH, LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.of(LOCAL_QNAME)); + + buckets = Lists.newArrayList(emptyActionIdentifier, localActionIdentifier); + + final RemoteOpsProviderConfig config = new RemoteOpsProviderConfig.Builder("system").build(); + final TestKit invoker = new TestKit(system); + final TestKit registrar = new TestKit(system); + final TestKit supervisor = new TestKit(system); + final Props props = ActionRegistry.props(config, invoker.getRef(), registrar.getRef()) + .withDispatcher(Dispatchers.DefaultDispatcherId()); + testActor = new TestActorRef<>(system, props, supervisor.getRef(), "testActor"); + + final Timeout timeout = Timeout.apply(10, TimeUnit.SECONDS); + mxBean = new RemoteActionRegistryMXBeanImpl(new BucketStoreAccess(testActor, system.dispatcher(), timeout), + timeout); + } + + @After + public void tearDown() { + TestKit.shutdownActorSystem(system, Boolean.TRUE); + } + + @Test + public void testGetLocalRegisteredRoutedActionEmptyBuckets() { + final Set localRegisteredRoutedAction = mxBean.getLocalRegisteredAction(); + + Assert.assertNotNull(localRegisteredRoutedAction); + Assert.assertTrue(localRegisteredRoutedAction.isEmpty()); + } + + @Test + public void testGetLocalRegisteredRoutedAction() { + testActor.tell(new ActionRegistry.Messages.UpdateActions(Lists.newArrayList(buckets), + Collections.emptyList()), ActorRef.noSender()); + final Set localRegisteredRoutedAction = mxBean.getLocalRegisteredAction(); + + Assert.assertNotNull(localRegisteredRoutedAction); + Assert.assertEquals(1, localRegisteredRoutedAction.size()); + + final String localAction = localRegisteredRoutedAction.iterator().next(); + Assert.assertTrue(localAction.contains(LOCAL_QNAME.toString())); + Assert.assertTrue(localAction.contains(LOCAL_SCHEMA_PATH.toString())); + } + + @Test + public void testFindActionByNameEmptyBuckets() { + final Map rpcByName = mxBean.findActionByName(""); + + Assert.assertNotNull(rpcByName); + Assert.assertTrue(rpcByName.isEmpty()); + } + + @Test + public void testFindActionByName() { + testActor.tell(new ActionRegistry.Messages.UpdateActions(Lists.newArrayList(buckets), + Collections.emptyList()), ActorRef.noSender()); + final Map rpcByName = mxBean.findActionByName(""); + + Assert.assertNotNull(rpcByName); + Assert.assertEquals(1, rpcByName.size()); + Assert.assertTrue(rpcByName.containsValue(LOCAL_QNAME.getLocalName())); + } + + @Test + public void testFindActionByRouteEmptyBuckets() { + final Map rpcByRoute = mxBean.findActionByRoute(""); + + Assert.assertNotNull(rpcByRoute); + Assert.assertTrue(rpcByRoute.isEmpty()); + } + + @Test + public void testFindActionByRoute() { + testActor.tell(new ActionRegistry.Messages.UpdateActions(Lists.newArrayList(buckets), + Collections.emptyList()), ActorRef.noSender()); + final Map rpcByRoute = mxBean.findActionByRoute(""); + + Assert.assertNotNull(rpcByRoute); + Assert.assertEquals(1, rpcByRoute.size()); + Assert.assertTrue(rpcByRoute.containsValue(LOCAL_QNAME.getLocalName())); + } + + @Test + public void testGetBucketVersionsEmptyBuckets() { + final String bucketVersions = mxBean.getBucketVersions(); + Assert.assertEquals(Collections.emptyMap().toString(), bucketVersions); + } + + @Test + public void testGetBucketVersions() { + testActor.tell(new ActionRegistry.Messages.UpdateActions(Lists.newArrayList(buckets), + Collections.emptyList()), ActorRef.noSender()); + final String bucketVersions = mxBean.getBucketVersions(); + + Assert.assertTrue(bucketVersions.contains(testActor.provider().getDefaultAddress().toString())); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImplTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImplTest.java index d445ae1e6e..dbeccb2456 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImplTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImplTest.java @@ -24,7 +24,7 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; +import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess; import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier; @@ -54,7 +54,7 @@ public class RemoteRpcRegistryMXBeanImplTest { buckets = Lists.newArrayList(emptyRpcIdentifier, localRpcIdentifier); - final RemoteRpcProviderConfig config = new RemoteRpcProviderConfig.Builder("system").build(); + final RemoteOpsProviderConfig config = new RemoteOpsProviderConfig.Builder("system").build(); final TestKit invoker = new TestKit(system); final TestKit registrar = new TestKit(system); final TestKit supervisor = new TestKit(system);