/*
- * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
package org.opendaylight.controller.remote.rpc;
-import static akka.pattern.Patterns.ask;
-
import akka.actor.ActorRef;
-import akka.dispatch.OnComplete;
-import akka.japi.Pair;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import java.util.List;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementation;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRoutersReply;
-import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-public class RemoteRpcImplementation implements DOMRpcImplementation {
- private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcImplementation.class);
+/**
+ * A {@link DOMRpcImplementation} which routes invocation requests to a remote invoker actor.
+ *
+ * @author Robert Varga
+ */
+final class RemoteRpcImplementation implements DOMRpcImplementation {
+ // 0 for local, 1 for binding, 2 for remote
+ private static final long COST = 2;
- private final ActorRef rpcRegistry;
- private final RemoteRpcProviderConfig config;
+ private final ActorRef remoteInvoker;
+ private final Timeout askDuration;
- public RemoteRpcImplementation(final ActorRef rpcRegistry, final RemoteRpcProviderConfig config) {
- this.config = config;
- this.rpcRegistry = rpcRegistry;
+ RemoteRpcImplementation(final ActorRef remoteInvoker, final RemoteRpcProviderConfig config) {
+ this.remoteInvoker = Preconditions.checkNotNull(remoteInvoker);
+ this.askDuration = config.getAskDuration();
}
@Override
public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(final DOMRpcIdentifier rpc,
final NormalizedNode<?, ?> input) {
- if (input instanceof RemoteRpcInput) {
- LOG.warn("Rpc {} was removed during execution or there is loop present. Failing received rpc.", rpc);
- return Futures.<DOMRpcResult, DOMRpcException>immediateFailedCheckedFuture(
- new DOMRpcImplementationNotAvailableException(
- "Rpc implementation for %s was removed during processing.", rpc));
- }
- final RemoteDOMRpcFuture frontEndFuture = RemoteDOMRpcFuture.create(rpc.getType().getLastComponent());
- findRouteAsync(rpc).onComplete(new OnComplete<FindRoutersReply>() {
-
- @Override
- public void onComplete(final Throwable error, final FindRoutersReply routes) throws Throwable {
- if (error != null) {
- frontEndFuture.failNow(error);
- } else {
- final List<Pair<ActorRef, Long>> routePairs = routes.getRouterWithUpdateTime();
- if (routePairs == null || routePairs.isEmpty()) {
- frontEndFuture.failNow(new DOMRpcImplementationNotAvailableException(
- "No local or remote implementation available for rpc %s", rpc.getType()));
- } else {
- final ActorRef remoteImplRef = new LatestEntryRoutingLogic(routePairs).select();
- final Object executeRpcMessage = ExecuteRpc.from(rpc, input);
- LOG.debug("Found remote actor {} for rpc {} - sending {}", remoteImplRef, rpc.getType(),
- executeRpcMessage);
- frontEndFuture.completeWith(ask(remoteImplRef, executeRpcMessage, config.getAskDuration()));
- }
- }
- }
- }, ExecutionContext.Implicits$.MODULE$.global());
- return frontEndFuture;
+ final RemoteDOMRpcFuture ret = RemoteDOMRpcFuture.create(rpc.getType().getLastComponent());
+ ret.completeWith(Patterns.ask(remoteInvoker, ExecuteRpc.from(rpc, input), askDuration));
+ return ret;
}
- @SuppressWarnings({"unchecked", "rawtypes"})
- private Future<FindRoutersReply> findRouteAsync(final DOMRpcIdentifier rpc) {
- // FIXME: Refactor routeId and message to use DOMRpcIdentifier directly.
- final RpcRouter.RouteIdentifier<?, ?, ?> routeId =
- new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), rpc.getContextReference());
- final RpcRegistry.Messages.FindRouters findMsg = new RpcRegistry.Messages.FindRouters(routeId);
- return (Future) ask(rpcRegistry, findMsg, config.getAskDuration());
+ @Override
+ public long invocationCost() {
+ return COST;
}
}
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.remote.rpc;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import java.util.Collection;
-import java.util.Map;
-import javax.annotation.Nullable;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
-import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-class RemoteRpcInput implements ContainerNode {
-
- private final ContainerNode delegate;
-
- private RemoteRpcInput(final ContainerNode delegate) {
- this.delegate = delegate;
- }
-
- protected static RemoteRpcInput from(@Nullable final NormalizedNode<?, ?> node) {
- if (node == null) {
- return null;
- }
-
- Preconditions.checkArgument(node instanceof ContainerNode);
- return new RemoteRpcInput((ContainerNode) node);
- }
-
- ContainerNode delegate() {
- return delegate;
- }
-
- @Override
- public Map<QName, String> getAttributes() {
- return delegate().getAttributes();
- }
-
- @Override
- public Object getAttributeValue(final QName name) {
- return delegate().getAttributeValue(name);
- }
-
- @Override
- public QName getNodeType() {
- return delegate().getNodeType();
- }
-
- @Override
- public Collection<DataContainerChild<? extends PathArgument, ?>> getValue() {
- return delegate().getValue();
- }
-
- @Override
- public NodeIdentifier getIdentifier() {
- return delegate().getIdentifier();
- }
-
- @Override
- public Optional<DataContainerChild<? extends PathArgument, ?>> getChild(final PathArgument child) {
- return delegate().getChild(child);
- }
-
- @Override
- public int hashCode() {
- return delegate().hashCode();
- }
-
- @Override
- public boolean equals(final Object obj) {
- return delegate().equals(obj);
- }
-}
package org.opendaylight.controller.remote.rpc;
-
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
import java.util.Collection;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
-import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
import org.opendaylight.controller.sal.core.api.Broker;
import org.opendaylight.controller.sal.core.api.Provider;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* This is the base class which initialize all the actors, listeners and
* default RPc implementation so remote invocation of rpcs.
*/
-public class RemoteRpcProvider implements AutoCloseable, Provider, SchemaContextListener {
+public class RemoteRpcProvider implements AutoCloseable, Provider {
private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcProvider.class);
private final DOMRpcProviderService rpcProvisionRegistry;
-
- private ListenerRegistration<SchemaContextListener> schemaListenerRegistration;
+ private final RemoteRpcProviderConfig config;
private final ActorSystem actorSystem;
- private SchemaService schemaService;
+
private DOMRpcService rpcService;
- private SchemaContext schemaContext;
+ private SchemaService schemaService;
private ActorRef rpcManager;
- private final RemoteRpcProviderConfig config;
public RemoteRpcProvider(final ActorSystem actorSystem, final DOMRpcProviderService rpcProvisionRegistry,
final RemoteRpcProviderConfig config) {
this.config = Preconditions.checkNotNull(config);
}
- public void setRpcService(DOMRpcService rpcService) {
+ public void setRpcService(final DOMRpcService rpcService) {
this.rpcService = rpcService;
}
- public void setSchemaService(SchemaService schemaService) {
+ public void setSchemaService(final SchemaService schemaService) {
this.schemaService = schemaService;
}
@Override
- public void close() throws Exception {
- if (schemaListenerRegistration != null) {
- schemaListenerRegistration.close();
- schemaListenerRegistration = null;
+ public void close() {
+ if (rpcManager != null) {
+ LOG.info("Stopping RPC Manager at {}", rpcManager);
+ rpcManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ rpcManager = null;
}
}
@Override
public void onSessionInitiated(final Broker.ProviderSession session) {
- schemaService = session.getService(SchemaService.class);
rpcService = session.getService(DOMRpcService.class);
start();
}
@Override
public Collection<ProviderFunctionality> getProviderFunctionality() {
- return null;
+ return ImmutableSet.of();
}
public void start() {
- LOG.info("Starting remote rpc service...");
-
- schemaContext = schemaService.getGlobalContext();
- rpcManager = actorSystem.actorOf(RpcManager.props(schemaContext, rpcProvisionRegistry, rpcService, config),
+ LOG.info("Starting Remote RPC service...");
+ rpcManager = actorSystem.actorOf(RpcManager.props(rpcProvisionRegistry, rpcService, config),
config.getRpcManagerName());
- schemaListenerRegistration = schemaService.registerSchemaContextListener(this);
- LOG.debug("rpc manager started");
- }
-
- @Override
- public void onGlobalContextUpdated(final SchemaContext newSchemaContext) {
- this.schemaContext = newSchemaContext;
- rpcManager.tell(new UpdateSchemaContext(newSchemaContext), null);
+ LOG.debug("RPC Manager started at {}", rpcManager);
}
}
public class RemoteRpcProviderConfig extends CommonConfig {
protected static final String TAG_RPC_BROKER_NAME = "rpc-broker-name";
+ protected static final String TAG_RPC_REGISTRAR_NAME = "rpc-registrar-name";
protected static final String TAG_RPC_REGISTRY_NAME = "registry-name";
protected static final String TAG_RPC_MGR_NAME = "rpc-manager-name";
protected static final String TAG_RPC_BROKER_PATH = "rpc-broker-path";
private Timeout cachedAskDuration;
private FiniteDuration cachedGossipTickInterval;
- public RemoteRpcProviderConfig(Config config) {
+ public RemoteRpcProviderConfig(final Config config) {
super(config);
}
return get().getString(TAG_RPC_BROKER_NAME);
}
+ public String getRpcRegistrarName() {
+ return get().getString(TAG_RPC_REGISTRAR_NAME);
+ }
+
public String getRpcRegistryName() {
return get().getString(TAG_RPC_REGISTRY_NAME);
}
justification = "Findbugs flags this as an unconfirmed cast of return value but the build method clearly "
+ "returns RemoteRpcProviderConfig. Perhaps it's confused b/c the build method is overloaded and "
+ "and differs in return type from the base class.")
- public static RemoteRpcProviderConfig newInstance(String actorSystemName, boolean metricCaptureEnabled,
- int mailboxCapacity) {
+ public static RemoteRpcProviderConfig newInstance(final String actorSystemName, final boolean metricCaptureEnabled,
+ final int mailboxCapacity) {
return new Builder(actorSystemName).metricCaptureEnabled(metricCaptureEnabled)
.mailboxCapacity(mailboxCapacity).build();
}
public static class Builder extends CommonConfig.Builder<Builder> {
- public Builder(String actorSystemName) {
+ public Builder(final String actorSystemName) {
super(actorSystemName);
//Actor names
configHolder.put(TAG_RPC_BROKER_NAME, "broker");
+ configHolder.put(TAG_RPC_REGISTRAR_NAME, "registrar");
configHolder.put(TAG_RPC_REGISTRY_NAME, "registry");
configHolder.put(TAG_RPC_MGR_NAME, "rpc");
}
- public Builder gossipTickInterval(String interval) {
+ public Builder gossipTickInterval(final String interval) {
configHolder.put(TAG_GOSSIP_TICK_INTERVAL, interval);
return this;
}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.remote.rpc;
-
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
-import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
-import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaPath;
-
-/**
- * Actor to initiate execution of remote RPC on other nodes of the cluster.
- */
-
-public class RpcBroker extends AbstractUntypedActor {
- private final DOMRpcService rpcService;
-
- private RpcBroker(final DOMRpcService rpcService) {
- this.rpcService = rpcService;
- }
-
- public static Props props(final DOMRpcService rpcService) {
- Preconditions.checkNotNull(rpcService, "DOMRpcService can not be null");
- return Props.create(RpcBroker.class, rpcService);
- }
-
- @Override
- protected void handleReceive(final Object message) throws Exception {
- if (message instanceof ExecuteRpc) {
- executeRpc((ExecuteRpc) message);
- }
- }
-
- @SuppressWarnings("checkstyle:IllegalCatch")
- private void executeRpc(final ExecuteRpc msg) {
- LOG.debug("Executing rpc {}", msg.getRpc());
- final NormalizedNode<?, ?> input = RemoteRpcInput.from(msg.getInputNormalizedNode());
- final SchemaPath schemaPath = SchemaPath.create(true, msg.getRpc());
- final ActorRef sender = getSender();
- final ActorRef self = self();
-
- try {
- final CheckedFuture<DOMRpcResult, DOMRpcException> future = rpcService.invokeRpc(schemaPath, input);
-
- Futures.addCallback(future, new FutureCallback<DOMRpcResult>() {
- @Override
- public void onSuccess(final DOMRpcResult result) {
- if (result == null) {
- // This shouldn't happen but the FutureCallback annotates the result param with Nullable so
- // handle null here to avoid FindBugs warning.
- LOG.debug("Got null DOMRpcResult - sending null response for execute rpc : {}", msg.getRpc());
- sender.tell(new RpcResponse(null), self);
- return;
- }
-
- if (!result.getErrors().isEmpty()) {
- final String message = String.format("Execution of RPC %s failed", msg.getRpc());
- sender.tell(new akka.actor.Status.Failure(new RpcErrorsException(message,
- result.getErrors())), self);
- } else {
- LOG.debug("Sending response for execute rpc : {}", msg.getRpc());
-
- sender.tell(new RpcResponse(result.getResult()), self);
- }
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- LOG.error(
- "executeRpc for {} failed with root cause: {}. For exception details, enable Debug logging.",
- msg.getRpc(), Throwables.getRootCause(failure));
- if (LOG.isDebugEnabled()) {
- LOG.debug("Detailed exception for execute RPC failure :{}", failure);
- }
- sender.tell(new akka.actor.Status.Failure(failure), self);
- }
- });
- } catch (final RuntimeException e) {
- sender.tell(new akka.actor.Status.Failure(e), sender);
- }
- }
-}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
+import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+/**
+ * Actor receiving invocation requests from remote nodes, routing them to
+ * {@link DOMRpcService#invokeRpc(SchemaPath, NormalizedNode)}.
+ */
+final class RpcInvoker extends AbstractUntypedActor {
+ private final DOMRpcService rpcService;
+
+ private RpcInvoker(final DOMRpcService rpcService) {
+ this.rpcService = Preconditions.checkNotNull(rpcService);
+ }
+
+ public static Props props(final DOMRpcService rpcService) {
+ Preconditions.checkNotNull(rpcService, "DOMRpcService can not be null");
+ return Props.create(RpcInvoker.class, rpcService);
+ }
+
+ @Override
+ protected void handleReceive(final Object message) {
+ if (message instanceof ExecuteRpc) {
+ executeRpc((ExecuteRpc) message);
+ } else {
+ unknownMessage(message);
+ }
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void executeRpc(final ExecuteRpc msg) {
+ LOG.debug("Executing rpc {}", msg.getRpc());
+ final SchemaPath schemaPath = SchemaPath.create(true, msg.getRpc());
+ final ActorRef sender = getSender();
+ final ActorRef self = self();
+
+ final CheckedFuture<DOMRpcResult, DOMRpcException> future;
+ try {
+ future = rpcService.invokeRpc(schemaPath, msg.getInputNormalizedNode());
+ } catch (final RuntimeException e) {
+ LOG.debug("Failed to invoke RPC {}", msg.getRpc(), e);
+ sender.tell(new akka.actor.Status.Failure(e), sender);
+ return;
+ }
+
+ Futures.addCallback(future, new FutureCallback<DOMRpcResult>() {
+ @Override
+ public void onSuccess(final DOMRpcResult result) {
+ if (result == null) {
+ // This shouldn't happen but the FutureCallback annotates the result param with Nullable so
+ // handle null here to avoid FindBugs warning.
+ LOG.debug("Got null DOMRpcResult - sending null response for execute rpc : {}", msg.getRpc());
+ sender.tell(new RpcResponse(null), self);
+ return;
+ }
+
+ if (!result.getErrors().isEmpty()) {
+ final String message = String.format("Execution of RPC %s failed", msg.getRpc());
+ sender.tell(new akka.actor.Status.Failure(new RpcErrorsException(message, result.getErrors())),
+ self);
+ } else {
+ LOG.debug("Sending response for execute rpc : {}", msg.getRpc());
+ sender.tell(new RpcResponse(result.getResult()), self);
+ }
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ LOG.debug("Failed to execute RPC {}", msg.getRpc(), failure);
+ LOG.error("Failed to execute RPC {} due to {}. More details are available on DEBUG level.",
+ msg.getRpc(), Throwables.getRootCause(failure));
+ sender.tell(new akka.actor.Status.Failure(failure), self);
+ }
+ });
+ }
+}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.remote.rpc;
-
import akka.actor.ActorRef;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import javax.annotation.Nonnull;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcAvailabilityListener;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementation;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
+import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class RpcListener implements DOMRpcAvailabilityListener {
-
+/**
+ * A {@link DOMRpcAvailabilityListener} reacting to RPC implementations different than {@link RemoteRpcImplementation}.
+ * The knowledge of such implementations is forwarded to {@link RpcRegistry}, which is responsible for advertising
+ * their presence to other nodes.
+ */
+final class RpcListener implements DOMRpcAvailabilityListener {
private static final Logger LOG = LoggerFactory.getLogger(RpcListener.class);
+
private final ActorRef rpcRegistry;
- public RpcListener(final ActorRef rpcRegistry) {
- this.rpcRegistry = rpcRegistry;
+ RpcListener(final ActorRef rpcRegistry) {
+ this.rpcRegistry = Preconditions.checkNotNull(rpcRegistry);
}
@Override
public void onRpcAvailable(@Nonnull final Collection<DOMRpcIdentifier> rpcs) {
Preconditions.checkArgument(rpcs != null, "Input Collection of DOMRpcIdentifier can not be null.");
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding registration for [{}]", rpcs);
- }
- final List<RpcRouter.RouteIdentifier<?,?,?>> routeIds = new ArrayList<>();
+ LOG.debug("Adding registration for [{}]", rpcs);
+
+ final List<RouteIdentifier<?,?,?>> routeIds = new ArrayList<>(rpcs.size());
for (final DOMRpcIdentifier rpc : rpcs) {
- final RpcRouter.RouteIdentifier<?,?,?> routeId =
+ // FIXME: Refactor routeId and message to use DOMRpcIdentifier directly.
+ final RouteIdentifier<?,?,?> routeId =
new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), rpc.getContextReference());
routeIds.add(routeId);
}
- final RpcRegistry.Messages.AddOrUpdateRoutes addRpcMsg = new RpcRegistry.Messages.AddOrUpdateRoutes(routeIds);
- rpcRegistry.tell(addRpcMsg, ActorRef.noSender());
+ rpcRegistry.tell(new AddOrUpdateRoutes(routeIds), ActorRef.noSender());
}
@Override
LOG.debug("Removing registration for [{}]", rpcs);
- final List<RpcRouter.RouteIdentifier<?,?,?>> routeIds = new ArrayList<>();
+ final List<RouteIdentifier<?,?,?>> routeIds = new ArrayList<>(rpcs.size());
for (final DOMRpcIdentifier rpc : rpcs) {
- final RpcRouter.RouteIdentifier<?,?,?> routeId =
- new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), rpc.getContextReference());
- routeIds.add(routeId);
+ routeIds.add(new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), rpc.getContextReference()));
}
- final RpcRegistry.Messages.RemoveRoutes removeRpcMsg = new RpcRegistry.Messages.RemoveRoutes(routeIds);
- rpcRegistry.tell(removeRpcMsg, ActorRef.noSender());
+ rpcRegistry.tell(new RemoveRoutes(routeIds), ActorRef.noSender());
+ }
+
+ @Override
+ public boolean acceptsImplementation(final DOMRpcImplementation impl) {
+ return !(impl instanceof RemoteRpcImplementation);
}
}
package org.opendaylight.controller.remote.rpc;
-
import akka.actor.ActorRef;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
-import akka.actor.SupervisorStrategy.Directive;
-import akka.japi.Function;
import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
-import org.opendaylight.controller.md.sal.dom.broker.spi.rpc.RpcRoutingStrategy;
-import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.model.api.Module;
-import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
import scala.concurrent.duration.Duration;
/**
- * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown. It also starts
- * the rpc listeners
+ * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown. It also registers
+ * {@link RpcListener} with the local {@link DOMRpcService}.
*/
-
public class RpcManager extends AbstractUntypedActor {
- private SchemaContext schemaContext;
- private ActorRef rpcBroker;
- private ActorRef rpcRegistry;
- private final RemoteRpcProviderConfig config;
- private RpcListener rpcListener;
- private RemoteRpcImplementation rpcImplementation;
private final DOMRpcProviderService rpcProvisionRegistry;
+ private final RemoteRpcProviderConfig config;
private final DOMRpcService rpcServices;
- private RpcManager(final SchemaContext schemaContext,
- final DOMRpcProviderService rpcProvisionRegistry,
- final DOMRpcService rpcSevices,
- final RemoteRpcProviderConfig config) {
- this.schemaContext = schemaContext;
- this.rpcProvisionRegistry = rpcProvisionRegistry;
- rpcServices = rpcSevices;
- this.config = config;
+ private ListenerRegistration<RpcListener> listenerReg;
+ private ActorRef rpcInvoker;
+ private ActorRef rpcRegistry;
+ private ActorRef rpcRegistrar;
- createRpcActors();
- startListeners();
+ private RpcManager(final DOMRpcProviderService rpcProvisionRegistry,
+ final DOMRpcService rpcServices,
+ final RemoteRpcProviderConfig config) {
+ this.rpcProvisionRegistry = Preconditions.checkNotNull(rpcProvisionRegistry);
+ this.rpcServices = Preconditions.checkNotNull(rpcServices);
+ this.config = Preconditions.checkNotNull(config);
}
-
- public static Props props(final SchemaContext schemaContext, final DOMRpcProviderService rpcProvisionRegistry,
- final DOMRpcService rpcServices, final RemoteRpcProviderConfig config) {
- Preconditions.checkNotNull(schemaContext, "SchemaContext can not be null!");
+ public static Props props(final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices,
+ final RemoteRpcProviderConfig config) {
Preconditions.checkNotNull(rpcProvisionRegistry, "RpcProviderService can not be null!");
Preconditions.checkNotNull(rpcServices, "RpcService can not be null!");
- return Props.create(RpcManager.class, schemaContext, rpcProvisionRegistry, rpcServices, config);
- }
-
- private void createRpcActors() {
- LOG.debug("Create rpc registry and broker actors");
-
- rpcRegistry = getContext().actorOf(RpcRegistry.props(config)
- .withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
-
- rpcBroker = getContext().actorOf(RpcBroker.props(rpcServices)
- .withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
-
- final RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
- rpcRegistry.tell(localRouter, self());
+ Preconditions.checkNotNull(config, "RemoteRpcProviderConfig can not be null!");
+ return Props.create(RpcManager.class, rpcProvisionRegistry, rpcServices, config);
}
- private void startListeners() {
- LOG.debug("Registers rpc listeners");
+ @Override
+ public void preStart() throws Exception {
+ super.preStart();
- rpcListener = new RpcListener(rpcRegistry);
- rpcImplementation = new RemoteRpcImplementation(rpcRegistry, config);
+ rpcInvoker = getContext().actorOf(RpcInvoker.props(rpcServices)
+ .withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
+ LOG.debug("Listening for RPC invocation requests with {}", rpcInvoker);
- rpcServices.registerRpcListener(rpcListener);
+ rpcRegistrar = getContext().actorOf(RpcRegistrar.props(config, rpcProvisionRegistry)
+ .withMailbox(config.getMailBoxName()), config.getRpcRegistrarName());
+ LOG.debug("Registering remote RPCs with {}", rpcRegistrar);
- registerRoutedRpcDelegate();
- announceSupportedRpcs();
- }
+ rpcRegistry = getContext().actorOf(RpcRegistry.props(config, rpcInvoker, rpcRegistrar)
+ .withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
+ LOG.debug("Propagating RPC information with {}", rpcRegistry);
- private void registerRoutedRpcDelegate() {
- final Set<DOMRpcIdentifier> rpcIdentifiers = new HashSet<>();
- final Set<Module> modules = schemaContext.getModules();
- for (final Module module : modules) {
- for (final RpcDefinition rpcDefinition : module.getRpcs()) {
- if (RpcRoutingStrategy.from(rpcDefinition).isContextBasedRouted()) {
- LOG.debug("Adding routed rpcDefinition for path {}", rpcDefinition.getPath());
- rpcIdentifiers.add(DOMRpcIdentifier.create(rpcDefinition.getPath(), YangInstanceIdentifier.EMPTY));
- }
- }
- }
- rpcProvisionRegistry.registerRpcImplementation(rpcImplementation, rpcIdentifiers);
+ final RpcListener rpcListener = new RpcListener(rpcRegistry);
+ LOG.debug("Registering local availabitility listener {}", rpcListener);
+ listenerReg = rpcServices.registerRpcListener(rpcListener);
}
- /**
- * Add all the locally registered RPCs in the clustered routing table.
- */
- private void announceSupportedRpcs() {
- LOG.debug("Adding all supported rpcs to routing table");
- final Set<RpcDefinition> currentlySupportedRpc = schemaContext.getOperations();
- final List<DOMRpcIdentifier> rpcs = new ArrayList<>();
- for (final RpcDefinition rpcDef : currentlySupportedRpc) {
- rpcs.add(DOMRpcIdentifier.create(rpcDef.getPath()));
+ @Override
+ public void postStop() throws Exception {
+ if (listenerReg != null) {
+ listenerReg.close();
+ listenerReg = null;
}
- if (!rpcs.isEmpty()) {
- rpcListener.onRpcAvailable(rpcs);
- }
+ super.postStop();
}
-
@Override
- protected void handleReceive(final Object message) throws Exception {
- if (message instanceof UpdateSchemaContext) {
- updateSchemaContext((UpdateSchemaContext) message);
- }
- }
-
- private void updateSchemaContext(final UpdateSchemaContext message) {
- schemaContext = message.getSchemaContext();
- registerRoutedRpcDelegate();
- rpcBroker.tell(message, ActorRef.noSender());
+ protected void handleReceive(final Object message) {
+ unknownMessage(message);
}
@Override
public SupervisorStrategy supervisorStrategy() {
- return new OneForOneStrategy(10, Duration.create("1 minute"), (Function<Throwable, Directive>) t -> {
+ return new OneForOneStrategy(10, Duration.create("1 minute"), t -> {
LOG.error("An exception happened actor will be resumed", t);
-
return SupervisorStrategy.resume();
});
}
--- /dev/null
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import akka.actor.Address;
+import akka.actor.Props;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.RemoteRpcEndpoint;
+
+/**
+ * Actor handling registration of RPCs available on remote nodes with the local {@link DOMRpcProviderService}.
+ *
+ * @author Robert Varga
+ */
+final class RpcRegistrar extends AbstractUntypedActor {
+ private final Map<Address, DOMRpcImplementationRegistration<?>> regs = new HashMap<>();
+ private final DOMRpcProviderService rpcProviderService;
+ private final RemoteRpcProviderConfig config;
+
+ RpcRegistrar(final RemoteRpcProviderConfig config, final DOMRpcProviderService rpcProviderService) {
+ this.config = Preconditions.checkNotNull(config);
+ this.rpcProviderService = Preconditions.checkNotNull(rpcProviderService);
+ }
+
+ public static Props props(final RemoteRpcProviderConfig config, final DOMRpcProviderService rpcProviderService) {
+ Preconditions.checkNotNull(rpcProviderService, "DOMRpcProviderService cannot be null");
+ return Props.create(RpcRegistrar.class, config, rpcProviderService);
+ }
+
+ @Override
+ public void postStop() throws Exception {
+ regs.values().forEach(DOMRpcImplementationRegistration::close);
+ regs.clear();
+
+ super.postStop();
+ }
+
+ @Override
+ protected void handleReceive(final Object message) throws Exception {
+ if (message instanceof UpdateRemoteEndpoints) {
+ updateRemoteEndpoints(((UpdateRemoteEndpoints) message).getEndpoints());
+ } else {
+ unknownMessage(message);
+ }
+ }
+
+ private void updateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> endpoints) {
+ /*
+ * Updating RPC providers is a two-step process. We first add the newly-discovered RPCs and then close
+ * the old registration. This minimizes churn observed by listeners, as they will not observe RPC
+ * unavailability which would occur if we were to do it the other way around.
+ *
+ * Note that when an RPC moves from one remote node to another, we also do not want to expose the gap,
+ * hence we register all new implementations before closing all registrations.
+ */
+ final Collection<DOMRpcImplementationRegistration<?>> prevRegs = new ArrayList<>(endpoints.size());
+
+ for (Entry<Address, Optional<RemoteRpcEndpoint>> e : endpoints.entrySet()) {
+ LOG.debug("Updating RPC registrations for {}", e.getKey());
+
+ final DOMRpcImplementationRegistration<?> prevReg;
+ final Optional<RemoteRpcEndpoint> maybeEndpoint = e.getValue();
+ if (maybeEndpoint.isPresent()) {
+ final RemoteRpcEndpoint endpoint = maybeEndpoint.get();
+ final RemoteRpcImplementation impl = new RemoteRpcImplementation(endpoint.getRouter(), config);
+ prevReg = regs.put(e.getKey(), rpcProviderService.registerRpcImplementation(impl,
+ endpoint.getRpcs()));
+ } else {
+ prevReg = regs.remove(e.getKey());
+ }
+
+ if (prevReg != null) {
+ prevRegs.add(prevReg);
+ }
+ }
+
+ for (DOMRpcImplementationRegistration<?> r : prevRegs) {
+ r.close();
+ }
+ }
+}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.remote.rpc.messages;
-
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-
-public class UpdateSchemaContext {
- private final SchemaContext schemaContext;
-
- public UpdateSchemaContext(final SchemaContext schemaContext) {
- this.schemaContext = schemaContext;
- }
-
- public SchemaContext getSchemaContext() {
- return schemaContext;
- }
-}
package org.opendaylight.controller.remote.rpc.registry;
import akka.actor.ActorRef;
-import akka.japi.Option;
-import akka.japi.Pair;
+import com.google.common.base.Preconditions;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.opendaylight.controller.remote.rpc.registry.gossip.Copier;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
public class RoutingTable implements Copier<RoutingTable>, Serializable {
private static final long serialVersionUID = 5592610415175278760L;
- private final Map<RpcRouter.RouteIdentifier<?, ?, ?>, Long> table = new HashMap<>();
- private ActorRef router;
+ private final Map<RouteIdentifier<?, ?, ?>, Long> table;
+ private final ActorRef router;
- @Override
- public RoutingTable copy() {
- RoutingTable copy = new RoutingTable();
- copy.table.putAll(table);
- copy.setRouter(this.getRouter());
-
- return copy;
+ private RoutingTable(final ActorRef router, final Map<RouteIdentifier<?, ?, ?>, Long> table) {
+ this.router = Preconditions.checkNotNull(router);
+ this.table = Preconditions.checkNotNull(table);
}
- public Option<Pair<ActorRef, Long>> getRouterFor(RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
- Long updatedTime = table.get(routeId);
+ RoutingTable(final ActorRef router) {
+ this(router, new HashMap<>());
+ }
- if (updatedTime == null || router == null) {
- return Option.none();
- } else {
- return Option.option(new Pair<>(router, updatedTime));
- }
+ @Override
+ public RoutingTable copy() {
+ return new RoutingTable(router, new HashMap<>(table));
}
public Set<RpcRouter.RouteIdentifier<?, ?, ?>> getRoutes() {
return table.keySet();
}
- public void addRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
+ public void addRoute(final RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
table.put(routeId, System.currentTimeMillis());
}
- public void removeRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
+ public void removeRoute(final RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
table.remove(routeId);
}
- public boolean contains(RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
+ public boolean contains(final RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
return table.containsKey(routeId);
}
return router;
}
- public void setRouter(ActorRef router) {
- this.router = router;
- }
-
@Override
public String toString() {
return "RoutingTable{" + "table=" + table + ", router=" + router + '}';
package org.opendaylight.controller.remote.rpc.registry;
import akka.actor.ActorRef;
-import akka.actor.Cancellable;
+import akka.actor.Address;
import akka.actor.Props;
-import akka.japi.Option;
-import akka.japi.Pair;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
-import scala.concurrent.duration.FiniteDuration;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
/**
- * Registry to look up cluster nodes that have registered for a given rpc.
+ * Registry to look up cluster nodes that have registered for a given RPC.
*
* <p>
* It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
* cluster wide information.
*/
public class RpcRegistry extends BucketStore<RoutingTable> {
- private final Set<Runnable> routesUpdatedCallbacks = new HashSet<>();
- private final FiniteDuration findRouterTimeout;
+ private final ActorRef rpcRegistrar;
- public RpcRegistry(RemoteRpcProviderConfig config) {
- super(config);
- getLocalBucket().setData(new RoutingTable());
- findRouterTimeout = getConfig().getGossipTickInterval().$times(10);
+ public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
+ super(config, new RoutingTable(rpcInvoker));
+ this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar);
}
- public static Props props(RemoteRpcProviderConfig config) {
- return Props.create(RpcRegistry.class, config);
+ /**
+ * Create a new props instance for instantiating an RpcRegistry actor.
+ *
+ * @param config Provider configuration
+ * @param rpcRegistrar Local RPC provider interface, used to register routers to remote nodes
+ * @param rpcInvoker Actor handling RPC invocation requests from remote nodes
+ * @return A new {@link Props} instance
+ */
+ public static Props props(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker,
+ final ActorRef rpcRegistrar) {
+ return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar);
}
@Override
- protected void handleReceive(Object message) throws Exception {
- //TODO: if sender is remote, reject message
-
- if (message instanceof SetLocalRouter) {
- receiveSetLocalRouter((SetLocalRouter) message);
- } else if (message instanceof AddOrUpdateRoutes) {
+ protected void handleReceive(final Object message) throws Exception {
+ if (message instanceof AddOrUpdateRoutes) {
receiveAddRoutes((AddOrUpdateRoutes) message);
} else if (message instanceof RemoveRoutes) {
receiveRemoveRoutes((RemoveRoutes) message);
- } else if (message instanceof Messages.FindRouters) {
- receiveGetRouter((FindRouters) message);
- } else if (message instanceof Runnable) {
- ((Runnable)message).run();
} else {
super.handleReceive(message);
}
}
- /**
- * Registers a rpc broker.
- *
- * @param message contains {@link akka.actor.ActorRef} for rpc broker
- */
- private void receiveSetLocalRouter(SetLocalRouter message) {
- getLocalBucket().getData().setRouter(message.getRouter());
- }
-
- private void receiveAddRoutes(AddOrUpdateRoutes msg) {
-
- log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
+ private void receiveAddRoutes(final AddOrUpdateRoutes msg) {
+ LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
RoutingTable table = getLocalBucket().getData().copy();
for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
}
updateLocalBucket(table);
-
- onBucketsUpdated();
}
/**
*
* @param msg contains list of route ids to remove
*/
- private void receiveRemoveRoutes(RemoveRoutes msg) {
-
+ private void receiveRemoveRoutes(final RemoveRoutes msg) {
RoutingTable table = getLocalBucket().getData().copy();
for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
table.removeRoute(routeId);
updateLocalBucket(table);
}
- /**
- * Finds routers for the given rpc.
- *
- * @param findRouters the FindRouters request
- */
- private void receiveGetRouter(final FindRouters findRouters) {
- log.debug("receiveGetRouter for {}", findRouters.getRouteIdentifier());
-
- final ActorRef sender = getSender();
- if (!findRouters(findRouters, sender)) {
- log.debug("No routers found for {} - scheduling {} ms timer", findRouters.getRouteIdentifier(),
- findRouterTimeout.toMillis());
-
- final AtomicReference<Cancellable> timer = new AtomicReference<>();
- final Runnable routesUpdatedRunnable = new Runnable() {
- @Override
- public void run() {
- if (findRouters(findRouters, sender)) {
- routesUpdatedCallbacks.remove(this);
- timer.get().cancel();
- }
- }
- };
-
- routesUpdatedCallbacks.add(routesUpdatedRunnable);
-
- Runnable timerRunnable = () -> {
- log.warn("Timed out finding routers for {}", findRouters.getRouteIdentifier());
-
- routesUpdatedCallbacks.remove(routesUpdatedRunnable);
- sender.tell(new Messages.FindRoutersReply(
- Collections.<Pair<ActorRef, Long>>emptyList()), self());
- };
-
- timer.set(getContext().system().scheduler().scheduleOnce(findRouterTimeout, self(), timerRunnable,
- getContext().dispatcher(), self()));
- }
+ @Override
+ protected void onBucketRemoved(final Address address, final Bucket<RoutingTable> bucket) {
+ rpcRegistrar.tell(new UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())), ActorRef.noSender());
}
- private boolean findRouters(FindRouters findRouters, ActorRef sender) {
- List<Pair<ActorRef, Long>> routers = new ArrayList<>();
-
- RouteIdentifier<?, ?, ?> routeId = findRouters.getRouteIdentifier();
- findRoutes(getLocalBucket().getData(), routeId, routers);
+ @Override
+ protected void onBucketsUpdated(final Map<Address, Bucket<RoutingTable>> buckets) {
+ final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = new HashMap<>(buckets.size());
+
+ for (Entry<Address, Bucket<RoutingTable>> e : buckets.entrySet()) {
+ final RoutingTable table = e.getValue().getData();
+
+ final List<DOMRpcIdentifier> rpcs = new ArrayList<>(table.getRoutes().size());
+ for (RouteIdentifier<?, ?, ?> ri : table.getRoutes()) {
+ if (ri instanceof RouteIdentifierImpl) {
+ final RouteIdentifierImpl id = (RouteIdentifierImpl) ri;
+ rpcs.add(DOMRpcIdentifier.create(SchemaPath.create(true, id.getType()), id.getRoute()));
+ } else {
+ LOG.warn("Skipping unsupported route {} from {}", ri, e.getKey());
+ }
+ }
- for (Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
- findRoutes(bucket.getData(), routeId, routers);
+ endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty()
+ : Optional.of(new RemoteRpcEndpoint(table.getRouter(), rpcs)));
}
- log.debug("Found {} routers for {}", routers.size(), findRouters.getRouteIdentifier());
-
- boolean foundRouters = !routers.isEmpty();
- if (foundRouters) {
- sender.tell(new Messages.FindRoutersReply(routers), getSelf());
+ if (!endpoints.isEmpty()) {
+ rpcRegistrar.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
}
-
- return foundRouters;
}
- private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
- List<Pair<ActorRef, Long>> routers) {
- if (table == null) {
- return;
- }
+ public static final class RemoteRpcEndpoint {
+ private final Set<DOMRpcIdentifier> rpcs;
+ private final ActorRef router;
- Option<Pair<ActorRef, Long>> routerWithUpdateTime = table.getRouterFor(routeId);
- if (!routerWithUpdateTime.isEmpty()) {
- routers.add(routerWithUpdateTime.get());
+ RemoteRpcEndpoint(final ActorRef router, final Collection<DOMRpcIdentifier> rpcs) {
+ this.router = Preconditions.checkNotNull(router);
+ this.rpcs = ImmutableSet.copyOf(rpcs);
}
- }
- @Override
- protected void onBucketsUpdated() {
- if (routesUpdatedCallbacks.isEmpty()) {
- return;
+ public ActorRef getRouter() {
+ return router;
}
- for (Runnable callBack: routesUpdatedCallbacks.toArray(new Runnable[routesUpdatedCallbacks.size()])) {
- callBack.run();
+ public Set<DOMRpcIdentifier> getRpcs() {
+ return rpcs;
}
}
* All messages used by the RpcRegistry.
*/
public static class Messages {
-
-
- public static class ContainsRoute {
+ abstract static class AbstractRouteMessage {
final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
- public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+ AbstractRouteMessage(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(),
"Route Identifiers must be supplied");
this.routeIdentifiers = routeIdentifiers;
}
- public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
+ List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
return this.routeIdentifiers;
}
}
}
- public static class AddOrUpdateRoutes extends ContainsRoute {
-
- public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+ public static final class AddOrUpdateRoutes extends AbstractRouteMessage {
+ public AddOrUpdateRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
super(routeIdentifiers);
}
}
- public static class RemoveRoutes extends ContainsRoute {
-
- public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+ public static final class RemoveRoutes extends AbstractRouteMessage {
+ public RemoveRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
super(routeIdentifiers);
}
}
- public static class SetLocalRouter {
- private final ActorRef router;
-
- public SetLocalRouter(ActorRef router) {
- Preconditions.checkArgument(router != null, "Router must not be null");
- this.router = router;
- }
-
- public ActorRef getRouter() {
- return this.router;
- }
-
- @Override
- public String toString() {
- return "SetLocalRouter{" + "router=" + router + '}';
- }
- }
-
- public static class FindRouters {
- private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
-
- public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
- Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
- this.routeIdentifier = routeIdentifier;
- }
-
- public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
- return routeIdentifier;
- }
+ public static final class UpdateRemoteEndpoints {
+ private final Map<Address, Optional<RemoteRpcEndpoint>> endpoints;
- @Override
- public String toString() {
- return "FindRouters{" + "routeIdentifier=" + routeIdentifier + '}';
+ UpdateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> endpoints) {
+ this.endpoints = ImmutableMap.copyOf(endpoints);
}
- }
- public static class FindRoutersReply {
- final List<Pair<ActorRef, Long>> routerWithUpdateTime;
-
- public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
- Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
- this.routerWithUpdateTime = routerWithUpdateTime;
- }
-
- public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
- return routerWithUpdateTime;
- }
-
- @Override
- public String toString() {
- return "FindRoutersReply{" + "routerWithUpdateTime=" + routerWithUpdateTime + '}';
+ public Map<Address, Optional<RemoteRpcEndpoint>> getEndpoints() {
+ return endpoints;
}
}
}
package org.opendaylight.controller.remote.rpc.registry.gossip;
public interface Bucket<T extends Copier<T>> {
- Long getVersion();
+ long getVersion();
T getData();
}
import java.io.Serializable;
-public class BucketImpl<T extends Copier<T>> implements Bucket<T>, Serializable {
+public final class BucketImpl<T extends Copier<T>> implements Bucket<T>, Serializable {
private static final long serialVersionUID = 294779770032719196L;
private Long version = System.currentTimeMillis();
private T data;
- public BucketImpl() {
- }
-
- public BucketImpl(T data) {
+ public BucketImpl(final T data) {
this.data = data;
}
- public BucketImpl(Bucket<T> other) {
+ public BucketImpl(final Bucket<T> other) {
this.version = other.getVersion();
this.data = other.getData();
}
- public void setData(T data) {
+ public void setData(final T data) {
this.data = data;
this.version = System.currentTimeMillis() + 1;
}
@Override
- public Long getVersion() {
- return version;
+ public long getVersion() {
+ return version.longValue();
}
@Override
import akka.actor.ActorRef;
import akka.actor.ActorRefProvider;
import akka.actor.Address;
-import akka.actor.Props;
import akka.cluster.ClusterActorRefProvider;
import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
import org.opendaylight.controller.utils.ConditionalProbe;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* A store that syncs its data across nodes in the cluster.
*
*/
public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMetering {
-
- private static final Long NO_VERSION = -1L;
-
- protected final Logger log = LoggerFactory.getLogger(getClass());
-
/**
* Bucket owned by the node.
*/
- private final BucketImpl<T> localBucket = new BucketImpl<>();
+ private final BucketImpl<T> localBucket;
/**
- * Buckets ownded by other known nodes in the cluster.
+ * Buckets owned by other known nodes in the cluster.
*/
private final Map<Address, Bucket<T>> remoteBuckets = new HashMap<>();
*/
private Address selfAddress;
+ // FIXME: should be part of test-specific subclass
private ConditionalProbe probe;
private final RemoteRpcProviderConfig config;
- public BucketStore(RemoteRpcProviderConfig config) {
+ public BucketStore(final RemoteRpcProviderConfig config, final T initialData) {
this.config = Preconditions.checkNotNull(config);
+ this.localBucket = new BucketImpl<>(initialData);
}
@Override
ActorRefProvider provider = getContext().provider();
selfAddress = provider.getDefaultAddress();
- if ( provider instanceof ClusterActorRefProvider) {
- getContext().actorOf(Props.create(Gossiper.class, config).withMailbox(config.getMailBoxName()), "gossiper");
+ if (provider instanceof ClusterActorRefProvider) {
+ getContext().actorOf(Gossiper.props(config).withMailbox(config.getMailBoxName()), "gossiper");
}
}
@SuppressWarnings("unchecked")
@Override
- protected void handleReceive(Object message) throws Exception {
+ protected void handleReceive(final Object message) throws Exception {
if (probe != null) {
probe.tell(message, getSelf());
}
- if (message instanceof ConditionalProbe) {
- // The ConditionalProbe is only used for unit tests.
- log.info("Received probe {} {}", getSelf(), message);
- probe = (ConditionalProbe) message;
- // Send back any message to tell the caller we got the probe.
- getSender().tell("Got it", getSelf());
- } else if (message instanceof GetAllBuckets) {
- receiveGetAllBuckets();
- } else if (message instanceof GetBucketsByMembers) {
+ if (message instanceof GetBucketsByMembers) {
receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
} else if (message instanceof GetBucketVersions) {
receiveGetBucketVersions();
} else if (message instanceof UpdateRemoteBuckets) {
receiveUpdateRemoteBuckets(((UpdateRemoteBuckets<T>) message).getBuckets());
+ } else if (message instanceof RemoveRemoteBucket) {
+ removeBucket(((RemoveRemoteBucket) message).getAddress());
+ } else if (message instanceof GetAllBuckets) {
+ // GetAllBuckets is used only for unit tests.
+ receiveGetAllBuckets();
+ } else if (message instanceof ConditionalProbe) {
+ // The ConditionalProbe is only used for unit tests.
+ LOG.info("Received probe {} {}", getSelf(), message);
+ probe = (ConditionalProbe) message;
+ // Send back any message to tell the caller we got the probe.
+ getSender().tell("Got it", getSelf());
} else {
- log.debug("Unhandled message [{}]", message);
+ LOG.debug("Unhandled message [{}]", message);
unhandled(message);
}
}
*
* @param members requested members
*/
- void receiveGetBucketsByMembers(Set<Address> members) {
+ void receiveGetBucketsByMembers(final Set<Address> members) {
final ActorRef sender = getSender();
Map<Address, Bucket<T>> buckets = getBucketsByMembers(members);
sender.tell(new GetBucketsByMembersReply<>(buckets), getSelf());
* @param members requested members
* @return buckets for requested members
*/
- Map<Address, Bucket<T>> getBucketsByMembers(Set<Address> members) {
+ Map<Address, Bucket<T>> getBucketsByMembers(final Set<Address> members) {
Map<Address, Bucket<T>> buckets = new HashMap<>();
//first add the local bucket if asked
* @param receivedBuckets buckets sent by remote
* {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
*/
- void receiveUpdateRemoteBuckets(Map<Address, Bucket<T>> receivedBuckets) {
- log.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
+ void receiveUpdateRemoteBuckets(final Map<Address, Bucket<T>> receivedBuckets) {
+ LOG.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
if (receivedBuckets == null || receivedBuckets.isEmpty()) {
- return; //nothing to do
+ //nothing to do
+ return;
}
- //Remote cant update self's bucket
- receivedBuckets.remove(selfAddress);
-
- for (Map.Entry<Address, Bucket<T>> entry : receivedBuckets.entrySet()) {
-
- Long localVersion = versions.get(entry.getKey());
- if (localVersion == null) {
- localVersion = NO_VERSION;
+ final Map<Address, Bucket<T>> newBuckets = new HashMap<>(receivedBuckets.size());
+ for (Entry<Address, Bucket<T>> entry : receivedBuckets.entrySet()) {
+ if (selfAddress.equals(entry.getKey())) {
+ // Remote cannot update our bucket
+ continue;
}
- Bucket<T> receivedBucket = entry.getValue();
-
+ final Bucket<T> receivedBucket = entry.getValue();
if (receivedBucket == null) {
+ LOG.debug("Ignoring null bucket from {}", entry.getKey());
continue;
}
- Long remoteVersion = receivedBucket.getVersion();
- if (remoteVersion == null) {
- remoteVersion = NO_VERSION;
+ // update only if remote version is newer
+ final long remoteVersion = receivedBucket.getVersion();
+ final Long localVersion = versions.get(entry.getKey());
+ if (localVersion != null && remoteVersion <= localVersion.longValue()) {
+ LOG.debug("Ignoring down-versioned bucket from {} ({} local {} remote)", entry.getKey(), localVersion,
+ remoteVersion);
+ continue;
}
- //update only if remote version is newer
- if ( remoteVersion.longValue() > localVersion.longValue() ) {
- remoteBuckets.put(entry.getKey(), receivedBucket);
- versions.put(entry.getKey(), remoteVersion);
- }
+ newBuckets.put(entry.getKey(), receivedBucket);
+ remoteBuckets.put(entry.getKey(), receivedBucket);
+ versions.put(entry.getKey(), remoteVersion);
+ LOG.debug("Updating bucket from {} to version {}", entry.getKey(), remoteVersion);
}
- log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
+ LOG.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
+
+ onBucketsUpdated(newBuckets);
+ }
+
+ private void removeBucket(final Address address) {
+ final Bucket<T> bucket = remoteBuckets.remove(address);
+ if (bucket != null) {
+ onBucketRemoved(address, bucket);
+ }
+ }
- onBucketsUpdated();
+ /**
+ * Callback to subclasses invoked when a bucket is removed.
+ *
+ * @param address Remote address
+ * @param bucket Bucket removed
+ */
+ protected void onBucketRemoved(final Address address, final Bucket<T> bucket) {
+ // Default noop
}
- protected void onBucketsUpdated() {
+ /**
+ * Callback to subclasses invoked when the set of remote buckets is updated.
+ *
+ * @param newBuckets Map of address to new bucket. Never null, but can be empty.
+ */
+ protected void onBucketsUpdated(final Map<Address, Bucket<T>> newBuckets) {
+ // Default noop
}
public BucketImpl<T> getLocalBucket() {
return localBucket;
}
- protected void updateLocalBucket(T data) {
+ protected void updateLocalBucket(final T data) {
localBucket.setData(data);
versions.put(selfAddress, localBucket.getVersion());
}
import akka.actor.ActorSelection;
import akka.actor.Address;
import akka.actor.Cancellable;
+import akka.actor.Props;
import akka.cluster.Cluster;
import akka.cluster.ClusterActorRefProvider;
import akka.cluster.ClusterEvent;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
* for update.
*/
public class Gossiper extends AbstractUntypedActorWithMetering {
+ private final boolean autoStartGossipTicks;
+ private final RemoteRpcProviderConfig config;
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private Cluster cluster;
+ /**
+ * All known cluster members.
+ */
+ private final List<Address> clusterMembers = new ArrayList<>();
/**
* ActorSystem's address for the current cluster node.
*/
private Address selfAddress;
- /**
- * All known cluster members.
- */
- private List<Address> clusterMembers = new ArrayList<>();
+ private Cluster cluster;
private Cancellable gossipTask;
- private Boolean autoStartGossipTicks = true;
+ Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) {
+ this.config = Preconditions.checkNotNull(config);
+ this.autoStartGossipTicks = autoStartGossipTicks.booleanValue();
+ }
- private final RemoteRpcProviderConfig config;
+ Gossiper(final RemoteRpcProviderConfig config) {
+ this(config, Boolean.TRUE);
+ }
- public Gossiper(RemoteRpcProviderConfig config) {
- this.config = Preconditions.checkNotNull(config);
+ public static Props props(final RemoteRpcProviderConfig config) {
+ return Props.create(Gossiper.class, config);
}
- /**
- * Constructor for testing.
- *
- * @param autoStartGossipTicks used for turning off gossip ticks during testing.
- * Gossip tick can be manually sent.
- */
- @VisibleForTesting
- public Gossiper(Boolean autoStartGossipTicks, RemoteRpcProviderConfig config) {
- this(config);
- this.autoStartGossipTicks = autoStartGossipTicks;
+ static Props testProps(final RemoteRpcProviderConfig config) {
+ return Props.create(Gossiper.class, config, Boolean.FALSE);
}
@Override
ActorRefProvider provider = getContext().provider();
selfAddress = provider.getDefaultAddress();
- if ( provider instanceof ClusterActorRefProvider ) {
+ if (provider instanceof ClusterActorRefProvider ) {
cluster = Cluster.get(getContext().system());
cluster.subscribe(getSelf(),
ClusterEvent.initialStateAsEvents(),
gossipTask = getContext().system().scheduler().schedule(
new FiniteDuration(1, TimeUnit.SECONDS), //initial delay
config.getGossipTickInterval(), //interval
- getSelf(), //target
- new Messages.GossiperMessages.GossipTick(), //message
- getContext().dispatcher(), //execution context
- getSelf() //sender
+ getSelf(), //target
+ new Messages.GossiperMessages.GossipTick(), //message
+ getContext().dispatcher(), //execution context
+ getSelf() //sender
);
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
- protected void handleReceive(Object message) throws Exception {
+ protected void handleReceive(final Object message) throws Exception {
//Usually sent by self via gossip task defined above. But its not enforced.
//These ticks can be sent by another actor as well which is esp. useful while testing
if (message instanceof GossipTick) {
} else if (message instanceof ClusterEvent.MemberRemoved) {
receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
- } else if ( message instanceof ClusterEvent.UnreachableMember) {
+ } else if (message instanceof ClusterEvent.UnreachableMember) {
receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
} else {
*
* @param member who went down
*/
- void receiveMemberRemoveOrUnreachable(Member member) {
+ private void receiveMemberRemoveOrUnreachable(final Member member) {
//if its self, then stop itself
if (selfAddress.equals(member.address())) {
getContext().stop(getSelf());
}
clusterMembers.remove(member.address());
- log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+ LOG.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+
+ getContext().parent().tell(new RemoveRemoteBucket(member.address()), ActorRef.noSender());
}
/**
*
* @param member the member to add
*/
- void receiveMemberUpOrReachable(final Member member) {
-
+ private void receiveMemberUpOrReachable(final Member member) {
+ //ignore up notification for self
if (selfAddress.equals(member.address())) {
- //ignore up notification for self
return;
}
clusterMembers.add(member.address());
}
- log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
+ LOG.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
}
/**
* 2. If there's only 1 member, send gossip status (bucket versions) to it. <br>
* 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
*/
+ @VisibleForTesting
void receiveGossipTick() {
- if (clusterMembers.size() == 0) {
- return; //no members to send gossip status to
- }
-
- Address remoteMemberToGossipTo;
-
- if (clusterMembers.size() == 1) {
- remoteMemberToGossipTo = clusterMembers.get(0);
- } else {
- Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
- remoteMemberToGossipTo = clusterMembers.get(randomIndex);
+ final Address remoteMemberToGossipTo;
+ switch (clusterMembers.size()) {
+ case 0:
+ //no members to send gossip status to
+ return;
+ case 1:
+ remoteMemberToGossipTo = clusterMembers.get(0);
+ break;
+ default:
+ final int randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
+ remoteMemberToGossipTo = clusterMembers.get(randomIndex);
+ break;
}
- log.trace("Gossiping to [{}]", remoteMemberToGossipTo);
+ LOG.trace("Gossiping to [{}]", remoteMemberToGossipTo);
getLocalStatusAndSendTo(remoteMemberToGossipTo);
}
*
* @param status bucket versions from a remote member
*/
- void receiveGossipStatus(GossipStatus status) {
+ @VisibleForTesting
+ void receiveGossipStatus(final GossipStatus status) {
//Don't accept messages from non-members
if (!clusterMembers.contains(status.from())) {
return;
Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
-
}
/**
*
* @param envelope contains buckets from a remote gossiper
*/
- <T extends Copier<T>> void receiveGossip(GossipEnvelope<T> envelope) {
+ @VisibleForTesting
+ <T extends Copier<T>> void receiveGossip(final GossipEnvelope<T> envelope) {
//TODO: Add more validations
if (!selfAddress.equals(envelope.to())) {
- if (log.isTraceEnabled()) {
- log.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(),
- envelope.to());
- }
+ LOG.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
return;
}
updateRemoteBuckets(envelope.getBuckets());
-
}
/**
*
* @param buckets map of Buckets to update
*/
- <T extends Copier<T>> void updateRemoteBuckets(Map<Address, Bucket<T>> buckets) {
- UpdateRemoteBuckets<T> updateRemoteBuckets = new UpdateRemoteBuckets<>(buckets);
- getContext().parent().tell(updateRemoteBuckets, getSelf());
+ @VisibleForTesting
+ <T extends Copier<T>> void updateRemoteBuckets(final Map<Address, Bucket<T>> buckets) {
+ getContext().parent().tell(new UpdateRemoteBuckets<>(buckets), getSelf());
}
/**
*
* @param remoteActorSystemAddress remote gossiper to send to
*/
- void getLocalStatusAndSendTo(Address remoteActorSystemAddress) {
+ @VisibleForTesting
+ void getLocalStatusAndSendTo(final Address remoteActorSystemAddress) {
//Get local status from bucket store and send to remote
Future<Object> futureReply =
ActorSelection remoteRef = getContext().system().actorSelection(
remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
- log.trace("Sending bucket versions to [{}]", remoteRef);
+ LOG.trace("Sending bucket versions to [{}]", remoteRef);
futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
-
}
/**
* @param remote remote gossiper to send versions to
* @param localVersions bucket versions received from local store
*/
- void sendGossipStatusTo(ActorRef remote, Map<Address, Long> localVersions) {
+ void sendGossipStatusTo(final ActorRef remote, final Map<Address, Long> localVersions) {
GossipStatus status = new GossipStatus(selfAddress, localVersions);
remote.tell(status, getSelf());
}
- void sendGossipStatusTo(ActorSelection remote, Map<Address, Long> localVersions) {
+ void sendGossipStatusTo(final ActorSelection remote, final Map<Address, Long> localVersions) {
GossipStatus status = new GossipStatus(selfAddress, localVersions);
remote.tell(status, getSelf());
return new Mapper<Object, Void>() {
@Override
- public Void apply(Object replyMessage) {
+ public Void apply(final Object replyMessage) {
if (replyMessage instanceof GetBucketVersionsReply) {
GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
Map<Address, Long> localVersions = reply.getVersions();
return new Mapper<Object, Void>() {
@Override
- public Void apply(Object replyMessage) {
+ public Void apply(final Object replyMessage) {
if (replyMessage instanceof GetBucketVersionsReply) {
GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
Map<Address, Long> localVersions = reply.getVersions();
Long remoteVersion = entry.getValue();
Long localVersion = localVersions.get(address);
if (localVersion == null || remoteVersion == null) {
- continue; //this condition is taken care of by above diffs
+ //this condition is taken care of by above diffs
+ continue;
}
if (localVersion < remoteVersion) {
}
if (!localIsOlder.isEmpty()) {
- sendGossipStatusTo(sender, localVersions );
+ sendGossipStatusTo(sender, localVersions);
}
if (!localIsNewer.isEmpty()) {
- sendGossipTo(sender, localIsNewer);//send newer buckets to remote
+ //send newer buckets to remote
+ sendGossipTo(sender, localIsNewer);
}
-
}
return null;
}
return new Mapper<Object, Void>() {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
- public Void apply(Object msg) {
+ public Void apply(final Object msg) {
if (msg instanceof GetBucketsByMembersReply) {
Map<Address, Bucket<?>> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
- log.trace("Buckets to send from {}: {}", selfAddress, buckets);
+ LOG.trace("Buckets to send from {}: {}", selfAddress, buckets);
GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
sender.tell(envelope, getSelf());
}
///
///Getter Setters
///
- List<Address> getClusterMembers() {
- return clusterMembers;
- }
- void setClusterMembers(List<Address> clusterMembers) {
- this.clusterMembers = clusterMembers;
- }
-
- Address getSelfAddress() {
- return selfAddress;
- }
-
- void setSelfAddress(Address selfAddress) {
- this.selfAddress = selfAddress;
+ @VisibleForTesting
+ void setClusterMembers(final Address... members) {
+ clusterMembers.clear();
+ clusterMembers.addAll(Arrays.asList(members));
}
}
import akka.actor.Address;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import java.io.Serializable;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBucketVersions;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBuckets;
-
/**
* These messages are used by {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} and
* {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper} actors.
public static class BucketStoreMessages {
- public static class GetAllBuckets implements Serializable {
+ public static final class GetAllBuckets implements Serializable {
private static final long serialVersionUID = 1L;
}
- public static class GetBucketsByMembers implements Serializable {
+ public static final class GetBucketsByMembers implements Serializable {
private static final long serialVersionUID = 1L;
private final Set<Address> members;
- public GetBucketsByMembers(Set<Address> members) {
+ public GetBucketsByMembers(final Set<Address> members) {
Preconditions.checkArgument(members != null, "members can not be null");
- this.members = members;
+ this.members = ImmutableSet.copyOf(members);
}
public Set<Address> getMembers() {
- return new HashSet<>(members);
+ return members;
}
}
private final Map<Address, Bucket<T>> buckets;
- public ContainsBuckets(Map<Address, Bucket<T>> buckets) {
+ protected ContainsBuckets(final Map<Address, Bucket<T>> buckets) {
Preconditions.checkArgument(buckets != null, "buckets can not be null");
- this.buckets = buckets;
+ this.buckets = ImmutableMap.copyOf(buckets);
}
- public Map<Address, Bucket<T>> getBuckets() {
- Map<Address, Bucket<T>> copy = new HashMap<>(buckets.size());
-
- for (Map.Entry<Address, Bucket<T>> entry : buckets.entrySet()) {
- //ignore null entries
- if ( entry.getKey() == null || entry.getValue() == null ) {
- continue;
- }
- copy.put(entry.getKey(), entry.getValue());
- }
- return copy;
+ public final Map<Address, Bucket<T>> getBuckets() {
+ return buckets;
}
}
- public static class GetAllBucketsReply<T extends Copier<T>> extends ContainsBuckets<T> {
+ public static final class GetAllBucketsReply<T extends Copier<T>> extends ContainsBuckets<T> {
private static final long serialVersionUID = 1L;
- public GetAllBucketsReply(Map<Address, Bucket<T>> buckets) {
+ public GetAllBucketsReply(final Map<Address, Bucket<T>> buckets) {
super(buckets);
}
}
- public static class GetBucketsByMembersReply<T extends Copier<T>> extends ContainsBuckets<T> {
+ public static final class GetBucketsByMembersReply<T extends Copier<T>> extends ContainsBuckets<T> {
private static final long serialVersionUID = 1L;
- public GetBucketsByMembersReply(Map<Address, Bucket<T>> buckets) {
+ public GetBucketsByMembersReply(final Map<Address, Bucket<T>> buckets) {
super(buckets);
}
}
- public static class GetBucketVersions implements Serializable {
+ public static final class GetBucketVersions implements Serializable {
private static final long serialVersionUID = 1L;
}
Map<Address, Long> versions;
- public ContainsBucketVersions(Map<Address, Long> versions) {
+ public ContainsBucketVersions(final Map<Address, Long> versions) {
Preconditions.checkArgument(versions != null, "versions can not be null or empty");
- this.versions = versions;
+ this.versions = ImmutableMap.copyOf(versions);
}
public Map<Address, Long> getVersions() {
- return Collections.unmodifiableMap(versions);
+ return versions;
}
-
}
- public static class GetBucketVersionsReply extends ContainsBucketVersions {
+ public static final class GetBucketVersionsReply extends ContainsBucketVersions {
private static final long serialVersionUID = 1L;
- public GetBucketVersionsReply(Map<Address, Long> versions) {
+ public GetBucketVersionsReply(final Map<Address, Long> versions) {
super(versions);
}
}
- public static class UpdateRemoteBuckets<T extends Copier<T>> extends ContainsBuckets<T> {
+ public static final class UpdateRemoteBuckets<T extends Copier<T>> extends ContainsBuckets<T> {
private static final long serialVersionUID = 1L;
- public UpdateRemoteBuckets(Map<Address, Bucket<T>> buckets) {
+ public UpdateRemoteBuckets(final Map<Address, Bucket<T>> buckets) {
super(buckets);
}
}
+
+ /**
+ * Message sent from the gossiper to its parent, therefore not Serializable, requesting removal
+ * of a bucket corresponding to an address.
+ */
+ public static final class RemoveRemoteBucket {
+ private final Address address;
+
+ public RemoveRemoteBucket(final Address address) {
+ this.address = Preconditions.checkNotNull(address);
+ }
+
+ public Address getAddress() {
+ return address;
+ }
+ }
}
public static class GossiperMessages {
private final Address from;
- public GossipStatus(Address from, Map<Address, Long> versions) {
+ public GossipStatus(final Address from, final Map<Address, Long> versions) {
super(versions);
this.from = from;
}
private final Address from;
private final Address to;
- public GossipEnvelope(Address from, Address to, Map<Address, Bucket<T>> buckets) {
+ public GossipEnvelope(final Address from, final Address to, final Map<Address, Bucket<T>> buckets) {
super(buckets);
Preconditions.checkArgument(to != null, "Recipient of message must not be null");
this.to = to;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
-import java.io.InputStream;
import java.net.URI;
import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
-import org.mockito.Mockito;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
static RemoteRpcProviderConfig config1;
static RemoteRpcProviderConfig config2;
- protected ActorRef rpcBroker1;
+ protected ActorRef rpcInvoker1;
protected JavaTestKit rpcRegistry1Probe;
- protected ActorRef rpcBroker2;
+ protected ActorRef rpcInvoker2;
protected JavaTestKit rpcRegistry2Probe;
protected Broker.ProviderSession brokerSession;
protected SchemaContext schemaContext;
protected RemoteRpcImplementation remoteRpcImpl1;
protected RemoteRpcImplementation remoteRpcImpl2;
+
+ @Mock
protected DOMRpcService domRpcService1;
+ @Mock
protected DOMRpcService domRpcService2;
@BeforeClass
}
@Before
- public void setUp() throws Exception {
- final List<InputStream> sources = Collections.singletonList(
- AbstractRpcTest.this.getClass().getResourceAsStream("/test-rpc.yang"));
-
- try {
- schemaContext = YangParserTestUtils.parseYangStreams(sources);
- } catch (ReactorException e) {
- throw new RuntimeException("Unable to build schema context from " + sources, e);
- }
+ public void setUp() throws ReactorException {
+ schemaContext = YangParserTestUtils.parseYangResources(AbstractRpcTest.class, "/test-rpc.yang");
+
+ MockitoAnnotations.initMocks(this);
- domRpcService1 = Mockito.mock(DOMRpcService.class);
- domRpcService2 = Mockito.mock(DOMRpcService.class);
rpcRegistry1Probe = new JavaTestKit(node1);
- rpcBroker1 = node1.actorOf(RpcBroker.props(domRpcService1));
+ rpcInvoker1 = node1.actorOf(RpcInvoker.props(domRpcService1));
rpcRegistry2Probe = new JavaTestKit(node2);
- rpcBroker2 = node2.actorOf(RpcBroker.props(domRpcService2));
- remoteRpcImpl1 = new RemoteRpcImplementation(rpcRegistry1Probe.getRef(), config1);
- remoteRpcImpl2 = new RemoteRpcImplementation(rpcRegistry2Probe.getRef(), config2);
-
+ rpcInvoker2 = node2.actorOf(RpcInvoker.props(domRpcService2));
+ remoteRpcImpl1 = new RemoteRpcImplementation(rpcInvoker2, config1);
+ remoteRpcImpl2 = new RemoteRpcImplementation(rpcInvoker1, config2);
}
static void assertRpcErrorEquals(final RpcError rpcError, final ErrorSeverity severity,
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;
-import akka.actor.ActorRef;
-import akka.actor.Status;
-import akka.japi.Pair;
-import akka.testkit.JavaTestKit;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
-import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
/**
* Unit tests for RemoteRpcImplementation.
*/
public class RemoteRpcImplementationTest extends AbstractRpcTest {
-
-
- @Test(expected = DOMRpcImplementationNotAvailableException.class)
- public void testInvokeRpcWithNoRemoteActor() throws Exception {
- final ContainerNode input = makeRPCInput("foo");
- final CheckedFuture<DOMRpcResult, DOMRpcException> failedFuture = remoteRpcImpl1.invokeRpc(TEST_RPC_ID, input);
- rpcRegistry1Probe.expectMsgClass(JavaTestKit.duration("5 seconds"), RpcRegistry.Messages.FindRouters.class);
- rpcRegistry1Probe
- .reply(new RpcRegistry.Messages.FindRoutersReply(Collections.<Pair<ActorRef, Long>>emptyList()));
- failedFuture.checkedGet(5, TimeUnit.SECONDS);
- }
-
-
/**
* This test method invokes and executes the remote rpc.
*/
final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
- final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
- final RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
- assertEquals("getType", TEST_RPC, routeIdentifier.getType());
- assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute());
-
- rpcRegistry1Probe.reply(new RpcRegistry.Messages.FindRoutersReply(Arrays.asList(new Pair<>(
- rpcBroker2, 200L))));
final DOMRpcResult result = frontEndFuture.checkedGet(5, TimeUnit.SECONDS);
assertEquals(rpcOutput, result.getResult());
final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
remoteRpcImpl1.invokeRpc(TEST_RPC_ID, null);
assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
- final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
- final RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
- assertEquals("getType", TEST_RPC, routeIdentifier.getType());
- assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute());
-
- rpcRegistry1Probe.reply(new RpcRegistry.Messages.FindRoutersReply(Arrays.asList(new Pair<>(
- rpcBroker2, 200L))));
final DOMRpcResult result = frontEndFuture.checkedGet(5, TimeUnit.SECONDS);
assertEquals(rpcOutput, result.getResult());
}
-
/**
* This test method invokes and executes the remote rpc.
*/
final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
- final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
- final RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
- assertEquals("getType", TEST_RPC, routeIdentifier.getType());
- assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute());
-
- rpcRegistry1Probe.reply(new RpcRegistry.Messages.FindRoutersReply(Arrays.asList(new Pair<>(
- rpcBroker2, 200L))));
final DOMRpcResult result = frontEndFuture.checkedGet(5, TimeUnit.SECONDS);
assertNull(result.getResult());
}
-
/**
* This test method invokes and executes the remote rpc.
*/
final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
- final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
- final RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
- assertEquals("getType", TEST_RPC, routeIdentifier.getType());
- assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute());
-
- rpcRegistry1Probe.reply(new RpcRegistry.Messages.FindRoutersReply(Arrays.asList(new Pair<>(
- rpcBroker2, 200L))));
frontEndFuture.checkedGet(5, TimeUnit.SECONDS);
}
final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
- final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
- final RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
- assertEquals("getType", TEST_RPC, routeIdentifier.getType());
- assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute());
frontEndFuture.checkedGet(20, TimeUnit.SECONDS);
}
@Test(expected = DOMRpcException.class)
public void testInvokeRpcWithLookupException() throws Exception {
final NormalizedNode<?, ?> invokeRpcInput = makeRPCInput("foo");
- final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
- remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
- assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
- final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
- final RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
- assertEquals("getType", TEST_RPC, routeIdentifier.getType());
- assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute());
- rpcRegistry1Probe.reply( new Status.Failure(new RuntimeException("test")));
- frontEndFuture.checkedGet(5, TimeUnit.SECONDS);
- }
- /**
- * This test method invokes and executes the remote rpc.
- */
- @Test(expected = DOMRpcImplementationNotAvailableException.class)
- public void testInvokeRpcWithLoopException() throws Exception {
- final NormalizedNode<?, ?> invokeRpcInput = RemoteRpcInput.from(makeRPCInput("foo"));
+ doThrow(new RuntimeException("test")).when(domRpcService2).invokeRpc(any(SchemaPath.class),
+ any(NormalizedNode.class));
+
final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
+ assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
frontEndFuture.checkedGet(5, TimeUnit.SECONDS);
}
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.remote.rpc;
-
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sal.core.api.model.SchemaService;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
try (final RemoteRpcProvider rpcProvider = new RemoteRpcProvider(system, mock(DOMRpcProviderService.class),
new RemoteRpcProviderConfig(system.settings().config()))) {
final Broker.ProviderSession session = mock(Broker.ProviderSession.class);
- final SchemaService schemaService = mock(SchemaService.class);
- when(schemaService.getGlobalContext()).thenReturn(mock(SchemaContext.class));
- when(session.getService(SchemaService.class)).thenReturn(schemaService);
when(session.getService(DOMRpcService.class)).thenReturn(mock(DOMRpcService.class));
rpcProvider.onSessionInitiated(session);
final ExecuteRpc executeMsg = ExecuteRpc.from(TEST_RPC_ID, null);
- rpcBroker1.tell(executeMsg, getRef());
+ rpcInvoker1.tell(executeMsg, getRef());
final RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class);
final ExecuteRpc executeMsg = ExecuteRpc.from(TEST_RPC_ID, null);
- rpcBroker1.tell(executeMsg, getRef());
+ rpcInvoker1.tell(executeMsg, getRef());
final Failure rpcResponse = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
-import akka.actor.Props;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.Member;
import akka.cluster.MemberStatus;
import akka.cluster.UniqueAddress;
-import akka.japi.Pair;
import akka.testkit.JavaTestKit;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRoutersReply;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.RemoteRpcEndpoint;
import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
private static ActorSystem node2;
private static ActorSystem node3;
+ private JavaTestKit invoker1;
+ private JavaTestKit invoker2;
+ private JavaTestKit invoker3;
+ private JavaTestKit registrar1;
+ private JavaTestKit registrar2;
+ private JavaTestKit registrar3;
private ActorRef registry1;
private ActorRef registry2;
private ActorRef registry3;
waitForMembersUp(node2, Cluster.get(node1).selfUniqueAddress(), Cluster.get(node3).selfUniqueAddress());
}
- static void waitForMembersUp(ActorSystem node, UniqueAddress... addresses) {
+ static void waitForMembersUp(final ActorSystem node, final UniqueAddress... addresses) {
Set<UniqueAddress> otherMembersSet = Sets.newHashSet(addresses);
Stopwatch sw = Stopwatch.createStarted();
while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
@Before
public void setup() {
- registry1 = node1.actorOf(Props.create(RpcRegistry.class, config(node1)));
- registry2 = node2.actorOf(Props.create(RpcRegistry.class, config(node2)));
- registry3 = node3.actorOf(Props.create(RpcRegistry.class, config(node3)));
+ invoker1 = new JavaTestKit(node1);
+ registrar1 = new JavaTestKit(node1);
+ registry1 = node1.actorOf(RpcRegistry.props(config(node1), invoker1.getRef(), registrar1.getRef()));
+ invoker2 = new JavaTestKit(node2);
+ registrar2 = new JavaTestKit(node2);
+ registry2 = node2.actorOf(RpcRegistry.props(config(node2), invoker2.getRef(), registrar2.getRef()));
+ invoker3 = new JavaTestKit(node3);
+ registrar3 = new JavaTestKit(node3);
+ registry3 = node3.actorOf(RpcRegistry.props(config(node3), invoker3.getRef(), registrar3.getRef()));
}
- private RemoteRpcProviderConfig config(ActorSystem node) {
+ private static RemoteRpcProviderConfig config(final ActorSystem node) {
return new RemoteRpcProviderConfig(node.settings().config());
}
if (registry3 != null) {
node3.stop(registry3);
}
+
+ if (invoker1 != null) {
+ node1.stop(invoker1.getRef());
+ }
+ if (invoker2 != null) {
+ node2.stop(invoker2.getRef());
+ }
+ if (invoker3 != null) {
+ node3.stop(invoker3.getRef());
+ }
+
+ if (registrar1 != null) {
+ node1.stop(registrar1.getRef());
+ }
+ if (registrar2 != null) {
+ node2.stop(registrar2.getRef());
+ }
+ if (registrar3 != null) {
+ node3.stop(registrar3.getRef());
+ }
}
/**
public void testAddRemoveRpcOnSameNode() throws Exception {
LOG.info("testAddRemoveRpcOnSameNode starting");
- final JavaTestKit mockBroker = new JavaTestKit(node1);
-
Address nodeAddress = node1.provider().getDefaultAddress();
// Add rpc on node 1
- registry1.tell(new SetLocalRouter(mockBroker.getRef()), mockBroker.getRef());
List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds = createRouteIds();
- registry1.tell(new AddOrUpdateRoutes(addedRouteIds), mockBroker.getRef());
+ registry1.tell(new AddOrUpdateRoutes(addedRouteIds), ActorRef.noSender());
// Bucket store should get an update bucket message. Updated bucket contains added rpc.
+ final JavaTestKit testKit = new JavaTestKit(node1);
- Map<Address, Bucket<RoutingTable>> buckets = retrieveBuckets(registry1, mockBroker, nodeAddress);
+ Map<Address, Bucket<RoutingTable>> buckets = retrieveBuckets(registry1, testKit, nodeAddress);
verifyBucket(buckets.get(nodeAddress), addedRouteIds);
- Map<Address, Long> versions = retrieveVersions(registry1, mockBroker);
- Assert.assertEquals("Version for bucket " + nodeAddress, buckets.get(nodeAddress).getVersion(),
+ Map<Address, Long> versions = retrieveVersions(registry1, testKit);
+ Assert.assertEquals("Version for bucket " + nodeAddress, (Long) buckets.get(nodeAddress).getVersion(),
versions.get(nodeAddress));
// Now remove rpc
- registry1.tell(new RemoveRoutes(addedRouteIds), mockBroker.getRef());
+ registry1.tell(new RemoveRoutes(addedRouteIds), ActorRef.noSender());
// Bucket store should get an update bucket message. Rpc is removed in the updated bucket
- verifyEmptyBucket(mockBroker, registry1, nodeAddress);
+ verifyEmptyBucket(testKit, registry1, nodeAddress);
LOG.info("testAddRemoveRpcOnSameNode ending");
LOG.info("testRpcAddRemoveInCluster starting");
- final JavaTestKit mockBroker1 = new JavaTestKit(node1);
- final JavaTestKit mockBroker2 = new JavaTestKit(node2);
-
List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds = createRouteIds();
Address node1Address = node1.provider().getDefaultAddress();
// Add rpc on node 1
- registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
- registry1.tell(new AddOrUpdateRoutes(addedRouteIds), mockBroker1.getRef());
+ registry1.tell(new AddOrUpdateRoutes(addedRouteIds), ActorRef.noSender());
// Bucket store on node2 should get a message to update its local copy of remote buckets
+ final JavaTestKit testKit = new JavaTestKit(node2);
- Map<Address, Bucket<RoutingTable>> buckets = retrieveBuckets(registry2, mockBroker2, node1Address);
+ Map<Address, Bucket<RoutingTable>> buckets = retrieveBuckets(registry2, testKit, node1Address);
verifyBucket(buckets.get(node1Address), addedRouteIds);
// Now remove
- registry1.tell(new RemoveRoutes(addedRouteIds), mockBroker1.getRef());
+ registry1.tell(new RemoveRoutes(addedRouteIds), ActorRef.noSender());
// Bucket store on node2 should get a message to update its local copy of remote buckets.
// Wait for the bucket for node1 to be empty.
- verifyEmptyBucket(mockBroker2, registry2, node1Address);
+ verifyEmptyBucket(testKit, registry2, node1Address);
LOG.info("testRpcAddRemoveInCluster ending");
}
- private void verifyEmptyBucket(JavaTestKit testKit, ActorRef registry, Address address)
+ private void verifyEmptyBucket(final JavaTestKit testKit, final ActorRef registry, final Address address)
throws AssertionError {
Map<Address, Bucket<RoutingTable>> buckets;
int numTries = 0;
*/
@Test
public void testRpcAddedOnMultiNodes() throws Exception {
-
- final JavaTestKit mockBroker1 = new JavaTestKit(node1);
- final JavaTestKit mockBroker2 = new JavaTestKit(node2);
- final JavaTestKit mockBroker3 = new JavaTestKit(node3);
-
- registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef());
+ final JavaTestKit testKit = new JavaTestKit(node3);
// Add rpc on node 1
List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds1 = createRouteIds();
- registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
- registry1.tell(new AddOrUpdateRoutes(addedRouteIds1), mockBroker1.getRef());
+ registry1.tell(new AddOrUpdateRoutes(addedRouteIds1), ActorRef.noSender());
+
+ final UpdateRemoteEndpoints req1 = registrar3.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
+ UpdateRemoteEndpoints.class);
// Add rpc on node 2
List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds2 = createRouteIds();
- registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef());
- registry2.tell(new AddOrUpdateRoutes(addedRouteIds2), mockBroker2.getRef());
+ registry2.tell(new AddOrUpdateRoutes(addedRouteIds2), ActorRef.noSender());
- Address node1Address = node1.provider().getDefaultAddress();
+ final UpdateRemoteEndpoints req2 = registrar3.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
+ UpdateRemoteEndpoints.class);
Address node2Address = node2.provider().getDefaultAddress();
+ Address node1Address = node1.provider().getDefaultAddress();
- Map<Address, Bucket<RoutingTable>> buckets = retrieveBuckets(registry3, mockBroker3, node1Address,
+ Map<Address, Bucket<RoutingTable>> buckets = retrieveBuckets(registry3, testKit, node1Address,
node2Address);
verifyBucket(buckets.get(node1Address), addedRouteIds1);
verifyBucket(buckets.get(node2Address), addedRouteIds2);
- Map<Address, Long> versions = retrieveVersions(registry3, mockBroker3);
- Assert.assertEquals("Version for bucket " + node1Address, buckets.get(node1Address).getVersion(),
+ Map<Address, Long> versions = retrieveVersions(registry3, testKit);
+ Assert.assertEquals("Version for bucket " + node1Address, (Long) buckets.get(node1Address).getVersion(),
versions.get(node1Address));
- Assert.assertEquals("Version for bucket " + node2Address, buckets.get(node2Address).getVersion(),
+ Assert.assertEquals("Version for bucket " + node2Address, (Long) buckets.get(node2Address).getVersion(),
versions.get(node2Address));
- RouteIdentifier<?, ?, ?> routeID = addedRouteIds1.get(0);
- registry3.tell(new FindRouters(routeID), mockBroker3.getRef());
+ assertEndpoints(req1, node1Address, invoker1);
+ assertEndpoints(req2, node2Address, invoker2);
+
+ }
+
+ private static void assertEndpoints(final UpdateRemoteEndpoints msg, final Address address,
+ final JavaTestKit invoker) {
+ final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = msg.getEndpoints();
+ Assert.assertEquals(1, endpoints.size());
- FindRoutersReply reply = mockBroker3.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
- FindRoutersReply.class);
+ final Optional<RemoteRpcEndpoint> maybeEndpoint = endpoints.get(address);
+ Assert.assertNotNull(maybeEndpoint);
+ Assert.assertTrue(maybeEndpoint.isPresent());
- List<Pair<ActorRef, Long>> respList = reply.getRouterWithUpdateTime();
- Assert.assertEquals("getRouterWithUpdateTime size", 1, respList.size());
+ final RemoteRpcEndpoint endpoint = maybeEndpoint.get();
+ final ActorRef router = endpoint.getRouter();
+ Assert.assertNotNull(router);
- respList.get(0).first().tell("hello", ActorRef.noSender());
- mockBroker1.expectMsgEquals(Duration.create(3, TimeUnit.SECONDS), "hello");
+ router.tell("hello", ActorRef.noSender());
+ final String s = invoker.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), String.class);
+ Assert.assertEquals("hello", s);
}
- private Map<Address, Long> retrieveVersions(ActorRef bucketStore, JavaTestKit testKit) {
+ private static Map<Address, Long> retrieveVersions(final ActorRef bucketStore, final JavaTestKit testKit) {
bucketStore.tell(new GetBucketVersions(), testKit.getRef());
GetBucketVersionsReply reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
GetBucketVersionsReply.class);
return reply.getVersions();
}
- private void verifyBucket(Bucket<RoutingTable> bucket, List<RouteIdentifier<?, ?, ?>> expRouteIds) {
+ private static void verifyBucket(final Bucket<RoutingTable> bucket,
+ final List<RouteIdentifier<?, ?, ?>> expRouteIds) {
RoutingTable table = bucket.getData();
Assert.assertNotNull("Bucket RoutingTable is null", table);
for (RouteIdentifier<?, ?, ?> r : expRouteIds) {
Assert.assertEquals("RoutingTable size", expRouteIds.size(), table.size());
}
- private Map<Address, Bucket<RoutingTable>> retrieveBuckets(ActorRef bucketStore, JavaTestKit testKit,
- Address... addresses) {
+ private static Map<Address, Bucket<RoutingTable>> retrieveBuckets(final ActorRef bucketStore,
+ final JavaTestKit testKit, final Address... addresses) {
int numTries = 0;
while (true) {
bucketStore.tell(new GetAllBuckets(), testKit.getRef());
public void testAddRoutesConcurrency() throws Exception {
final JavaTestKit testKit = new JavaTestKit(node1);
- registry1.tell(new SetLocalRouter(testKit.getRef()), ActorRef.noSender());
-
final int nRoutes = 500;
final RouteIdentifier<?, ?, ?>[] added = new RouteIdentifier<?, ?, ?>[nRoutes];
for (int i = 0; i < nRoutes; i++) {
private List<RpcRouter.RouteIdentifier<?, ?, ?>> createRouteIds() throws URISyntaxException {
QName type = new QName(new URI("/mockrpc"), "mockrpc" + routeIdCounter++);
- List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new ArrayList<>();
+ List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new ArrayList<>(1);
routeIds.add(new RouteIdentifierImpl(null, type, null));
return routeIds;
}
-
- @Test
- public void testFindRoutersNotPresentInitially() throws Exception {
-
- final JavaTestKit mockBroker1 = new JavaTestKit(node1);
- final JavaTestKit mockBroker2 = new JavaTestKit(node2);
-
- registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
- registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef());
-
- List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = createRouteIds();
- routeIds.addAll(createRouteIds());
-
- JavaTestKit replyKit1 = new JavaTestKit(node1);
- registry1.tell(new FindRouters(routeIds.get(0)), replyKit1.getRef());
- JavaTestKit replyKit2 = new JavaTestKit(node1);
- registry1.tell(new FindRouters(routeIds.get(1)), replyKit2.getRef());
-
- registry2.tell(new AddOrUpdateRoutes(routeIds), mockBroker2.getRef());
-
- FindRoutersReply reply = replyKit1.expectMsgClass(Duration.create(7, TimeUnit.SECONDS),
- FindRoutersReply.class);
- Assert.assertEquals("getRouterWithUpdateTime size", 1, reply.getRouterWithUpdateTime().size());
-
- reply = replyKit2.expectMsgClass(Duration.create(7, TimeUnit.SECONDS),
- FindRoutersReply.class);
- Assert.assertEquals("getRouterWithUpdateTime size", 1, reply.getRouterWithUpdateTime().size());
- }
-
- @Test
- public void testFindRoutersNonExistent() throws Exception {
-
- final JavaTestKit mockBroker1 = new JavaTestKit(node1);
-
- registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
-
- List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = createRouteIds();
-
- registry1.tell(new FindRouters(routeIds.get(0)), mockBroker1.getRef());
-
- FindRoutersReply reply = mockBroker1.expectMsgClass(Duration.create(7, TimeUnit.SECONDS),
- FindRoutersReply.class);
- List<Pair<ActorRef, Long>> respList = reply.getRouterWithUpdateTime();
- Assert.assertEquals("getRouterWithUpdateTime size", 0, respList.size());
- }
}
final BucketStore<T> store = createStore();
Address localAddress = system.provider().getDefaultAddress();
- Bucket<T> localBucket = new BucketImpl<>();
+ Bucket<T> localBucket = new BucketImpl<>(new T());
Address a1 = new Address("tcp", "system1");
Address a2 = new Address("tcp", "system2");
Address a3 = new Address("tcp", "system3");
- Bucket<T> b1 = new BucketImpl<>();
- Bucket<T> b2 = new BucketImpl<>();
- Bucket<T> b3 = new BucketImpl<>();
+ Bucket<T> b1 = new BucketImpl<>(new T());
+ Bucket<T> b2 = new BucketImpl<>(new T());
+ Bucket<T> b3 = new BucketImpl<>(new T());
Map<Address, Bucket<T>> remoteBuckets = new HashMap<>(3);
remoteBuckets.put(a1, b1);
//Add a new remote bucket
Address a4 = new Address("tcp", "system4");
- Bucket<T> b4 = new BucketImpl<>();
+ Bucket<T> b4 = new BucketImpl<>(new T());
remoteBuckets.clear();
remoteBuckets.put(a4, b4);
store.receiveUpdateRemoteBuckets(remoteBuckets);
Assert.assertTrue(remoteBucketsInStore.size() == 4);
//Update a bucket
- Bucket<T> b3New = new BucketImpl<>();
+ Bucket<T> b3New = new BucketImpl<>(new T());
remoteBuckets.clear();
remoteBuckets.put(a3, b3New);
remoteBuckets.put(a1, null);
//versions map contains versions for all remote buckets (4).
Map<Address, Long> versionsInStore = store.getVersions();
Assert.assertEquals(4, versionsInStore.size());
- Assert.assertEquals(b1.getVersion(), versionsInStore.get(a1));
- Assert.assertEquals(b2.getVersion(), versionsInStore.get(a2));
- Assert.assertEquals(b3New.getVersion(), versionsInStore.get(a3));
- Assert.assertEquals(b4.getVersion(), versionsInStore.get(a4));
+ Assert.assertEquals((Long)b1.getVersion(), versionsInStore.get(a1));
+ Assert.assertEquals((Long)b2.getVersion(), versionsInStore.get(a2));
+ Assert.assertEquals((Long)b3New.getVersion(), versionsInStore.get(a3));
+ Assert.assertEquals((Long)b4.getVersion(), versionsInStore.get(a4));
//Send older version of bucket
remoteBuckets.clear();
//Should NOT update a3
remoteBucketsInStore = store.getRemoteBuckets();
b3InStore = remoteBucketsInStore.get(a3);
- Assert.assertTrue(b3InStore.getVersion().longValue() == b3New.getVersion().longValue());
+ Assert.assertEquals(b3InStore.getVersion(), b3New.getVersion());
}
* @return instance of BucketStore class
*/
private static BucketStore<T> createStore() {
- final Props props = Props.create(BucketStore.class, new RemoteRpcProviderConfig(system.settings().config()));
+ final Props props = Props.create(BucketStore.class, new RemoteRpcProviderConfig(system.settings().config()),
+ new T());
final TestActorRef<BucketStore<T>> testRef = TestActorRef.create(system, props, "testStore");
return testRef.underlyingActor();
}
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import com.typesafe.config.ConfigFactory;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
import java.util.Map;
import org.junit.After;
import org.junit.AfterClass;
@Test
public void testReceiveGossipTick_WhenNoRemoteMemberShouldIgnore() {
-
- mockGossiper.setClusterMembers(Collections.<Address>emptyList());
+ mockGossiper.setClusterMembers();
doNothing().when(mockGossiper).getLocalStatusAndSendTo(any(Address.class));
mockGossiper.receiveGossipTick();
verify(mockGossiper, times(0)).getLocalStatusAndSendTo(any(Address.class));
@Test
public void testReceiveGossipTick_WhenRemoteMemberExistsShouldSendStatus() {
- List<Address> members = new ArrayList<>();
- Address remote = new Address("tcp", "member");
- members.add(remote);
-
- mockGossiper.setClusterMembers(members);
+ mockGossiper.setClusterMembers(new Address("tcp", "member"));
doNothing().when(mockGossiper).getLocalStatusAndSendTo(any(Address.class));
mockGossiper.receiveGossipTick();
verify(mockGossiper, times(1)).getLocalStatusAndSendTo(any(Address.class));
GossipStatus remoteStatus = new GossipStatus(nonMember, mock(Map.class));
//add a member
- List<Address> members = new ArrayList<>();
- members.add(new Address("tcp", "member"));
-
- mockGossiper.setClusterMembers(members);
+ mockGossiper.setClusterMembers(new Address("tcp", "member"));
mockGossiper.receiveGossipStatus(remoteStatus);
verify(mockGossiper, times(0)).getSender();
}
* @return instance of Gossiper class
*/
private static Gossiper createGossiper() {
- final Props props = Props.create(Gossiper.class, false,
- new RemoteRpcProviderConfig(system.settings().config()));
+ final Props props = Gossiper.testProps(new RemoteRpcProviderConfig(system.settings().config()));
final TestActorRef<Gossiper> testRef = TestActorRef.create(system, props, "testGossiper");
return testRef.underlyingActor();