From: Harman Singh Date: Tue, 5 Aug 2014 21:06:24 +0000 (-0700) Subject: Adding more unit tests for remote rpc connector and Integrating routing table X-Git-Tag: release/helium~282^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=b896a5f4bd63df605ecb886deafc19416171b013;ds=inline Adding more unit tests for remote rpc connector and Integrating routing table Updated with code review comments Change-Id: Ie88870e6cf6e9446f352e5b931e5063f5f0ec012 Signed-off-by: Harman Singh --- diff --git a/opendaylight/md-sal/sal-remoterpc-connector/pom.xml b/opendaylight/md-sal/sal-remoterpc-connector/pom.xml index 674c5bf5a5..38ec5f5ac2 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/pom.xml +++ b/opendaylight/md-sal/sal-remoterpc-connector/pom.xml @@ -200,6 +200,9 @@ + + + diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorConstants.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorConstants.java index 1f1a0f5cc6..da0d62897a 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorConstants.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorConstants.java @@ -16,4 +16,5 @@ public class ActorConstants { public static final String RPC_BROKER_PATH= "/user/rpc/rpc-broker"; public static final String RPC_REGISTRY_PATH = "/user/rpc/rpc-registry"; + public static final String RPC_MANAGER_PATH = "/user/rpc"; } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java index d384144f4f..02e2d12015 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java @@ -4,7 +4,6 @@ import akka.actor.ActorRef; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.opendaylight.controller.remote.rpc.messages.ErrorResponse; -import org.opendaylight.controller.remote.rpc.messages.InvokeRoutedRpc; import org.opendaylight.controller.remote.rpc.messages.InvokeRpc; import org.opendaylight.controller.remote.rpc.messages.RpcResponse; import org.opendaylight.controller.remote.rpc.utils.ActorUtil; @@ -36,7 +35,7 @@ public class RemoteRpcImplementation implements RpcImplementation, @Override public ListenableFuture> invokeRpc(QName rpc, YangInstanceIdentifier identifier, CompositeNode input) { - InvokeRoutedRpc rpcMsg = new InvokeRoutedRpc(rpc, identifier, input); + InvokeRpc rpcMsg = new InvokeRpc(rpc, identifier, input); return executeMsg(rpcMsg); } @@ -49,7 +48,7 @@ public class RemoteRpcImplementation implements RpcImplementation, @Override public ListenableFuture> invokeRpc(QName rpc, CompositeNode input) { - InvokeRpc rpcMsg = new InvokeRpc(rpc, input); + InvokeRpc rpcMsg = new InvokeRpc(rpc, null, input); return executeMsg(rpcMsg); } @@ -57,7 +56,7 @@ public class RemoteRpcImplementation implements RpcImplementation, ListenableFuture> listenableFuture = null; try { - Object response = ActorUtil.executeLocalOperation(rpcBroker, rpcMsg, ActorUtil.ASK_DURATION, ActorUtil.AWAIT_DURATION); + Object response = ActorUtil.executeOperation(rpcBroker, rpcMsg, ActorUtil.ASK_DURATION, ActorUtil.AWAIT_DURATION); if(response instanceof RpcResponse) { RpcResponse rpcResponse = (RpcResponse) response; @@ -74,7 +73,7 @@ public class RemoteRpcImplementation implements RpcImplementation, } } catch (Exception e) { - LOG.error("Error occurred while invoking RPC actor {}", e.toString()); + LOG.error("Error occurred while invoking RPC actor {}", e); final RpcResultBuilder failed = RpcResultBuilder.failed(); failed.withError(null, null, e.getMessage(), null, null, e.getCause()); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java index ac50b8fe5b..d088f2284d 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java @@ -12,8 +12,6 @@ package org.opendaylight.controller.remote.rpc; import akka.actor.ActorRef; import akka.actor.ActorSystem; import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext; -import org.opendaylight.controller.remote.rpc.registry.ClusterWrapper; -import org.opendaylight.controller.remote.rpc.registry.ClusterWrapperImpl; import org.opendaylight.controller.sal.core.api.Broker; import org.opendaylight.controller.sal.core.api.Provider; import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry; @@ -64,11 +62,10 @@ public class RemoteRpcProvider implements AutoCloseable, Provider, SchemaContext private void start() { LOG.info("Starting all rpc listeners and actors."); // Create actor to handle and sync routing table in cluster - ClusterWrapper clusterWrapper = new ClusterWrapperImpl(actorSystem); SchemaService schemaService = brokerSession.getService(SchemaService.class); schemaContext = schemaService.getGlobalContext(); - rpcManager = actorSystem.actorOf(RpcManager.props(clusterWrapper, schemaContext, brokerSession, rpcProvisionRegistry), ActorConstants.RPC_MANAGER); + rpcManager = actorSystem.actorOf(RpcManager.props(schemaContext, brokerSession, rpcProvisionRegistry), ActorConstants.RPC_MANAGER); LOG.debug("Rpc actors are created."); } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RoutedRpcListener.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RoutedRpcListener.java index a6eeac0270..98cf6a329f 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RoutedRpcListener.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RoutedRpcListener.java @@ -13,15 +13,14 @@ import akka.actor.ActorRef; import com.google.common.base.Preconditions; import org.opendaylight.controller.md.sal.common.api.routing.RouteChange; import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener; -import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc; -import org.opendaylight.controller.remote.rpc.utils.ActorUtil; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; import org.opendaylight.controller.sal.connector.api.RpcRouter; import org.opendaylight.controller.sal.core.api.RpcRoutingContext; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -29,23 +28,24 @@ import java.util.Set; public class RoutedRpcListener implements RouteChangeListener{ private static final Logger LOG = LoggerFactory.getLogger(RoutedRpcListener.class); private final ActorRef rpcRegistry; - private final String actorPath; - public RoutedRpcListener(ActorRef rpcRegistry, String actorPath) { + public RoutedRpcListener(ActorRef rpcRegistry) { Preconditions.checkNotNull(rpcRegistry, "rpc registry actor should not be null"); - Preconditions.checkNotNull(actorPath, "actor path of rpc broker on current node should not be null"); this.rpcRegistry = rpcRegistry; - this.actorPath = actorPath; } @Override public void onRouteChange(RouteChange routeChange) { Map> announcements = routeChange.getAnnouncements(); - announce(getRouteIdentifiers(announcements)); + if(announcements != null && announcements.size() > 0){ + announce(getRouteIdentifiers(announcements)); + } Map> removals = routeChange.getRemovals(); - remove(getRouteIdentifiers(removals)); + if(removals != null && removals.size() > 0 ) { + remove(getRouteIdentifiers(removals)); + } } /** @@ -54,13 +54,8 @@ public class RoutedRpcListener implements RouteChangeListener> announcements) { LOG.debug("Announcing [{}]", announcements); - AddRoutedRpc addRpcMsg = new AddRoutedRpc(announcements, actorPath); - try { - ActorUtil.executeLocalOperation(rpcRegistry, addRpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION); - } catch (Exception e) { - // Just logging it because Akka API throws this exception - LOG.error(e.toString()); - } + RpcRegistry.Messages.AddOrUpdateRoutes addRpcMsg = new RpcRegistry.Messages.AddOrUpdateRoutes(new ArrayList<>(announcements)); + rpcRegistry.tell(addRpcMsg, ActorRef.noSender()); } /** @@ -69,13 +64,8 @@ public class RoutedRpcListener implements RouteChangeListener> removals){ LOG.debug("Removing [{}]", removals); - RemoveRoutedRpc removeRpcMsg = new RemoveRoutedRpc(removals, actorPath); - try { - ActorUtil.executeLocalOperation(rpcRegistry, removeRpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION); - } catch (Exception e) { - // Just logging it because Akka API throws this exception - LOG.error(e.toString()); - } + RpcRegistry.Messages.RemoveRoutes removeRpcMsg = new RpcRegistry.Messages.RemoveRoutes(new ArrayList<>(removals)); + rpcRegistry.tell(removeRpcMsg, ActorRef.noSender()); } /** diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java index 26e8e960e3..611618f1f6 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java @@ -11,17 +11,17 @@ package org.opendaylight.controller.remote.rpc; import akka.actor.ActorRef; import akka.actor.Props; import akka.japi.Creator; +import akka.japi.Pair; import org.opendaylight.controller.remote.rpc.messages.ErrorResponse; import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply; -import org.opendaylight.controller.remote.rpc.messages.GetRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRpcReply; -import org.opendaylight.controller.remote.rpc.messages.InvokeRoutedRpc; import org.opendaylight.controller.remote.rpc.messages.InvokeRpc; import org.opendaylight.controller.remote.rpc.messages.RpcResponse; +import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; import org.opendaylight.controller.remote.rpc.utils.ActorUtil; +import org.opendaylight.controller.remote.rpc.utils.RoutingLogic; import org.opendaylight.controller.remote.rpc.utils.XmlUtils; +import org.opendaylight.controller.sal.connector.api.RpcRouter; import org.opendaylight.controller.sal.core.api.Broker; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.CompositeNode; @@ -29,6 +29,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; import java.util.concurrent.Future; /** @@ -59,81 +60,57 @@ public class RpcBroker extends AbstractUntypedActor { } @Override protected void handleReceive(Object message) throws Exception { - if(message instanceof InvokeRoutedRpc) { - invokeRemoteRoutedRpc((InvokeRoutedRpc) message); - } else if(message instanceof InvokeRpc) { + if(message instanceof InvokeRpc) { invokeRemoteRpc((InvokeRpc) message); } else if(message instanceof ExecuteRpc) { executeRpc((ExecuteRpc) message); } } - private void invokeRemoteRoutedRpc(InvokeRoutedRpc msg) { - // Look up the remote actor to execute rpc - LOG.debug("Looking up the remote actor for route {}", msg); - try { - RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, msg.getRpc(), msg.getIdentifier()); - GetRoutedRpc routedRpcMsg = new GetRoutedRpc(routeId); - GetRoutedRpcReply rpcReply = (GetRoutedRpcReply) ActorUtil.executeLocalOperation(rpcRegistry, routedRpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION); - - String remoteActorPath = rpcReply.getRoutePath(); - if(remoteActorPath == null) { - LOG.debug("No remote actor found for rpc execution."); - - getSender().tell(new ErrorResponse( - new IllegalStateException("No remote actor found for rpc execution.")), self()); - } else { - - ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(), schemaContext), msg.getRpc()); - - Object operationRes = ActorUtil.executeRemoteOperation(this.context().actorSelection(remoteActorPath), - executeMsg, ActorUtil.REMOTE_ASK_DURATION, ActorUtil.REMOTE_AWAIT_DURATION); - - getSender().tell(operationRes, self()); - } - } catch (Exception e) { - LOG.error(e.toString()); - getSender().tell(new ErrorResponse(e), self()); - } - } - private void invokeRemoteRpc(InvokeRpc msg) { // Look up the remote actor to execute rpc LOG.debug("Looking up the remote actor for route {}", msg); try { - RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, msg.getRpc(), null); - GetRpc rpcMsg = new GetRpc(routeId); - GetRpcReply rpcReply = (GetRpcReply)ActorUtil.executeLocalOperation(rpcRegistry, rpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION); - String remoteActorPath = rpcReply.getRoutePath(); + // Find router + RpcRouter.RouteIdentifier routeId = new RouteIdentifierImpl(null, msg.getRpc(), msg.getIdentifier()); + RpcRegistry.Messages.FindRouters rpcMsg = new RpcRegistry.Messages.FindRouters(routeId); + RpcRegistry.Messages.FindRoutersReply rpcReply = + (RpcRegistry.Messages.FindRoutersReply) ActorUtil.executeOperation(rpcRegistry, rpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION); + + List> actorRefList = rpcReply.getRouterWithUpdateTime(); - if(remoteActorPath == null) { + if(actorRefList == null || actorRefList.isEmpty()) { LOG.debug("No remote actor found for rpc {{}}.", msg.getRpc()); getSender().tell(new ErrorResponse( - new IllegalStateException("No remote actor found for rpc execution of : " + msg.getRpc())), self()); + new IllegalStateException("No remote actor found for rpc execution of : " + msg.getRpc())), self()); } else { + RoutingLogic logic = new LatestEntryRoutingLogic(actorRefList); ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(), schemaContext), msg.getRpc()); - Object operationRes = ActorUtil.executeRemoteOperation(this.context().actorSelection(remoteActorPath), + Object operationRes = ActorUtil.executeOperation(logic.select(), executeMsg, ActorUtil.REMOTE_ASK_DURATION, ActorUtil.REMOTE_AWAIT_DURATION); getSender().tell(operationRes, self()); } } catch (Exception e) { - LOG.error(e.toString()); + LOG.error("invokeRemoteRpc: {}", e); getSender().tell(new ErrorResponse(e), self()); } } + + private void executeRpc(ExecuteRpc msg) { LOG.debug("Executing rpc for rpc {}", msg.getRpc()); try { - Future> rpc = brokerSession.rpc(msg.getRpc(), XmlUtils.inputXmlToCompositeNode(msg.getRpc(), msg.getInputCompositeNode(), schemaContext)); + Future> rpc = brokerSession.rpc(msg.getRpc(), + XmlUtils.inputXmlToCompositeNode(msg.getRpc(), msg.getInputCompositeNode(), schemaContext)); RpcResult rpcResult = rpc != null ? rpc.get():null; CompositeNode result = rpcResult != null ? rpcResult.getResult() : null; getSender().tell(new RpcResponse(XmlUtils.outputCompositeNodeToXml(result, schemaContext)), self()); } catch (Exception e) { - LOG.error(e.toString()); + LOG.error("executeRpc: {}", e); getSender().tell(new ErrorResponse(e), self()); } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java index f614990669..dee98521ae 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java @@ -10,50 +10,42 @@ package org.opendaylight.controller.remote.rpc; import akka.actor.ActorRef; -import org.opendaylight.controller.remote.rpc.messages.AddRpc; -import org.opendaylight.controller.remote.rpc.messages.RemoveRpc; -import org.opendaylight.controller.remote.rpc.utils.ActorUtil; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; +import org.opendaylight.controller.sal.connector.api.RpcRouter; import org.opendaylight.controller.sal.core.api.RpcRegistrationListener; import org.opendaylight.yangtools.yang.common.QName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; + public class RpcListener implements RpcRegistrationListener{ private static final Logger LOG = LoggerFactory.getLogger(RpcListener.class); private final ActorRef rpcRegistry; - private final String actorPath; - public RpcListener(ActorRef rpcRegistry, String actorPath) { + public RpcListener(ActorRef rpcRegistry) { this.rpcRegistry = rpcRegistry; - this.actorPath = actorPath; } @Override public void onRpcImplementationAdded(QName rpc) { LOG.debug("Adding registration for [{}]", rpc); - RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, rpc, null); - AddRpc addRpcMsg = new AddRpc(routeId, actorPath); - try { - ActorUtil.executeLocalOperation(rpcRegistry, addRpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION); - LOG.debug("Route added [{}-{}]", routeId, this.actorPath); - } catch (Exception e) { - // Just logging it because Akka API throws this exception - LOG.error(e.toString()); - } - + RpcRouter.RouteIdentifier routeId = new RouteIdentifierImpl(null, rpc, null); + List> routeIds = new ArrayList<>(); + routeIds.add(routeId); + RpcRegistry.Messages.AddOrUpdateRoutes addRpcMsg = new RpcRegistry.Messages.AddOrUpdateRoutes(routeIds); + rpcRegistry.tell(addRpcMsg, ActorRef.noSender()); } @Override public void onRpcImplementationRemoved(QName rpc) { LOG.debug("Removing registration for [{}]", rpc); - RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, rpc, null); - RemoveRpc removeRpcMsg = new RemoveRpc(routeId); - try { - ActorUtil.executeLocalOperation(rpcRegistry, removeRpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION); - } catch (Exception e) { - // Just logging it because Akka API throws this exception - LOG.error(e.toString()); - } + RpcRouter.RouteIdentifier routeId = new RouteIdentifierImpl(null, rpc, null); + List> routeIds = new ArrayList<>(); + routeIds.add(routeId); + RpcRegistry.Messages.RemoveRoutes removeRpcMsg = new RpcRegistry.Messages.RemoveRoutes(routeIds); + rpcRegistry.tell(removeRpcMsg, ActorRef.noSender()); } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java index 514a2f141d..96f2472428 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java @@ -16,8 +16,7 @@ import akka.actor.SupervisorStrategy; import akka.japi.Creator; import akka.japi.Function; import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext; -import org.opendaylight.controller.remote.rpc.registry.ClusterWrapper; -import org.opendaylight.controller.remote.rpc.registry.RpcRegistryOld; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; import org.opendaylight.controller.sal.core.api.Broker; import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry; import org.opendaylight.yangtools.yang.common.QName; @@ -38,7 +37,6 @@ public class RpcManager extends AbstractUntypedActor { private static final Logger LOG = LoggerFactory.getLogger(RpcManager.class); private SchemaContext schemaContext; - private final ClusterWrapper clusterWrapper; private ActorRef rpcBroker; private ActorRef rpcRegistry; private final Broker.ProviderSession brokerSession; @@ -47,9 +45,8 @@ public class RpcManager extends AbstractUntypedActor { private RemoteRpcImplementation rpcImplementation; private final RpcProvisionRegistry rpcProvisionRegistry; - private RpcManager(ClusterWrapper clusterWrapper, SchemaContext schemaContext, + private RpcManager(SchemaContext schemaContext, Broker.ProviderSession brokerSession, RpcProvisionRegistry rpcProvisionRegistry) { - this.clusterWrapper = clusterWrapper; this.schemaContext = schemaContext; this.brokerSession = brokerSession; this.rpcProvisionRegistry = rpcProvisionRegistry; @@ -59,12 +56,12 @@ public class RpcManager extends AbstractUntypedActor { } - public static Props props(final ClusterWrapper clusterWrapper, final SchemaContext schemaContext, + public static Props props(final SchemaContext schemaContext, final Broker.ProviderSession brokerSession, final RpcProvisionRegistry rpcProvisionRegistry) { return Props.create(new Creator() { @Override public RpcManager create() throws Exception { - return new RpcManager(clusterWrapper, schemaContext, brokerSession, rpcProvisionRegistry); + return new RpcManager(schemaContext, brokerSession, rpcProvisionRegistry); } }); } @@ -72,16 +69,19 @@ public class RpcManager extends AbstractUntypedActor { private void createRpcActors() { LOG.debug("Create rpc registry and broker actors"); - rpcRegistry = getContext().actorOf(RpcRegistryOld.props(clusterWrapper), ActorConstants.RPC_REGISTRY); + + rpcRegistry = getContext().actorOf(Props.create(RpcRegistry.class), ActorConstants.RPC_REGISTRY); + rpcBroker = getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), ActorConstants.RPC_BROKER); + RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker); + rpcRegistry.tell(localRouter, self()); } private void startListeners() { LOG.debug("Registers rpc listeners"); - String rpcBrokerPath = clusterWrapper.getAddress().toString() + ActorConstants.RPC_BROKER_PATH; - rpcListener = new RpcListener(rpcRegistry, rpcBrokerPath); - routeChangeListener = new RoutedRpcListener(rpcRegistry, rpcBrokerPath); + rpcListener = new RpcListener(rpcRegistry); + routeChangeListener = new RoutedRpcListener(rpcRegistry); rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext); brokerSession.addRpcRegistrationListener(rpcListener); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/AddRoutedRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/AddRoutedRpc.java deleted file mode 100644 index fd1af2b33c..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/AddRoutedRpc.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.remote.rpc.messages; - -import com.google.common.base.Preconditions; -import org.opendaylight.controller.sal.connector.api.RpcRouter; - -import java.io.Serializable; -import java.util.Set; - -public class AddRoutedRpc implements Serializable { - - private final Set> announcements; - private final String actorPath; - - public AddRoutedRpc(final Set> announcements, final String actorPath) { - Preconditions.checkNotNull(announcements, "Route identifier should not be null"); - Preconditions.checkNotNull(actorPath, "Actor path should not be null"); - - this.announcements = announcements; - this.actorPath = actorPath; - } - - public Set> getAnnouncements() { - return announcements; - } - - public String getActorPath() { - return actorPath; - } -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/AddRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/AddRpc.java deleted file mode 100644 index 7eaa8f0618..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/AddRpc.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.remote.rpc.messages; - -import com.google.common.base.Preconditions; -import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl; - -import java.io.Serializable; - -public class AddRpc implements Serializable { - - private final RouteIdentifierImpl routeId; - private final String actorPath; - - public AddRpc(final RouteIdentifierImpl routeId, final String actorPath) { - Preconditions.checkNotNull(routeId, "Route identifier should not be null"); - Preconditions.checkNotNull(actorPath, "Actor path should not be null"); - - this.routeId = routeId; - this.actorPath = actorPath; - } - - public RouteIdentifierImpl getRouteId() { - return routeId; - } - - public String getActorPath() { - return actorPath; - } -} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRoutedRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRoutedRpc.java deleted file mode 100644 index e8d2262182..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRoutedRpc.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.remote.rpc.messages; - - -import com.google.common.base.Preconditions; -import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl; - -import java.io.Serializable; - -public class GetRoutedRpc implements Serializable { - - private final RouteIdentifierImpl routeId; - - public GetRoutedRpc(final RouteIdentifierImpl routeId) { - Preconditions.checkNotNull(routeId, "route id should not be null"); - this.routeId = routeId; - } - - public RouteIdentifierImpl getRouteId() { - return routeId; - } -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRoutedRpcReply.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRoutedRpcReply.java deleted file mode 100644 index d426662192..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRoutedRpcReply.java +++ /dev/null @@ -1,24 +0,0 @@ -package org.opendaylight.controller.remote.rpc.messages; - -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -import java.io.Serializable; - -public class GetRoutedRpcReply implements Serializable { - - private final String routePath; - - public GetRoutedRpcReply(final String routePath) { - this.routePath = routePath; - } - - public String getRoutePath() { - return routePath; - } -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRpc.java deleted file mode 100644 index c1d4240dca..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRpc.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.remote.rpc.messages; - -import com.google.common.base.Preconditions; -import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl; - -import java.io.Serializable; - -public class GetRpc implements Serializable { - - private final RouteIdentifierImpl routeId; - - public GetRpc(final RouteIdentifierImpl routeId) { - Preconditions.checkNotNull(routeId, "Route Id should not be null"); - this.routeId = routeId; - } - - public RouteIdentifierImpl getRouteId() { - return routeId; - } -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRpcReply.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRpcReply.java deleted file mode 100644 index aaf089d16f..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRpcReply.java +++ /dev/null @@ -1,24 +0,0 @@ -package org.opendaylight.controller.remote.rpc.messages; - -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -import java.io.Serializable; - -public class GetRpcReply implements Serializable { - - private final String routePath; - - public GetRpcReply(final String routePath) { - this.routePath = routePath; - } - - public String getRoutePath() { - return routePath; - } -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRoutedRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRoutedRpc.java deleted file mode 100644 index fd73144e1d..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRoutedRpc.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.remote.rpc.messages; - -import com.google.common.base.Preconditions; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.data.api.CompositeNode; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; - -import java.io.Serializable; - -public class InvokeRoutedRpc implements Serializable { - - private final QName rpc; - private final YangInstanceIdentifier identifier; - private final CompositeNode input; - - public InvokeRoutedRpc(final QName rpc, final YangInstanceIdentifier identifier, final CompositeNode input) { - Preconditions.checkNotNull(rpc, "rpc qname should not be null"); - Preconditions.checkNotNull(identifier, "instance identifier of routed rpc should not be null"); - Preconditions.checkNotNull(input, "rpc input should not be null"); - - this.rpc = rpc; - this.identifier = identifier; - this.input = input; - } - - public QName getRpc() { - return rpc; - } - - public YangInstanceIdentifier getIdentifier() { - return identifier; - } - - public CompositeNode getInput() { - return input; - } -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRpc.java index 94b7fe4024..59d09fc41b 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRpc.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRpc.java @@ -10,19 +10,22 @@ package org.opendaylight.controller.remote.rpc.messages; import com.google.common.base.Preconditions; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import java.io.Serializable; public class InvokeRpc implements Serializable { private final QName rpc; + private final YangInstanceIdentifier identifier; private final CompositeNode input; - public InvokeRpc(final QName rpc, final CompositeNode input) { + public InvokeRpc(final QName rpc, final YangInstanceIdentifier identifier, final CompositeNode input) { Preconditions.checkNotNull(rpc, "rpc qname should not be null"); Preconditions.checkNotNull(input, "rpc input should not be null"); this.rpc = rpc; + this.identifier = identifier; this.input = input; } @@ -30,6 +33,10 @@ public class InvokeRpc implements Serializable { return rpc; } + public YangInstanceIdentifier getIdentifier() { + return identifier; + } + public CompositeNode getInput() { return input; } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RemoveRoutedRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RemoveRoutedRpc.java deleted file mode 100644 index b560b8c8c0..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RemoveRoutedRpc.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.remote.rpc.messages; - -import com.google.common.base.Preconditions; -import org.opendaylight.controller.sal.connector.api.RpcRouter; - -import java.io.Serializable; -import java.util.Set; - -public class RemoveRoutedRpc implements Serializable { - - private final Set> announcements; - private final String actorPath; - - public RemoveRoutedRpc(final Set> announcements, final String actorPath) { - Preconditions.checkNotNull(announcements, "Route identifier should not be null"); - Preconditions.checkNotNull(actorPath, "Actor path should not be null"); - - this.announcements = announcements; - this.actorPath = actorPath; - } - - public Set> getAnnouncements() { - return announcements; - } - - public String getActorPath() { - return actorPath; - } -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RemoveRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RemoveRpc.java deleted file mode 100644 index 289334fccc..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RemoveRpc.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.remote.rpc.messages; - -import com.google.common.base.Preconditions; -import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl; - -import java.io.Serializable; - -public class RemoveRpc implements Serializable { - - private final RouteIdentifierImpl routeId; - - public RemoveRpc(final RouteIdentifierImpl routeId) { - Preconditions.checkNotNull(routeId, "Route Id should not be null"); - - this.routeId = routeId; - } - - public RouteIdentifierImpl getRouteId() { - return routeId; - } -} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RoutingTableData.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RoutingTableData.java deleted file mode 100644 index c57a258426..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RoutingTableData.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.remote.rpc.messages; - -import org.opendaylight.controller.sal.connector.api.RpcRouter; - -import java.io.Serializable; -import java.util.LinkedHashSet; -import java.util.Map; - -public class RoutingTableData implements Serializable { - private final Map, String> rpcMap; - private final Map, LinkedHashSet> routedRpcMap; - - public RoutingTableData(final Map, String> rpcMap, - final Map, LinkedHashSet> routedRpcMap) { - this.rpcMap = rpcMap; - this.routedRpcMap = routedRpcMap; - } - - public Map, String> getRpcMap() { - return rpcMap; - } - - public Map, LinkedHashSet> getRoutedRpcMap() { - return routedRpcMap; - } -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/ClusterWrapper.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/ClusterWrapper.java deleted file mode 100644 index 4ddc2bee85..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/ClusterWrapper.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.remote.rpc.registry; - - -import akka.actor.Address; -import akka.cluster.ClusterEvent; - -public interface ClusterWrapper { - - ClusterEvent.CurrentClusterState getState(); - - Address getAddress(); -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/ClusterWrapperImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/ClusterWrapperImpl.java deleted file mode 100644 index 89603a134c..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/ClusterWrapperImpl.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.remote.rpc.registry; - - -import akka.actor.ActorSystem; -import akka.actor.Address; -import akka.cluster.Cluster; -import akka.cluster.ClusterEvent; - - -public class ClusterWrapperImpl implements ClusterWrapper{ - - private Cluster cluster; - - public ClusterWrapperImpl(ActorSystem actorSystem) { - cluster = Cluster.get(actorSystem); - } - - @Override - public ClusterEvent.CurrentClusterState getState() { - return cluster.state(); - } - - @Override - public Address getAddress() { - return cluster.selfAddress(); - } -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTableOld.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTableOld.java deleted file mode 100644 index 5951776f2c..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTableOld.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.remote.rpc.registry; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableSet; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collections; -import java.util.Iterator; -import java.util.LinkedHashSet; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -public class RoutingTableOld { - - private final Logger LOG = LoggerFactory.getLogger(RoutingTableOld.class); - - private ConcurrentMap globalRpcMap = new ConcurrentHashMap<>(); - private ConcurrentMap> routedRpcMap = new ConcurrentHashMap<>(); - - public ConcurrentMap getGlobalRpcMap() { - return globalRpcMap; - } - - public ConcurrentMap> getRoutedRpcMap() { - return routedRpcMap; - } - - public R getGlobalRoute(final I routeId) { - Preconditions.checkNotNull(routeId, "getGlobalRoute: routeId cannot be null!"); - return globalRpcMap.get(routeId); - } - - public void addGlobalRoute(final I routeId, final R route) { - Preconditions.checkNotNull(routeId, "addGlobalRoute: routeId cannot be null!"); - Preconditions.checkNotNull(route, "addGlobalRoute: route cannot be null!"); - LOG.debug("addGlobalRoute: adding a new route with id[{}] and value [{}]", routeId, route); - if(globalRpcMap.putIfAbsent(routeId, route) != null) { - LOG.debug("A route already exist for route id [{}] ", routeId); - } - } - - public void removeGlobalRoute(final I routeId) { - Preconditions.checkNotNull(routeId, "removeGlobalRoute: routeId cannot be null!"); - LOG.debug("removeGlobalRoute: removing a new route with id [{}]", routeId); - globalRpcMap.remove(routeId); - } - - public Set getRoutedRpc(final I routeId) { - Preconditions.checkNotNull(routeId, "getRoutes: routeId cannot be null!"); - Set routes = routedRpcMap.get(routeId); - - if (routes == null) { - return Collections.emptySet(); - } - - return ImmutableSet.copyOf(routes); - } - - public R getLastAddedRoutedRpc(final I routeId) { - - Set routes = getRoutedRpc(routeId); - - if (routes.isEmpty()) { - return null; - } - - R route = null; - Iterator iter = routes.iterator(); - while (iter.hasNext()) { - route = iter.next(); - } - - return route; - } - - public void addRoutedRpc(final I routeId, final R route) { - Preconditions.checkNotNull(routeId, "addRoute: routeId cannot be null"); - Preconditions.checkNotNull(route, "addRoute: route cannot be null"); - LOG.debug("addRoute: adding a route with k/v [{}/{}]", routeId, route); - threadSafeAdd(routeId, route); - } - - public void addRoutedRpcs(final Set routeIds, final R route) { - Preconditions.checkNotNull(routeIds, "addRoutes: routeIds must not be null"); - for (I routeId : routeIds){ - addRoutedRpc(routeId, route); - } - } - - public void removeRoute(final I routeId, final R route) { - Preconditions.checkNotNull(routeId, "removeRoute: routeId cannot be null!"); - Preconditions.checkNotNull(route, "removeRoute: route cannot be null!"); - - LinkedHashSet routes = routedRpcMap.get(routeId); - if (routes == null) { - return; - } - LOG.debug("removeRoute: removing a new route with k/v [{}/{}]", routeId, route); - threadSafeRemove(routeId, route); - } - - public void removeRoutes(final Set routeIds, final R route) { - Preconditions.checkNotNull(routeIds, "removeRoutes: routeIds must not be null"); - for (I routeId : routeIds){ - removeRoute(routeId, route); - } - } - - /** - * This method guarantees that no 2 thread over write each other's changes. - * Just so that we dont end up in infinite loop, it tries for 100 times then throw - */ - private void threadSafeAdd(final I routeId, final R route) { - - for (int i=0;i<100;i++){ - - LinkedHashSet updatedRoutes = new LinkedHashSet<>(); - updatedRoutes.add(route); - LinkedHashSet oldRoutes = routedRpcMap.putIfAbsent(routeId, updatedRoutes); - if (oldRoutes == null) { - return; - } - - updatedRoutes = new LinkedHashSet<>(oldRoutes); - updatedRoutes.add(route); - - if (routedRpcMap.replace(routeId, oldRoutes, updatedRoutes)) { - return; - } - } - //the method did not already return means it failed to add route in 100 attempts - throw new IllegalStateException("Failed to add route [" + routeId + "]"); - } - - /** - * This method guarantees that no 2 thread over write each other's changes. - * Just so that we dont end up in infinite loop, it tries for 100 times then throw - */ - private void threadSafeRemove(final I routeId, final R route) { - LinkedHashSet updatedRoutes = null; - for (int i=0;i<100;i++){ - LinkedHashSet oldRoutes = routedRpcMap.get(routeId); - - // if route to be deleted is the only entry in the set then remove routeId from the cache - if ((oldRoutes.size() == 1) && oldRoutes.contains(route)){ - routedRpcMap.remove(routeId); - return; - } - - // if there are multiple routes for this routeId, remove the route to be deleted only from the set. - updatedRoutes = new LinkedHashSet<>(oldRoutes); - updatedRoutes.remove(route); - if (routedRpcMap.replace(routeId, oldRoutes, updatedRoutes)) { - return; - } - - } - //the method did not already return means it failed to remove route in 100 attempts - throw new IllegalStateException("Failed to remove route [" + routeId + "]"); - } -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryOld.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryOld.java deleted file mode 100644 index 96c8802ce6..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryOld.java +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.remote.rpc.registry; - -import akka.actor.ActorSelection; -import akka.actor.Address; -import akka.actor.Props; -import akka.cluster.ClusterEvent; -import akka.cluster.Member; -import akka.japi.Creator; -import org.opendaylight.controller.remote.rpc.AbstractUntypedActor; -import org.opendaylight.controller.remote.rpc.ActorConstants; -import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.AddRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply; -import org.opendaylight.controller.remote.rpc.messages.GetRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRpcReply; -import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.RemoveRpc; -import org.opendaylight.controller.remote.rpc.messages.RoutingTableData; -import org.opendaylight.controller.sal.connector.api.RpcRouter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.collection.JavaConversions; - -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -/** - * This Actor maintains the routing table state and sync it with other nodes in the cluster. - * - * A scheduler runs after an interval of time, which pick a random member from the cluster - * and send the current state of routing table to the member. - * - * when a message of routing table data is received, it gets merged with the local routing table - * to keep the latest data. - */ - -public class RpcRegistryOld extends AbstractUntypedActor { - - private static final Logger LOG = LoggerFactory.getLogger(RpcRegistryOld.class); - private RoutingTableOld, String> routingTable; - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - private final ClusterWrapper clusterWrapper; - private final ScheduledFuture syncScheduler; - - private RpcRegistryOld(ClusterWrapper clusterWrapper){ - this.routingTable = new RoutingTableOld<>(); - this.clusterWrapper = clusterWrapper; - this.syncScheduler = scheduler.scheduleAtFixedRate(new SendRoutingTable(), 10, 10, TimeUnit.SECONDS); - } - - public static Props props(final ClusterWrapper clusterWrapper){ - return Props.create(new Creator(){ - - @Override - public RpcRegistryOld create() throws Exception { - return new RpcRegistryOld(clusterWrapper); - } - }); - } - - @Override - protected void handleReceive(Object message) throws Exception { - LOG.debug("Received message {}", message); - if(message instanceof RoutingTableData) { - syncRoutingTable((RoutingTableData) message); - } else if(message instanceof GetRoutedRpc) { - getRoutedRpc((GetRoutedRpc) message); - } else if(message instanceof GetRpc) { - getRpc((GetRpc) message); - } else if(message instanceof AddRpc) { - addRpc((AddRpc) message); - } else if(message instanceof RemoveRpc) { - removeRpc((RemoveRpc) message); - } else if(message instanceof AddRoutedRpc) { - addRoutedRpc((AddRoutedRpc) message); - } else if(message instanceof RemoveRoutedRpc) { - removeRoutedRpc((RemoveRoutedRpc) message); - } - } - - private void getRoutedRpc(GetRoutedRpc rpcMsg){ - LOG.debug("Get latest routed Rpc location from routing table {}", rpcMsg); - String remoteActorPath = routingTable.getLastAddedRoutedRpc(rpcMsg.getRouteId()); - GetRoutedRpcReply routedRpcReply = new GetRoutedRpcReply(remoteActorPath); - - getSender().tell(routedRpcReply, self()); - } - - private void getRpc(GetRpc rpcMsg) { - LOG.debug("Get global Rpc location from routing table {}", rpcMsg); - String remoteActorPath = routingTable.getGlobalRoute(rpcMsg.getRouteId()); - GetRpcReply rpcReply = new GetRpcReply(remoteActorPath); - - getSender().tell(rpcReply, self()); - } - - private void addRpc(AddRpc rpcMsg) { - LOG.debug("Add Rpc to routing table {}", rpcMsg); - routingTable.addGlobalRoute(rpcMsg.getRouteId(), rpcMsg.getActorPath()); - - getSender().tell("Success", self()); - } - - private void removeRpc(RemoveRpc rpcMsg) { - LOG.debug("Removing Rpc to routing table {}", rpcMsg); - routingTable.removeGlobalRoute(rpcMsg.getRouteId()); - - getSender().tell("Success", self()); - } - - private void addRoutedRpc(AddRoutedRpc rpcMsg) { - routingTable.addRoutedRpcs(rpcMsg.getAnnouncements(), rpcMsg.getActorPath()); - getSender().tell("Success", self()); - } - - private void removeRoutedRpc(RemoveRoutedRpc rpcMsg) { - routingTable.removeRoutes(rpcMsg.getAnnouncements(), rpcMsg.getActorPath()); - getSender().tell("Success", self()); - } - - private void syncRoutingTable(RoutingTableData routingTableData) { - LOG.debug("Syncing routing table {}", routingTableData); - - Map, String> newRpcMap = routingTableData.getRpcMap(); - Set> routeIds = newRpcMap.keySet(); - for(RpcRouter.RouteIdentifier routeId : routeIds) { - routingTable.addGlobalRoute(routeId, newRpcMap.get(routeId)); - } - - Map, LinkedHashSet> newRoutedRpcMap = - routingTableData.getRoutedRpcMap(); - routeIds = newRoutedRpcMap.keySet(); - - for(RpcRouter.RouteIdentifier routeId : routeIds) { - Set routeAddresses = newRoutedRpcMap.get(routeId); - for(String routeAddress : routeAddresses) { - routingTable.addRoutedRpc(routeId, routeAddress); - } - } - } - - private ActorSelection getRandomRegistryActor() { - ClusterEvent.CurrentClusterState clusterState = clusterWrapper.getState(); - ActorSelection actor = null; - Set members = JavaConversions.asJavaSet(clusterState.members()); - int memberSize = members.size(); - // Don't select yourself - if(memberSize > 1) { - Address currentNodeAddress = clusterWrapper.getAddress(); - int index = new Random().nextInt(memberSize); - int i = 0; - // keeping previous member, in case when random index member is same as current actor - // and current actor member is last in set - Member previousMember = null; - for(Member member : members){ - if(i == index-1) { - previousMember = member; - } - if(i == index) { - if(!currentNodeAddress.equals(member.address())) { - actor = this.context().actorSelection(member.address() + ActorConstants.RPC_REGISTRY_PATH); - break; - } else if(index < memberSize-1){ // pick the next element in the set - index++; - } - } - i++; - } - if(actor == null && previousMember != null) { - actor = this.context().actorSelection(previousMember.address() + ActorConstants.RPC_REGISTRY_PATH); - } - } - return actor; - } - - private class SendRoutingTable implements Runnable { - - @Override - public void run() { - RoutingTableData routingTableData = - new RoutingTableData(routingTable.getGlobalRpcMap(), routingTable.getRoutedRpcMap()); - LOG.debug("Sending routing table for sync {}", routingTableData); - ActorSelection actor = getRandomRegistryActor(); - if(actor != null) { - actor.tell(routingTableData, self()); - } - } - } -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/ActorUtil.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/ActorUtil.java index 8f60eabf5f..b7b2216a08 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/ActorUtil.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/ActorUtil.java @@ -9,7 +9,6 @@ package org.opendaylight.controller.remote.rpc.utils; import akka.actor.ActorRef; -import akka.actor.ActorSelection; import akka.util.Timeout; import scala.concurrent.Await; import scala.concurrent.Future; @@ -36,27 +35,13 @@ public class ActorUtil { * @param awaitDuration * @return The response of the operation */ - public static Object executeLocalOperation(ActorRef actor, Object message, - FiniteDuration askDuration, FiniteDuration awaitDuration) throws Exception{ + public static Object executeOperation(ActorRef actor, Object message, + FiniteDuration askDuration, FiniteDuration awaitDuration) throws Exception{ Future future = ask(actor, message, new Timeout(askDuration)); return Await.result(future, awaitDuration); } - /** - * Execute an operation on a remote actor and wait for it's response - * @param actor - * @param message - * @param askDuration - * @param awaitDuration - * @return - */ - public static Object executeRemoteOperation(ActorSelection actor, Object message, - FiniteDuration askDuration, FiniteDuration awaitDuration) throws Exception{ - Future future = - ask(actor, message, new Timeout(askDuration)); - return Await.result(future, awaitDuration); - } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/LatestEntryRoutingLogic.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/LatestEntryRoutingLogic.java new file mode 100644 index 0000000000..f01baf009b --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/LatestEntryRoutingLogic.java @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc.utils; + +import akka.actor.ActorRef; +import akka.japi.Pair; +import com.google.common.base.Preconditions; + +import java.util.Collection; +import java.util.Comparator; +import java.util.SortedSet; +import java.util.TreeSet; + +/** + * This class will return First Entry + */ +public class LatestEntryRoutingLogic implements RoutingLogic{ + + private SortedSet> actorRefSet; + + public LatestEntryRoutingLogic(Collection> entries) { + Preconditions.checkNotNull(entries, "Entries should not be null"); + Preconditions.checkArgument(!entries.isEmpty(), "Entries collection should not be empty"); + + actorRefSet = new TreeSet<>(new LatestEntryComparator()); + actorRefSet.addAll(entries); + } + + @Override + public ActorRef select() { + return actorRefSet.last().first(); + } + + + private class LatestEntryComparator implements Comparator> { + + @Override + public int compare(Pair o1, Pair o2) { + if(o1 == null && o2 == null) { + return 0; + } + if(o1 == null && o2 != null) { + return -1; + } + if(o1 != null && o2 == null) { + return 1; + } + + return o1.second().compareTo(o2.second()); + + } + + } +} + + diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/RoutingLogic.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/RoutingLogic.java new file mode 100644 index 0000000000..4de71949fc --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/RoutingLogic.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc.utils; + +import akka.actor.ActorRef; + +/** + * This Interface is added to abstract out the way rpc execution could be + * routed, if more than one node in cluster is capable of executing the rpc. + * + * We can pick node randomly, round robin manner or based on last updated time etc. + */ + +public interface RoutingLogic { + + ActorRef select(); +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/resources/application.conf b/opendaylight/md-sal/sal-remoterpc-connector/src/main/resources/application.conf index daac89c4c8..711ae1c48b 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/resources/application.conf +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/resources/application.conf @@ -53,7 +53,6 @@ odl-cluster-rpc { cluster { seed-nodes = ["akka.tcp://opendaylight-cluster-rpc@127.0.0.1:2551"] - auto-down-unreachable-after = 10s } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/ActorSystemFactoryTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/ActorSystemFactoryTest.java new file mode 100644 index 0000000000..ed5fa6d16e --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/ActorSystemFactoryTest.java @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc; + + +import akka.actor.ActorSystem; +import junit.framework.Assert; +import org.junit.After; +import org.junit.Test; +import org.osgi.framework.Bundle; +import org.osgi.framework.BundleContext; + +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ActorSystemFactoryTest { + ActorSystem system = null; + + @Test + public void testActorSystemCreation(){ + BundleContext context = mock(BundleContext.class); + when(context.getBundle()).thenReturn(mock(Bundle.class)); + ActorSystemFactory.createInstance(context); + system = ActorSystemFactory.getInstance(); + Assert.assertNotNull(system); + // Check illegal state exception + + try { + ActorSystemFactory.createInstance(context); + fail("Illegal State exception should be thrown, while creating actor system second time"); + } catch (IllegalStateException e) { + } + } + + @After + public void cleanup() throws InterruptedException { + if(system != null) { + system.shutdown(); + } + } + +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java new file mode 100644 index 0000000000..17ad237ad7 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + + +package org.opendaylight.controller.remote.rpc; + + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import com.typesafe.config.ConfigFactory; +import junit.framework.Assert; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.opendaylight.controller.sal.core.api.Broker; +import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry; +import org.opendaylight.controller.sal.core.api.model.SchemaService; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import scala.concurrent.Await; +import scala.concurrent.duration.Duration; + + +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class RemoteRpcProviderTest { + + static ActorSystem system; + + + @BeforeClass + public static void setup() throws InterruptedException { + system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster")); + } + + @AfterClass + public static void teardown() { + JavaTestKit.shutdownActorSystem(system); + system = null; + } + + @Test + public void testRemoteRpcProvider() throws Exception { + RemoteRpcProvider rpcProvider = new RemoteRpcProvider(system, mock(RpcProvisionRegistry.class)); + Broker.ProviderSession session = mock(Broker.ProviderSession.class); + SchemaService schemaService = mock(SchemaService.class); + when(schemaService.getGlobalContext()). thenReturn(mock(SchemaContext.class)); + when(session.getService(SchemaService.class)).thenReturn(schemaService); + rpcProvider.onSessionInitiated(session); + ActorRef actorRef = Await.result(system.actorSelection(ActorConstants.RPC_MANAGER_PATH).resolveOne(Duration.create(1, TimeUnit.SECONDS)), + Duration.create(2, TimeUnit.SECONDS)); + Assert.assertTrue(actorRef.path().toString().contains(ActorConstants.RPC_MANAGER_PATH)); + } + + + +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RouteRpcListenerTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RouteRpcListenerTest.java new file mode 100644 index 0000000000..9b6215addd --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RouteRpcListenerTest.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc; + + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import com.typesafe.config.ConfigFactory; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.opendaylight.controller.md.sal.common.impl.routing.RoutingUtils; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; +import org.opendaylight.controller.sal.core.api.RpcRoutingContext; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; + +import java.net.URI; +import java.net.URISyntaxException; + +public class RouteRpcListenerTest { + + static ActorSystem system; + + + @BeforeClass + public static void setup() throws InterruptedException { + system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster")); + } + + @AfterClass + public static void teardown() { + JavaTestKit.shutdownActorSystem(system); + system = null; + } + + @Test + public void testRouteAdd() throws URISyntaxException, InterruptedException { + new JavaTestKit(system) { + { + // Test announcements + JavaTestKit probeReg = new JavaTestKit(system); + ActorRef rpcRegistry = probeReg.getRef(); + + RoutedRpcListener rpcListener = new RoutedRpcListener(rpcRegistry); + + QName qName = new QName(new URI("actor2"), "actor2"); + RpcRoutingContext context = RpcRoutingContext.create(qName, qName); + YangInstanceIdentifier identifier = YangInstanceIdentifier.create(new YangInstanceIdentifier.NodeIdentifier(qName)); + rpcListener.onRouteChange(RoutingUtils.announcementChange(context, identifier)); + + probeReg.expectMsgClass(RpcRegistry.Messages.AddOrUpdateRoutes.class); + }}; + } + + @Test + public void testRouteRemove() throws URISyntaxException, InterruptedException { + new JavaTestKit(system) { + { + // Test announcements + JavaTestKit probeReg = new JavaTestKit(system); + ActorRef rpcRegistry = probeReg.getRef(); + + RoutedRpcListener rpcListener = new RoutedRpcListener(rpcRegistry); + + QName qName = new QName(new URI("actor2"), "actor2"); + RpcRoutingContext context = RpcRoutingContext.create(qName, qName); + YangInstanceIdentifier identifier = YangInstanceIdentifier.create(new YangInstanceIdentifier.NodeIdentifier(qName)); + rpcListener.onRouteChange(RoutingUtils.removalChange(context, identifier)); + + probeReg.expectMsgClass(RpcRegistry.Messages.RemoveRoutes.class); + }}; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java index 55aa1d6c87..d9a3b6a414 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java @@ -11,23 +11,21 @@ package org.opendaylight.controller.remote.rpc; import akka.actor.ActorRef; import akka.actor.ActorSystem; +import akka.japi.Pair; import akka.testkit.JavaTestKit; import com.google.common.util.concurrent.Futures; +import com.typesafe.config.ConfigFactory; import junit.framework.Assert; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; -import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.AddRpc; import org.opendaylight.controller.remote.rpc.messages.ErrorResponse; -import org.opendaylight.controller.remote.rpc.messages.InvokeRoutedRpc; import org.opendaylight.controller.remote.rpc.messages.InvokeRpc; import org.opendaylight.controller.remote.rpc.messages.RpcResponse; -import org.opendaylight.controller.remote.rpc.registry.ClusterWrapper; -import org.opendaylight.controller.remote.rpc.registry.RpcRegistryOld; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; import org.opendaylight.controller.sal.common.util.Rpcs; -import org.opendaylight.controller.sal.connector.api.RpcRouter; import org.opendaylight.controller.sal.core.api.Broker; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcError; @@ -35,7 +33,6 @@ import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.CompositeNode; import org.opendaylight.yangtools.yang.data.api.ModifyAction; import org.opendaylight.yangtools.yang.data.api.Node; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -43,8 +40,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collection; -import java.util.HashSet; -import java.util.Set; +import java.util.List; import java.util.concurrent.Future; import static org.mockito.Mockito.mock; @@ -52,31 +48,50 @@ import static org.mockito.Mockito.when; public class RpcBrokerTest { - static ActorSystem system; + static ActorSystem node1; + static ActorSystem node2; + private ActorRef rpcBroker1; + private JavaTestKit probeReg1; + private ActorRef rpcBroker2; + private JavaTestKit probeReg2; + private Broker.ProviderSession brokerSession; @BeforeClass - public static void setup() { - system = ActorSystem.create(); + public static void setup() throws InterruptedException { + node1 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberA")); + node2 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberB")); } @AfterClass public static void teardown() { - JavaTestKit.shutdownActorSystem(system); - system = null; + JavaTestKit.shutdownActorSystem(node1); + JavaTestKit.shutdownActorSystem(node2); + node1 = null; + node2 = null; } + @Before + public void createActor() { + brokerSession = Mockito.mock(Broker.ProviderSession.class); + SchemaContext schemaContext = mock(SchemaContext.class); + probeReg1 = new JavaTestKit(node1); + rpcBroker1 = node1.actorOf(RpcBroker.props(brokerSession, probeReg1.getRef(), schemaContext)); + probeReg2 = new JavaTestKit(node2); + rpcBroker2 = node2.actorOf(RpcBroker.props(brokerSession, probeReg2.getRef(), schemaContext)); + + } @Test - public void testInvokeRpcError() throws URISyntaxException { - new JavaTestKit(system) {{ - ActorRef rpcRegistry = system.actorOf(RpcRegistryOld.props(Mockito.mock(ClusterWrapper.class))); - Broker.ProviderSession brokerSession = Mockito.mock(Broker.ProviderSession.class); - SchemaContext schemaContext = mock(SchemaContext.class); - ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext)); + public void testInvokeRpcError() throws Exception { + new JavaTestKit(node1) {{ QName rpc = new QName(new URI("noactor1"), "noactor1"); CompositeNode input = new ImmutableCompositeNode(QName.create("ns", "2013-12-09", "no child"), new ArrayList>(), ModifyAction.REPLACE); - InvokeRpc invokeMsg = new InvokeRpc(rpc, input); - rpcBroker.tell(invokeMsg, getRef()); + + + InvokeRpc invokeMsg = new InvokeRpc(rpc, null, input); + rpcBroker1.tell(invokeMsg, getRef()); + probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class); + probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(new ArrayList>())); Boolean getMsg = new ExpectMsg("ErrorResponse") { protected Boolean match(Object in) { @@ -90,114 +105,36 @@ public class RpcBrokerTest { }.get(); // this extracts the received message Assert.assertTrue(getMsg); + }}; } + /** * This test method invokes and executes the remote rpc */ @Test public void testInvokeRpc() throws URISyntaxException { - new JavaTestKit(system) {{ - ActorRef rpcRegistry = system.actorOf(RpcRegistryOld.props(mock(ClusterWrapper.class))); - Broker.ProviderSession brokerSession = mock(Broker.ProviderSession.class); - SchemaContext schemaContext = mock(SchemaContext.class); - ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext)); - ActorRef rpcBrokerRemote = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), "actor1"); - // Add RPC in table - QName rpc = new QName(new URI("actor1"), "actor1"); - RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, rpc, null); - final String route = rpcBrokerRemote.path().toString(); - AddRpc rpcMsg = new AddRpc(routeId, route); - rpcRegistry.tell(rpcMsg, getRef()); - expectMsgEquals(duration("2 second"), "Success"); - + new JavaTestKit(node1) {{ + QName rpc = new QName(new URI("noactor1"), "noactor1"); // invoke rpc CompositeNode input = new ImmutableCompositeNode(QName.create("ns", "2013-12-09", "child1"), new ArrayList>(), ModifyAction.REPLACE); - CompositeNode invokeRpcResult = mock(CompositeNode.class); - Collection errors = new ArrayList<>(); - RpcResult result = Rpcs.getRpcResult(true, invokeRpcResult, errors); - Future> rpcResult = Futures.immediateFuture(result); - when(brokerSession.rpc(rpc, input)).thenReturn(rpcResult); - InvokeRpc invokeMsg = new InvokeRpc(rpc, input); - rpcBroker.tell(invokeMsg, getRef()); - - //verify response msg - Boolean getMsg = new ExpectMsg("RpcResponse") { - protected Boolean match(Object in) { - if (in instanceof RpcResponse) { - return true; - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message - - Assert.assertTrue(getMsg); - }}; - } - - @Test - public void testInvokeRoutedRpcError() throws URISyntaxException { - new JavaTestKit(system) {{ - ActorRef rpcRegistry = system.actorOf(RpcRegistryOld.props(Mockito.mock(ClusterWrapper.class))); - Broker.ProviderSession brokerSession = Mockito.mock(Broker.ProviderSession.class); - SchemaContext schemaContext = mock(SchemaContext.class); - ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext)); - QName rpc = new QName(new URI("actor1"), "actor1"); - CompositeNode input = new ImmutableCompositeNode(QName.create("ns", "2013-12-09", "child1"), new ArrayList>(), ModifyAction.REPLACE); - InvokeRoutedRpc invokeMsg = new InvokeRoutedRpc(rpc, YangInstanceIdentifier.create(new YangInstanceIdentifier.NodeIdentifier(rpc)), input); - rpcBroker.tell(invokeMsg, getRef()); + InvokeRpc invokeMsg = new InvokeRpc(rpc, null, input); + rpcBroker1.tell(invokeMsg, getRef()); - Boolean getMsg = new ExpectMsg("ErrorResponse") { - protected Boolean match(Object in) { - if (in instanceof ErrorResponse) { - ErrorResponse reply = (ErrorResponse)in; - return "No remote actor found for rpc execution.".equals(reply.getException().getMessage()); - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message - - Assert.assertTrue(getMsg); - }}; - } + probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class); + List> routerList = new ArrayList>(); - /** - * This test method invokes and executes the remote routed rpc - */ + routerList.add(new Pair(rpcBroker2, 200L)); - @Test - public void testInvokeRoutedRpc() throws URISyntaxException { - new JavaTestKit(system) {{ - ActorRef rpcRegistry = system.actorOf(RpcRegistryOld.props(mock(ClusterWrapper.class))); - Broker.ProviderSession brokerSession = mock(Broker.ProviderSession.class); - SchemaContext schemaContext = mock(SchemaContext.class); - ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext)); - ActorRef rpcBrokerRemote = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), "actor2"); - // Add Routed RPC in table - QName rpc = new QName(new URI("actor2"), "actor2"); - YangInstanceIdentifier identifier = YangInstanceIdentifier.create(new YangInstanceIdentifier.NodeIdentifier(rpc)); - RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, rpc, identifier); - final String route = rpcBrokerRemote.path().toString(); - Set> routeIds = new HashSet<>(); - routeIds.add(routeId); - - AddRoutedRpc rpcMsg = new AddRoutedRpc(routeIds, route); - rpcRegistry.tell(rpcMsg, getRef()); - expectMsgEquals(duration("2 second"), "Success"); + probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(routerList)); - // invoke rpc - CompositeNode input = new ImmutableCompositeNode(QName.create("ns", "2013-12-09", "child1"), new ArrayList>(), ModifyAction.REPLACE); CompositeNode invokeRpcResult = mock(CompositeNode.class); Collection errors = new ArrayList<>(); RpcResult result = Rpcs.getRpcResult(true, invokeRpcResult, errors); Future> rpcResult = Futures.immediateFuture(result); when(brokerSession.rpc(rpc, input)).thenReturn(rpcResult); - InvokeRoutedRpc invokeMsg = new InvokeRoutedRpc(rpc, identifier, input); - rpcBroker.tell(invokeMsg, getRef()); //verify response msg Boolean getMsg = new ExpectMsg("RpcResponse") { @@ -213,5 +150,4 @@ public class RpcBrokerTest { Assert.assertTrue(getMsg); }}; } - } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcListenerTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcListenerTest.java new file mode 100644 index 0000000000..7b5a968866 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcListenerTest.java @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import com.typesafe.config.ConfigFactory; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; +import org.opendaylight.yangtools.yang.common.QName; + +import java.net.URI; +import java.net.URISyntaxException; + +public class RpcListenerTest { + + static ActorSystem system; + + + @BeforeClass + public static void setup() throws InterruptedException { + system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster")); + } + + @AfterClass + public static void teardown() { + JavaTestKit.shutdownActorSystem(system); + system = null; + } + + @Test + public void testRpcAdd() throws URISyntaxException { + new JavaTestKit(system) { + { + JavaTestKit probeReg = new JavaTestKit(system); + ActorRef rpcRegistry = probeReg.getRef(); + + RpcListener rpcListener = new RpcListener(rpcRegistry); + + QName qName = new QName(new URI("actor2"), "actor2"); + + rpcListener.onRpcImplementationAdded(qName); + probeReg.expectMsgClass(RpcRegistry.Messages.AddOrUpdateRoutes.class); + }}; + + } + + @Test + public void testRpcRemove() throws URISyntaxException { + new JavaTestKit(system) { + { + JavaTestKit probeReg = new JavaTestKit(system); + ActorRef rpcRegistry = probeReg.getRef(); + + RpcListener rpcListener = new RpcListener(rpcRegistry); + + QName qName = new QName(new URI("actor2"), "actor2"); + + rpcListener.onRpcImplementationRemoved(qName); + probeReg.expectMsgClass(RpcRegistry.Messages.RemoveRoutes.class); + }}; + + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RoutingTableOldTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RoutingTableOldTest.java deleted file mode 100644 index 524a91288d..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RoutingTableOldTest.java +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.remote.rpc.registry; - -import junit.framework.Assert; -import org.junit.Test; -import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl; -import org.opendaylight.controller.sal.connector.api.RpcRouter; -import org.opendaylight.yangtools.yang.common.QName; - -import java.net.URI; -import java.net.URISyntaxException; -import java.util.HashSet; -import java.util.Set; - -public class RoutingTableOldTest { - - private RoutingTableOld, String> routingTable = - new RoutingTableOld<>(); - - @Test - public void addGlobalRouteNullRouteIdTest() { - try { - routingTable.addGlobalRoute(null, null); - - Assert.fail("Null pointer exception was not thrown."); - } catch (Exception e) { - Assert.assertEquals(NullPointerException.class.getName(), e.getClass().getName()); - Assert.assertEquals("addGlobalRoute: routeId cannot be null!", e.getMessage()); - } - } - - @Test - public void addGlobalRouteNullRouteTest() { - try { - QName type = new QName(new URI("actor1"), "actor1"); - RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null); - routingTable.addGlobalRoute(routeId, null); - - Assert.fail("Null pointer exception was not thrown."); - } catch (Exception e) { - Assert.assertEquals(NullPointerException.class.getName(), e.getClass().getName()); - Assert.assertEquals("addGlobalRoute: route cannot be null!", e.getMessage()); - } - } - - @Test - public void getGlobalRouteNullTest() { - try { - routingTable.getGlobalRoute(null); - - Assert.fail("Null pointer exception was not thrown."); - } catch (Exception e) { - Assert.assertEquals(NullPointerException.class.getName(), e.getClass().getName()); - Assert.assertEquals("getGlobalRoute: routeId cannot be null!", e.getMessage()); - } - } - - @Test - public void getGlobalRouteTest() throws URISyntaxException { - QName type = new QName(new URI("actor1"), "actor1"); - RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null); - String route = "actor1"; - - routingTable.addGlobalRoute(routeId, route); - - String returnedRoute = routingTable.getGlobalRoute(routeId); - - Assert.assertEquals(route, returnedRoute); - - } - - @Test - public void removeGlobalRouteTest() throws URISyntaxException { - QName type = new QName(new URI("actorRemove"), "actorRemove"); - RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null); - String route = "actorRemove"; - - routingTable.addGlobalRoute(routeId, route); - - String returnedRoute = routingTable.getGlobalRoute(routeId); - - Assert.assertEquals(route, returnedRoute); - - routingTable.removeGlobalRoute(routeId); - - String deletedRoute = routingTable.getGlobalRoute(routeId); - - Assert.assertNull(deletedRoute); - } - - @Test - public void addRoutedRpcNullRouteIdTest() { - try { - routingTable.addRoutedRpc(null, null); - - Assert.fail("Null pointer exception was not thrown."); - } catch (Exception e) { - Assert.assertEquals(NullPointerException.class.getName(), e.getClass().getName()); - Assert.assertEquals("addRoute: routeId cannot be null", e.getMessage()); - } - } - - @Test - public void addRoutedRpcNullRouteTest() { - try { - QName type = new QName(new URI("actor1"), "actor1"); - RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null); - - routingTable.addRoutedRpc(routeId, null); - - Assert.fail("Null pointer exception was not thrown."); - } catch (Exception e) { - Assert.assertEquals(NullPointerException.class.getName(), e.getClass().getName()); - Assert.assertEquals("addRoute: route cannot be null", e.getMessage()); - } - } - - @Test - public void getRoutedRpcNullTest() { - try { - routingTable.getRoutedRpc(null); - - Assert.fail("Null pointer exception was not thrown."); - } catch (Exception e) { - Assert.assertEquals(NullPointerException.class.getName(), e.getClass().getName()); - Assert.assertEquals("getRoutes: routeId cannot be null!", e.getMessage()); - } - } - - @Test - public void getRoutedRpcTest() throws URISyntaxException { - QName type = new QName(new URI("actor1"), "actor1"); - RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null); - String route = "actor1"; - - routingTable.addRoutedRpc(routeId, route); - - Set routes = routingTable.getRoutedRpc(routeId); - - Assert.assertEquals(1, routes.size()); - Assert.assertTrue(routes.contains(route)); - - } - - @Test - public void getLastRoutedRpcTest() throws URISyntaxException { - QName type = new QName(new URI("first1"), "first1"); - RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null); - String route = "first1"; - - routingTable.addRoutedRpc(routeId, route); - - String route2 = "second1"; - routingTable.addRoutedRpc(routeId, route2); - - String latest = routingTable.getLastAddedRoutedRpc(routeId); - Assert.assertEquals(route2, latest); - - } - - @Test - public void removeRoutedRpcTest() throws URISyntaxException { - QName type = new QName(new URI("remove"), "remove"); - RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null); - String route = "remove"; - routingTable.addRoutedRpc(routeId, route); - - String latest = routingTable.getLastAddedRoutedRpc(routeId); - Assert.assertEquals(route, latest); - - routingTable.removeRoute(routeId, route); - String removed = routingTable.getLastAddedRoutedRpc(routeId); - Assert.assertNull(removed); - } - - @Test - public void removeRoutedRpcsTest() throws URISyntaxException { - QName type = new QName(new URI("remove1"), "remove1"); - RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null); - - QName type2 = new QName(new URI("remove2"), "remove2"); - RouteIdentifierImpl routeId2 = new RouteIdentifierImpl(null, type2, null); - - Set> routeIds = new HashSet<>(); - routeIds.add(routeId); - routeIds.add(routeId2); - String route = "remove1"; - - routingTable.addRoutedRpcs(routeIds, route); - String latest1 = routingTable.getLastAddedRoutedRpc(routeId); - Assert.assertEquals(route, latest1); - - String latest2 = routingTable.getLastAddedRoutedRpc(routeId2); - Assert.assertEquals(route, latest2); - - routingTable.removeRoutes(routeIds, route); - String removed1 = routingTable.getLastAddedRoutedRpc(routeId); - Assert.assertNull(removed1); - - String removed2 = routingTable.getLastAddedRoutedRpc(routeId2); - Assert.assertNull(removed2); - } - -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryOldTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryOldTest.java deleted file mode 100644 index 0f711b4e85..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryOldTest.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.remote.rpc.registry; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.testkit.JavaTestKit; -import junit.framework.Assert; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.mockito.Mockito; -import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl; -import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.AddRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply; -import org.opendaylight.controller.remote.rpc.messages.GetRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRpcReply; -import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.RemoveRpc; -import org.opendaylight.controller.sal.connector.api.RpcRouter; -import org.opendaylight.yangtools.yang.common.QName; - -import java.net.URI; -import java.net.URISyntaxException; -import java.util.HashSet; -import java.util.Set; - -public class RpcRegistryOldTest { - - static ActorSystem system; - - - @BeforeClass - public static void setup() { - system = ActorSystem.create(); - } - - @AfterClass - public static void teardown() { - JavaTestKit.shutdownActorSystem(system); - system = null; - } - - /** - This test add, read and remove an entry in global rpc - */ - @Test - public void testGlobalRpc() throws URISyntaxException { - new JavaTestKit(system) {{ - ActorRef rpcRegistry = system.actorOf(RpcRegistryOld.props(Mockito.mock(ClusterWrapper.class))); - QName type = new QName(new URI("actor1"), "actor1"); - RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null); - final String route = "actor1"; - - AddRpc rpcMsg = new AddRpc(routeId, route); - rpcRegistry.tell(rpcMsg, getRef()); - expectMsgEquals(duration("2 second"), "Success"); - - GetRpc getRpc = new GetRpc(routeId); - rpcRegistry.tell(getRpc, getRef()); - - Boolean getMsg = new ExpectMsg("GetRpcReply") { - protected Boolean match(Object in) { - if (in instanceof GetRpcReply) { - GetRpcReply reply = (GetRpcReply)in; - return route.equals(reply.getRoutePath()); - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message - - Assert.assertTrue(getMsg); - - RemoveRpc removeMsg = new RemoveRpc(routeId); - rpcRegistry.tell(removeMsg, getRef()); - expectMsgEquals(duration("2 second"), "Success"); - - rpcRegistry.tell(getRpc, getRef()); - - Boolean getNullMsg = new ExpectMsg("GetRpcReply") { - protected Boolean match(Object in) { - if (in instanceof GetRpcReply) { - GetRpcReply reply = (GetRpcReply)in; - return reply.getRoutePath() == null; - } else { - throw noMatch(); - } - } - }.get(); - Assert.assertTrue(getNullMsg); - }}; - - } - - /** - This test add, read and remove an entry in routed rpc - */ - @Test - public void testRoutedRpc() throws URISyntaxException { - new JavaTestKit(system) {{ - ActorRef rpcRegistry = system.actorOf(RpcRegistryOld.props(Mockito.mock(ClusterWrapper.class))); - QName type = new QName(new URI("actor1"), "actor1"); - RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null); - final String route = "actor1"; - - Set> routeIds = new HashSet<>(); - routeIds.add(routeId); - - AddRoutedRpc rpcMsg = new AddRoutedRpc(routeIds, route); - rpcRegistry.tell(rpcMsg, getRef()); - expectMsgEquals(duration("2 second"), "Success"); - - GetRoutedRpc getRpc = new GetRoutedRpc(routeId); - rpcRegistry.tell(getRpc, getRef()); - - Boolean getMsg = new ExpectMsg("GetRoutedRpcReply") { - protected Boolean match(Object in) { - if (in instanceof GetRoutedRpcReply) { - GetRoutedRpcReply reply = (GetRoutedRpcReply)in; - return route.equals(reply.getRoutePath()); - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message - - Assert.assertTrue(getMsg); - - RemoveRoutedRpc removeMsg = new RemoveRoutedRpc(routeIds, route); - rpcRegistry.tell(removeMsg, getRef()); - expectMsgEquals(duration("2 second"), "Success"); - - rpcRegistry.tell(getRpc, getRef()); - - Boolean getNullMsg = new ExpectMsg("GetRoutedRpcReply") { - protected Boolean match(Object in) { - if (in instanceof GetRoutedRpcReply) { - GetRoutedRpcReply reply = (GetRoutedRpcReply)in; - return reply.getRoutePath() == null; - } else { - throw noMatch(); - } - } - }.get(); - Assert.assertTrue(getNullMsg); - }}; - - } - -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java index 7e87da0f99..fd6664af9e 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java @@ -34,7 +34,7 @@ public class BucketStoreTest { @BeforeClass public static void setup() { - system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("unit-test")); + system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster")); system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor"); store = createStore(); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossiperTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossiperTest.java index f076c136fe..bb60ed6eec 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossiperTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossiperTest.java @@ -45,8 +45,7 @@ public class GossiperTest { @BeforeClass public static void setup() throws InterruptedException { - Thread.sleep(1000);//give some time for previous test to stop the system. Netty port conflict arises otherwise. - system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("unit-test")); + system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster")); system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor"); gossiper = createGossiper(); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/utils/LatestEntryRoutingLogicTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/utils/LatestEntryRoutingLogicTest.java new file mode 100644 index 0000000000..b21f0f0069 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/utils/LatestEntryRoutingLogicTest.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc.utils; + + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.japi.Pair; +import akka.testkit.JavaTestKit; +import akka.testkit.TestProbe; +import com.typesafe.config.ConfigFactory; +import junit.framework.Assert; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + + +public class LatestEntryRoutingLogicTest { + + static ActorSystem system; + + @BeforeClass + public static void setup() throws InterruptedException { + system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster")); + } + + @AfterClass + public static void teardown() { + JavaTestKit.shutdownActorSystem(system); + system = null; + } + + @Test + public void testRoutingLogic() { + List> pairList = new ArrayList<>(); + TestProbe probe1 = new TestProbe(system); + TestProbe probe2 = new TestProbe(system); + TestProbe probe3 = new TestProbe(system); + ActorRef actor1 = probe1.ref(); + ActorRef actor2 = probe2.ref(); + ActorRef actor3 = probe3.ref(); + pairList.add(new Pair(actor1, 1000L)); + pairList.add(new Pair(actor2, 3000L)); + pairList.add(new Pair(actor3, 2000L)); + RoutingLogic logic = new LatestEntryRoutingLogic(pairList); + Assert.assertTrue(logic.select().equals(actor2)); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf b/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf index 459eb78903..61fab7e0fe 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf @@ -18,12 +18,12 @@ odl-cluster{ log-remote-lifecycle-events = off netty.tcp { hostname = "127.0.0.1" - port = 2551 + port = 2550 } } cluster { - seed-nodes = ["akka.tcp://opendaylight-rpc@127.0.0.1:2551"] + seed-nodes = ["akka.tcp://opendaylight-rpc@127.0.0.1:2550"] auto-down-unreachable-after = 10s }