--- /dev/null
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.dispatch.OnComplete;
+import com.google.common.util.concurrent.AbstractFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+
+abstract class AbstractRemoteFuture<T, E extends Exception> extends AbstractFuture<T> {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractRemoteFuture.class);
+
+ private final @NonNull SchemaPath type;
+
+ AbstractRemoteFuture(final @NonNull SchemaPath type, final Future<Object> requestFuture) {
+ this.type = requireNonNull(type);
+ requestFuture.onComplete(new FutureUpdater(), ExecutionContext.Implicits$.MODULE$.global());
+ }
+
+ @Override
+ public final T get() throws InterruptedException, ExecutionException {
+ try {
+ return super.get();
+ } catch (ExecutionException e) {
+ throw mapException(e);
+ }
+ }
+
+ @Override
+ public final T get(final long timeout, final TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ try {
+ return super.get(timeout, unit);
+ } catch (final ExecutionException e) {
+ throw mapException(e);
+ }
+ }
+
+ @Override
+ protected final boolean set(final T value) {
+ final boolean ret = super.set(value);
+ if (ret) {
+ LOG.debug("Future {} for action {} successfully completed", this, type);
+ }
+ return ret;
+ }
+
+ final void failNow(final Throwable error) {
+ LOG.debug("Failing future {} for operation {}", this, type, error);
+ setException(error);
+ }
+
+ abstract @Nullable T processReply(Object reply);
+
+ abstract @NonNull Class<E> exceptionClass();
+
+ abstract @NonNull E wrapCause(Throwable cause);
+
+ private ExecutionException mapException(final ExecutionException ex) {
+ final Throwable cause = ex.getCause();
+ return exceptionClass().isInstance(cause) ? ex : new ExecutionException(ex.getMessage(), wrapCause(cause));
+ }
+
+ private final class FutureUpdater extends OnComplete<Object> {
+ @Override
+ public void onComplete(final Throwable error, final Object reply) {
+ if (error == null) {
+ final T result = processReply(reply);
+ if (result != null) {
+ LOG.debug("Received response for operation {}: result is {}", type, result);
+ set(result);
+ } else {
+ failNow(new IllegalStateException("Incorrect reply type " + reply + " from Akka"));
+ }
+ } else {
+ failNow(error);
+ }
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.ActorRef;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import org.opendaylight.controller.remote.rpc.messages.AbstractExecute;
+import scala.concurrent.Future;
+
+/**
+ * An abstract base class for remote RPC/action implementations.
+ */
+abstract class AbstractRemoteImplementation<T extends AbstractExecute<?>> {
+ // 0 for local, 1 for binding, 2 for remote
+ static final long COST = 2;
+
+ private final ActorRef remoteInvoker;
+ private final Timeout askDuration;
+
+ AbstractRemoteImplementation(final ActorRef remoteInvoker, final RemoteOpsProviderConfig config) {
+ this.remoteInvoker = requireNonNull(remoteInvoker);
+ this.askDuration = config.getAskDuration();
+ }
+
+ final Future<Object> ask(final T message) {
+ return Patterns.ask(remoteInvoker, requireNonNull(message), askDuration);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.Status.Failure;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.Collection;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.remote.rpc.messages.ActionResponse;
+import org.opendaylight.controller.remote.rpc.messages.ExecuteAction;
+import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
+import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
+import org.opendaylight.mdsal.dom.api.DOMActionResult;
+import org.opendaylight.mdsal.dom.api.DOMActionService;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMRpcResult;
+import org.opendaylight.mdsal.dom.api.DOMRpcService;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+/**
+ * Actor receiving invocation requests from remote nodes, routing them to
+ * {@link DOMRpcService#invokeRpc(SchemaPath, NormalizedNode)} and
+ * {@link DOMActionService#invokeAction(SchemaPath, DOMDataTreeIdentifier, ContainerNode)}.
+ *
+ * <p>
+ * Note that while the two interfaces are very similar, invocation strategies are slightly different due to historic
+ * behavior of RPCs:
+ * <ul>
+ * <li>RPCs allow both null input and output, and this is passed to the infrastructure. Furthermore any invocation
+ * which results in errors being reported drops the output content, even if it is present -- which is wrong, as
+ * 'errors' in this case can also be just warnings.</li>
+ * <li>Actions do not allow null input, but allow null output. If the output is present, it is passed along with any
+ * errors reported.</li>
+ * </ul>
+ */
+final class OpsInvoker extends AbstractUntypedActor {
+ private final DOMRpcService rpcService;
+ private final DOMActionService actionService;
+
+ private OpsInvoker(final DOMRpcService rpcService, final DOMActionService actionService) {
+ this.rpcService = requireNonNull(rpcService);
+ this.actionService = requireNonNull(actionService);
+ }
+
+ public static Props props(final DOMRpcService rpcService, final DOMActionService actionService) {
+ return Props.create(OpsInvoker.class,
+ requireNonNull(rpcService, "DOMRpcService can not be null"),
+ requireNonNull(actionService, "DOMActionService can not be null"));
+ }
+
+ @Override
+ protected void handleReceive(final Object message) {
+ if (message instanceof ExecuteRpc) {
+ LOG.debug("Handling ExecuteOps Message");
+ execute((ExecuteRpc) message);
+ } else if (message instanceof ExecuteAction) {
+ execute((ExecuteAction) message);
+ } else {
+ unknownMessage(message);
+ }
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void execute(final ExecuteRpc msg) {
+ LOG.debug("Executing RPC {}", msg.getType());
+ final ActorRef sender = getSender();
+
+ final ListenableFuture<DOMRpcResult> future;
+ try {
+ future = rpcService.invokeRpc(msg.getType(), msg.getInput());
+ } catch (final RuntimeException e) {
+ LOG.debug("Failed to invoke RPC {}", msg.getType(), e);
+ sender.tell(new Failure(e), self());
+ return;
+ }
+
+ Futures.addCallback(future, new AbstractCallback<DOMRpcResult>(getSender(), msg.getType()) {
+ @Override
+ Object nullResponse(final SchemaPath type) {
+ LOG.warn("Execution of {} resulted in null result", type);
+ return new RpcResponse(null);
+ }
+
+ @Override
+ Object response(final SchemaPath type, final DOMRpcResult result) {
+ final Collection<? extends RpcError> errors = result.getErrors();
+ return errors.isEmpty() ? new RpcResponse(result.getResult())
+ // This is legacy (wrong) behavior, which ignores the fact that errors may be just warnings,
+ // discarding any output
+ : new Failure(new RpcErrorsException(String.format("Execution of rpc %s failed", type),
+ errors));
+ }
+ }, MoreExecutors.directExecutor());
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void execute(final ExecuteAction msg) {
+ LOG.debug("Executing Action {}", msg.getType());
+
+ final ActorRef sender = getSender();
+
+ final ListenableFuture<? extends DOMActionResult> future;
+ try {
+ future = actionService.invokeAction(msg.getType(), msg.getPath(), msg.getInput());
+ } catch (final RuntimeException e) {
+ LOG.debug("Failed to invoke action {}", msg.getType(), e);
+ sender.tell(new Failure(e), self());
+ return;
+ }
+
+ Futures.addCallback(future, new AbstractCallback<DOMActionResult>(getSender(), msg.getType()) {
+ @Override
+ Object nullResponse(final SchemaPath type) {
+ throw new IllegalStateException("Null invocation result of action " + type);
+ }
+
+ @Override
+ Object response(final SchemaPath type, final DOMActionResult result) {
+ final Collection<? extends RpcError> errors = result.getErrors();
+ return errors.isEmpty() ? new ActionResponse(result.getOutput(), result.getErrors())
+ // This is legacy (wrong) behavior, which ignores the fact that errors may be just warnings,
+ // discarding any output
+ : new Failure(new RpcErrorsException(String.format("Execution of action %s failed", type),
+ errors));
+ }
+ }, MoreExecutors.directExecutor());
+ }
+
+ private abstract class AbstractCallback<T> implements FutureCallback<T> {
+ private final ActorRef replyTo;
+ private final SchemaPath type;
+
+ AbstractCallback(final ActorRef replyTo, final SchemaPath type) {
+ this.replyTo = requireNonNull(replyTo);
+ this.type = requireNonNull(type);
+ }
+
+ @Override
+ public final void onSuccess(final T result) {
+ final Object response;
+ if (result == null) {
+ // This shouldn't happen but the FutureCallback annotates the result param with Nullable so handle null
+ // here to avoid FindBugs warning.
+ response = nullResponse(type);
+ } else {
+ response = response(type, result);
+ }
+
+ LOG.debug("Sending response for execution of {} : {}", type, response);
+ replyTo.tell(response, self());
+ }
+
+ @Override
+ public final void onFailure(final Throwable failure) {
+ LOG.debug("Failed to execute operation {}", type, failure);
+ LOG.error("Failed to execute operation {} due to {}. More details are available on DEBUG level.", type,
+ Throwables.getRootCause(failure).getMessage());
+ replyTo.tell(new Failure(failure), self());
+ }
+
+ abstract @NonNull Object nullResponse(@NonNull SchemaPath type);
+
+ abstract @NonNull Object response(@NonNull SchemaPath type, @NonNull T result);
+ }
+}
import akka.actor.ActorRef;
import java.util.Collection;
+import java.util.Set;
+import org.opendaylight.controller.remote.rpc.registry.ActionRegistry;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
+import org.opendaylight.mdsal.dom.api.DOMActionAvailabilityExtension;
+import org.opendaylight.mdsal.dom.api.DOMActionImplementation;
+import org.opendaylight.mdsal.dom.api.DOMActionInstance;
import org.opendaylight.mdsal.dom.api.DOMRpcAvailabilityListener;
import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
import org.opendaylight.mdsal.dom.api.DOMRpcImplementation;
import org.slf4j.LoggerFactory;
/**
- * A {@link DOMRpcAvailabilityListener} reacting to RPC implementations different than {@link RemoteRpcImplementation}.
- * The knowledge of such implementations is forwarded to {@link RpcRegistry}, which is responsible for advertising
- * their presence to other nodes.
+ * A {@link DOMRpcAvailabilityListener} reacting to RPC implementations different than
+ * {@link RemoteRpcImplementation} or {@link RemoteActionImplementation}.
+ * The knowledge of such implementations is forwarded to {@link RpcRegistry} and {@link ActionRegistry},
+ * which is responsible for advertising their presence to other nodes.
*/
-final class RpcListener implements DOMRpcAvailabilityListener {
- private static final Logger LOG = LoggerFactory.getLogger(RpcListener.class);
+final class OpsListener implements DOMRpcAvailabilityListener, DOMActionAvailabilityExtension.AvailabilityListener {
+ private static final Logger LOG = LoggerFactory.getLogger(OpsListener.class);
private final ActorRef rpcRegistry;
+ private final ActorRef actionRegistry;
- RpcListener(final ActorRef rpcRegistry) {
+ OpsListener(final ActorRef rpcRegistry, final ActorRef actionRegistry) {
this.rpcRegistry = requireNonNull(rpcRegistry);
+ this.actionRegistry = requireNonNull(actionRegistry);
}
@Override
public boolean acceptsImplementation(final DOMRpcImplementation impl) {
return !(impl instanceof RemoteRpcImplementation);
}
+
+ @Override
+ public boolean acceptsImplementation(final DOMActionImplementation impl) {
+ return !(impl instanceof RemoteActionImplementation);
+ }
+
+ @Override
+ public void onActionsChanged(final Set<DOMActionInstance> removed, final Set<DOMActionInstance> added) {
+ LOG.debug("adding registration for [{}]", added);
+ LOG.debug("removing registration for [{}]", removed);
+ actionRegistry.tell(new ActionRegistry.Messages.UpdateActions(added, removed), ActorRef.noSender());
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.ActorRef;
+import akka.actor.OneForOneStrategy;
+import akka.actor.Props;
+import akka.actor.SupervisorStrategy;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
+import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
+import org.opendaylight.mdsal.dom.api.DOMActionService;
+import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
+import org.opendaylight.mdsal.dom.api.DOMRpcService;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown. It also registers
+ * {@link OpsListener} with the local {@link DOMRpcService}.
+ */
+public class OpsManager extends AbstractUntypedActor {
+ private final DOMRpcProviderService rpcProvisionRegistry;
+ private final RemoteOpsProviderConfig config;
+ private final DOMRpcService rpcServices;
+ private DOMActionProviderService actionProvisionRegistry;
+ private DOMActionService actionService;
+
+ private ListenerRegistration<OpsListener> listenerReg;
+ private ActorRef opsInvoker;
+ private ActorRef actionRegistry;
+ private ActorRef rpcRegistry;
+ private ActorRef opsRegistrar;
+
+ OpsManager(final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices,
+ final RemoteOpsProviderConfig config, final DOMActionProviderService actionProviderService,
+ final DOMActionService actionService) {
+ this.rpcProvisionRegistry = requireNonNull(rpcProvisionRegistry);
+ this.rpcServices = requireNonNull(rpcServices);
+ this.config = requireNonNull(config);
+ this.actionProvisionRegistry = requireNonNull(actionProviderService);
+ this.actionService = requireNonNull(actionService);
+ }
+
+ public static Props props(final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices,
+ final RemoteOpsProviderConfig config,
+ final DOMActionProviderService actionProviderService,
+ final DOMActionService actionService) {
+ requireNonNull(rpcProvisionRegistry, "RpcProviderService can not be null!");
+ requireNonNull(rpcServices, "RpcService can not be null!");
+ requireNonNull(config, "RemoteOpsProviderConfig can not be null!");
+ requireNonNull(actionProviderService, "ActionProviderService can not be null!");
+ requireNonNull(actionService, "ActionService can not be null!");
+ return Props.create(OpsManager.class, rpcProvisionRegistry, rpcServices, config,
+ actionProviderService, actionService);
+ }
+
+ @Override
+ public void preStart() throws Exception {
+ super.preStart();
+
+ opsInvoker = getContext().actorOf(OpsInvoker.props(rpcServices, actionService)
+ .withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
+ LOG.debug("Listening for RPC invocation requests with {}", opsInvoker);
+
+ opsRegistrar = getContext().actorOf(OpsRegistrar.props(config, rpcProvisionRegistry, actionProvisionRegistry)
+ .withMailbox(config.getMailBoxName()), config.getRpcRegistrarName());
+ LOG.debug("Registering remote RPCs with {}", opsRegistrar);
+
+ rpcRegistry = getContext().actorOf(RpcRegistry.props(config, opsInvoker, opsRegistrar)
+ .withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
+ LOG.debug("Propagating RPC information with {}", rpcRegistry);
+
+ actionRegistry = getContext().actorOf(RpcRegistry.props(config, opsInvoker, opsRegistrar)
+ .withMailbox(config.getMailBoxName()), config.getActionRegistryName());
+ LOG.debug("Propagating RPC information with {}", actionRegistry);
+
+ final OpsListener opsListener = new OpsListener(rpcRegistry, actionRegistry);
+ LOG.debug("Registering local availability listener {}", opsListener);
+ listenerReg = rpcServices.registerRpcListener(opsListener);
+ }
+
+ @Override
+ public void postStop() throws Exception {
+ if (listenerReg != null) {
+ listenerReg.close();
+ listenerReg = null;
+ }
+
+ super.postStop();
+ }
+
+ @Override
+ protected void handleReceive(final Object message) {
+ unknownMessage(message);
+ }
+
+ @Override
+ public SupervisorStrategy supervisorStrategy() {
+ return new OneForOneStrategy(10, FiniteDuration.create(1, TimeUnit.MINUTES), t -> {
+ LOG.error("An exception happened actor will be resumed", t);
+ return SupervisorStrategy.resume();
+ });
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.Address;
+import akka.actor.Props;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.remote.rpc.registry.ActionRegistry;
+import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.Messages.UpdateRemoteActionEndpoints;
+import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.RemoteActionEndpoint;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.RemoteRpcEndpoint;
+import org.opendaylight.mdsal.dom.api.DOMActionImplementation;
+import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
+import org.opendaylight.mdsal.dom.api.DOMRpcImplementation;
+import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
+import org.opendaylight.yangtools.concepts.ObjectRegistration;
+
+/**
+ * Actor handling registration of RPCs and Actions available on remote nodes with the local
+ * {@link DOMRpcProviderService} and {@link DOMActionProviderService}.
+ */
+final class OpsRegistrar extends AbstractUntypedActor {
+ private final Map<Address, ObjectRegistration<DOMRpcImplementation>> rpcRegs = new HashMap<>();
+ private final Map<Address, ObjectRegistration<DOMActionImplementation>> actionRegs = new HashMap<>();
+ private final DOMRpcProviderService rpcProviderService;
+ private final RemoteOpsProviderConfig config;
+ private final DOMActionProviderService actionProviderService;
+
+ OpsRegistrar(final RemoteOpsProviderConfig config, final DOMRpcProviderService rpcProviderService,
+ final DOMActionProviderService actionProviderService) {
+ this.config = requireNonNull(config);
+ this.rpcProviderService = requireNonNull(rpcProviderService);
+ this.actionProviderService = requireNonNull(actionProviderService);
+ }
+
+ public static Props props(final RemoteOpsProviderConfig config, final DOMRpcProviderService rpcProviderService,
+ final DOMActionProviderService actionProviderService) {
+ return Props.create(OpsRegistrar.class, requireNonNull(config),
+ requireNonNull(rpcProviderService, "DOMRpcProviderService cannot be null"),
+ requireNonNull(actionProviderService, "DOMActionProviderService cannot be null"));
+ }
+
+ @Override
+ public void postStop() throws Exception {
+ rpcRegs.clear();
+ actionRegs.clear();
+
+ super.postStop();
+ }
+
+ @Override
+ protected void handleReceive(final Object message) {
+ if (message instanceof UpdateRemoteEndpoints) {
+ LOG.debug("Handling updateRemoteEndpoints message");
+ updateRemoteRpcEndpoints(((UpdateRemoteEndpoints) message).getRpcEndpoints());
+ } else if (message instanceof UpdateRemoteActionEndpoints) {
+ LOG.debug("Handling updateRemoteActionEndpoints message");
+ updateRemoteActionEndpoints(((UpdateRemoteActionEndpoints) message).getActionEndpoints());
+ } else {
+ unknownMessage(message);
+ }
+ }
+
+ private void updateRemoteRpcEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> rpcEndpoints) {
+ /*
+ * Updating RPC providers is a two-step process. We first add the newly-discovered RPCs and then close
+ * the old registration. This minimizes churn observed by listeners, as they will not observe RPC
+ * unavailability which would occur if we were to do it the other way around.
+ *
+ * Note that when an RPC moves from one remote node to another, we also do not want to expose the gap,
+ * hence we register all new implementations before closing all registrations.
+ */
+ for (Entry<Address, Optional<RemoteRpcEndpoint>> e : rpcEndpoints.entrySet()) {
+ LOG.debug("Updating RPC registrations for {}", e.getKey());
+
+ final Optional<RemoteRpcEndpoint> maybeEndpoint = e.getValue();
+ if (maybeEndpoint.isPresent()) {
+ final RemoteRpcEndpoint endpoint = maybeEndpoint.get();
+ final RemoteRpcImplementation impl = new RemoteRpcImplementation(endpoint.getRouter(), config);
+ rpcRegs.put(e.getKey(), rpcProviderService.registerRpcImplementation(impl,
+ endpoint.getRpcs()));
+ } else {
+ rpcRegs.remove(e.getKey());
+ }
+ }
+ }
+
+ /**
+ * Updates the action endpoints, Adding new registrations first before removing previous registrations.
+ */
+ private void updateRemoteActionEndpoints(final Map<Address,
+ Optional<ActionRegistry.RemoteActionEndpoint>> actionEndpoints) {
+ /*
+ * Updating Action providers is a two-step process. We first add the newly-discovered RPCs and then close
+ * the old registration. This minimizes churn observed by listeners, as they will not observe RPC
+ * unavailability which would occur if we were to do it the other way around.
+ *
+ * Note that when an Action moves from one remote node to another, we also do not want to expose the gap,
+ * hence we register all new implementations before closing all registrations.
+ */
+ for (Entry<Address, Optional<RemoteActionEndpoint>> e : actionEndpoints.entrySet()) {
+ LOG.debug("Updating Action registrations for {}", e.getKey());
+
+ final Optional<RemoteActionEndpoint> maybeEndpoint = e.getValue();
+ if (maybeEndpoint.isPresent()) {
+ final RemoteActionEndpoint endpoint = maybeEndpoint.get();
+ final RemoteActionImplementation impl = new RemoteActionImplementation(endpoint.getRouter(), config);
+ actionRegs.put(e.getKey(),
+ actionProviderService.registerActionImplementation(impl, endpoint.getActions()));
+ } else {
+ actionRegs.remove(e.getKey());
+ }
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2019 Nordix Foundation. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import akka.actor.ActorRef;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.remote.rpc.messages.ExecuteAction;
+import org.opendaylight.mdsal.dom.api.DOMActionImplementation;
+import org.opendaylight.mdsal.dom.api.DOMActionResult;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link DOMActionImplementation} which routes invocation requests to a remote invoker actor.
+ */
+final class RemoteActionImplementation extends AbstractRemoteImplementation<ExecuteAction>
+ implements DOMActionImplementation {
+ private static final Logger LOG = LoggerFactory.getLogger(RemoteActionImplementation.class);
+
+ RemoteActionImplementation(final ActorRef remoteInvoker, final RemoteOpsProviderConfig config) {
+ super(remoteInvoker, config);
+ }
+
+ /**
+ * Routes action request to a remote invoker, which will execute the action and return with result.
+ */
+ @Override
+ public ListenableFuture<DOMActionResult> invokeAction(final SchemaPath type, final DOMDataTreeIdentifier path,
+ final ContainerNode input) {
+ LOG.debug("invoking action {} with path {}", type, path);
+ return new RemoteDOMActionFuture(type, ask(ExecuteAction.from(type, path, input)));
+ }
+
+ @Override
+ public long invocationCost() {
+ return COST;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2019 Nordix Foundation. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import org.opendaylight.mdsal.dom.api.DOMActionException;
+
+public class RemoteDOMActionException extends DOMActionException {
+ private static final long serialVersionUID = 1L;
+
+ RemoteDOMActionException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2019 Nordix Foundation. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.remote.rpc.messages.ActionResponse;
+import org.opendaylight.mdsal.dom.api.DOMActionException;
+import org.opendaylight.mdsal.dom.api.DOMActionResult;
+import org.opendaylight.mdsal.dom.spi.SimpleDOMActionResult;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import scala.concurrent.Future;
+
+final class RemoteDOMActionFuture extends AbstractRemoteFuture<DOMActionResult, DOMActionException> {
+ RemoteDOMActionFuture(final @NonNull SchemaPath type, final @NonNull Future<Object> requestFuture) {
+ super(type, requestFuture);
+ }
+
+ @Override
+ DOMActionResult processReply(final Object reply) {
+ if (reply instanceof ActionResponse) {
+ final ActionResponse actionReply = (ActionResponse) reply;
+ final ContainerNode output = actionReply.getOutput();
+ return output == null ? new SimpleDOMActionResult(actionReply.getErrors())
+ : new SimpleDOMActionResult(output, actionReply.getErrors());
+ }
+
+ return null;
+ }
+
+ @Override
+ Class<DOMActionException> exceptionClass() {
+ return DOMActionException.class;
+ }
+
+ @Override
+ DOMActionException wrapCause(final Throwable cause) {
+ return new RemoteDOMActionException("Exception during invoking ACTION", cause);
+ }
+}
*/
package org.opendaylight.controller.remote.rpc;
-import static java.util.Objects.requireNonNull;
-
-import akka.dispatch.OnComplete;
-import com.google.common.util.concurrent.AbstractFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
import org.opendaylight.mdsal.dom.api.DOMRpcException;
import org.opendaylight.mdsal.dom.api.DOMRpcResult;
import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.ExecutionContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
import scala.concurrent.Future;
-final class RemoteDOMRpcFuture extends AbstractFuture<DOMRpcResult> {
-
- private static final Logger LOG = LoggerFactory.getLogger(RemoteDOMRpcFuture.class);
-
- private final QName rpcName;
-
- private RemoteDOMRpcFuture(final QName rpcName) {
- this.rpcName = requireNonNull(rpcName, "rpcName");
- }
-
- public static RemoteDOMRpcFuture create(final QName rpcName) {
- return new RemoteDOMRpcFuture(rpcName);
- }
-
- protected void failNow(final Throwable error) {
- LOG.debug("Failing future {} for rpc {}", this, rpcName, error);
- setException(error);
- }
-
- protected void completeWith(final Future<Object> future) {
- future.onComplete(new FutureUpdater(), ExecutionContext.Implicits$.MODULE$.global());
+final class RemoteDOMRpcFuture extends AbstractRemoteFuture<DOMRpcResult, DOMRpcException> {
+ RemoteDOMRpcFuture(final @NonNull SchemaPath type, final @NonNull Future<Object> requestFuture) {
+ super(type, requestFuture);
}
@Override
- public DOMRpcResult get() throws InterruptedException, ExecutionException {
- try {
- return super.get();
- } catch (ExecutionException e) {
- throw mapException(e);
- }
+ DOMRpcResult processReply(final Object reply) {
+ return reply instanceof RpcResponse ? new DefaultDOMRpcResult(((RpcResponse) reply).getOutput()) : null;
}
@Override
- public DOMRpcResult get(final long timeout, final TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException {
- try {
- return super.get(timeout, unit);
- } catch (final ExecutionException e) {
- throw mapException(e);
- }
- }
-
- private static ExecutionException mapException(final ExecutionException ex) {
- final Throwable cause = ex.getCause();
- if (cause instanceof DOMRpcException) {
- return ex;
- }
- return new ExecutionException(ex.getMessage(),
- new RemoteDOMRpcException("Exception during invoking RPC", ex.getCause()));
+ Class<DOMRpcException> exceptionClass() {
+ return DOMRpcException.class;
}
- private final class FutureUpdater extends OnComplete<Object> {
-
- @Override
- public void onComplete(final Throwable error, final Object reply) {
- if (error != null) {
- RemoteDOMRpcFuture.this.failNow(error);
- } else if (reply instanceof RpcResponse) {
- final RpcResponse rpcReply = (RpcResponse) reply;
- final NormalizedNode<?, ?> result = rpcReply.getResultNormalizedNode();
-
- LOG.debug("Received response for rpc {}: result is {}", rpcName, result);
-
- RemoteDOMRpcFuture.this.set(new DefaultDOMRpcResult(result));
-
- LOG.debug("Future {} for rpc {} successfully completed", RemoteDOMRpcFuture.this, rpcName);
- } else {
- RemoteDOMRpcFuture.this.failNow(new IllegalStateException("Incorrect reply type " + reply
- + "from Akka"));
- }
- }
+ @Override
+ DOMRpcException wrapCause(final Throwable cause) {
+ return new RemoteDOMRpcException("Exception during invoking RPC", cause);
}
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
+import org.opendaylight.mdsal.dom.api.DOMActionService;
+import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
+import org.opendaylight.mdsal.dom.api.DOMRpcService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is the base class which initialize all the actors, listeners and
+ * default RPc implementation so remote invocation of rpcs.
+ */
+public class RemoteOpsProvider implements AutoCloseable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RemoteOpsProvider.class);
+
+ private final DOMRpcProviderService rpcProvisionRegistry;
+ private final RemoteOpsProviderConfig config;
+ private final ActorSystem actorSystem;
+ private final DOMRpcService rpcService;
+ private final DOMActionProviderService actionProvisionRegistry;
+ private final DOMActionService actionService;
+
+ private ActorRef opsManager;
+
+ public RemoteOpsProvider(final ActorSystem actorSystem, final DOMRpcProviderService rpcProvisionRegistry,
+ final DOMRpcService rpcService, final RemoteOpsProviderConfig config,
+ final DOMActionProviderService actionProviderService,
+ final DOMActionService actionService) {
+ this.actorSystem = requireNonNull(actorSystem);
+ this.rpcProvisionRegistry = requireNonNull(rpcProvisionRegistry);
+ this.rpcService = requireNonNull(rpcService);
+ this.config = requireNonNull(config);
+ this.actionProvisionRegistry = requireNonNull(actionProviderService);
+ this.actionService = requireNonNull(actionService);
+ }
+
+ @Override
+ public void close() {
+ if (opsManager != null) {
+ LOG.info("Stopping Ops Manager at {}", opsManager);
+ opsManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ opsManager = null;
+ }
+ }
+
+ public void start() {
+ LOG.info("Starting Remote Ops service...");
+ opsManager = actorSystem.actorOf(OpsManager.props(rpcProvisionRegistry, rpcService, config,
+ actionProvisionRegistry, actionService), config.getRpcManagerName());
+ LOG.debug("Ops Manager started at {}", opsManager);
+ }
+}
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import scala.concurrent.duration.FiniteDuration;
-public class RemoteRpcProviderConfig extends CommonConfig {
+public class RemoteOpsProviderConfig extends CommonConfig {
protected static final String TAG_RPC_BROKER_NAME = "rpc-broker-name";
protected static final String TAG_RPC_REGISTRAR_NAME = "rpc-registrar-name";
protected static final String TAG_RPC_REGISTRY_NAME = "registry-name";
+ protected static final String TAG_ACTION_REGISTRY_NAME = "action-registry-name";
protected static final String TAG_RPC_MGR_NAME = "rpc-manager-name";
protected static final String TAG_RPC_BROKER_PATH = "rpc-broker-path";
protected static final String TAG_RPC_REGISTRY_PATH = "rpc-registry-path";
+ protected static final String TAG_ACTION_REGISTRY_PATH = "action-registry-path";
protected static final String TAG_RPC_MGR_PATH = "rpc-manager-path";
protected static final String TAG_ASK_DURATION = "ask-duration";
private static final String TAG_GOSSIP_TICK_INTERVAL = "gossip-tick-interval";
private static final String TAG_RPC_REGISTRY_PERSISTENCE_ID = "rpc-registry-persistence-id";
+ private static final String TAG_ACTION_REGISTRY_PERSISTENCE_ID = "action-registry-persistence-id";
//locally cached values
private Timeout cachedAskDuration;
private FiniteDuration cachedGossipTickInterval;
- public RemoteRpcProviderConfig(final Config config) {
+ public RemoteOpsProviderConfig(final Config config) {
super(config);
}
return get().getString(TAG_RPC_REGISTRY_NAME);
}
+ public String getActionRegistryName() {
+ return get().getString(TAG_ACTION_REGISTRY_NAME);
+ }
+
public String getRpcManagerName() {
return get().getString(TAG_RPC_MGR_NAME);
}
return get().getString(TAG_RPC_REGISTRY_PERSISTENCE_ID);
}
+ public String getActionRegistryPath() {
+ return get().getString(TAG_ACTION_REGISTRY_PATH);
+ }
+
+ public String getActionRegistryPersistenceId() {
+ return get().getString(TAG_ACTION_REGISTRY_PERSISTENCE_ID);
+ }
+
public String getRpcManagerPath() {
return get().getString(TAG_RPC_MGR_PATH);
}
*/
@SuppressFBWarnings(value = "BC_UNCONFIRMED_CAST_OF_RETURN_VALUE",
justification = "Findbugs flags this as an unconfirmed cast of return value but the build method clearly "
- + "returns RemoteRpcProviderConfig. Perhaps it's confused b/c the build method is overloaded and "
+ + "returns RemoteOpsProviderConfig. Perhaps it's confused b/c the build method is overloaded and "
+ "and differs in return type from the base class.")
- public static RemoteRpcProviderConfig newInstance(final String actorSystemName, final boolean metricCaptureEnabled,
- final int mailboxCapacity) {
+ public static RemoteOpsProviderConfig newInstance(final String actorSystemName, final boolean metricCaptureEnabled,
+ final int mailboxCapacity) {
return new Builder(actorSystemName).metricCaptureEnabled(metricCaptureEnabled)
.mailboxCapacity(mailboxCapacity).build();
}
configHolder.put(TAG_RPC_BROKER_NAME, "broker");
configHolder.put(TAG_RPC_REGISTRAR_NAME, "registrar");
configHolder.put(TAG_RPC_REGISTRY_NAME, "registry");
+ configHolder.put(TAG_ACTION_REGISTRY_NAME, "action-registry");
configHolder.put(TAG_RPC_MGR_NAME, "rpc");
//Actor paths
configHolder.put(TAG_RPC_BROKER_PATH, "/user/rpc/broker");
configHolder.put(TAG_RPC_REGISTRY_PATH, "/user/rpc/registry");
+ configHolder.put(TAG_ACTION_REGISTRY_PATH, "/user/action/registry");
configHolder.put(TAG_RPC_MGR_PATH, "/user/rpc");
//durations
// persistence
configHolder.put(TAG_RPC_REGISTRY_PERSISTENCE_ID, "remote-rpc-registry");
+ configHolder.put(TAG_ACTION_REGISTRY_PERSISTENCE_ID, "remote-action-registry");
}
public Builder gossipTickInterval(final String interval) {
}
@Override
- public RemoteRpcProviderConfig build() {
- return new RemoteRpcProviderConfig(merge());
+ public RemoteOpsProviderConfig build() {
+ return new RemoteOpsProviderConfig(merge());
}
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import akka.actor.ActorSystem;
+import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
+import org.opendaylight.mdsal.dom.api.DOMActionService;
+import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
+import org.opendaylight.mdsal.dom.api.DOMRpcService;
+
+public final class RemoteOpsProviderFactory {
+ private RemoteOpsProviderFactory() {
+
+ }
+
+ public static RemoteOpsProvider createInstance(final DOMRpcProviderService rpcProviderService,
+ final DOMRpcService rpcService, final ActorSystem actorSystem,
+ final RemoteOpsProviderConfig config,
+ final DOMActionProviderService actionProviderService,
+ final DOMActionService actionService) {
+
+ return new RemoteOpsProvider(actorSystem, rpcProviderService, rpcService, config,
+ actionProviderService, actionService);
+ }
+}
*/
package org.opendaylight.controller.remote.rpc;
-import static java.util.Objects.requireNonNull;
-
import akka.actor.ActorRef;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
import com.google.common.util.concurrent.ListenableFuture;
import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
*
* @author Robert Varga
*/
-final class RemoteRpcImplementation implements DOMRpcImplementation {
- // 0 for local, 1 for binding, 2 for remote
- private static final long COST = 2;
-
- private final ActorRef remoteInvoker;
- private final Timeout askDuration;
-
- RemoteRpcImplementation(final ActorRef remoteInvoker, final RemoteRpcProviderConfig config) {
- this.remoteInvoker = requireNonNull(remoteInvoker);
- this.askDuration = config.getAskDuration();
+final class RemoteRpcImplementation extends AbstractRemoteImplementation<ExecuteRpc> implements DOMRpcImplementation {
+ RemoteRpcImplementation(final ActorRef remoteInvoker, final RemoteOpsProviderConfig config) {
+ super(remoteInvoker, config);
}
@Override
public ListenableFuture<DOMRpcResult> invokeRpc(final DOMRpcIdentifier rpc,
final NormalizedNode<?, ?> input) {
- final RemoteDOMRpcFuture ret = RemoteDOMRpcFuture.create(rpc.getType().getLastComponent());
- ret.completeWith(Patterns.ask(remoteInvoker, ExecuteRpc.from(rpc, input), askDuration));
- return ret;
+ return new RemoteDOMRpcFuture(rpc.getType(), ask(ExecuteRpc.from(rpc, input)));
}
@Override
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.remote.rpc;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import com.google.common.base.Preconditions;
-import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
-import org.opendaylight.mdsal.dom.api.DOMRpcService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This is the base class which initialize all the actors, listeners and
- * default RPc implementation so remote invocation of rpcs.
- */
-public class RemoteRpcProvider implements AutoCloseable {
-
- private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcProvider.class);
-
- private final DOMRpcProviderService rpcProvisionRegistry;
- private final RemoteRpcProviderConfig config;
- private final ActorSystem actorSystem;
- private final DOMRpcService rpcService;
-
- private ActorRef rpcManager;
-
- public RemoteRpcProvider(final ActorSystem actorSystem, final DOMRpcProviderService rpcProvisionRegistry,
- final DOMRpcService rpcService, final RemoteRpcProviderConfig config) {
- this.actorSystem = Preconditions.checkNotNull(actorSystem);
- this.rpcProvisionRegistry = Preconditions.checkNotNull(rpcProvisionRegistry);
- this.rpcService = Preconditions.checkNotNull(rpcService);
- this.config = Preconditions.checkNotNull(config);
- }
-
- @Override
- public void close() {
- if (rpcManager != null) {
- LOG.info("Stopping RPC Manager at {}", rpcManager);
- rpcManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
- rpcManager = null;
- }
- }
-
- public void start() {
- LOG.info("Starting Remote RPC service...");
- rpcManager = actorSystem.actorOf(RpcManager.props(rpcProvisionRegistry, rpcService, config),
- config.getRpcManagerName());
- LOG.debug("RPC Manager started at {}", rpcManager);
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.remote.rpc;
-
-import akka.actor.ActorSystem;
-import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
-import org.opendaylight.mdsal.dom.api.DOMRpcService;
-
-public final class RemoteRpcProviderFactory {
- private RemoteRpcProviderFactory() {
-
- }
-
- public static RemoteRpcProvider createInstance(final DOMRpcProviderService rpcProviderService,
- final DOMRpcService rpcService, final ActorSystem actorSystem, final RemoteRpcProviderConfig config) {
-
- return new RemoteRpcProvider(actorSystem, rpcProviderService, rpcService, config);
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.remote.rpc;
-
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
-import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
-import org.opendaylight.mdsal.dom.api.DOMRpcResult;
-import org.opendaylight.mdsal.dom.api.DOMRpcService;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaPath;
-
-/**
- * Actor receiving invocation requests from remote nodes, routing them to
- * {@link DOMRpcService#invokeRpc(SchemaPath, NormalizedNode)}.
- */
-final class RpcInvoker extends AbstractUntypedActor {
- private final DOMRpcService rpcService;
-
- private RpcInvoker(final DOMRpcService rpcService) {
- this.rpcService = Preconditions.checkNotNull(rpcService);
- }
-
- public static Props props(final DOMRpcService rpcService) {
- Preconditions.checkNotNull(rpcService, "DOMRpcService can not be null");
- return Props.create(RpcInvoker.class, rpcService);
- }
-
- @Override
- protected void handleReceive(final Object message) {
- if (message instanceof ExecuteRpc) {
- executeRpc((ExecuteRpc) message);
- } else {
- unknownMessage(message);
- }
- }
-
- @SuppressWarnings("checkstyle:IllegalCatch")
- private void executeRpc(final ExecuteRpc msg) {
- LOG.debug("Executing rpc {}", msg.getRpc());
- final SchemaPath schemaPath = SchemaPath.create(true, msg.getRpc());
- final ActorRef sender = getSender();
- final ActorRef self = self();
-
- final ListenableFuture<DOMRpcResult> future;
- try {
- future = rpcService.invokeRpc(schemaPath, msg.getInputNormalizedNode());
- } catch (final RuntimeException e) {
- LOG.debug("Failed to invoke RPC {}", msg.getRpc(), e);
- sender.tell(new akka.actor.Status.Failure(e), sender);
- return;
- }
-
- Futures.addCallback(future, new FutureCallback<DOMRpcResult>() {
- @Override
- public void onSuccess(final DOMRpcResult result) {
- if (result == null) {
- // This shouldn't happen but the FutureCallback annotates the result param with Nullable so
- // handle null here to avoid FindBugs warning.
- LOG.debug("Got null DOMRpcResult - sending null response for execute rpc : {}", msg.getRpc());
- sender.tell(new RpcResponse(null), self);
- return;
- }
-
- if (!result.getErrors().isEmpty()) {
- final String message = String.format("Execution of RPC %s failed", msg.getRpc());
- sender.tell(new akka.actor.Status.Failure(new RpcErrorsException(message, result.getErrors())),
- self);
- } else {
- LOG.debug("Sending response for execute rpc : {}", msg.getRpc());
- sender.tell(new RpcResponse(result.getResult()), self);
- }
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- LOG.debug("Failed to execute RPC {}", msg.getRpc(), failure);
- LOG.error("Failed to execute RPC {} due to {}. More details are available on DEBUG level.",
- msg.getRpc(), Throwables.getRootCause(failure).getMessage());
- sender.tell(new akka.actor.Status.Failure(failure), self);
- }
- }, MoreExecutors.directExecutor());
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.remote.rpc;
-
-import akka.actor.ActorRef;
-import akka.actor.OneForOneStrategy;
-import akka.actor.Props;
-import akka.actor.SupervisorStrategy;
-import com.google.common.base.Preconditions;
-import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
-import org.opendaylight.mdsal.dom.api.DOMRpcService;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown. It also registers
- * {@link RpcListener} with the local {@link DOMRpcService}.
- */
-public class RpcManager extends AbstractUntypedActor {
- private final DOMRpcProviderService rpcProvisionRegistry;
- private final RemoteRpcProviderConfig config;
- private final DOMRpcService rpcServices;
-
- private ListenerRegistration<RpcListener> listenerReg;
- private ActorRef rpcInvoker;
- private ActorRef rpcRegistry;
- private ActorRef rpcRegistrar;
-
- RpcManager(final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices,
- final RemoteRpcProviderConfig config) {
- this.rpcProvisionRegistry = Preconditions.checkNotNull(rpcProvisionRegistry);
- this.rpcServices = Preconditions.checkNotNull(rpcServices);
- this.config = Preconditions.checkNotNull(config);
- }
-
- public static Props props(final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices,
- final RemoteRpcProviderConfig config) {
- Preconditions.checkNotNull(rpcProvisionRegistry, "RpcProviderService can not be null!");
- Preconditions.checkNotNull(rpcServices, "RpcService can not be null!");
- Preconditions.checkNotNull(config, "RemoteRpcProviderConfig can not be null!");
- return Props.create(RpcManager.class, rpcProvisionRegistry, rpcServices, config);
- }
-
- @Override
- public void preStart() throws Exception {
- super.preStart();
-
- rpcInvoker = getContext().actorOf(RpcInvoker.props(rpcServices)
- .withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
- LOG.debug("Listening for RPC invocation requests with {}", rpcInvoker);
-
- rpcRegistrar = getContext().actorOf(RpcRegistrar.props(config, rpcProvisionRegistry)
- .withMailbox(config.getMailBoxName()), config.getRpcRegistrarName());
- LOG.debug("Registering remote RPCs with {}", rpcRegistrar);
-
- rpcRegistry = getContext().actorOf(RpcRegistry.props(config, rpcInvoker, rpcRegistrar)
- .withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
- LOG.debug("Propagating RPC information with {}", rpcRegistry);
-
- final RpcListener rpcListener = new RpcListener(rpcRegistry);
- LOG.debug("Registering local availabitility listener {}", rpcListener);
- listenerReg = rpcServices.registerRpcListener(rpcListener);
- }
-
- @Override
- public void postStop() throws Exception {
- if (listenerReg != null) {
- listenerReg.close();
- listenerReg = null;
- }
-
- super.postStop();
- }
-
- @Override
- protected void handleReceive(final Object message) {
- unknownMessage(message);
- }
-
- @Override
- public SupervisorStrategy supervisorStrategy() {
- return new OneForOneStrategy(10, FiniteDuration.create(1, TimeUnit.MINUTES), t -> {
- LOG.error("An exception happened actor will be resumed", t);
- return SupervisorStrategy.resume();
- });
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.remote.rpc;
-
-import akka.actor.Address;
-import akka.actor.Props;
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.RemoteRpcEndpoint;
-import org.opendaylight.mdsal.dom.api.DOMRpcImplementationRegistration;
-import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
-
-/**
- * Actor handling registration of RPCs available on remote nodes with the local {@link DOMRpcProviderService}.
- *
- * @author Robert Varga
- */
-final class RpcRegistrar extends AbstractUntypedActor {
- private final Map<Address, DOMRpcImplementationRegistration<?>> regs = new HashMap<>();
- private final DOMRpcProviderService rpcProviderService;
- private final RemoteRpcProviderConfig config;
-
- RpcRegistrar(final RemoteRpcProviderConfig config, final DOMRpcProviderService rpcProviderService) {
- this.config = Preconditions.checkNotNull(config);
- this.rpcProviderService = Preconditions.checkNotNull(rpcProviderService);
- }
-
- public static Props props(final RemoteRpcProviderConfig config, final DOMRpcProviderService rpcProviderService) {
- Preconditions.checkNotNull(rpcProviderService, "DOMRpcProviderService cannot be null");
- return Props.create(RpcRegistrar.class, config, rpcProviderService);
- }
-
- @Override
- public void postStop() throws Exception {
- regs.values().forEach(DOMRpcImplementationRegistration::close);
- regs.clear();
-
- super.postStop();
- }
-
- @Override
- protected void handleReceive(final Object message) {
- if (message instanceof UpdateRemoteEndpoints) {
- updateRemoteEndpoints(((UpdateRemoteEndpoints) message).getEndpoints());
- } else {
- unknownMessage(message);
- }
- }
-
- private void updateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> endpoints) {
- /*
- * Updating RPC providers is a two-step process. We first add the newly-discovered RPCs and then close
- * the old registration. This minimizes churn observed by listeners, as they will not observe RPC
- * unavailability which would occur if we were to do it the other way around.
- *
- * Note that when an RPC moves from one remote node to another, we also do not want to expose the gap,
- * hence we register all new implementations before closing all registrations.
- */
- final Collection<DOMRpcImplementationRegistration<?>> prevRegs = new ArrayList<>(endpoints.size());
-
- for (Entry<Address, Optional<RemoteRpcEndpoint>> e : endpoints.entrySet()) {
- LOG.debug("Updating RPC registrations for {}", e.getKey());
-
- final DOMRpcImplementationRegistration<?> prevReg;
- final Optional<RemoteRpcEndpoint> maybeEndpoint = e.getValue();
- if (maybeEndpoint.isPresent()) {
- final RemoteRpcEndpoint endpoint = maybeEndpoint.get();
- final RemoteRpcImplementation impl = new RemoteRpcImplementation(endpoint.getRouter(), config);
- prevReg = regs.put(e.getKey(), rpcProviderService.registerRpcImplementation(impl,
- endpoint.getRpcs()));
- } else {
- prevReg = regs.remove(e.getKey());
- }
-
- if (prevReg != null) {
- prevRegs.add(prevReg);
- }
- }
-
- for (DOMRpcImplementationRegistration<?> r : prevRegs) {
- r.close();
- }
- }
-}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.remote.rpc;
import akka.actor.Terminated;
LOG.debug("Created TerminationMonitor");
}
- @Override public void onReceive(Object message) {
+ @Override
+ public void onReceive(final Object message) {
if (message instanceof Terminated) {
Terminated terminated = (Terminated) message;
LOG.debug("Actor terminated : {}", terminated.actor());
--- /dev/null
+/*
+ * Copyright (c) 2019 Nordix Foundation. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.messages;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.MoreObjects.ToStringHelper;
+import java.io.Serializable;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+/**
+ * An abstract base class for invocation requests. Specialized via {@link ExecuteAction} and {@link ExecuteRpc}.
+ */
+public abstract class AbstractExecute<T extends NormalizedNode<?, ?>> implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final transient @NonNull SchemaPath type;
+ private final transient T input;
+
+ AbstractExecute(final @NonNull SchemaPath type, final T input) {
+ this.type = requireNonNull(type);
+ this.input = input;
+ }
+
+ public final @NonNull SchemaPath getType() {
+ return type;
+ }
+
+ public final T getInput() {
+ return input;
+ }
+
+ @Override
+ public final String toString() {
+ // We want 'type' to be always first
+ return addToStringAttributes(MoreObjects.toStringHelper(this).omitNullValues().add("type", type)).toString();
+ }
+
+ ToStringHelper addToStringAttributes(final ToStringHelper helper) {
+ return helper.add("input", input);
+ }
+
+ abstract Object writeReplace();
+}
--- /dev/null
+/*
+ * Copyright (c) 2019 Nordix Foundation. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.messages;
+
+import java.io.Serializable;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * An abstract base class for invocation responses. Specialized via {@link ActionResponse} and {@link RpcResponse}.
+ */
+public abstract class AbstractResponse<T extends NormalizedNode<?, ?>> implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final transient @Nullable T output;
+
+ public AbstractResponse(final @Nullable T output) {
+ this.output = output;
+ }
+
+ public final @Nullable T getOutput() {
+ return output;
+ }
+
+ abstract Object writeReplace();
+}
--- /dev/null
+/*
+ * Copyright (c) 2019 Nordix Foundation. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.messages;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.collect.ImmutableList;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collection;
+import java.util.Optional;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+@SuppressFBWarnings({"SE_TRANSIENT_FIELD_NOT_RESTORED", "DMI_NONSERIALIZABLE_OBJECT_WRITTEN"})
+public class ActionResponse extends AbstractResponse<ContainerNode> {
+ private static final long serialVersionUID = 1L;
+
+ private final transient @NonNull ImmutableList<@NonNull RpcError> errors;
+
+ public ActionResponse(final @NonNull Optional<ContainerNode> output, @NonNull final Collection<RpcError> errors) {
+ super(output.orElse(null));
+ this.errors = ImmutableList.copyOf(errors);
+ }
+
+ public @NonNull ImmutableList<@NonNull RpcError> getErrors() {
+ return errors;
+ }
+
+ @Override
+ Object writeReplace() {
+ return new Proxy(this);
+ }
+
+ private static class Proxy implements Externalizable {
+ private static final long serialVersionUID = 1L;
+
+ private ActionResponse actionResponse;
+
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
+ public Proxy() {
+ }
+
+ Proxy(final ActionResponse actionResponse) {
+ this.actionResponse = requireNonNull(actionResponse);
+ }
+
+ @Override
+ public void writeExternal(final ObjectOutput out) throws IOException {
+ out.writeObject(actionResponse.getErrors());
+ SerializationUtils.writeNormalizedNode(out, actionResponse.getOutput());
+ }
+
+ @Override
+ public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+ @SuppressWarnings("unchecked")
+ final ImmutableList<RpcError> errors = (ImmutableList<RpcError>) in.readObject();
+ final Optional<NormalizedNode<?, ?>> output = SerializationUtils.readNormalizedNode(in);
+ actionResponse = new ActionResponse(output.map(ContainerNode.class::cast), errors);
+ }
+
+ private Object readResolve() {
+ return actionResponse;
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2019 Nordix Foundation. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.messages;
+
+import static com.google.common.base.Verify.verifyNotNull;
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.MoreObjects.ToStringHelper;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputOutput;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+public final class ExecuteAction extends AbstractExecute<@NonNull ContainerNode> {
+ private static final long serialVersionUID = 1128904894827335676L;
+
+ private final @NonNull DOMDataTreeIdentifier path;
+
+ private ExecuteAction(final @NonNull SchemaPath type, final @NonNull DOMDataTreeIdentifier path,
+ final @NonNull ContainerNode input) {
+ super(type, requireNonNull(input));
+ this.path = requireNonNull(path);
+ }
+
+ public static @NonNull ExecuteAction from(final @NonNull SchemaPath type, @NonNull final DOMDataTreeIdentifier path,
+ final @NonNull ContainerNode input) {
+ return new ExecuteAction(type, path, input);
+ }
+
+ public @NonNull DOMDataTreeIdentifier getPath() {
+ return path;
+ }
+
+ @Override
+ ToStringHelper addToStringAttributes(final ToStringHelper helper) {
+ return super.addToStringAttributes(helper.add("path", path));
+ }
+
+ @Override
+ Object writeReplace() {
+ return new Proxy(this);
+ }
+
+ private static final class Proxy implements Externalizable {
+ private static final long serialVersionUID = 1L;
+
+ private ExecuteAction executeAction;
+
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
+ public Proxy() {
+
+ }
+
+ Proxy(final ExecuteAction executeAction) {
+ this.executeAction = requireNonNull(executeAction);
+ }
+
+ @Override
+ public void writeExternal(final ObjectOutput out) throws IOException {
+ try (NormalizedNodeDataOutput stream = NormalizedNodeInputOutput.newDataOutput(out)) {
+ stream.writeSchemaPath(executeAction.getType());
+ // FIXME: deal with data store types?
+ stream.writeYangInstanceIdentifier(executeAction.getPath().getRootIdentifier());
+ stream.writeOptionalNormalizedNode(executeAction.getInput());
+ }
+ }
+
+ @Override
+ public void readExternal(final ObjectInput in) throws IOException {
+ final NormalizedNodeDataInput stream = NormalizedNodeInputOutput.newDataInput(in);
+ final SchemaPath name = stream.readSchemaPath();
+ final YangInstanceIdentifier path = stream.readYangInstanceIdentifier();
+ final ContainerNode input = (ContainerNode) stream.readOptionalNormalizedNode().orElse(null);
+
+ executeAction = new ExecuteAction(name, new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, path),
+ input);
+ }
+
+ private Object readResolve() {
+ return verifyNotNull(executeAction);
+ }
+ }
+}
import static java.util.Objects.requireNonNull;
-import com.google.common.base.MoreObjects;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
-import java.io.Serializable;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput;
import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput;
import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputOutput;
import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
-import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
-public final class ExecuteRpc implements Serializable {
+public final class ExecuteRpc extends AbstractExecute<@Nullable NormalizedNode<?, ?>> {
private static final long serialVersionUID = 1128904894827335676L;
- @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "This field is not Serializable but this class "
- + "implements writeReplace to delegate serialization to a Proxy class and thus instances of this class "
- + "aren't serialized. FindBugs does not recognize this.")
- private final NormalizedNode<?, ?> inputNormalizedNode;
- private final QName rpc;
-
- private ExecuteRpc(final @Nullable NormalizedNode<?, ?> inputNormalizedNode, final @NonNull QName rpc) {
- this.rpc = requireNonNull(rpc, "rpc Qname should not be null");
- this.inputNormalizedNode = inputNormalizedNode;
- }
-
- public static ExecuteRpc from(final @NonNull DOMRpcIdentifier rpc, final @Nullable NormalizedNode<?, ?> input) {
- return new ExecuteRpc(input, rpc.getType().getLastComponent());
- }
-
- public @Nullable NormalizedNode<?, ?> getInputNormalizedNode() {
- return inputNormalizedNode;
+ private ExecuteRpc(final @NonNull SchemaPath type, final @Nullable NormalizedNode<?, ?> input) {
+ super(type, input);
}
- public @NonNull QName getRpc() {
- return rpc;
- }
-
- private Object writeReplace() {
- return new Proxy(this);
+ public static @NonNull ExecuteRpc from(final @NonNull DOMRpcIdentifier rpc,
+ final @Nullable NormalizedNode<?, ?> input) {
+ return new ExecuteRpc(rpc.getType(), input);
}
@Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("rpc", rpc)
- .add("normalizedNode", inputNormalizedNode)
- .toString();
+ Object writeReplace() {
+ return new Proxy(this);
}
- private static class Proxy implements Externalizable {
+ private static final class Proxy implements Externalizable {
private static final long serialVersionUID = 1L;
private ExecuteRpc executeRpc;
// redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
@SuppressWarnings("checkstyle:RedundantModifier")
public Proxy() {
+
}
Proxy(final ExecuteRpc executeRpc) {
- this.executeRpc = executeRpc;
+ this.executeRpc = requireNonNull(executeRpc);
}
@Override
public void writeExternal(final ObjectOutput out) throws IOException {
try (NormalizedNodeDataOutput stream = NormalizedNodeInputOutput.newDataOutput(out)) {
- stream.writeQName(executeRpc.getRpc());
- stream.writeOptionalNormalizedNode(executeRpc.getInputNormalizedNode());
+ stream.writeQName(executeRpc.getType().getLastComponent());
+ stream.writeOptionalNormalizedNode(executeRpc.getInput());
}
}
@Override
- public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+ public void readExternal(final ObjectInput in) throws IOException {
final NormalizedNodeDataInput stream = NormalizedNodeInputOutput.newDataInput(in);
- final QName qname = stream.readQName();
- executeRpc = new ExecuteRpc(stream.readOptionalNormalizedNode().orElse(null), qname);
+ final SchemaPath type = SchemaPath.ROOT.createChild(stream.readQName());
+ final NormalizedNode<?, ?> input = stream.readOptionalNormalizedNode().orElse(null);
+ executeRpc = new ExecuteRpc(type, input);
}
private Object readResolve() {
*/
package org.opendaylight.controller.remote.rpc.messages;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
-import java.io.Serializable;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-public class RpcResponse implements Serializable {
+public class RpcResponse extends AbstractResponse<NormalizedNode<?, ?>> {
private static final long serialVersionUID = -4211279498688989245L;
- @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "This field is not Serializable but this class "
- + "implements writeReplace to delegate serialization to a Proxy class and thus instances of this class "
- + "aren't serialized. FindBugs does not recognize this.")
- private final NormalizedNode<?, ?> resultNormalizedNode;
-
- public RpcResponse(final @Nullable NormalizedNode<?, ?> inputNormalizedNode) {
- resultNormalizedNode = inputNormalizedNode;
- }
-
- public @Nullable NormalizedNode<?, ?> getResultNormalizedNode() {
- return resultNormalizedNode;
+ public RpcResponse(final @Nullable NormalizedNode<?, ?> output) {
+ super(output);
}
- private Object writeReplace() {
+ @Override
+ Object writeReplace() {
return new Proxy(this);
}
@Override
public void writeExternal(final ObjectOutput out) throws IOException {
- SerializationUtils.writeNormalizedNode(out, rpcResponse.getResultNormalizedNode());
+ SerializationUtils.writeNormalizedNode(out, rpcResponse.getOutput());
}
@Override
--- /dev/null
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableSet;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Optional;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketData;
+
+/**
+ * Common class for routing tables.
+ *
+ * @param <T> Table type
+ * @param <I> Item type
+ */
+public abstract class AbstractRoutingTable<T extends AbstractRoutingTable<T, I>, I> implements BucketData<T>,
+ Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final @NonNull ActorRef invoker;
+ private final @NonNull ImmutableSet<I> items;
+
+ AbstractRoutingTable(final ActorRef invoker, final Collection<I> items) {
+ this.invoker = requireNonNull(invoker);
+ this.items = ImmutableSet.copyOf(items);
+ }
+
+ @Override
+ public final Optional<ActorRef> getWatchActor() {
+ return Optional.of(invoker);
+ }
+
+ public final @NonNull ImmutableSet<I> getItems() {
+ return items;
+ }
+
+ final @NonNull ActorRef getInvoker() {
+ return invoker;
+ }
+
+ @VisibleForTesting
+ public final boolean contains(final I routeId) {
+ return items.contains(routeId);
+ }
+
+ @VisibleForTesting
+ public final int size() {
+ return items.size();
+ }
+
+ abstract Object writeReplace();
+
+ @Override
+ public final String toString() {
+ return MoreObjects.toStringHelper(this).add("invoker", invoker).add("items", items).toString();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2019 Nordix Foundation. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry;
+
+import akka.actor.ActorRef;
+import akka.actor.Address;
+import akka.actor.Props;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor;
+import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteActionRegistryMXBeanImpl;
+import org.opendaylight.mdsal.dom.api.DOMActionInstance;
+
+/**
+ * Registry to look up cluster nodes that have registered for a given Action.
+ *
+ * <p>
+ * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor} to maintain this
+ * cluster wide information.
+ */
+public class ActionRegistry extends BucketStoreActor<ActionRoutingTable> {
+ private final ActorRef rpcRegistrar;
+ private final RemoteActionRegistryMXBeanImpl mxBean;
+
+ public ActionRegistry(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker,
+ final ActorRef rpcRegistrar) {
+ super(config, config.getRpcRegistryPersistenceId(), new ActionRoutingTable(rpcInvoker, ImmutableSet.of()));
+ this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar);
+ this.mxBean = new RemoteActionRegistryMXBeanImpl(new BucketStoreAccess(self(), getContext().dispatcher(),
+ config.getAskDuration()), config.getAskDuration());
+ }
+
+ /**
+ * Create a new props instance for instantiating an ActionRegistry actor.
+ *
+ * @param config Provider configuration
+ * @param opsRegistrar Local RPC provider interface, used to register routers to remote nodes
+ * @param opsInvoker Actor handling RPC invocation requests from remote nodes
+ * @return A new {@link Props} instance
+ */
+ public static Props props(final RemoteOpsProviderConfig config, final ActorRef opsInvoker,
+ final ActorRef opsRegistrar) {
+ return Props.create(ActionRegistry.class, config, opsInvoker, opsRegistrar);
+ }
+
+ @Override
+ public void postStop() {
+ super.postStop();
+ this.mxBean.unregister();
+ }
+
+ @Override
+ protected void handleCommand(final Object message) throws Exception {
+ if (message instanceof ActionRegistry.Messages.UpdateActions) {
+ LOG.debug("handling updatesActionRoutes message");
+ updatesActionRoutes((Messages.UpdateActions) message);
+ } else {
+ super.handleCommand(message);
+ }
+ }
+
+ private void updatesActionRoutes(final Messages.UpdateActions msg) {
+ LOG.debug("addedActions: {}", msg.getAddedActions());
+ LOG.debug("removedActions: {}", msg.getRemovedActions());
+ updateLocalBucket(getLocalData().updateActions(msg.getAddedActions(), msg.getRemovedActions()));
+ }
+
+ @Override
+ protected void onBucketRemoved(final Address address, final Bucket<ActionRoutingTable> bucket) {
+ rpcRegistrar.tell(new Messages.UpdateRemoteActionEndpoints(ImmutableMap.of(address, Optional.empty())),
+ ActorRef.noSender());
+ }
+
+ @Override
+ protected void onBucketsUpdated(final Map<Address, Bucket<ActionRoutingTable>> buckets) {
+ LOG.debug("Updating buckets for action registry");
+ final Map<Address, Optional<RemoteActionEndpoint>> endpoints = new HashMap<>(buckets.size());
+
+ for (Map.Entry<Address, Bucket<ActionRoutingTable>> e : buckets.entrySet()) {
+ final ActionRoutingTable table = e.getValue().getData();
+
+ final Collection<DOMActionInstance> actions = table.getItems();
+ endpoints.put(e.getKey(), actions.isEmpty() ? Optional.empty()
+ : Optional.of(new RemoteActionEndpoint(table.getInvoker(), actions)));
+ }
+
+ if (!endpoints.isEmpty()) {
+ rpcRegistrar.tell(new Messages.UpdateRemoteActionEndpoints(endpoints), ActorRef.noSender());
+ }
+ }
+
+ public static final class RemoteActionEndpoint {
+ private final Set<DOMActionInstance> actions;
+ private final ActorRef router;
+
+ @VisibleForTesting
+ public RemoteActionEndpoint(final ActorRef router, final Collection<DOMActionInstance> actions) {
+ this.router = Preconditions.checkNotNull(router);
+ this.actions = ImmutableSet.copyOf(actions);
+ }
+
+ public ActorRef getRouter() {
+ return router;
+ }
+
+ public Set<DOMActionInstance> getActions() {
+ return actions;
+ }
+ }
+
+ /**
+ * All messages used by the ActionRegistry.
+ */
+ public static class Messages {
+ abstract static class AbstractActionRouteMessage {
+ final Collection<DOMActionInstance> addedActions;
+ final Collection<DOMActionInstance> removedActions;
+
+ AbstractActionRouteMessage(final Collection<DOMActionInstance> addedActions,
+ final Collection<DOMActionInstance> removedActions) {
+ this.addedActions = ImmutableList.copyOf(addedActions);
+ this.removedActions = ImmutableList.copyOf(removedActions);
+ }
+
+ Collection<DOMActionInstance> getAddedActions() {
+ return this.addedActions;
+ }
+
+ Collection<DOMActionInstance> getRemovedActions() {
+ return this.removedActions;
+ }
+
+
+ @Override
+ public String toString() {
+ return "ContainsRoute{" + "addedActions=" + addedActions + " removedActions=" + removedActions + '}';
+ }
+ }
+
+
+ public static final class UpdateActions extends AbstractActionRouteMessage {
+ public UpdateActions(final Collection<DOMActionInstance> addedActions,
+ final Collection<DOMActionInstance> removedActions) {
+ super(addedActions, removedActions);
+ }
+
+ }
+
+ public static final class UpdateRemoteActionEndpoints {
+ private final Map<Address, Optional<RemoteActionEndpoint>> actionEndpoints;
+
+ @VisibleForTesting
+ public UpdateRemoteActionEndpoints(final Map<Address, Optional<RemoteActionEndpoint>>
+ actionEndpoints) {
+ this.actionEndpoints = ImmutableMap.copyOf(actionEndpoints);
+ }
+
+ public Map<Address, Optional<RemoteActionEndpoint>> getActionEndpoints() {
+ return actionEndpoints;
+ }
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2019 Nordix Foundation. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry;
+
+import akka.actor.ActorRef;
+import akka.serialization.JavaSerializer;
+import akka.serialization.Serialization;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputOutput;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMActionInstance;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class ActionRoutingTable extends AbstractRoutingTable<ActionRoutingTable, DOMActionInstance> {
+ private static final class Proxy implements Externalizable {
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(ActionRoutingTable.class);
+
+ @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "We deal with the field in serialization methods.")
+ private Collection<DOMActionInstance> actions;
+ private ActorRef opsInvoker;
+
+ // checkstyle flags the public modifier as redundant however it is explicitly needed for Java serialization to
+ // be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
+ public Proxy() {
+ // For Externalizable
+ }
+
+ Proxy(final ActionRoutingTable table) {
+ actions = table.getItems();
+ opsInvoker = table.getInvoker();
+ }
+
+ @Override
+ public void writeExternal(final ObjectOutput out) throws IOException {
+ LOG.debug("serializing ActionRoutingTable.");
+ out.writeObject(Serialization.serializedActorPath(opsInvoker));
+
+ final NormalizedNodeDataOutput nnout = NormalizedNodeInputOutput.newDataOutput(out);
+ nnout.writeInt(actions.size());
+ for (DOMActionInstance id : actions) {
+ nnout.writeSchemaPath(id.getType());
+ YangInstanceIdentifier actionPath = YangInstanceIdentifier.create(
+ new YangInstanceIdentifier.NodeIdentifier(id.getType().getLastComponent()));
+ nnout.writeYangInstanceIdentifier(actionPath);
+ }
+ }
+
+ @Override
+ public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+ LOG.debug("deserializing ActionRoutingTable");
+ opsInvoker = JavaSerializer.currentSystem().value().provider().resolveActorRef((String) in.readObject());
+
+ final NormalizedNodeDataInput nnin = NormalizedNodeInputOutput.newDataInput(in);
+ final int size = nnin.readInt();
+ actions = new ArrayList<>(size);
+ for (int i = 0; i < size; ++i) {
+ actions.add(DOMActionInstance.of(nnin.readSchemaPath(), LogicalDatastoreType.OPERATIONAL,
+ nnin.readYangInstanceIdentifier()));
+ }
+ }
+
+ private Object readResolve() {
+ return new ActionRoutingTable(opsInvoker, actions);
+ }
+ }
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(ActionRoutingTable.class);
+
+ ActionRoutingTable(final ActorRef invoker, final Collection<DOMActionInstance> actions) {
+ super(invoker, actions);
+ }
+
+ ActionRoutingTable updateActions(final Collection<DOMActionInstance> toAdd,
+ final Collection<DOMActionInstance> toRemove) {
+ LOG.debug("Updating actions in ActionRoutingTable");
+ final Set<DOMActionInstance> newActions = new HashSet<>(getItems());
+ newActions.addAll(toAdd);
+ newActions.removeAll(toRemove);
+ return new ActionRoutingTable(getInvoker(), newActions);
+ }
+
+ @Override
+ Object writeReplace() {
+ return new Proxy(this);
+ }
+}
import akka.actor.ActorRef;
import akka.serialization.JavaSerializer;
import akka.serialization.Serialization;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
-import java.util.Optional;
import java.util.Set;
import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput;
import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput;
import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputOutput;
-import org.opendaylight.controller.remote.rpc.registry.gossip.BucketData;
import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
-public final class RoutingTable implements BucketData<RoutingTable>, Serializable {
+public final class RoutingTable extends AbstractRoutingTable<RoutingTable, DOMRpcIdentifier> {
private static final class Proxy implements Externalizable {
private static final long serialVersionUID = 1L;
@SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "We deal with the field in serialization methods.")
private Collection<DOMRpcIdentifier> rpcs;
- private ActorRef rpcInvoker;
+ private ActorRef opsInvoker;
// checkstyle flags the public modifier as redundant however it is explicitly needed for Java serialization to
// be able to create instances via reflection.
}
Proxy(final RoutingTable table) {
- rpcs = table.getRoutes();
- rpcInvoker = table.getRpcInvoker();
+ rpcs = table.getItems();
+ opsInvoker = table.getInvoker();
}
@Override
public void writeExternal(final ObjectOutput out) throws IOException {
- out.writeObject(Serialization.serializedActorPath(rpcInvoker));
+ out.writeObject(Serialization.serializedActorPath(opsInvoker));
final NormalizedNodeDataOutput nnout = NormalizedNodeInputOutput.newDataOutput(out);
nnout.writeInt(rpcs.size());
@Override
public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
- rpcInvoker = JavaSerializer.currentSystem().value().provider().resolveActorRef((String) in.readObject());
+ opsInvoker = JavaSerializer.currentSystem().value().provider().resolveActorRef((String) in.readObject());
final NormalizedNodeDataInput nnin = NormalizedNodeInputOutput.newDataInput(in);
final int size = nnin.readInt();
}
private Object readResolve() {
- return new RoutingTable(rpcInvoker, rpcs);
+ return new RoutingTable(opsInvoker, rpcs);
}
}
private static final long serialVersionUID = 1L;
- @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "We deal with the field in serialization methods.")
- private final Set<DOMRpcIdentifier> rpcs;
- private final ActorRef rpcInvoker;
-
- RoutingTable(final ActorRef rpcInvoker, final Collection<DOMRpcIdentifier> table) {
- this.rpcInvoker = Preconditions.checkNotNull(rpcInvoker);
- this.rpcs = ImmutableSet.copyOf(table);
- }
-
- @Override
- public Optional<ActorRef> getWatchActor() {
- return Optional.of(rpcInvoker);
- }
-
- public Set<DOMRpcIdentifier> getRoutes() {
- return rpcs;
- }
-
- ActorRef getRpcInvoker() {
- return rpcInvoker;
+ RoutingTable(final ActorRef invoker, final Collection<DOMRpcIdentifier> table) {
+ super(invoker, table);
}
RoutingTable addRpcs(final Collection<DOMRpcIdentifier> toAdd) {
- final Set<DOMRpcIdentifier> newRpcs = new HashSet<>(rpcs);
+ final Set<DOMRpcIdentifier> newRpcs = new HashSet<>(getItems());
newRpcs.addAll(toAdd);
- return new RoutingTable(rpcInvoker, newRpcs);
+ return new RoutingTable(getInvoker(), newRpcs);
}
RoutingTable removeRpcs(final Collection<DOMRpcIdentifier> toRemove) {
- final Set<DOMRpcIdentifier> newRpcs = new HashSet<>(rpcs);
+ final Set<DOMRpcIdentifier> newRpcs = new HashSet<>(getItems());
newRpcs.removeAll(toRemove);
- return new RoutingTable(rpcInvoker, newRpcs);
- }
-
- private Object writeReplace() {
- return new Proxy(this);
- }
-
- @VisibleForTesting
- boolean contains(final DOMRpcIdentifier routeId) {
- return rpcs.contains(routeId);
- }
-
- @VisibleForTesting
- int size() {
- return rpcs.size();
+ return new RoutingTable(getInvoker(), newRpcs);
}
@Override
- public String toString() {
- return "RoutingTable{" + "rpcs=" + rpcs + ", rpcInvoker=" + rpcInvoker + '}';
+ Object writeReplace() {
+ return new Proxy(this);
}
}
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess;
import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor;
private final ActorRef rpcRegistrar;
private final RemoteRpcRegistryMXBeanImpl mxBean;
- public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
+ public RpcRegistry(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker, ImmutableSet.of()));
this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar);
this.mxBean = new RemoteRpcRegistryMXBeanImpl(new BucketStoreAccess(self(), getContext().dispatcher(),
* @param rpcInvoker Actor handling RPC invocation requests from remote nodes
* @return A new {@link Props} instance
*/
- public static Props props(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker,
- final ActorRef rpcRegistrar) {
+ public static Props props(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker,
+ final ActorRef rpcRegistrar) {
return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar);
}
@Override
protected void onBucketRemoved(final Address address, final Bucket<RoutingTable> bucket) {
- rpcRegistrar.tell(new UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())), ActorRef.noSender());
+ rpcRegistrar.tell(new Messages.UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())),
+ ActorRef.noSender());
}
@Override
for (Entry<Address, Bucket<RoutingTable>> e : buckets.entrySet()) {
final RoutingTable table = e.getValue().getData();
- final Collection<DOMRpcIdentifier> rpcs = table.getRoutes();
+ final Collection<DOMRpcIdentifier> rpcs = table.getItems();
endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty()
- : Optional.of(new RemoteRpcEndpoint(table.getRpcInvoker(), rpcs)));
+ : Optional.of(new RemoteRpcEndpoint(table.getInvoker(), rpcs)));
}
if (!endpoints.isEmpty()) {
- rpcRegistrar.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
+ rpcRegistrar.tell(new Messages.UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
}
}
*/
public static class Messages {
abstract static class AbstractRouteMessage {
- final List<DOMRpcIdentifier> routeIdentifiers;
+ final List<DOMRpcIdentifier> rpcRouteIdentifiers;
- AbstractRouteMessage(final Collection<DOMRpcIdentifier> routeIdentifiers) {
- Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(),
+ AbstractRouteMessage(final Collection<DOMRpcIdentifier> rpcRouteIdentifiers) {
+ Preconditions.checkArgument(rpcRouteIdentifiers != null && !rpcRouteIdentifiers.isEmpty(),
"Route Identifiers must be supplied");
- this.routeIdentifiers = ImmutableList.copyOf(routeIdentifiers);
+ this.rpcRouteIdentifiers = ImmutableList.copyOf(rpcRouteIdentifiers);
}
List<DOMRpcIdentifier> getRouteIdentifiers() {
- return this.routeIdentifiers;
+ return this.rpcRouteIdentifiers;
}
@Override
public String toString() {
- return "ContainsRoute{" + "routeIdentifiers=" + routeIdentifiers + '}';
+ return "ContainsRoute{" + "routeIdentifiers=" + rpcRouteIdentifiers + '}';
}
}
- public static final class AddOrUpdateRoutes extends AbstractRouteMessage {
- public AddOrUpdateRoutes(final Collection<DOMRpcIdentifier> routeIdentifiers) {
- super(routeIdentifiers);
+ public static final class AddOrUpdateRoutes extends Messages.AbstractRouteMessage {
+ public AddOrUpdateRoutes(final Collection<DOMRpcIdentifier> rpcRouteIdentifiers) {
+ super(rpcRouteIdentifiers);
}
+
}
public static final class RemoveRoutes extends AbstractRouteMessage {
- public RemoveRoutes(final Collection<DOMRpcIdentifier> routeIdentifiers) {
- super(routeIdentifiers);
+ public RemoveRoutes(final Collection<DOMRpcIdentifier> rpcRouteIdentifiers) {
+ super(rpcRouteIdentifiers);
}
}
public static final class UpdateRemoteEndpoints {
- private final Map<Address, Optional<RemoteRpcEndpoint>> endpoints;
+ private final Map<Address, Optional<RemoteRpcEndpoint>> rpcEndpoints;
+
@VisibleForTesting
- public UpdateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> endpoints) {
- this.endpoints = ImmutableMap.copyOf(endpoints);
+ public UpdateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> rpcEndpoints) {
+ this.rpcEndpoints = ImmutableMap.copyOf(rpcEndpoints);
}
- public Map<Address, Optional<RemoteRpcEndpoint>> getEndpoints() {
- return endpoints;
+ public Map<Address, Optional<RemoteRpcEndpoint>> getRpcEndpoints() {
+ return rpcEndpoints;
}
}
}
import java.util.Optional;
import java.util.function.Consumer;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
/**
* A store that syncs its data across nodes in the cluster.
*/
private final SetMultimap<ActorRef, Address> watchedActors = HashMultimap.create(1, 1);
- private final RemoteRpcProviderConfig config;
+ private final RemoteOpsProviderConfig config;
private final String persistenceId;
/**
private Integer incarnation;
private boolean persisting;
- protected BucketStoreActor(final RemoteRpcProviderConfig config, final String persistenceId, final T initialData) {
+ protected BucketStoreActor(final RemoteOpsProviderConfig config, final String persistenceId, final T initialData) {
this.config = Preconditions.checkNotNull(config);
this.initialData = Preconditions.checkNotNull(initialData);
this.persistenceId = Preconditions.checkNotNull(persistenceId);
}
}
- protected final RemoteRpcProviderConfig getConfig() {
+ protected final RemoteOpsProviderConfig getConfig() {
return config;
}
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
import scala.concurrent.duration.FiniteDuration;
/**
};
private final boolean autoStartGossipTicks;
- private final RemoteRpcProviderConfig config;
+ private final RemoteOpsProviderConfig config;
/**
* All known cluster members.
private BucketStoreAccess bucketStore;
- Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) {
+ Gossiper(final RemoteOpsProviderConfig config, final Boolean autoStartGossipTicks) {
this.config = Preconditions.checkNotNull(config);
this.autoStartGossipTicks = autoStartGossipTicks.booleanValue();
}
- Gossiper(final RemoteRpcProviderConfig config) {
+ Gossiper(final RemoteOpsProviderConfig config) {
this(config, Boolean.TRUE);
}
- public static Props props(final RemoteRpcProviderConfig config) {
+ public static Props props(final RemoteOpsProviderConfig config) {
return Props.create(Gossiper.class, config);
}
- static Props testProps(final RemoteRpcProviderConfig config) {
+ static Props testProps(final RemoteOpsProviderConfig config) {
return Props.create(Gossiper.class, config, Boolean.FALSE);
}
--- /dev/null
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.mbeans;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.Address;
+import akka.util.Timeout;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.Map;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
+import org.opendaylight.controller.remote.rpc.registry.AbstractRoutingTable;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+abstract class AbstractRegistryMXBean<T extends AbstractRoutingTable<T, I>, I> extends AbstractMXBean {
+ static final String LOCAL_CONSTANT = "local";
+ static final String ROUTE_CONSTANT = "route:";
+ static final String NAME_CONSTANT = " | name:";
+
+ @SuppressFBWarnings("SLF4J_LOGGER_SHOULD_BE_PRIVATE")
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final BucketStoreAccess bucketAccess;
+ private final FiniteDuration timeout;
+
+ AbstractRegistryMXBean(final @NonNull String beanName, final @NonNull String beanType,
+ final @NonNull BucketStoreAccess bucketAccess, final @NonNull Timeout timeout) {
+ super(beanName, beanType, null);
+ this.bucketAccess = requireNonNull(bucketAccess);
+ this.timeout = timeout.duration();
+ registerMBean();
+ }
+
+ @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch", "rawtypes"})
+ final T localData() {
+ try {
+ return (T) Await.result((Future) bucketAccess.getLocalData(), timeout);
+ } catch (Exception e) {
+ throw new RuntimeException("getLocalData failed", e);
+ }
+ }
+
+ @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch", "rawtypes"})
+ final Map<Address, Bucket<T>> remoteBuckets() {
+ try {
+ return (Map<Address, Bucket<T>>) Await.result((Future)bucketAccess.getRemoteBuckets(), timeout);
+ } catch (Exception e) {
+ throw new RuntimeException("getRemoteBuckets failed", e);
+ }
+ }
+
+ @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch", "rawtypes"})
+ final String bucketVersions() {
+ try {
+ return Await.result((Future)bucketAccess.getBucketVersions(), timeout).toString();
+ } catch (Exception e) {
+ throw new RuntimeException("getVersions failed", e);
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2019 Nordix Foundation. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.mbeans;
+
+import java.util.Map;
+import java.util.Set;
+
+public interface RemoteActionRegistryMXBean {
+
+ String getBucketVersions();
+
+ Set<String> getLocalRegisteredAction();
+
+ Map<String, String> findActionByName(String name);
+
+ Map<String, String> findActionByRoute(String route);
+}
--- /dev/null
+/*
+ * Copyright (c) 2019 Nordix Foundation. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.mbeans;
+
+import akka.actor.Address;
+import akka.util.Timeout;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.opendaylight.controller.remote.rpc.registry.ActionRoutingTable;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess;
+import org.opendaylight.mdsal.dom.api.DOMActionInstance;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+
+public class RemoteActionRegistryMXBeanImpl extends AbstractRegistryMXBean<ActionRoutingTable, DOMActionInstance>
+ implements RemoteActionRegistryMXBean {
+
+ public RemoteActionRegistryMXBeanImpl(final BucketStoreAccess actionRegistryAccess, final Timeout timeout) {
+ super("RemoteActionRegistry", "RemoteActionBroker", actionRegistryAccess, timeout);
+ }
+
+ @Override
+ public Set<String> getLocalRegisteredAction() {
+ ActionRoutingTable table = localData();
+ Set<String> routedAction = new HashSet<>(table.getItems().size());
+ for (DOMActionInstance route : table.getItems()) {
+ if (route.getType().getLastComponent() != null) {
+ final YangInstanceIdentifier actionPath = YangInstanceIdentifier.create(
+ new NodeIdentifier(route.getType().getLastComponent()));
+ if (!actionPath.isEmpty()) {
+ routedAction.add(ROUTE_CONSTANT + actionPath + NAME_CONSTANT + route.getType());
+ }
+ }
+ }
+
+ log.debug("Locally registered routed RPCs {}", routedAction);
+ return routedAction;
+ }
+
+ @Override
+ public Map<String, String> findActionByName(final String name) {
+ ActionRoutingTable localTable = localData();
+ // Get all Actions from local bucket
+ Map<String, String> rpcMap = new HashMap<>(getActionMemberMapByName(localTable, name, LOCAL_CONSTANT));
+
+ // Get all Actions from remote bucket
+ Map<Address, Bucket<ActionRoutingTable>> buckets = remoteBuckets();
+ for (Map.Entry<Address, Bucket<ActionRoutingTable>> entry : buckets.entrySet()) {
+ ActionRoutingTable table = entry.getValue().getData();
+ rpcMap.putAll(getActionMemberMapByName(table, name, entry.getKey().toString()));
+ }
+
+ log.debug("list of Actions {} searched by name {}", rpcMap, name);
+ return rpcMap;
+ }
+
+ @Override
+ public Map<String, String> findActionByRoute(final String routeId) {
+ ActionRoutingTable localTable = localData();
+ Map<String, String> rpcMap = new HashMap<>(getActionMemberMapByAction(localTable, routeId, LOCAL_CONSTANT));
+
+ Map<Address, Bucket<ActionRoutingTable>> buckets = remoteBuckets();
+ for (Map.Entry<Address, Bucket<ActionRoutingTable>> entry : buckets.entrySet()) {
+ ActionRoutingTable table = entry.getValue().getData();
+ rpcMap.putAll(getActionMemberMapByAction(table, routeId, entry.getKey().toString()));
+ }
+
+ log.debug("list of Actions {} searched by route {}", rpcMap, routeId);
+ return rpcMap;
+ }
+
+ /**
+ * Search if the routing table route String contains routeName.
+ */
+ private static Map<String, String> getActionMemberMapByAction(final ActionRoutingTable table,
+ final String routeName, final String address) {
+ Collection<DOMActionInstance> routes = table.getItems();
+ Map<String, String> actionMap = new HashMap<>(routes.size());
+ for (DOMActionInstance route : routes) {
+ if (route.getType().getLastComponent() != null) {
+ final YangInstanceIdentifier actionPath = YangInstanceIdentifier.create(
+ new NodeIdentifier(route.getType().getLastComponent()));
+ if (!actionPath.isEmpty()) {
+ String routeString = actionPath.toString();
+ if (routeString.contains(routeName)) {
+ actionMap.put(ROUTE_CONSTANT + routeString + NAME_CONSTANT + route.getType(), address);
+ }
+ }
+ }
+ }
+ return actionMap;
+ }
+
+ /**
+ * Search if the routing table route type contains name.
+ */
+ private static Map<String, String> getActionMemberMapByName(final ActionRoutingTable table, final String name,
+ final String address) {
+ Collection<DOMActionInstance> routes = table.getItems();
+ Map<String, String> actionMap = new HashMap<>(routes.size());
+ for (DOMActionInstance route : routes) {
+ if (route.getType().getLastComponent() != null) {
+ final YangInstanceIdentifier actionPath = YangInstanceIdentifier.create(
+ new NodeIdentifier(route.getType().getLastComponent()));
+ if (!actionPath.isEmpty()) {
+ String type = route.getType().toString();
+ if (type.contains(name)) {
+ actionMap.put(ROUTE_CONSTANT + actionPath + NAME_CONSTANT + type, address);
+ }
+ }
+ }
+ }
+ return actionMap;
+ }
+
+ @Override
+ public String getBucketVersions() {
+ return bucketVersions();
+ }
+}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.remote.rpc.registry.mbeans;
import java.util.Map;
Set<String> getLocalRegisteredRoutedRpc();
- Map<String,String> findRpcByName(String name);
+ Map<String, String> findRpcByName(String name);
- Map<String,String> findRpcByRoute(String route);
+ Map<String, String> findRpcByRoute(String route);
}
import akka.actor.Address;
import akka.util.Timeout;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
import org.opendaylight.controller.remote.rpc.registry.RoutingTable;
import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess;
import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-
-public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements RemoteRpcRegistryMXBean {
-
- @SuppressFBWarnings("SLF4J_LOGGER_SHOULD_BE_PRIVATE")
- protected final Logger log = LoggerFactory.getLogger(getClass());
-
- private static final String LOCAL_CONSTANT = "local";
-
- private static final String ROUTE_CONSTANT = "route:";
-
- private static final String NAME_CONSTANT = " | name:";
-
- private final BucketStoreAccess rpcRegistryAccess;
- private final Timeout timeout;
+public class RemoteRpcRegistryMXBeanImpl extends AbstractRegistryMXBean<RoutingTable, DOMRpcIdentifier>
+ implements RemoteRpcRegistryMXBean {
public RemoteRpcRegistryMXBeanImpl(final BucketStoreAccess rpcRegistryAccess, final Timeout timeout) {
- super("RemoteRpcRegistry", "RemoteRpcBroker", null);
- this.rpcRegistryAccess = rpcRegistryAccess;
- this.timeout = timeout;
- registerMBean();
- }
-
- @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch", "rawtypes"})
- private RoutingTable getLocalData() {
- try {
- return (RoutingTable) Await.result((Future) rpcRegistryAccess.getLocalData(), timeout.duration());
- } catch (Exception e) {
- throw new RuntimeException("getLocalData failed", e);
- }
- }
-
- @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch", "rawtypes"})
- private Map<Address, Bucket<RoutingTable>> getRemoteBuckets() {
- try {
- return (Map<Address, Bucket<RoutingTable>>) Await.result((Future)rpcRegistryAccess.getRemoteBuckets(),
- timeout.duration());
- } catch (Exception e) {
- throw new RuntimeException("getRemoteBuckets failed", e);
- }
+ super("RemoteRpcRegistry", "RemoteRpcBroker", rpcRegistryAccess, timeout);
}
@Override
public Set<String> getGlobalRpc() {
- RoutingTable table = getLocalData();
- Set<String> globalRpc = new HashSet<>(table.getRoutes().size());
- for (DOMRpcIdentifier route : table.getRoutes()) {
+ RoutingTable table = localData();
+ Set<String> globalRpc = new HashSet<>(table.getItems().size());
+ for (DOMRpcIdentifier route : table.getItems()) {
if (route.getContextReference().isEmpty()) {
globalRpc.add(route.getType().toString());
}
@Override
public Set<String> getLocalRegisteredRoutedRpc() {
- RoutingTable table = getLocalData();
- Set<String> routedRpc = new HashSet<>(table.getRoutes().size());
- for (DOMRpcIdentifier route : table.getRoutes()) {
+ RoutingTable table = localData();
+ Set<String> routedRpc = new HashSet<>(table.getItems().size());
+ for (DOMRpcIdentifier route : table.getItems()) {
if (!route.getContextReference().isEmpty()) {
routedRpc.add(ROUTE_CONSTANT + route.getContextReference() + NAME_CONSTANT + route.getType());
}
@Override
public Map<String, String> findRpcByName(final String name) {
- RoutingTable localTable = getLocalData();
+ RoutingTable localTable = localData();
// Get all RPCs from local bucket
Map<String, String> rpcMap = new HashMap<>(getRpcMemberMapByName(localTable, name, LOCAL_CONSTANT));
// Get all RPCs from remote bucket
- Map<Address, Bucket<RoutingTable>> buckets = getRemoteBuckets();
+ Map<Address, Bucket<RoutingTable>> buckets = remoteBuckets();
for (Entry<Address, Bucket<RoutingTable>> entry : buckets.entrySet()) {
RoutingTable table = entry.getValue().getData();
rpcMap.putAll(getRpcMemberMapByName(table, name, entry.getKey().toString()));
@Override
public Map<String, String> findRpcByRoute(final String routeId) {
- RoutingTable localTable = getLocalData();
+ RoutingTable localTable = localData();
Map<String, String> rpcMap = new HashMap<>(getRpcMemberMapByRoute(localTable, routeId, LOCAL_CONSTANT));
- Map<Address, Bucket<RoutingTable>> buckets = getRemoteBuckets();
+ Map<Address, Bucket<RoutingTable>> buckets = remoteBuckets();
for (Entry<Address, Bucket<RoutingTable>> entry : buckets.entrySet()) {
RoutingTable table = entry.getValue().getData();
rpcMap.putAll(getRpcMemberMapByRoute(table, routeId, entry.getKey().toString()));
* Search if the routing table route String contains routeName.
*/
private static Map<String,String> getRpcMemberMapByRoute(final RoutingTable table, final String routeName,
- final String address) {
- Set<DOMRpcIdentifier> routes = table.getRoutes();
+ final String address) {
+ Set<DOMRpcIdentifier> routes = table.getItems();
Map<String, String> rpcMap = new HashMap<>(routes.size());
for (DOMRpcIdentifier route : routes) {
if (!route.getContextReference().isEmpty()) {
/**
* Search if the routing table route type contains name.
*/
- private static Map<String, String> getRpcMemberMapByName(final RoutingTable table, final String name,
- final String address) {
- Set<DOMRpcIdentifier> routes = table.getRoutes();
+ private static Map<String, String> getRpcMemberMapByName(final RoutingTable table, final String name,
+ final String address) {
+ Set<DOMRpcIdentifier> routes = table.getItems();
Map<String, String> rpcMap = new HashMap<>(routes.size());
for (DOMRpcIdentifier route : routes) {
if (!route.getContextReference().isEmpty()) {
}
@Override
- @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch", "rawtypes"})
public String getBucketVersions() {
- try {
- return Await.result((Future)rpcRegistryAccess.getBucketVersions(), timeout.duration()).toString();
- } catch (Exception e) {
- throw new RuntimeException("getVersions failed", e);
- }
+ return bucketVersions();
}
}
<reference id="actorSystemProvider" interface="org.opendaylight.controller.cluster.ActorSystemProvider" />
<reference id="domRpcService" interface="org.opendaylight.mdsal.dom.api.DOMRpcService"/>
<reference id="domRpcRegistry" interface="org.opendaylight.mdsal.dom.api.DOMRpcProviderService"/>
+ <reference id="domActionService" interface="org.opendaylight.mdsal.dom.api.DOMActionService"/>
+ <reference id="domActionRegistry" interface="org.opendaylight.mdsal.dom.api.DOMActionProviderService"/>
<bean id="actorSystem" factory-ref="actorSystemProvider" factory-method="getActorSystem"/>
- <bean id="remoteRpcProviderConfig" class="org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig"
+ <bean id="remoteOpsProviderConfig" class="org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig"
factory-method="newInstance">
<argument>
<bean factory-ref="actorSystem" factory-method="name"/>
<argument value="${bounded-mailbox-capacity}"/>
</bean>
- <bean id="remoteRpcProvider" class="org.opendaylight.controller.remote.rpc.RemoteRpcProviderFactory"
+ <bean id="remoteOpsProvider" class="org.opendaylight.controller.remote.rpc.RemoteOpsProviderFactory"
factory-method="createInstance" init-method="start" destroy-method="close">
<argument ref="domRpcRegistry"/>
<argument ref="domRpcService"/>
<argument ref="actorSystem"/>
- <argument ref="remoteRpcProviderConfig"/>
+ <argument ref="remoteOpsProviderConfig"/>
+ <argument ref="domActionRegistry"/>
+ <argument ref="domActionService"/>
</bean>
</blueprint>
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.remote.rpc;
import static org.junit.Assert.assertEquals;
import org.junit.BeforeClass;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMActionService;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
import org.opendaylight.mdsal.dom.api.DOMRpcResult;
import org.opendaylight.mdsal.dom.api.DOMRpcService;
*
* @author Thomas Pantelis
*/
-public class AbstractRpcTest {
+public class AbstractOpsTest {
static final String TEST_REV = "2014-08-28";
static final String TEST_NS = "urn:test";
static final URI TEST_URI = URI.create(TEST_NS);
- static final QName TEST_RPC = QName.create(TEST_NS, TEST_REV, "test-rpc");
+ static final QName TEST_RPC = QName.create(TEST_NS, TEST_REV, "test-something");
static final QName TEST_RPC_INPUT = QName.create(TEST_NS, TEST_REV, "input");
static final QName TEST_RPC_INPUT_DATA = QName.create(TEST_NS, TEST_REV, "input-data");
static final QName TEST_RPC_OUTPUT = QName.create(TEST_NS, TEST_REV, "output");
- static final QName TEST_RPC_OUTPUT_DATA = QName.create(TEST_URI, "output-data");
static final SchemaPath TEST_RPC_TYPE = SchemaPath.create(true, TEST_RPC);
static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.create(
new YangInstanceIdentifier.NodeIdentifier(TEST_RPC));
public static final DOMRpcIdentifier TEST_RPC_ID = DOMRpcIdentifier.create(TEST_RPC_TYPE, TEST_PATH);
+ public static final DOMDataTreeIdentifier TEST_DATA_TREE_ID = new DOMDataTreeIdentifier(
+ LogicalDatastoreType.OPERATIONAL, TEST_PATH);
static ActorSystem node1;
static ActorSystem node2;
- static RemoteRpcProviderConfig config1;
- static RemoteRpcProviderConfig config2;
+ static RemoteOpsProviderConfig config1;
+ static RemoteOpsProviderConfig config2;
protected ActorRef rpcInvoker1;
protected TestKit rpcRegistry1Probe;
protected SchemaContext schemaContext;
protected RemoteRpcImplementation remoteRpcImpl1;
protected RemoteRpcImplementation remoteRpcImpl2;
+ protected RemoteActionImplementation remoteActionImpl1;
+ protected RemoteActionImplementation remoteActionImpl2;
@Mock
protected DOMRpcService domRpcService1;
@Mock
+ protected DOMActionService domActionService1;
+ @Mock
protected DOMRpcService domRpcService2;
+ @Mock
+ protected DOMActionService domActionService2;
@BeforeClass
public static void setup() {
- config1 = new RemoteRpcProviderConfig.Builder("memberA").build();
- config2 = new RemoteRpcProviderConfig.Builder("memberB").build();
+ config1 = new RemoteOpsProviderConfig.Builder("memberA").build();
+ config2 = new RemoteOpsProviderConfig.Builder("memberB").build();
node1 = ActorSystem.create("opendaylight-rpc", config1.get());
node2 = ActorSystem.create("opendaylight-rpc", config2.get());
}
@Before
public void setUp() {
- schemaContext = YangParserTestUtils.parseYangResources(AbstractRpcTest.class, "/test-rpc.yang");
+ schemaContext = YangParserTestUtils.parseYangResources(AbstractOpsTest.class, "/test-rpc.yang");
MockitoAnnotations.initMocks(this);
rpcRegistry1Probe = new TestKit(node1);
- rpcInvoker1 = node1.actorOf(RpcInvoker.props(domRpcService1));
+ rpcInvoker1 = node1.actorOf(OpsInvoker.props(domRpcService1, domActionService1));
rpcRegistry2Probe = new TestKit(node2);
- rpcInvoker2 = node2.actorOf(RpcInvoker.props(domRpcService2));
+ rpcInvoker2 = node2.actorOf(OpsInvoker.props(domRpcService2, domActionService2));
remoteRpcImpl1 = new RemoteRpcImplementation(rpcInvoker2, config1);
remoteRpcImpl2 = new RemoteRpcImplementation(rpcInvoker1, config2);
+ remoteActionImpl1 = new RemoteActionImplementation(rpcInvoker2, config1);
+ remoteActionImpl2 = new RemoteActionImplementation(rpcInvoker1, config2);
}
static void assertRpcErrorEquals(final RpcError rpcError, final ErrorSeverity severity,
- final ErrorType errorType, final String tag, final String message, final String applicationTag,
- final String info, final String causeMsg) {
+ final ErrorType errorType, final String tag, final String message,
+ final String applicationTag, final String info, final String causeMsg) {
assertEquals("getSeverity", severity, rpcError.getSeverity());
assertEquals("getErrorType", errorType, rpcError.getErrorType());
assertEquals("getTag", tag, rpcError.getTag());
public static ContainerNode makeRPCInput(final String data) {
return Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(TEST_RPC_INPUT))
- .withChild(ImmutableNodes.leafNode(TEST_RPC_INPUT_DATA, data)).build();
+ .withChild(ImmutableNodes.leafNode(TEST_RPC_INPUT_DATA, data)).build();
}
}
static void assertFailedRpcResult(final DOMRpcResult rpcResult, final ErrorSeverity severity,
- final ErrorType errorType, final String tag, final String message, final String applicationTag,
- final String info, final String causeMsg) {
+ final ErrorType errorType, final String tag, final String message,
+ final String applicationTag, final String info, final String causeMsg) {
assertNotNull("RpcResult was null", rpcResult);
final Collection<? extends RpcError> rpcErrors = rpcResult.getErrors();
assertEquals("RpcErrors count", 1, rpcErrors.size());
}
static void assertSuccessfulRpcResult(final DOMRpcResult rpcResult,
- final NormalizedNode<? , ?> expOutput) {
+ final NormalizedNode<? , ?> expOutput) {
assertNotNull("RpcResult was null", rpcResult);
assertCompositeNodeEquals(expOutput, rpcResult.getResult());
}
import org.opendaylight.yangtools.util.concurrent.FluentFutures;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-public class RpcBrokerTest extends AbstractRpcTest {
+public class OpsBrokerTest extends AbstractOpsTest {
@Test
public void testExecuteRpc() {
final DOMRpcResult rpcResult = new DefaultDOMRpcResult(invokeRpcResult);
when(domRpcService1.invokeRpc(eq(TEST_RPC_TYPE), any())).thenReturn(
FluentFutures.immediateFluentFuture(rpcResult));
+ final ExecuteRpc executeRpc = ExecuteRpc.from(TEST_RPC_ID, null);
- final ExecuteRpc executeMsg = ExecuteRpc.from(TEST_RPC_ID, null);
-
- rpcInvoker1.tell(executeMsg, rpcRegistry1Probe.getRef());
+ rpcInvoker1.tell(executeRpc, rpcRegistry1Probe.getRef());
final RpcResponse rpcResponse = rpcRegistry1Probe.expectMsgClass(Duration.ofSeconds(5), RpcResponse.class);
- assertEquals(rpcResult.getResult(), rpcResponse.getResultNormalizedNode());
+ assertEquals(rpcResult.getResult(), rpcResponse.getOutput());
}
@Test
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.remote.rpc;
import akka.actor.ActorRef;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.opendaylight.controller.remote.rpc.registry.ActionRegistry;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMActionInstance;
import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
-public class RpcListenerTest {
+public class OpsListenerTest {
private static final QName TEST_QNAME = QName.create("test", "2015-06-12", "test");
private static final SchemaPath RPC_TYPE = SchemaPath.create(true, TEST_QNAME);
private static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier
.create(new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME));
private static final DOMRpcIdentifier RPC_ID = DOMRpcIdentifier.create(RPC_TYPE, TEST_PATH);
+ private static final DOMActionInstance ACTION_INSTANCE = DOMActionInstance.of(RPC_TYPE,
+ LogicalDatastoreType.OPERATIONAL, TEST_PATH);
private static ActorSystem SYSTEM;
final TestKit probeReg = new TestKit(SYSTEM);
final ActorRef rpcRegistry = probeReg.getRef();
- final RpcListener rpcListener = new RpcListener(rpcRegistry);
- rpcListener.onRpcAvailable(Collections.singleton(RPC_ID));
+ final OpsListener opsListener = new OpsListener(rpcRegistry, rpcRegistry);
+ opsListener.onRpcAvailable(Collections.singleton(RPC_ID));
probeReg.expectMsgClass(RpcRegistry.Messages.AddOrUpdateRoutes.class);
}
+ @Test
+ public void testActionRouteAdd() {
+ // Test announcements
+ final TestKit probeReg = new TestKit(SYSTEM);
+ final ActorRef actionRegistry = probeReg.getRef();
+
+ final OpsListener opsListener = new OpsListener(actionRegistry, actionRegistry);
+ opsListener.onActionsChanged(Collections.emptySet(),Collections.singleton(ACTION_INSTANCE));
+ probeReg.expectMsgClass(ActionRegistry.Messages.UpdateActions.class);
+ }
+
@Test
public void testRouteRemove() {
// Test announcements
final TestKit probeReg = new TestKit(SYSTEM);
final ActorRef rpcRegistry = probeReg.getRef();
- final RpcListener rpcListener = new RpcListener(rpcRegistry);
- rpcListener.onRpcUnavailable(Collections.singleton(RPC_ID));
+ final OpsListener opsListener = new OpsListener(rpcRegistry, rpcRegistry);
+ opsListener.onRpcUnavailable(Collections.singleton(RPC_ID));
probeReg.expectMsgClass(RpcRegistry.Messages.RemoveRoutes.class);
}
+
+// @Test
+// public void testAcceptsImplementation() {
+//
+// final TestKit probeReg = new TestKit(SYSTEM);
+// final ActorRef opsRegistry = probeReg.getRef();
+//
+// final OpsListener opsListener = new OpsListener(opsRegistry, opsRegistry);
+// opsListener.acceptsImplementation()
+// }
}
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.actor.Props;
+import akka.testkit.TestActorRef;
+import akka.testkit.javadsl.TestKit;
+import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.Messages.UpdateRemoteActionEndpoints;
+import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.RemoteActionEndpoint;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.RemoteRpcEndpoint;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMActionInstance;
+import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
+import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMRpcImplementationRegistration;
+import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
+import org.opendaylight.yangtools.concepts.ObjectRegistration;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+public class OpsRegistrarTest {
+ @Mock
+ private DOMRpcProviderService rpcService;
+ @Mock
+ private DOMActionProviderService actionService;
+ @Mock
+ private DOMRpcImplementationRegistration<RemoteRpcImplementation> oldReg;
+ @Mock
+ private DOMRpcImplementationRegistration<RemoteRpcImplementation> newReg;
+ @Mock
+ private ObjectRegistration<RemoteActionImplementation> oldActionReg;
+ @Mock
+ private ObjectRegistration<RemoteActionImplementation> newActionReg;
+
+ private ActorSystem system;
+ private TestActorRef<OpsRegistrar> testActorRef;
+ private Address endpointAddress;
+ private RemoteRpcEndpoint firstEndpoint;
+ private RemoteRpcEndpoint secondEndpoint;
+ private RemoteActionEndpoint firstActionEndpoint;
+ private RemoteActionEndpoint secondActionEndpoint;
+ private OpsRegistrar opsRegistrar;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ system = ActorSystem.create("test");
+
+ final TestKit testKit = new TestKit(system);
+ final RemoteOpsProviderConfig config = new RemoteOpsProviderConfig.Builder("system").build();
+ final Props props = OpsRegistrar.props(config, rpcService, actionService);
+ testActorRef = new TestActorRef<>(system, props, testKit.getRef(), "actorRef");
+ endpointAddress = new Address("http", "local");
+
+ final DOMRpcIdentifier firstEndpointId = DOMRpcIdentifier.create(
+ SchemaPath.create(true, QName.create("first:identifier", "foo")));
+ final DOMRpcIdentifier secondEndpointId = DOMRpcIdentifier.create(
+ SchemaPath.create(true, QName.create("second:identifier", "bar")));
+ final QName firstActionQName = QName.create("first:actionIdentifier", "fooAction");
+
+ final DOMActionInstance firstActionInstance = DOMActionInstance.of(
+ SchemaPath.create(true, firstActionQName), LogicalDatastoreType.OPERATIONAL,
+ YangInstanceIdentifier.create(new YangInstanceIdentifier.NodeIdentifier(firstActionQName)));
+
+ final DOMActionInstance secondActionInstance = DOMActionInstance.of(
+ SchemaPath.create(true, firstActionQName), LogicalDatastoreType.OPERATIONAL,
+ YangInstanceIdentifier.create(new YangInstanceIdentifier.NodeIdentifier(firstActionQName)));
+
+ final TestKit senderKit = new TestKit(system);
+ firstEndpoint = new RemoteRpcEndpoint(senderKit.getRef(), Collections.singletonList(firstEndpointId));
+ secondEndpoint = new RemoteRpcEndpoint(senderKit.getRef(), Collections.singletonList(secondEndpointId));
+ firstActionEndpoint = new RemoteActionEndpoint(senderKit.getRef(),
+ Collections.singletonList(firstActionInstance));
+ secondActionEndpoint = new RemoteActionEndpoint(senderKit.getRef(),
+ Collections.singletonList(secondActionInstance));
+
+ doReturn(oldReg).when(rpcService).registerRpcImplementation(any(RemoteRpcImplementation.class),
+ eq(firstEndpoint.getRpcs()));
+ doReturn(newReg).when(rpcService).registerRpcImplementation(any(RemoteRpcImplementation.class),
+ eq(secondEndpoint.getRpcs()));
+
+ doReturn(oldActionReg).when(actionService).registerActionImplementation(any(RemoteActionImplementation.class),
+ eq(secondActionEndpoint.getActions()));
+ doReturn(oldActionReg).when(actionService).registerActionImplementation(any(RemoteActionImplementation.class),
+ eq(secondActionEndpoint.getActions()));
+
+ opsRegistrar = testActorRef.underlyingActor();
+ }
+
+ @After
+ public void tearDown() {
+ TestKit.shutdownActorSystem(system, true);
+ }
+
+ @Test
+ public void testHandleReceiveAddEndpoint() {
+ final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = ImmutableMap.of(
+ endpointAddress, Optional.of(firstEndpoint));
+ testActorRef.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
+
+ verify(rpcService).registerRpcImplementation(any(RemoteRpcImplementation.class),
+ eq(firstEndpoint.getRpcs()));
+ verifyNoMoreInteractions(rpcService, oldReg, newReg);
+ }
+
+ @Test
+ public void testHandleReceiveRemoveEndpoint() {
+ final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = ImmutableMap.of(
+ endpointAddress, Optional.empty());
+ testActorRef.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
+ verifyNoMoreInteractions(rpcService, oldReg, newReg);
+ }
+
+ @Test
+ public void testHandleReceiveUpdateRpcEndpoint() {
+ final InOrder inOrder = inOrder(rpcService, oldReg, newReg);
+
+ testActorRef.tell(new UpdateRemoteEndpoints(ImmutableMap.of(endpointAddress, Optional.of(firstEndpoint))),
+ ActorRef.noSender());
+
+ inOrder.verify(rpcService).registerRpcImplementation(any(RemoteRpcImplementation.class),
+ eq(firstEndpoint.getRpcs()));
+
+ testActorRef.tell(new UpdateRemoteEndpoints(ImmutableMap.of(endpointAddress, Optional.of(secondEndpoint))),
+ ActorRef.noSender());
+
+ inOrder.verify(rpcService).registerRpcImplementation(any(RemoteRpcImplementation.class),
+ eq(secondEndpoint.getRpcs()));
+
+
+ verifyNoMoreInteractions(rpcService, oldReg, newReg);
+ }
+
+ @Test
+ public void testHandleReceiveUpdateActionEndpoint() {
+ final InOrder inOrder = inOrder(actionService, oldActionReg, newActionReg);
+
+ testActorRef.tell(new UpdateRemoteActionEndpoints(ImmutableMap.of(endpointAddress,
+ Optional.of(firstActionEndpoint))), ActorRef.noSender());
+
+ inOrder.verify(actionService).registerActionImplementation(any(RemoteActionImplementation.class),
+ eq(firstActionEndpoint.getActions()));
+
+ testActorRef.tell(new UpdateRemoteActionEndpoints(ImmutableMap.of(endpointAddress,
+ Optional.of(secondActionEndpoint))), ActorRef.noSender());
+
+ inOrder.verify(actionService).registerActionImplementation(any(RemoteActionImplementation.class),
+ eq(secondActionEndpoint.getActions()));
+
+ // verify first registration is closed
+// inOrder.verify(oldReg).close();
+
+ verifyNoMoreInteractions(actionService, oldActionReg, newActionReg);
+ }
+}
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;
import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import org.opendaylight.mdsal.dom.api.DOMActionException;
+import org.opendaylight.mdsal.dom.api.DOMActionResult;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMRpcException;
import org.opendaylight.mdsal.dom.api.DOMRpcResult;
import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
+import org.opendaylight.mdsal.dom.spi.SimpleDOMActionResult;
import org.opendaylight.yangtools.util.concurrent.FluentFutures;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
*
* @author Thomas Pantelis
*/
-public class RemoteRpcImplementationTest extends AbstractRpcTest {
+public class RemoteOpsImplementationTest extends AbstractOpsTest {
/**
* This test method invokes and executes the remote rpc.
final ContainerNode rpcOutput = makeRPCOutput("bar");
final DOMRpcResult rpcResult = new DefaultDOMRpcResult(rpcOutput);
+// Answer<FluentFuture<DOMRpcResult>> answer = FluentFutures.immediateFluentFuture(rpcResult);
+
final NormalizedNode<?, ?> invokeRpcInput = makeRPCInput("foo");
@SuppressWarnings({"unchecked", "rawtypes"})
final ArgumentCaptor<NormalizedNode<?, ?>> inputCaptor =
- (ArgumentCaptor) ArgumentCaptor.forClass(NormalizedNode.class);
+ ArgumentCaptor.forClass(NormalizedNode.class);
when(domRpcService2.invokeRpc(eq(TEST_RPC_TYPE), inputCaptor.capture())).thenReturn(
FluentFutures.immediateFluentFuture(rpcResult));
assertEquals(rpcOutput, result.getResult());
}
+ /**
+ * This test method invokes and executes the remote action.
+ */
+ @Test
+ public void testInvokeAction() throws Exception {
+ final ContainerNode actionOutput = makeRPCOutput("bar");
+ final DOMActionResult actionResult = new SimpleDOMActionResult(actionOutput, Collections.emptyList());
+ final NormalizedNode<?, ?> invokeActionInput = makeRPCInput("foo");
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ final ArgumentCaptor<ContainerNode> inputCaptor =
+ ArgumentCaptor.forClass(ContainerNode.class);
+ doReturn(FluentFutures.immediateFluentFuture(actionResult)).when(domActionService2).invokeAction(
+ eq(TEST_RPC_TYPE), eq(TEST_DATA_TREE_ID), inputCaptor.capture());
+ final ListenableFuture<DOMActionResult> frontEndFuture = remoteActionImpl1.invokeAction(TEST_RPC_TYPE,
+ TEST_DATA_TREE_ID, (ContainerNode) invokeActionInput);
+ assertTrue(frontEndFuture instanceof RemoteDOMActionFuture);
+ final DOMActionResult result = frontEndFuture.get(5, TimeUnit.SECONDS);
+ assertEquals(actionOutput, result.getOutput().get());
+
+ }
+
/**
* This test method invokes and executes the remote rpc.
*/
assertEquals(rpcOutput, result.getResult());
}
+ /**
+ * This test method invokes and executes the remote action.
+ */
+ @Test
+ public void testInvokeActionWithNullInput() throws Exception {
+ final ContainerNode actionOutput = makeRPCOutput("bar");
+ final DOMActionResult actionResult = new SimpleDOMActionResult(actionOutput);
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ final ArgumentCaptor<ContainerNode> inputCaptor =
+ ArgumentCaptor.forClass(ContainerNode.class);
+ doReturn(FluentFutures.immediateFluentFuture(actionResult)).when(domActionService2).invokeAction(
+ eq(TEST_RPC_TYPE), eq(TEST_DATA_TREE_ID), inputCaptor.capture());
+
+ ListenableFuture<DOMActionResult> frontEndFuture = remoteActionImpl1.invokeAction(TEST_RPC_TYPE,
+ TEST_DATA_TREE_ID, actionOutput);
+ assertTrue(frontEndFuture instanceof RemoteDOMActionFuture);
+
+ final DOMActionResult result = frontEndFuture.get(5, TimeUnit.SECONDS);
+ assertEquals(actionOutput, result.getOutput().get());
+ }
+
/**
* This test method invokes and executes the remote rpc.
*/
}
}
+ /**
+ * This test method invokes and executes the remote rpc.
+ */
+ @SuppressWarnings({"checkstyle:AvoidHidingCauseException", "checkstyle:IllegalThrows"})
+ @Test(expected = DOMActionException.class)
+ public void testInvokeActionWithRemoteFailedFuture() throws Throwable {
+ final ContainerNode invokeActionInput = makeRPCInput("foo");
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ final ArgumentCaptor<ContainerNode> inputCaptor =
+ ArgumentCaptor.forClass(ContainerNode.class);
+
+ when(domActionService2.invokeAction(eq(TEST_RPC_TYPE), eq(TEST_DATA_TREE_ID),
+ inputCaptor.capture())).thenReturn(FluentFutures.immediateFailedFluentFuture(
+ new RemoteDOMRpcException("Test Exception", null)));
+
+ final ListenableFuture<DOMActionResult> frontEndFuture = remoteActionImpl1.invokeAction(TEST_RPC_TYPE,
+ TEST_DATA_TREE_ID, invokeActionInput);
+ assertTrue(frontEndFuture instanceof RemoteDOMActionFuture);
+
+ try {
+ frontEndFuture.get(5, TimeUnit.SECONDS);
+ } catch (ExecutionException e) {
+ throw e.getCause();
+ }
+ }
+
/**
* This test method invokes and tests exceptions when akka timeout occured
* Currently ignored since this test with current config takes around 15 seconds to complete.
throw e.getCause();
}
}
+
+ /**
+ * This test method invokes remote rpc and lookup failed
+ * with runtime exception.
+ */
+ @Test(expected = DOMActionException.class)
+ @SuppressWarnings({"checkstyle:AvoidHidingCauseException", "checkstyle:IllegalThrows"})
+ public void testInvokeActionWithLookupException() throws Throwable {
+ final ContainerNode invokeRpcInput = makeRPCInput("foo");
+
+ doThrow(new RuntimeException("test")).when(domActionService2).invokeAction(any(SchemaPath.class),
+ any(DOMDataTreeIdentifier.class), any(ContainerNode.class));
+
+ final ListenableFuture<DOMActionResult> frontEndFuture = remoteActionImpl1.invokeAction(TEST_RPC_TYPE,
+ TEST_DATA_TREE_ID, invokeRpcInput);
+ assertTrue(frontEndFuture instanceof RemoteDOMActionFuture);
+
+ try {
+ frontEndFuture.get(5, TimeUnit.SECONDS);
+ } catch (ExecutionException e) {
+ throw e.getCause();
+ }
+ }
}
*/
package org.opendaylight.controller.remote.rpc;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedAbstractActor;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.util.concurrent.TimeUnit;
-import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
import scala.concurrent.duration.FiniteDuration;
-public class RemoteRpcProviderConfigTest {
+public class RemoteOpsProviderConfigTest {
@Test
public void testConfigDefaults() {
- RemoteRpcProviderConfig config = new RemoteRpcProviderConfig.Builder("unit-test").build();
+ RemoteOpsProviderConfig config = new RemoteOpsProviderConfig.Builder("unit-test").build();
//Assert on configurations from common config
- Assert.assertFalse(config.isMetricCaptureEnabled()); //should be disabled by default
- Assert.assertNotNull(config.getMailBoxCapacity());
- Assert.assertNotNull(config.getMailBoxName());
- Assert.assertNotNull(config.getMailBoxPushTimeout());
+ assertFalse(config.isMetricCaptureEnabled()); //should be disabled by default
+ assertNotNull(config.getMailBoxCapacity());
+ assertNotNull(config.getMailBoxName());
+ assertNotNull(config.getMailBoxPushTimeout());
//rest of the configurations should be set
- Assert.assertNotNull(config.getActorSystemName());
- Assert.assertNotNull(config.getRpcBrokerName());
- Assert.assertNotNull(config.getRpcBrokerPath());
- Assert.assertNotNull(config.getRpcManagerName());
- Assert.assertNotNull(config.getRpcManagerPath());
- Assert.assertNotNull(config.getRpcRegistryName());
- Assert.assertNotNull(config.getRpcRegistryPath());
- Assert.assertNotNull(config.getAskDuration());
- Assert.assertNotNull(config.getGossipTickInterval());
+ assertNotNull(config.getActorSystemName());
+ assertNotNull(config.getRpcBrokerName());
+ assertNotNull(config.getRpcBrokerPath());
+ assertNotNull(config.getRpcManagerName());
+ assertNotNull(config.getRpcManagerPath());
+ assertNotNull(config.getRpcRegistryName());
+ assertNotNull(config.getActionRegistryName());
+ assertNotNull(config.getRpcRegistryPath());
+ assertNotNull(config.getActionRegistryPath());
+ assertNotNull(config.getAskDuration());
+ assertNotNull(config.getGossipTickInterval());
}
@Test
String timeOutVal = "10ms";
FiniteDuration expectedTimeout = FiniteDuration.create(10, TimeUnit.MILLISECONDS);
- RemoteRpcProviderConfig config = new RemoteRpcProviderConfig.Builder("unit-test")
+ RemoteOpsProviderConfig config = new RemoteOpsProviderConfig.Builder("unit-test")
.metricCaptureEnabled(true)//enable metric capture
.mailboxCapacity(expectedCapacity)
.mailboxPushTimeout(timeOutVal)
.withConfigReader(reader)
.build();
- Assert.assertTrue(config.isMetricCaptureEnabled());
- Assert.assertEquals(expectedCapacity, config.getMailBoxCapacity().intValue());
- Assert.assertEquals(expectedTimeout.toMillis(), config.getMailBoxPushTimeout().toMillis());
+ assertTrue(config.isMetricCaptureEnabled());
+ assertEquals(expectedCapacity, config.getMailBoxCapacity().intValue());
+ assertEquals(expectedTimeout.toMillis(), config.getMailBoxPushTimeout().toMillis());
//Now check this config inside an actor
ActorSystem system = ActorSystem.create("unit-test", config.get());
ConfigTestActor actor = configTestActorTestActorRef.underlyingActor();
Config actorConfig = actor.getConfig();
- config = new RemoteRpcProviderConfig(actorConfig);
+ config = new RemoteOpsProviderConfig(actorConfig);
- Assert.assertTrue(config.isMetricCaptureEnabled());
- Assert.assertEquals(expectedCapacity, config.getMailBoxCapacity().intValue());
- Assert.assertEquals(expectedTimeout.toMillis(), config.getMailBoxPushTimeout().toMillis());
+ assertTrue(config.isMetricCaptureEnabled());
+ assertEquals(expectedCapacity, config.getMailBoxCapacity().intValue());
+ assertEquals(expectedTimeout.toMillis(), config.getMailBoxPushTimeout().toMillis());
}
public static class ConfigTestActor extends UntypedAbstractActor {
}
@Override
- public void onReceive(Object message) {
+ public void onReceive(final Object message) {
}
/**
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import akka.actor.ActorSystem;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
+import org.opendaylight.mdsal.dom.api.DOMActionService;
+import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
+import org.opendaylight.mdsal.dom.api.DOMRpcService;
+
+public class RemoteOpsProviderFactoryTest {
+
+ @Mock
+ private DOMRpcProviderService providerService;
+ @Mock
+ private DOMRpcService rpcService;
+ @Mock
+ private ActorSystem actorSystem;
+ @Mock
+ private RemoteOpsProviderConfig providerConfig;
+ @Mock
+ private DOMActionProviderService actionProviderService;
+ @Mock
+ private DOMActionService actionService;
+
+ @Before
+ public void setUp() {
+ initMocks(this);
+ }
+
+ @Test
+ public void testCreateInstance() {
+ Assert.assertNotNull(RemoteOpsProviderFactory
+ .createInstance(providerService, rpcService, actorSystem, providerConfig,
+ actionProviderService, actionService));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testCreateInstanceMissingProvideService() {
+ RemoteOpsProviderFactory.createInstance(null, rpcService, actorSystem, providerConfig,
+ actionProviderService, actionService);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testCreateInstanceMissingRpcService() {
+ RemoteOpsProviderFactory.createInstance(providerService, null, actorSystem, providerConfig,
+ actionProviderService, actionService);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testCreateInstanceMissingActorSystem() {
+ RemoteOpsProviderFactory.createInstance(providerService, rpcService, null, providerConfig,
+ actionProviderService, actionService);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testCreateInstanceMissingProviderConfig() {
+ RemoteOpsProviderFactory.createInstance(providerService, rpcService, actorSystem, null,
+ actionProviderService, actionService);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testCreateInstanceMissingActionProvider() {
+ RemoteOpsProviderFactory.createInstance(providerService, rpcService, actorSystem, providerConfig,
+ null, actionService);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testCreateInstanceMissingActionService() {
+ RemoteOpsProviderFactory.createInstance(providerService, rpcService, actorSystem, providerConfig,
+ actionProviderService, null);
+ }
+}
*/
package org.opendaylight.controller.remote.rpc;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import akka.actor.ActorRef;
import com.typesafe.config.ConfigFactory;
import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
-import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
+import org.opendaylight.mdsal.dom.api.DOMActionService;
import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
import org.opendaylight.mdsal.dom.api.DOMRpcService;
import scala.concurrent.Await;
import scala.concurrent.duration.FiniteDuration;
-public class RemoteRpcProviderTest {
+public class RemoteOpsProviderTest {
static ActorSystem system;
- static RemoteRpcProviderConfig moduleConfig;
+ static RemoteOpsProviderConfig moduleConfig;
@BeforeClass
public static void setup() {
- moduleConfig = new RemoteRpcProviderConfig.Builder("odl-cluster-rpc")
+ moduleConfig = new RemoteOpsProviderConfig.Builder("odl-cluster-rpc")
.withConfigReader(ConfigFactory::load).build();
final Config config = moduleConfig.get();
system = ActorSystem.create("odl-cluster-rpc", config);
@Test
public void testRemoteRpcProvider() throws Exception {
- try (RemoteRpcProvider rpcProvider = new RemoteRpcProvider(system, mock(DOMRpcProviderService.class),
- mock(DOMRpcService.class), new RemoteRpcProviderConfig(system.settings().config()))) {
+ try (RemoteOpsProvider rpcProvider = new RemoteOpsProvider(system, mock(DOMRpcProviderService.class),
+ mock(DOMRpcService.class), new RemoteOpsProviderConfig(system.settings().config()),
+ mock(DOMActionProviderService.class), mock(DOMActionService.class))) {
rpcProvider.start();
-
final ActorRef actorRef = Await.result(
system.actorSelection(moduleConfig.getRpcManagerPath()).resolveOne(
FiniteDuration.create(1, TimeUnit.SECONDS)), FiniteDuration.create(2, TimeUnit.SECONDS));
- Assert.assertTrue(actorRef.path().toString().contains(moduleConfig.getRpcManagerPath()));
+ assertTrue(actorRef.path().toString().contains(moduleConfig.getRpcManagerPath()));
}
}
}
+++ /dev/null
-/*
- * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.remote.rpc;
-
-import static org.mockito.MockitoAnnotations.initMocks;
-
-import akka.actor.ActorSystem;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mock;
-import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
-import org.opendaylight.mdsal.dom.api.DOMRpcService;
-
-public class RemoteRpcProviderFactoryTest {
-
- @Mock
- private DOMRpcProviderService providerService;
- @Mock
- private DOMRpcService rpcService;
- @Mock
- private ActorSystem actorSystem;
- @Mock
- private RemoteRpcProviderConfig providerConfig;
-
- @Before
- public void setUp() {
- initMocks(this);
- }
-
- @Test
- public void testCreateInstance() {
- Assert.assertNotNull(RemoteRpcProviderFactory
- .createInstance(providerService, rpcService, actorSystem, providerConfig));
- }
-
- @Test(expected = NullPointerException.class)
- public void testCreateInstanceMissingProvideService() {
- RemoteRpcProviderFactory.createInstance(null, rpcService, actorSystem, providerConfig);
- }
-
- @Test(expected = NullPointerException.class)
- public void testCreateInstanceMissingRpcService() {
- RemoteRpcProviderFactory.createInstance(providerService, null, actorSystem, providerConfig);
- }
-
- @Test(expected = NullPointerException.class)
- public void testCreateInstanceMissingActorSystem() {
- RemoteRpcProviderFactory.createInstance(providerService, rpcService, null, providerConfig);
- }
-
- @Test(expected = NullPointerException.class)
- public void testCreateInstanceMissingProviderConfig() {
- RemoteRpcProviderFactory.createInstance(providerService, rpcService, actorSystem, null);
- }
-}
*/
package org.opendaylight.controller.remote.rpc;
+import static org.junit.Assert.assertEquals;
+
import java.util.ArrayList;
import java.util.List;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.yangtools.yang.common.RpcError;
@Test
public void testGetMessage() {
- Assert.assertEquals(ERROR_MESSAGE, exception.getMessage());
+ assertEquals(ERROR_MESSAGE, exception.getMessage());
}
@Test
public void testGetRpcErrors() {
final List<RpcError> actualErrors = (List<RpcError>) exception.getRpcErrors();
- Assert.assertEquals(rpcErrors.size(), actualErrors.size());
+ assertEquals(rpcErrors.size(), actualErrors.size());
for (int i = 0; i < actualErrors.size(); i++) {
- Assert.assertEquals(rpcErrors.get(i).getApplicationTag(), actualErrors.get(i).getApplicationTag());
- Assert.assertEquals(rpcErrors.get(i).getSeverity(), actualErrors.get(i).getSeverity());
- Assert.assertEquals(rpcErrors.get(i).getMessage(), actualErrors.get(i).getMessage());
- Assert.assertEquals(rpcErrors.get(i).getErrorType(), actualErrors.get(i).getErrorType());
- Assert.assertEquals(rpcErrors.get(i).getCause(), actualErrors.get(i).getCause());
- Assert.assertEquals(rpcErrors.get(i).getInfo(), actualErrors.get(i).getInfo());
- Assert.assertEquals(rpcErrors.get(i).getTag(), actualErrors.get(i).getTag());
+ final RpcError expected = rpcErrors.get(i);
+ final RpcError actual = actualErrors.get(i);
+
+ assertEquals(expected.getApplicationTag(), actual.getApplicationTag());
+ assertEquals(expected.getSeverity(), actual.getSeverity());
+ assertEquals(expected.getMessage(), actual.getMessage());
+ assertEquals(expected.getErrorType(), actual.getErrorType());
+ assertEquals(expected.getCause(), actual.getCause());
+ assertEquals(expected.getInfo(), actual.getInfo());
+ assertEquals(expected.getTag(), actual.getTag());
}
}
-}
\ No newline at end of file
+}
+++ /dev/null
-/*
- * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.remote.rpc;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Address;
-import akka.actor.Props;
-import akka.testkit.TestActorRef;
-import akka.testkit.javadsl.TestKit;
-import com.google.common.collect.ImmutableMap;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Optional;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.InOrder;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.RemoteRpcEndpoint;
-import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMRpcImplementationRegistration;
-import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.model.api.SchemaPath;
-
-public class RpcRegistrarTest {
- @Mock
- private DOMRpcProviderService service;
- @Mock
- private DOMRpcImplementationRegistration<RemoteRpcImplementation> oldReg;
- @Mock
- private DOMRpcImplementationRegistration<RemoteRpcImplementation> newReg;
-
- private ActorSystem system;
- private TestActorRef<RpcRegistrar> testActorRef;
- private Address endpointAddress;
- private RemoteRpcEndpoint firstEndpoint;
- private RemoteRpcEndpoint secondEndpoint;
- private RpcRegistrar rpcRegistrar;
-
- @Before
- public void setUp() {
- MockitoAnnotations.initMocks(this);
- system = ActorSystem.create("test");
-
- final TestKit testKit = new TestKit(system);
- final RemoteRpcProviderConfig config = new RemoteRpcProviderConfig.Builder("system").build();
- final Props props = RpcRegistrar.props(config, service);
- testActorRef = new TestActorRef<>(system, props, testKit.getRef(), "actorRef");
- endpointAddress = new Address("http", "local");
-
- final DOMRpcIdentifier firstEndpointId = DOMRpcIdentifier.create(
- SchemaPath.create(true, QName.create("first:identifier", "foo")));
- final DOMRpcIdentifier secondEndpointId = DOMRpcIdentifier.create(
- SchemaPath.create(true, QName.create("second:identifier", "bar")));
-
- final TestKit senderKit = new TestKit(system);
- firstEndpoint = new RemoteRpcEndpoint(senderKit.getRef(), Collections.singletonList(firstEndpointId));
- secondEndpoint = new RemoteRpcEndpoint(senderKit.getRef(), Collections.singletonList(secondEndpointId));
-
- Mockito.doReturn(oldReg).when(service).registerRpcImplementation(
- Mockito.any(RemoteRpcImplementation.class), Mockito.eq(firstEndpoint.getRpcs()));
-
- Mockito.doReturn(newReg).when(service).registerRpcImplementation(
- Mockito.any(RemoteRpcImplementation.class), Mockito.eq(secondEndpoint.getRpcs()));
-
- rpcRegistrar = testActorRef.underlyingActor();
- }
-
- @After
- public void tearDown() {
- TestKit.shutdownActorSystem(system, true);
- }
-
- @Test
- public void testPostStop() throws Exception {
- testActorRef.tell(new UpdateRemoteEndpoints(ImmutableMap.of(endpointAddress, Optional.of(firstEndpoint))),
- ActorRef.noSender());
- testActorRef.tell(new UpdateRemoteEndpoints(ImmutableMap.of(endpointAddress, Optional.of(secondEndpoint))),
- ActorRef.noSender());
-
- rpcRegistrar.postStop();
-
- Mockito.verify(oldReg).close();
- Mockito.verify(newReg).close();
- }
-
- @Test
- public void testHandleReceiveAddEndpoint() {
- final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = ImmutableMap.of(
- endpointAddress, Optional.of(firstEndpoint));
- testActorRef.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
-
- Mockito.verify(service).registerRpcImplementation(
- Mockito.any(RemoteRpcImplementation.class), Mockito.eq(firstEndpoint.getRpcs()));
- Mockito.verifyNoMoreInteractions(service, oldReg, newReg);
- }
-
- @Test
- public void testHandleReceiveRemoveEndpoint() {
- final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = ImmutableMap.of(
- endpointAddress, Optional.empty());
- testActorRef.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
- Mockito.verifyNoMoreInteractions(service, oldReg, newReg);
- }
-
- @Test
- public void testHandleReceiveUpdateEndpoint() {
- final InOrder inOrder = Mockito.inOrder(service, oldReg, newReg);
-
- testActorRef.tell(new UpdateRemoteEndpoints(ImmutableMap.of(endpointAddress, Optional.of(firstEndpoint))),
- ActorRef.noSender());
-
- // first registration
- inOrder.verify(service).registerRpcImplementation(
- Mockito.any(RemoteRpcImplementation.class), Mockito.eq(firstEndpoint.getRpcs()));
-
- testActorRef.tell(new UpdateRemoteEndpoints(ImmutableMap.of(endpointAddress, Optional.of(secondEndpoint))),
- ActorRef.noSender());
-
- // second registration
- inOrder.verify(service).registerRpcImplementation(
- Mockito.any(RemoteRpcImplementation.class), Mockito.eq(secondEndpoint.getRpcs()));
-
- // verify first registration is closed
- inOrder.verify(oldReg).close();
-
- Mockito.verifyNoMoreInteractions(service, oldReg, newReg);
- }
-}
/*
- * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2019 Nordix Foundation. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
import org.apache.commons.lang.SerializationUtils;
import org.junit.Test;
-import org.opendaylight.controller.remote.rpc.AbstractRpcTest;
+import org.opendaylight.controller.remote.rpc.AbstractOpsTest;
-/**
- * Unit tests for ExecuteRpc.
- *
- * @author Thomas Pantelis
- */
-public class ExecuteRpcTest {
+public class ExecuteOpsTest {
@Test
- public void testSerialization() {
- ExecuteRpc expected = ExecuteRpc.from(AbstractRpcTest.TEST_RPC_ID,
- AbstractRpcTest.makeRPCInput("serialization-test"));
+ public void testOpsSerialization() {
+ ExecuteRpc expected = ExecuteRpc.from(AbstractOpsTest.TEST_RPC_ID,
+ AbstractOpsTest.makeRPCInput("serialization-test"));
ExecuteRpc actual = (ExecuteRpc) SerializationUtils.clone(expected);
- assertEquals("getRpc", expected.getRpc(), actual.getRpc());
- assertEquals("getInputNormalizedNode", expected.getInputNormalizedNode(), actual.getInputNormalizedNode());
+ assertEquals("getName", expected.getType(), actual.getType());
+ assertEquals("getInputNormalizedNode", expected.getInput(), actual.getInput());
+ assertEquals("getPath", expected.getType(), actual.getType());
}
}
--- /dev/null
+/*
+ * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.messages;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Collections;
+import java.util.Optional;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.remote.rpc.AbstractOpsTest;
+
+/**
+ * Unit tests for RpcResponse.
+ *
+ * @author Thomas Pantelis
+ */
+public class OpsResponseTest {
+
+ @Test
+ public void testSerialization() {
+ RpcResponse expectedRpc = new RpcResponse(AbstractOpsTest.makeRPCOutput("serialization-test"));
+
+ ActionResponse expectedAction = new ActionResponse(
+ Optional.of(AbstractOpsTest.makeRPCOutput("serialization-test")), Collections.emptyList());
+
+ RpcResponse actualRpc = (RpcResponse) SerializationUtils.clone(expectedRpc);
+
+ ActionResponse actualAction = (ActionResponse) SerializationUtils.clone(expectedAction);
+
+ assertEquals("getResultNormalizedNode", expectedRpc.getOutput(),
+ actualRpc.getOutput());
+
+ assertEquals("getResultNormalizedNode", expectedAction.getOutput(),
+ actualAction.getOutput());
+ }
+}
+++ /dev/null
-/*
- * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.remote.rpc.messages;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.commons.lang.SerializationUtils;
-import org.junit.Test;
-import org.opendaylight.controller.remote.rpc.AbstractRpcTest;
-
-/**
- * Unit tests for RpcResponse.
- *
- * @author Thomas Pantelis
- */
-public class RpcResponseTest {
-
- @Test
- public void testSerialization() {
- RpcResponse expected = new RpcResponse(AbstractRpcTest.makeRPCOutput("serialization-test"));
-
- RpcResponse actual = (RpcResponse) SerializationUtils.clone(expected);
-
- assertEquals("getResultNormalizedNode", expected.getResultNormalizedNode(), actual.getResultNormalizedNode());
- }
-}
--- /dev/null
+/*
+ * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_ALL_BUCKETS;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_BUCKET_VERSIONS;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent.CurrentClusterState;
+import akka.cluster.Member;
+import akka.cluster.MemberStatus;
+import akka.cluster.UniqueAddress;
+import akka.testkit.javadsl.TestKit;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.ConfigFactory;
+import java.net.URI;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
+import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
+import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.Messages.UpdateActions;
+import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.Messages.UpdateRemoteActionEndpoints;
+import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.RemoteActionEndpoint;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMActionInstance;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ActionRegistryTest {
+ private static final Logger LOG = LoggerFactory.getLogger(ActionRegistryTest.class);
+
+ private static ActorSystem node1;
+ private static ActorSystem node2;
+ private static ActorSystem node3;
+
+ private TestKit invoker1;
+ private TestKit invoker2;
+ private TestKit invoker3;
+ private TestKit registrar1;
+ private TestKit registrar2;
+ private TestKit registrar3;
+ private ActorRef registry1;
+ private ActorRef registry2;
+ private ActorRef registry3;
+
+ private int routeIdCounter = 1;
+
+ @BeforeClass
+ public static void staticSetup() {
+ AkkaConfigurationReader reader = ConfigFactory::load;
+
+ RemoteOpsProviderConfig config1 = new RemoteOpsProviderConfig.Builder("memberA").gossipTickInterval("200ms")
+ .withConfigReader(reader).build();
+ RemoteOpsProviderConfig config2 = new RemoteOpsProviderConfig.Builder("memberB").gossipTickInterval("200ms")
+ .withConfigReader(reader).build();
+ RemoteOpsProviderConfig config3 = new RemoteOpsProviderConfig.Builder("memberC").gossipTickInterval("200ms")
+ .withConfigReader(reader).build();
+ node1 = ActorSystem.create("opendaylight-rpc", config1.get());
+ node2 = ActorSystem.create("opendaylight-rpc", config2.get());
+ node3 = ActorSystem.create("opendaylight-rpc", config3.get());
+
+ waitForMembersUp(node1, Cluster.get(node2).selfUniqueAddress(), Cluster.get(node3).selfUniqueAddress());
+ waitForMembersUp(node2, Cluster.get(node1).selfUniqueAddress(), Cluster.get(node3).selfUniqueAddress());
+ }
+
+ static void waitForMembersUp(final ActorSystem node, final UniqueAddress... addresses) {
+ Set<UniqueAddress> otherMembersSet = Sets.newHashSet(addresses);
+ Stopwatch sw = Stopwatch.createStarted();
+ while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
+ CurrentClusterState state = Cluster.get(node).state();
+ for (Member m : state.getMembers()) {
+ if (m.status() == MemberStatus.up() && otherMembersSet.remove(m.uniqueAddress())
+ && otherMembersSet.isEmpty()) {
+ return;
+ }
+ }
+
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ }
+
+ fail("Member(s) " + otherMembersSet + " are not Up");
+ }
+
+ @AfterClass
+ public static void staticTeardown() {
+ TestKit.shutdownActorSystem(node1);
+ TestKit.shutdownActorSystem(node2);
+ TestKit.shutdownActorSystem(node3);
+ }
+
+ @Before
+ public void setup() {
+ invoker1 = new TestKit(node1);
+ registrar1 = new TestKit(node1);
+ registry1 = node1.actorOf(ActionRegistry.props(config(node1), invoker1.getRef(), registrar1.getRef()));
+ invoker2 = new TestKit(node2);
+ registrar2 = new TestKit(node2);
+ registry2 = node2.actorOf(ActionRegistry.props(config(node2), invoker2.getRef(), registrar2.getRef()));
+ invoker3 = new TestKit(node3);
+ registrar3 = new TestKit(node3);
+ registry3 = node3.actorOf(ActionRegistry.props(config(node3), invoker3.getRef(), registrar3.getRef()));
+ }
+
+ private static RemoteOpsProviderConfig config(final ActorSystem node) {
+ return new RemoteOpsProviderConfig(node.settings().config());
+ }
+
+ @After
+ public void teardown() {
+ if (registry1 != null) {
+ node1.stop(registry1);
+ }
+ if (registry2 != null) {
+ node2.stop(registry2);
+ }
+ if (registry3 != null) {
+ node3.stop(registry3);
+ }
+
+ if (invoker1 != null) {
+ node1.stop(invoker1.getRef());
+ }
+ if (invoker2 != null) {
+ node2.stop(invoker2.getRef());
+ }
+ if (invoker3 != null) {
+ node3.stop(invoker3.getRef());
+ }
+
+ if (registrar1 != null) {
+ node1.stop(registrar1.getRef());
+ }
+ if (registrar2 != null) {
+ node2.stop(registrar2.getRef());
+ }
+ if (registrar3 != null) {
+ node3.stop(registrar3.getRef());
+ }
+ }
+
+ /**
+ * One node cluster. 1. Register action, ensure router can be found 2. Then remove action, ensure its
+ * deleted
+ */
+ @Test
+ public void testAddRemoveActionOnSameNode() {
+ LOG.info("testAddRemoveActionOnSameNode starting");
+
+ Address nodeAddress = node1.provider().getDefaultAddress();
+
+ // Add action on node 1
+
+ List<DOMActionInstance> addedRouteIds = createRouteIds();
+
+ registry1.tell(new ActionRegistry.Messages.UpdateActions(addedRouteIds,
+ Collections.emptyList()), ActorRef.noSender());
+
+ // Bucket store should get an update bucket message. Updated bucket contains added action.
+ final TestKit testKit = new TestKit(node1);
+
+ Map<Address, Bucket<ActionRoutingTable>> buckets = retrieveBuckets(registry1, testKit, nodeAddress);
+ verifyBucket(buckets.get(nodeAddress), addedRouteIds);
+
+ Map<Address, Long> versions = retrieveVersions(registry1, testKit);
+ assertEquals("Version for bucket " + nodeAddress, (Long) buckets.get(nodeAddress).getVersion(),
+ versions.get(nodeAddress));
+
+ // Now remove action
+ registry1.tell(new UpdateActions(Collections.emptyList(),addedRouteIds), ActorRef.noSender());
+
+ // Bucket store should get an update bucket message. Action is removed in the updated bucket
+
+ verifyEmptyBucket(testKit, registry1, nodeAddress);
+
+ LOG.info("testAddRemoveActionOnSameNode ending");
+
+ }
+
+ /**
+ * Three node cluster. 1. Register action on 1 node, ensure 2nd node gets updated 2. Remove action on
+ * 1 node, ensure 2nd node gets updated
+ */
+ @Test
+ public void testActionAddRemoveInCluster() {
+
+ LOG.info("testActionAddRemoveInCluster starting");
+
+ List<DOMActionInstance> addedRouteIds = createRouteIds();
+
+ Address node1Address = node1.provider().getDefaultAddress();
+
+ // Add action on node 1
+ registry1.tell(new UpdateActions(addedRouteIds, Collections.emptyList()), ActorRef.noSender());
+
+ // Bucket store on node2 should get a message to update its local copy of remote buckets
+ final TestKit testKit = new TestKit(node2);
+
+ Map<Address, Bucket<ActionRoutingTable>> buckets = retrieveBuckets(registry2, testKit, node1Address);
+ verifyBucket(buckets.get(node1Address), addedRouteIds);
+
+ // Now remove
+ registry1.tell(new UpdateActions(Collections.emptyList(), addedRouteIds), ActorRef.noSender());
+
+ // Bucket store on node2 should get a message to update its local copy of remote buckets.
+ // Wait for the bucket for node1 to be empty.
+
+ verifyEmptyBucket(testKit, registry2, node1Address);
+
+ LOG.info("testActionAddRemoveInCluster ending");
+ }
+
+ private void verifyEmptyBucket(final TestKit testKit, final ActorRef registry, final Address address)
+ throws AssertionError {
+ Map<Address, Bucket<ActionRoutingTable>> buckets;
+ int numTries = 0;
+ while (true) {
+ buckets = retrieveBuckets(registry1, testKit, address);
+
+ try {
+ verifyBucket(buckets.get(address), Collections.emptyList());
+ break;
+ } catch (AssertionError e) {
+ if (++numTries >= 50) {
+ throw e;
+ }
+ }
+
+ Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ /**
+ * Three node cluster. Register action on 2 nodes. Ensure 3rd gets updated.
+ */
+ @Test
+ public void testActionAddedOnMultiNodes() {
+ final TestKit testKit = new TestKit(node3);
+
+ // Add action on node 1
+ List<DOMActionInstance> addedRouteIds1 = createRouteIds();
+ registry1.tell(new UpdateActions(addedRouteIds1, Collections.emptyList()), ActorRef.noSender());
+
+ final UpdateRemoteActionEndpoints req1 = registrar3.expectMsgClass(Duration.ofSeconds(3),
+ UpdateRemoteActionEndpoints.class);
+
+ // Add action on node 2
+ List<DOMActionInstance> addedRouteIds2 = createRouteIds();
+ registry2.tell(new UpdateActions(addedRouteIds2, Collections.emptyList()), ActorRef.noSender());
+
+ final UpdateRemoteActionEndpoints req2 = registrar3.expectMsgClass(Duration.ofSeconds(3),
+ UpdateRemoteActionEndpoints.class);
+ Address node2Address = node2.provider().getDefaultAddress();
+ Address node1Address = node1.provider().getDefaultAddress();
+
+ Map<Address, Bucket<ActionRoutingTable>> buckets = retrieveBuckets(registry3, testKit, node1Address,
+ node2Address);
+
+ verifyBucket(buckets.get(node1Address), addedRouteIds1);
+ verifyBucket(buckets.get(node2Address), addedRouteIds2);
+
+ Map<Address, Long> versions = retrieveVersions(registry3, testKit);
+ assertEquals("Version for bucket " + node1Address, (Long) buckets.get(node1Address).getVersion(),
+ versions.get(node1Address));
+ assertEquals("Version for bucket " + node2Address, (Long) buckets.get(node2Address).getVersion(),
+ versions.get(node2Address));
+
+ assertEndpoints(req1, node1Address, invoker1);
+ assertEndpoints(req2, node2Address, invoker2);
+
+ }
+
+ private static void assertEndpoints(final ActionRegistry.Messages.UpdateRemoteActionEndpoints msg,
+ final Address address, final TestKit invoker) {
+ final Map<Address, Optional<RemoteActionEndpoint>> endpoints = msg.getActionEndpoints();
+ assertEquals(1, endpoints.size());
+
+ final Optional<RemoteActionEndpoint> maybeEndpoint = endpoints.get(address);
+ assertNotNull(maybeEndpoint);
+ assertTrue(maybeEndpoint.isPresent());
+
+ final RemoteActionEndpoint endpoint = maybeEndpoint.get();
+ final ActorRef router = endpoint.getRouter();
+ assertNotNull(router);
+
+ router.tell("hello", ActorRef.noSender());
+ final String s = invoker.expectMsgClass(Duration.ofSeconds(3), String.class);
+ assertEquals("hello", s);
+ }
+
+ private static Map<Address, Long> retrieveVersions(final ActorRef bucketStore, final TestKit testKit) {
+ bucketStore.tell(GET_BUCKET_VERSIONS, testKit.getRef());
+ @SuppressWarnings("unchecked")
+ final Map<Address, Long> reply = testKit.expectMsgClass(Duration.ofSeconds(3), Map.class);
+ return reply;
+ }
+
+ private static void verifyBucket(final Bucket<ActionRoutingTable> bucket,
+ final List<DOMActionInstance> expRouteIds) {
+ ActionRoutingTable table = bucket.getData();
+ assertNotNull("Bucket ActionRoutingTable is null", table);
+ for (DOMActionInstance r : expRouteIds) {
+ if (!table.contains(r)) {
+ fail("ActionRoutingTable does not contain " + r + ". Actual: " + table);
+ }
+ }
+
+ assertEquals("ActionRoutingTable size", expRouteIds.size(), table.size());
+ }
+
+ private static Map<Address, Bucket<ActionRoutingTable>> retrieveBuckets(
+ final ActorRef bucketStore, final TestKit testKit, final Address... addresses) {
+ int numTries = 0;
+ while (true) {
+ bucketStore.tell(GET_ALL_BUCKETS, testKit.getRef());
+ @SuppressWarnings("unchecked")
+ Map<Address, Bucket<ActionRoutingTable>> buckets = testKit.expectMsgClass(Duration.ofSeconds(3),
+ Map.class);
+
+ boolean foundAll = true;
+ for (Address addr : addresses) {
+ Bucket<ActionRoutingTable> bucket = buckets.get(addr);
+ if (bucket == null) {
+ foundAll = false;
+ break;
+ }
+ }
+
+ if (foundAll) {
+ return buckets;
+ }
+
+ if (++numTries >= 50) {
+ fail("Missing expected buckets for addresses: " + Arrays.toString(addresses)
+ + ", Actual: " + buckets);
+ }
+
+ Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ @Test
+ public void testAddRoutesConcurrency() {
+ final TestKit testKit = new TestKit(node1);
+
+ final int nRoutes = 500;
+ final Collection<DOMActionInstance> added = new ArrayList<>(nRoutes);
+ for (int i = 0; i < nRoutes; i++) {
+ QName type = QName.create(URI.create("/mockaction"), "mockaction" + routeIdCounter++);
+ final DOMActionInstance routeId = DOMActionInstance.of(SchemaPath.create(true,
+ type), LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.create(new
+ YangInstanceIdentifier.NodeIdentifier(type)));
+ added.add(routeId);
+
+ //Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ registry1.tell(new UpdateActions(Arrays.asList(routeId), Collections.emptyList()),
+ ActorRef.noSender());
+ }
+
+ int numTries = 0;
+ while (true) {
+ registry1.tell(GET_ALL_BUCKETS, testKit.getRef());
+ @SuppressWarnings("unchecked")
+ Map<Address, Bucket<ActionRoutingTable>> buckets = testKit.expectMsgClass(Duration.ofSeconds(3),
+ Map.class);
+
+ Bucket<ActionRoutingTable> localBucket = buckets.values().iterator().next();
+ ActionRoutingTable table = localBucket.getData();
+ if (table != null && table.size() == nRoutes) {
+ for (DOMActionInstance r : added) {
+ assertTrue("ActionRoutingTable contains " + r, table.contains(r));
+ }
+
+ break;
+ }
+
+ if (++numTries >= 50) {
+ fail("Expected # routes: " + nRoutes + ", Actual: " + table.size());
+ }
+
+ Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ private List<DOMActionInstance> createRouteIds() {
+ QName type = QName.create(URI.create("/mockaction"), "mockaction" + routeIdCounter++);
+ List<DOMActionInstance> routeIds = new ArrayList<>(1);
+ routeIds.add(DOMActionInstance.of(SchemaPath.create(true, type),
+ LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.create(
+ new YangInstanceIdentifier.NodeIdentifier(type))));
+ return routeIds;
+ }
+}
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
public static void staticSetup() {
AkkaConfigurationReader reader = ConfigFactory::load;
- RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").gossipTickInterval("200ms")
+ RemoteOpsProviderConfig config1 = new RemoteOpsProviderConfig.Builder("memberA").gossipTickInterval("200ms")
.withConfigReader(reader).build();
- RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").gossipTickInterval("200ms")
+ RemoteOpsProviderConfig config2 = new RemoteOpsProviderConfig.Builder("memberB").gossipTickInterval("200ms")
.withConfigReader(reader).build();
- RemoteRpcProviderConfig config3 = new RemoteRpcProviderConfig.Builder("memberC").gossipTickInterval("200ms")
+ RemoteOpsProviderConfig config3 = new RemoteOpsProviderConfig.Builder("memberC").gossipTickInterval("200ms")
.withConfigReader(reader).build();
node1 = ActorSystem.create("opendaylight-rpc", config1.get());
node2 = ActorSystem.create("opendaylight-rpc", config2.get());
registry3 = node3.actorOf(RpcRegistry.props(config(node3), invoker3.getRef(), registrar3.getRef()));
}
- private static RemoteRpcProviderConfig config(final ActorSystem node) {
- return new RemoteRpcProviderConfig(node.settings().config());
+ private static RemoteOpsProviderConfig config(final ActorSystem node) {
+ return new RemoteOpsProviderConfig(node.settings().config());
}
@After
}
private static void assertEndpoints(final UpdateRemoteEndpoints msg, final Address address, final TestKit invoker) {
- final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = msg.getEndpoints();
+ final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = msg.getRpcEndpoints();
assertEquals(1, endpoints.size());
final Optional<RemoteRpcEndpoint> maybeEndpoint = endpoints.get(address);
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
import org.opendaylight.controller.remote.rpc.TerminationMonitor;
public class BucketStoreTest {
*/
private static BucketStoreActor<T> createStore() {
final Props props = Props.create(TestingBucketStoreActor.class,
- new RemoteRpcProviderConfig(system.settings().config()), "testing-store",new T());
+ new RemoteOpsProviderConfig(system.settings().config()), "testing-store",new T());
return TestActorRef.<BucketStoreActor<T>>create(system, props, "testStore").underlyingActor();
}
private static final class TestingBucketStoreActor extends BucketStoreActor<T> {
- protected TestingBucketStoreActor(final RemoteRpcProviderConfig config,
+ protected TestingBucketStoreActor(final RemoteOpsProviderConfig config,
final String persistenceId,
final T initialData) {
super(config, persistenceId, initialData);
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
import org.opendaylight.controller.remote.rpc.TerminationMonitor;
* @return instance of Gossiper class
*/
private static Gossiper createGossiper() {
- final RemoteRpcProviderConfig config =
- new RemoteRpcProviderConfig.Builder("unit-test")
+ final RemoteOpsProviderConfig config =
+ new RemoteOpsProviderConfig.Builder("unit-test")
.withConfigReader(ConfigFactory::load).build();
final Props props = Gossiper.testProps(config);
final TestActorRef<Gossiper> testRef = TestActorRef.create(system, props, "testGossiper");
--- /dev/null
+/*
+ * Copyright (c) 2019 Nordix Foundation. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.mbeans;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.dispatch.Dispatchers;
+import akka.testkit.TestActorRef;
+import akka.testkit.javadsl.TestKit;
+import akka.util.Timeout;
+import com.google.common.collect.Lists;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
+import org.opendaylight.controller.remote.rpc.registry.ActionRegistry;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMActionInstance;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+public class RemoteActionRegistryMXBeanImplTest {
+
+ private static final QName LOCAL_QNAME = QName.create("base", "local");
+ private static final SchemaPath EMPTY_SCHEMA_PATH = SchemaPath.ROOT;
+ private static final SchemaPath LOCAL_SCHEMA_PATH = SchemaPath.create(true, LOCAL_QNAME);
+
+ private ActorSystem system;
+ private TestActorRef<ActionRegistry> testActor;
+ private List<DOMActionInstance> buckets;
+ private RemoteActionRegistryMXBeanImpl mxBean;
+
+ @Before
+ public void setUp() {
+ system = ActorSystem.create("test");
+
+ final DOMActionInstance emptyActionIdentifier = DOMActionInstance.of(
+ EMPTY_SCHEMA_PATH, LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.EMPTY);
+ final DOMActionInstance localActionIdentifier = DOMActionInstance.of(
+ LOCAL_SCHEMA_PATH, LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.of(LOCAL_QNAME));
+
+ buckets = Lists.newArrayList(emptyActionIdentifier, localActionIdentifier);
+
+ final RemoteOpsProviderConfig config = new RemoteOpsProviderConfig.Builder("system").build();
+ final TestKit invoker = new TestKit(system);
+ final TestKit registrar = new TestKit(system);
+ final TestKit supervisor = new TestKit(system);
+ final Props props = ActionRegistry.props(config, invoker.getRef(), registrar.getRef())
+ .withDispatcher(Dispatchers.DefaultDispatcherId());
+ testActor = new TestActorRef<>(system, props, supervisor.getRef(), "testActor");
+
+ final Timeout timeout = Timeout.apply(10, TimeUnit.SECONDS);
+ mxBean = new RemoteActionRegistryMXBeanImpl(new BucketStoreAccess(testActor, system.dispatcher(), timeout),
+ timeout);
+ }
+
+ @After
+ public void tearDown() {
+ TestKit.shutdownActorSystem(system, Boolean.TRUE);
+ }
+
+ @Test
+ public void testGetLocalRegisteredRoutedActionEmptyBuckets() {
+ final Set<String> localRegisteredRoutedAction = mxBean.getLocalRegisteredAction();
+
+ Assert.assertNotNull(localRegisteredRoutedAction);
+ Assert.assertTrue(localRegisteredRoutedAction.isEmpty());
+ }
+
+ @Test
+ public void testGetLocalRegisteredRoutedAction() {
+ testActor.tell(new ActionRegistry.Messages.UpdateActions(Lists.newArrayList(buckets),
+ Collections.emptyList()), ActorRef.noSender());
+ final Set<String> localRegisteredRoutedAction = mxBean.getLocalRegisteredAction();
+
+ Assert.assertNotNull(localRegisteredRoutedAction);
+ Assert.assertEquals(1, localRegisteredRoutedAction.size());
+
+ final String localAction = localRegisteredRoutedAction.iterator().next();
+ Assert.assertTrue(localAction.contains(LOCAL_QNAME.toString()));
+ Assert.assertTrue(localAction.contains(LOCAL_SCHEMA_PATH.toString()));
+ }
+
+ @Test
+ public void testFindActionByNameEmptyBuckets() {
+ final Map<String, String> rpcByName = mxBean.findActionByName("");
+
+ Assert.assertNotNull(rpcByName);
+ Assert.assertTrue(rpcByName.isEmpty());
+ }
+
+ @Test
+ public void testFindActionByName() {
+ testActor.tell(new ActionRegistry.Messages.UpdateActions(Lists.newArrayList(buckets),
+ Collections.emptyList()), ActorRef.noSender());
+ final Map<String, String> rpcByName = mxBean.findActionByName("");
+
+ Assert.assertNotNull(rpcByName);
+ Assert.assertEquals(1, rpcByName.size());
+ Assert.assertTrue(rpcByName.containsValue(LOCAL_QNAME.getLocalName()));
+ }
+
+ @Test
+ public void testFindActionByRouteEmptyBuckets() {
+ final Map<String, String> rpcByRoute = mxBean.findActionByRoute("");
+
+ Assert.assertNotNull(rpcByRoute);
+ Assert.assertTrue(rpcByRoute.isEmpty());
+ }
+
+ @Test
+ public void testFindActionByRoute() {
+ testActor.tell(new ActionRegistry.Messages.UpdateActions(Lists.newArrayList(buckets),
+ Collections.emptyList()), ActorRef.noSender());
+ final Map<String, String> rpcByRoute = mxBean.findActionByRoute("");
+
+ Assert.assertNotNull(rpcByRoute);
+ Assert.assertEquals(1, rpcByRoute.size());
+ Assert.assertTrue(rpcByRoute.containsValue(LOCAL_QNAME.getLocalName()));
+ }
+
+ @Test
+ public void testGetBucketVersionsEmptyBuckets() {
+ final String bucketVersions = mxBean.getBucketVersions();
+ Assert.assertEquals(Collections.emptyMap().toString(), bucketVersions);
+ }
+
+ @Test
+ public void testGetBucketVersions() {
+ testActor.tell(new ActionRegistry.Messages.UpdateActions(Lists.newArrayList(buckets),
+ Collections.emptyList()), ActorRef.noSender());
+ final String bucketVersions = mxBean.getBucketVersions();
+
+ Assert.assertTrue(bucketVersions.contains(testActor.provider().getDefaultAddress().toString()));
+ }
+}
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess;
import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
buckets = Lists.newArrayList(emptyRpcIdentifier, localRpcIdentifier);
- final RemoteRpcProviderConfig config = new RemoteRpcProviderConfig.Builder("system").build();
+ final RemoteOpsProviderConfig config = new RemoteOpsProviderConfig.Builder("system").build();
final TestKit invoker = new TestKit(system);
final TestKit registrar = new TestKit(system);
final TestKit supervisor = new TestKit(system);