Gossip based eventually consistent RPC Registry. 13/9713/12
authorAbhishek Kumar <abhishk2@cisco.com>
Thu, 31 Jul 2014 21:18:28 +0000 (14:18 -0700)
committerAbhishek Kumar <abhishk2@cisco.com>
Fri, 8 Aug 2014 16:40:29 +0000 (09:40 -0700)
There are 2 main components
1. Bucket Store:
   Stores data in buckets. Each cluster node gets a bucket. These buckets are sync'ed
   across nodes using Gossip protocol.
   Bucket Store uses a Gossiper that implements the protocol to sync data.

2. Rpc Registry:
   This uses the bucket store to store routing table. Routing table maintains mapping of
   RPC <--> Node
   Rpc Broker uses this regitry to route rpc requests to remote nodes.

Change-Id: Ifaf8955dbf6e3074d4d2951d6f503ecc0624d141
Signed-off-by: Abhishek Kumar <abhishk2@cisco.com>
19 files changed:
opendaylight/md-sal/sal-remoterpc-connector/pom.xml
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTableOld.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryOld.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Copier.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RoutingTableOldTest.java [moved from opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RoutingTableTest.java with 97% similarity]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryOldTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossiperTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf [new file with mode: 0644]

index a2bee8ffee7295c8c086365fafa1651235f4efdc..50af48effcf974aece75132c9a0866283b6ef76c 100644 (file)
       <scope>test</scope>
     </dependency>
 
-    <dependency>
+      <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-simple</artifactId>
       <version>${slf4j.version}</version>
index 5c56455bd0c208a40709b4a221fb6a22ed0b65a1..514a2f141daea13e5e71ec1f0f0a8acf9eadcf9d 100644 (file)
@@ -17,7 +17,7 @@ import akka.japi.Creator;
 import akka.japi.Function;
 import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
 import org.opendaylight.controller.remote.rpc.registry.ClusterWrapper;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistryOld;
 import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
 import org.opendaylight.yangtools.yang.common.QName;
@@ -72,7 +72,7 @@ public class RpcManager extends AbstractUntypedActor {
   private void createRpcActors() {
     LOG.debug("Create rpc registry and broker actors");
 
-    rpcRegistry = getContext().actorOf(RpcRegistry.props(clusterWrapper), ActorConstants.RPC_REGISTRY);
+    rpcRegistry = getContext().actorOf(RpcRegistryOld.props(clusterWrapper), ActorConstants.RPC_REGISTRY);
     rpcBroker = getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), ActorConstants.RPC_BROKER);
   }
 
index 5e19653a22d21ed83ae3eec370290dc37f50c1ce..c25aa523e2e0823769a28e9dc01b32da711b0ae5 100644 (file)
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.remote.rpc.registry;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import akka.actor.ActorRef;
+import akka.japi.Option;
+import akka.japi.Pair;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Copier;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
 
+import java.io.Serializable;
 import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-public class RoutingTable<I, R> {
-
-  private final Logger LOG = LoggerFactory.getLogger(RoutingTable.class);
-
-  private ConcurrentMap<I,R> globalRpcMap = new ConcurrentHashMap<>();
-  private ConcurrentMap<I, LinkedHashSet<R>> routedRpcMap = new ConcurrentHashMap<>();
-
-  public ConcurrentMap<I, R> getGlobalRpcMap() {
-    return globalRpcMap;
-  }
-
-  public ConcurrentMap<I, LinkedHashSet<R>> getRoutedRpcMap() {
-    return routedRpcMap;
-  }
-
-  public R getGlobalRoute(final I routeId) {
-    Preconditions.checkNotNull(routeId, "getGlobalRoute: routeId cannot be null!");
-    return globalRpcMap.get(routeId);
-  }
-
-  public void addGlobalRoute(final I routeId, final R route) {
-    Preconditions.checkNotNull(routeId, "addGlobalRoute: routeId cannot be null!");
-    Preconditions.checkNotNull(route, "addGlobalRoute: route cannot be null!");
-    LOG.debug("addGlobalRoute: adding  a new route with id[{}] and value [{}]", routeId, route);
-    if(globalRpcMap.putIfAbsent(routeId, route) != null) {
-      LOG.debug("A route already exist for route id [{}] ", routeId);
-    }
-  }
+import java.util.HashMap;
+import java.util.Map;
 
-  public void removeGlobalRoute(final I routeId) {
-    Preconditions.checkNotNull(routeId, "removeGlobalRoute: routeId cannot be null!");
-    LOG.debug("removeGlobalRoute: removing  a new route with id [{}]", routeId);
-    globalRpcMap.remove(routeId);
-  }
+public class RoutingTable implements Copier<RoutingTable>, Serializable {
 
-  public Set<R> getRoutedRpc(final I routeId) {
-    Preconditions.checkNotNull(routeId, "getRoutes: routeId cannot be null!");
-    Set<R> routes = routedRpcMap.get(routeId);
-
-    if (routes == null) {
-      return Collections.emptySet();
-    }
+    private Map<RpcRouter.RouteIdentifier<?, ?, ?>, Long> table = new HashMap<>();
+    private ActorRef router;
 
-    return ImmutableSet.copyOf(routes);
-  }
+    @Override
+    public RoutingTable copy() {
+        RoutingTable copy = new RoutingTable();
+        copy.setTable(Collections.unmodifiableMap(table));
+        copy.setRouter(this.getRouter());
 
-  public R getLastAddedRoutedRpc(final I routeId) {
+        return copy;
+    }
 
-    Set<R> routes = getRoutedRpc(routeId);
+    public Option<Pair<ActorRef, Long>> getRouterFor(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
+        Long updatedTime = table.get(routeId);
 
-    if (routes.isEmpty()) {
-      return null;
+        if (updatedTime == null || router == null)
+            return Option.none();
+        else
+            return Option.option(new Pair<>(router, updatedTime));
     }
 
-    R route = null;
-    Iterator<R> iter = routes.iterator();
-    while (iter.hasNext()) {
-      route = iter.next();
+    public void addRoute(RpcRouter.RouteIdentifier<?,?,?> routeId){
+        table.put(routeId, System.currentTimeMillis());
     }
 
-    return route;
-  }
-
-  public void addRoutedRpc(final I routeId, final R route)   {
-    Preconditions.checkNotNull(routeId, "addRoute: routeId cannot be null");
-    Preconditions.checkNotNull(route, "addRoute: route cannot be null");
-    LOG.debug("addRoute: adding a route with k/v [{}/{}]", routeId, route);
-    threadSafeAdd(routeId, route);
-  }
-
-  public void addRoutedRpcs(final Set<I> routeIds, final R route) {
-    Preconditions.checkNotNull(routeIds, "addRoutes: routeIds must not be null");
-    for (I routeId : routeIds){
-      addRoutedRpc(routeId, route);
+    public void removeRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
+        table.remove(routeId);
     }
-  }
 
-  public void removeRoute(final I routeId, final R route) {
-    Preconditions.checkNotNull(routeId, "removeRoute: routeId cannot be null!");
-    Preconditions.checkNotNull(route, "removeRoute: route cannot be null!");
-
-    LinkedHashSet<R> routes = routedRpcMap.get(routeId);
-    if (routes == null) {
-      return;
-    }
-    LOG.debug("removeRoute: removing  a new route with k/v [{}/{}]", routeId, route);
-    threadSafeRemove(routeId, route);
-  }
-
-  public void removeRoutes(final Set<I> routeIds, final R route) {
-    Preconditions.checkNotNull(routeIds, "removeRoutes: routeIds must not be null");
-    for (I routeId : routeIds){
-      removeRoute(routeId, route);
+    public Boolean contains(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
+        return table.containsKey(routeId);
     }
-  }
-
-  /**
-   * This method guarantees that no 2 thread over write each other's changes.
-   * Just so that we dont end up in infinite loop, it tries for 100 times then throw
-   */
-  private void threadSafeAdd(final I routeId, final R route) {
 
-    for (int i=0;i<100;i++){
+    ///
+    /// Getter, Setters
+    ///
+    //TODO: Remove public
+    public Map<RpcRouter.RouteIdentifier<?, ?, ?>, Long> getTable() {
+        return table;
+    }
 
-      LinkedHashSet<R> updatedRoutes = new LinkedHashSet<>();
-      updatedRoutes.add(route);
-      LinkedHashSet<R> oldRoutes = routedRpcMap.putIfAbsent(routeId, updatedRoutes);
-      if (oldRoutes == null) {
-        return;
-      }
+    void setTable(Map<RpcRouter.RouteIdentifier<?, ?, ?>, Long> table) {
+        this.table = table;
+    }
 
-      updatedRoutes = new LinkedHashSet<>(oldRoutes);
-      updatedRoutes.add(route);
+    public ActorRef getRouter() {
+        return router;
+    }
 
-      if (routedRpcMap.replace(routeId, oldRoutes, updatedRoutes)) {
-        return;
-      }
+    public void setRouter(ActorRef router) {
+        this.router = router;
     }
-    //the method did not already return means it failed to add route in 100 attempts
-    throw new IllegalStateException("Failed to add route [" + routeId + "]");
-  }
-
-  /**
-   * This method guarantees that no 2 thread over write each other's changes.
-   * Just so that we dont end up in infinite loop, it tries for 100 times then throw
-   */
-  private void threadSafeRemove(final I routeId, final R route) {
-    LinkedHashSet<R> updatedRoutes = null;
-    for (int i=0;i<100;i++){
-      LinkedHashSet<R> oldRoutes = routedRpcMap.get(routeId);
-
-      // if route to be deleted is the only entry in the set then remove routeId from the cache
-      if ((oldRoutes.size() == 1) && oldRoutes.contains(route)){
-        routedRpcMap.remove(routeId);
-        return;
-      }
-
-      // if there are multiple routes for this routeId, remove the route to be deleted only from the set.
-      updatedRoutes = new LinkedHashSet<>(oldRoutes);
-      updatedRoutes.remove(route);
-      if (routedRpcMap.replace(routeId, oldRoutes, updatedRoutes)) {
-        return;
-      }
 
+    @Override
+    public String toString() {
+        return "RoutingTable{" +
+                "table=" + table +
+                ", router=" + router +
+                '}';
     }
-    //the method did not already return means it failed to remove route in 100 attempts
-    throw new IllegalStateException("Failed to remove route [" + routeId + "]");
-  }
 }
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTableOld.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTableOld.java
new file mode 100644 (file)
index 0000000..5951776
--- /dev/null
@@ -0,0 +1,171 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.registry;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class RoutingTableOld<I, R> {
+
+  private final Logger LOG = LoggerFactory.getLogger(RoutingTableOld.class);
+
+  private ConcurrentMap<I,R> globalRpcMap = new ConcurrentHashMap<>();
+  private ConcurrentMap<I, LinkedHashSet<R>> routedRpcMap = new ConcurrentHashMap<>();
+
+  public ConcurrentMap<I, R> getGlobalRpcMap() {
+    return globalRpcMap;
+  }
+
+  public ConcurrentMap<I, LinkedHashSet<R>> getRoutedRpcMap() {
+    return routedRpcMap;
+  }
+
+  public R getGlobalRoute(final I routeId) {
+    Preconditions.checkNotNull(routeId, "getGlobalRoute: routeId cannot be null!");
+    return globalRpcMap.get(routeId);
+  }
+
+  public void addGlobalRoute(final I routeId, final R route) {
+    Preconditions.checkNotNull(routeId, "addGlobalRoute: routeId cannot be null!");
+    Preconditions.checkNotNull(route, "addGlobalRoute: route cannot be null!");
+    LOG.debug("addGlobalRoute: adding  a new route with id[{}] and value [{}]", routeId, route);
+    if(globalRpcMap.putIfAbsent(routeId, route) != null) {
+      LOG.debug("A route already exist for route id [{}] ", routeId);
+    }
+  }
+
+  public void removeGlobalRoute(final I routeId) {
+    Preconditions.checkNotNull(routeId, "removeGlobalRoute: routeId cannot be null!");
+    LOG.debug("removeGlobalRoute: removing  a new route with id [{}]", routeId);
+    globalRpcMap.remove(routeId);
+  }
+
+  public Set<R> getRoutedRpc(final I routeId) {
+    Preconditions.checkNotNull(routeId, "getRoutes: routeId cannot be null!");
+    Set<R> routes = routedRpcMap.get(routeId);
+
+    if (routes == null) {
+      return Collections.emptySet();
+    }
+
+    return ImmutableSet.copyOf(routes);
+  }
+
+  public R getLastAddedRoutedRpc(final I routeId) {
+
+    Set<R> routes = getRoutedRpc(routeId);
+
+    if (routes.isEmpty()) {
+      return null;
+    }
+
+    R route = null;
+    Iterator<R> iter = routes.iterator();
+    while (iter.hasNext()) {
+      route = iter.next();
+    }
+
+    return route;
+  }
+
+  public void addRoutedRpc(final I routeId, final R route)   {
+    Preconditions.checkNotNull(routeId, "addRoute: routeId cannot be null");
+    Preconditions.checkNotNull(route, "addRoute: route cannot be null");
+    LOG.debug("addRoute: adding a route with k/v [{}/{}]", routeId, route);
+    threadSafeAdd(routeId, route);
+  }
+
+  public void addRoutedRpcs(final Set<I> routeIds, final R route) {
+    Preconditions.checkNotNull(routeIds, "addRoutes: routeIds must not be null");
+    for (I routeId : routeIds){
+      addRoutedRpc(routeId, route);
+    }
+  }
+
+  public void removeRoute(final I routeId, final R route) {
+    Preconditions.checkNotNull(routeId, "removeRoute: routeId cannot be null!");
+    Preconditions.checkNotNull(route, "removeRoute: route cannot be null!");
+
+    LinkedHashSet<R> routes = routedRpcMap.get(routeId);
+    if (routes == null) {
+      return;
+    }
+    LOG.debug("removeRoute: removing  a new route with k/v [{}/{}]", routeId, route);
+    threadSafeRemove(routeId, route);
+  }
+
+  public void removeRoutes(final Set<I> routeIds, final R route) {
+    Preconditions.checkNotNull(routeIds, "removeRoutes: routeIds must not be null");
+    for (I routeId : routeIds){
+      removeRoute(routeId, route);
+    }
+  }
+
+  /**
+   * This method guarantees that no 2 thread over write each other's changes.
+   * Just so that we dont end up in infinite loop, it tries for 100 times then throw
+   */
+  private void threadSafeAdd(final I routeId, final R route) {
+
+    for (int i=0;i<100;i++){
+
+      LinkedHashSet<R> updatedRoutes = new LinkedHashSet<>();
+      updatedRoutes.add(route);
+      LinkedHashSet<R> oldRoutes = routedRpcMap.putIfAbsent(routeId, updatedRoutes);
+      if (oldRoutes == null) {
+        return;
+      }
+
+      updatedRoutes = new LinkedHashSet<>(oldRoutes);
+      updatedRoutes.add(route);
+
+      if (routedRpcMap.replace(routeId, oldRoutes, updatedRoutes)) {
+        return;
+      }
+    }
+    //the method did not already return means it failed to add route in 100 attempts
+    throw new IllegalStateException("Failed to add route [" + routeId + "]");
+  }
+
+  /**
+   * This method guarantees that no 2 thread over write each other's changes.
+   * Just so that we dont end up in infinite loop, it tries for 100 times then throw
+   */
+  private void threadSafeRemove(final I routeId, final R route) {
+    LinkedHashSet<R> updatedRoutes = null;
+    for (int i=0;i<100;i++){
+      LinkedHashSet<R> oldRoutes = routedRpcMap.get(routeId);
+
+      // if route to be deleted is the only entry in the set then remove routeId from the cache
+      if ((oldRoutes.size() == 1) && oldRoutes.contains(route)){
+        routedRpcMap.remove(routeId);
+        return;
+      }
+
+      // if there are multiple routes for this routeId, remove the route to be deleted only from the set.
+      updatedRoutes = new LinkedHashSet<>(oldRoutes);
+      updatedRoutes.remove(route);
+      if (routedRpcMap.replace(routeId, oldRoutes, updatedRoutes)) {
+        return;
+      }
+
+    }
+    //the method did not already return means it failed to remove route in 100 attempts
+    throw new IllegalStateException("Failed to remove route [" + routeId + "]");
+  }
+}
index e36060cc13ece309f04f10adebdb74a62f158146..51609870cc4aad1c8789dfdcd0b68f04563b5cdf 100644 (file)
  */
 package org.opendaylight.controller.remote.rpc.registry;
 
-import akka.actor.ActorSelection;
+import akka.actor.ActorRef;
 import akka.actor.Address;
 import akka.actor.Props;
-import akka.cluster.ClusterEvent;
-import akka.cluster.Member;
-import akka.japi.Creator;
-import org.opendaylight.controller.remote.rpc.AbstractUntypedActor;
-import org.opendaylight.controller.remote.rpc.ActorConstants;
-import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.AddRpc;
-import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply;
-import org.opendaylight.controller.remote.rpc.messages.GetRpc;
-import org.opendaylight.controller.remote.rpc.messages.GetRpcReply;
-import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.RemoveRpc;
-import org.opendaylight.controller.remote.rpc.messages.RoutingTableData;
+import akka.actor.UntypedActor;
+import akka.dispatch.Mapper;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import akka.japi.Option;
+import akka.japi.Pair;
+import akka.pattern.Patterns;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.collection.JavaConversions;
+import scala.concurrent.Future;
 
-import java.util.LinkedHashSet;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoute;
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoute;
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
 
 /**
- * This Actor maintains the routing table state and sync it with other nodes in the cluster.
- *
- * A scheduler runs after an interval of time, which pick a random member from the cluster
- * and send the current state of routing table to the member.
+ * Registry to look up cluster nodes that have registered for a given rpc.
+ * <p>
+ * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
+ * cluster wide information.
  *
- * when a message of routing table data is received, it gets merged with the local routing table
- * to keep the latest data.
  */
+public class RpcRegistry extends UntypedActor {
+
+    final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+
+    /**
+     * Store to keep the registry. Bucket store sync's it across nodes in the cluster
+     */
+    private ActorRef bucketStore;
+
+    /**
+     * Rpc broker that would use the registry to route requests.
+     */
+    private ActorRef localRouter;
 
-public class RpcRegistry extends AbstractUntypedActor {
-
-  private static final Logger LOG = LoggerFactory.getLogger(RpcRegistry.class);
-  private RoutingTable<RpcRouter.RouteIdentifier<?, ?, ?>, String> routingTable;
-  private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
-  private final ClusterWrapper clusterWrapper;
-  private final ScheduledFuture<?> syncScheduler;
-
-  private RpcRegistry(ClusterWrapper clusterWrapper){
-    this.routingTable = new RoutingTable<>();
-    this.clusterWrapper = clusterWrapper;
-    this.syncScheduler = scheduler.scheduleAtFixedRate(new SendRoutingTable(), 10, 10, TimeUnit.SECONDS);
-  }
-
-  public static Props props(final ClusterWrapper clusterWrapper){
-    return Props.create(new Creator<RpcRegistry>(){
-
-      @Override
-      public RpcRegistry create() throws Exception {
-        return new RpcRegistry(clusterWrapper);
-      }
-    });
-  }
-
-  @Override
-  protected void handleReceive(Object message) throws Exception {
-    LOG.debug("Received message {}", message);
-    if(message instanceof RoutingTableData) {
-      syncRoutingTable((RoutingTableData) message);
-    } else if(message instanceof GetRoutedRpc) {
-      getRoutedRpc((GetRoutedRpc) message);
-    } else if(message instanceof GetRpc) {
-      getRpc((GetRpc) message);
-    } else if(message instanceof AddRpc) {
-      addRpc((AddRpc) message);
-    } else if(message instanceof RemoveRpc) {
-      removeRpc((RemoveRpc) message);
-    } else if(message instanceof AddRoutedRpc) {
-      addRoutedRpc((AddRoutedRpc) message);
-    } else if(message instanceof RemoveRoutedRpc) {
-      removeRoutedRpc((RemoveRoutedRpc) message);
+    public RpcRegistry() {
+        bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
     }
-  }
 
-  private void getRoutedRpc(GetRoutedRpc rpcMsg){
-    LOG.debug("Get latest routed Rpc location from routing table {}", rpcMsg);
-    String remoteActorPath = routingTable.getLastAddedRoutedRpc(rpcMsg.getRouteId());
-    GetRoutedRpcReply routedRpcReply = new GetRoutedRpcReply(remoteActorPath);
+    public RpcRegistry(ActorRef bucketStore) {
+        this.bucketStore = bucketStore;
+    }
+
+    @Override
+    public void onReceive(Object message) throws Exception {
 
-    getSender().tell(routedRpcReply, self());
-  }
+        log.debug("Received message: message [{}]", message);
 
-  private void getRpc(GetRpc rpcMsg) {
-    LOG.debug("Get global Rpc location from routing table {}", rpcMsg);
-    String remoteActorPath = routingTable.getGlobalRoute(rpcMsg.getRouteId());
-    GetRpcReply rpcReply = new GetRpcReply(remoteActorPath);
+        //TODO: if sender is remote, reject message
 
-    getSender().tell(rpcReply, self());
-  }
+        if (message instanceof SetLocalRouter)
+            receiveSetLocalRouter((SetLocalRouter) message);
 
-  private void addRpc(AddRpc rpcMsg) {
-    LOG.debug("Add Rpc to routing table {}", rpcMsg);
-    routingTable.addGlobalRoute(rpcMsg.getRouteId(), rpcMsg.getActorPath());
+        if (message instanceof AddOrUpdateRoute)
+            receiveAddRoute((AddOrUpdateRoute) message);
 
-    getSender().tell("Success", self());
-  }
+        else if (message instanceof RemoveRoute)
+            receiveRemoveRoute((RemoveRoute) message);
 
-  private void removeRpc(RemoveRpc rpcMsg) {
-    LOG.debug("Removing Rpc to routing table {}", rpcMsg);
-    routingTable.removeGlobalRoute(rpcMsg.getRouteId());
+        else if (message instanceof Messages.FindRouters)
+            receiveGetRouter((Messages.FindRouters) message);
+
+        else
+            unhandled(message);
+    }
 
-    getSender().tell("Success", self());
-  }
+    /**
+     * Register's rpc broker
+     *
+     * @param message contains {@link akka.actor.ActorRef} for rpc broker
+     */
+    private void receiveSetLocalRouter(SetLocalRouter message) {
+        if (message == null || message.getRouter() == null)
+            return;//ignore
 
-  private void addRoutedRpc(AddRoutedRpc rpcMsg) {
-    routingTable.addRoutedRpcs(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
-    getSender().tell("Success", self());
-  }
+        localRouter = message.getRouter();
+    }
 
-  private void removeRoutedRpc(RemoveRoutedRpc rpcMsg) {
-    routingTable.removeRoutes(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
-    getSender().tell("Success", self());
-  }
+    /**
+     * //TODO: update this to accept multiple route registration
+     * @param msg
+     */
+    private void receiveAddRoute(AddOrUpdateRoute msg) {
+        if (msg.getRouteIdentifier() == null)
+            return;//ignore
 
-  private void syncRoutingTable(RoutingTableData routingTableData) {
-    LOG.debug("Syncing routing table {}", routingTableData);
+        Preconditions.checkState(localRouter != null, "Router must be set first");
 
-    Map<RpcRouter.RouteIdentifier<?, ?, ?>, String> newRpcMap = routingTableData.getRpcMap();
-    Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = newRpcMap.keySet();
-    for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
-      routingTable.addGlobalRoute(routeId, newRpcMap.get(routeId));
+        Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000);
+        futureReply.map(getMapperToAddRoute(msg.getRouteIdentifier()), getContext().dispatcher());
     }
 
-    Map<RpcRouter.RouteIdentifier<?, ?, ?>, LinkedHashSet<String>> newRoutedRpcMap =
-        routingTableData.getRoutedRpcMap();
-    routeIds = newRoutedRpcMap.keySet();
+    /**
+     * //TODO: update this to accept multiple routes
+     * @param msg
+     */
+    private void receiveRemoveRoute(RemoveRoute msg) {
+        if (msg.getRouteIdentifier() == null)
+            return;//ignore
+
+        Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000);
+        futureReply.map(getMapperToRemoveRoute(msg.getRouteIdentifier()), getContext().dispatcher());
 
-    for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
-      Set<String> routeAddresses = newRoutedRpcMap.get(routeId);
-      for(String routeAddress : routeAddresses) {
-        routingTable.addRoutedRpc(routeId, routeAddress);
-      }
     }
-  }
-
-  private ActorSelection getRandomRegistryActor() {
-    ClusterEvent.CurrentClusterState clusterState = clusterWrapper.getState();
-    ActorSelection actor = null;
-    Set<Member> members = JavaConversions.asJavaSet(clusterState.members());
-    int memberSize = members.size();
-    // Don't select yourself
-    if(memberSize > 1) {
-      Address currentNodeAddress = clusterWrapper.getAddress();
-      int index = new Random().nextInt(memberSize);
-      int i = 0;
-      // keeping previous member, in case when random index member is same as current actor
-      // and current actor member is last in set
-      Member previousMember = null;
-      for(Member member : members){
-        if(i == index-1) {
-          previousMember = member;
+
+    /**
+     * Finds routers for the given rpc.
+     * @param msg
+     */
+    private void receiveGetRouter(Messages.FindRouters msg) {
+        final ActorRef sender = getSender();
+
+        //if empty message, return empty list
+        if (msg.getRouteIdentifier() == null) {
+            sender.tell(createEmptyReply(), getSelf());
+            return;
         }
-        if(i == index) {
-          if(!currentNodeAddress.equals(member.address())) {
-            actor = this.context().actorSelection(member.address() + ActorConstants.RPC_REGISTRY_PATH);
-            break;
-          } else if(index < memberSize-1){ // pick the next element in the set
-            index++;
-          }
+
+        Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), 1000);
+        futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
+
+    }
+
+    /**
+     * Helper to create empty reply when no routers are found
+     *
+     * @return
+     */
+    private Messages.FindRoutersReply createEmptyReply() {
+        List<Pair<ActorRef, Long>> routerWithUpdateTime = Collections.emptyList();
+        return new Messages.FindRoutersReply(routerWithUpdateTime);
+    }
+
+    /**
+     * Helper to create a reply when routers are found for the given rpc
+     * @param buckets
+     * @param routeId
+     * @return
+     */
+    private Messages.FindRoutersReply createReplyWithRouters(Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
+
+        List<Pair<ActorRef, Long>> routers = new ArrayList<>();
+
+        Option<Pair<ActorRef, Long>> routerWithUpdateTime = null;
+
+        for (Bucket bucket : buckets.values()) {
+
+            RoutingTable table = (RoutingTable) bucket.getData();
+
+            if (table == null)
+                continue;
+
+            routerWithUpdateTime = table.getRouterFor(routeId);
+
+            if (routerWithUpdateTime.isEmpty())
+                continue;
+
+            routers.add(routerWithUpdateTime.get());
         }
-        i++;
-      }
-      if(actor == null && previousMember != null) {
-        actor = this.context().actorSelection(previousMember.address() + ActorConstants.RPC_REGISTRY_PATH);
-      }
+
+        return new Messages.FindRoutersReply(routers);
     }
-    return actor;
-  }
 
-  private class SendRoutingTable implements Runnable {
 
-    @Override
-    public void run() {
-      RoutingTableData routingTableData =
-          new RoutingTableData(routingTable.getGlobalRpcMap(), routingTable.getRoutedRpcMap());
-      LOG.debug("Sending routing table for sync {}", routingTableData);
-      ActorSelection actor = getRandomRegistryActor();
-      if(actor != null) {
-        actor.tell(routingTableData, self());
-      }
+    ///
+    ///private factories to create Mapper
+    ///
+
+    /**
+     *  Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found
+     *
+     * @param routeId the rpc
+     * @param sender  client who asked to find the routers.
+     * @return
+     */
+    private Mapper<Object, Void> getMapperToGetRouter(final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
+        return new Mapper<Object, Void>() {
+            @Override
+            public Void apply(Object replyMessage) {
+
+                if (replyMessage instanceof GetAllBucketsReply) {
+
+                    GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage;
+                    Map<Address, Bucket> buckets = reply.getBuckets();
+
+                    if (buckets == null || buckets.isEmpty()) {
+                        sender.tell(createEmptyReply(), getSelf());
+                        return null;
+                    }
+
+                    sender.tell(createReplyWithRouters(buckets, routeId), getSelf());
+                }
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently,
+     * it updates the local bucket in bucket store.
+     *
+     * @param routeId rpc to remote
+     * @return
+     */
+    private Mapper<Object, Void> getMapperToRemoveRoute(final RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
+        return new Mapper<Object, Void>() {
+            @Override
+            public Void apply(Object replyMessage) {
+                if (replyMessage instanceof GetLocalBucketReply) {
+
+                    GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
+                    Bucket<RoutingTable> bucket = reply.getBucket();
+
+                    if (bucket == null) {
+                        log.debug("Local bucket is null");
+                        return null;
+                    }
+
+                    RoutingTable table = bucket.getData();
+                    if (table == null)
+                        table = new RoutingTable();
+
+                    table.setRouter(localRouter);
+                    table.removeRoute(routeId);
+
+                    bucket.setData(table);
+
+                    UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
+                    bucketStore.tell(updateBucketMessage, getSelf());
+                }
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently,
+     * it updates the local bucket in bucket store.
+     *
+     * @param routeId rpc to add
+     * @return
+     */
+    private Mapper<Object, Void> getMapperToAddRoute(final RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
+
+        return new Mapper<Object, Void>() {
+            @Override
+            public Void apply(Object replyMessage) {
+                if (replyMessage instanceof GetLocalBucketReply) {
+
+                    GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
+                    Bucket<RoutingTable> bucket = reply.getBucket();
+
+                    if (bucket == null) {
+                        log.debug("Local bucket is null");
+                        return null;
+                    }
+
+                    RoutingTable table = bucket.getData();
+                    if (table == null)
+                        table = new RoutingTable();
+
+                    table.setRouter(localRouter);
+                    table.addRoute(routeId);
+
+                    bucket.setData(table);
+
+                    UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
+                    bucketStore.tell(updateBucketMessage, getSelf());
+                }
+
+                return null;
+            }
+        };
+    }
+
+    /**
+     * All messages used by the RpcRegistry
+     */
+    public static class Messages {
+
+
+        public static class ContainsRoute {
+            final RpcRouter.RouteIdentifier<?,?,?> routeIdentifier;
+
+            public ContainsRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
+                Preconditions.checkArgument(routeIdentifier != null);
+                this.routeIdentifier = routeIdentifier;
+            }
+
+            public RpcRouter.RouteIdentifier<?,?,?> getRouteIdentifier(){
+                return this.routeIdentifier;
+            }
+
+            @Override
+            public String toString() {
+                return this.getClass().getSimpleName() + "{" +
+                        "routeIdentifier=" + routeIdentifier +
+                        '}';
+            }
+        }
+
+        public static class AddOrUpdateRoute extends ContainsRoute{
+
+            public AddOrUpdateRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
+                super(routeIdentifier);
+            }
+        }
+
+        public static class RemoveRoute extends ContainsRoute {
+
+            public RemoveRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
+                super(routeIdentifier);
+            }
+        }
+
+        public static class SetLocalRouter{
+            private final ActorRef router;
+
+            public SetLocalRouter(ActorRef router) {
+                this.router = router;
+            }
+
+            public ActorRef getRouter(){
+                return this.router;
+            }
+
+            @Override
+            public String toString() {
+                return "SetLocalRouter{" +
+                        "router=" + router +
+                        '}';
+            }
+        }
+
+        public static class FindRouters extends ContainsRoute {
+            public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
+                super(routeIdentifier);
+            }
+        }
+
+        public static class FindRoutersReply {
+            final List<Pair<ActorRef, Long>> routerWithUpdateTime;
+
+            public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
+                this.routerWithUpdateTime = routerWithUpdateTime;
+            }
+
+            public List<Pair<ActorRef, Long>> getRouterWithUpdateTime(){
+                return routerWithUpdateTime;
+            }
+
+            @Override
+            public String toString() {
+                return "FindRoutersReply{" +
+                        "routerWithUpdateTime=" + routerWithUpdateTime +
+                        '}';
+            }
+        }
     }
-  }
 }
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryOld.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryOld.java
new file mode 100644 (file)
index 0000000..96c8802
--- /dev/null
@@ -0,0 +1,203 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry;
+
+import akka.actor.ActorSelection;
+import akka.actor.Address;
+import akka.actor.Props;
+import akka.cluster.ClusterEvent;
+import akka.cluster.Member;
+import akka.japi.Creator;
+import org.opendaylight.controller.remote.rpc.AbstractUntypedActor;
+import org.opendaylight.controller.remote.rpc.ActorConstants;
+import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.AddRpc;
+import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply;
+import org.opendaylight.controller.remote.rpc.messages.GetRpc;
+import org.opendaylight.controller.remote.rpc.messages.GetRpcReply;
+import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.RemoveRpc;
+import org.opendaylight.controller.remote.rpc.messages.RoutingTableData;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConversions;
+
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This Actor maintains the routing table state and sync it with other nodes in the cluster.
+ *
+ * A scheduler runs after an interval of time, which pick a random member from the cluster
+ * and send the current state of routing table to the member.
+ *
+ * when a message of routing table data is received, it gets merged with the local routing table
+ * to keep the latest data.
+ */
+
+public class RpcRegistryOld extends AbstractUntypedActor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RpcRegistryOld.class);
+  private RoutingTableOld<RpcRouter.RouteIdentifier<?, ?, ?>, String> routingTable;
+  private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+  private final ClusterWrapper clusterWrapper;
+  private final ScheduledFuture<?> syncScheduler;
+
+  private RpcRegistryOld(ClusterWrapper clusterWrapper){
+    this.routingTable = new RoutingTableOld<>();
+    this.clusterWrapper = clusterWrapper;
+    this.syncScheduler = scheduler.scheduleAtFixedRate(new SendRoutingTable(), 10, 10, TimeUnit.SECONDS);
+  }
+
+  public static Props props(final ClusterWrapper clusterWrapper){
+    return Props.create(new Creator<RpcRegistryOld>(){
+
+      @Override
+      public RpcRegistryOld create() throws Exception {
+        return new RpcRegistryOld(clusterWrapper);
+      }
+    });
+  }
+
+  @Override
+  protected void handleReceive(Object message) throws Exception {
+    LOG.debug("Received message {}", message);
+    if(message instanceof RoutingTableData) {
+      syncRoutingTable((RoutingTableData) message);
+    } else if(message instanceof GetRoutedRpc) {
+      getRoutedRpc((GetRoutedRpc) message);
+    } else if(message instanceof GetRpc) {
+      getRpc((GetRpc) message);
+    } else if(message instanceof AddRpc) {
+      addRpc((AddRpc) message);
+    } else if(message instanceof RemoveRpc) {
+      removeRpc((RemoveRpc) message);
+    } else if(message instanceof AddRoutedRpc) {
+      addRoutedRpc((AddRoutedRpc) message);
+    } else if(message instanceof RemoveRoutedRpc) {
+      removeRoutedRpc((RemoveRoutedRpc) message);
+    }
+  }
+
+  private void getRoutedRpc(GetRoutedRpc rpcMsg){
+    LOG.debug("Get latest routed Rpc location from routing table {}", rpcMsg);
+    String remoteActorPath = routingTable.getLastAddedRoutedRpc(rpcMsg.getRouteId());
+    GetRoutedRpcReply routedRpcReply = new GetRoutedRpcReply(remoteActorPath);
+
+    getSender().tell(routedRpcReply, self());
+  }
+
+  private void getRpc(GetRpc rpcMsg) {
+    LOG.debug("Get global Rpc location from routing table {}", rpcMsg);
+    String remoteActorPath = routingTable.getGlobalRoute(rpcMsg.getRouteId());
+    GetRpcReply rpcReply = new GetRpcReply(remoteActorPath);
+
+    getSender().tell(rpcReply, self());
+  }
+
+  private void addRpc(AddRpc rpcMsg) {
+    LOG.debug("Add Rpc to routing table {}", rpcMsg);
+    routingTable.addGlobalRoute(rpcMsg.getRouteId(), rpcMsg.getActorPath());
+
+    getSender().tell("Success", self());
+  }
+
+  private void removeRpc(RemoveRpc rpcMsg) {
+    LOG.debug("Removing Rpc to routing table {}", rpcMsg);
+    routingTable.removeGlobalRoute(rpcMsg.getRouteId());
+
+    getSender().tell("Success", self());
+  }
+
+  private void addRoutedRpc(AddRoutedRpc rpcMsg) {
+    routingTable.addRoutedRpcs(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
+    getSender().tell("Success", self());
+  }
+
+  private void removeRoutedRpc(RemoveRoutedRpc rpcMsg) {
+    routingTable.removeRoutes(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
+    getSender().tell("Success", self());
+  }
+
+  private void syncRoutingTable(RoutingTableData routingTableData) {
+    LOG.debug("Syncing routing table {}", routingTableData);
+
+    Map<RpcRouter.RouteIdentifier<?, ?, ?>, String> newRpcMap = routingTableData.getRpcMap();
+    Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = newRpcMap.keySet();
+    for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
+      routingTable.addGlobalRoute(routeId, newRpcMap.get(routeId));
+    }
+
+    Map<RpcRouter.RouteIdentifier<?, ?, ?>, LinkedHashSet<String>> newRoutedRpcMap =
+        routingTableData.getRoutedRpcMap();
+    routeIds = newRoutedRpcMap.keySet();
+
+    for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
+      Set<String> routeAddresses = newRoutedRpcMap.get(routeId);
+      for(String routeAddress : routeAddresses) {
+        routingTable.addRoutedRpc(routeId, routeAddress);
+      }
+    }
+  }
+
+  private ActorSelection getRandomRegistryActor() {
+    ClusterEvent.CurrentClusterState clusterState = clusterWrapper.getState();
+    ActorSelection actor = null;
+    Set<Member> members = JavaConversions.asJavaSet(clusterState.members());
+    int memberSize = members.size();
+    // Don't select yourself
+    if(memberSize > 1) {
+      Address currentNodeAddress = clusterWrapper.getAddress();
+      int index = new Random().nextInt(memberSize);
+      int i = 0;
+      // keeping previous member, in case when random index member is same as current actor
+      // and current actor member is last in set
+      Member previousMember = null;
+      for(Member member : members){
+        if(i == index-1) {
+          previousMember = member;
+        }
+        if(i == index) {
+          if(!currentNodeAddress.equals(member.address())) {
+            actor = this.context().actorSelection(member.address() + ActorConstants.RPC_REGISTRY_PATH);
+            break;
+          } else if(index < memberSize-1){ // pick the next element in the set
+            index++;
+          }
+        }
+        i++;
+      }
+      if(actor == null && previousMember != null) {
+        actor = this.context().actorSelection(previousMember.address() + ActorConstants.RPC_REGISTRY_PATH);
+      }
+    }
+    return actor;
+  }
+
+  private class SendRoutingTable implements Runnable {
+
+    @Override
+    public void run() {
+      RoutingTableData routingTableData =
+          new RoutingTableData(routingTable.getGlobalRpcMap(), routingTable.getRoutedRpcMap());
+      LOG.debug("Sending routing table for sync {}", routingTableData);
+      ActorSelection actor = getRandomRegistryActor();
+      if(actor != null) {
+        actor.tell(routingTableData, self());
+      }
+    }
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java
new file mode 100644 (file)
index 0000000..f5dfbc5
--- /dev/null
@@ -0,0 +1,15 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.gossip;
+
+
+public interface Bucket<T extends Copier<T>> {
+    public Long getVersion();
+    public T getData();
+    public void setData(T data);
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java
new file mode 100644 (file)
index 0000000..3cdd924
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.gossip;
+
+import java.io.Serializable;
+
+public class BucketImpl<T extends Copier<T>> implements Bucket<T>, Serializable {
+
+    private Long version = System.currentTimeMillis();;
+
+    private T data;
+
+    @Override
+    public Long getVersion() {
+        return version;
+    }
+
+    @Override
+    public T getData() {
+        if (this.data == null)
+            return null;
+
+        return data.copy();
+    }
+
+    public void setData(T data){
+        this.version = System.currentTimeMillis()+1;
+        this.data = data;
+    }
+
+    @Override
+    public String toString() {
+        return "BucketImpl{" +
+                "version=" + version +
+                ", data=" + data +
+                '}';
+    }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java
new file mode 100644 (file)
index 0000000..2f634ce
--- /dev/null
@@ -0,0 +1,273 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.registry.gossip;
+
+import akka.actor.ActorRef;
+import akka.actor.Address;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.cluster.Cluster;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
+
+/**
+ * A store that syncs its data across nodes in the cluster.
+ * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned.
+ * A node can write ONLY to its bucket. This way, write conflicts are avoided.
+ * <p>
+ * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol)<p>
+ * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
+ *
+ */
+public class BucketStore extends UntypedActor {
+
+    final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+
+    /**
+     * Bucket owned by the node
+     */
+    private BucketImpl localBucket = new BucketImpl();;
+
+    /**
+     * Buckets ownded by other known nodes in the cluster
+     */
+    private ConcurrentMap<Address, Bucket> remoteBuckets = new ConcurrentHashMap<>();
+
+    /**
+     * Bucket version for every known node in the cluster including this node
+     */
+    private ConcurrentMap<Address, Long> versions = new ConcurrentHashMap<>();
+
+    /**
+     * Cluster address for this node
+     */
+    private final Address selfAddress = Cluster.get(getContext().system()).selfAddress();
+
+    /**
+     * Our private gossiper
+     */
+    private ActorRef gossiper;
+
+    public BucketStore(){
+        gossiper = getContext().actorOf(Props.create(Gossiper.class), "gossiper");
+    }
+
+    /**
+     * This constructor is useful for testing.
+     * TODO: Pass Props instead of ActorRef
+     *
+     * @param gossiper
+     */
+    public BucketStore(ActorRef gossiper){
+        this.gossiper = gossiper;
+    }
+
+    @Override
+    public void onReceive(Object message) throws Exception {
+
+        log.debug("Received message: node[{}], message[{}]", selfAddress, message);
+
+        if (message instanceof UpdateBucket)
+            receiveUpdateBucket(((UpdateBucket) message).getBucket());
+
+        else if (message instanceof GetAllBuckets)
+            receiveGetAllBucket();
+
+        else if (message instanceof GetLocalBucket)
+            receiveGetLocalBucket();
+
+        else if (message instanceof GetBucketsByMembers)
+            receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
+
+        else if (message instanceof GetBucketVersions)
+            receiveGetBucketVersions();
+
+        else if (message instanceof UpdateRemoteBuckets)
+            receiveUpdateRemoteBuckets(((UpdateRemoteBuckets) message).getBuckets());
+
+        else {
+            log.debug("Unhandled message [{}]", message);
+            unhandled(message);
+        }
+
+    }
+
+    /**
+     * Returns a copy of bucket owned by this node
+     */
+    private void receiveGetLocalBucket() {
+        final ActorRef sender = getSender();
+        GetLocalBucketReply reply = new GetLocalBucketReply(localBucket);
+        sender.tell(reply, getSelf());
+    }
+
+    /**
+     * Updates the bucket owned by this node
+     *
+     * @param updatedBucket
+     */
+    void receiveUpdateBucket(Bucket updatedBucket){
+
+        localBucket = (BucketImpl) updatedBucket;
+        versions.put(selfAddress, localBucket.getVersion());
+    }
+
+    /**
+     * Returns all the buckets the this node knows about, self owned + remote
+     */
+    void receiveGetAllBucket(){
+        final ActorRef sender = getSender();
+        sender.tell(new GetAllBucketsReply(getAllBuckets()), getSelf());
+    }
+
+    /**
+     * Helper to collect all known buckets
+     *
+     * @return self owned + remote buckets
+     */
+    Map<Address, Bucket> getAllBuckets(){
+        Map<Address, Bucket> all = new HashMap<>(remoteBuckets.size() + 1);
+
+        //first add the local bucket
+        all.put(selfAddress, localBucket);
+
+        //then get all remote buckets
+        all.putAll(remoteBuckets);
+
+        return all;
+    }
+
+    /**
+     * Returns buckets for requested members that this node knows about
+     *
+     * @param members requested members
+     */
+    void receiveGetBucketsByMembers(Set<Address> members){
+        final ActorRef sender = getSender();
+        Map<Address, Bucket> buckets = getBucketsByMembers(members);
+        sender.tell(new GetBucketsByMembersReply(buckets), getSelf());
+    }
+
+    /**
+     * Helper to collect buckets for requested memebers
+     *
+     * @param members requested members
+     * @return buckets for requested memebers
+     */
+    Map<Address, Bucket> getBucketsByMembers(Set<Address> members) {
+        Map<Address, Bucket> buckets = new HashMap<>();
+
+        //first add the local bucket if asked
+        if (members.contains(selfAddress))
+            buckets.put(selfAddress, localBucket);
+
+        //then get buckets for requested remote nodes
+        for (Address address : members){
+            if (remoteBuckets.containsKey(address))
+                buckets.put(address, remoteBuckets.get(address));
+        }
+
+        return buckets;
+    }
+
+    /**
+     * Returns versions for all buckets known
+     */
+    void receiveGetBucketVersions(){
+        final ActorRef sender = getSender();
+        GetBucketVersionsReply reply = new GetBucketVersionsReply(versions);
+        sender.tell(reply, getSelf());
+    }
+
+    /**
+     * Update local copy of remote buckets where local copy's version is older
+     *
+     * @param receivedBuckets buckets sent by remote
+     *                        {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
+     */
+    void receiveUpdateRemoteBuckets(Map<Address, Bucket> receivedBuckets){
+
+        if (receivedBuckets == null || receivedBuckets.isEmpty())
+            return; //nothing to do
+
+        //Remote cant update self's bucket
+        receivedBuckets.remove(selfAddress);
+
+        for (Map.Entry<Address, Bucket> entry : receivedBuckets.entrySet()){
+
+            Long localVersion = versions.get(entry.getKey());
+            if (localVersion == null) localVersion = -1L;
+
+            Bucket receivedBucket = entry.getValue();
+
+            if (receivedBucket == null)
+                continue;
+
+            Long remoteVersion = receivedBucket.getVersion();
+            if (remoteVersion == null) remoteVersion = -1L;
+
+            //update only if remote version is newer
+            if ( remoteVersion > localVersion ) {
+                remoteBuckets.put(entry.getKey(), receivedBucket);
+                versions.put(entry.getKey(), remoteVersion);
+            }
+        }
+
+        log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
+    }
+
+    ///
+    ///Getter Setters
+    ///
+
+    BucketImpl getLocalBucket() {
+        return localBucket;
+    }
+
+    void setLocalBucket(BucketImpl localBucket) {
+        this.localBucket = localBucket;
+    }
+
+    ConcurrentMap<Address, Bucket> getRemoteBuckets() {
+        return remoteBuckets;
+    }
+
+    void setRemoteBuckets(ConcurrentMap<Address, Bucket> remoteBuckets) {
+        this.remoteBuckets = remoteBuckets;
+    }
+
+    ConcurrentMap<Address, Long> getVersions() {
+        return versions;
+    }
+
+    void setVersions(ConcurrentMap<Address, Long> versions) {
+        this.versions = versions;
+    }
+
+    Address getSelfAddress() {
+        return selfAddress;
+    }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Copier.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Copier.java
new file mode 100644 (file)
index 0000000..45279eb
--- /dev/null
@@ -0,0 +1,16 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.gossip;
+
+/**
+ * Type of data that goes in {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}.
+ * The implementers should do deep cloning in copy() method.
+ */
+public interface Copier<T> {
+    public T copy();
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java
new file mode 100644 (file)
index 0000000..0b64136
--- /dev/null
@@ -0,0 +1,437 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.gossip;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Address;
+import akka.actor.Cancellable;
+import akka.actor.UntypedActor;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent;
+import akka.cluster.Member;
+import akka.dispatch.Mapper;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import akka.pattern.Patterns;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
+
+/**
+ * Gossiper that syncs bucket store across nodes in the cluster.
+ * <p>
+ * It keeps a local scheduler that periodically sends Gossip ticks to itself to send bucket store's bucket versions
+ * to a randomly selected remote gossiper.
+ * <p>
+ * When bucket versions are received from a remote gossiper, it is compared with bucket store's bucket versions.
+ * Which ever buckets are newer locally, are sent to remote gossiper. If any bucket is older in bucket store, a
+ * gossip status is sent to remote gossiper so that it can send the newer buckets.
+ * <p>
+ * When a bucket is received from a remote gossiper, its sent to the bucket store for update.
+ *
+ */
+
+public class Gossiper extends UntypedActor {
+
+    final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+
+    Cluster cluster = Cluster.get(getContext().system());
+
+    /**
+     * ActorSystem's address for the current cluster node.
+     */
+    private Address selfAddress = cluster.selfAddress();
+
+    /**
+     * All known cluster members
+     */
+    private List<Address> clusterMembers = new ArrayList<>();
+
+    private Cancellable gossipTask;
+
+    private Boolean autoStartGossipTicks = true;
+
+    public Gossiper(){}
+
+    /**
+     * Helpful for testing
+     * @param autoStartGossipTicks used for turning off gossip ticks during testing. Gossip tick can be manually sent.
+     */
+    public Gossiper(Boolean autoStartGossipTicks){
+        this.autoStartGossipTicks = autoStartGossipTicks;
+    }
+
+    @Override
+    public void preStart(){
+
+        cluster.subscribe(getSelf(),
+                          ClusterEvent.initialStateAsEvents(),
+                          ClusterEvent.MemberEvent.class,
+                          ClusterEvent.UnreachableMember.class);
+
+        if (autoStartGossipTicks) {
+            gossipTask = getContext().system().scheduler().schedule(
+                    new FiniteDuration(1, TimeUnit.SECONDS),        //initial delay
+                    new FiniteDuration(500, TimeUnit.MILLISECONDS),         //interval
+                    getSelf(),                                       //target
+                    new Messages.GossiperMessages.GossipTick(),      //message
+                    getContext().dispatcher(),                       //execution context
+                    getSelf()                                        //sender
+            );
+        }
+    }
+
+    @Override
+    public void postStop(){
+        if (cluster != null)
+            cluster.unsubscribe(getSelf());
+        if (gossipTask != null)
+            gossipTask.cancel();
+    }
+
+    @Override
+    public void onReceive(Object message) throws Exception {
+
+        log.debug("Received message: node[{}], message[{}]", selfAddress, message);
+
+        //Usually sent by self via gossip task defined above. But its not enforced.
+        //These ticks can be sent by another actor as well which is esp. useful while testing
+        if (message instanceof GossipTick)
+            receiveGossipTick();
+
+        //Message from remote gossiper with its bucket versions
+        else if (message instanceof GossipStatus)
+            receiveGossipStatus((GossipStatus) message);
+
+        //Message from remote gossiper with buckets. This is usually in response to GossipStatus message
+        //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus
+        //message with its local versions
+        else if (message instanceof GossipEnvelope)
+            receiveGossip((GossipEnvelope) message);
+
+        else if (message instanceof ClusterEvent.MemberUp) {
+            receiveMemberUp(((ClusterEvent.MemberUp) message).member());
+
+        } else if (message instanceof ClusterEvent.MemberRemoved) {
+            receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
+
+        } else if ( message instanceof ClusterEvent.UnreachableMember){
+            receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
+
+        } else
+            unhandled(message);
+    }
+
+    /**
+     * Remove member from local copy of member list. If member down is self, then stop the actor
+     *
+     * @param member who went down
+     */
+    void receiveMemberRemoveOrUnreachable(Member member) {
+        //if its self, then stop itself
+        if (selfAddress.equals(member.address())){
+            getContext().stop(getSelf());
+            return;
+        }
+
+        clusterMembers.remove(member.address());
+        log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+    }
+
+    /**
+     * Add member to the local copy of member list if it doesnt already
+     * @param member
+     */
+    void receiveMemberUp(Member member) {
+
+        if (selfAddress.equals(member.address()))
+            return; //ignore up notification for self
+
+        if (!clusterMembers.contains(member.address()))
+            clusterMembers.add(member.address());
+
+        log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
+    }
+
+    /**
+     * Sends Gossip status to other members in the cluster. <br/>
+     * 1. If there are no member, ignore the tick. </br>
+     * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br/>
+     * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
+     */
+    void receiveGossipTick(){
+        if (clusterMembers.size() == 0) return; //no members to send gossip status to
+
+        Address remoteMemberToGossipTo = null;
+
+        if (clusterMembers.size() == 1)
+            remoteMemberToGossipTo = clusterMembers.get(0);
+        else {
+            Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
+            remoteMemberToGossipTo = clusterMembers.get(randomIndex);
+        }
+
+        log.debug("Gossiping to [{}]", remoteMemberToGossipTo);
+        getLocalStatusAndSendTo(remoteMemberToGossipTo);
+    }
+
+    /**
+     * Process gossip status received from a remote gossiper. Remote versions are compared with
+     * the local copy. <p>
+     *
+     * For each bucket
+     * <ul>
+     *  <li>If local copy is newer, the newer buckets are sent in GossipEnvelope to remote</li>
+     *  <li>If local is older, GossipStatus is sent to remote so that it can reply with GossipEnvelope</li>
+     *  <li>If both are same, noop</li>
+     * </ul>
+     *
+     * @param status bucket versions from a remote member
+     */
+    void receiveGossipStatus(GossipStatus status){
+        //Dont want to accept messages from non-members
+        if (!clusterMembers.contains(status.from()))
+            return;
+
+        final ActorRef sender = getSender();
+
+        Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000);
+
+        futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
+
+    }
+
+    /**
+     * Sends the received buckets in the envelope to the parent Bucket store.
+     *
+     * @param envelope contains buckets from a remote gossiper
+     */
+    void receiveGossip(GossipEnvelope envelope){
+        //TODO: Add more validations
+        if (!selfAddress.equals(envelope.to())) {
+            log.info("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
+            return;
+        }
+        if (envelope.getBuckets() == null)
+            return; //nothing to do
+
+        updateRemoteBuckets(envelope.getBuckets());
+
+    }
+
+    /**
+     * Helper to send received buckets to bucket store
+     *
+     * @param buckets
+     */
+    void updateRemoteBuckets(Map<Address, Bucket> buckets) {
+
+        if (buckets == null || buckets.isEmpty())
+            return; //nothing to merge
+
+        UpdateRemoteBuckets updateRemoteBuckets = new UpdateRemoteBuckets(buckets);
+
+        getContext().parent().tell(updateRemoteBuckets, getSelf());
+    }
+
+    /**
+     * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper
+     *
+     * @param remote     remote node to send Buckets to
+     * @param addresses  node addresses whose buckets needs to be sent
+     */
+    void sendGossipTo(final ActorRef remote, final Set<Address> addresses){
+
+        Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), 1000);
+
+        futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
+
+    }
+
+    /**
+     * Gets bucket versions from bucket store and sends to the supplied address
+     *
+     * @param remoteActorSystemAddress remote gossiper to send to
+     */
+    void getLocalStatusAndSendTo(Address remoteActorSystemAddress){
+
+        //Get local status from bucket store and send to remote
+        Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000);
+
+        ActorSelection remoteRef = getContext().system().actorSelection(
+                remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
+
+        log.debug("Sending bucket versions to [{}]", remoteRef);
+
+        futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
+
+    }
+
+    /**
+     * Helper to send bucket versions received from local store
+     * @param remote        remote gossiper to send versions to
+     * @param localVersions bucket versions received from local store
+     */
+    void sendGossipStatusTo(ActorRef remote, Map<Address, Long> localVersions){
+
+        GossipStatus status = new GossipStatus(selfAddress, localVersions);
+        remote.tell(status, getSelf());
+    }
+
+    void sendGossipStatusTo(ActorSelection remote, Map<Address, Long> localVersions){
+
+        GossipStatus status = new GossipStatus(selfAddress, localVersions);
+        remote.tell(status, getSelf());
+    }
+
+    ///
+    /// Private factories to create mappers
+    ///
+
+    private Mapper<Object, Void> getMapperToSendLocalStatus(final ActorSelection remote){
+
+        return new Mapper<Object, Void>() {
+            @Override
+            public Void apply(Object replyMessage) {
+                if (replyMessage instanceof GetBucketVersionsReply) {
+                    GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
+                    Map<Address, Long> localVersions = reply.getVersions();
+
+                    sendGossipStatusTo(remote, localVersions);
+
+                }
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Process bucket versions received from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
+     * Then this method compares remote bucket versions with local bucket versions.
+     * <ul>
+     *     <li>The buckets that are newer locally, send
+     *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope} to remote
+     *     <li>The buckets that are older locally, send
+     *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus} to remote so that
+     *     remote sends GossipEnvelop.
+     * </ul>
+     *
+     * @param sender the remote member
+     * @param status bucket versions from a remote member
+     * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
+     *
+     */
+    private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status){
+
+        final Map<Address, Long> remoteVersions = status.getVersions();
+
+        return new Mapper<Object, Void>() {
+            @Override
+            public Void apply(Object replyMessage) {
+                if (replyMessage instanceof GetBucketVersionsReply) {
+                    GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
+                    Map<Address, Long> localVersions = reply.getVersions();
+
+                    //diff between remote list and local
+                    Set<Address> localIsOlder = new HashSet<>();
+                    localIsOlder.addAll(remoteVersions.keySet());
+                    localIsOlder.removeAll(localVersions.keySet());
+
+                    //diff between local list and remote
+                    Set<Address> localIsNewer = new HashSet<>();
+                    localIsNewer.addAll(localVersions.keySet());
+                    localIsNewer.removeAll(remoteVersions.keySet());
+
+
+                    for (Address address : remoteVersions.keySet()){
+
+                        if (localVersions.get(address) == null || remoteVersions.get(address) == null)
+                            continue; //this condition is taken care of by above diffs
+                        if (localVersions.get(address) <  remoteVersions.get(address))
+                            localIsOlder.add(address);
+                        else if (localVersions.get(address) > remoteVersions.get(address))
+                            localIsNewer.add(address);
+                        else
+                            continue;
+                    }
+
+                    if (!localIsOlder.isEmpty())
+                        sendGossipStatusTo(sender, localVersions );
+
+                    if (!localIsNewer.isEmpty())
+                        sendGossipTo(sender, localIsNewer);//send newer buckets to remote
+
+                }
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} that contains
+     * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}. These buckets are sent to a remote member encapsulated
+     * in {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
+     *
+     * @param sender the remote member that sent
+     *               {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
+     *               in reply to which bucket is being sent back
+     * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
+     *
+     */
+    private Mapper<Object, Void> getMapperToSendGossip(final ActorRef sender) {
+
+        return new Mapper<Object, Void>() {
+            @Override
+            public Void apply(Object msg) {
+                if (msg instanceof GetBucketsByMembersReply) {
+                    Map<Address, Bucket> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
+                    log.info("Buckets to send from {}: {}", selfAddress, buckets);
+                    GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
+                    sender.tell(envelope, getSelf());
+                }
+                return null;
+            }
+        };
+    }
+
+    ///
+    ///Getter Setters
+    ///
+    List<Address> getClusterMembers() {
+        return clusterMembers;
+    }
+
+    void setClusterMembers(List<Address> clusterMembers) {
+        this.clusterMembers = clusterMembers;
+    }
+
+    Address getSelfAddress() {
+        return selfAddress;
+    }
+
+    void setSelfAddress(Address selfAddress) {
+        this.selfAddress = selfAddress;
+    }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java
new file mode 100644 (file)
index 0000000..9a247d9
--- /dev/null
@@ -0,0 +1,170 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.gossip;
+
+import akka.actor.Address;
+import com.google.common.base.Preconditions;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * These messages are used by {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} and
+ * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper} actors.
+ */
+public class Messages {
+
+    public static class BucketStoreMessages{
+
+        public static class GetLocalBucket implements Serializable{}
+
+        public static class ContainsBucket implements Serializable {
+            final private Bucket bucket;
+
+            public ContainsBucket(Bucket bucket){
+                Preconditions.checkArgument(bucket != null, "bucket can not be null");
+                this.bucket = bucket;
+            }
+
+            public Bucket getBucket(){
+                return bucket;
+            }
+
+        }
+
+        public static class UpdateBucket extends ContainsBucket implements Serializable {
+            public UpdateBucket(Bucket bucket){
+                super(bucket);
+            }
+        }
+
+        public static class GetLocalBucketReply extends ContainsBucket implements Serializable {
+            public GetLocalBucketReply(Bucket bucket){
+                super(bucket);
+            }
+        }
+
+        public static class GetAllBuckets implements Serializable{}
+
+        public static class GetBucketsByMembers implements Serializable{
+            private Set<Address> members;
+
+            public GetBucketsByMembers(Set<Address> members){
+                Preconditions.checkArgument(members != null, "members can not be null");
+                this.members = members;
+            }
+
+            public Set<Address> getMembers() {
+                return new HashSet<>(members);
+            }
+        }
+
+        public static class ContainsBuckets implements Serializable{
+            private Map<Address, Bucket> buckets;
+
+            public ContainsBuckets(Map<Address, Bucket> buckets){
+                Preconditions.checkArgument(buckets != null, "buckets can not be null");
+                this.buckets = buckets;
+            }
+
+            public Map<Address, Bucket> getBuckets() {
+                Map<Address, Bucket> copy = new HashMap<>(buckets.size());
+
+                for (Map.Entry<Address, Bucket> entry : buckets.entrySet()){
+                    //ignore null entries
+                    if ( (entry.getKey() == null) || (entry.getValue() == null) )
+                        continue;
+                    copy.put(entry.getKey(), entry.getValue());
+                }
+                return new HashMap<>(copy);
+            }
+        }
+
+        public static class GetAllBucketsReply extends ContainsBuckets implements Serializable{
+            public GetAllBucketsReply(Map<Address, Bucket> buckets) {
+                super(buckets);
+            }
+        }
+
+        public static class GetBucketsByMembersReply extends ContainsBuckets implements Serializable{
+            public GetBucketsByMembersReply(Map<Address, Bucket> buckets) {
+                super(buckets);
+            }
+        }
+
+        public static class GetBucketVersions implements Serializable{}
+
+        public static class ContainsBucketVersions implements Serializable{
+            Map<Address, Long> versions;
+
+            public ContainsBucketVersions(Map<Address, Long> versions) {
+                Preconditions.checkArgument(versions != null, "versions can not be null");
+                this.versions = versions;
+            }
+
+            public Map<Address, Long> getVersions() {
+                return Collections.unmodifiableMap(versions);
+            }
+
+        }
+
+        public static class GetBucketVersionsReply extends ContainsBucketVersions implements Serializable{
+            public GetBucketVersionsReply(Map<Address, Long> versions) {
+                super(versions);
+            }
+        }
+
+        public static class UpdateRemoteBuckets extends ContainsBuckets implements Serializable{
+            public UpdateRemoteBuckets(Map<Address, Bucket> buckets) {
+                super(buckets);
+            }
+        }
+    }
+
+    public static class GossiperMessages{
+        public static class Tick implements Serializable {}
+
+        public static final class GossipTick extends Tick {}
+
+        public static final class GossipStatus extends BucketStoreMessages.ContainsBucketVersions implements Serializable{
+            private Address from;
+
+            public GossipStatus(Address from, Map<Address, Long> versions) {
+                super(versions);
+                this.from = from;
+            }
+
+            public Address from() {
+                return from;
+            }
+        }
+
+        public static final class GossipEnvelope extends BucketStoreMessages.ContainsBuckets implements Serializable {
+            private final Address from;
+            private final Address to;
+
+            public GossipEnvelope(Address from, Address to, Map<Address, Bucket> buckets) {
+                super(buckets);
+                this.to = to;
+                this.from = from;
+            }
+
+            public Address from() {
+                return from;
+            }
+
+            public Address to() {
+                return to;
+            }
+        }
+    }
+}
index 392c1e637d848e1ccc47d43b3a951011a2e25a72..55aa1d6c871d252d0a89ce1772fd078e45cac101 100644 (file)
@@ -25,7 +25,7 @@ import org.opendaylight.controller.remote.rpc.messages.InvokeRoutedRpc;
 import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
 import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
 import org.opendaylight.controller.remote.rpc.registry.ClusterWrapper;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistryOld;
 import org.opendaylight.controller.sal.common.util.Rpcs;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.controller.sal.core.api.Broker;
@@ -69,7 +69,7 @@ public class RpcBrokerTest {
   @Test
   public void testInvokeRpcError() throws URISyntaxException {
     new JavaTestKit(system) {{
-      ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class)));
+      ActorRef rpcRegistry = system.actorOf(RpcRegistryOld.props(Mockito.mock(ClusterWrapper.class)));
       Broker.ProviderSession brokerSession = Mockito.mock(Broker.ProviderSession.class);
       SchemaContext schemaContext = mock(SchemaContext.class);
       ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext));
@@ -100,7 +100,7 @@ public class RpcBrokerTest {
   @Test
   public void testInvokeRpc() throws URISyntaxException {
     new JavaTestKit(system) {{
-      ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(mock(ClusterWrapper.class)));
+      ActorRef rpcRegistry = system.actorOf(RpcRegistryOld.props(mock(ClusterWrapper.class)));
       Broker.ProviderSession brokerSession = mock(Broker.ProviderSession.class);
       SchemaContext schemaContext = mock(SchemaContext.class);
       ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext));
@@ -141,7 +141,7 @@ public class RpcBrokerTest {
   @Test
   public void testInvokeRoutedRpcError() throws URISyntaxException {
     new JavaTestKit(system) {{
-      ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class)));
+      ActorRef rpcRegistry = system.actorOf(RpcRegistryOld.props(Mockito.mock(ClusterWrapper.class)));
       Broker.ProviderSession brokerSession = Mockito.mock(Broker.ProviderSession.class);
       SchemaContext schemaContext = mock(SchemaContext.class);
       ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext));
@@ -172,7 +172,7 @@ public class RpcBrokerTest {
   @Test
   public void testInvokeRoutedRpc() throws URISyntaxException {
     new JavaTestKit(system) {{
-      ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(mock(ClusterWrapper.class)));
+      ActorRef rpcRegistry = system.actorOf(RpcRegistryOld.props(mock(ClusterWrapper.class)));
       Broker.ProviderSession brokerSession = mock(Broker.ProviderSession.class);
       SchemaContext schemaContext = mock(SchemaContext.class);
       ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext));
@@ -19,10 +19,10 @@ import java.net.URISyntaxException;
 import java.util.HashSet;
 import java.util.Set;
 
-public class RoutingTableTest {
+public class RoutingTableOldTest {
 
-  private RoutingTable<RpcRouter.RouteIdentifier<?, ?, ?>, String> routingTable =
-      new RoutingTable<>();
+  private RoutingTableOld<RpcRouter.RouteIdentifier<?, ?, ?>, String> routingTable =
+      new RoutingTableOld<>();
 
   @Test
   public void addGlobalRouteNullRouteIdTest() {
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryOldTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryOldTest.java
new file mode 100644 (file)
index 0000000..0f711b4
--- /dev/null
@@ -0,0 +1,159 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.registry;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import junit.framework.Assert;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
+import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.AddRpc;
+import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply;
+import org.opendaylight.controller.remote.rpc.messages.GetRpc;
+import org.opendaylight.controller.remote.rpc.messages.GetRpcReply;
+import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.RemoveRpc;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.yangtools.yang.common.QName;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashSet;
+import java.util.Set;
+
+public class RpcRegistryOldTest {
+
+  static ActorSystem system;
+
+
+  @BeforeClass
+  public static void setup() {
+    system = ActorSystem.create();
+  }
+
+  @AfterClass
+  public static void teardown() {
+    JavaTestKit.shutdownActorSystem(system);
+    system = null;
+  }
+
+  /**
+   This test add, read and remove an entry in global rpc
+   */
+  @Test
+  public void testGlobalRpc() throws URISyntaxException {
+    new JavaTestKit(system) {{
+      ActorRef rpcRegistry = system.actorOf(RpcRegistryOld.props(Mockito.mock(ClusterWrapper.class)));
+      QName type = new QName(new URI("actor1"), "actor1");
+      RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null);
+      final String route = "actor1";
+
+      AddRpc rpcMsg = new AddRpc(routeId, route);
+      rpcRegistry.tell(rpcMsg, getRef());
+      expectMsgEquals(duration("2 second"), "Success");
+
+      GetRpc getRpc = new GetRpc(routeId);
+      rpcRegistry.tell(getRpc, getRef());
+
+      Boolean getMsg = new ExpectMsg<Boolean>("GetRpcReply") {
+        protected Boolean match(Object in) {
+          if (in instanceof GetRpcReply) {
+            GetRpcReply reply = (GetRpcReply)in;
+            return route.equals(reply.getRoutePath());
+          } else {
+            throw noMatch();
+          }
+        }
+      }.get(); // this extracts the received message
+
+      Assert.assertTrue(getMsg);
+
+      RemoveRpc removeMsg = new RemoveRpc(routeId);
+      rpcRegistry.tell(removeMsg, getRef());
+      expectMsgEquals(duration("2 second"), "Success");
+
+      rpcRegistry.tell(getRpc, getRef());
+
+      Boolean getNullMsg = new ExpectMsg<Boolean>("GetRpcReply") {
+        protected Boolean match(Object in) {
+          if (in instanceof GetRpcReply) {
+            GetRpcReply reply = (GetRpcReply)in;
+            return reply.getRoutePath() == null;
+          } else {
+            throw noMatch();
+          }
+        }
+      }.get();
+      Assert.assertTrue(getNullMsg);
+    }};
+
+  }
+
+  /**
+   This test add, read and remove an entry in routed rpc
+   */
+  @Test
+  public void testRoutedRpc() throws URISyntaxException {
+    new JavaTestKit(system) {{
+      ActorRef rpcRegistry = system.actorOf(RpcRegistryOld.props(Mockito.mock(ClusterWrapper.class)));
+      QName type = new QName(new URI("actor1"), "actor1");
+      RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null);
+      final String route = "actor1";
+
+      Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new HashSet<>();
+      routeIds.add(routeId);
+
+      AddRoutedRpc rpcMsg = new AddRoutedRpc(routeIds, route);
+      rpcRegistry.tell(rpcMsg, getRef());
+      expectMsgEquals(duration("2 second"), "Success");
+
+      GetRoutedRpc getRpc = new GetRoutedRpc(routeId);
+      rpcRegistry.tell(getRpc, getRef());
+
+      Boolean getMsg = new ExpectMsg<Boolean>("GetRoutedRpcReply") {
+        protected Boolean match(Object in) {
+          if (in instanceof GetRoutedRpcReply) {
+            GetRoutedRpcReply reply = (GetRoutedRpcReply)in;
+            return route.equals(reply.getRoutePath());
+          } else {
+            throw noMatch();
+          }
+        }
+      }.get(); // this extracts the received message
+
+      Assert.assertTrue(getMsg);
+
+      RemoveRoutedRpc removeMsg = new RemoveRoutedRpc(routeIds, route);
+      rpcRegistry.tell(removeMsg, getRef());
+      expectMsgEquals(duration("2 second"), "Success");
+
+      rpcRegistry.tell(getRpc, getRef());
+
+      Boolean getNullMsg = new ExpectMsg<Boolean>("GetRoutedRpcReply") {
+        protected Boolean match(Object in) {
+          if (in instanceof GetRoutedRpcReply) {
+            GetRoutedRpcReply reply = (GetRoutedRpcReply)in;
+            return reply.getRoutePath() == null;
+          } else {
+            throw noMatch();
+          }
+        }
+      }.get();
+      Assert.assertTrue(getNullMsg);
+    }};
+
+  }
+
+}
index d011d331a684e4f0bcbbbf395e18145364dfdf4a..ab609413dd82731466495365d0c46f28731fcdc0 100644 (file)
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
 package org.opendaylight.controller.remote.rpc.registry;
 
+import akka.actor.ActorPath;
 import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
+import akka.actor.ChildActorPath;
+import akka.actor.Props;
+import akka.japi.Pair;
 import akka.testkit.JavaTestKit;
-import junit.framework.Assert;
+import com.typesafe.config.ConfigFactory;
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.mockito.Mockito;
 import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
-import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.AddRpc;
-import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply;
-import org.opendaylight.controller.remote.rpc.messages.GetRpc;
-import org.opendaylight.controller.remote.rpc.messages.GetRpcReply;
-import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.RemoveRpc;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.yangtools.yang.common.QName;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoute;
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRoutersReply;
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
 
 public class RpcRegistryTest {
 
-  static ActorSystem system;
-
-
-  @BeforeClass
-  public static void setup() {
-    system = ActorSystem.create();
-  }
-
-  @AfterClass
-  public static void teardown() {
-    JavaTestKit.shutdownActorSystem(system);
-    system = null;
-  }
-
-  /**
-   This test add, read and remove an entry in global rpc
-   */
-  @Test
-  public void testGlobalRpc() throws URISyntaxException {
-    new JavaTestKit(system) {{
-      ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class)));
-      QName type = new QName(new URI("actor1"), "actor1");
-      RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null);
-      final String route = "actor1";
-
-      AddRpc rpcMsg = new AddRpc(routeId, route);
-      rpcRegistry.tell(rpcMsg, getRef());
-      expectMsgEquals(duration("2 second"), "Success");
-
-      GetRpc getRpc = new GetRpc(routeId);
-      rpcRegistry.tell(getRpc, getRef());
-
-      Boolean getMsg = new ExpectMsg<Boolean>("GetRpcReply") {
-        protected Boolean match(Object in) {
-          if (in instanceof GetRpcReply) {
-            GetRpcReply reply = (GetRpcReply)in;
-            return route.equals(reply.getRoutePath());
-          } else {
-            throw noMatch();
-          }
-        }
-      }.get(); // this extracts the received message
+    private static ActorSystem node1;
+    private static ActorSystem node2;
+    private static ActorSystem node3;
+
+    private ActorRef registry1;
+    private ActorRef registry2;
+    private ActorRef registry3;
+
+    @BeforeClass
+    public static void setup() throws InterruptedException {
+        Thread.sleep(1000); //give some time for previous test to close netty ports
+        node1 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberA"));
+        node2 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberB"));
+        node3 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberC"));
+    }
+
+    @AfterClass
+    public static void teardown(){
+        JavaTestKit.shutdownActorSystem(node1);
+        JavaTestKit.shutdownActorSystem(node2);
+        JavaTestKit.shutdownActorSystem(node3);
+        if (node1 != null)
+            node1.shutdown();
+        if (node2 != null)
+            node2.shutdown();
+        if (node3 != null)
+            node3.shutdown();
+
+    }
+
+    @Before
+    public void createRpcRegistry() throws InterruptedException {
+        registry1 = node1.actorOf(Props.create(RpcRegistry.class));
+        registry2 = node2.actorOf(Props.create(RpcRegistry.class));
+        registry3 = node3.actorOf(Props.create(RpcRegistry.class));
+    }
+
+    @After
+    public void stopRpcRegistry() throws InterruptedException {
+        if (registry1 != null)
+            node1.stop(registry1);
+        if (registry2 != null)
+            node2.stop(registry2);
+        if (registry3 != null)
+            node3.stop(registry3);
+    }
+
+    /**
+     * One node cluster.
+     * Register rpc. Ensure router can be found
+     *
+     * @throws URISyntaxException
+     * @throws InterruptedException
+     */
+    @Test
+    public void testWhenRpcAddedOneNodeShouldAppearOnSameNode() throws URISyntaxException, InterruptedException {
+
+        final JavaTestKit mockBroker = new JavaTestKit(node1);
+
+        //Add rpc on node 1
+        registry1.tell(new SetLocalRouter(mockBroker.getRef()), mockBroker.getRef());
+        registry1.tell(getAddRouteMessage(), mockBroker.getRef());
+
+        Thread.sleep(1000);//
+
+        //find the route on node 1's registry
+        registry1.tell(new FindRouters(createRouteId()), mockBroker.getRef());
+        FindRoutersReply message = mockBroker.expectMsgClass(JavaTestKit.duration("10 second"), FindRoutersReply.class);
+        List<Pair<ActorRef, Long>> pairs = message.getRouterWithUpdateTime();
+
+        validateRouterReceived(pairs, mockBroker.getRef());
+    }
+
+    /**
+     * Three node cluster.
+     * Register rpc on 1 node. Ensure its router can be found on other 2.
+     *
+     * @throws URISyntaxException
+     * @throws InterruptedException
+     */
+    @Test
+    public void testWhenRpcAddedOneNodeShouldAppearOnAnother() throws URISyntaxException, InterruptedException {
+
+        validateSystemStartup();
+
+        final JavaTestKit mockBroker1 = new JavaTestKit(node1);
+        final JavaTestKit mockBroker2 = new JavaTestKit(node2);
+        final JavaTestKit mockBroker3 = new JavaTestKit(node3);
+
+        //Add rpc on node 1
+        registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
+        registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
+
+        Thread.sleep(5000);// give some time for bucket store data sync
+
+        //find the route in node 2's registry
+        registry2.tell(new FindRouters(createRouteId()), mockBroker2.getRef());
+        FindRoutersReply message = mockBroker2.expectMsgClass(JavaTestKit.duration("10 second"), FindRoutersReply.class);
+        List<Pair<ActorRef, Long>> pairs = message.getRouterWithUpdateTime();
+
+        validateRouterReceived(pairs, mockBroker1.getRef());
+
+        //find the route in node 3's registry
+        registry3.tell(new FindRouters(createRouteId()), mockBroker3.getRef());
+        message = mockBroker3.expectMsgClass(JavaTestKit.duration("10 second"), FindRoutersReply.class);
+        pairs = message.getRouterWithUpdateTime();
+
+        validateRouterReceived(pairs, mockBroker1.getRef());
+
+    }
+
+    /**
+     * Three node cluster.
+     * Register rpc on 2 nodes. Ensure 2 routers are found on 3rd.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testAnRpcAddedOnMultiNodesShouldReturnMultiRouter() throws Exception {
+
+        validateSystemStartup();
+
+        final JavaTestKit mockBroker1 = new JavaTestKit(node1);
+        final JavaTestKit mockBroker2 = new JavaTestKit(node2);
+        final JavaTestKit mockBroker3 = new JavaTestKit(node3);
+
+        //Thread.sleep(5000);//let system come up
+
+        //Add rpc on node 1
+        registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
+        registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
+
+        //Add same rpc on node 2
+        registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef());
+        registry2.tell(getAddRouteMessage(), mockBroker2.getRef());
+
+        registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef());
+        Thread.sleep(5000);// give some time for bucket store data sync
+
+        //find the route in node 3's registry
+        registry3.tell(new FindRouters(createRouteId()), mockBroker3.getRef());
+        FindRoutersReply message = mockBroker3.expectMsgClass(JavaTestKit.duration("10 second"), FindRoutersReply.class);
+        List<Pair<ActorRef, Long>> pairs = message.getRouterWithUpdateTime();
 
-      Assert.assertTrue(getMsg);
+        validateMultiRouterReceived(pairs, mockBroker1.getRef(), mockBroker2.getRef());
 
-      RemoveRpc removeMsg = new RemoveRpc(routeId);
-      rpcRegistry.tell(removeMsg, getRef());
-      expectMsgEquals(duration("2 second"), "Success");
+    }
 
-      rpcRegistry.tell(getRpc, getRef());
+    private void validateMultiRouterReceived(List<Pair<ActorRef, Long>> actual, ActorRef... expected) {
+        Assert.assertTrue(actual != null);
+        Assert.assertTrue(actual.size() == expected.length);
+    }
 
-      Boolean getNullMsg = new ExpectMsg<Boolean>("GetRpcReply") {
-        protected Boolean match(Object in) {
-          if (in instanceof GetRpcReply) {
-            GetRpcReply reply = (GetRpcReply)in;
-            return reply.getRoutePath() == null;
-          } else {
-            throw noMatch();
-          }
-        }
-      }.get();
-      Assert.assertTrue(getNullMsg);
-    }};
-
-  }
-
-  /**
-   This test add, read and remove an entry in routed rpc
-   */
-  @Test
-  public void testRoutedRpc() throws URISyntaxException {
-    new JavaTestKit(system) {{
-      ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class)));
-      QName type = new QName(new URI("actor1"), "actor1");
-      RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null);
-      final String route = "actor1";
-
-      Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new HashSet<>();
-      routeIds.add(routeId);
-
-      AddRoutedRpc rpcMsg = new AddRoutedRpc(routeIds, route);
-      rpcRegistry.tell(rpcMsg, getRef());
-      expectMsgEquals(duration("2 second"), "Success");
-
-      GetRoutedRpc getRpc = new GetRoutedRpc(routeId);
-      rpcRegistry.tell(getRpc, getRef());
-
-      Boolean getMsg = new ExpectMsg<Boolean>("GetRoutedRpcReply") {
-        protected Boolean match(Object in) {
-          if (in instanceof GetRoutedRpcReply) {
-            GetRoutedRpcReply reply = (GetRoutedRpcReply)in;
-            return route.equals(reply.getRoutePath());
-          } else {
-            throw noMatch();
-          }
+    private void validateRouterReceived(List<Pair<ActorRef, Long>> actual, ActorRef expected){
+        Assert.assertTrue(actual != null);
+        Assert.assertTrue(actual.size() == 1);
+
+        for (Pair<ActorRef, Long> pair : actual){
+            Assert.assertTrue(expected.path().uid() == pair.first().path().uid());
         }
-      }.get(); // this extracts the received message
+    }
 
-      Assert.assertTrue(getMsg);
+    private void validateSystemStartup() throws InterruptedException {
 
-      RemoveRoutedRpc removeMsg = new RemoveRoutedRpc(routeIds, route);
-      rpcRegistry.tell(removeMsg, getRef());
-      expectMsgEquals(duration("2 second"), "Success");
+        Thread.sleep(5000);
+        ActorPath gossiper1Path = new ChildActorPath(new ChildActorPath(registry1.path(), "store"), "gossiper");
+        ActorPath gossiper2Path = new ChildActorPath(new ChildActorPath(registry2.path(), "store"), "gossiper");
+        ActorPath gossiper3Path = new ChildActorPath(new ChildActorPath(registry3.path(), "store"), "gossiper");
 
-      rpcRegistry.tell(getRpc, getRef());
+        ActorSelection gossiper1 = node1.actorSelection(gossiper1Path);
+        ActorSelection gossiper2 = node2.actorSelection(gossiper2Path);
+        ActorSelection gossiper3 = node3.actorSelection(gossiper3Path);
 
-      Boolean getNullMsg = new ExpectMsg<Boolean>("GetRoutedRpcReply") {
-        protected Boolean match(Object in) {
-          if (in instanceof GetRoutedRpcReply) {
-            GetRoutedRpcReply reply = (GetRoutedRpcReply)in;
-            return reply.getRoutePath() == null;
-          } else {
-            throw noMatch();
-          }
-        }
-      }.get();
-      Assert.assertTrue(getNullMsg);
-    }};
 
-  }
+        if (!resolveReference(gossiper1, gossiper2, gossiper3))
+            Assert.fail("Could not find gossipers");
+    }
 
-}
+    private Boolean resolveReference(ActorSelection... gossipers) throws InterruptedException {
+
+        Boolean resolved = true;
+
+        for (int i=0; i< 5; i++) {
+            Thread.sleep(1000);
+            for (ActorSelection gossiper : gossipers) {
+                Future<ActorRef> future = gossiper.resolveOne(new FiniteDuration(5000, TimeUnit.MILLISECONDS));
+
+                ActorRef ref = null;
+                try {
+                    ref = Await.result(future, new FiniteDuration(10000, TimeUnit.MILLISECONDS));
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+
+                if (ref == null)
+                    resolved = false;
+            }
+
+            if (resolved) break;
+        }
+        return resolved;
+    }
+
+    private AddOrUpdateRoute getAddRouteMessage() throws URISyntaxException {
+        return new AddOrUpdateRoute(createRouteId());
+    }
+
+    private RpcRouter.RouteIdentifier<?,?,?> createRouteId() throws URISyntaxException {
+        QName type = new QName(new URI("/mockrpc"), "mockrpc");
+        return new RouteIdentifierImpl(null, type, null);
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java
new file mode 100644 (file)
index 0000000..7e87da0
--- /dev/null
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.gossip;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.testkit.TestActorRef;
+import akka.testkit.TestProbe;
+import com.typesafe.config.ConfigFactory;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.remote.rpc.TerminationMonitor;
+
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+
+public class BucketStoreTest {
+
+    private static ActorSystem system;
+    private static BucketStore store;
+
+    private BucketStore mockStore;
+
+    @BeforeClass
+    public static void setup() {
+
+        system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("unit-test"));
+        system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
+
+        store = createStore();
+    }
+
+    @AfterClass
+    public static void teardown() {
+        system.shutdown();
+    }
+
+    @Before
+    public void createMocks(){
+        mockStore = spy(store);
+    }
+
+    @After
+    public void resetMocks(){
+        reset(mockStore);
+    }
+
+    @Test
+    public void testReceiveUpdateBucket_WhenInputBucketShouldUpdateVersion(){
+        Bucket bucket = new BucketImpl();
+        Long expectedVersion = bucket.getVersion();
+
+        mockStore.receiveUpdateBucket(bucket);
+
+        Assert.assertEquals(bucket, mockStore.getLocalBucket());
+        Assert.assertEquals(expectedVersion, mockStore.getLocalBucket().getVersion());
+    }
+
+    /**
+     * Create BucketStore actor and returns the underlying instance of BucketStore class.
+     *
+     * @return instance of BucketStore class
+     */
+    private static BucketStore createStore(){
+        TestProbe mockActor = new TestProbe(system);
+        ActorRef mockGossiper = mockActor.ref();
+        final Props props = Props.create(BucketStore.class, mockGossiper);
+        final TestActorRef<BucketStore> testRef = TestActorRef.create(system, props, "testStore");
+
+        return testRef.underlyingActor();
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossiperTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossiperTest.java
new file mode 100644 (file)
index 0000000..d862dcb
--- /dev/null
@@ -0,0 +1,141 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.gossip;
+
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.actor.Props;
+import akka.testkit.TestActorRef;
+import com.typesafe.config.ConfigFactory;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.remote.rpc.TerminationMonitor;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyMap;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
+
+
+public class GossiperTest {
+
+    private static ActorSystem system;
+    private static Gossiper gossiper;
+
+    private Gossiper mockGossiper;
+
+    @BeforeClass
+    public static void setup() throws InterruptedException {
+        Thread.sleep(1000);//give some time for previous test to stop the system. Netty port conflict arises otherwise.
+        system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("unit-test"));
+        system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
+
+        gossiper = createGossiper();
+    }
+
+    @AfterClass
+    public static void teardown() {
+        if (system != null)
+            system.shutdown();
+    }
+
+    @Before
+    public void createMocks(){
+        mockGossiper = spy(gossiper);
+    }
+
+    @After
+    public void resetMocks(){
+        reset(mockGossiper);
+
+    }
+
+    @Test
+    public void testReceiveGossipTick_WhenNoRemoteMemberShouldIgnore(){
+
+        mockGossiper.setClusterMembers(Collections.EMPTY_LIST);
+        doNothing().when(mockGossiper).getLocalStatusAndSendTo(any(Address.class));
+        mockGossiper.receiveGossipTick();
+        verify(mockGossiper, times(0)).getLocalStatusAndSendTo(any(Address.class));
+    }
+
+    @Test
+    public void testReceiveGossipTick_WhenRemoteMemberExistsShouldSendStatus(){
+        List<Address> members = new ArrayList<>();
+        Address remote = new Address("tcp", "member");
+        members.add(remote);
+
+        mockGossiper.setClusterMembers(members);
+        doNothing().when(mockGossiper).getLocalStatusAndSendTo(any(Address.class));
+        mockGossiper.receiveGossipTick();
+        verify(mockGossiper, times(1)).getLocalStatusAndSendTo(any(Address.class));
+    }
+
+    @Test
+    public void testReceiveGossipStatus_WhenSenderIsNonMemberShouldIgnore(){
+
+        Address nonMember = new Address("tcp", "non-member");
+        GossipStatus remoteStatus = new GossipStatus(nonMember, mock(Map.class));
+
+        //add a member
+        List<Address> members = new ArrayList<>();
+        members.add(new Address("tcp", "member"));
+
+        mockGossiper.setClusterMembers(members);
+        mockGossiper.receiveGossipStatus(remoteStatus);
+        verify(mockGossiper, times(0)).getSender();
+    }
+
+    @Test
+    public void testReceiveGossip_WhenNotAddressedToSelfShouldIgnore(){
+        Address notSelf = new Address("tcp", "not-self");
+
+        GossipEnvelope envelope = new GossipEnvelope(notSelf, notSelf, mock(Map.class));
+        doNothing().when(mockGossiper).updateRemoteBuckets(anyMap());
+        mockGossiper.receiveGossip(envelope);
+        verify(mockGossiper, times(0)).updateRemoteBuckets(anyMap());
+    }
+
+    @Test
+    public void testUpdateRemoteBuckets_WhenNoBucketShouldIgnore(){
+
+        mockGossiper.updateRemoteBuckets(null);
+        verify(mockGossiper, times(0)).getContext();
+
+        Map<Address, Bucket> empty = Collections.emptyMap();
+        mockGossiper.updateRemoteBuckets(empty);
+        verify(mockGossiper, times(0)).getContext();
+    }
+
+    /**
+     * Create Gossiper actor and return the underlying instance of Gossiper class.
+     *
+     * @return instance of Gossiper class
+     */
+    private static Gossiper createGossiper(){
+
+        final Props props = Props.create(Gossiper.class, false);
+        final TestActorRef<Gossiper> testRef = TestActorRef.create(system, props, "testGossiper");
+
+        return testRef.underlyingActor();
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf b/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf
new file mode 100644 (file)
index 0000000..fbfb0e1
--- /dev/null
@@ -0,0 +1,116 @@
+odl-cluster{
+  akka {
+    loglevel = "INFO"
+    #log-config-on-start = on
+
+    actor {
+      provider = "akka.cluster.ClusterActorRefProvider"
+      debug{
+        #autoreceive = on
+        #lifecycle = on
+
+      }
+    }
+    remote {
+      log-received-messages = on
+      log-sent-messages = on
+
+      log-remote-lifecycle-events = off
+      netty.tcp {
+        hostname = "localhost"
+        port = 2551
+      }
+    }
+
+    cluster {
+      seed-nodes = ["akka.tcp://opendaylight-rpc@localhost:2551"]
+
+      auto-down-unreachable-after = 10s
+    }
+  }
+}
+unit-test{
+  akka {
+    loglevel = "INFO"
+
+    actor {
+      provider = "akka.cluster.ClusterActorRefProvider"
+    }
+  }
+}
+
+memberA{
+  akka {
+    loglevel = "INFO"
+
+    actor {
+      provider = "akka.cluster.ClusterActorRefProvider"
+    }
+    remote {
+      log-received-messages = off
+      log-sent-messages = off
+
+      log-remote-lifecycle-events = off
+      netty.tcp {
+        hostname = "localhost"
+        port = 2551
+      }
+    }
+
+    cluster {
+      seed-nodes = ["akka.tcp://opendaylight-rpc@localhost:2551"]
+
+      auto-down-unreachable-after = 10s
+    }
+  }
+}
+memberB{
+  akka {
+    loglevel = "INFO"
+
+    actor {
+      provider = "akka.cluster.ClusterActorRefProvider"
+    }
+    remote {
+      log-received-messages = off
+      log-sent-messages = off
+
+      log-remote-lifecycle-events = off
+      netty.tcp {
+        hostname = "localhost"
+        port = 2552
+      }
+    }
+
+    cluster {
+      seed-nodes = ["akka.tcp://opendaylight-rpc@localhost:2551"]
+
+      auto-down-unreachable-after = 10s
+    }
+  }
+}
+memberC{
+  akka {
+    loglevel = "INFO"
+
+    actor {
+      provider = "akka.cluster.ClusterActorRefProvider"
+    }
+    remote {
+      log-received-messages = off
+      log-sent-messages = off
+
+      log-remote-lifecycle-events = off
+      netty.tcp {
+        hostname = "localhost"
+        port = 2553
+      }
+    }
+
+    cluster {
+      seed-nodes = ["akka.tcp://opendaylight-rpc@localhost:2551"]
+
+      auto-down-unreachable-after = 10s
+    }
+  }
+}
\ No newline at end of file