import akka.actor.ActorRef;
import akka.japi.Option;
import akka.japi.Pair;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Copier;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Copier;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
public class RoutingTable implements Copier<RoutingTable>, Serializable {
private static final long serialVersionUID = 1L;
- private Map<RpcRouter.RouteIdentifier<?, ?, ?>, Long> table = new HashMap<>();
+ private final Map<RpcRouter.RouteIdentifier<?, ?, ?>, Long> table = new HashMap<>();
private ActorRef router;
@Override
public RoutingTable copy() {
RoutingTable copy = new RoutingTable();
- copy.setTable(new HashMap<>(table));
+ copy.table.putAll(table);
copy.setRouter(this.getRouter());
return copy;
public Option<Pair<ActorRef, Long>> getRouterFor(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
Long updatedTime = table.get(routeId);
- if (updatedTime == null || router == null)
+ if (updatedTime == null || router == null) {
return Option.none();
- else
+ } else {
return Option.option(new Pair<>(router, updatedTime));
+ }
}
public void addRoute(RpcRouter.RouteIdentifier<?,?,?> routeId){
table.remove(routeId);
}
- public Boolean contains(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
+ public boolean contains(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
return table.containsKey(routeId);
}
- public Boolean isEmpty(){
+ public boolean isEmpty(){
return table.isEmpty();
}
- ///
- /// Getter, Setters
- ///
- //TODO: Remove public
- public Map<RpcRouter.RouteIdentifier<?, ?, ?>, Long> getTable() {
- return table;
- }
- void setTable(Map<RpcRouter.RouteIdentifier<?, ?, ?>, Long> table) {
- this.table = table;
+ public int size() {
+ return table.size();
}
public ActorRef getRouter() {
package org.opendaylight.controller.remote.rpc.registry;
import akka.actor.ActorRef;
-import akka.actor.Address;
-import akka.actor.Props;
-import akka.dispatch.Mapper;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Option;
import akka.japi.Pair;
-import akka.pattern.Patterns;
import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import java.util.ArrayList;
+import java.util.List;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import scala.concurrent.Future;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
+import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
/**
* Registry to look up cluster nodes that have registered for a given rpc.
* It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
* cluster wide information.
*/
-public class RpcRegistry extends AbstractUntypedActorWithMetering {
+public class RpcRegistry extends BucketStore<RoutingTable> {
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
- /**
- * Store to keep the registry. Bucket store sync's it across nodes in the cluster
- */
- private ActorRef bucketStore;
-
- /**
- * Rpc broker that would use the registry to route requests.
- */
- private ActorRef localRouter;
-
- private RemoteRpcProviderConfig config;
-
public RpcRegistry() {
- bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
- this.config = new RemoteRpcProviderConfig(getContext().system().settings().config());
- log.info("Bucket store path = {}", bucketStore.path().toString());
+ getLocalBucket().setData(new RoutingTable());
}
- public RpcRegistry(ActorRef bucketStore) {
- this.bucketStore = bucketStore;
- }
-
-
@Override
protected void handleReceive(Object message) throws Exception {
//TODO: if sender is remote, reject message
- if (message instanceof SetLocalRouter)
+ if (message instanceof SetLocalRouter) {
receiveSetLocalRouter((SetLocalRouter) message);
-
- if (message instanceof AddOrUpdateRoutes)
+ } else if (message instanceof AddOrUpdateRoutes) {
receiveAddRoutes((AddOrUpdateRoutes) message);
-
- else if (message instanceof RemoveRoutes)
+ } else if (message instanceof RemoveRoutes) {
receiveRemoveRoutes((RemoveRoutes) message);
-
- else if (message instanceof Messages.FindRouters)
+ } else if (message instanceof Messages.FindRouters) {
receiveGetRouter((FindRouters) message);
-
- else
- unhandled(message);
+ } else {
+ super.handleReceive(message);
+ }
}
/**
* @param message contains {@link akka.actor.ActorRef} for rpc broker
*/
private void receiveSetLocalRouter(SetLocalRouter message) {
- localRouter = message.getRouter();
+ getLocalBucket().getData().setRouter(message.getRouter());
}
/**
*/
private void receiveAddRoutes(AddOrUpdateRoutes msg) {
- Preconditions.checkState(localRouter != null, "Router must be set first");
+ log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
+
+ RoutingTable table = getLocalBucket().getData().copy();
+ for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
+ table.addRoute(routeId);
+ }
- Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration());
- futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
+ updateLocalBucket(table);
}
/**
*/
private void receiveRemoveRoutes(RemoveRoutes msg) {
- Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration());
- futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
+ RoutingTable table = getLocalBucket().getData().copy();
+ for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
+ table.removeRoute(routeId);
+ }
+ updateLocalBucket(table);
}
/**
* @param msg
*/
private void receiveGetRouter(FindRouters msg) {
- final ActorRef sender = getSender();
-
- Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), config.getAskDuration());
- futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
- }
-
- /**
- * Helper to create empty reply when no routers are found
- *
- * @return
- */
- private Messages.FindRoutersReply createEmptyReply() {
- List<Pair<ActorRef, Long>> routerWithUpdateTime = Collections.emptyList();
- return new Messages.FindRoutersReply(routerWithUpdateTime);
- }
-
- /**
- * Helper to create a reply when routers are found for the given rpc
- *
- * @param buckets
- * @param routeId
- * @return
- */
- private Messages.FindRoutersReply createReplyWithRouters(
- Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
-
List<Pair<ActorRef, Long>> routers = new ArrayList<>();
- Option<Pair<ActorRef, Long>> routerWithUpdateTime = null;
-
- for (Bucket bucket : buckets.values()) {
-
- RoutingTable table = (RoutingTable) bucket.getData();
- if (table == null)
- continue;
- routerWithUpdateTime = table.getRouterFor(routeId);
- if (routerWithUpdateTime.isEmpty())
- continue;
+ RouteIdentifier<?, ?, ?> routeId = msg.getRouteIdentifier();
+ findRoutes(getLocalBucket().getData(), routeId, routers);
- routers.add(routerWithUpdateTime.get());
+ for(Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
+ findRoutes(bucket.getData(), routeId, routers);
}
- return new Messages.FindRoutersReply(routers);
- }
-
-
- ///
- ///private factories to create Mapper
- ///
-
- /**
- * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found
- *
- * @param routeId the rpc
- * @param sender client who asked to find the routers.
- * @return
- */
- private Mapper<Object, Void> getMapperToGetRouter(
- final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
- return new Mapper<Object, Void>() {
- @Override
- public Void apply(Object replyMessage) {
-
- if (replyMessage instanceof GetAllBucketsReply) {
-
- GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage;
- Map<Address, Bucket> buckets = reply.getBuckets();
-
- if (buckets == null || buckets.isEmpty()) {
- sender.tell(createEmptyReply(), getSelf());
- return null;
- }
-
- sender.tell(createReplyWithRouters(buckets, routeId), getSelf());
- }
- return null;
- }
- };
- }
-
- /**
- * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently,
- * it updates the local bucket in bucket store.
- *
- * @param routeIds rpc to remote
- * @return
- */
- private Mapper<Object, Void> getMapperToRemoveRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
- return new Mapper<Object, Void>() {
- @Override
- public Void apply(Object replyMessage) {
- if (replyMessage instanceof GetLocalBucketReply) {
-
- GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
- Bucket<RoutingTable> bucket = reply.getBucket();
-
- if (bucket == null) {
- log.debug("Local bucket is null");
- return null;
- }
-
- RoutingTable table = bucket.getData();
- if (table == null)
- table = new RoutingTable();
-
- table.setRouter(localRouter);
-
- if (!table.isEmpty()) {
- for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
- table.removeRoute(routeId);
- }
- }
- bucket.setData(table);
-
- UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
- bucketStore.tell(updateBucketMessage, getSelf());
- }
- return null;
- }
- };
+ getSender().tell(new Messages.FindRoutersReply(routers), getSelf());
}
- /**
- * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently,
- * it updates the local bucket in bucket store.
- *
- * @param routeIds rpc to add
- * @return
- */
- private Mapper<Object, Void> getMapperToAddRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
-
- return new Mapper<Object, Void>() {
- @Override
- public Void apply(Object replyMessage) {
- if (replyMessage instanceof GetLocalBucketReply) {
-
- GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
- Bucket<RoutingTable> bucket = reply.getBucket();
-
- if (bucket == null) {
- log.debug("Local bucket is null");
- return null;
- }
-
- RoutingTable table = bucket.getData();
- if (table == null)
- table = new RoutingTable();
-
- table.setRouter(localRouter);
- for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
- table.addRoute(routeId);
- }
-
- bucket.setData(table);
-
- UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
- bucketStore.tell(updateBucketMessage, getSelf());
- }
+ private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
+ List<Pair<ActorRef, Long>> routers) {
+ if (table == null) {
+ return;
+ }
- return null;
- }
- };
+ Option<Pair<ActorRef, Long>> routerWithUpdateTime = table.getRouterFor(routeId);
+ if(!routerWithUpdateTime.isEmpty()) {
+ routers.add(routerWithUpdateTime.get());
+ }
}
/**
public interface Bucket<T extends Copier<T>> {
public Long getVersion();
public T getData();
- public void setData(T data);
}
private T data;
+ public BucketImpl() {
+ }
+
+ public BucketImpl(T data) {
+ this.data = data;
+ }
+
+ public BucketImpl(Bucket<T> other) {
+ this.version = other.getVersion();
+ this.data = other.getData();
+ }
+
+ public void setData(T data) {
+ this.data = data;
+ this.version = System.currentTimeMillis()+1;
+ }
+
@Override
public Long getVersion() {
return version;
@Override
public T getData() {
- if (this.data == null)
- return null;
-
- return data.copy();
- }
-
- public void setData(T data){
- this.version = System.currentTimeMillis()+1;
- this.data = data;
+ return data;
}
@Override
import akka.cluster.ClusterActorRefProvider;
import akka.event.Logging;
import akka.event.LoggingAdapter;
+import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
import org.opendaylight.controller.utils.ConditionalProbe;
* This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
*
*/
-public class BucketStore extends AbstractUntypedActorWithMetering {
+public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMetering {
+
+ private static final Long NO_VERSION = -1L;
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
/**
* Bucket owned by the node
*/
- private BucketImpl localBucket = new BucketImpl();
+ private final BucketImpl<T> localBucket = new BucketImpl<>();
/**
* Buckets ownded by other known nodes in the cluster
*/
- private ConcurrentMap<Address, Bucket> remoteBuckets = new ConcurrentHashMap<>();
+ private final Map<Address, Bucket<T>> remoteBuckets = new HashMap<>();
/**
* Bucket version for every known node in the cluster including this node
*/
- private ConcurrentMap<Address, Long> versions = new ConcurrentHashMap<>();
+ private final Map<Address, Long> versions = new HashMap<>();
/**
* Cluster address for this node
}
}
-
@Override
protected void handleReceive(Object message) throws Exception {
if (probe != null) {
probe = (ConditionalProbe) message;
// Send back any message to tell the caller we got the probe.
getSender().tell("Got it", getSelf());
- } else if (message instanceof UpdateBucket) {
- receiveUpdateBucket(((UpdateBucket) message).getBucket());
} else if (message instanceof GetAllBuckets) {
- receiveGetAllBucket();
- } else if (message instanceof GetLocalBucket) {
- receiveGetLocalBucket();
+ receiveGetAllBuckets();
} else if (message instanceof GetBucketsByMembers) {
- receiveGetBucketsByMembers(
- ((GetBucketsByMembers) message).getMembers());
+ receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
} else if (message instanceof GetBucketVersions) {
receiveGetBucketVersions();
} else if (message instanceof UpdateRemoteBuckets) {
- receiveUpdateRemoteBuckets(
- ((UpdateRemoteBuckets) message).getBuckets());
+ receiveUpdateRemoteBuckets(((UpdateRemoteBuckets) message).getBuckets());
} else {
if(log.isDebugEnabled()) {
log.debug("Unhandled message [{}]", message);
}
}
- /**
- * Returns a copy of bucket owned by this node
- */
- private void receiveGetLocalBucket() {
- final ActorRef sender = getSender();
- GetLocalBucketReply reply = new GetLocalBucketReply(localBucket);
- sender.tell(reply, getSelf());
- }
-
- /**
- * Updates the bucket owned by this node
- *
- * @param updatedBucket
- */
- void receiveUpdateBucket(Bucket updatedBucket){
-
- localBucket = (BucketImpl) updatedBucket;
- versions.put(selfAddress, localBucket.getVersion());
- }
-
/**
* Returns all the buckets the this node knows about, self owned + remote
*/
- void receiveGetAllBucket(){
+ void receiveGetAllBuckets(){
final ActorRef sender = getSender();
sender.tell(new GetAllBucketsReply(getAllBuckets()), getSelf());
}
*
* @return self owned + remote buckets
*/
+ @SuppressWarnings("rawtypes")
Map<Address, Bucket> getAllBuckets(){
Map<Address, Bucket> all = new HashMap<>(remoteBuckets.size() + 1);
//first add the local bucket
- all.put(selfAddress, localBucket);
+ all.put(selfAddress, new BucketImpl<>(localBucket));
//then get all remote buckets
all.putAll(remoteBuckets);
*
* @param members requested members
*/
+ @SuppressWarnings("rawtypes")
void receiveGetBucketsByMembers(Set<Address> members){
final ActorRef sender = getSender();
Map<Address, Bucket> buckets = getBucketsByMembers(members);
* @param members requested members
* @return buckets for requested memebers
*/
+ @SuppressWarnings("rawtypes")
Map<Address, Bucket> getBucketsByMembers(Set<Address> members) {
Map<Address, Bucket> buckets = new HashMap<>();
//first add the local bucket if asked
if (members.contains(selfAddress)) {
- buckets.put(selfAddress, localBucket);
+ buckets.put(selfAddress, new BucketImpl<>(localBucket));
}
//then get buckets for requested remote nodes
* @param receivedBuckets buckets sent by remote
* {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
*/
+ @SuppressWarnings({ "rawtypes", "unchecked" })
void receiveUpdateRemoteBuckets(Map<Address, Bucket> receivedBuckets){
-
+ log.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
if (receivedBuckets == null || receivedBuckets.isEmpty())
{
return; //nothing to do
Long localVersion = versions.get(entry.getKey());
if (localVersion == null) {
- localVersion = -1L;
+ localVersion = NO_VERSION;
}
- Bucket receivedBucket = entry.getValue();
+ Bucket<T> receivedBucket = entry.getValue();
if (receivedBucket == null) {
continue;
Long remoteVersion = receivedBucket.getVersion();
if (remoteVersion == null) {
- remoteVersion = -1L;
+ remoteVersion = NO_VERSION;
}
//update only if remote version is newer
versions.put(entry.getKey(), remoteVersion);
}
}
+
if(log.isDebugEnabled()) {
log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
}
}
- ///
- ///Getter Setters
- ///
-
- BucketImpl getLocalBucket() {
+ protected BucketImpl<T> getLocalBucket() {
return localBucket;
}
- void setLocalBucket(BucketImpl localBucket) {
- this.localBucket = localBucket;
+ protected void updateLocalBucket(T data) {
+ localBucket.setData(data);
+ versions.put(selfAddress, localBucket.getVersion());
}
- ConcurrentMap<Address, Bucket> getRemoteBuckets() {
+ protected Map<Address, Bucket<T>> getRemoteBuckets() {
return remoteBuckets;
}
- void setRemoteBuckets(ConcurrentMap<Address, Bucket> remoteBuckets) {
- this.remoteBuckets = remoteBuckets;
- }
-
- ConcurrentMap<Address, Long> getVersions() {
+ @VisibleForTesting
+ Map<Address, Long> getVersions() {
return versions;
}
-
- void setVersions(ConcurrentMap<Address, Long> versions) {
- this.versions = versions;
- }
-
- Address getSelfAddress() {
- return selfAddress;
- }
}
import akka.actor.Address;
import com.google.common.base.Preconditions;
-
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBucketVersions;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBuckets;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBucketVersions;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBuckets;
/**
public static class BucketStoreMessages{
- public static class GetLocalBucket implements Serializable {
- private static final long serialVersionUID = 1L;
- }
-
- public static class ContainsBucket implements Serializable {
- private static final long serialVersionUID = 1L;
- final private Bucket bucket;
-
- public ContainsBucket(Bucket bucket){
- Preconditions.checkArgument(bucket != null, "bucket can not be null");
- this.bucket = bucket;
- }
-
- public Bucket getBucket(){
- return bucket;
- }
-
- }
-
- public static class UpdateBucket extends ContainsBucket implements Serializable {
- private static final long serialVersionUID = 1L;
- public UpdateBucket(Bucket bucket){
- super(bucket);
- }
- }
-
- public static class GetLocalBucketReply extends ContainsBucket implements Serializable {
- private static final long serialVersionUID = 1L;
- public GetLocalBucketReply(Bucket bucket){
- super(bucket);
- }
- }
-
public static class GetAllBuckets implements Serializable {
private static final long serialVersionUID = 1L;
}
public static class GetBucketsByMembers implements Serializable{
private static final long serialVersionUID = 1L;
- private Set<Address> members;
+ private final Set<Address> members;
public GetBucketsByMembers(Set<Address> members){
Preconditions.checkArgument(members != null, "members can not be null");
public static class ContainsBuckets implements Serializable{
private static final long serialVersionUID = 1L;
- private Map<Address, Bucket> buckets;
+ private final Map<Address, Bucket> buckets;
public ContainsBuckets(Map<Address, Bucket> buckets){
Preconditions.checkArgument(buckets != null, "buckets can not be null");
for (Map.Entry<Address, Bucket> entry : buckets.entrySet()){
//ignore null entries
- if ( (entry.getKey() == null) || (entry.getValue() == null) )
+ if ( (entry.getKey() == null) || (entry.getValue() == null) ) {
continue;
+ }
copy.put(entry.getKey(), entry.getValue());
}
- return new HashMap<>(copy);
+ return copy;
}
}
public static final class GossipStatus extends ContainsBucketVersions implements Serializable{
private static final long serialVersionUID = 1L;
- private Address from;
+ private final Address from;
public GossipStatus(Address from, Map<Address, Long> versions) {
super(versions);
package org.opendaylight.controller.remote.rpc.registry;
-import akka.actor.ActorPath;
import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
-import akka.actor.ChildActorPath;
+import akka.actor.Address;
import akka.actor.Props;
-import akka.pattern.Patterns;
+import akka.japi.Pair;
import akka.testkit.JavaTestKit;
-import akka.util.Timeout;
-import com.google.common.base.Predicate;
+import com.google.common.util.concurrent.Uninterruptibles;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import javax.annotation.Nullable;
import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRoutersReply;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.controller.utils.ConditionalProbe;
+import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
import org.opendaylight.yangtools.yang.common.QName;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
private ActorRef registry2;
private ActorRef registry3;
+ private int routeIdCounter = 1;
+
@BeforeClass
public static void staticSetup() throws InterruptedException {
RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build();
final JavaTestKit mockBroker = new JavaTestKit(node1);
- final ActorPath bucketStorePath = new ChildActorPath(registry1.path(), "store");
+ Address nodeAddress = node1.provider().getDefaultAddress();
// Add rpc on node 1
registry1.tell(new SetLocalRouter(mockBroker.getRef()), mockBroker.getRef());
- // install probe
- final JavaTestKit probe1 = createProbeForMessage(node1, bucketStorePath,
- Messages.BucketStoreMessages.UpdateBucket.class);
+ List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds = createRouteIds();
- registry1.tell(getAddRouteMessage(), mockBroker.getRef());
+ registry1.tell(new AddOrUpdateRoutes(addedRouteIds), mockBroker.getRef());
// Bucket store should get an update bucket message. Updated bucket contains added rpc.
- probe1.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
- Messages.BucketStoreMessages.UpdateBucket.class);
+
+ Map<Address, Bucket> buckets = retrieveBuckets(registry1, mockBroker, nodeAddress);
+ verifyBucket(buckets.get(nodeAddress), addedRouteIds);
+
+ Map<Address, Long> versions = retrieveVersions(registry1, mockBroker);
+ Assert.assertEquals("Version for bucket " + nodeAddress, buckets.get(nodeAddress).getVersion(),
+ versions.get(nodeAddress));
// Now remove rpc
- registry1.tell(getRemoveRouteMessage(), mockBroker.getRef());
+ registry1.tell(new RemoveRoutes(addedRouteIds), mockBroker.getRef());
// Bucket store should get an update bucket message. Rpc is removed in the updated bucket
- probe1.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
- Messages.BucketStoreMessages.UpdateBucket.class);
+
+ verifyEmptyBucket(mockBroker, registry1, nodeAddress);
System.out.println("testAddRemoveRpcOnSameNode ending");
System.out.println("testRpcAddRemoveInCluster starting");
final JavaTestKit mockBroker1 = new JavaTestKit(node1);
+ final JavaTestKit mockBroker2 = new JavaTestKit(node2);
+
+ List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds = createRouteIds();
- // install probe on node2's bucket store
- final ActorPath bucketStorePath = new ChildActorPath(registry2.path(), "store");
- final JavaTestKit probe2 = createProbeForMessage(node2, bucketStorePath,
- Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+ Address node1Address = node1.provider().getDefaultAddress();
// Add rpc on node 1
registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
- registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
+ registry1.tell(new AddOrUpdateRoutes(addedRouteIds), mockBroker1.getRef());
// Bucket store on node2 should get a message to update its local copy of remote buckets
- probe2.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
- Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+
+ Map<Address, Bucket> buckets = retrieveBuckets(registry2, mockBroker2, node1Address);
+ verifyBucket(buckets.get(node1Address), addedRouteIds);
// Now remove
- registry1.tell(getRemoveRouteMessage(), mockBroker1.getRef());
+ registry1.tell(new RemoveRoutes(addedRouteIds), mockBroker1.getRef());
- // Bucket store on node2 should get a message to update its local copy of remote buckets
- probe2.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
- Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+ // Bucket store on node2 should get a message to update its local copy of remote buckets.
+ // Wait for the bucket for node1 to be empty.
+
+ verifyEmptyBucket(mockBroker2, registry2, node1Address);
System.out.println("testRpcAddRemoveInCluster ending");
}
+ private void verifyEmptyBucket(JavaTestKit testKit, ActorRef registry, Address address)
+ throws AssertionError {
+ Map<Address, Bucket> buckets;
+ int nTries = 0;
+ while(true) {
+ buckets = retrieveBuckets(registry1, testKit, address);
+
+ try {
+ verifyBucket(buckets.get(address), Collections.<RouteIdentifier<?, ?, ?>>emptyList());
+ break;
+ } catch (AssertionError e) {
+ if(++nTries >= 50) {
+ throw e;
+ }
+ }
+
+ Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ }
+ }
+
/**
* Three node cluster. Register rpc on 2 nodes. Ensure 3rd gets updated.
*
registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef());
- // install probe on node 3
- final ActorPath bucketStorePath = new ChildActorPath(registry3.path(), "store");
- final JavaTestKit probe3 = createProbeForMessage(node3, bucketStorePath,
- Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
-
// Add rpc on node 1
+ List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds1 = createRouteIds();
registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
- registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
-
- probe3.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
- Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+ registry1.tell(new AddOrUpdateRoutes(addedRouteIds1), mockBroker1.getRef());
- // Add same rpc on node 2
+ // Add rpc on node 2
+ List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds2 = createRouteIds();
registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef());
- registry2.tell(getAddRouteMessage(), mockBroker2.getRef());
+ registry2.tell(new AddOrUpdateRoutes(addedRouteIds2), mockBroker2.getRef());
+
+ Address node1Address = node1.provider().getDefaultAddress();
+ Address node2Address = node2.provider().getDefaultAddress();
+
+ Map<Address, Bucket> buckets = retrieveBuckets(registry3, mockBroker3, node1Address,
+ node2Address);
- probe3.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
- Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+ verifyBucket(buckets.get(node1Address), addedRouteIds1);
+ verifyBucket(buckets.get(node2Address), addedRouteIds2);
+
+ Map<Address, Long> versions = retrieveVersions(registry3, mockBroker3);
+ Assert.assertEquals("Version for bucket " + node1Address, buckets.get(node1Address).getVersion(),
+ versions.get(node1Address));
+ Assert.assertEquals("Version for bucket " + node2Address, buckets.get(node2Address).getVersion(),
+ versions.get(node2Address));
+
+ RouteIdentifier<?, ?, ?> routeID = addedRouteIds1.get(0);
+ registry3.tell(new FindRouters(routeID), mockBroker3.getRef());
+
+ FindRoutersReply reply = mockBroker3.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
+ FindRoutersReply.class);
+
+ List<Pair<ActorRef, Long>> respList = reply.getRouterWithUpdateTime();
+ Assert.assertEquals("getRouterWithUpdateTime size", 1, respList.size());
+
+ respList.get(0).first().tell("hello", ActorRef.noSender());
+ mockBroker1.expectMsgEquals(Duration.create(3, TimeUnit.SECONDS), "hello");
}
- private JavaTestKit createProbeForMessage(ActorSystem node, ActorPath subjectPath, final Class<?> clazz)
- throws Exception {
- final JavaTestKit probe = new JavaTestKit(node);
-
- ConditionalProbe conditionalProbe = new ConditionalProbe(probe.getRef(), new Predicate<Object>() {
- @Override
- public boolean apply(@Nullable Object input) {
- if (input != null) {
- return clazz.equals(input.getClass());
- } else {
- return false;
- }
+ private Map<Address, Long> retrieveVersions(ActorRef bucketStore, JavaTestKit testKit) {
+ bucketStore.tell(new GetBucketVersions(), testKit.getRef());
+ GetBucketVersionsReply reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
+ GetBucketVersionsReply.class);
+ return reply.getVersions();
+ }
+
+ private void verifyBucket(Bucket<RoutingTable> bucket, List<RouteIdentifier<?, ?, ?>> expRouteIds) {
+ RoutingTable table = bucket.getData();
+ Assert.assertNotNull("Bucket RoutingTable is null", table);
+ for(RouteIdentifier<?, ?, ?> r: expRouteIds) {
+ if(!table.contains(r)) {
+ Assert.fail("RoutingTable does not contain " + r + ". Actual: " + table);
}
- });
+ }
- FiniteDuration duration = Duration.create(3, TimeUnit.SECONDS);
- Timeout timeout = new Timeout(duration);
- int maxTries = 30;
- int i = 0;
- while(true) {
- ActorSelection subject = node.actorSelection(subjectPath);
- Future<Object> future = Patterns.ask(subject, conditionalProbe, timeout);
+ Assert.assertEquals("RoutingTable size", expRouteIds.size(), table.size());
+ }
- try {
- Await.ready(future, duration);
- break;
- } catch (TimeoutException | InterruptedException e) {
- if(++i > maxTries) {
- throw e;
+ private Map<Address, Bucket> retrieveBuckets(ActorRef bucketStore, JavaTestKit testKit,
+ Address... addresses) {
+ int nTries = 0;
+ while(true) {
+ bucketStore.tell(new GetAllBuckets(), testKit.getRef());
+ GetAllBucketsReply reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
+ GetAllBucketsReply.class);
+
+ Map<Address, Bucket> buckets = reply.getBuckets();
+ boolean foundAll = true;
+ for(Address addr: addresses) {
+ Bucket bucket = buckets.get(addr);
+ if(bucket == null) {
+ foundAll = false;
+ break;
}
}
- }
- return probe;
+ if(foundAll) {
+ return buckets;
+ }
- }
+ if(++nTries >= 50) {
+ Assert.fail("Missing expected buckets for addresses: " + Arrays.toString(addresses)
+ + ", Actual: " + buckets);
+ }
- private AddOrUpdateRoutes getAddRouteMessage() throws URISyntaxException {
- return new AddOrUpdateRoutes(createRouteIds());
+ Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ }
}
- private RemoveRoutes getRemoveRouteMessage() throws URISyntaxException {
- return new RemoveRoutes(createRouteIds());
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testAddRoutesConcurrency() throws Exception {
+ final JavaTestKit testKit = new JavaTestKit(node1);
+
+ registry1.tell(new SetLocalRouter(testKit.getRef()), ActorRef.noSender());
+
+ final int nRoutes = 500;
+ final RouteIdentifier<?, ?, ?>[] added = new RouteIdentifier<?, ?, ?>[nRoutes];
+ for(int i = 0; i < nRoutes; i++) {
+ final RouteIdentifierImpl routeId = new RouteIdentifierImpl(null,
+ new QName(new URI("/mockrpc"), "type" + i), null);
+ added[i] = routeId;
+
+ //Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ registry1.tell(new AddOrUpdateRoutes(Arrays.<RouteIdentifier<?, ?, ?>>asList(routeId)),
+ ActorRef.noSender());
+ }
+
+ GetAllBuckets getAllBuckets = new GetAllBuckets();
+ FiniteDuration duration = Duration.create(3, TimeUnit.SECONDS);
+ int nTries = 0;
+ while(true) {
+ registry1.tell(getAllBuckets, testKit.getRef());
+ GetAllBucketsReply reply = testKit.expectMsgClass(duration, GetAllBucketsReply.class);
+
+ Bucket<RoutingTable> localBucket = reply.getBuckets().values().iterator().next();
+ RoutingTable table = localBucket.getData();
+ if(table != null && table.size() == nRoutes) {
+ for(RouteIdentifier<?, ?, ?> r: added) {
+ Assert.assertEquals("RoutingTable contains " + r, true, table.contains(r));
+ }
+
+ break;
+ }
+
+ if(++nTries >= 50) {
+ Assert.fail("Expected # routes: " + nRoutes + ", Actual: " + table.size());
+ }
+
+ Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ }
}
private List<RpcRouter.RouteIdentifier<?, ?, ?>> createRouteIds() throws URISyntaxException {
- QName type = new QName(new URI("/mockrpc"), "mockrpc");
+ QName type = new QName(new URI("/mockrpc"), "mockrpc" + routeIdCounter++);
List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new ArrayList<>();
routeIds.add(new RouteIdentifierImpl(null, type, null));
return routeIds;
}
-
}
import akka.actor.Props;
import akka.testkit.TestActorRef;
import com.typesafe.config.ConfigFactory;
+import java.util.HashMap;
+import java.util.Map;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.remote.rpc.TerminationMonitor;
-import java.util.HashMap;
-import java.util.Map;
-
public class BucketStoreTest {
private static ActorSystem system;
- private static BucketStore store;
@BeforeClass
public static void setup() {
system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("unit-test"));
system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
-
- store = createStore();
}
@AfterClass
system.shutdown();
}
- /**
- * Given a new local bucket
- * Should replace
- */
- @Test
- public void testReceiveUpdateBucket(){
- Bucket bucket = new BucketImpl();
- Long expectedVersion = bucket.getVersion();
-
- store.receiveUpdateBucket(bucket);
-
- Assert.assertEquals(bucket, store.getLocalBucket());
- Assert.assertEquals(expectedVersion, store.getLocalBucket().getVersion());
- }
-
/**
* Given remote buckets
* Should merge with local copy of remote buckets
@Test
public void testReceiveUpdateRemoteBuckets(){
+ BucketStore store = createStore();
+
Address localAddress = system.provider().getDefaultAddress();
Bucket localBucket = new BucketImpl();
//Should NOT contain local bucket
//Should contain ONLY 3 entries i.e a1, a2, a3
- Map<Address, Bucket> remoteBucketsInStore = store.getRemoteBuckets();
+ Map<Address, Bucket<?>> remoteBucketsInStore = store.getRemoteBuckets();
Assert.assertFalse("remote buckets contains local bucket", remoteBucketsInStore.containsKey(localAddress));
Assert.assertTrue(remoteBucketsInStore.size() == 3);
Assert.assertTrue(remoteBucketsInStore.size() == 4);
//Should update versions map
- //versions map contains versions for all remote buckets (4) + local bucket
- //so it should have total 5.
+ //versions map contains versions for all remote buckets (4).
Map<Address, Long> versionsInStore = store.getVersions();
- Assert.assertTrue(String.format("Expected:%s, Actual:%s", 5, versionsInStore.size()),
- versionsInStore.size() == 5);
+ Assert.assertEquals(4, versionsInStore.size());
Assert.assertEquals(b1.getVersion(), versionsInStore.get(a1));
Assert.assertEquals(b2.getVersion(), versionsInStore.get(a2));
Assert.assertEquals(b3_new.getVersion(), versionsInStore.get(a3));