From: Moiz Raja Date: Tue, 6 Jan 2015 14:22:53 +0000 (+0000) Subject: Merge "Bug 2526: Race condition may cause missing routes in RPC BucketStore" X-Git-Tag: release/lithium~731 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=f3fc2b47ca3e5be0b95a273664b6003d45fb9dd8;hp=65a42d265f05715437e760af145ebf9b864e13ba Merge "Bug 2526: Race condition may cause missing routes in RPC BucketStore" --- diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java index fe8c463d2e..52b1106c87 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java @@ -10,23 +10,22 @@ package org.opendaylight.controller.remote.rpc.registry; import akka.actor.ActorRef; import akka.japi.Option; import akka.japi.Pair; -import org.opendaylight.controller.remote.rpc.registry.gossip.Copier; -import org.opendaylight.controller.sal.connector.api.RpcRouter; - import java.io.Serializable; import java.util.HashMap; import java.util.Map; +import org.opendaylight.controller.remote.rpc.registry.gossip.Copier; +import org.opendaylight.controller.sal.connector.api.RpcRouter; public class RoutingTable implements Copier, Serializable { private static final long serialVersionUID = 1L; - private Map, Long> table = new HashMap<>(); + private final Map, Long> table = new HashMap<>(); private ActorRef router; @Override public RoutingTable copy() { RoutingTable copy = new RoutingTable(); - copy.setTable(new HashMap<>(table)); + copy.table.putAll(table); copy.setRouter(this.getRouter()); return copy; @@ -35,10 +34,11 @@ public class RoutingTable implements Copier, Serializable { public Option> getRouterFor(RpcRouter.RouteIdentifier routeId){ Long updatedTime = table.get(routeId); - if (updatedTime == null || router == null) + if (updatedTime == null || router == null) { return Option.none(); - else + } else { return Option.option(new Pair<>(router, updatedTime)); + } } public void addRoute(RpcRouter.RouteIdentifier routeId){ @@ -49,23 +49,16 @@ public class RoutingTable implements Copier, Serializable { table.remove(routeId); } - public Boolean contains(RpcRouter.RouteIdentifier routeId){ + public boolean contains(RpcRouter.RouteIdentifier routeId){ return table.containsKey(routeId); } - public Boolean isEmpty(){ + public boolean isEmpty(){ return table.isEmpty(); } - /// - /// Getter, Setters - /// - //TODO: Remove public - public Map, Long> getTable() { - return table; - } - void setTable(Map, Long> table) { - this.table = table; + public int size() { + return table.size(); } public ActorRef getRouter() { diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java index 095d70926b..845c1c819a 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java @@ -8,36 +8,21 @@ package org.opendaylight.controller.remote.rpc.registry; import akka.actor.ActorRef; -import akka.actor.Address; -import akka.actor.Props; -import akka.dispatch.Mapper; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.japi.Option; import akka.japi.Pair; -import akka.pattern.Patterns; import com.google.common.base.Preconditions; -import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; -import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; +import java.util.ArrayList; +import java.util.List; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter; import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore; import org.opendaylight.controller.sal.connector.api.RpcRouter; -import scala.concurrent.Future; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; -import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters; -import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; -import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket; +import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier; /** * Registry to look up cluster nodes that have registered for a given rpc. @@ -45,51 +30,29 @@ import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.Bu * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this * cluster wide information. */ -public class RpcRegistry extends AbstractUntypedActorWithMetering { +public class RpcRegistry extends BucketStore { final LoggingAdapter log = Logging.getLogger(getContext().system(), this); - /** - * Store to keep the registry. Bucket store sync's it across nodes in the cluster - */ - private ActorRef bucketStore; - - /** - * Rpc broker that would use the registry to route requests. - */ - private ActorRef localRouter; - - private RemoteRpcProviderConfig config; - public RpcRegistry() { - bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store"); - this.config = new RemoteRpcProviderConfig(getContext().system().settings().config()); - log.info("Bucket store path = {}", bucketStore.path().toString()); + getLocalBucket().setData(new RoutingTable()); } - public RpcRegistry(ActorRef bucketStore) { - this.bucketStore = bucketStore; - } - - @Override protected void handleReceive(Object message) throws Exception { //TODO: if sender is remote, reject message - if (message instanceof SetLocalRouter) + if (message instanceof SetLocalRouter) { receiveSetLocalRouter((SetLocalRouter) message); - - if (message instanceof AddOrUpdateRoutes) + } else if (message instanceof AddOrUpdateRoutes) { receiveAddRoutes((AddOrUpdateRoutes) message); - - else if (message instanceof RemoveRoutes) + } else if (message instanceof RemoveRoutes) { receiveRemoveRoutes((RemoveRoutes) message); - - else if (message instanceof Messages.FindRouters) + } else if (message instanceof Messages.FindRouters) { receiveGetRouter((FindRouters) message); - - else - unhandled(message); + } else { + super.handleReceive(message); + } } /** @@ -98,7 +61,7 @@ public class RpcRegistry extends AbstractUntypedActorWithMetering { * @param message contains {@link akka.actor.ActorRef} for rpc broker */ private void receiveSetLocalRouter(SetLocalRouter message) { - localRouter = message.getRouter(); + getLocalBucket().getData().setRouter(message.getRouter()); } /** @@ -106,10 +69,14 @@ public class RpcRegistry extends AbstractUntypedActorWithMetering { */ private void receiveAddRoutes(AddOrUpdateRoutes msg) { - Preconditions.checkState(localRouter != null, "Router must be set first"); + log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers()); + + RoutingTable table = getLocalBucket().getData().copy(); + for(RpcRouter.RouteIdentifier routeId : msg.getRouteIdentifiers()) { + table.addRoute(routeId); + } - Future futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration()); - futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher()); + updateLocalBucket(table); } /** @@ -117,9 +84,12 @@ public class RpcRegistry extends AbstractUntypedActorWithMetering { */ private void receiveRemoveRoutes(RemoveRoutes msg) { - Future futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration()); - futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher()); + RoutingTable table = getLocalBucket().getData().copy(); + for (RpcRouter.RouteIdentifier routeId : msg.getRouteIdentifiers()) { + table.removeRoute(routeId); + } + updateLocalBucket(table); } /** @@ -128,168 +98,28 @@ public class RpcRegistry extends AbstractUntypedActorWithMetering { * @param msg */ private void receiveGetRouter(FindRouters msg) { - final ActorRef sender = getSender(); - - Future futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), config.getAskDuration()); - futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher()); - } - - /** - * Helper to create empty reply when no routers are found - * - * @return - */ - private Messages.FindRoutersReply createEmptyReply() { - List> routerWithUpdateTime = Collections.emptyList(); - return new Messages.FindRoutersReply(routerWithUpdateTime); - } - - /** - * Helper to create a reply when routers are found for the given rpc - * - * @param buckets - * @param routeId - * @return - */ - private Messages.FindRoutersReply createReplyWithRouters( - Map buckets, RpcRouter.RouteIdentifier routeId) { - List> routers = new ArrayList<>(); - Option> routerWithUpdateTime = null; - - for (Bucket bucket : buckets.values()) { - - RoutingTable table = (RoutingTable) bucket.getData(); - if (table == null) - continue; - routerWithUpdateTime = table.getRouterFor(routeId); - if (routerWithUpdateTime.isEmpty()) - continue; + RouteIdentifier routeId = msg.getRouteIdentifier(); + findRoutes(getLocalBucket().getData(), routeId, routers); - routers.add(routerWithUpdateTime.get()); + for(Bucket bucket : getRemoteBuckets().values()) { + findRoutes(bucket.getData(), routeId, routers); } - return new Messages.FindRoutersReply(routers); - } - - - /// - ///private factories to create Mapper - /// - - /** - * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found - * - * @param routeId the rpc - * @param sender client who asked to find the routers. - * @return - */ - private Mapper getMapperToGetRouter( - final RpcRouter.RouteIdentifier routeId, final ActorRef sender) { - return new Mapper() { - @Override - public Void apply(Object replyMessage) { - - if (replyMessage instanceof GetAllBucketsReply) { - - GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage; - Map buckets = reply.getBuckets(); - - if (buckets == null || buckets.isEmpty()) { - sender.tell(createEmptyReply(), getSelf()); - return null; - } - - sender.tell(createReplyWithRouters(buckets, routeId), getSelf()); - } - return null; - } - }; - } - - /** - * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently, - * it updates the local bucket in bucket store. - * - * @param routeIds rpc to remote - * @return - */ - private Mapper getMapperToRemoveRoutes(final List> routeIds) { - return new Mapper() { - @Override - public Void apply(Object replyMessage) { - if (replyMessage instanceof GetLocalBucketReply) { - - GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage; - Bucket bucket = reply.getBucket(); - - if (bucket == null) { - log.debug("Local bucket is null"); - return null; - } - - RoutingTable table = bucket.getData(); - if (table == null) - table = new RoutingTable(); - - table.setRouter(localRouter); - - if (!table.isEmpty()) { - for (RpcRouter.RouteIdentifier routeId : routeIds) { - table.removeRoute(routeId); - } - } - bucket.setData(table); - - UpdateBucket updateBucketMessage = new UpdateBucket(bucket); - bucketStore.tell(updateBucketMessage, getSelf()); - } - return null; - } - }; + getSender().tell(new Messages.FindRoutersReply(routers), getSelf()); } - /** - * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently, - * it updates the local bucket in bucket store. - * - * @param routeIds rpc to add - * @return - */ - private Mapper getMapperToAddRoutes(final List> routeIds) { - - return new Mapper() { - @Override - public Void apply(Object replyMessage) { - if (replyMessage instanceof GetLocalBucketReply) { - - GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage; - Bucket bucket = reply.getBucket(); - - if (bucket == null) { - log.debug("Local bucket is null"); - return null; - } - - RoutingTable table = bucket.getData(); - if (table == null) - table = new RoutingTable(); - - table.setRouter(localRouter); - for (RpcRouter.RouteIdentifier routeId : routeIds) { - table.addRoute(routeId); - } - - bucket.setData(table); - - UpdateBucket updateBucketMessage = new UpdateBucket(bucket); - bucketStore.tell(updateBucketMessage, getSelf()); - } + private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier routeId, + List> routers) { + if (table == null) { + return; + } - return null; - } - }; + Option> routerWithUpdateTime = table.getRouterFor(routeId); + if(!routerWithUpdateTime.isEmpty()) { + routers.add(routerWithUpdateTime.get()); + } } /** diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java index f5dfbc5650..c40fc9349e 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java @@ -11,5 +11,4 @@ package org.opendaylight.controller.remote.rpc.registry.gossip; public interface Bucket> { public Long getVersion(); public T getData(); - public void setData(T data); } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java index 01c77f1f08..b81175e9a2 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java @@ -16,6 +16,23 @@ public class BucketImpl> implements Bucket, Serializable private T data; + public BucketImpl() { + } + + public BucketImpl(T data) { + this.data = data; + } + + public BucketImpl(Bucket other) { + this.version = other.getVersion(); + this.data = other.getData(); + } + + public void setData(T data) { + this.data = data; + this.version = System.currentTimeMillis()+1; + } + @Override public Long getVersion() { return version; @@ -23,15 +40,7 @@ public class BucketImpl> implements Bucket, Serializable @Override public T getData() { - if (this.data == null) - return null; - - return data.copy(); - } - - public void setData(T data){ - this.version = System.currentTimeMillis()+1; - this.data = data; + return data; } @Override diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java index 6ffe147e71..934609b7cf 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java @@ -15,11 +15,10 @@ import akka.actor.Props; import akka.cluster.ClusterActorRefProvider; import akka.event.Logging; import akka.event.LoggingAdapter; +import com.google.common.annotations.VisibleForTesting; import java.util.HashMap; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets; @@ -28,9 +27,6 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketSto import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; import org.opendaylight.controller.utils.ConditionalProbe; @@ -43,24 +39,26 @@ import org.opendaylight.controller.utils.ConditionalProbe; * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}. * */ -public class BucketStore extends AbstractUntypedActorWithMetering { +public class BucketStore> extends AbstractUntypedActorWithMetering { + + private static final Long NO_VERSION = -1L; final LoggingAdapter log = Logging.getLogger(getContext().system(), this); /** * Bucket owned by the node */ - private BucketImpl localBucket = new BucketImpl(); + private final BucketImpl localBucket = new BucketImpl<>(); /** * Buckets ownded by other known nodes in the cluster */ - private ConcurrentMap remoteBuckets = new ConcurrentHashMap<>(); + private final Map> remoteBuckets = new HashMap<>(); /** * Bucket version for every known node in the cluster including this node */ - private ConcurrentMap versions = new ConcurrentHashMap<>(); + private final Map versions = new HashMap<>(); /** * Cluster address for this node @@ -85,7 +83,6 @@ public class BucketStore extends AbstractUntypedActorWithMetering { } } - @Override protected void handleReceive(Object message) throws Exception { if (probe != null) { @@ -98,20 +95,14 @@ public class BucketStore extends AbstractUntypedActorWithMetering { probe = (ConditionalProbe) message; // Send back any message to tell the caller we got the probe. getSender().tell("Got it", getSelf()); - } else if (message instanceof UpdateBucket) { - receiveUpdateBucket(((UpdateBucket) message).getBucket()); } else if (message instanceof GetAllBuckets) { - receiveGetAllBucket(); - } else if (message instanceof GetLocalBucket) { - receiveGetLocalBucket(); + receiveGetAllBuckets(); } else if (message instanceof GetBucketsByMembers) { - receiveGetBucketsByMembers( - ((GetBucketsByMembers) message).getMembers()); + receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers()); } else if (message instanceof GetBucketVersions) { receiveGetBucketVersions(); } else if (message instanceof UpdateRemoteBuckets) { - receiveUpdateRemoteBuckets( - ((UpdateRemoteBuckets) message).getBuckets()); + receiveUpdateRemoteBuckets(((UpdateRemoteBuckets) message).getBuckets()); } else { if(log.isDebugEnabled()) { log.debug("Unhandled message [{}]", message); @@ -120,30 +111,10 @@ public class BucketStore extends AbstractUntypedActorWithMetering { } } - /** - * Returns a copy of bucket owned by this node - */ - private void receiveGetLocalBucket() { - final ActorRef sender = getSender(); - GetLocalBucketReply reply = new GetLocalBucketReply(localBucket); - sender.tell(reply, getSelf()); - } - - /** - * Updates the bucket owned by this node - * - * @param updatedBucket - */ - void receiveUpdateBucket(Bucket updatedBucket){ - - localBucket = (BucketImpl) updatedBucket; - versions.put(selfAddress, localBucket.getVersion()); - } - /** * Returns all the buckets the this node knows about, self owned + remote */ - void receiveGetAllBucket(){ + void receiveGetAllBuckets(){ final ActorRef sender = getSender(); sender.tell(new GetAllBucketsReply(getAllBuckets()), getSelf()); } @@ -153,11 +124,12 @@ public class BucketStore extends AbstractUntypedActorWithMetering { * * @return self owned + remote buckets */ + @SuppressWarnings("rawtypes") Map getAllBuckets(){ Map all = new HashMap<>(remoteBuckets.size() + 1); //first add the local bucket - all.put(selfAddress, localBucket); + all.put(selfAddress, new BucketImpl<>(localBucket)); //then get all remote buckets all.putAll(remoteBuckets); @@ -170,6 +142,7 @@ public class BucketStore extends AbstractUntypedActorWithMetering { * * @param members requested members */ + @SuppressWarnings("rawtypes") void receiveGetBucketsByMembers(Set
members){ final ActorRef sender = getSender(); Map buckets = getBucketsByMembers(members); @@ -182,12 +155,13 @@ public class BucketStore extends AbstractUntypedActorWithMetering { * @param members requested members * @return buckets for requested memebers */ + @SuppressWarnings("rawtypes") Map getBucketsByMembers(Set
members) { Map buckets = new HashMap<>(); //first add the local bucket if asked if (members.contains(selfAddress)) { - buckets.put(selfAddress, localBucket); + buckets.put(selfAddress, new BucketImpl<>(localBucket)); } //then get buckets for requested remote nodes @@ -215,8 +189,9 @@ public class BucketStore extends AbstractUntypedActorWithMetering { * @param receivedBuckets buckets sent by remote * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper} */ + @SuppressWarnings({ "rawtypes", "unchecked" }) void receiveUpdateRemoteBuckets(Map receivedBuckets){ - + log.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets); if (receivedBuckets == null || receivedBuckets.isEmpty()) { return; //nothing to do @@ -229,10 +204,10 @@ public class BucketStore extends AbstractUntypedActorWithMetering { Long localVersion = versions.get(entry.getKey()); if (localVersion == null) { - localVersion = -1L; + localVersion = NO_VERSION; } - Bucket receivedBucket = entry.getValue(); + Bucket receivedBucket = entry.getValue(); if (receivedBucket == null) { continue; @@ -240,7 +215,7 @@ public class BucketStore extends AbstractUntypedActorWithMetering { Long remoteVersion = receivedBucket.getVersion(); if (remoteVersion == null) { - remoteVersion = -1L; + remoteVersion = NO_VERSION; } //update only if remote version is newer @@ -249,40 +224,27 @@ public class BucketStore extends AbstractUntypedActorWithMetering { versions.put(entry.getKey(), remoteVersion); } } + if(log.isDebugEnabled()) { log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets); } } - /// - ///Getter Setters - /// - - BucketImpl getLocalBucket() { + protected BucketImpl getLocalBucket() { return localBucket; } - void setLocalBucket(BucketImpl localBucket) { - this.localBucket = localBucket; + protected void updateLocalBucket(T data) { + localBucket.setData(data); + versions.put(selfAddress, localBucket.getVersion()); } - ConcurrentMap getRemoteBuckets() { + protected Map> getRemoteBuckets() { return remoteBuckets; } - void setRemoteBuckets(ConcurrentMap remoteBuckets) { - this.remoteBuckets = remoteBuckets; - } - - ConcurrentMap getVersions() { + @VisibleForTesting + Map getVersions() { return versions; } - - void setVersions(ConcurrentMap versions) { - this.versions = versions; - } - - Address getSelfAddress() { - return selfAddress; - } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java index 4e8f2c61c9..b05bd7d0f6 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java @@ -9,16 +9,14 @@ package org.opendaylight.controller.remote.rpc.registry.gossip; import akka.actor.Address; import com.google.common.base.Preconditions; - import java.io.Serializable; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; - -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBucketVersions; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBuckets; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBucketVersions; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBuckets; /** @@ -29,46 +27,13 @@ public class Messages { public static class BucketStoreMessages{ - public static class GetLocalBucket implements Serializable { - private static final long serialVersionUID = 1L; - } - - public static class ContainsBucket implements Serializable { - private static final long serialVersionUID = 1L; - final private Bucket bucket; - - public ContainsBucket(Bucket bucket){ - Preconditions.checkArgument(bucket != null, "bucket can not be null"); - this.bucket = bucket; - } - - public Bucket getBucket(){ - return bucket; - } - - } - - public static class UpdateBucket extends ContainsBucket implements Serializable { - private static final long serialVersionUID = 1L; - public UpdateBucket(Bucket bucket){ - super(bucket); - } - } - - public static class GetLocalBucketReply extends ContainsBucket implements Serializable { - private static final long serialVersionUID = 1L; - public GetLocalBucketReply(Bucket bucket){ - super(bucket); - } - } - public static class GetAllBuckets implements Serializable { private static final long serialVersionUID = 1L; } public static class GetBucketsByMembers implements Serializable{ private static final long serialVersionUID = 1L; - private Set
members; + private final Set
members; public GetBucketsByMembers(Set
members){ Preconditions.checkArgument(members != null, "members can not be null"); @@ -82,7 +47,7 @@ public class Messages { public static class ContainsBuckets implements Serializable{ private static final long serialVersionUID = 1L; - private Map buckets; + private final Map buckets; public ContainsBuckets(Map buckets){ Preconditions.checkArgument(buckets != null, "buckets can not be null"); @@ -94,11 +59,12 @@ public class Messages { for (Map.Entry entry : buckets.entrySet()){ //ignore null entries - if ( (entry.getKey() == null) || (entry.getValue() == null) ) + if ( (entry.getKey() == null) || (entry.getValue() == null) ) { continue; + } copy.put(entry.getKey(), entry.getValue()); } - return new HashMap<>(copy); + return copy; } } @@ -162,7 +128,7 @@ public class Messages { public static final class GossipStatus extends ContainsBucketVersions implements Serializable{ private static final long serialVersionUID = 1L; - private Address from; + private final Address from; public GossipStatus(Address from, Map versions) { super(versions); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java index e0d145dbe1..5b7b7e4fdc 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java @@ -1,38 +1,41 @@ package org.opendaylight.controller.remote.rpc.registry; -import akka.actor.ActorPath; import akka.actor.ActorRef; -import akka.actor.ActorSelection; import akka.actor.ActorSystem; -import akka.actor.ChildActorPath; +import akka.actor.Address; import akka.actor.Props; -import akka.pattern.Patterns; +import akka.japi.Pair; import akka.testkit.JavaTestKit; -import akka.util.Timeout; -import com.google.common.base.Predicate; +import com.google.common.util.concurrent.Uninterruptibles; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import javax.annotation.Nullable; import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRoutersReply; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages; +import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; import org.opendaylight.controller.sal.connector.api.RpcRouter; -import org.opendaylight.controller.utils.ConditionalProbe; +import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier; import org.opendaylight.yangtools.yang.common.QName; -import scala.concurrent.Await; -import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -46,6 +49,8 @@ public class RpcRegistryTest { private ActorRef registry2; private ActorRef registry3; + private int routeIdCounter = 1; + @BeforeClass public static void staticSetup() throws InterruptedException { RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build(); @@ -97,27 +102,30 @@ public class RpcRegistryTest { final JavaTestKit mockBroker = new JavaTestKit(node1); - final ActorPath bucketStorePath = new ChildActorPath(registry1.path(), "store"); + Address nodeAddress = node1.provider().getDefaultAddress(); // Add rpc on node 1 registry1.tell(new SetLocalRouter(mockBroker.getRef()), mockBroker.getRef()); - // install probe - final JavaTestKit probe1 = createProbeForMessage(node1, bucketStorePath, - Messages.BucketStoreMessages.UpdateBucket.class); + List> addedRouteIds = createRouteIds(); - registry1.tell(getAddRouteMessage(), mockBroker.getRef()); + registry1.tell(new AddOrUpdateRoutes(addedRouteIds), mockBroker.getRef()); // Bucket store should get an update bucket message. Updated bucket contains added rpc. - probe1.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS), - Messages.BucketStoreMessages.UpdateBucket.class); + + Map buckets = retrieveBuckets(registry1, mockBroker, nodeAddress); + verifyBucket(buckets.get(nodeAddress), addedRouteIds); + + Map versions = retrieveVersions(registry1, mockBroker); + Assert.assertEquals("Version for bucket " + nodeAddress, buckets.get(nodeAddress).getVersion(), + versions.get(nodeAddress)); // Now remove rpc - registry1.tell(getRemoveRouteMessage(), mockBroker.getRef()); + registry1.tell(new RemoveRoutes(addedRouteIds), mockBroker.getRef()); // Bucket store should get an update bucket message. Rpc is removed in the updated bucket - probe1.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS), - Messages.BucketStoreMessages.UpdateBucket.class); + + verifyEmptyBucket(mockBroker, registry1, nodeAddress); System.out.println("testAddRemoveRpcOnSameNode ending"); @@ -136,30 +144,52 @@ public class RpcRegistryTest { System.out.println("testRpcAddRemoveInCluster starting"); final JavaTestKit mockBroker1 = new JavaTestKit(node1); + final JavaTestKit mockBroker2 = new JavaTestKit(node2); + + List> addedRouteIds = createRouteIds(); - // install probe on node2's bucket store - final ActorPath bucketStorePath = new ChildActorPath(registry2.path(), "store"); - final JavaTestKit probe2 = createProbeForMessage(node2, bucketStorePath, - Messages.BucketStoreMessages.UpdateRemoteBuckets.class); + Address node1Address = node1.provider().getDefaultAddress(); // Add rpc on node 1 registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef()); - registry1.tell(getAddRouteMessage(), mockBroker1.getRef()); + registry1.tell(new AddOrUpdateRoutes(addedRouteIds), mockBroker1.getRef()); // Bucket store on node2 should get a message to update its local copy of remote buckets - probe2.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS), - Messages.BucketStoreMessages.UpdateRemoteBuckets.class); + + Map buckets = retrieveBuckets(registry2, mockBroker2, node1Address); + verifyBucket(buckets.get(node1Address), addedRouteIds); // Now remove - registry1.tell(getRemoveRouteMessage(), mockBroker1.getRef()); + registry1.tell(new RemoveRoutes(addedRouteIds), mockBroker1.getRef()); - // Bucket store on node2 should get a message to update its local copy of remote buckets - probe2.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS), - Messages.BucketStoreMessages.UpdateRemoteBuckets.class); + // Bucket store on node2 should get a message to update its local copy of remote buckets. + // Wait for the bucket for node1 to be empty. + + verifyEmptyBucket(mockBroker2, registry2, node1Address); System.out.println("testRpcAddRemoveInCluster ending"); } + private void verifyEmptyBucket(JavaTestKit testKit, ActorRef registry, Address address) + throws AssertionError { + Map buckets; + int nTries = 0; + while(true) { + buckets = retrieveBuckets(registry1, testKit, address); + + try { + verifyBucket(buckets.get(address), Collections.>emptyList()); + break; + } catch (AssertionError e) { + if(++nTries >= 50) { + throw e; + } + } + + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + } + } + /** * Three node cluster. Register rpc on 2 nodes. Ensure 3rd gets updated. * @@ -174,76 +204,142 @@ public class RpcRegistryTest { registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef()); - // install probe on node 3 - final ActorPath bucketStorePath = new ChildActorPath(registry3.path(), "store"); - final JavaTestKit probe3 = createProbeForMessage(node3, bucketStorePath, - Messages.BucketStoreMessages.UpdateRemoteBuckets.class); - // Add rpc on node 1 + List> addedRouteIds1 = createRouteIds(); registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef()); - registry1.tell(getAddRouteMessage(), mockBroker1.getRef()); - - probe3.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS), - Messages.BucketStoreMessages.UpdateRemoteBuckets.class); + registry1.tell(new AddOrUpdateRoutes(addedRouteIds1), mockBroker1.getRef()); - // Add same rpc on node 2 + // Add rpc on node 2 + List> addedRouteIds2 = createRouteIds(); registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef()); - registry2.tell(getAddRouteMessage(), mockBroker2.getRef()); + registry2.tell(new AddOrUpdateRoutes(addedRouteIds2), mockBroker2.getRef()); + + Address node1Address = node1.provider().getDefaultAddress(); + Address node2Address = node2.provider().getDefaultAddress(); + + Map buckets = retrieveBuckets(registry3, mockBroker3, node1Address, + node2Address); - probe3.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS), - Messages.BucketStoreMessages.UpdateRemoteBuckets.class); + verifyBucket(buckets.get(node1Address), addedRouteIds1); + verifyBucket(buckets.get(node2Address), addedRouteIds2); + + Map versions = retrieveVersions(registry3, mockBroker3); + Assert.assertEquals("Version for bucket " + node1Address, buckets.get(node1Address).getVersion(), + versions.get(node1Address)); + Assert.assertEquals("Version for bucket " + node2Address, buckets.get(node2Address).getVersion(), + versions.get(node2Address)); + + RouteIdentifier routeID = addedRouteIds1.get(0); + registry3.tell(new FindRouters(routeID), mockBroker3.getRef()); + + FindRoutersReply reply = mockBroker3.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), + FindRoutersReply.class); + + List> respList = reply.getRouterWithUpdateTime(); + Assert.assertEquals("getRouterWithUpdateTime size", 1, respList.size()); + + respList.get(0).first().tell("hello", ActorRef.noSender()); + mockBroker1.expectMsgEquals(Duration.create(3, TimeUnit.SECONDS), "hello"); } - private JavaTestKit createProbeForMessage(ActorSystem node, ActorPath subjectPath, final Class clazz) - throws Exception { - final JavaTestKit probe = new JavaTestKit(node); - - ConditionalProbe conditionalProbe = new ConditionalProbe(probe.getRef(), new Predicate() { - @Override - public boolean apply(@Nullable Object input) { - if (input != null) { - return clazz.equals(input.getClass()); - } else { - return false; - } + private Map retrieveVersions(ActorRef bucketStore, JavaTestKit testKit) { + bucketStore.tell(new GetBucketVersions(), testKit.getRef()); + GetBucketVersionsReply reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), + GetBucketVersionsReply.class); + return reply.getVersions(); + } + + private void verifyBucket(Bucket bucket, List> expRouteIds) { + RoutingTable table = bucket.getData(); + Assert.assertNotNull("Bucket RoutingTable is null", table); + for(RouteIdentifier r: expRouteIds) { + if(!table.contains(r)) { + Assert.fail("RoutingTable does not contain " + r + ". Actual: " + table); } - }); + } - FiniteDuration duration = Duration.create(3, TimeUnit.SECONDS); - Timeout timeout = new Timeout(duration); - int maxTries = 30; - int i = 0; - while(true) { - ActorSelection subject = node.actorSelection(subjectPath); - Future future = Patterns.ask(subject, conditionalProbe, timeout); + Assert.assertEquals("RoutingTable size", expRouteIds.size(), table.size()); + } - try { - Await.ready(future, duration); - break; - } catch (TimeoutException | InterruptedException e) { - if(++i > maxTries) { - throw e; + private Map retrieveBuckets(ActorRef bucketStore, JavaTestKit testKit, + Address... addresses) { + int nTries = 0; + while(true) { + bucketStore.tell(new GetAllBuckets(), testKit.getRef()); + GetAllBucketsReply reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), + GetAllBucketsReply.class); + + Map buckets = reply.getBuckets(); + boolean foundAll = true; + for(Address addr: addresses) { + Bucket bucket = buckets.get(addr); + if(bucket == null) { + foundAll = false; + break; } } - } - return probe; + if(foundAll) { + return buckets; + } - } + if(++nTries >= 50) { + Assert.fail("Missing expected buckets for addresses: " + Arrays.toString(addresses) + + ", Actual: " + buckets); + } - private AddOrUpdateRoutes getAddRouteMessage() throws URISyntaxException { - return new AddOrUpdateRoutes(createRouteIds()); + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + } } - private RemoveRoutes getRemoveRouteMessage() throws URISyntaxException { - return new RemoveRoutes(createRouteIds()); + @SuppressWarnings("unchecked") + @Test + public void testAddRoutesConcurrency() throws Exception { + final JavaTestKit testKit = new JavaTestKit(node1); + + registry1.tell(new SetLocalRouter(testKit.getRef()), ActorRef.noSender()); + + final int nRoutes = 500; + final RouteIdentifier[] added = new RouteIdentifier[nRoutes]; + for(int i = 0; i < nRoutes; i++) { + final RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, + new QName(new URI("/mockrpc"), "type" + i), null); + added[i] = routeId; + + //Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + registry1.tell(new AddOrUpdateRoutes(Arrays.>asList(routeId)), + ActorRef.noSender()); + } + + GetAllBuckets getAllBuckets = new GetAllBuckets(); + FiniteDuration duration = Duration.create(3, TimeUnit.SECONDS); + int nTries = 0; + while(true) { + registry1.tell(getAllBuckets, testKit.getRef()); + GetAllBucketsReply reply = testKit.expectMsgClass(duration, GetAllBucketsReply.class); + + Bucket localBucket = reply.getBuckets().values().iterator().next(); + RoutingTable table = localBucket.getData(); + if(table != null && table.size() == nRoutes) { + for(RouteIdentifier r: added) { + Assert.assertEquals("RoutingTable contains " + r, true, table.contains(r)); + } + + break; + } + + if(++nTries >= 50) { + Assert.fail("Expected # routes: " + nRoutes + ", Actual: " + table.size()); + } + + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + } } private List> createRouteIds() throws URISyntaxException { - QName type = new QName(new URI("/mockrpc"), "mockrpc"); + QName type = new QName(new URI("/mockrpc"), "mockrpc" + routeIdCounter++); List> routeIds = new ArrayList<>(); routeIds.add(new RouteIdentifierImpl(null, type, null)); return routeIds; } - } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java index 78fcbd3a14..ddd08a5f47 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java @@ -12,27 +12,23 @@ import akka.actor.Address; import akka.actor.Props; import akka.testkit.TestActorRef; import com.typesafe.config.ConfigFactory; +import java.util.HashMap; +import java.util.Map; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import org.opendaylight.controller.remote.rpc.TerminationMonitor; -import java.util.HashMap; -import java.util.Map; - public class BucketStoreTest { private static ActorSystem system; - private static BucketStore store; @BeforeClass public static void setup() { system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("unit-test")); system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor"); - - store = createStore(); } @AfterClass @@ -40,21 +36,6 @@ public class BucketStoreTest { system.shutdown(); } - /** - * Given a new local bucket - * Should replace - */ - @Test - public void testReceiveUpdateBucket(){ - Bucket bucket = new BucketImpl(); - Long expectedVersion = bucket.getVersion(); - - store.receiveUpdateBucket(bucket); - - Assert.assertEquals(bucket, store.getLocalBucket()); - Assert.assertEquals(expectedVersion, store.getLocalBucket().getVersion()); - } - /** * Given remote buckets * Should merge with local copy of remote buckets @@ -62,6 +43,8 @@ public class BucketStoreTest { @Test public void testReceiveUpdateRemoteBuckets(){ + BucketStore store = createStore(); + Address localAddress = system.provider().getDefaultAddress(); Bucket localBucket = new BucketImpl(); @@ -84,7 +67,7 @@ public class BucketStoreTest { //Should NOT contain local bucket //Should contain ONLY 3 entries i.e a1, a2, a3 - Map remoteBucketsInStore = store.getRemoteBuckets(); + Map> remoteBucketsInStore = store.getRemoteBuckets(); Assert.assertFalse("remote buckets contains local bucket", remoteBucketsInStore.containsKey(localAddress)); Assert.assertTrue(remoteBucketsInStore.size() == 3); @@ -122,11 +105,9 @@ public class BucketStoreTest { Assert.assertTrue(remoteBucketsInStore.size() == 4); //Should update versions map - //versions map contains versions for all remote buckets (4) + local bucket - //so it should have total 5. + //versions map contains versions for all remote buckets (4). Map versionsInStore = store.getVersions(); - Assert.assertTrue(String.format("Expected:%s, Actual:%s", 5, versionsInStore.size()), - versionsInStore.size() == 5); + Assert.assertEquals(4, versionsInStore.size()); Assert.assertEquals(b1.getVersion(), versionsInStore.get(a1)); Assert.assertEquals(b2.getVersion(), versionsInStore.get(a2)); Assert.assertEquals(b3_new.getVersion(), versionsInStore.get(a3));