From: Abhishek Kumar Date: Thu, 31 Jul 2014 21:18:28 +0000 (-0700) Subject: Gossip based eventually consistent RPC Registry. X-Git-Tag: release/helium~321 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=08d30cebbbec8da5596840c6ef830687a76d9d33 Gossip based eventually consistent RPC Registry. There are 2 main components 1. Bucket Store: Stores data in buckets. Each cluster node gets a bucket. These buckets are sync'ed across nodes using Gossip protocol. Bucket Store uses a Gossiper that implements the protocol to sync data. 2. Rpc Registry: This uses the bucket store to store routing table. Routing table maintains mapping of RPC <--> Node Rpc Broker uses this regitry to route rpc requests to remote nodes. Change-Id: Ifaf8955dbf6e3074d4d2951d6f503ecc0624d141 Signed-off-by: Abhishek Kumar --- diff --git a/opendaylight/md-sal/sal-remoterpc-connector/pom.xml b/opendaylight/md-sal/sal-remoterpc-connector/pom.xml index a2bee8ffee..50af48effc 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/pom.xml +++ b/opendaylight/md-sal/sal-remoterpc-connector/pom.xml @@ -119,7 +119,7 @@ test - + org.slf4j slf4j-simple ${slf4j.version} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java index 5c56455bd0..514a2f141d 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java @@ -17,7 +17,7 @@ import akka.japi.Creator; import akka.japi.Function; import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext; import org.opendaylight.controller.remote.rpc.registry.ClusterWrapper; -import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistryOld; import org.opendaylight.controller.sal.core.api.Broker; import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry; import org.opendaylight.yangtools.yang.common.QName; @@ -72,7 +72,7 @@ public class RpcManager extends AbstractUntypedActor { private void createRpcActors() { LOG.debug("Create rpc registry and broker actors"); - rpcRegistry = getContext().actorOf(RpcRegistry.props(clusterWrapper), ActorConstants.RPC_REGISTRY); + rpcRegistry = getContext().actorOf(RpcRegistryOld.props(clusterWrapper), ActorConstants.RPC_REGISTRY); rpcBroker = getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), ActorConstants.RPC_BROKER); } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java index 5e19653a22..c25aa523e2 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java @@ -5,167 +5,79 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.remote.rpc.registry; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableSet; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import akka.actor.ActorRef; +import akka.japi.Option; +import akka.japi.Pair; +import org.opendaylight.controller.remote.rpc.registry.gossip.Copier; +import org.opendaylight.controller.sal.connector.api.RpcRouter; +import java.io.Serializable; import java.util.Collections; -import java.util.Iterator; -import java.util.LinkedHashSet; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -public class RoutingTable { - - private final Logger LOG = LoggerFactory.getLogger(RoutingTable.class); - - private ConcurrentMap globalRpcMap = new ConcurrentHashMap<>(); - private ConcurrentMap> routedRpcMap = new ConcurrentHashMap<>(); - - public ConcurrentMap getGlobalRpcMap() { - return globalRpcMap; - } - - public ConcurrentMap> getRoutedRpcMap() { - return routedRpcMap; - } - - public R getGlobalRoute(final I routeId) { - Preconditions.checkNotNull(routeId, "getGlobalRoute: routeId cannot be null!"); - return globalRpcMap.get(routeId); - } - - public void addGlobalRoute(final I routeId, final R route) { - Preconditions.checkNotNull(routeId, "addGlobalRoute: routeId cannot be null!"); - Preconditions.checkNotNull(route, "addGlobalRoute: route cannot be null!"); - LOG.debug("addGlobalRoute: adding a new route with id[{}] and value [{}]", routeId, route); - if(globalRpcMap.putIfAbsent(routeId, route) != null) { - LOG.debug("A route already exist for route id [{}] ", routeId); - } - } +import java.util.HashMap; +import java.util.Map; - public void removeGlobalRoute(final I routeId) { - Preconditions.checkNotNull(routeId, "removeGlobalRoute: routeId cannot be null!"); - LOG.debug("removeGlobalRoute: removing a new route with id [{}]", routeId); - globalRpcMap.remove(routeId); - } +public class RoutingTable implements Copier, Serializable { - public Set getRoutedRpc(final I routeId) { - Preconditions.checkNotNull(routeId, "getRoutes: routeId cannot be null!"); - Set routes = routedRpcMap.get(routeId); - - if (routes == null) { - return Collections.emptySet(); - } + private Map, Long> table = new HashMap<>(); + private ActorRef router; - return ImmutableSet.copyOf(routes); - } + @Override + public RoutingTable copy() { + RoutingTable copy = new RoutingTable(); + copy.setTable(Collections.unmodifiableMap(table)); + copy.setRouter(this.getRouter()); - public R getLastAddedRoutedRpc(final I routeId) { + return copy; + } - Set routes = getRoutedRpc(routeId); + public Option> getRouterFor(RpcRouter.RouteIdentifier routeId){ + Long updatedTime = table.get(routeId); - if (routes.isEmpty()) { - return null; + if (updatedTime == null || router == null) + return Option.none(); + else + return Option.option(new Pair<>(router, updatedTime)); } - R route = null; - Iterator iter = routes.iterator(); - while (iter.hasNext()) { - route = iter.next(); + public void addRoute(RpcRouter.RouteIdentifier routeId){ + table.put(routeId, System.currentTimeMillis()); } - return route; - } - - public void addRoutedRpc(final I routeId, final R route) { - Preconditions.checkNotNull(routeId, "addRoute: routeId cannot be null"); - Preconditions.checkNotNull(route, "addRoute: route cannot be null"); - LOG.debug("addRoute: adding a route with k/v [{}/{}]", routeId, route); - threadSafeAdd(routeId, route); - } - - public void addRoutedRpcs(final Set routeIds, final R route) { - Preconditions.checkNotNull(routeIds, "addRoutes: routeIds must not be null"); - for (I routeId : routeIds){ - addRoutedRpc(routeId, route); + public void removeRoute(RpcRouter.RouteIdentifier routeId){ + table.remove(routeId); } - } - public void removeRoute(final I routeId, final R route) { - Preconditions.checkNotNull(routeId, "removeRoute: routeId cannot be null!"); - Preconditions.checkNotNull(route, "removeRoute: route cannot be null!"); - - LinkedHashSet routes = routedRpcMap.get(routeId); - if (routes == null) { - return; - } - LOG.debug("removeRoute: removing a new route with k/v [{}/{}]", routeId, route); - threadSafeRemove(routeId, route); - } - - public void removeRoutes(final Set routeIds, final R route) { - Preconditions.checkNotNull(routeIds, "removeRoutes: routeIds must not be null"); - for (I routeId : routeIds){ - removeRoute(routeId, route); + public Boolean contains(RpcRouter.RouteIdentifier routeId){ + return table.containsKey(routeId); } - } - - /** - * This method guarantees that no 2 thread over write each other's changes. - * Just so that we dont end up in infinite loop, it tries for 100 times then throw - */ - private void threadSafeAdd(final I routeId, final R route) { - for (int i=0;i<100;i++){ + /// + /// Getter, Setters + /// + //TODO: Remove public + public Map, Long> getTable() { + return table; + } - LinkedHashSet updatedRoutes = new LinkedHashSet<>(); - updatedRoutes.add(route); - LinkedHashSet oldRoutes = routedRpcMap.putIfAbsent(routeId, updatedRoutes); - if (oldRoutes == null) { - return; - } + void setTable(Map, Long> table) { + this.table = table; + } - updatedRoutes = new LinkedHashSet<>(oldRoutes); - updatedRoutes.add(route); + public ActorRef getRouter() { + return router; + } - if (routedRpcMap.replace(routeId, oldRoutes, updatedRoutes)) { - return; - } + public void setRouter(ActorRef router) { + this.router = router; } - //the method did not already return means it failed to add route in 100 attempts - throw new IllegalStateException("Failed to add route [" + routeId + "]"); - } - - /** - * This method guarantees that no 2 thread over write each other's changes. - * Just so that we dont end up in infinite loop, it tries for 100 times then throw - */ - private void threadSafeRemove(final I routeId, final R route) { - LinkedHashSet updatedRoutes = null; - for (int i=0;i<100;i++){ - LinkedHashSet oldRoutes = routedRpcMap.get(routeId); - - // if route to be deleted is the only entry in the set then remove routeId from the cache - if ((oldRoutes.size() == 1) && oldRoutes.contains(route)){ - routedRpcMap.remove(routeId); - return; - } - - // if there are multiple routes for this routeId, remove the route to be deleted only from the set. - updatedRoutes = new LinkedHashSet<>(oldRoutes); - updatedRoutes.remove(route); - if (routedRpcMap.replace(routeId, oldRoutes, updatedRoutes)) { - return; - } + @Override + public String toString() { + return "RoutingTable{" + + "table=" + table + + ", router=" + router + + '}'; } - //the method did not already return means it failed to remove route in 100 attempts - throw new IllegalStateException("Failed to remove route [" + routeId + "]"); - } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTableOld.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTableOld.java new file mode 100644 index 0000000000..5951776f2c --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTableOld.java @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc.registry; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class RoutingTableOld { + + private final Logger LOG = LoggerFactory.getLogger(RoutingTableOld.class); + + private ConcurrentMap globalRpcMap = new ConcurrentHashMap<>(); + private ConcurrentMap> routedRpcMap = new ConcurrentHashMap<>(); + + public ConcurrentMap getGlobalRpcMap() { + return globalRpcMap; + } + + public ConcurrentMap> getRoutedRpcMap() { + return routedRpcMap; + } + + public R getGlobalRoute(final I routeId) { + Preconditions.checkNotNull(routeId, "getGlobalRoute: routeId cannot be null!"); + return globalRpcMap.get(routeId); + } + + public void addGlobalRoute(final I routeId, final R route) { + Preconditions.checkNotNull(routeId, "addGlobalRoute: routeId cannot be null!"); + Preconditions.checkNotNull(route, "addGlobalRoute: route cannot be null!"); + LOG.debug("addGlobalRoute: adding a new route with id[{}] and value [{}]", routeId, route); + if(globalRpcMap.putIfAbsent(routeId, route) != null) { + LOG.debug("A route already exist for route id [{}] ", routeId); + } + } + + public void removeGlobalRoute(final I routeId) { + Preconditions.checkNotNull(routeId, "removeGlobalRoute: routeId cannot be null!"); + LOG.debug("removeGlobalRoute: removing a new route with id [{}]", routeId); + globalRpcMap.remove(routeId); + } + + public Set getRoutedRpc(final I routeId) { + Preconditions.checkNotNull(routeId, "getRoutes: routeId cannot be null!"); + Set routes = routedRpcMap.get(routeId); + + if (routes == null) { + return Collections.emptySet(); + } + + return ImmutableSet.copyOf(routes); + } + + public R getLastAddedRoutedRpc(final I routeId) { + + Set routes = getRoutedRpc(routeId); + + if (routes.isEmpty()) { + return null; + } + + R route = null; + Iterator iter = routes.iterator(); + while (iter.hasNext()) { + route = iter.next(); + } + + return route; + } + + public void addRoutedRpc(final I routeId, final R route) { + Preconditions.checkNotNull(routeId, "addRoute: routeId cannot be null"); + Preconditions.checkNotNull(route, "addRoute: route cannot be null"); + LOG.debug("addRoute: adding a route with k/v [{}/{}]", routeId, route); + threadSafeAdd(routeId, route); + } + + public void addRoutedRpcs(final Set routeIds, final R route) { + Preconditions.checkNotNull(routeIds, "addRoutes: routeIds must not be null"); + for (I routeId : routeIds){ + addRoutedRpc(routeId, route); + } + } + + public void removeRoute(final I routeId, final R route) { + Preconditions.checkNotNull(routeId, "removeRoute: routeId cannot be null!"); + Preconditions.checkNotNull(route, "removeRoute: route cannot be null!"); + + LinkedHashSet routes = routedRpcMap.get(routeId); + if (routes == null) { + return; + } + LOG.debug("removeRoute: removing a new route with k/v [{}/{}]", routeId, route); + threadSafeRemove(routeId, route); + } + + public void removeRoutes(final Set routeIds, final R route) { + Preconditions.checkNotNull(routeIds, "removeRoutes: routeIds must not be null"); + for (I routeId : routeIds){ + removeRoute(routeId, route); + } + } + + /** + * This method guarantees that no 2 thread over write each other's changes. + * Just so that we dont end up in infinite loop, it tries for 100 times then throw + */ + private void threadSafeAdd(final I routeId, final R route) { + + for (int i=0;i<100;i++){ + + LinkedHashSet updatedRoutes = new LinkedHashSet<>(); + updatedRoutes.add(route); + LinkedHashSet oldRoutes = routedRpcMap.putIfAbsent(routeId, updatedRoutes); + if (oldRoutes == null) { + return; + } + + updatedRoutes = new LinkedHashSet<>(oldRoutes); + updatedRoutes.add(route); + + if (routedRpcMap.replace(routeId, oldRoutes, updatedRoutes)) { + return; + } + } + //the method did not already return means it failed to add route in 100 attempts + throw new IllegalStateException("Failed to add route [" + routeId + "]"); + } + + /** + * This method guarantees that no 2 thread over write each other's changes. + * Just so that we dont end up in infinite loop, it tries for 100 times then throw + */ + private void threadSafeRemove(final I routeId, final R route) { + LinkedHashSet updatedRoutes = null; + for (int i=0;i<100;i++){ + LinkedHashSet oldRoutes = routedRpcMap.get(routeId); + + // if route to be deleted is the only entry in the set then remove routeId from the cache + if ((oldRoutes.size() == 1) && oldRoutes.contains(route)){ + routedRpcMap.remove(routeId); + return; + } + + // if there are multiple routes for this routeId, remove the route to be deleted only from the set. + updatedRoutes = new LinkedHashSet<>(oldRoutes); + updatedRoutes.remove(route); + if (routedRpcMap.replace(routeId, oldRoutes, updatedRoutes)) { + return; + } + + } + //the method did not already return means it failed to remove route in 100 attempts + throw new IllegalStateException("Failed to remove route [" + routeId + "]"); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java index e36060cc13..51609870cc 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java @@ -7,197 +7,379 @@ */ package org.opendaylight.controller.remote.rpc.registry; -import akka.actor.ActorSelection; +import akka.actor.ActorRef; import akka.actor.Address; import akka.actor.Props; -import akka.cluster.ClusterEvent; -import akka.cluster.Member; -import akka.japi.Creator; -import org.opendaylight.controller.remote.rpc.AbstractUntypedActor; -import org.opendaylight.controller.remote.rpc.ActorConstants; -import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.AddRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply; -import org.opendaylight.controller.remote.rpc.messages.GetRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRpcReply; -import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.RemoveRpc; -import org.opendaylight.controller.remote.rpc.messages.RoutingTableData; +import akka.actor.UntypedActor; +import akka.dispatch.Mapper; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import akka.japi.Option; +import akka.japi.Pair; +import akka.pattern.Patterns; +import com.google.common.base.Preconditions; +import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; +import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore; import org.opendaylight.controller.sal.connector.api.RpcRouter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.collection.JavaConversions; +import scala.concurrent.Future; -import java.util.LinkedHashSet; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; + +import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoute; +import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoute; +import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter; +import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets; +import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply; +import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket; +import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply; +import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket; /** - * This Actor maintains the routing table state and sync it with other nodes in the cluster. - * - * A scheduler runs after an interval of time, which pick a random member from the cluster - * and send the current state of routing table to the member. + * Registry to look up cluster nodes that have registered for a given rpc. + *

+ * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this + * cluster wide information. * - * when a message of routing table data is received, it gets merged with the local routing table - * to keep the latest data. */ +public class RpcRegistry extends UntypedActor { + + final LoggingAdapter log = Logging.getLogger(getContext().system(), this); + + /** + * Store to keep the registry. Bucket store sync's it across nodes in the cluster + */ + private ActorRef bucketStore; + + /** + * Rpc broker that would use the registry to route requests. + */ + private ActorRef localRouter; -public class RpcRegistry extends AbstractUntypedActor { - - private static final Logger LOG = LoggerFactory.getLogger(RpcRegistry.class); - private RoutingTable, String> routingTable; - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - private final ClusterWrapper clusterWrapper; - private final ScheduledFuture syncScheduler; - - private RpcRegistry(ClusterWrapper clusterWrapper){ - this.routingTable = new RoutingTable<>(); - this.clusterWrapper = clusterWrapper; - this.syncScheduler = scheduler.scheduleAtFixedRate(new SendRoutingTable(), 10, 10, TimeUnit.SECONDS); - } - - public static Props props(final ClusterWrapper clusterWrapper){ - return Props.create(new Creator(){ - - @Override - public RpcRegistry create() throws Exception { - return new RpcRegistry(clusterWrapper); - } - }); - } - - @Override - protected void handleReceive(Object message) throws Exception { - LOG.debug("Received message {}", message); - if(message instanceof RoutingTableData) { - syncRoutingTable((RoutingTableData) message); - } else if(message instanceof GetRoutedRpc) { - getRoutedRpc((GetRoutedRpc) message); - } else if(message instanceof GetRpc) { - getRpc((GetRpc) message); - } else if(message instanceof AddRpc) { - addRpc((AddRpc) message); - } else if(message instanceof RemoveRpc) { - removeRpc((RemoveRpc) message); - } else if(message instanceof AddRoutedRpc) { - addRoutedRpc((AddRoutedRpc) message); - } else if(message instanceof RemoveRoutedRpc) { - removeRoutedRpc((RemoveRoutedRpc) message); + public RpcRegistry() { + bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store"); } - } - private void getRoutedRpc(GetRoutedRpc rpcMsg){ - LOG.debug("Get latest routed Rpc location from routing table {}", rpcMsg); - String remoteActorPath = routingTable.getLastAddedRoutedRpc(rpcMsg.getRouteId()); - GetRoutedRpcReply routedRpcReply = new GetRoutedRpcReply(remoteActorPath); + public RpcRegistry(ActorRef bucketStore) { + this.bucketStore = bucketStore; + } + + @Override + public void onReceive(Object message) throws Exception { - getSender().tell(routedRpcReply, self()); - } + log.debug("Received message: message [{}]", message); - private void getRpc(GetRpc rpcMsg) { - LOG.debug("Get global Rpc location from routing table {}", rpcMsg); - String remoteActorPath = routingTable.getGlobalRoute(rpcMsg.getRouteId()); - GetRpcReply rpcReply = new GetRpcReply(remoteActorPath); + //TODO: if sender is remote, reject message - getSender().tell(rpcReply, self()); - } + if (message instanceof SetLocalRouter) + receiveSetLocalRouter((SetLocalRouter) message); - private void addRpc(AddRpc rpcMsg) { - LOG.debug("Add Rpc to routing table {}", rpcMsg); - routingTable.addGlobalRoute(rpcMsg.getRouteId(), rpcMsg.getActorPath()); + if (message instanceof AddOrUpdateRoute) + receiveAddRoute((AddOrUpdateRoute) message); - getSender().tell("Success", self()); - } + else if (message instanceof RemoveRoute) + receiveRemoveRoute((RemoveRoute) message); - private void removeRpc(RemoveRpc rpcMsg) { - LOG.debug("Removing Rpc to routing table {}", rpcMsg); - routingTable.removeGlobalRoute(rpcMsg.getRouteId()); + else if (message instanceof Messages.FindRouters) + receiveGetRouter((Messages.FindRouters) message); + + else + unhandled(message); + } - getSender().tell("Success", self()); - } + /** + * Register's rpc broker + * + * @param message contains {@link akka.actor.ActorRef} for rpc broker + */ + private void receiveSetLocalRouter(SetLocalRouter message) { + if (message == null || message.getRouter() == null) + return;//ignore - private void addRoutedRpc(AddRoutedRpc rpcMsg) { - routingTable.addRoutedRpcs(rpcMsg.getAnnouncements(), rpcMsg.getActorPath()); - getSender().tell("Success", self()); - } + localRouter = message.getRouter(); + } - private void removeRoutedRpc(RemoveRoutedRpc rpcMsg) { - routingTable.removeRoutes(rpcMsg.getAnnouncements(), rpcMsg.getActorPath()); - getSender().tell("Success", self()); - } + /** + * //TODO: update this to accept multiple route registration + * @param msg + */ + private void receiveAddRoute(AddOrUpdateRoute msg) { + if (msg.getRouteIdentifier() == null) + return;//ignore - private void syncRoutingTable(RoutingTableData routingTableData) { - LOG.debug("Syncing routing table {}", routingTableData); + Preconditions.checkState(localRouter != null, "Router must be set first"); - Map, String> newRpcMap = routingTableData.getRpcMap(); - Set> routeIds = newRpcMap.keySet(); - for(RpcRouter.RouteIdentifier routeId : routeIds) { - routingTable.addGlobalRoute(routeId, newRpcMap.get(routeId)); + Future futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000); + futureReply.map(getMapperToAddRoute(msg.getRouteIdentifier()), getContext().dispatcher()); } - Map, LinkedHashSet> newRoutedRpcMap = - routingTableData.getRoutedRpcMap(); - routeIds = newRoutedRpcMap.keySet(); + /** + * //TODO: update this to accept multiple routes + * @param msg + */ + private void receiveRemoveRoute(RemoveRoute msg) { + if (msg.getRouteIdentifier() == null) + return;//ignore + + Future futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000); + futureReply.map(getMapperToRemoveRoute(msg.getRouteIdentifier()), getContext().dispatcher()); - for(RpcRouter.RouteIdentifier routeId : routeIds) { - Set routeAddresses = newRoutedRpcMap.get(routeId); - for(String routeAddress : routeAddresses) { - routingTable.addRoutedRpc(routeId, routeAddress); - } } - } - - private ActorSelection getRandomRegistryActor() { - ClusterEvent.CurrentClusterState clusterState = clusterWrapper.getState(); - ActorSelection actor = null; - Set members = JavaConversions.asJavaSet(clusterState.members()); - int memberSize = members.size(); - // Don't select yourself - if(memberSize > 1) { - Address currentNodeAddress = clusterWrapper.getAddress(); - int index = new Random().nextInt(memberSize); - int i = 0; - // keeping previous member, in case when random index member is same as current actor - // and current actor member is last in set - Member previousMember = null; - for(Member member : members){ - if(i == index-1) { - previousMember = member; + + /** + * Finds routers for the given rpc. + * @param msg + */ + private void receiveGetRouter(Messages.FindRouters msg) { + final ActorRef sender = getSender(); + + //if empty message, return empty list + if (msg.getRouteIdentifier() == null) { + sender.tell(createEmptyReply(), getSelf()); + return; } - if(i == index) { - if(!currentNodeAddress.equals(member.address())) { - actor = this.context().actorSelection(member.address() + ActorConstants.RPC_REGISTRY_PATH); - break; - } else if(index < memberSize-1){ // pick the next element in the set - index++; - } + + Future futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), 1000); + futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher()); + + } + + /** + * Helper to create empty reply when no routers are found + * + * @return + */ + private Messages.FindRoutersReply createEmptyReply() { + List> routerWithUpdateTime = Collections.emptyList(); + return new Messages.FindRoutersReply(routerWithUpdateTime); + } + + /** + * Helper to create a reply when routers are found for the given rpc + * @param buckets + * @param routeId + * @return + */ + private Messages.FindRoutersReply createReplyWithRouters(Map buckets, RpcRouter.RouteIdentifier routeId) { + + List> routers = new ArrayList<>(); + + Option> routerWithUpdateTime = null; + + for (Bucket bucket : buckets.values()) { + + RoutingTable table = (RoutingTable) bucket.getData(); + + if (table == null) + continue; + + routerWithUpdateTime = table.getRouterFor(routeId); + + if (routerWithUpdateTime.isEmpty()) + continue; + + routers.add(routerWithUpdateTime.get()); } - i++; - } - if(actor == null && previousMember != null) { - actor = this.context().actorSelection(previousMember.address() + ActorConstants.RPC_REGISTRY_PATH); - } + + return new Messages.FindRoutersReply(routers); } - return actor; - } - private class SendRoutingTable implements Runnable { - @Override - public void run() { - RoutingTableData routingTableData = - new RoutingTableData(routingTable.getGlobalRpcMap(), routingTable.getRoutedRpcMap()); - LOG.debug("Sending routing table for sync {}", routingTableData); - ActorSelection actor = getRandomRegistryActor(); - if(actor != null) { - actor.tell(routingTableData, self()); - } + /// + ///private factories to create Mapper + /// + + /** + * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found + * + * @param routeId the rpc + * @param sender client who asked to find the routers. + * @return + */ + private Mapper getMapperToGetRouter(final RpcRouter.RouteIdentifier routeId, final ActorRef sender) { + return new Mapper() { + @Override + public Void apply(Object replyMessage) { + + if (replyMessage instanceof GetAllBucketsReply) { + + GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage; + Map buckets = reply.getBuckets(); + + if (buckets == null || buckets.isEmpty()) { + sender.tell(createEmptyReply(), getSelf()); + return null; + } + + sender.tell(createReplyWithRouters(buckets, routeId), getSelf()); + } + return null; + } + }; + } + + /** + * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently, + * it updates the local bucket in bucket store. + * + * @param routeId rpc to remote + * @return + */ + private Mapper getMapperToRemoveRoute(final RpcRouter.RouteIdentifier routeId) { + return new Mapper() { + @Override + public Void apply(Object replyMessage) { + if (replyMessage instanceof GetLocalBucketReply) { + + GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage; + Bucket bucket = reply.getBucket(); + + if (bucket == null) { + log.debug("Local bucket is null"); + return null; + } + + RoutingTable table = bucket.getData(); + if (table == null) + table = new RoutingTable(); + + table.setRouter(localRouter); + table.removeRoute(routeId); + + bucket.setData(table); + + UpdateBucket updateBucketMessage = new UpdateBucket(bucket); + bucketStore.tell(updateBucketMessage, getSelf()); + } + return null; + } + }; + } + + /** + * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently, + * it updates the local bucket in bucket store. + * + * @param routeId rpc to add + * @return + */ + private Mapper getMapperToAddRoute(final RpcRouter.RouteIdentifier routeId) { + + return new Mapper() { + @Override + public Void apply(Object replyMessage) { + if (replyMessage instanceof GetLocalBucketReply) { + + GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage; + Bucket bucket = reply.getBucket(); + + if (bucket == null) { + log.debug("Local bucket is null"); + return null; + } + + RoutingTable table = bucket.getData(); + if (table == null) + table = new RoutingTable(); + + table.setRouter(localRouter); + table.addRoute(routeId); + + bucket.setData(table); + + UpdateBucket updateBucketMessage = new UpdateBucket(bucket); + bucketStore.tell(updateBucketMessage, getSelf()); + } + + return null; + } + }; + } + + /** + * All messages used by the RpcRegistry + */ + public static class Messages { + + + public static class ContainsRoute { + final RpcRouter.RouteIdentifier routeIdentifier; + + public ContainsRoute(RpcRouter.RouteIdentifier routeIdentifier) { + Preconditions.checkArgument(routeIdentifier != null); + this.routeIdentifier = routeIdentifier; + } + + public RpcRouter.RouteIdentifier getRouteIdentifier(){ + return this.routeIdentifier; + } + + @Override + public String toString() { + return this.getClass().getSimpleName() + "{" + + "routeIdentifier=" + routeIdentifier + + '}'; + } + } + + public static class AddOrUpdateRoute extends ContainsRoute{ + + public AddOrUpdateRoute(RpcRouter.RouteIdentifier routeIdentifier) { + super(routeIdentifier); + } + } + + public static class RemoveRoute extends ContainsRoute { + + public RemoveRoute(RpcRouter.RouteIdentifier routeIdentifier) { + super(routeIdentifier); + } + } + + public static class SetLocalRouter{ + private final ActorRef router; + + public SetLocalRouter(ActorRef router) { + this.router = router; + } + + public ActorRef getRouter(){ + return this.router; + } + + @Override + public String toString() { + return "SetLocalRouter{" + + "router=" + router + + '}'; + } + } + + public static class FindRouters extends ContainsRoute { + public FindRouters(RpcRouter.RouteIdentifier routeIdentifier) { + super(routeIdentifier); + } + } + + public static class FindRoutersReply { + final List> routerWithUpdateTime; + + public FindRoutersReply(List> routerWithUpdateTime) { + this.routerWithUpdateTime = routerWithUpdateTime; + } + + public List> getRouterWithUpdateTime(){ + return routerWithUpdateTime; + } + + @Override + public String toString() { + return "FindRoutersReply{" + + "routerWithUpdateTime=" + routerWithUpdateTime + + '}'; + } + } } - } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryOld.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryOld.java new file mode 100644 index 0000000000..96c8802ce6 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryOld.java @@ -0,0 +1,203 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.registry; + +import akka.actor.ActorSelection; +import akka.actor.Address; +import akka.actor.Props; +import akka.cluster.ClusterEvent; +import akka.cluster.Member; +import akka.japi.Creator; +import org.opendaylight.controller.remote.rpc.AbstractUntypedActor; +import org.opendaylight.controller.remote.rpc.ActorConstants; +import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc; +import org.opendaylight.controller.remote.rpc.messages.AddRpc; +import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc; +import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply; +import org.opendaylight.controller.remote.rpc.messages.GetRpc; +import org.opendaylight.controller.remote.rpc.messages.GetRpcReply; +import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc; +import org.opendaylight.controller.remote.rpc.messages.RemoveRpc; +import org.opendaylight.controller.remote.rpc.messages.RoutingTableData; +import org.opendaylight.controller.sal.connector.api.RpcRouter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.JavaConversions; + +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * This Actor maintains the routing table state and sync it with other nodes in the cluster. + * + * A scheduler runs after an interval of time, which pick a random member from the cluster + * and send the current state of routing table to the member. + * + * when a message of routing table data is received, it gets merged with the local routing table + * to keep the latest data. + */ + +public class RpcRegistryOld extends AbstractUntypedActor { + + private static final Logger LOG = LoggerFactory.getLogger(RpcRegistryOld.class); + private RoutingTableOld, String> routingTable; + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + private final ClusterWrapper clusterWrapper; + private final ScheduledFuture syncScheduler; + + private RpcRegistryOld(ClusterWrapper clusterWrapper){ + this.routingTable = new RoutingTableOld<>(); + this.clusterWrapper = clusterWrapper; + this.syncScheduler = scheduler.scheduleAtFixedRate(new SendRoutingTable(), 10, 10, TimeUnit.SECONDS); + } + + public static Props props(final ClusterWrapper clusterWrapper){ + return Props.create(new Creator(){ + + @Override + public RpcRegistryOld create() throws Exception { + return new RpcRegistryOld(clusterWrapper); + } + }); + } + + @Override + protected void handleReceive(Object message) throws Exception { + LOG.debug("Received message {}", message); + if(message instanceof RoutingTableData) { + syncRoutingTable((RoutingTableData) message); + } else if(message instanceof GetRoutedRpc) { + getRoutedRpc((GetRoutedRpc) message); + } else if(message instanceof GetRpc) { + getRpc((GetRpc) message); + } else if(message instanceof AddRpc) { + addRpc((AddRpc) message); + } else if(message instanceof RemoveRpc) { + removeRpc((RemoveRpc) message); + } else if(message instanceof AddRoutedRpc) { + addRoutedRpc((AddRoutedRpc) message); + } else if(message instanceof RemoveRoutedRpc) { + removeRoutedRpc((RemoveRoutedRpc) message); + } + } + + private void getRoutedRpc(GetRoutedRpc rpcMsg){ + LOG.debug("Get latest routed Rpc location from routing table {}", rpcMsg); + String remoteActorPath = routingTable.getLastAddedRoutedRpc(rpcMsg.getRouteId()); + GetRoutedRpcReply routedRpcReply = new GetRoutedRpcReply(remoteActorPath); + + getSender().tell(routedRpcReply, self()); + } + + private void getRpc(GetRpc rpcMsg) { + LOG.debug("Get global Rpc location from routing table {}", rpcMsg); + String remoteActorPath = routingTable.getGlobalRoute(rpcMsg.getRouteId()); + GetRpcReply rpcReply = new GetRpcReply(remoteActorPath); + + getSender().tell(rpcReply, self()); + } + + private void addRpc(AddRpc rpcMsg) { + LOG.debug("Add Rpc to routing table {}", rpcMsg); + routingTable.addGlobalRoute(rpcMsg.getRouteId(), rpcMsg.getActorPath()); + + getSender().tell("Success", self()); + } + + private void removeRpc(RemoveRpc rpcMsg) { + LOG.debug("Removing Rpc to routing table {}", rpcMsg); + routingTable.removeGlobalRoute(rpcMsg.getRouteId()); + + getSender().tell("Success", self()); + } + + private void addRoutedRpc(AddRoutedRpc rpcMsg) { + routingTable.addRoutedRpcs(rpcMsg.getAnnouncements(), rpcMsg.getActorPath()); + getSender().tell("Success", self()); + } + + private void removeRoutedRpc(RemoveRoutedRpc rpcMsg) { + routingTable.removeRoutes(rpcMsg.getAnnouncements(), rpcMsg.getActorPath()); + getSender().tell("Success", self()); + } + + private void syncRoutingTable(RoutingTableData routingTableData) { + LOG.debug("Syncing routing table {}", routingTableData); + + Map, String> newRpcMap = routingTableData.getRpcMap(); + Set> routeIds = newRpcMap.keySet(); + for(RpcRouter.RouteIdentifier routeId : routeIds) { + routingTable.addGlobalRoute(routeId, newRpcMap.get(routeId)); + } + + Map, LinkedHashSet> newRoutedRpcMap = + routingTableData.getRoutedRpcMap(); + routeIds = newRoutedRpcMap.keySet(); + + for(RpcRouter.RouteIdentifier routeId : routeIds) { + Set routeAddresses = newRoutedRpcMap.get(routeId); + for(String routeAddress : routeAddresses) { + routingTable.addRoutedRpc(routeId, routeAddress); + } + } + } + + private ActorSelection getRandomRegistryActor() { + ClusterEvent.CurrentClusterState clusterState = clusterWrapper.getState(); + ActorSelection actor = null; + Set members = JavaConversions.asJavaSet(clusterState.members()); + int memberSize = members.size(); + // Don't select yourself + if(memberSize > 1) { + Address currentNodeAddress = clusterWrapper.getAddress(); + int index = new Random().nextInt(memberSize); + int i = 0; + // keeping previous member, in case when random index member is same as current actor + // and current actor member is last in set + Member previousMember = null; + for(Member member : members){ + if(i == index-1) { + previousMember = member; + } + if(i == index) { + if(!currentNodeAddress.equals(member.address())) { + actor = this.context().actorSelection(member.address() + ActorConstants.RPC_REGISTRY_PATH); + break; + } else if(index < memberSize-1){ // pick the next element in the set + index++; + } + } + i++; + } + if(actor == null && previousMember != null) { + actor = this.context().actorSelection(previousMember.address() + ActorConstants.RPC_REGISTRY_PATH); + } + } + return actor; + } + + private class SendRoutingTable implements Runnable { + + @Override + public void run() { + RoutingTableData routingTableData = + new RoutingTableData(routingTable.getGlobalRpcMap(), routingTable.getRoutedRpcMap()); + LOG.debug("Sending routing table for sync {}", routingTableData); + ActorSelection actor = getRandomRegistryActor(); + if(actor != null) { + actor.tell(routingTableData, self()); + } + } + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java new file mode 100644 index 0000000000..f5dfbc5650 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java @@ -0,0 +1,15 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.registry.gossip; + + +public interface Bucket> { + public Long getVersion(); + public T getData(); + public void setData(T data); +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java new file mode 100644 index 0000000000..3cdd924e85 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.registry.gossip; + +import java.io.Serializable; + +public class BucketImpl> implements Bucket, Serializable { + + private Long version = System.currentTimeMillis();; + + private T data; + + @Override + public Long getVersion() { + return version; + } + + @Override + public T getData() { + if (this.data == null) + return null; + + return data.copy(); + } + + public void setData(T data){ + this.version = System.currentTimeMillis()+1; + this.data = data; + } + + @Override + public String toString() { + return "BucketImpl{" + + "version=" + version + + ", data=" + data + + '}'; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java new file mode 100644 index 0000000000..2f634ce1fa --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java @@ -0,0 +1,273 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc.registry.gossip; + +import akka.actor.ActorRef; +import akka.actor.Address; +import akka.actor.Props; +import akka.actor.UntypedActor; +import akka.cluster.Cluster; +import akka.event.Logging; +import akka.event.LoggingAdapter; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets; +import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply; +import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions; +import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; +import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers; +import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply; +import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket; +import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply; +import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket; +import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; + +/** + * A store that syncs its data across nodes in the cluster. + * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned. + * A node can write ONLY to its bucket. This way, write conflicts are avoided. + *

+ * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol)

+ * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}. + * + */ +public class BucketStore extends UntypedActor { + + final LoggingAdapter log = Logging.getLogger(getContext().system(), this); + + /** + * Bucket owned by the node + */ + private BucketImpl localBucket = new BucketImpl();; + + /** + * Buckets ownded by other known nodes in the cluster + */ + private ConcurrentMap remoteBuckets = new ConcurrentHashMap<>(); + + /** + * Bucket version for every known node in the cluster including this node + */ + private ConcurrentMap versions = new ConcurrentHashMap<>(); + + /** + * Cluster address for this node + */ + private final Address selfAddress = Cluster.get(getContext().system()).selfAddress(); + + /** + * Our private gossiper + */ + private ActorRef gossiper; + + public BucketStore(){ + gossiper = getContext().actorOf(Props.create(Gossiper.class), "gossiper"); + } + + /** + * This constructor is useful for testing. + * TODO: Pass Props instead of ActorRef + * + * @param gossiper + */ + public BucketStore(ActorRef gossiper){ + this.gossiper = gossiper; + } + + @Override + public void onReceive(Object message) throws Exception { + + log.debug("Received message: node[{}], message[{}]", selfAddress, message); + + if (message instanceof UpdateBucket) + receiveUpdateBucket(((UpdateBucket) message).getBucket()); + + else if (message instanceof GetAllBuckets) + receiveGetAllBucket(); + + else if (message instanceof GetLocalBucket) + receiveGetLocalBucket(); + + else if (message instanceof GetBucketsByMembers) + receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers()); + + else if (message instanceof GetBucketVersions) + receiveGetBucketVersions(); + + else if (message instanceof UpdateRemoteBuckets) + receiveUpdateRemoteBuckets(((UpdateRemoteBuckets) message).getBuckets()); + + else { + log.debug("Unhandled message [{}]", message); + unhandled(message); + } + + } + + /** + * Returns a copy of bucket owned by this node + */ + private void receiveGetLocalBucket() { + final ActorRef sender = getSender(); + GetLocalBucketReply reply = new GetLocalBucketReply(localBucket); + sender.tell(reply, getSelf()); + } + + /** + * Updates the bucket owned by this node + * + * @param updatedBucket + */ + void receiveUpdateBucket(Bucket updatedBucket){ + + localBucket = (BucketImpl) updatedBucket; + versions.put(selfAddress, localBucket.getVersion()); + } + + /** + * Returns all the buckets the this node knows about, self owned + remote + */ + void receiveGetAllBucket(){ + final ActorRef sender = getSender(); + sender.tell(new GetAllBucketsReply(getAllBuckets()), getSelf()); + } + + /** + * Helper to collect all known buckets + * + * @return self owned + remote buckets + */ + Map getAllBuckets(){ + Map all = new HashMap<>(remoteBuckets.size() + 1); + + //first add the local bucket + all.put(selfAddress, localBucket); + + //then get all remote buckets + all.putAll(remoteBuckets); + + return all; + } + + /** + * Returns buckets for requested members that this node knows about + * + * @param members requested members + */ + void receiveGetBucketsByMembers(Set

members){ + final ActorRef sender = getSender(); + Map buckets = getBucketsByMembers(members); + sender.tell(new GetBucketsByMembersReply(buckets), getSelf()); + } + + /** + * Helper to collect buckets for requested memebers + * + * @param members requested members + * @return buckets for requested memebers + */ + Map getBucketsByMembers(Set
members) { + Map buckets = new HashMap<>(); + + //first add the local bucket if asked + if (members.contains(selfAddress)) + buckets.put(selfAddress, localBucket); + + //then get buckets for requested remote nodes + for (Address address : members){ + if (remoteBuckets.containsKey(address)) + buckets.put(address, remoteBuckets.get(address)); + } + + return buckets; + } + + /** + * Returns versions for all buckets known + */ + void receiveGetBucketVersions(){ + final ActorRef sender = getSender(); + GetBucketVersionsReply reply = new GetBucketVersionsReply(versions); + sender.tell(reply, getSelf()); + } + + /** + * Update local copy of remote buckets where local copy's version is older + * + * @param receivedBuckets buckets sent by remote + * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper} + */ + void receiveUpdateRemoteBuckets(Map receivedBuckets){ + + if (receivedBuckets == null || receivedBuckets.isEmpty()) + return; //nothing to do + + //Remote cant update self's bucket + receivedBuckets.remove(selfAddress); + + for (Map.Entry entry : receivedBuckets.entrySet()){ + + Long localVersion = versions.get(entry.getKey()); + if (localVersion == null) localVersion = -1L; + + Bucket receivedBucket = entry.getValue(); + + if (receivedBucket == null) + continue; + + Long remoteVersion = receivedBucket.getVersion(); + if (remoteVersion == null) remoteVersion = -1L; + + //update only if remote version is newer + if ( remoteVersion > localVersion ) { + remoteBuckets.put(entry.getKey(), receivedBucket); + versions.put(entry.getKey(), remoteVersion); + } + } + + log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets); + } + + /// + ///Getter Setters + /// + + BucketImpl getLocalBucket() { + return localBucket; + } + + void setLocalBucket(BucketImpl localBucket) { + this.localBucket = localBucket; + } + + ConcurrentMap getRemoteBuckets() { + return remoteBuckets; + } + + void setRemoteBuckets(ConcurrentMap remoteBuckets) { + this.remoteBuckets = remoteBuckets; + } + + ConcurrentMap getVersions() { + return versions; + } + + void setVersions(ConcurrentMap versions) { + this.versions = versions; + } + + Address getSelfAddress() { + return selfAddress; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Copier.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Copier.java new file mode 100644 index 0000000000..45279eb9a2 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Copier.java @@ -0,0 +1,16 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.registry.gossip; + +/** + * Type of data that goes in {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}. + * The implementers should do deep cloning in copy() method. + */ +public interface Copier { + public T copy(); +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java new file mode 100644 index 0000000000..0b64136c49 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java @@ -0,0 +1,437 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.registry.gossip; + +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.Address; +import akka.actor.Cancellable; +import akka.actor.UntypedActor; +import akka.cluster.Cluster; +import akka.cluster.ClusterEvent; +import akka.cluster.Member; +import akka.dispatch.Mapper; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import akka.pattern.Patterns; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions; +import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; +import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers; +import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply; +import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; +import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope; +import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus; +import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick; + +/** + * Gossiper that syncs bucket store across nodes in the cluster. + *

+ * It keeps a local scheduler that periodically sends Gossip ticks to itself to send bucket store's bucket versions + * to a randomly selected remote gossiper. + *

+ * When bucket versions are received from a remote gossiper, it is compared with bucket store's bucket versions. + * Which ever buckets are newer locally, are sent to remote gossiper. If any bucket is older in bucket store, a + * gossip status is sent to remote gossiper so that it can send the newer buckets. + *

+ * When a bucket is received from a remote gossiper, its sent to the bucket store for update. + * + */ + +public class Gossiper extends UntypedActor { + + final LoggingAdapter log = Logging.getLogger(getContext().system(), this); + + Cluster cluster = Cluster.get(getContext().system()); + + /** + * ActorSystem's address for the current cluster node. + */ + private Address selfAddress = cluster.selfAddress(); + + /** + * All known cluster members + */ + private List

clusterMembers = new ArrayList<>(); + + private Cancellable gossipTask; + + private Boolean autoStartGossipTicks = true; + + public Gossiper(){} + + /** + * Helpful for testing + * @param autoStartGossipTicks used for turning off gossip ticks during testing. Gossip tick can be manually sent. + */ + public Gossiper(Boolean autoStartGossipTicks){ + this.autoStartGossipTicks = autoStartGossipTicks; + } + + @Override + public void preStart(){ + + cluster.subscribe(getSelf(), + ClusterEvent.initialStateAsEvents(), + ClusterEvent.MemberEvent.class, + ClusterEvent.UnreachableMember.class); + + if (autoStartGossipTicks) { + gossipTask = getContext().system().scheduler().schedule( + new FiniteDuration(1, TimeUnit.SECONDS), //initial delay + new FiniteDuration(500, TimeUnit.MILLISECONDS), //interval + getSelf(), //target + new Messages.GossiperMessages.GossipTick(), //message + getContext().dispatcher(), //execution context + getSelf() //sender + ); + } + } + + @Override + public void postStop(){ + if (cluster != null) + cluster.unsubscribe(getSelf()); + if (gossipTask != null) + gossipTask.cancel(); + } + + @Override + public void onReceive(Object message) throws Exception { + + log.debug("Received message: node[{}], message[{}]", selfAddress, message); + + //Usually sent by self via gossip task defined above. But its not enforced. + //These ticks can be sent by another actor as well which is esp. useful while testing + if (message instanceof GossipTick) + receiveGossipTick(); + + //Message from remote gossiper with its bucket versions + else if (message instanceof GossipStatus) + receiveGossipStatus((GossipStatus) message); + + //Message from remote gossiper with buckets. This is usually in response to GossipStatus message + //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus + //message with its local versions + else if (message instanceof GossipEnvelope) + receiveGossip((GossipEnvelope) message); + + else if (message instanceof ClusterEvent.MemberUp) { + receiveMemberUp(((ClusterEvent.MemberUp) message).member()); + + } else if (message instanceof ClusterEvent.MemberRemoved) { + receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member()); + + } else if ( message instanceof ClusterEvent.UnreachableMember){ + receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member()); + + } else + unhandled(message); + } + + /** + * Remove member from local copy of member list. If member down is self, then stop the actor + * + * @param member who went down + */ + void receiveMemberRemoveOrUnreachable(Member member) { + //if its self, then stop itself + if (selfAddress.equals(member.address())){ + getContext().stop(getSelf()); + return; + } + + clusterMembers.remove(member.address()); + log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers); + } + + /** + * Add member to the local copy of member list if it doesnt already + * @param member + */ + void receiveMemberUp(Member member) { + + if (selfAddress.equals(member.address())) + return; //ignore up notification for self + + if (!clusterMembers.contains(member.address())) + clusterMembers.add(member.address()); + + log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers); + } + + /** + * Sends Gossip status to other members in the cluster.
+ * 1. If there are no member, ignore the tick.
+ * 2. If there's only 1 member, send gossip status (bucket versions) to it.
+ * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it. + */ + void receiveGossipTick(){ + if (clusterMembers.size() == 0) return; //no members to send gossip status to + + Address remoteMemberToGossipTo = null; + + if (clusterMembers.size() == 1) + remoteMemberToGossipTo = clusterMembers.get(0); + else { + Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size()); + remoteMemberToGossipTo = clusterMembers.get(randomIndex); + } + + log.debug("Gossiping to [{}]", remoteMemberToGossipTo); + getLocalStatusAndSendTo(remoteMemberToGossipTo); + } + + /** + * Process gossip status received from a remote gossiper. Remote versions are compared with + * the local copy.

+ * + * For each bucket + *

    + *
  • If local copy is newer, the newer buckets are sent in GossipEnvelope to remote
  • + *
  • If local is older, GossipStatus is sent to remote so that it can reply with GossipEnvelope
  • + *
  • If both are same, noop
  • + *
+ * + * @param status bucket versions from a remote member + */ + void receiveGossipStatus(GossipStatus status){ + //Dont want to accept messages from non-members + if (!clusterMembers.contains(status.from())) + return; + + final ActorRef sender = getSender(); + + Future futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000); + + futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher()); + + } + + /** + * Sends the received buckets in the envelope to the parent Bucket store. + * + * @param envelope contains buckets from a remote gossiper + */ + void receiveGossip(GossipEnvelope envelope){ + //TODO: Add more validations + if (!selfAddress.equals(envelope.to())) { + log.info("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to()); + return; + } + if (envelope.getBuckets() == null) + return; //nothing to do + + updateRemoteBuckets(envelope.getBuckets()); + + } + + /** + * Helper to send received buckets to bucket store + * + * @param buckets + */ + void updateRemoteBuckets(Map buckets) { + + if (buckets == null || buckets.isEmpty()) + return; //nothing to merge + + UpdateRemoteBuckets updateRemoteBuckets = new UpdateRemoteBuckets(buckets); + + getContext().parent().tell(updateRemoteBuckets, getSelf()); + } + + /** + * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper + * + * @param remote remote node to send Buckets to + * @param addresses node addresses whose buckets needs to be sent + */ + void sendGossipTo(final ActorRef remote, final Set
addresses){ + + Future futureReply = Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), 1000); + + futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher()); + + } + + /** + * Gets bucket versions from bucket store and sends to the supplied address + * + * @param remoteActorSystemAddress remote gossiper to send to + */ + void getLocalStatusAndSendTo(Address remoteActorSystemAddress){ + + //Get local status from bucket store and send to remote + Future futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000); + + ActorSelection remoteRef = getContext().system().actorSelection( + remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress()); + + log.debug("Sending bucket versions to [{}]", remoteRef); + + futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher()); + + } + + /** + * Helper to send bucket versions received from local store + * @param remote remote gossiper to send versions to + * @param localVersions bucket versions received from local store + */ + void sendGossipStatusTo(ActorRef remote, Map localVersions){ + + GossipStatus status = new GossipStatus(selfAddress, localVersions); + remote.tell(status, getSelf()); + } + + void sendGossipStatusTo(ActorSelection remote, Map localVersions){ + + GossipStatus status = new GossipStatus(selfAddress, localVersions); + remote.tell(status, getSelf()); + } + + /// + /// Private factories to create mappers + /// + + private Mapper getMapperToSendLocalStatus(final ActorSelection remote){ + + return new Mapper() { + @Override + public Void apply(Object replyMessage) { + if (replyMessage instanceof GetBucketVersionsReply) { + GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage; + Map localVersions = reply.getVersions(); + + sendGossipStatusTo(remote, localVersions); + + } + return null; + } + }; + } + + /** + * Process bucket versions received from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}. + * Then this method compares remote bucket versions with local bucket versions. + *
    + *
  • The buckets that are newer locally, send + * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope} to remote + *
  • The buckets that are older locally, send + * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus} to remote so that + * remote sends GossipEnvelop. + *
+ * + * @param sender the remote member + * @param status bucket versions from a remote member + * @return a {@link akka.dispatch.Mapper} that gets evaluated in future + * + */ + private Mapper getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status){ + + final Map remoteVersions = status.getVersions(); + + return new Mapper() { + @Override + public Void apply(Object replyMessage) { + if (replyMessage instanceof GetBucketVersionsReply) { + GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage; + Map localVersions = reply.getVersions(); + + //diff between remote list and local + Set
localIsOlder = new HashSet<>(); + localIsOlder.addAll(remoteVersions.keySet()); + localIsOlder.removeAll(localVersions.keySet()); + + //diff between local list and remote + Set
localIsNewer = new HashSet<>(); + localIsNewer.addAll(localVersions.keySet()); + localIsNewer.removeAll(remoteVersions.keySet()); + + + for (Address address : remoteVersions.keySet()){ + + if (localVersions.get(address) == null || remoteVersions.get(address) == null) + continue; //this condition is taken care of by above diffs + if (localVersions.get(address) < remoteVersions.get(address)) + localIsOlder.add(address); + else if (localVersions.get(address) > remoteVersions.get(address)) + localIsNewer.add(address); + else + continue; + } + + if (!localIsOlder.isEmpty()) + sendGossipStatusTo(sender, localVersions ); + + if (!localIsNewer.isEmpty()) + sendGossipTo(sender, localIsNewer);//send newer buckets to remote + + } + return null; + } + }; + } + + /** + * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} that contains + * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}. These buckets are sent to a remote member encapsulated + * in {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope} + * + * @param sender the remote member that sent + * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus} + * in reply to which bucket is being sent back + * @return a {@link akka.dispatch.Mapper} that gets evaluated in future + * + */ + private Mapper getMapperToSendGossip(final ActorRef sender) { + + return new Mapper() { + @Override + public Void apply(Object msg) { + if (msg instanceof GetBucketsByMembersReply) { + Map buckets = ((GetBucketsByMembersReply) msg).getBuckets(); + log.info("Buckets to send from {}: {}", selfAddress, buckets); + GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets); + sender.tell(envelope, getSelf()); + } + return null; + } + }; + } + + /// + ///Getter Setters + /// + List
getClusterMembers() { + return clusterMembers; + } + + void setClusterMembers(List
clusterMembers) { + this.clusterMembers = clusterMembers; + } + + Address getSelfAddress() { + return selfAddress; + } + + void setSelfAddress(Address selfAddress) { + this.selfAddress = selfAddress; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java new file mode 100644 index 0000000000..9a247d97c7 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java @@ -0,0 +1,170 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.registry.gossip; + +import akka.actor.Address; +import com.google.common.base.Preconditions; + +import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * These messages are used by {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} and + * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper} actors. + */ +public class Messages { + + public static class BucketStoreMessages{ + + public static class GetLocalBucket implements Serializable{} + + public static class ContainsBucket implements Serializable { + final private Bucket bucket; + + public ContainsBucket(Bucket bucket){ + Preconditions.checkArgument(bucket != null, "bucket can not be null"); + this.bucket = bucket; + } + + public Bucket getBucket(){ + return bucket; + } + + } + + public static class UpdateBucket extends ContainsBucket implements Serializable { + public UpdateBucket(Bucket bucket){ + super(bucket); + } + } + + public static class GetLocalBucketReply extends ContainsBucket implements Serializable { + public GetLocalBucketReply(Bucket bucket){ + super(bucket); + } + } + + public static class GetAllBuckets implements Serializable{} + + public static class GetBucketsByMembers implements Serializable{ + private Set
members; + + public GetBucketsByMembers(Set
members){ + Preconditions.checkArgument(members != null, "members can not be null"); + this.members = members; + } + + public Set
getMembers() { + return new HashSet<>(members); + } + } + + public static class ContainsBuckets implements Serializable{ + private Map buckets; + + public ContainsBuckets(Map buckets){ + Preconditions.checkArgument(buckets != null, "buckets can not be null"); + this.buckets = buckets; + } + + public Map getBuckets() { + Map copy = new HashMap<>(buckets.size()); + + for (Map.Entry entry : buckets.entrySet()){ + //ignore null entries + if ( (entry.getKey() == null) || (entry.getValue() == null) ) + continue; + copy.put(entry.getKey(), entry.getValue()); + } + return new HashMap<>(copy); + } + } + + public static class GetAllBucketsReply extends ContainsBuckets implements Serializable{ + public GetAllBucketsReply(Map buckets) { + super(buckets); + } + } + + public static class GetBucketsByMembersReply extends ContainsBuckets implements Serializable{ + public GetBucketsByMembersReply(Map buckets) { + super(buckets); + } + } + + public static class GetBucketVersions implements Serializable{} + + public static class ContainsBucketVersions implements Serializable{ + Map versions; + + public ContainsBucketVersions(Map versions) { + Preconditions.checkArgument(versions != null, "versions can not be null"); + this.versions = versions; + } + + public Map getVersions() { + return Collections.unmodifiableMap(versions); + } + + } + + public static class GetBucketVersionsReply extends ContainsBucketVersions implements Serializable{ + public GetBucketVersionsReply(Map versions) { + super(versions); + } + } + + public static class UpdateRemoteBuckets extends ContainsBuckets implements Serializable{ + public UpdateRemoteBuckets(Map buckets) { + super(buckets); + } + } + } + + public static class GossiperMessages{ + public static class Tick implements Serializable {} + + public static final class GossipTick extends Tick {} + + public static final class GossipStatus extends BucketStoreMessages.ContainsBucketVersions implements Serializable{ + private Address from; + + public GossipStatus(Address from, Map versions) { + super(versions); + this.from = from; + } + + public Address from() { + return from; + } + } + + public static final class GossipEnvelope extends BucketStoreMessages.ContainsBuckets implements Serializable { + private final Address from; + private final Address to; + + public GossipEnvelope(Address from, Address to, Map buckets) { + super(buckets); + this.to = to; + this.from = from; + } + + public Address from() { + return from; + } + + public Address to() { + return to; + } + } + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java index 392c1e637d..55aa1d6c87 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java @@ -25,7 +25,7 @@ import org.opendaylight.controller.remote.rpc.messages.InvokeRoutedRpc; import org.opendaylight.controller.remote.rpc.messages.InvokeRpc; import org.opendaylight.controller.remote.rpc.messages.RpcResponse; import org.opendaylight.controller.remote.rpc.registry.ClusterWrapper; -import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistryOld; import org.opendaylight.controller.sal.common.util.Rpcs; import org.opendaylight.controller.sal.connector.api.RpcRouter; import org.opendaylight.controller.sal.core.api.Broker; @@ -69,7 +69,7 @@ public class RpcBrokerTest { @Test public void testInvokeRpcError() throws URISyntaxException { new JavaTestKit(system) {{ - ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class))); + ActorRef rpcRegistry = system.actorOf(RpcRegistryOld.props(Mockito.mock(ClusterWrapper.class))); Broker.ProviderSession brokerSession = Mockito.mock(Broker.ProviderSession.class); SchemaContext schemaContext = mock(SchemaContext.class); ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext)); @@ -100,7 +100,7 @@ public class RpcBrokerTest { @Test public void testInvokeRpc() throws URISyntaxException { new JavaTestKit(system) {{ - ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(mock(ClusterWrapper.class))); + ActorRef rpcRegistry = system.actorOf(RpcRegistryOld.props(mock(ClusterWrapper.class))); Broker.ProviderSession brokerSession = mock(Broker.ProviderSession.class); SchemaContext schemaContext = mock(SchemaContext.class); ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext)); @@ -141,7 +141,7 @@ public class RpcBrokerTest { @Test public void testInvokeRoutedRpcError() throws URISyntaxException { new JavaTestKit(system) {{ - ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class))); + ActorRef rpcRegistry = system.actorOf(RpcRegistryOld.props(Mockito.mock(ClusterWrapper.class))); Broker.ProviderSession brokerSession = Mockito.mock(Broker.ProviderSession.class); SchemaContext schemaContext = mock(SchemaContext.class); ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext)); @@ -172,7 +172,7 @@ public class RpcBrokerTest { @Test public void testInvokeRoutedRpc() throws URISyntaxException { new JavaTestKit(system) {{ - ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(mock(ClusterWrapper.class))); + ActorRef rpcRegistry = system.actorOf(RpcRegistryOld.props(mock(ClusterWrapper.class))); Broker.ProviderSession brokerSession = mock(Broker.ProviderSession.class); SchemaContext schemaContext = mock(SchemaContext.class); ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext)); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RoutingTableTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RoutingTableOldTest.java similarity index 97% rename from opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RoutingTableTest.java rename to opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RoutingTableOldTest.java index 129a5a56e8..524a91288d 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RoutingTableTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RoutingTableOldTest.java @@ -19,10 +19,10 @@ import java.net.URISyntaxException; import java.util.HashSet; import java.util.Set; -public class RoutingTableTest { +public class RoutingTableOldTest { - private RoutingTable, String> routingTable = - new RoutingTable<>(); + private RoutingTableOld, String> routingTable = + new RoutingTableOld<>(); @Test public void addGlobalRouteNullRouteIdTest() { diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryOldTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryOldTest.java new file mode 100644 index 0000000000..0f711b4e85 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryOldTest.java @@ -0,0 +1,159 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc.registry; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import junit.framework.Assert; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; +import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl; +import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc; +import org.opendaylight.controller.remote.rpc.messages.AddRpc; +import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc; +import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply; +import org.opendaylight.controller.remote.rpc.messages.GetRpc; +import org.opendaylight.controller.remote.rpc.messages.GetRpcReply; +import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc; +import org.opendaylight.controller.remote.rpc.messages.RemoveRpc; +import org.opendaylight.controller.sal.connector.api.RpcRouter; +import org.opendaylight.yangtools.yang.common.QName; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashSet; +import java.util.Set; + +public class RpcRegistryOldTest { + + static ActorSystem system; + + + @BeforeClass + public static void setup() { + system = ActorSystem.create(); + } + + @AfterClass + public static void teardown() { + JavaTestKit.shutdownActorSystem(system); + system = null; + } + + /** + This test add, read and remove an entry in global rpc + */ + @Test + public void testGlobalRpc() throws URISyntaxException { + new JavaTestKit(system) {{ + ActorRef rpcRegistry = system.actorOf(RpcRegistryOld.props(Mockito.mock(ClusterWrapper.class))); + QName type = new QName(new URI("actor1"), "actor1"); + RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null); + final String route = "actor1"; + + AddRpc rpcMsg = new AddRpc(routeId, route); + rpcRegistry.tell(rpcMsg, getRef()); + expectMsgEquals(duration("2 second"), "Success"); + + GetRpc getRpc = new GetRpc(routeId); + rpcRegistry.tell(getRpc, getRef()); + + Boolean getMsg = new ExpectMsg("GetRpcReply") { + protected Boolean match(Object in) { + if (in instanceof GetRpcReply) { + GetRpcReply reply = (GetRpcReply)in; + return route.equals(reply.getRoutePath()); + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + Assert.assertTrue(getMsg); + + RemoveRpc removeMsg = new RemoveRpc(routeId); + rpcRegistry.tell(removeMsg, getRef()); + expectMsgEquals(duration("2 second"), "Success"); + + rpcRegistry.tell(getRpc, getRef()); + + Boolean getNullMsg = new ExpectMsg("GetRpcReply") { + protected Boolean match(Object in) { + if (in instanceof GetRpcReply) { + GetRpcReply reply = (GetRpcReply)in; + return reply.getRoutePath() == null; + } else { + throw noMatch(); + } + } + }.get(); + Assert.assertTrue(getNullMsg); + }}; + + } + + /** + This test add, read and remove an entry in routed rpc + */ + @Test + public void testRoutedRpc() throws URISyntaxException { + new JavaTestKit(system) {{ + ActorRef rpcRegistry = system.actorOf(RpcRegistryOld.props(Mockito.mock(ClusterWrapper.class))); + QName type = new QName(new URI("actor1"), "actor1"); + RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null); + final String route = "actor1"; + + Set> routeIds = new HashSet<>(); + routeIds.add(routeId); + + AddRoutedRpc rpcMsg = new AddRoutedRpc(routeIds, route); + rpcRegistry.tell(rpcMsg, getRef()); + expectMsgEquals(duration("2 second"), "Success"); + + GetRoutedRpc getRpc = new GetRoutedRpc(routeId); + rpcRegistry.tell(getRpc, getRef()); + + Boolean getMsg = new ExpectMsg("GetRoutedRpcReply") { + protected Boolean match(Object in) { + if (in instanceof GetRoutedRpcReply) { + GetRoutedRpcReply reply = (GetRoutedRpcReply)in; + return route.equals(reply.getRoutePath()); + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + Assert.assertTrue(getMsg); + + RemoveRoutedRpc removeMsg = new RemoveRoutedRpc(routeIds, route); + rpcRegistry.tell(removeMsg, getRef()); + expectMsgEquals(duration("2 second"), "Success"); + + rpcRegistry.tell(getRpc, getRef()); + + Boolean getNullMsg = new ExpectMsg("GetRoutedRpcReply") { + protected Boolean match(Object in) { + if (in instanceof GetRoutedRpcReply) { + GetRoutedRpcReply reply = (GetRoutedRpcReply)in; + return reply.getRoutePath() == null; + } else { + throw noMatch(); + } + } + }.get(); + Assert.assertTrue(getNullMsg); + }}; + + } + +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java index d011d331a6..ab609413dd 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java @@ -1,159 +1,248 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - package org.opendaylight.controller.remote.rpc.registry; +import akka.actor.ActorPath; import akka.actor.ActorRef; +import akka.actor.ActorSelection; import akka.actor.ActorSystem; +import akka.actor.ChildActorPath; +import akka.actor.Props; +import akka.japi.Pair; import akka.testkit.JavaTestKit; -import junit.framework.Assert; +import com.typesafe.config.ConfigFactory; +import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import org.mockito.Mockito; import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl; -import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.AddRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply; -import org.opendaylight.controller.remote.rpc.messages.GetRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRpcReply; -import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.RemoveRpc; import org.opendaylight.controller.sal.connector.api.RpcRouter; import org.opendaylight.yangtools.yang.common.QName; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; import java.net.URI; import java.net.URISyntaxException; -import java.util.HashSet; -import java.util.Set; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoute; +import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters; +import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRoutersReply; +import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter; public class RpcRegistryTest { - static ActorSystem system; - - - @BeforeClass - public static void setup() { - system = ActorSystem.create(); - } - - @AfterClass - public static void teardown() { - JavaTestKit.shutdownActorSystem(system); - system = null; - } - - /** - This test add, read and remove an entry in global rpc - */ - @Test - public void testGlobalRpc() throws URISyntaxException { - new JavaTestKit(system) {{ - ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class))); - QName type = new QName(new URI("actor1"), "actor1"); - RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null); - final String route = "actor1"; - - AddRpc rpcMsg = new AddRpc(routeId, route); - rpcRegistry.tell(rpcMsg, getRef()); - expectMsgEquals(duration("2 second"), "Success"); - - GetRpc getRpc = new GetRpc(routeId); - rpcRegistry.tell(getRpc, getRef()); - - Boolean getMsg = new ExpectMsg("GetRpcReply") { - protected Boolean match(Object in) { - if (in instanceof GetRpcReply) { - GetRpcReply reply = (GetRpcReply)in; - return route.equals(reply.getRoutePath()); - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message + private static ActorSystem node1; + private static ActorSystem node2; + private static ActorSystem node3; + + private ActorRef registry1; + private ActorRef registry2; + private ActorRef registry3; + + @BeforeClass + public static void setup() throws InterruptedException { + Thread.sleep(1000); //give some time for previous test to close netty ports + node1 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberA")); + node2 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberB")); + node3 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberC")); + } + + @AfterClass + public static void teardown(){ + JavaTestKit.shutdownActorSystem(node1); + JavaTestKit.shutdownActorSystem(node2); + JavaTestKit.shutdownActorSystem(node3); + if (node1 != null) + node1.shutdown(); + if (node2 != null) + node2.shutdown(); + if (node3 != null) + node3.shutdown(); + + } + + @Before + public void createRpcRegistry() throws InterruptedException { + registry1 = node1.actorOf(Props.create(RpcRegistry.class)); + registry2 = node2.actorOf(Props.create(RpcRegistry.class)); + registry3 = node3.actorOf(Props.create(RpcRegistry.class)); + } + + @After + public void stopRpcRegistry() throws InterruptedException { + if (registry1 != null) + node1.stop(registry1); + if (registry2 != null) + node2.stop(registry2); + if (registry3 != null) + node3.stop(registry3); + } + + /** + * One node cluster. + * Register rpc. Ensure router can be found + * + * @throws URISyntaxException + * @throws InterruptedException + */ + @Test + public void testWhenRpcAddedOneNodeShouldAppearOnSameNode() throws URISyntaxException, InterruptedException { + + final JavaTestKit mockBroker = new JavaTestKit(node1); + + //Add rpc on node 1 + registry1.tell(new SetLocalRouter(mockBroker.getRef()), mockBroker.getRef()); + registry1.tell(getAddRouteMessage(), mockBroker.getRef()); + + Thread.sleep(1000);// + + //find the route on node 1's registry + registry1.tell(new FindRouters(createRouteId()), mockBroker.getRef()); + FindRoutersReply message = mockBroker.expectMsgClass(JavaTestKit.duration("10 second"), FindRoutersReply.class); + List> pairs = message.getRouterWithUpdateTime(); + + validateRouterReceived(pairs, mockBroker.getRef()); + } + + /** + * Three node cluster. + * Register rpc on 1 node. Ensure its router can be found on other 2. + * + * @throws URISyntaxException + * @throws InterruptedException + */ + @Test + public void testWhenRpcAddedOneNodeShouldAppearOnAnother() throws URISyntaxException, InterruptedException { + + validateSystemStartup(); + + final JavaTestKit mockBroker1 = new JavaTestKit(node1); + final JavaTestKit mockBroker2 = new JavaTestKit(node2); + final JavaTestKit mockBroker3 = new JavaTestKit(node3); + + //Add rpc on node 1 + registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef()); + registry1.tell(getAddRouteMessage(), mockBroker1.getRef()); + + Thread.sleep(5000);// give some time for bucket store data sync + + //find the route in node 2's registry + registry2.tell(new FindRouters(createRouteId()), mockBroker2.getRef()); + FindRoutersReply message = mockBroker2.expectMsgClass(JavaTestKit.duration("10 second"), FindRoutersReply.class); + List> pairs = message.getRouterWithUpdateTime(); + + validateRouterReceived(pairs, mockBroker1.getRef()); + + //find the route in node 3's registry + registry3.tell(new FindRouters(createRouteId()), mockBroker3.getRef()); + message = mockBroker3.expectMsgClass(JavaTestKit.duration("10 second"), FindRoutersReply.class); + pairs = message.getRouterWithUpdateTime(); + + validateRouterReceived(pairs, mockBroker1.getRef()); + + } + + /** + * Three node cluster. + * Register rpc on 2 nodes. Ensure 2 routers are found on 3rd. + * + * @throws Exception + */ + @Test + public void testAnRpcAddedOnMultiNodesShouldReturnMultiRouter() throws Exception { + + validateSystemStartup(); + + final JavaTestKit mockBroker1 = new JavaTestKit(node1); + final JavaTestKit mockBroker2 = new JavaTestKit(node2); + final JavaTestKit mockBroker3 = new JavaTestKit(node3); + + //Thread.sleep(5000);//let system come up + + //Add rpc on node 1 + registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef()); + registry1.tell(getAddRouteMessage(), mockBroker1.getRef()); + + //Add same rpc on node 2 + registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef()); + registry2.tell(getAddRouteMessage(), mockBroker2.getRef()); + + registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef()); + Thread.sleep(5000);// give some time for bucket store data sync + + //find the route in node 3's registry + registry3.tell(new FindRouters(createRouteId()), mockBroker3.getRef()); + FindRoutersReply message = mockBroker3.expectMsgClass(JavaTestKit.duration("10 second"), FindRoutersReply.class); + List> pairs = message.getRouterWithUpdateTime(); - Assert.assertTrue(getMsg); + validateMultiRouterReceived(pairs, mockBroker1.getRef(), mockBroker2.getRef()); - RemoveRpc removeMsg = new RemoveRpc(routeId); - rpcRegistry.tell(removeMsg, getRef()); - expectMsgEquals(duration("2 second"), "Success"); + } - rpcRegistry.tell(getRpc, getRef()); + private void validateMultiRouterReceived(List> actual, ActorRef... expected) { + Assert.assertTrue(actual != null); + Assert.assertTrue(actual.size() == expected.length); + } - Boolean getNullMsg = new ExpectMsg("GetRpcReply") { - protected Boolean match(Object in) { - if (in instanceof GetRpcReply) { - GetRpcReply reply = (GetRpcReply)in; - return reply.getRoutePath() == null; - } else { - throw noMatch(); - } - } - }.get(); - Assert.assertTrue(getNullMsg); - }}; - - } - - /** - This test add, read and remove an entry in routed rpc - */ - @Test - public void testRoutedRpc() throws URISyntaxException { - new JavaTestKit(system) {{ - ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class))); - QName type = new QName(new URI("actor1"), "actor1"); - RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null); - final String route = "actor1"; - - Set> routeIds = new HashSet<>(); - routeIds.add(routeId); - - AddRoutedRpc rpcMsg = new AddRoutedRpc(routeIds, route); - rpcRegistry.tell(rpcMsg, getRef()); - expectMsgEquals(duration("2 second"), "Success"); - - GetRoutedRpc getRpc = new GetRoutedRpc(routeId); - rpcRegistry.tell(getRpc, getRef()); - - Boolean getMsg = new ExpectMsg("GetRoutedRpcReply") { - protected Boolean match(Object in) { - if (in instanceof GetRoutedRpcReply) { - GetRoutedRpcReply reply = (GetRoutedRpcReply)in; - return route.equals(reply.getRoutePath()); - } else { - throw noMatch(); - } + private void validateRouterReceived(List> actual, ActorRef expected){ + Assert.assertTrue(actual != null); + Assert.assertTrue(actual.size() == 1); + + for (Pair pair : actual){ + Assert.assertTrue(expected.path().uid() == pair.first().path().uid()); } - }.get(); // this extracts the received message + } - Assert.assertTrue(getMsg); + private void validateSystemStartup() throws InterruptedException { - RemoveRoutedRpc removeMsg = new RemoveRoutedRpc(routeIds, route); - rpcRegistry.tell(removeMsg, getRef()); - expectMsgEquals(duration("2 second"), "Success"); + Thread.sleep(5000); + ActorPath gossiper1Path = new ChildActorPath(new ChildActorPath(registry1.path(), "store"), "gossiper"); + ActorPath gossiper2Path = new ChildActorPath(new ChildActorPath(registry2.path(), "store"), "gossiper"); + ActorPath gossiper3Path = new ChildActorPath(new ChildActorPath(registry3.path(), "store"), "gossiper"); - rpcRegistry.tell(getRpc, getRef()); + ActorSelection gossiper1 = node1.actorSelection(gossiper1Path); + ActorSelection gossiper2 = node2.actorSelection(gossiper2Path); + ActorSelection gossiper3 = node3.actorSelection(gossiper3Path); - Boolean getNullMsg = new ExpectMsg("GetRoutedRpcReply") { - protected Boolean match(Object in) { - if (in instanceof GetRoutedRpcReply) { - GetRoutedRpcReply reply = (GetRoutedRpcReply)in; - return reply.getRoutePath() == null; - } else { - throw noMatch(); - } - } - }.get(); - Assert.assertTrue(getNullMsg); - }}; - } + if (!resolveReference(gossiper1, gossiper2, gossiper3)) + Assert.fail("Could not find gossipers"); + } -} + private Boolean resolveReference(ActorSelection... gossipers) throws InterruptedException { + + Boolean resolved = true; + + for (int i=0; i< 5; i++) { + Thread.sleep(1000); + for (ActorSelection gossiper : gossipers) { + Future future = gossiper.resolveOne(new FiniteDuration(5000, TimeUnit.MILLISECONDS)); + + ActorRef ref = null; + try { + ref = Await.result(future, new FiniteDuration(10000, TimeUnit.MILLISECONDS)); + } catch (Exception e) { + e.printStackTrace(); + } + + if (ref == null) + resolved = false; + } + + if (resolved) break; + } + return resolved; + } + + private AddOrUpdateRoute getAddRouteMessage() throws URISyntaxException { + return new AddOrUpdateRoute(createRouteId()); + } + + private RpcRouter.RouteIdentifier createRouteId() throws URISyntaxException { + QName type = new QName(new URI("/mockrpc"), "mockrpc"); + return new RouteIdentifierImpl(null, type, null); + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java new file mode 100644 index 0000000000..7e87da0f99 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.registry.gossip; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.testkit.TestActorRef; +import akka.testkit.TestProbe; +import com.typesafe.config.ConfigFactory; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.opendaylight.controller.remote.rpc.TerminationMonitor; + +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; + +public class BucketStoreTest { + + private static ActorSystem system; + private static BucketStore store; + + private BucketStore mockStore; + + @BeforeClass + public static void setup() { + + system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("unit-test")); + system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor"); + + store = createStore(); + } + + @AfterClass + public static void teardown() { + system.shutdown(); + } + + @Before + public void createMocks(){ + mockStore = spy(store); + } + + @After + public void resetMocks(){ + reset(mockStore); + } + + @Test + public void testReceiveUpdateBucket_WhenInputBucketShouldUpdateVersion(){ + Bucket bucket = new BucketImpl(); + Long expectedVersion = bucket.getVersion(); + + mockStore.receiveUpdateBucket(bucket); + + Assert.assertEquals(bucket, mockStore.getLocalBucket()); + Assert.assertEquals(expectedVersion, mockStore.getLocalBucket().getVersion()); + } + + /** + * Create BucketStore actor and returns the underlying instance of BucketStore class. + * + * @return instance of BucketStore class + */ + private static BucketStore createStore(){ + TestProbe mockActor = new TestProbe(system); + ActorRef mockGossiper = mockActor.ref(); + final Props props = Props.create(BucketStore.class, mockGossiper); + final TestActorRef testRef = TestActorRef.create(system, props, "testStore"); + + return testRef.underlyingActor(); + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossiperTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossiperTest.java new file mode 100644 index 0000000000..d862dcb8cd --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossiperTest.java @@ -0,0 +1,141 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.registry.gossip; + +import akka.actor.ActorSystem; +import akka.actor.Address; +import akka.actor.Props; +import akka.testkit.TestActorRef; +import com.typesafe.config.ConfigFactory; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.opendaylight.controller.remote.rpc.TerminationMonitor; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyMap; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope; +import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus; + + +public class GossiperTest { + + private static ActorSystem system; + private static Gossiper gossiper; + + private Gossiper mockGossiper; + + @BeforeClass + public static void setup() throws InterruptedException { + Thread.sleep(1000);//give some time for previous test to stop the system. Netty port conflict arises otherwise. + system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("unit-test")); + system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor"); + + gossiper = createGossiper(); + } + + @AfterClass + public static void teardown() { + if (system != null) + system.shutdown(); + } + + @Before + public void createMocks(){ + mockGossiper = spy(gossiper); + } + + @After + public void resetMocks(){ + reset(mockGossiper); + + } + + @Test + public void testReceiveGossipTick_WhenNoRemoteMemberShouldIgnore(){ + + mockGossiper.setClusterMembers(Collections.EMPTY_LIST); + doNothing().when(mockGossiper).getLocalStatusAndSendTo(any(Address.class)); + mockGossiper.receiveGossipTick(); + verify(mockGossiper, times(0)).getLocalStatusAndSendTo(any(Address.class)); + } + + @Test + public void testReceiveGossipTick_WhenRemoteMemberExistsShouldSendStatus(){ + List
members = new ArrayList<>(); + Address remote = new Address("tcp", "member"); + members.add(remote); + + mockGossiper.setClusterMembers(members); + doNothing().when(mockGossiper).getLocalStatusAndSendTo(any(Address.class)); + mockGossiper.receiveGossipTick(); + verify(mockGossiper, times(1)).getLocalStatusAndSendTo(any(Address.class)); + } + + @Test + public void testReceiveGossipStatus_WhenSenderIsNonMemberShouldIgnore(){ + + Address nonMember = new Address("tcp", "non-member"); + GossipStatus remoteStatus = new GossipStatus(nonMember, mock(Map.class)); + + //add a member + List
members = new ArrayList<>(); + members.add(new Address("tcp", "member")); + + mockGossiper.setClusterMembers(members); + mockGossiper.receiveGossipStatus(remoteStatus); + verify(mockGossiper, times(0)).getSender(); + } + + @Test + public void testReceiveGossip_WhenNotAddressedToSelfShouldIgnore(){ + Address notSelf = new Address("tcp", "not-self"); + + GossipEnvelope envelope = new GossipEnvelope(notSelf, notSelf, mock(Map.class)); + doNothing().when(mockGossiper).updateRemoteBuckets(anyMap()); + mockGossiper.receiveGossip(envelope); + verify(mockGossiper, times(0)).updateRemoteBuckets(anyMap()); + } + + @Test + public void testUpdateRemoteBuckets_WhenNoBucketShouldIgnore(){ + + mockGossiper.updateRemoteBuckets(null); + verify(mockGossiper, times(0)).getContext(); + + Map empty = Collections.emptyMap(); + mockGossiper.updateRemoteBuckets(empty); + verify(mockGossiper, times(0)).getContext(); + } + + /** + * Create Gossiper actor and return the underlying instance of Gossiper class. + * + * @return instance of Gossiper class + */ + private static Gossiper createGossiper(){ + + final Props props = Props.create(Gossiper.class, false); + final TestActorRef testRef = TestActorRef.create(system, props, "testGossiper"); + + return testRef.underlyingActor(); + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf b/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf new file mode 100644 index 0000000000..fbfb0e159c --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf @@ -0,0 +1,116 @@ +odl-cluster{ + akka { + loglevel = "INFO" + #log-config-on-start = on + + actor { + provider = "akka.cluster.ClusterActorRefProvider" + debug{ + #autoreceive = on + #lifecycle = on + + } + } + remote { + log-received-messages = on + log-sent-messages = on + + log-remote-lifecycle-events = off + netty.tcp { + hostname = "localhost" + port = 2551 + } + } + + cluster { + seed-nodes = ["akka.tcp://opendaylight-rpc@localhost:2551"] + + auto-down-unreachable-after = 10s + } + } +} +unit-test{ + akka { + loglevel = "INFO" + + actor { + provider = "akka.cluster.ClusterActorRefProvider" + } + } +} + +memberA{ + akka { + loglevel = "INFO" + + actor { + provider = "akka.cluster.ClusterActorRefProvider" + } + remote { + log-received-messages = off + log-sent-messages = off + + log-remote-lifecycle-events = off + netty.tcp { + hostname = "localhost" + port = 2551 + } + } + + cluster { + seed-nodes = ["akka.tcp://opendaylight-rpc@localhost:2551"] + + auto-down-unreachable-after = 10s + } + } +} +memberB{ + akka { + loglevel = "INFO" + + actor { + provider = "akka.cluster.ClusterActorRefProvider" + } + remote { + log-received-messages = off + log-sent-messages = off + + log-remote-lifecycle-events = off + netty.tcp { + hostname = "localhost" + port = 2552 + } + } + + cluster { + seed-nodes = ["akka.tcp://opendaylight-rpc@localhost:2551"] + + auto-down-unreachable-after = 10s + } + } +} +memberC{ + akka { + loglevel = "INFO" + + actor { + provider = "akka.cluster.ClusterActorRefProvider" + } + remote { + log-received-messages = off + log-sent-messages = off + + log-remote-lifecycle-events = off + netty.tcp { + hostname = "localhost" + port = 2553 + } + } + + cluster { + seed-nodes = ["akka.tcp://opendaylight-rpc@localhost:2551"] + + auto-down-unreachable-after = 10s + } + } +} \ No newline at end of file