<scope>test</scope>
</dependency>
- <dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
import akka.japi.Function;
import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
import org.opendaylight.controller.remote.rpc.registry.ClusterWrapper;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistryOld;
import org.opendaylight.controller.sal.core.api.Broker;
import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
import org.opendaylight.yangtools.yang.common.QName;
private void createRpcActors() {
LOG.debug("Create rpc registry and broker actors");
- rpcRegistry = getContext().actorOf(RpcRegistry.props(clusterWrapper), ActorConstants.RPC_REGISTRY);
+ rpcRegistry = getContext().actorOf(RpcRegistryOld.props(clusterWrapper), ActorConstants.RPC_REGISTRY);
rpcBroker = getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), ActorConstants.RPC_BROKER);
}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.remote.rpc.registry;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import akka.actor.ActorRef;
+import akka.japi.Option;
+import akka.japi.Pair;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Copier;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import java.io.Serializable;
import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-public class RoutingTable<I, R> {
-
- private final Logger LOG = LoggerFactory.getLogger(RoutingTable.class);
-
- private ConcurrentMap<I,R> globalRpcMap = new ConcurrentHashMap<>();
- private ConcurrentMap<I, LinkedHashSet<R>> routedRpcMap = new ConcurrentHashMap<>();
-
- public ConcurrentMap<I, R> getGlobalRpcMap() {
- return globalRpcMap;
- }
-
- public ConcurrentMap<I, LinkedHashSet<R>> getRoutedRpcMap() {
- return routedRpcMap;
- }
-
- public R getGlobalRoute(final I routeId) {
- Preconditions.checkNotNull(routeId, "getGlobalRoute: routeId cannot be null!");
- return globalRpcMap.get(routeId);
- }
-
- public void addGlobalRoute(final I routeId, final R route) {
- Preconditions.checkNotNull(routeId, "addGlobalRoute: routeId cannot be null!");
- Preconditions.checkNotNull(route, "addGlobalRoute: route cannot be null!");
- LOG.debug("addGlobalRoute: adding a new route with id[{}] and value [{}]", routeId, route);
- if(globalRpcMap.putIfAbsent(routeId, route) != null) {
- LOG.debug("A route already exist for route id [{}] ", routeId);
- }
- }
+import java.util.HashMap;
+import java.util.Map;
- public void removeGlobalRoute(final I routeId) {
- Preconditions.checkNotNull(routeId, "removeGlobalRoute: routeId cannot be null!");
- LOG.debug("removeGlobalRoute: removing a new route with id [{}]", routeId);
- globalRpcMap.remove(routeId);
- }
+public class RoutingTable implements Copier<RoutingTable>, Serializable {
- public Set<R> getRoutedRpc(final I routeId) {
- Preconditions.checkNotNull(routeId, "getRoutes: routeId cannot be null!");
- Set<R> routes = routedRpcMap.get(routeId);
-
- if (routes == null) {
- return Collections.emptySet();
- }
+ private Map<RpcRouter.RouteIdentifier<?, ?, ?>, Long> table = new HashMap<>();
+ private ActorRef router;
- return ImmutableSet.copyOf(routes);
- }
+ @Override
+ public RoutingTable copy() {
+ RoutingTable copy = new RoutingTable();
+ copy.setTable(Collections.unmodifiableMap(table));
+ copy.setRouter(this.getRouter());
- public R getLastAddedRoutedRpc(final I routeId) {
+ return copy;
+ }
- Set<R> routes = getRoutedRpc(routeId);
+ public Option<Pair<ActorRef, Long>> getRouterFor(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
+ Long updatedTime = table.get(routeId);
- if (routes.isEmpty()) {
- return null;
+ if (updatedTime == null || router == null)
+ return Option.none();
+ else
+ return Option.option(new Pair<>(router, updatedTime));
}
- R route = null;
- Iterator<R> iter = routes.iterator();
- while (iter.hasNext()) {
- route = iter.next();
+ public void addRoute(RpcRouter.RouteIdentifier<?,?,?> routeId){
+ table.put(routeId, System.currentTimeMillis());
}
- return route;
- }
-
- public void addRoutedRpc(final I routeId, final R route) {
- Preconditions.checkNotNull(routeId, "addRoute: routeId cannot be null");
- Preconditions.checkNotNull(route, "addRoute: route cannot be null");
- LOG.debug("addRoute: adding a route with k/v [{}/{}]", routeId, route);
- threadSafeAdd(routeId, route);
- }
-
- public void addRoutedRpcs(final Set<I> routeIds, final R route) {
- Preconditions.checkNotNull(routeIds, "addRoutes: routeIds must not be null");
- for (I routeId : routeIds){
- addRoutedRpc(routeId, route);
+ public void removeRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
+ table.remove(routeId);
}
- }
- public void removeRoute(final I routeId, final R route) {
- Preconditions.checkNotNull(routeId, "removeRoute: routeId cannot be null!");
- Preconditions.checkNotNull(route, "removeRoute: route cannot be null!");
-
- LinkedHashSet<R> routes = routedRpcMap.get(routeId);
- if (routes == null) {
- return;
- }
- LOG.debug("removeRoute: removing a new route with k/v [{}/{}]", routeId, route);
- threadSafeRemove(routeId, route);
- }
-
- public void removeRoutes(final Set<I> routeIds, final R route) {
- Preconditions.checkNotNull(routeIds, "removeRoutes: routeIds must not be null");
- for (I routeId : routeIds){
- removeRoute(routeId, route);
+ public Boolean contains(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
+ return table.containsKey(routeId);
}
- }
-
- /**
- * This method guarantees that no 2 thread over write each other's changes.
- * Just so that we dont end up in infinite loop, it tries for 100 times then throw
- */
- private void threadSafeAdd(final I routeId, final R route) {
- for (int i=0;i<100;i++){
+ ///
+ /// Getter, Setters
+ ///
+ //TODO: Remove public
+ public Map<RpcRouter.RouteIdentifier<?, ?, ?>, Long> getTable() {
+ return table;
+ }
- LinkedHashSet<R> updatedRoutes = new LinkedHashSet<>();
- updatedRoutes.add(route);
- LinkedHashSet<R> oldRoutes = routedRpcMap.putIfAbsent(routeId, updatedRoutes);
- if (oldRoutes == null) {
- return;
- }
+ void setTable(Map<RpcRouter.RouteIdentifier<?, ?, ?>, Long> table) {
+ this.table = table;
+ }
- updatedRoutes = new LinkedHashSet<>(oldRoutes);
- updatedRoutes.add(route);
+ public ActorRef getRouter() {
+ return router;
+ }
- if (routedRpcMap.replace(routeId, oldRoutes, updatedRoutes)) {
- return;
- }
+ public void setRouter(ActorRef router) {
+ this.router = router;
}
- //the method did not already return means it failed to add route in 100 attempts
- throw new IllegalStateException("Failed to add route [" + routeId + "]");
- }
-
- /**
- * This method guarantees that no 2 thread over write each other's changes.
- * Just so that we dont end up in infinite loop, it tries for 100 times then throw
- */
- private void threadSafeRemove(final I routeId, final R route) {
- LinkedHashSet<R> updatedRoutes = null;
- for (int i=0;i<100;i++){
- LinkedHashSet<R> oldRoutes = routedRpcMap.get(routeId);
-
- // if route to be deleted is the only entry in the set then remove routeId from the cache
- if ((oldRoutes.size() == 1) && oldRoutes.contains(route)){
- routedRpcMap.remove(routeId);
- return;
- }
-
- // if there are multiple routes for this routeId, remove the route to be deleted only from the set.
- updatedRoutes = new LinkedHashSet<>(oldRoutes);
- updatedRoutes.remove(route);
- if (routedRpcMap.replace(routeId, oldRoutes, updatedRoutes)) {
- return;
- }
+ @Override
+ public String toString() {
+ return "RoutingTable{" +
+ "table=" + table +
+ ", router=" + router +
+ '}';
}
- //the method did not already return means it failed to remove route in 100 attempts
- throw new IllegalStateException("Failed to remove route [" + routeId + "]");
- }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.registry;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class RoutingTableOld<I, R> {
+
+ private final Logger LOG = LoggerFactory.getLogger(RoutingTableOld.class);
+
+ private ConcurrentMap<I,R> globalRpcMap = new ConcurrentHashMap<>();
+ private ConcurrentMap<I, LinkedHashSet<R>> routedRpcMap = new ConcurrentHashMap<>();
+
+ public ConcurrentMap<I, R> getGlobalRpcMap() {
+ return globalRpcMap;
+ }
+
+ public ConcurrentMap<I, LinkedHashSet<R>> getRoutedRpcMap() {
+ return routedRpcMap;
+ }
+
+ public R getGlobalRoute(final I routeId) {
+ Preconditions.checkNotNull(routeId, "getGlobalRoute: routeId cannot be null!");
+ return globalRpcMap.get(routeId);
+ }
+
+ public void addGlobalRoute(final I routeId, final R route) {
+ Preconditions.checkNotNull(routeId, "addGlobalRoute: routeId cannot be null!");
+ Preconditions.checkNotNull(route, "addGlobalRoute: route cannot be null!");
+ LOG.debug("addGlobalRoute: adding a new route with id[{}] and value [{}]", routeId, route);
+ if(globalRpcMap.putIfAbsent(routeId, route) != null) {
+ LOG.debug("A route already exist for route id [{}] ", routeId);
+ }
+ }
+
+ public void removeGlobalRoute(final I routeId) {
+ Preconditions.checkNotNull(routeId, "removeGlobalRoute: routeId cannot be null!");
+ LOG.debug("removeGlobalRoute: removing a new route with id [{}]", routeId);
+ globalRpcMap.remove(routeId);
+ }
+
+ public Set<R> getRoutedRpc(final I routeId) {
+ Preconditions.checkNotNull(routeId, "getRoutes: routeId cannot be null!");
+ Set<R> routes = routedRpcMap.get(routeId);
+
+ if (routes == null) {
+ return Collections.emptySet();
+ }
+
+ return ImmutableSet.copyOf(routes);
+ }
+
+ public R getLastAddedRoutedRpc(final I routeId) {
+
+ Set<R> routes = getRoutedRpc(routeId);
+
+ if (routes.isEmpty()) {
+ return null;
+ }
+
+ R route = null;
+ Iterator<R> iter = routes.iterator();
+ while (iter.hasNext()) {
+ route = iter.next();
+ }
+
+ return route;
+ }
+
+ public void addRoutedRpc(final I routeId, final R route) {
+ Preconditions.checkNotNull(routeId, "addRoute: routeId cannot be null");
+ Preconditions.checkNotNull(route, "addRoute: route cannot be null");
+ LOG.debug("addRoute: adding a route with k/v [{}/{}]", routeId, route);
+ threadSafeAdd(routeId, route);
+ }
+
+ public void addRoutedRpcs(final Set<I> routeIds, final R route) {
+ Preconditions.checkNotNull(routeIds, "addRoutes: routeIds must not be null");
+ for (I routeId : routeIds){
+ addRoutedRpc(routeId, route);
+ }
+ }
+
+ public void removeRoute(final I routeId, final R route) {
+ Preconditions.checkNotNull(routeId, "removeRoute: routeId cannot be null!");
+ Preconditions.checkNotNull(route, "removeRoute: route cannot be null!");
+
+ LinkedHashSet<R> routes = routedRpcMap.get(routeId);
+ if (routes == null) {
+ return;
+ }
+ LOG.debug("removeRoute: removing a new route with k/v [{}/{}]", routeId, route);
+ threadSafeRemove(routeId, route);
+ }
+
+ public void removeRoutes(final Set<I> routeIds, final R route) {
+ Preconditions.checkNotNull(routeIds, "removeRoutes: routeIds must not be null");
+ for (I routeId : routeIds){
+ removeRoute(routeId, route);
+ }
+ }
+
+ /**
+ * This method guarantees that no 2 thread over write each other's changes.
+ * Just so that we dont end up in infinite loop, it tries for 100 times then throw
+ */
+ private void threadSafeAdd(final I routeId, final R route) {
+
+ for (int i=0;i<100;i++){
+
+ LinkedHashSet<R> updatedRoutes = new LinkedHashSet<>();
+ updatedRoutes.add(route);
+ LinkedHashSet<R> oldRoutes = routedRpcMap.putIfAbsent(routeId, updatedRoutes);
+ if (oldRoutes == null) {
+ return;
+ }
+
+ updatedRoutes = new LinkedHashSet<>(oldRoutes);
+ updatedRoutes.add(route);
+
+ if (routedRpcMap.replace(routeId, oldRoutes, updatedRoutes)) {
+ return;
+ }
+ }
+ //the method did not already return means it failed to add route in 100 attempts
+ throw new IllegalStateException("Failed to add route [" + routeId + "]");
+ }
+
+ /**
+ * This method guarantees that no 2 thread over write each other's changes.
+ * Just so that we dont end up in infinite loop, it tries for 100 times then throw
+ */
+ private void threadSafeRemove(final I routeId, final R route) {
+ LinkedHashSet<R> updatedRoutes = null;
+ for (int i=0;i<100;i++){
+ LinkedHashSet<R> oldRoutes = routedRpcMap.get(routeId);
+
+ // if route to be deleted is the only entry in the set then remove routeId from the cache
+ if ((oldRoutes.size() == 1) && oldRoutes.contains(route)){
+ routedRpcMap.remove(routeId);
+ return;
+ }
+
+ // if there are multiple routes for this routeId, remove the route to be deleted only from the set.
+ updatedRoutes = new LinkedHashSet<>(oldRoutes);
+ updatedRoutes.remove(route);
+ if (routedRpcMap.replace(routeId, oldRoutes, updatedRoutes)) {
+ return;
+ }
+
+ }
+ //the method did not already return means it failed to remove route in 100 attempts
+ throw new IllegalStateException("Failed to remove route [" + routeId + "]");
+ }
+}
*/
package org.opendaylight.controller.remote.rpc.registry;
-import akka.actor.ActorSelection;
+import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.Props;
-import akka.cluster.ClusterEvent;
-import akka.cluster.Member;
-import akka.japi.Creator;
-import org.opendaylight.controller.remote.rpc.AbstractUntypedActor;
-import org.opendaylight.controller.remote.rpc.ActorConstants;
-import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.AddRpc;
-import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply;
-import org.opendaylight.controller.remote.rpc.messages.GetRpc;
-import org.opendaylight.controller.remote.rpc.messages.GetRpcReply;
-import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.RemoveRpc;
-import org.opendaylight.controller.remote.rpc.messages.RoutingTableData;
+import akka.actor.UntypedActor;
+import akka.dispatch.Mapper;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import akka.japi.Option;
+import akka.japi.Pair;
+import akka.pattern.Patterns;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.collection.JavaConversions;
+import scala.concurrent.Future;
-import java.util.LinkedHashSet;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoute;
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoute;
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
/**
- * This Actor maintains the routing table state and sync it with other nodes in the cluster.
- *
- * A scheduler runs after an interval of time, which pick a random member from the cluster
- * and send the current state of routing table to the member.
+ * Registry to look up cluster nodes that have registered for a given rpc.
+ * <p>
+ * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
+ * cluster wide information.
*
- * when a message of routing table data is received, it gets merged with the local routing table
- * to keep the latest data.
*/
+public class RpcRegistry extends UntypedActor {
+
+ final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+
+ /**
+ * Store to keep the registry. Bucket store sync's it across nodes in the cluster
+ */
+ private ActorRef bucketStore;
+
+ /**
+ * Rpc broker that would use the registry to route requests.
+ */
+ private ActorRef localRouter;
-public class RpcRegistry extends AbstractUntypedActor {
-
- private static final Logger LOG = LoggerFactory.getLogger(RpcRegistry.class);
- private RoutingTable<RpcRouter.RouteIdentifier<?, ?, ?>, String> routingTable;
- private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
- private final ClusterWrapper clusterWrapper;
- private final ScheduledFuture<?> syncScheduler;
-
- private RpcRegistry(ClusterWrapper clusterWrapper){
- this.routingTable = new RoutingTable<>();
- this.clusterWrapper = clusterWrapper;
- this.syncScheduler = scheduler.scheduleAtFixedRate(new SendRoutingTable(), 10, 10, TimeUnit.SECONDS);
- }
-
- public static Props props(final ClusterWrapper clusterWrapper){
- return Props.create(new Creator<RpcRegistry>(){
-
- @Override
- public RpcRegistry create() throws Exception {
- return new RpcRegistry(clusterWrapper);
- }
- });
- }
-
- @Override
- protected void handleReceive(Object message) throws Exception {
- LOG.debug("Received message {}", message);
- if(message instanceof RoutingTableData) {
- syncRoutingTable((RoutingTableData) message);
- } else if(message instanceof GetRoutedRpc) {
- getRoutedRpc((GetRoutedRpc) message);
- } else if(message instanceof GetRpc) {
- getRpc((GetRpc) message);
- } else if(message instanceof AddRpc) {
- addRpc((AddRpc) message);
- } else if(message instanceof RemoveRpc) {
- removeRpc((RemoveRpc) message);
- } else if(message instanceof AddRoutedRpc) {
- addRoutedRpc((AddRoutedRpc) message);
- } else if(message instanceof RemoveRoutedRpc) {
- removeRoutedRpc((RemoveRoutedRpc) message);
+ public RpcRegistry() {
+ bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
}
- }
- private void getRoutedRpc(GetRoutedRpc rpcMsg){
- LOG.debug("Get latest routed Rpc location from routing table {}", rpcMsg);
- String remoteActorPath = routingTable.getLastAddedRoutedRpc(rpcMsg.getRouteId());
- GetRoutedRpcReply routedRpcReply = new GetRoutedRpcReply(remoteActorPath);
+ public RpcRegistry(ActorRef bucketStore) {
+ this.bucketStore = bucketStore;
+ }
+
+ @Override
+ public void onReceive(Object message) throws Exception {
- getSender().tell(routedRpcReply, self());
- }
+ log.debug("Received message: message [{}]", message);
- private void getRpc(GetRpc rpcMsg) {
- LOG.debug("Get global Rpc location from routing table {}", rpcMsg);
- String remoteActorPath = routingTable.getGlobalRoute(rpcMsg.getRouteId());
- GetRpcReply rpcReply = new GetRpcReply(remoteActorPath);
+ //TODO: if sender is remote, reject message
- getSender().tell(rpcReply, self());
- }
+ if (message instanceof SetLocalRouter)
+ receiveSetLocalRouter((SetLocalRouter) message);
- private void addRpc(AddRpc rpcMsg) {
- LOG.debug("Add Rpc to routing table {}", rpcMsg);
- routingTable.addGlobalRoute(rpcMsg.getRouteId(), rpcMsg.getActorPath());
+ if (message instanceof AddOrUpdateRoute)
+ receiveAddRoute((AddOrUpdateRoute) message);
- getSender().tell("Success", self());
- }
+ else if (message instanceof RemoveRoute)
+ receiveRemoveRoute((RemoveRoute) message);
- private void removeRpc(RemoveRpc rpcMsg) {
- LOG.debug("Removing Rpc to routing table {}", rpcMsg);
- routingTable.removeGlobalRoute(rpcMsg.getRouteId());
+ else if (message instanceof Messages.FindRouters)
+ receiveGetRouter((Messages.FindRouters) message);
+
+ else
+ unhandled(message);
+ }
- getSender().tell("Success", self());
- }
+ /**
+ * Register's rpc broker
+ *
+ * @param message contains {@link akka.actor.ActorRef} for rpc broker
+ */
+ private void receiveSetLocalRouter(SetLocalRouter message) {
+ if (message == null || message.getRouter() == null)
+ return;//ignore
- private void addRoutedRpc(AddRoutedRpc rpcMsg) {
- routingTable.addRoutedRpcs(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
- getSender().tell("Success", self());
- }
+ localRouter = message.getRouter();
+ }
- private void removeRoutedRpc(RemoveRoutedRpc rpcMsg) {
- routingTable.removeRoutes(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
- getSender().tell("Success", self());
- }
+ /**
+ * //TODO: update this to accept multiple route registration
+ * @param msg
+ */
+ private void receiveAddRoute(AddOrUpdateRoute msg) {
+ if (msg.getRouteIdentifier() == null)
+ return;//ignore
- private void syncRoutingTable(RoutingTableData routingTableData) {
- LOG.debug("Syncing routing table {}", routingTableData);
+ Preconditions.checkState(localRouter != null, "Router must be set first");
- Map<RpcRouter.RouteIdentifier<?, ?, ?>, String> newRpcMap = routingTableData.getRpcMap();
- Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = newRpcMap.keySet();
- for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
- routingTable.addGlobalRoute(routeId, newRpcMap.get(routeId));
+ Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000);
+ futureReply.map(getMapperToAddRoute(msg.getRouteIdentifier()), getContext().dispatcher());
}
- Map<RpcRouter.RouteIdentifier<?, ?, ?>, LinkedHashSet<String>> newRoutedRpcMap =
- routingTableData.getRoutedRpcMap();
- routeIds = newRoutedRpcMap.keySet();
+ /**
+ * //TODO: update this to accept multiple routes
+ * @param msg
+ */
+ private void receiveRemoveRoute(RemoveRoute msg) {
+ if (msg.getRouteIdentifier() == null)
+ return;//ignore
+
+ Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000);
+ futureReply.map(getMapperToRemoveRoute(msg.getRouteIdentifier()), getContext().dispatcher());
- for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
- Set<String> routeAddresses = newRoutedRpcMap.get(routeId);
- for(String routeAddress : routeAddresses) {
- routingTable.addRoutedRpc(routeId, routeAddress);
- }
}
- }
-
- private ActorSelection getRandomRegistryActor() {
- ClusterEvent.CurrentClusterState clusterState = clusterWrapper.getState();
- ActorSelection actor = null;
- Set<Member> members = JavaConversions.asJavaSet(clusterState.members());
- int memberSize = members.size();
- // Don't select yourself
- if(memberSize > 1) {
- Address currentNodeAddress = clusterWrapper.getAddress();
- int index = new Random().nextInt(memberSize);
- int i = 0;
- // keeping previous member, in case when random index member is same as current actor
- // and current actor member is last in set
- Member previousMember = null;
- for(Member member : members){
- if(i == index-1) {
- previousMember = member;
+
+ /**
+ * Finds routers for the given rpc.
+ * @param msg
+ */
+ private void receiveGetRouter(Messages.FindRouters msg) {
+ final ActorRef sender = getSender();
+
+ //if empty message, return empty list
+ if (msg.getRouteIdentifier() == null) {
+ sender.tell(createEmptyReply(), getSelf());
+ return;
}
- if(i == index) {
- if(!currentNodeAddress.equals(member.address())) {
- actor = this.context().actorSelection(member.address() + ActorConstants.RPC_REGISTRY_PATH);
- break;
- } else if(index < memberSize-1){ // pick the next element in the set
- index++;
- }
+
+ Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), 1000);
+ futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
+
+ }
+
+ /**
+ * Helper to create empty reply when no routers are found
+ *
+ * @return
+ */
+ private Messages.FindRoutersReply createEmptyReply() {
+ List<Pair<ActorRef, Long>> routerWithUpdateTime = Collections.emptyList();
+ return new Messages.FindRoutersReply(routerWithUpdateTime);
+ }
+
+ /**
+ * Helper to create a reply when routers are found for the given rpc
+ * @param buckets
+ * @param routeId
+ * @return
+ */
+ private Messages.FindRoutersReply createReplyWithRouters(Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
+
+ List<Pair<ActorRef, Long>> routers = new ArrayList<>();
+
+ Option<Pair<ActorRef, Long>> routerWithUpdateTime = null;
+
+ for (Bucket bucket : buckets.values()) {
+
+ RoutingTable table = (RoutingTable) bucket.getData();
+
+ if (table == null)
+ continue;
+
+ routerWithUpdateTime = table.getRouterFor(routeId);
+
+ if (routerWithUpdateTime.isEmpty())
+ continue;
+
+ routers.add(routerWithUpdateTime.get());
}
- i++;
- }
- if(actor == null && previousMember != null) {
- actor = this.context().actorSelection(previousMember.address() + ActorConstants.RPC_REGISTRY_PATH);
- }
+
+ return new Messages.FindRoutersReply(routers);
}
- return actor;
- }
- private class SendRoutingTable implements Runnable {
- @Override
- public void run() {
- RoutingTableData routingTableData =
- new RoutingTableData(routingTable.getGlobalRpcMap(), routingTable.getRoutedRpcMap());
- LOG.debug("Sending routing table for sync {}", routingTableData);
- ActorSelection actor = getRandomRegistryActor();
- if(actor != null) {
- actor.tell(routingTableData, self());
- }
+ ///
+ ///private factories to create Mapper
+ ///
+
+ /**
+ * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found
+ *
+ * @param routeId the rpc
+ * @param sender client who asked to find the routers.
+ * @return
+ */
+ private Mapper<Object, Void> getMapperToGetRouter(final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
+ return new Mapper<Object, Void>() {
+ @Override
+ public Void apply(Object replyMessage) {
+
+ if (replyMessage instanceof GetAllBucketsReply) {
+
+ GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage;
+ Map<Address, Bucket> buckets = reply.getBuckets();
+
+ if (buckets == null || buckets.isEmpty()) {
+ sender.tell(createEmptyReply(), getSelf());
+ return null;
+ }
+
+ sender.tell(createReplyWithRouters(buckets, routeId), getSelf());
+ }
+ return null;
+ }
+ };
+ }
+
+ /**
+ * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently,
+ * it updates the local bucket in bucket store.
+ *
+ * @param routeId rpc to remote
+ * @return
+ */
+ private Mapper<Object, Void> getMapperToRemoveRoute(final RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
+ return new Mapper<Object, Void>() {
+ @Override
+ public Void apply(Object replyMessage) {
+ if (replyMessage instanceof GetLocalBucketReply) {
+
+ GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
+ Bucket<RoutingTable> bucket = reply.getBucket();
+
+ if (bucket == null) {
+ log.debug("Local bucket is null");
+ return null;
+ }
+
+ RoutingTable table = bucket.getData();
+ if (table == null)
+ table = new RoutingTable();
+
+ table.setRouter(localRouter);
+ table.removeRoute(routeId);
+
+ bucket.setData(table);
+
+ UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
+ bucketStore.tell(updateBucketMessage, getSelf());
+ }
+ return null;
+ }
+ };
+ }
+
+ /**
+ * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently,
+ * it updates the local bucket in bucket store.
+ *
+ * @param routeId rpc to add
+ * @return
+ */
+ private Mapper<Object, Void> getMapperToAddRoute(final RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
+
+ return new Mapper<Object, Void>() {
+ @Override
+ public Void apply(Object replyMessage) {
+ if (replyMessage instanceof GetLocalBucketReply) {
+
+ GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
+ Bucket<RoutingTable> bucket = reply.getBucket();
+
+ if (bucket == null) {
+ log.debug("Local bucket is null");
+ return null;
+ }
+
+ RoutingTable table = bucket.getData();
+ if (table == null)
+ table = new RoutingTable();
+
+ table.setRouter(localRouter);
+ table.addRoute(routeId);
+
+ bucket.setData(table);
+
+ UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
+ bucketStore.tell(updateBucketMessage, getSelf());
+ }
+
+ return null;
+ }
+ };
+ }
+
+ /**
+ * All messages used by the RpcRegistry
+ */
+ public static class Messages {
+
+
+ public static class ContainsRoute {
+ final RpcRouter.RouteIdentifier<?,?,?> routeIdentifier;
+
+ public ContainsRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
+ Preconditions.checkArgument(routeIdentifier != null);
+ this.routeIdentifier = routeIdentifier;
+ }
+
+ public RpcRouter.RouteIdentifier<?,?,?> getRouteIdentifier(){
+ return this.routeIdentifier;
+ }
+
+ @Override
+ public String toString() {
+ return this.getClass().getSimpleName() + "{" +
+ "routeIdentifier=" + routeIdentifier +
+ '}';
+ }
+ }
+
+ public static class AddOrUpdateRoute extends ContainsRoute{
+
+ public AddOrUpdateRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
+ super(routeIdentifier);
+ }
+ }
+
+ public static class RemoveRoute extends ContainsRoute {
+
+ public RemoveRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
+ super(routeIdentifier);
+ }
+ }
+
+ public static class SetLocalRouter{
+ private final ActorRef router;
+
+ public SetLocalRouter(ActorRef router) {
+ this.router = router;
+ }
+
+ public ActorRef getRouter(){
+ return this.router;
+ }
+
+ @Override
+ public String toString() {
+ return "SetLocalRouter{" +
+ "router=" + router +
+ '}';
+ }
+ }
+
+ public static class FindRouters extends ContainsRoute {
+ public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
+ super(routeIdentifier);
+ }
+ }
+
+ public static class FindRoutersReply {
+ final List<Pair<ActorRef, Long>> routerWithUpdateTime;
+
+ public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
+ this.routerWithUpdateTime = routerWithUpdateTime;
+ }
+
+ public List<Pair<ActorRef, Long>> getRouterWithUpdateTime(){
+ return routerWithUpdateTime;
+ }
+
+ @Override
+ public String toString() {
+ return "FindRoutersReply{" +
+ "routerWithUpdateTime=" + routerWithUpdateTime +
+ '}';
+ }
+ }
}
- }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry;
+
+import akka.actor.ActorSelection;
+import akka.actor.Address;
+import akka.actor.Props;
+import akka.cluster.ClusterEvent;
+import akka.cluster.Member;
+import akka.japi.Creator;
+import org.opendaylight.controller.remote.rpc.AbstractUntypedActor;
+import org.opendaylight.controller.remote.rpc.ActorConstants;
+import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.AddRpc;
+import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply;
+import org.opendaylight.controller.remote.rpc.messages.GetRpc;
+import org.opendaylight.controller.remote.rpc.messages.GetRpcReply;
+import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.RemoveRpc;
+import org.opendaylight.controller.remote.rpc.messages.RoutingTableData;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConversions;
+
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This Actor maintains the routing table state and sync it with other nodes in the cluster.
+ *
+ * A scheduler runs after an interval of time, which pick a random member from the cluster
+ * and send the current state of routing table to the member.
+ *
+ * when a message of routing table data is received, it gets merged with the local routing table
+ * to keep the latest data.
+ */
+
+public class RpcRegistryOld extends AbstractUntypedActor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RpcRegistryOld.class);
+ private RoutingTableOld<RpcRouter.RouteIdentifier<?, ?, ?>, String> routingTable;
+ private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+ private final ClusterWrapper clusterWrapper;
+ private final ScheduledFuture<?> syncScheduler;
+
+ private RpcRegistryOld(ClusterWrapper clusterWrapper){
+ this.routingTable = new RoutingTableOld<>();
+ this.clusterWrapper = clusterWrapper;
+ this.syncScheduler = scheduler.scheduleAtFixedRate(new SendRoutingTable(), 10, 10, TimeUnit.SECONDS);
+ }
+
+ public static Props props(final ClusterWrapper clusterWrapper){
+ return Props.create(new Creator<RpcRegistryOld>(){
+
+ @Override
+ public RpcRegistryOld create() throws Exception {
+ return new RpcRegistryOld(clusterWrapper);
+ }
+ });
+ }
+
+ @Override
+ protected void handleReceive(Object message) throws Exception {
+ LOG.debug("Received message {}", message);
+ if(message instanceof RoutingTableData) {
+ syncRoutingTable((RoutingTableData) message);
+ } else if(message instanceof GetRoutedRpc) {
+ getRoutedRpc((GetRoutedRpc) message);
+ } else if(message instanceof GetRpc) {
+ getRpc((GetRpc) message);
+ } else if(message instanceof AddRpc) {
+ addRpc((AddRpc) message);
+ } else if(message instanceof RemoveRpc) {
+ removeRpc((RemoveRpc) message);
+ } else if(message instanceof AddRoutedRpc) {
+ addRoutedRpc((AddRoutedRpc) message);
+ } else if(message instanceof RemoveRoutedRpc) {
+ removeRoutedRpc((RemoveRoutedRpc) message);
+ }
+ }
+
+ private void getRoutedRpc(GetRoutedRpc rpcMsg){
+ LOG.debug("Get latest routed Rpc location from routing table {}", rpcMsg);
+ String remoteActorPath = routingTable.getLastAddedRoutedRpc(rpcMsg.getRouteId());
+ GetRoutedRpcReply routedRpcReply = new GetRoutedRpcReply(remoteActorPath);
+
+ getSender().tell(routedRpcReply, self());
+ }
+
+ private void getRpc(GetRpc rpcMsg) {
+ LOG.debug("Get global Rpc location from routing table {}", rpcMsg);
+ String remoteActorPath = routingTable.getGlobalRoute(rpcMsg.getRouteId());
+ GetRpcReply rpcReply = new GetRpcReply(remoteActorPath);
+
+ getSender().tell(rpcReply, self());
+ }
+
+ private void addRpc(AddRpc rpcMsg) {
+ LOG.debug("Add Rpc to routing table {}", rpcMsg);
+ routingTable.addGlobalRoute(rpcMsg.getRouteId(), rpcMsg.getActorPath());
+
+ getSender().tell("Success", self());
+ }
+
+ private void removeRpc(RemoveRpc rpcMsg) {
+ LOG.debug("Removing Rpc to routing table {}", rpcMsg);
+ routingTable.removeGlobalRoute(rpcMsg.getRouteId());
+
+ getSender().tell("Success", self());
+ }
+
+ private void addRoutedRpc(AddRoutedRpc rpcMsg) {
+ routingTable.addRoutedRpcs(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
+ getSender().tell("Success", self());
+ }
+
+ private void removeRoutedRpc(RemoveRoutedRpc rpcMsg) {
+ routingTable.removeRoutes(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
+ getSender().tell("Success", self());
+ }
+
+ private void syncRoutingTable(RoutingTableData routingTableData) {
+ LOG.debug("Syncing routing table {}", routingTableData);
+
+ Map<RpcRouter.RouteIdentifier<?, ?, ?>, String> newRpcMap = routingTableData.getRpcMap();
+ Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = newRpcMap.keySet();
+ for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
+ routingTable.addGlobalRoute(routeId, newRpcMap.get(routeId));
+ }
+
+ Map<RpcRouter.RouteIdentifier<?, ?, ?>, LinkedHashSet<String>> newRoutedRpcMap =
+ routingTableData.getRoutedRpcMap();
+ routeIds = newRoutedRpcMap.keySet();
+
+ for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
+ Set<String> routeAddresses = newRoutedRpcMap.get(routeId);
+ for(String routeAddress : routeAddresses) {
+ routingTable.addRoutedRpc(routeId, routeAddress);
+ }
+ }
+ }
+
+ private ActorSelection getRandomRegistryActor() {
+ ClusterEvent.CurrentClusterState clusterState = clusterWrapper.getState();
+ ActorSelection actor = null;
+ Set<Member> members = JavaConversions.asJavaSet(clusterState.members());
+ int memberSize = members.size();
+ // Don't select yourself
+ if(memberSize > 1) {
+ Address currentNodeAddress = clusterWrapper.getAddress();
+ int index = new Random().nextInt(memberSize);
+ int i = 0;
+ // keeping previous member, in case when random index member is same as current actor
+ // and current actor member is last in set
+ Member previousMember = null;
+ for(Member member : members){
+ if(i == index-1) {
+ previousMember = member;
+ }
+ if(i == index) {
+ if(!currentNodeAddress.equals(member.address())) {
+ actor = this.context().actorSelection(member.address() + ActorConstants.RPC_REGISTRY_PATH);
+ break;
+ } else if(index < memberSize-1){ // pick the next element in the set
+ index++;
+ }
+ }
+ i++;
+ }
+ if(actor == null && previousMember != null) {
+ actor = this.context().actorSelection(previousMember.address() + ActorConstants.RPC_REGISTRY_PATH);
+ }
+ }
+ return actor;
+ }
+
+ private class SendRoutingTable implements Runnable {
+
+ @Override
+ public void run() {
+ RoutingTableData routingTableData =
+ new RoutingTableData(routingTable.getGlobalRpcMap(), routingTable.getRoutedRpcMap());
+ LOG.debug("Sending routing table for sync {}", routingTableData);
+ ActorSelection actor = getRandomRegistryActor();
+ if(actor != null) {
+ actor.tell(routingTableData, self());
+ }
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.gossip;
+
+
+public interface Bucket<T extends Copier<T>> {
+ public Long getVersion();
+ public T getData();
+ public void setData(T data);
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.gossip;
+
+import java.io.Serializable;
+
+public class BucketImpl<T extends Copier<T>> implements Bucket<T>, Serializable {
+
+ private Long version = System.currentTimeMillis();;
+
+ private T data;
+
+ @Override
+ public Long getVersion() {
+ return version;
+ }
+
+ @Override
+ public T getData() {
+ if (this.data == null)
+ return null;
+
+ return data.copy();
+ }
+
+ public void setData(T data){
+ this.version = System.currentTimeMillis()+1;
+ this.data = data;
+ }
+
+ @Override
+ public String toString() {
+ return "BucketImpl{" +
+ "version=" + version +
+ ", data=" + data +
+ '}';
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.registry.gossip;
+
+import akka.actor.ActorRef;
+import akka.actor.Address;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.cluster.Cluster;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
+
+/**
+ * A store that syncs its data across nodes in the cluster.
+ * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned.
+ * A node can write ONLY to its bucket. This way, write conflicts are avoided.
+ * <p>
+ * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol)<p>
+ * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
+ *
+ */
+public class BucketStore extends UntypedActor {
+
+ final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+
+ /**
+ * Bucket owned by the node
+ */
+ private BucketImpl localBucket = new BucketImpl();;
+
+ /**
+ * Buckets ownded by other known nodes in the cluster
+ */
+ private ConcurrentMap<Address, Bucket> remoteBuckets = new ConcurrentHashMap<>();
+
+ /**
+ * Bucket version for every known node in the cluster including this node
+ */
+ private ConcurrentMap<Address, Long> versions = new ConcurrentHashMap<>();
+
+ /**
+ * Cluster address for this node
+ */
+ private final Address selfAddress = Cluster.get(getContext().system()).selfAddress();
+
+ /**
+ * Our private gossiper
+ */
+ private ActorRef gossiper;
+
+ public BucketStore(){
+ gossiper = getContext().actorOf(Props.create(Gossiper.class), "gossiper");
+ }
+
+ /**
+ * This constructor is useful for testing.
+ * TODO: Pass Props instead of ActorRef
+ *
+ * @param gossiper
+ */
+ public BucketStore(ActorRef gossiper){
+ this.gossiper = gossiper;
+ }
+
+ @Override
+ public void onReceive(Object message) throws Exception {
+
+ log.debug("Received message: node[{}], message[{}]", selfAddress, message);
+
+ if (message instanceof UpdateBucket)
+ receiveUpdateBucket(((UpdateBucket) message).getBucket());
+
+ else if (message instanceof GetAllBuckets)
+ receiveGetAllBucket();
+
+ else if (message instanceof GetLocalBucket)
+ receiveGetLocalBucket();
+
+ else if (message instanceof GetBucketsByMembers)
+ receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
+
+ else if (message instanceof GetBucketVersions)
+ receiveGetBucketVersions();
+
+ else if (message instanceof UpdateRemoteBuckets)
+ receiveUpdateRemoteBuckets(((UpdateRemoteBuckets) message).getBuckets());
+
+ else {
+ log.debug("Unhandled message [{}]", message);
+ unhandled(message);
+ }
+
+ }
+
+ /**
+ * Returns a copy of bucket owned by this node
+ */
+ private void receiveGetLocalBucket() {
+ final ActorRef sender = getSender();
+ GetLocalBucketReply reply = new GetLocalBucketReply(localBucket);
+ sender.tell(reply, getSelf());
+ }
+
+ /**
+ * Updates the bucket owned by this node
+ *
+ * @param updatedBucket
+ */
+ void receiveUpdateBucket(Bucket updatedBucket){
+
+ localBucket = (BucketImpl) updatedBucket;
+ versions.put(selfAddress, localBucket.getVersion());
+ }
+
+ /**
+ * Returns all the buckets the this node knows about, self owned + remote
+ */
+ void receiveGetAllBucket(){
+ final ActorRef sender = getSender();
+ sender.tell(new GetAllBucketsReply(getAllBuckets()), getSelf());
+ }
+
+ /**
+ * Helper to collect all known buckets
+ *
+ * @return self owned + remote buckets
+ */
+ Map<Address, Bucket> getAllBuckets(){
+ Map<Address, Bucket> all = new HashMap<>(remoteBuckets.size() + 1);
+
+ //first add the local bucket
+ all.put(selfAddress, localBucket);
+
+ //then get all remote buckets
+ all.putAll(remoteBuckets);
+
+ return all;
+ }
+
+ /**
+ * Returns buckets for requested members that this node knows about
+ *
+ * @param members requested members
+ */
+ void receiveGetBucketsByMembers(Set<Address> members){
+ final ActorRef sender = getSender();
+ Map<Address, Bucket> buckets = getBucketsByMembers(members);
+ sender.tell(new GetBucketsByMembersReply(buckets), getSelf());
+ }
+
+ /**
+ * Helper to collect buckets for requested memebers
+ *
+ * @param members requested members
+ * @return buckets for requested memebers
+ */
+ Map<Address, Bucket> getBucketsByMembers(Set<Address> members) {
+ Map<Address, Bucket> buckets = new HashMap<>();
+
+ //first add the local bucket if asked
+ if (members.contains(selfAddress))
+ buckets.put(selfAddress, localBucket);
+
+ //then get buckets for requested remote nodes
+ for (Address address : members){
+ if (remoteBuckets.containsKey(address))
+ buckets.put(address, remoteBuckets.get(address));
+ }
+
+ return buckets;
+ }
+
+ /**
+ * Returns versions for all buckets known
+ */
+ void receiveGetBucketVersions(){
+ final ActorRef sender = getSender();
+ GetBucketVersionsReply reply = new GetBucketVersionsReply(versions);
+ sender.tell(reply, getSelf());
+ }
+
+ /**
+ * Update local copy of remote buckets where local copy's version is older
+ *
+ * @param receivedBuckets buckets sent by remote
+ * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
+ */
+ void receiveUpdateRemoteBuckets(Map<Address, Bucket> receivedBuckets){
+
+ if (receivedBuckets == null || receivedBuckets.isEmpty())
+ return; //nothing to do
+
+ //Remote cant update self's bucket
+ receivedBuckets.remove(selfAddress);
+
+ for (Map.Entry<Address, Bucket> entry : receivedBuckets.entrySet()){
+
+ Long localVersion = versions.get(entry.getKey());
+ if (localVersion == null) localVersion = -1L;
+
+ Bucket receivedBucket = entry.getValue();
+
+ if (receivedBucket == null)
+ continue;
+
+ Long remoteVersion = receivedBucket.getVersion();
+ if (remoteVersion == null) remoteVersion = -1L;
+
+ //update only if remote version is newer
+ if ( remoteVersion > localVersion ) {
+ remoteBuckets.put(entry.getKey(), receivedBucket);
+ versions.put(entry.getKey(), remoteVersion);
+ }
+ }
+
+ log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
+ }
+
+ ///
+ ///Getter Setters
+ ///
+
+ BucketImpl getLocalBucket() {
+ return localBucket;
+ }
+
+ void setLocalBucket(BucketImpl localBucket) {
+ this.localBucket = localBucket;
+ }
+
+ ConcurrentMap<Address, Bucket> getRemoteBuckets() {
+ return remoteBuckets;
+ }
+
+ void setRemoteBuckets(ConcurrentMap<Address, Bucket> remoteBuckets) {
+ this.remoteBuckets = remoteBuckets;
+ }
+
+ ConcurrentMap<Address, Long> getVersions() {
+ return versions;
+ }
+
+ void setVersions(ConcurrentMap<Address, Long> versions) {
+ this.versions = versions;
+ }
+
+ Address getSelfAddress() {
+ return selfAddress;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.gossip;
+
+/**
+ * Type of data that goes in {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}.
+ * The implementers should do deep cloning in copy() method.
+ */
+public interface Copier<T> {
+ public T copy();
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.gossip;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Address;
+import akka.actor.Cancellable;
+import akka.actor.UntypedActor;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent;
+import akka.cluster.Member;
+import akka.dispatch.Mapper;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import akka.pattern.Patterns;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
+
+/**
+ * Gossiper that syncs bucket store across nodes in the cluster.
+ * <p>
+ * It keeps a local scheduler that periodically sends Gossip ticks to itself to send bucket store's bucket versions
+ * to a randomly selected remote gossiper.
+ * <p>
+ * When bucket versions are received from a remote gossiper, it is compared with bucket store's bucket versions.
+ * Which ever buckets are newer locally, are sent to remote gossiper. If any bucket is older in bucket store, a
+ * gossip status is sent to remote gossiper so that it can send the newer buckets.
+ * <p>
+ * When a bucket is received from a remote gossiper, its sent to the bucket store for update.
+ *
+ */
+
+public class Gossiper extends UntypedActor {
+
+ final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+
+ Cluster cluster = Cluster.get(getContext().system());
+
+ /**
+ * ActorSystem's address for the current cluster node.
+ */
+ private Address selfAddress = cluster.selfAddress();
+
+ /**
+ * All known cluster members
+ */
+ private List<Address> clusterMembers = new ArrayList<>();
+
+ private Cancellable gossipTask;
+
+ private Boolean autoStartGossipTicks = true;
+
+ public Gossiper(){}
+
+ /**
+ * Helpful for testing
+ * @param autoStartGossipTicks used for turning off gossip ticks during testing. Gossip tick can be manually sent.
+ */
+ public Gossiper(Boolean autoStartGossipTicks){
+ this.autoStartGossipTicks = autoStartGossipTicks;
+ }
+
+ @Override
+ public void preStart(){
+
+ cluster.subscribe(getSelf(),
+ ClusterEvent.initialStateAsEvents(),
+ ClusterEvent.MemberEvent.class,
+ ClusterEvent.UnreachableMember.class);
+
+ if (autoStartGossipTicks) {
+ gossipTask = getContext().system().scheduler().schedule(
+ new FiniteDuration(1, TimeUnit.SECONDS), //initial delay
+ new FiniteDuration(500, TimeUnit.MILLISECONDS), //interval
+ getSelf(), //target
+ new Messages.GossiperMessages.GossipTick(), //message
+ getContext().dispatcher(), //execution context
+ getSelf() //sender
+ );
+ }
+ }
+
+ @Override
+ public void postStop(){
+ if (cluster != null)
+ cluster.unsubscribe(getSelf());
+ if (gossipTask != null)
+ gossipTask.cancel();
+ }
+
+ @Override
+ public void onReceive(Object message) throws Exception {
+
+ log.debug("Received message: node[{}], message[{}]", selfAddress, message);
+
+ //Usually sent by self via gossip task defined above. But its not enforced.
+ //These ticks can be sent by another actor as well which is esp. useful while testing
+ if (message instanceof GossipTick)
+ receiveGossipTick();
+
+ //Message from remote gossiper with its bucket versions
+ else if (message instanceof GossipStatus)
+ receiveGossipStatus((GossipStatus) message);
+
+ //Message from remote gossiper with buckets. This is usually in response to GossipStatus message
+ //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus
+ //message with its local versions
+ else if (message instanceof GossipEnvelope)
+ receiveGossip((GossipEnvelope) message);
+
+ else if (message instanceof ClusterEvent.MemberUp) {
+ receiveMemberUp(((ClusterEvent.MemberUp) message).member());
+
+ } else if (message instanceof ClusterEvent.MemberRemoved) {
+ receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
+
+ } else if ( message instanceof ClusterEvent.UnreachableMember){
+ receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
+
+ } else
+ unhandled(message);
+ }
+
+ /**
+ * Remove member from local copy of member list. If member down is self, then stop the actor
+ *
+ * @param member who went down
+ */
+ void receiveMemberRemoveOrUnreachable(Member member) {
+ //if its self, then stop itself
+ if (selfAddress.equals(member.address())){
+ getContext().stop(getSelf());
+ return;
+ }
+
+ clusterMembers.remove(member.address());
+ log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+ }
+
+ /**
+ * Add member to the local copy of member list if it doesnt already
+ * @param member
+ */
+ void receiveMemberUp(Member member) {
+
+ if (selfAddress.equals(member.address()))
+ return; //ignore up notification for self
+
+ if (!clusterMembers.contains(member.address()))
+ clusterMembers.add(member.address());
+
+ log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
+ }
+
+ /**
+ * Sends Gossip status to other members in the cluster. <br/>
+ * 1. If there are no member, ignore the tick. </br>
+ * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br/>
+ * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
+ */
+ void receiveGossipTick(){
+ if (clusterMembers.size() == 0) return; //no members to send gossip status to
+
+ Address remoteMemberToGossipTo = null;
+
+ if (clusterMembers.size() == 1)
+ remoteMemberToGossipTo = clusterMembers.get(0);
+ else {
+ Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
+ remoteMemberToGossipTo = clusterMembers.get(randomIndex);
+ }
+
+ log.debug("Gossiping to [{}]", remoteMemberToGossipTo);
+ getLocalStatusAndSendTo(remoteMemberToGossipTo);
+ }
+
+ /**
+ * Process gossip status received from a remote gossiper. Remote versions are compared with
+ * the local copy. <p>
+ *
+ * For each bucket
+ * <ul>
+ * <li>If local copy is newer, the newer buckets are sent in GossipEnvelope to remote</li>
+ * <li>If local is older, GossipStatus is sent to remote so that it can reply with GossipEnvelope</li>
+ * <li>If both are same, noop</li>
+ * </ul>
+ *
+ * @param status bucket versions from a remote member
+ */
+ void receiveGossipStatus(GossipStatus status){
+ //Dont want to accept messages from non-members
+ if (!clusterMembers.contains(status.from()))
+ return;
+
+ final ActorRef sender = getSender();
+
+ Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000);
+
+ futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
+
+ }
+
+ /**
+ * Sends the received buckets in the envelope to the parent Bucket store.
+ *
+ * @param envelope contains buckets from a remote gossiper
+ */
+ void receiveGossip(GossipEnvelope envelope){
+ //TODO: Add more validations
+ if (!selfAddress.equals(envelope.to())) {
+ log.info("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
+ return;
+ }
+ if (envelope.getBuckets() == null)
+ return; //nothing to do
+
+ updateRemoteBuckets(envelope.getBuckets());
+
+ }
+
+ /**
+ * Helper to send received buckets to bucket store
+ *
+ * @param buckets
+ */
+ void updateRemoteBuckets(Map<Address, Bucket> buckets) {
+
+ if (buckets == null || buckets.isEmpty())
+ return; //nothing to merge
+
+ UpdateRemoteBuckets updateRemoteBuckets = new UpdateRemoteBuckets(buckets);
+
+ getContext().parent().tell(updateRemoteBuckets, getSelf());
+ }
+
+ /**
+ * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper
+ *
+ * @param remote remote node to send Buckets to
+ * @param addresses node addresses whose buckets needs to be sent
+ */
+ void sendGossipTo(final ActorRef remote, final Set<Address> addresses){
+
+ Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), 1000);
+
+ futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
+
+ }
+
+ /**
+ * Gets bucket versions from bucket store and sends to the supplied address
+ *
+ * @param remoteActorSystemAddress remote gossiper to send to
+ */
+ void getLocalStatusAndSendTo(Address remoteActorSystemAddress){
+
+ //Get local status from bucket store and send to remote
+ Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000);
+
+ ActorSelection remoteRef = getContext().system().actorSelection(
+ remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
+
+ log.debug("Sending bucket versions to [{}]", remoteRef);
+
+ futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
+
+ }
+
+ /**
+ * Helper to send bucket versions received from local store
+ * @param remote remote gossiper to send versions to
+ * @param localVersions bucket versions received from local store
+ */
+ void sendGossipStatusTo(ActorRef remote, Map<Address, Long> localVersions){
+
+ GossipStatus status = new GossipStatus(selfAddress, localVersions);
+ remote.tell(status, getSelf());
+ }
+
+ void sendGossipStatusTo(ActorSelection remote, Map<Address, Long> localVersions){
+
+ GossipStatus status = new GossipStatus(selfAddress, localVersions);
+ remote.tell(status, getSelf());
+ }
+
+ ///
+ /// Private factories to create mappers
+ ///
+
+ private Mapper<Object, Void> getMapperToSendLocalStatus(final ActorSelection remote){
+
+ return new Mapper<Object, Void>() {
+ @Override
+ public Void apply(Object replyMessage) {
+ if (replyMessage instanceof GetBucketVersionsReply) {
+ GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
+ Map<Address, Long> localVersions = reply.getVersions();
+
+ sendGossipStatusTo(remote, localVersions);
+
+ }
+ return null;
+ }
+ };
+ }
+
+ /**
+ * Process bucket versions received from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
+ * Then this method compares remote bucket versions with local bucket versions.
+ * <ul>
+ * <li>The buckets that are newer locally, send
+ * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope} to remote
+ * <li>The buckets that are older locally, send
+ * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus} to remote so that
+ * remote sends GossipEnvelop.
+ * </ul>
+ *
+ * @param sender the remote member
+ * @param status bucket versions from a remote member
+ * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
+ *
+ */
+ private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status){
+
+ final Map<Address, Long> remoteVersions = status.getVersions();
+
+ return new Mapper<Object, Void>() {
+ @Override
+ public Void apply(Object replyMessage) {
+ if (replyMessage instanceof GetBucketVersionsReply) {
+ GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
+ Map<Address, Long> localVersions = reply.getVersions();
+
+ //diff between remote list and local
+ Set<Address> localIsOlder = new HashSet<>();
+ localIsOlder.addAll(remoteVersions.keySet());
+ localIsOlder.removeAll(localVersions.keySet());
+
+ //diff between local list and remote
+ Set<Address> localIsNewer = new HashSet<>();
+ localIsNewer.addAll(localVersions.keySet());
+ localIsNewer.removeAll(remoteVersions.keySet());
+
+
+ for (Address address : remoteVersions.keySet()){
+
+ if (localVersions.get(address) == null || remoteVersions.get(address) == null)
+ continue; //this condition is taken care of by above diffs
+ if (localVersions.get(address) < remoteVersions.get(address))
+ localIsOlder.add(address);
+ else if (localVersions.get(address) > remoteVersions.get(address))
+ localIsNewer.add(address);
+ else
+ continue;
+ }
+
+ if (!localIsOlder.isEmpty())
+ sendGossipStatusTo(sender, localVersions );
+
+ if (!localIsNewer.isEmpty())
+ sendGossipTo(sender, localIsNewer);//send newer buckets to remote
+
+ }
+ return null;
+ }
+ };
+ }
+
+ /**
+ * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} that contains
+ * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}. These buckets are sent to a remote member encapsulated
+ * in {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
+ *
+ * @param sender the remote member that sent
+ * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
+ * in reply to which bucket is being sent back
+ * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
+ *
+ */
+ private Mapper<Object, Void> getMapperToSendGossip(final ActorRef sender) {
+
+ return new Mapper<Object, Void>() {
+ @Override
+ public Void apply(Object msg) {
+ if (msg instanceof GetBucketsByMembersReply) {
+ Map<Address, Bucket> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
+ log.info("Buckets to send from {}: {}", selfAddress, buckets);
+ GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
+ sender.tell(envelope, getSelf());
+ }
+ return null;
+ }
+ };
+ }
+
+ ///
+ ///Getter Setters
+ ///
+ List<Address> getClusterMembers() {
+ return clusterMembers;
+ }
+
+ void setClusterMembers(List<Address> clusterMembers) {
+ this.clusterMembers = clusterMembers;
+ }
+
+ Address getSelfAddress() {
+ return selfAddress;
+ }
+
+ void setSelfAddress(Address selfAddress) {
+ this.selfAddress = selfAddress;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.gossip;
+
+import akka.actor.Address;
+import com.google.common.base.Preconditions;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * These messages are used by {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} and
+ * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper} actors.
+ */
+public class Messages {
+
+ public static class BucketStoreMessages{
+
+ public static class GetLocalBucket implements Serializable{}
+
+ public static class ContainsBucket implements Serializable {
+ final private Bucket bucket;
+
+ public ContainsBucket(Bucket bucket){
+ Preconditions.checkArgument(bucket != null, "bucket can not be null");
+ this.bucket = bucket;
+ }
+
+ public Bucket getBucket(){
+ return bucket;
+ }
+
+ }
+
+ public static class UpdateBucket extends ContainsBucket implements Serializable {
+ public UpdateBucket(Bucket bucket){
+ super(bucket);
+ }
+ }
+
+ public static class GetLocalBucketReply extends ContainsBucket implements Serializable {
+ public GetLocalBucketReply(Bucket bucket){
+ super(bucket);
+ }
+ }
+
+ public static class GetAllBuckets implements Serializable{}
+
+ public static class GetBucketsByMembers implements Serializable{
+ private Set<Address> members;
+
+ public GetBucketsByMembers(Set<Address> members){
+ Preconditions.checkArgument(members != null, "members can not be null");
+ this.members = members;
+ }
+
+ public Set<Address> getMembers() {
+ return new HashSet<>(members);
+ }
+ }
+
+ public static class ContainsBuckets implements Serializable{
+ private Map<Address, Bucket> buckets;
+
+ public ContainsBuckets(Map<Address, Bucket> buckets){
+ Preconditions.checkArgument(buckets != null, "buckets can not be null");
+ this.buckets = buckets;
+ }
+
+ public Map<Address, Bucket> getBuckets() {
+ Map<Address, Bucket> copy = new HashMap<>(buckets.size());
+
+ for (Map.Entry<Address, Bucket> entry : buckets.entrySet()){
+ //ignore null entries
+ if ( (entry.getKey() == null) || (entry.getValue() == null) )
+ continue;
+ copy.put(entry.getKey(), entry.getValue());
+ }
+ return new HashMap<>(copy);
+ }
+ }
+
+ public static class GetAllBucketsReply extends ContainsBuckets implements Serializable{
+ public GetAllBucketsReply(Map<Address, Bucket> buckets) {
+ super(buckets);
+ }
+ }
+
+ public static class GetBucketsByMembersReply extends ContainsBuckets implements Serializable{
+ public GetBucketsByMembersReply(Map<Address, Bucket> buckets) {
+ super(buckets);
+ }
+ }
+
+ public static class GetBucketVersions implements Serializable{}
+
+ public static class ContainsBucketVersions implements Serializable{
+ Map<Address, Long> versions;
+
+ public ContainsBucketVersions(Map<Address, Long> versions) {
+ Preconditions.checkArgument(versions != null, "versions can not be null");
+ this.versions = versions;
+ }
+
+ public Map<Address, Long> getVersions() {
+ return Collections.unmodifiableMap(versions);
+ }
+
+ }
+
+ public static class GetBucketVersionsReply extends ContainsBucketVersions implements Serializable{
+ public GetBucketVersionsReply(Map<Address, Long> versions) {
+ super(versions);
+ }
+ }
+
+ public static class UpdateRemoteBuckets extends ContainsBuckets implements Serializable{
+ public UpdateRemoteBuckets(Map<Address, Bucket> buckets) {
+ super(buckets);
+ }
+ }
+ }
+
+ public static class GossiperMessages{
+ public static class Tick implements Serializable {}
+
+ public static final class GossipTick extends Tick {}
+
+ public static final class GossipStatus extends BucketStoreMessages.ContainsBucketVersions implements Serializable{
+ private Address from;
+
+ public GossipStatus(Address from, Map<Address, Long> versions) {
+ super(versions);
+ this.from = from;
+ }
+
+ public Address from() {
+ return from;
+ }
+ }
+
+ public static final class GossipEnvelope extends BucketStoreMessages.ContainsBuckets implements Serializable {
+ private final Address from;
+ private final Address to;
+
+ public GossipEnvelope(Address from, Address to, Map<Address, Bucket> buckets) {
+ super(buckets);
+ this.to = to;
+ this.from = from;
+ }
+
+ public Address from() {
+ return from;
+ }
+
+ public Address to() {
+ return to;
+ }
+ }
+ }
+}
import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
import org.opendaylight.controller.remote.rpc.registry.ClusterWrapper;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistryOld;
import org.opendaylight.controller.sal.common.util.Rpcs;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.controller.sal.core.api.Broker;
@Test
public void testInvokeRpcError() throws URISyntaxException {
new JavaTestKit(system) {{
- ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class)));
+ ActorRef rpcRegistry = system.actorOf(RpcRegistryOld.props(Mockito.mock(ClusterWrapper.class)));
Broker.ProviderSession brokerSession = Mockito.mock(Broker.ProviderSession.class);
SchemaContext schemaContext = mock(SchemaContext.class);
ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext));
@Test
public void testInvokeRpc() throws URISyntaxException {
new JavaTestKit(system) {{
- ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(mock(ClusterWrapper.class)));
+ ActorRef rpcRegistry = system.actorOf(RpcRegistryOld.props(mock(ClusterWrapper.class)));
Broker.ProviderSession brokerSession = mock(Broker.ProviderSession.class);
SchemaContext schemaContext = mock(SchemaContext.class);
ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext));
@Test
public void testInvokeRoutedRpcError() throws URISyntaxException {
new JavaTestKit(system) {{
- ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class)));
+ ActorRef rpcRegistry = system.actorOf(RpcRegistryOld.props(Mockito.mock(ClusterWrapper.class)));
Broker.ProviderSession brokerSession = Mockito.mock(Broker.ProviderSession.class);
SchemaContext schemaContext = mock(SchemaContext.class);
ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext));
@Test
public void testInvokeRoutedRpc() throws URISyntaxException {
new JavaTestKit(system) {{
- ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(mock(ClusterWrapper.class)));
+ ActorRef rpcRegistry = system.actorOf(RpcRegistryOld.props(mock(ClusterWrapper.class)));
Broker.ProviderSession brokerSession = mock(Broker.ProviderSession.class);
SchemaContext schemaContext = mock(SchemaContext.class);
ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext));
import java.util.HashSet;
import java.util.Set;
-public class RoutingTableTest {
+public class RoutingTableOldTest {
- private RoutingTable<RpcRouter.RouteIdentifier<?, ?, ?>, String> routingTable =
- new RoutingTable<>();
+ private RoutingTableOld<RpcRouter.RouteIdentifier<?, ?, ?>, String> routingTable =
+ new RoutingTableOld<>();
@Test
public void addGlobalRouteNullRouteIdTest() {
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.registry;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import junit.framework.Assert;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
+import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.AddRpc;
+import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply;
+import org.opendaylight.controller.remote.rpc.messages.GetRpc;
+import org.opendaylight.controller.remote.rpc.messages.GetRpcReply;
+import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.RemoveRpc;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.yangtools.yang.common.QName;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashSet;
+import java.util.Set;
+
+public class RpcRegistryOldTest {
+
+ static ActorSystem system;
+
+
+ @BeforeClass
+ public static void setup() {
+ system = ActorSystem.create();
+ }
+
+ @AfterClass
+ public static void teardown() {
+ JavaTestKit.shutdownActorSystem(system);
+ system = null;
+ }
+
+ /**
+ This test add, read and remove an entry in global rpc
+ */
+ @Test
+ public void testGlobalRpc() throws URISyntaxException {
+ new JavaTestKit(system) {{
+ ActorRef rpcRegistry = system.actorOf(RpcRegistryOld.props(Mockito.mock(ClusterWrapper.class)));
+ QName type = new QName(new URI("actor1"), "actor1");
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null);
+ final String route = "actor1";
+
+ AddRpc rpcMsg = new AddRpc(routeId, route);
+ rpcRegistry.tell(rpcMsg, getRef());
+ expectMsgEquals(duration("2 second"), "Success");
+
+ GetRpc getRpc = new GetRpc(routeId);
+ rpcRegistry.tell(getRpc, getRef());
+
+ Boolean getMsg = new ExpectMsg<Boolean>("GetRpcReply") {
+ protected Boolean match(Object in) {
+ if (in instanceof GetRpcReply) {
+ GetRpcReply reply = (GetRpcReply)in;
+ return route.equals(reply.getRoutePath());
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ Assert.assertTrue(getMsg);
+
+ RemoveRpc removeMsg = new RemoveRpc(routeId);
+ rpcRegistry.tell(removeMsg, getRef());
+ expectMsgEquals(duration("2 second"), "Success");
+
+ rpcRegistry.tell(getRpc, getRef());
+
+ Boolean getNullMsg = new ExpectMsg<Boolean>("GetRpcReply") {
+ protected Boolean match(Object in) {
+ if (in instanceof GetRpcReply) {
+ GetRpcReply reply = (GetRpcReply)in;
+ return reply.getRoutePath() == null;
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+ Assert.assertTrue(getNullMsg);
+ }};
+
+ }
+
+ /**
+ This test add, read and remove an entry in routed rpc
+ */
+ @Test
+ public void testRoutedRpc() throws URISyntaxException {
+ new JavaTestKit(system) {{
+ ActorRef rpcRegistry = system.actorOf(RpcRegistryOld.props(Mockito.mock(ClusterWrapper.class)));
+ QName type = new QName(new URI("actor1"), "actor1");
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null);
+ final String route = "actor1";
+
+ Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new HashSet<>();
+ routeIds.add(routeId);
+
+ AddRoutedRpc rpcMsg = new AddRoutedRpc(routeIds, route);
+ rpcRegistry.tell(rpcMsg, getRef());
+ expectMsgEquals(duration("2 second"), "Success");
+
+ GetRoutedRpc getRpc = new GetRoutedRpc(routeId);
+ rpcRegistry.tell(getRpc, getRef());
+
+ Boolean getMsg = new ExpectMsg<Boolean>("GetRoutedRpcReply") {
+ protected Boolean match(Object in) {
+ if (in instanceof GetRoutedRpcReply) {
+ GetRoutedRpcReply reply = (GetRoutedRpcReply)in;
+ return route.equals(reply.getRoutePath());
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ Assert.assertTrue(getMsg);
+
+ RemoveRoutedRpc removeMsg = new RemoveRoutedRpc(routeIds, route);
+ rpcRegistry.tell(removeMsg, getRef());
+ expectMsgEquals(duration("2 second"), "Success");
+
+ rpcRegistry.tell(getRpc, getRef());
+
+ Boolean getNullMsg = new ExpectMsg<Boolean>("GetRoutedRpcReply") {
+ protected Boolean match(Object in) {
+ if (in instanceof GetRoutedRpcReply) {
+ GetRoutedRpcReply reply = (GetRoutedRpcReply)in;
+ return reply.getRoutePath() == null;
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+ Assert.assertTrue(getNullMsg);
+ }};
+
+ }
+
+}
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
package org.opendaylight.controller.remote.rpc.registry;
+import akka.actor.ActorPath;
import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
+import akka.actor.ChildActorPath;
+import akka.actor.Props;
+import akka.japi.Pair;
import akka.testkit.JavaTestKit;
-import junit.framework.Assert;
+import com.typesafe.config.ConfigFactory;
+import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.mockito.Mockito;
import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
-import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.AddRpc;
-import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply;
-import org.opendaylight.controller.remote.rpc.messages.GetRpc;
-import org.opendaylight.controller.remote.rpc.messages.GetRpcReply;
-import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.RemoveRpc;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.yangtools.yang.common.QName;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoute;
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRoutersReply;
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
public class RpcRegistryTest {
- static ActorSystem system;
-
-
- @BeforeClass
- public static void setup() {
- system = ActorSystem.create();
- }
-
- @AfterClass
- public static void teardown() {
- JavaTestKit.shutdownActorSystem(system);
- system = null;
- }
-
- /**
- This test add, read and remove an entry in global rpc
- */
- @Test
- public void testGlobalRpc() throws URISyntaxException {
- new JavaTestKit(system) {{
- ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class)));
- QName type = new QName(new URI("actor1"), "actor1");
- RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null);
- final String route = "actor1";
-
- AddRpc rpcMsg = new AddRpc(routeId, route);
- rpcRegistry.tell(rpcMsg, getRef());
- expectMsgEquals(duration("2 second"), "Success");
-
- GetRpc getRpc = new GetRpc(routeId);
- rpcRegistry.tell(getRpc, getRef());
-
- Boolean getMsg = new ExpectMsg<Boolean>("GetRpcReply") {
- protected Boolean match(Object in) {
- if (in instanceof GetRpcReply) {
- GetRpcReply reply = (GetRpcReply)in;
- return route.equals(reply.getRoutePath());
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
+ private static ActorSystem node1;
+ private static ActorSystem node2;
+ private static ActorSystem node3;
+
+ private ActorRef registry1;
+ private ActorRef registry2;
+ private ActorRef registry3;
+
+ @BeforeClass
+ public static void setup() throws InterruptedException {
+ Thread.sleep(1000); //give some time for previous test to close netty ports
+ node1 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberA"));
+ node2 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberB"));
+ node3 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberC"));
+ }
+
+ @AfterClass
+ public static void teardown(){
+ JavaTestKit.shutdownActorSystem(node1);
+ JavaTestKit.shutdownActorSystem(node2);
+ JavaTestKit.shutdownActorSystem(node3);
+ if (node1 != null)
+ node1.shutdown();
+ if (node2 != null)
+ node2.shutdown();
+ if (node3 != null)
+ node3.shutdown();
+
+ }
+
+ @Before
+ public void createRpcRegistry() throws InterruptedException {
+ registry1 = node1.actorOf(Props.create(RpcRegistry.class));
+ registry2 = node2.actorOf(Props.create(RpcRegistry.class));
+ registry3 = node3.actorOf(Props.create(RpcRegistry.class));
+ }
+
+ @After
+ public void stopRpcRegistry() throws InterruptedException {
+ if (registry1 != null)
+ node1.stop(registry1);
+ if (registry2 != null)
+ node2.stop(registry2);
+ if (registry3 != null)
+ node3.stop(registry3);
+ }
+
+ /**
+ * One node cluster.
+ * Register rpc. Ensure router can be found
+ *
+ * @throws URISyntaxException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testWhenRpcAddedOneNodeShouldAppearOnSameNode() throws URISyntaxException, InterruptedException {
+
+ final JavaTestKit mockBroker = new JavaTestKit(node1);
+
+ //Add rpc on node 1
+ registry1.tell(new SetLocalRouter(mockBroker.getRef()), mockBroker.getRef());
+ registry1.tell(getAddRouteMessage(), mockBroker.getRef());
+
+ Thread.sleep(1000);//
+
+ //find the route on node 1's registry
+ registry1.tell(new FindRouters(createRouteId()), mockBroker.getRef());
+ FindRoutersReply message = mockBroker.expectMsgClass(JavaTestKit.duration("10 second"), FindRoutersReply.class);
+ List<Pair<ActorRef, Long>> pairs = message.getRouterWithUpdateTime();
+
+ validateRouterReceived(pairs, mockBroker.getRef());
+ }
+
+ /**
+ * Three node cluster.
+ * Register rpc on 1 node. Ensure its router can be found on other 2.
+ *
+ * @throws URISyntaxException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testWhenRpcAddedOneNodeShouldAppearOnAnother() throws URISyntaxException, InterruptedException {
+
+ validateSystemStartup();
+
+ final JavaTestKit mockBroker1 = new JavaTestKit(node1);
+ final JavaTestKit mockBroker2 = new JavaTestKit(node2);
+ final JavaTestKit mockBroker3 = new JavaTestKit(node3);
+
+ //Add rpc on node 1
+ registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
+ registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
+
+ Thread.sleep(5000);// give some time for bucket store data sync
+
+ //find the route in node 2's registry
+ registry2.tell(new FindRouters(createRouteId()), mockBroker2.getRef());
+ FindRoutersReply message = mockBroker2.expectMsgClass(JavaTestKit.duration("10 second"), FindRoutersReply.class);
+ List<Pair<ActorRef, Long>> pairs = message.getRouterWithUpdateTime();
+
+ validateRouterReceived(pairs, mockBroker1.getRef());
+
+ //find the route in node 3's registry
+ registry3.tell(new FindRouters(createRouteId()), mockBroker3.getRef());
+ message = mockBroker3.expectMsgClass(JavaTestKit.duration("10 second"), FindRoutersReply.class);
+ pairs = message.getRouterWithUpdateTime();
+
+ validateRouterReceived(pairs, mockBroker1.getRef());
+
+ }
+
+ /**
+ * Three node cluster.
+ * Register rpc on 2 nodes. Ensure 2 routers are found on 3rd.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testAnRpcAddedOnMultiNodesShouldReturnMultiRouter() throws Exception {
+
+ validateSystemStartup();
+
+ final JavaTestKit mockBroker1 = new JavaTestKit(node1);
+ final JavaTestKit mockBroker2 = new JavaTestKit(node2);
+ final JavaTestKit mockBroker3 = new JavaTestKit(node3);
+
+ //Thread.sleep(5000);//let system come up
+
+ //Add rpc on node 1
+ registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
+ registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
+
+ //Add same rpc on node 2
+ registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef());
+ registry2.tell(getAddRouteMessage(), mockBroker2.getRef());
+
+ registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef());
+ Thread.sleep(5000);// give some time for bucket store data sync
+
+ //find the route in node 3's registry
+ registry3.tell(new FindRouters(createRouteId()), mockBroker3.getRef());
+ FindRoutersReply message = mockBroker3.expectMsgClass(JavaTestKit.duration("10 second"), FindRoutersReply.class);
+ List<Pair<ActorRef, Long>> pairs = message.getRouterWithUpdateTime();
- Assert.assertTrue(getMsg);
+ validateMultiRouterReceived(pairs, mockBroker1.getRef(), mockBroker2.getRef());
- RemoveRpc removeMsg = new RemoveRpc(routeId);
- rpcRegistry.tell(removeMsg, getRef());
- expectMsgEquals(duration("2 second"), "Success");
+ }
- rpcRegistry.tell(getRpc, getRef());
+ private void validateMultiRouterReceived(List<Pair<ActorRef, Long>> actual, ActorRef... expected) {
+ Assert.assertTrue(actual != null);
+ Assert.assertTrue(actual.size() == expected.length);
+ }
- Boolean getNullMsg = new ExpectMsg<Boolean>("GetRpcReply") {
- protected Boolean match(Object in) {
- if (in instanceof GetRpcReply) {
- GetRpcReply reply = (GetRpcReply)in;
- return reply.getRoutePath() == null;
- } else {
- throw noMatch();
- }
- }
- }.get();
- Assert.assertTrue(getNullMsg);
- }};
-
- }
-
- /**
- This test add, read and remove an entry in routed rpc
- */
- @Test
- public void testRoutedRpc() throws URISyntaxException {
- new JavaTestKit(system) {{
- ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class)));
- QName type = new QName(new URI("actor1"), "actor1");
- RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null);
- final String route = "actor1";
-
- Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new HashSet<>();
- routeIds.add(routeId);
-
- AddRoutedRpc rpcMsg = new AddRoutedRpc(routeIds, route);
- rpcRegistry.tell(rpcMsg, getRef());
- expectMsgEquals(duration("2 second"), "Success");
-
- GetRoutedRpc getRpc = new GetRoutedRpc(routeId);
- rpcRegistry.tell(getRpc, getRef());
-
- Boolean getMsg = new ExpectMsg<Boolean>("GetRoutedRpcReply") {
- protected Boolean match(Object in) {
- if (in instanceof GetRoutedRpcReply) {
- GetRoutedRpcReply reply = (GetRoutedRpcReply)in;
- return route.equals(reply.getRoutePath());
- } else {
- throw noMatch();
- }
+ private void validateRouterReceived(List<Pair<ActorRef, Long>> actual, ActorRef expected){
+ Assert.assertTrue(actual != null);
+ Assert.assertTrue(actual.size() == 1);
+
+ for (Pair<ActorRef, Long> pair : actual){
+ Assert.assertTrue(expected.path().uid() == pair.first().path().uid());
}
- }.get(); // this extracts the received message
+ }
- Assert.assertTrue(getMsg);
+ private void validateSystemStartup() throws InterruptedException {
- RemoveRoutedRpc removeMsg = new RemoveRoutedRpc(routeIds, route);
- rpcRegistry.tell(removeMsg, getRef());
- expectMsgEquals(duration("2 second"), "Success");
+ Thread.sleep(5000);
+ ActorPath gossiper1Path = new ChildActorPath(new ChildActorPath(registry1.path(), "store"), "gossiper");
+ ActorPath gossiper2Path = new ChildActorPath(new ChildActorPath(registry2.path(), "store"), "gossiper");
+ ActorPath gossiper3Path = new ChildActorPath(new ChildActorPath(registry3.path(), "store"), "gossiper");
- rpcRegistry.tell(getRpc, getRef());
+ ActorSelection gossiper1 = node1.actorSelection(gossiper1Path);
+ ActorSelection gossiper2 = node2.actorSelection(gossiper2Path);
+ ActorSelection gossiper3 = node3.actorSelection(gossiper3Path);
- Boolean getNullMsg = new ExpectMsg<Boolean>("GetRoutedRpcReply") {
- protected Boolean match(Object in) {
- if (in instanceof GetRoutedRpcReply) {
- GetRoutedRpcReply reply = (GetRoutedRpcReply)in;
- return reply.getRoutePath() == null;
- } else {
- throw noMatch();
- }
- }
- }.get();
- Assert.assertTrue(getNullMsg);
- }};
- }
+ if (!resolveReference(gossiper1, gossiper2, gossiper3))
+ Assert.fail("Could not find gossipers");
+ }
-}
+ private Boolean resolveReference(ActorSelection... gossipers) throws InterruptedException {
+
+ Boolean resolved = true;
+
+ for (int i=0; i< 5; i++) {
+ Thread.sleep(1000);
+ for (ActorSelection gossiper : gossipers) {
+ Future<ActorRef> future = gossiper.resolveOne(new FiniteDuration(5000, TimeUnit.MILLISECONDS));
+
+ ActorRef ref = null;
+ try {
+ ref = Await.result(future, new FiniteDuration(10000, TimeUnit.MILLISECONDS));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ if (ref == null)
+ resolved = false;
+ }
+
+ if (resolved) break;
+ }
+ return resolved;
+ }
+
+ private AddOrUpdateRoute getAddRouteMessage() throws URISyntaxException {
+ return new AddOrUpdateRoute(createRouteId());
+ }
+
+ private RpcRouter.RouteIdentifier<?,?,?> createRouteId() throws URISyntaxException {
+ QName type = new QName(new URI("/mockrpc"), "mockrpc");
+ return new RouteIdentifierImpl(null, type, null);
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.gossip;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.testkit.TestActorRef;
+import akka.testkit.TestProbe;
+import com.typesafe.config.ConfigFactory;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.remote.rpc.TerminationMonitor;
+
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+
+public class BucketStoreTest {
+
+ private static ActorSystem system;
+ private static BucketStore store;
+
+ private BucketStore mockStore;
+
+ @BeforeClass
+ public static void setup() {
+
+ system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("unit-test"));
+ system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
+
+ store = createStore();
+ }
+
+ @AfterClass
+ public static void teardown() {
+ system.shutdown();
+ }
+
+ @Before
+ public void createMocks(){
+ mockStore = spy(store);
+ }
+
+ @After
+ public void resetMocks(){
+ reset(mockStore);
+ }
+
+ @Test
+ public void testReceiveUpdateBucket_WhenInputBucketShouldUpdateVersion(){
+ Bucket bucket = new BucketImpl();
+ Long expectedVersion = bucket.getVersion();
+
+ mockStore.receiveUpdateBucket(bucket);
+
+ Assert.assertEquals(bucket, mockStore.getLocalBucket());
+ Assert.assertEquals(expectedVersion, mockStore.getLocalBucket().getVersion());
+ }
+
+ /**
+ * Create BucketStore actor and returns the underlying instance of BucketStore class.
+ *
+ * @return instance of BucketStore class
+ */
+ private static BucketStore createStore(){
+ TestProbe mockActor = new TestProbe(system);
+ ActorRef mockGossiper = mockActor.ref();
+ final Props props = Props.create(BucketStore.class, mockGossiper);
+ final TestActorRef<BucketStore> testRef = TestActorRef.create(system, props, "testStore");
+
+ return testRef.underlyingActor();
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.gossip;
+
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.actor.Props;
+import akka.testkit.TestActorRef;
+import com.typesafe.config.ConfigFactory;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.remote.rpc.TerminationMonitor;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyMap;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
+
+
+public class GossiperTest {
+
+ private static ActorSystem system;
+ private static Gossiper gossiper;
+
+ private Gossiper mockGossiper;
+
+ @BeforeClass
+ public static void setup() throws InterruptedException {
+ Thread.sleep(1000);//give some time for previous test to stop the system. Netty port conflict arises otherwise.
+ system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("unit-test"));
+ system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
+
+ gossiper = createGossiper();
+ }
+
+ @AfterClass
+ public static void teardown() {
+ if (system != null)
+ system.shutdown();
+ }
+
+ @Before
+ public void createMocks(){
+ mockGossiper = spy(gossiper);
+ }
+
+ @After
+ public void resetMocks(){
+ reset(mockGossiper);
+
+ }
+
+ @Test
+ public void testReceiveGossipTick_WhenNoRemoteMemberShouldIgnore(){
+
+ mockGossiper.setClusterMembers(Collections.EMPTY_LIST);
+ doNothing().when(mockGossiper).getLocalStatusAndSendTo(any(Address.class));
+ mockGossiper.receiveGossipTick();
+ verify(mockGossiper, times(0)).getLocalStatusAndSendTo(any(Address.class));
+ }
+
+ @Test
+ public void testReceiveGossipTick_WhenRemoteMemberExistsShouldSendStatus(){
+ List<Address> members = new ArrayList<>();
+ Address remote = new Address("tcp", "member");
+ members.add(remote);
+
+ mockGossiper.setClusterMembers(members);
+ doNothing().when(mockGossiper).getLocalStatusAndSendTo(any(Address.class));
+ mockGossiper.receiveGossipTick();
+ verify(mockGossiper, times(1)).getLocalStatusAndSendTo(any(Address.class));
+ }
+
+ @Test
+ public void testReceiveGossipStatus_WhenSenderIsNonMemberShouldIgnore(){
+
+ Address nonMember = new Address("tcp", "non-member");
+ GossipStatus remoteStatus = new GossipStatus(nonMember, mock(Map.class));
+
+ //add a member
+ List<Address> members = new ArrayList<>();
+ members.add(new Address("tcp", "member"));
+
+ mockGossiper.setClusterMembers(members);
+ mockGossiper.receiveGossipStatus(remoteStatus);
+ verify(mockGossiper, times(0)).getSender();
+ }
+
+ @Test
+ public void testReceiveGossip_WhenNotAddressedToSelfShouldIgnore(){
+ Address notSelf = new Address("tcp", "not-self");
+
+ GossipEnvelope envelope = new GossipEnvelope(notSelf, notSelf, mock(Map.class));
+ doNothing().when(mockGossiper).updateRemoteBuckets(anyMap());
+ mockGossiper.receiveGossip(envelope);
+ verify(mockGossiper, times(0)).updateRemoteBuckets(anyMap());
+ }
+
+ @Test
+ public void testUpdateRemoteBuckets_WhenNoBucketShouldIgnore(){
+
+ mockGossiper.updateRemoteBuckets(null);
+ verify(mockGossiper, times(0)).getContext();
+
+ Map<Address, Bucket> empty = Collections.emptyMap();
+ mockGossiper.updateRemoteBuckets(empty);
+ verify(mockGossiper, times(0)).getContext();
+ }
+
+ /**
+ * Create Gossiper actor and return the underlying instance of Gossiper class.
+ *
+ * @return instance of Gossiper class
+ */
+ private static Gossiper createGossiper(){
+
+ final Props props = Props.create(Gossiper.class, false);
+ final TestActorRef<Gossiper> testRef = TestActorRef.create(system, props, "testGossiper");
+
+ return testRef.underlyingActor();
+ }
+}
\ No newline at end of file
--- /dev/null
+odl-cluster{
+ akka {
+ loglevel = "INFO"
+ #log-config-on-start = on
+
+ actor {
+ provider = "akka.cluster.ClusterActorRefProvider"
+ debug{
+ #autoreceive = on
+ #lifecycle = on
+
+ }
+ }
+ remote {
+ log-received-messages = on
+ log-sent-messages = on
+
+ log-remote-lifecycle-events = off
+ netty.tcp {
+ hostname = "localhost"
+ port = 2551
+ }
+ }
+
+ cluster {
+ seed-nodes = ["akka.tcp://opendaylight-rpc@localhost:2551"]
+
+ auto-down-unreachable-after = 10s
+ }
+ }
+}
+unit-test{
+ akka {
+ loglevel = "INFO"
+
+ actor {
+ provider = "akka.cluster.ClusterActorRefProvider"
+ }
+ }
+}
+
+memberA{
+ akka {
+ loglevel = "INFO"
+
+ actor {
+ provider = "akka.cluster.ClusterActorRefProvider"
+ }
+ remote {
+ log-received-messages = off
+ log-sent-messages = off
+
+ log-remote-lifecycle-events = off
+ netty.tcp {
+ hostname = "localhost"
+ port = 2551
+ }
+ }
+
+ cluster {
+ seed-nodes = ["akka.tcp://opendaylight-rpc@localhost:2551"]
+
+ auto-down-unreachable-after = 10s
+ }
+ }
+}
+memberB{
+ akka {
+ loglevel = "INFO"
+
+ actor {
+ provider = "akka.cluster.ClusterActorRefProvider"
+ }
+ remote {
+ log-received-messages = off
+ log-sent-messages = off
+
+ log-remote-lifecycle-events = off
+ netty.tcp {
+ hostname = "localhost"
+ port = 2552
+ }
+ }
+
+ cluster {
+ seed-nodes = ["akka.tcp://opendaylight-rpc@localhost:2551"]
+
+ auto-down-unreachable-after = 10s
+ }
+ }
+}
+memberC{
+ akka {
+ loglevel = "INFO"
+
+ actor {
+ provider = "akka.cluster.ClusterActorRefProvider"
+ }
+ remote {
+ log-received-messages = off
+ log-sent-messages = off
+
+ log-remote-lifecycle-events = off
+ netty.tcp {
+ hostname = "localhost"
+ port = 2553
+ }
+ }
+
+ cluster {
+ seed-nodes = ["akka.tcp://opendaylight-rpc@localhost:2551"]
+
+ auto-down-unreachable-after = 10s
+ }
+ }
+}
\ No newline at end of file