From c083d5c6ef874e38eebca402105e740f3614d6f7 Mon Sep 17 00:00:00 2001 From: tpantelis Date: Fri, 19 Dec 2014 03:45:43 -0500 Subject: [PATCH] Bug 2526: Race condition may cause missing routes in RPC BucketStore Changed the RpcRegistry class to derive from BucketStore instead of creating the BucketStore as a separate actor. This was done because the copy-on-write update operations in the BucketStore need to be done atomically so the RpcRegistry needs direct access to the BucketStore in order to do this safely and efficiently. The RpcRegistry handles the sematics of the Bucket data - this keeps the BucketStore data agnostic. Change-Id: Ief96c28775b729f459d324971403222e5a578029 Signed-off-by: tpantelis --- .../remote/rpc/registry/RoutingTable.java | 29 +- .../remote/rpc/registry/RpcRegistry.java | 256 +++-------------- .../remote/rpc/registry/gossip/Bucket.java | 1 - .../rpc/registry/gossip/BucketImpl.java | 27 +- .../rpc/registry/gossip/BucketStore.java | 96 ++----- .../remote/rpc/registry/gossip/Messages.java | 50 +--- .../remote/rpc/registry/RpcRegistryTest.java | 258 ++++++++++++------ .../rpc/registry/gossip/BucketStoreTest.java | 33 +-- 8 files changed, 293 insertions(+), 457 deletions(-) diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java index fe8c463d2e..52b1106c87 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java @@ -10,23 +10,22 @@ package org.opendaylight.controller.remote.rpc.registry; import akka.actor.ActorRef; import akka.japi.Option; import akka.japi.Pair; -import org.opendaylight.controller.remote.rpc.registry.gossip.Copier; -import org.opendaylight.controller.sal.connector.api.RpcRouter; - import java.io.Serializable; import java.util.HashMap; import java.util.Map; +import org.opendaylight.controller.remote.rpc.registry.gossip.Copier; +import org.opendaylight.controller.sal.connector.api.RpcRouter; public class RoutingTable implements Copier, Serializable { private static final long serialVersionUID = 1L; - private Map, Long> table = new HashMap<>(); + private final Map, Long> table = new HashMap<>(); private ActorRef router; @Override public RoutingTable copy() { RoutingTable copy = new RoutingTable(); - copy.setTable(new HashMap<>(table)); + copy.table.putAll(table); copy.setRouter(this.getRouter()); return copy; @@ -35,10 +34,11 @@ public class RoutingTable implements Copier, Serializable { public Option> getRouterFor(RpcRouter.RouteIdentifier routeId){ Long updatedTime = table.get(routeId); - if (updatedTime == null || router == null) + if (updatedTime == null || router == null) { return Option.none(); - else + } else { return Option.option(new Pair<>(router, updatedTime)); + } } public void addRoute(RpcRouter.RouteIdentifier routeId){ @@ -49,23 +49,16 @@ public class RoutingTable implements Copier, Serializable { table.remove(routeId); } - public Boolean contains(RpcRouter.RouteIdentifier routeId){ + public boolean contains(RpcRouter.RouteIdentifier routeId){ return table.containsKey(routeId); } - public Boolean isEmpty(){ + public boolean isEmpty(){ return table.isEmpty(); } - /// - /// Getter, Setters - /// - //TODO: Remove public - public Map, Long> getTable() { - return table; - } - void setTable(Map, Long> table) { - this.table = table; + public int size() { + return table.size(); } public ActorRef getRouter() { diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java index 095d70926b..845c1c819a 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java @@ -8,36 +8,21 @@ package org.opendaylight.controller.remote.rpc.registry; import akka.actor.ActorRef; -import akka.actor.Address; -import akka.actor.Props; -import akka.dispatch.Mapper; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.japi.Option; import akka.japi.Pair; -import akka.pattern.Patterns; import com.google.common.base.Preconditions; -import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; -import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; +import java.util.ArrayList; +import java.util.List; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter; import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore; import org.opendaylight.controller.sal.connector.api.RpcRouter; -import scala.concurrent.Future; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; -import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters; -import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; -import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket; +import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier; /** * Registry to look up cluster nodes that have registered for a given rpc. @@ -45,51 +30,29 @@ import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.Bu * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this * cluster wide information. */ -public class RpcRegistry extends AbstractUntypedActorWithMetering { +public class RpcRegistry extends BucketStore { final LoggingAdapter log = Logging.getLogger(getContext().system(), this); - /** - * Store to keep the registry. Bucket store sync's it across nodes in the cluster - */ - private ActorRef bucketStore; - - /** - * Rpc broker that would use the registry to route requests. - */ - private ActorRef localRouter; - - private RemoteRpcProviderConfig config; - public RpcRegistry() { - bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store"); - this.config = new RemoteRpcProviderConfig(getContext().system().settings().config()); - log.info("Bucket store path = {}", bucketStore.path().toString()); + getLocalBucket().setData(new RoutingTable()); } - public RpcRegistry(ActorRef bucketStore) { - this.bucketStore = bucketStore; - } - - @Override protected void handleReceive(Object message) throws Exception { //TODO: if sender is remote, reject message - if (message instanceof SetLocalRouter) + if (message instanceof SetLocalRouter) { receiveSetLocalRouter((SetLocalRouter) message); - - if (message instanceof AddOrUpdateRoutes) + } else if (message instanceof AddOrUpdateRoutes) { receiveAddRoutes((AddOrUpdateRoutes) message); - - else if (message instanceof RemoveRoutes) + } else if (message instanceof RemoveRoutes) { receiveRemoveRoutes((RemoveRoutes) message); - - else if (message instanceof Messages.FindRouters) + } else if (message instanceof Messages.FindRouters) { receiveGetRouter((FindRouters) message); - - else - unhandled(message); + } else { + super.handleReceive(message); + } } /** @@ -98,7 +61,7 @@ public class RpcRegistry extends AbstractUntypedActorWithMetering { * @param message contains {@link akka.actor.ActorRef} for rpc broker */ private void receiveSetLocalRouter(SetLocalRouter message) { - localRouter = message.getRouter(); + getLocalBucket().getData().setRouter(message.getRouter()); } /** @@ -106,10 +69,14 @@ public class RpcRegistry extends AbstractUntypedActorWithMetering { */ private void receiveAddRoutes(AddOrUpdateRoutes msg) { - Preconditions.checkState(localRouter != null, "Router must be set first"); + log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers()); + + RoutingTable table = getLocalBucket().getData().copy(); + for(RpcRouter.RouteIdentifier routeId : msg.getRouteIdentifiers()) { + table.addRoute(routeId); + } - Future futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration()); - futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher()); + updateLocalBucket(table); } /** @@ -117,9 +84,12 @@ public class RpcRegistry extends AbstractUntypedActorWithMetering { */ private void receiveRemoveRoutes(RemoveRoutes msg) { - Future futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration()); - futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher()); + RoutingTable table = getLocalBucket().getData().copy(); + for (RpcRouter.RouteIdentifier routeId : msg.getRouteIdentifiers()) { + table.removeRoute(routeId); + } + updateLocalBucket(table); } /** @@ -128,168 +98,28 @@ public class RpcRegistry extends AbstractUntypedActorWithMetering { * @param msg */ private void receiveGetRouter(FindRouters msg) { - final ActorRef sender = getSender(); - - Future futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), config.getAskDuration()); - futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher()); - } - - /** - * Helper to create empty reply when no routers are found - * - * @return - */ - private Messages.FindRoutersReply createEmptyReply() { - List> routerWithUpdateTime = Collections.emptyList(); - return new Messages.FindRoutersReply(routerWithUpdateTime); - } - - /** - * Helper to create a reply when routers are found for the given rpc - * - * @param buckets - * @param routeId - * @return - */ - private Messages.FindRoutersReply createReplyWithRouters( - Map buckets, RpcRouter.RouteIdentifier routeId) { - List> routers = new ArrayList<>(); - Option> routerWithUpdateTime = null; - - for (Bucket bucket : buckets.values()) { - - RoutingTable table = (RoutingTable) bucket.getData(); - if (table == null) - continue; - routerWithUpdateTime = table.getRouterFor(routeId); - if (routerWithUpdateTime.isEmpty()) - continue; + RouteIdentifier routeId = msg.getRouteIdentifier(); + findRoutes(getLocalBucket().getData(), routeId, routers); - routers.add(routerWithUpdateTime.get()); + for(Bucket bucket : getRemoteBuckets().values()) { + findRoutes(bucket.getData(), routeId, routers); } - return new Messages.FindRoutersReply(routers); - } - - - /// - ///private factories to create Mapper - /// - - /** - * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found - * - * @param routeId the rpc - * @param sender client who asked to find the routers. - * @return - */ - private Mapper getMapperToGetRouter( - final RpcRouter.RouteIdentifier routeId, final ActorRef sender) { - return new Mapper() { - @Override - public Void apply(Object replyMessage) { - - if (replyMessage instanceof GetAllBucketsReply) { - - GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage; - Map buckets = reply.getBuckets(); - - if (buckets == null || buckets.isEmpty()) { - sender.tell(createEmptyReply(), getSelf()); - return null; - } - - sender.tell(createReplyWithRouters(buckets, routeId), getSelf()); - } - return null; - } - }; - } - - /** - * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently, - * it updates the local bucket in bucket store. - * - * @param routeIds rpc to remote - * @return - */ - private Mapper getMapperToRemoveRoutes(final List> routeIds) { - return new Mapper() { - @Override - public Void apply(Object replyMessage) { - if (replyMessage instanceof GetLocalBucketReply) { - - GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage; - Bucket bucket = reply.getBucket(); - - if (bucket == null) { - log.debug("Local bucket is null"); - return null; - } - - RoutingTable table = bucket.getData(); - if (table == null) - table = new RoutingTable(); - - table.setRouter(localRouter); - - if (!table.isEmpty()) { - for (RpcRouter.RouteIdentifier routeId : routeIds) { - table.removeRoute(routeId); - } - } - bucket.setData(table); - - UpdateBucket updateBucketMessage = new UpdateBucket(bucket); - bucketStore.tell(updateBucketMessage, getSelf()); - } - return null; - } - }; + getSender().tell(new Messages.FindRoutersReply(routers), getSelf()); } - /** - * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently, - * it updates the local bucket in bucket store. - * - * @param routeIds rpc to add - * @return - */ - private Mapper getMapperToAddRoutes(final List> routeIds) { - - return new Mapper() { - @Override - public Void apply(Object replyMessage) { - if (replyMessage instanceof GetLocalBucketReply) { - - GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage; - Bucket bucket = reply.getBucket(); - - if (bucket == null) { - log.debug("Local bucket is null"); - return null; - } - - RoutingTable table = bucket.getData(); - if (table == null) - table = new RoutingTable(); - - table.setRouter(localRouter); - for (RpcRouter.RouteIdentifier routeId : routeIds) { - table.addRoute(routeId); - } - - bucket.setData(table); - - UpdateBucket updateBucketMessage = new UpdateBucket(bucket); - bucketStore.tell(updateBucketMessage, getSelf()); - } + private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier routeId, + List> routers) { + if (table == null) { + return; + } - return null; - } - }; + Option> routerWithUpdateTime = table.getRouterFor(routeId); + if(!routerWithUpdateTime.isEmpty()) { + routers.add(routerWithUpdateTime.get()); + } } /** diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java index f5dfbc5650..c40fc9349e 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java @@ -11,5 +11,4 @@ package org.opendaylight.controller.remote.rpc.registry.gossip; public interface Bucket> { public Long getVersion(); public T getData(); - public void setData(T data); } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java index 01c77f1f08..b81175e9a2 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java @@ -16,6 +16,23 @@ public class BucketImpl> implements Bucket, Serializable private T data; + public BucketImpl() { + } + + public BucketImpl(T data) { + this.data = data; + } + + public BucketImpl(Bucket other) { + this.version = other.getVersion(); + this.data = other.getData(); + } + + public void setData(T data) { + this.data = data; + this.version = System.currentTimeMillis()+1; + } + @Override public Long getVersion() { return version; @@ -23,15 +40,7 @@ public class BucketImpl> implements Bucket, Serializable @Override public T getData() { - if (this.data == null) - return null; - - return data.copy(); - } - - public void setData(T data){ - this.version = System.currentTimeMillis()+1; - this.data = data; + return data; } @Override diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java index 6ffe147e71..934609b7cf 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java @@ -15,11 +15,10 @@ import akka.actor.Props; import akka.cluster.ClusterActorRefProvider; import akka.event.Logging; import akka.event.LoggingAdapter; +import com.google.common.annotations.VisibleForTesting; import java.util.HashMap; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets; @@ -28,9 +27,6 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketSto import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; import org.opendaylight.controller.utils.ConditionalProbe; @@ -43,24 +39,26 @@ import org.opendaylight.controller.utils.ConditionalProbe; * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}. * */ -public class BucketStore extends AbstractUntypedActorWithMetering { +public class BucketStore> extends AbstractUntypedActorWithMetering { + + private static final Long NO_VERSION = -1L; final LoggingAdapter log = Logging.getLogger(getContext().system(), this); /** * Bucket owned by the node */ - private BucketImpl localBucket = new BucketImpl(); + private final BucketImpl localBucket = new BucketImpl<>(); /** * Buckets ownded by other known nodes in the cluster */ - private ConcurrentMap remoteBuckets = new ConcurrentHashMap<>(); + private final Map> remoteBuckets = new HashMap<>(); /** * Bucket version for every known node in the cluster including this node */ - private ConcurrentMap versions = new ConcurrentHashMap<>(); + private final Map versions = new HashMap<>(); /** * Cluster address for this node @@ -85,7 +83,6 @@ public class BucketStore extends AbstractUntypedActorWithMetering { } } - @Override protected void handleReceive(Object message) throws Exception { if (probe != null) { @@ -98,20 +95,14 @@ public class BucketStore extends AbstractUntypedActorWithMetering { probe = (ConditionalProbe) message; // Send back any message to tell the caller we got the probe. getSender().tell("Got it", getSelf()); - } else if (message instanceof UpdateBucket) { - receiveUpdateBucket(((UpdateBucket) message).getBucket()); } else if (message instanceof GetAllBuckets) { - receiveGetAllBucket(); - } else if (message instanceof GetLocalBucket) { - receiveGetLocalBucket(); + receiveGetAllBuckets(); } else if (message instanceof GetBucketsByMembers) { - receiveGetBucketsByMembers( - ((GetBucketsByMembers) message).getMembers()); + receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers()); } else if (message instanceof GetBucketVersions) { receiveGetBucketVersions(); } else if (message instanceof UpdateRemoteBuckets) { - receiveUpdateRemoteBuckets( - ((UpdateRemoteBuckets) message).getBuckets()); + receiveUpdateRemoteBuckets(((UpdateRemoteBuckets) message).getBuckets()); } else { if(log.isDebugEnabled()) { log.debug("Unhandled message [{}]", message); @@ -120,30 +111,10 @@ public class BucketStore extends AbstractUntypedActorWithMetering { } } - /** - * Returns a copy of bucket owned by this node - */ - private void receiveGetLocalBucket() { - final ActorRef sender = getSender(); - GetLocalBucketReply reply = new GetLocalBucketReply(localBucket); - sender.tell(reply, getSelf()); - } - - /** - * Updates the bucket owned by this node - * - * @param updatedBucket - */ - void receiveUpdateBucket(Bucket updatedBucket){ - - localBucket = (BucketImpl) updatedBucket; - versions.put(selfAddress, localBucket.getVersion()); - } - /** * Returns all the buckets the this node knows about, self owned + remote */ - void receiveGetAllBucket(){ + void receiveGetAllBuckets(){ final ActorRef sender = getSender(); sender.tell(new GetAllBucketsReply(getAllBuckets()), getSelf()); } @@ -153,11 +124,12 @@ public class BucketStore extends AbstractUntypedActorWithMetering { * * @return self owned + remote buckets */ + @SuppressWarnings("rawtypes") Map getAllBuckets(){ Map all = new HashMap<>(remoteBuckets.size() + 1); //first add the local bucket - all.put(selfAddress, localBucket); + all.put(selfAddress, new BucketImpl<>(localBucket)); //then get all remote buckets all.putAll(remoteBuckets); @@ -170,6 +142,7 @@ public class BucketStore extends AbstractUntypedActorWithMetering { * * @param members requested members */ + @SuppressWarnings("rawtypes") void receiveGetBucketsByMembers(Set
members){ final ActorRef sender = getSender(); Map buckets = getBucketsByMembers(members); @@ -182,12 +155,13 @@ public class BucketStore extends AbstractUntypedActorWithMetering { * @param members requested members * @return buckets for requested memebers */ + @SuppressWarnings("rawtypes") Map getBucketsByMembers(Set
members) { Map buckets = new HashMap<>(); //first add the local bucket if asked if (members.contains(selfAddress)) { - buckets.put(selfAddress, localBucket); + buckets.put(selfAddress, new BucketImpl<>(localBucket)); } //then get buckets for requested remote nodes @@ -215,8 +189,9 @@ public class BucketStore extends AbstractUntypedActorWithMetering { * @param receivedBuckets buckets sent by remote * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper} */ + @SuppressWarnings({ "rawtypes", "unchecked" }) void receiveUpdateRemoteBuckets(Map receivedBuckets){ - + log.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets); if (receivedBuckets == null || receivedBuckets.isEmpty()) { return; //nothing to do @@ -229,10 +204,10 @@ public class BucketStore extends AbstractUntypedActorWithMetering { Long localVersion = versions.get(entry.getKey()); if (localVersion == null) { - localVersion = -1L; + localVersion = NO_VERSION; } - Bucket receivedBucket = entry.getValue(); + Bucket receivedBucket = entry.getValue(); if (receivedBucket == null) { continue; @@ -240,7 +215,7 @@ public class BucketStore extends AbstractUntypedActorWithMetering { Long remoteVersion = receivedBucket.getVersion(); if (remoteVersion == null) { - remoteVersion = -1L; + remoteVersion = NO_VERSION; } //update only if remote version is newer @@ -249,40 +224,27 @@ public class BucketStore extends AbstractUntypedActorWithMetering { versions.put(entry.getKey(), remoteVersion); } } + if(log.isDebugEnabled()) { log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets); } } - /// - ///Getter Setters - /// - - BucketImpl getLocalBucket() { + protected BucketImpl getLocalBucket() { return localBucket; } - void setLocalBucket(BucketImpl localBucket) { - this.localBucket = localBucket; + protected void updateLocalBucket(T data) { + localBucket.setData(data); + versions.put(selfAddress, localBucket.getVersion()); } - ConcurrentMap getRemoteBuckets() { + protected Map> getRemoteBuckets() { return remoteBuckets; } - void setRemoteBuckets(ConcurrentMap remoteBuckets) { - this.remoteBuckets = remoteBuckets; - } - - ConcurrentMap getVersions() { + @VisibleForTesting + Map getVersions() { return versions; } - - void setVersions(ConcurrentMap versions) { - this.versions = versions; - } - - Address getSelfAddress() { - return selfAddress; - } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java index 4e8f2c61c9..b05bd7d0f6 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java @@ -9,16 +9,14 @@ package org.opendaylight.controller.remote.rpc.registry.gossip; import akka.actor.Address; import com.google.common.base.Preconditions; - import java.io.Serializable; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; - -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBucketVersions; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBuckets; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBucketVersions; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBuckets; /** @@ -29,46 +27,13 @@ public class Messages { public static class BucketStoreMessages{ - public static class GetLocalBucket implements Serializable { - private static final long serialVersionUID = 1L; - } - - public static class ContainsBucket implements Serializable { - private static final long serialVersionUID = 1L; - final private Bucket bucket; - - public ContainsBucket(Bucket bucket){ - Preconditions.checkArgument(bucket != null, "bucket can not be null"); - this.bucket = bucket; - } - - public Bucket getBucket(){ - return bucket; - } - - } - - public static class UpdateBucket extends ContainsBucket implements Serializable { - private static final long serialVersionUID = 1L; - public UpdateBucket(Bucket bucket){ - super(bucket); - } - } - - public static class GetLocalBucketReply extends ContainsBucket implements Serializable { - private static final long serialVersionUID = 1L; - public GetLocalBucketReply(Bucket bucket){ - super(bucket); - } - } - public static class GetAllBuckets implements Serializable { private static final long serialVersionUID = 1L; } public static class GetBucketsByMembers implements Serializable{ private static final long serialVersionUID = 1L; - private Set
members; + private final Set
members; public GetBucketsByMembers(Set
members){ Preconditions.checkArgument(members != null, "members can not be null"); @@ -82,7 +47,7 @@ public class Messages { public static class ContainsBuckets implements Serializable{ private static final long serialVersionUID = 1L; - private Map buckets; + private final Map buckets; public ContainsBuckets(Map buckets){ Preconditions.checkArgument(buckets != null, "buckets can not be null"); @@ -94,11 +59,12 @@ public class Messages { for (Map.Entry entry : buckets.entrySet()){ //ignore null entries - if ( (entry.getKey() == null) || (entry.getValue() == null) ) + if ( (entry.getKey() == null) || (entry.getValue() == null) ) { continue; + } copy.put(entry.getKey(), entry.getValue()); } - return new HashMap<>(copy); + return copy; } } @@ -162,7 +128,7 @@ public class Messages { public static final class GossipStatus extends ContainsBucketVersions implements Serializable{ private static final long serialVersionUID = 1L; - private Address from; + private final Address from; public GossipStatus(Address from, Map versions) { super(versions); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java index e0d145dbe1..5b7b7e4fdc 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java @@ -1,38 +1,41 @@ package org.opendaylight.controller.remote.rpc.registry; -import akka.actor.ActorPath; import akka.actor.ActorRef; -import akka.actor.ActorSelection; import akka.actor.ActorSystem; -import akka.actor.ChildActorPath; +import akka.actor.Address; import akka.actor.Props; -import akka.pattern.Patterns; +import akka.japi.Pair; import akka.testkit.JavaTestKit; -import akka.util.Timeout; -import com.google.common.base.Predicate; +import com.google.common.util.concurrent.Uninterruptibles; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import javax.annotation.Nullable; import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRoutersReply; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages; +import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; import org.opendaylight.controller.sal.connector.api.RpcRouter; -import org.opendaylight.controller.utils.ConditionalProbe; +import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier; import org.opendaylight.yangtools.yang.common.QName; -import scala.concurrent.Await; -import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -46,6 +49,8 @@ public class RpcRegistryTest { private ActorRef registry2; private ActorRef registry3; + private int routeIdCounter = 1; + @BeforeClass public static void staticSetup() throws InterruptedException { RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build(); @@ -97,27 +102,30 @@ public class RpcRegistryTest { final JavaTestKit mockBroker = new JavaTestKit(node1); - final ActorPath bucketStorePath = new ChildActorPath(registry1.path(), "store"); + Address nodeAddress = node1.provider().getDefaultAddress(); // Add rpc on node 1 registry1.tell(new SetLocalRouter(mockBroker.getRef()), mockBroker.getRef()); - // install probe - final JavaTestKit probe1 = createProbeForMessage(node1, bucketStorePath, - Messages.BucketStoreMessages.UpdateBucket.class); + List> addedRouteIds = createRouteIds(); - registry1.tell(getAddRouteMessage(), mockBroker.getRef()); + registry1.tell(new AddOrUpdateRoutes(addedRouteIds), mockBroker.getRef()); // Bucket store should get an update bucket message. Updated bucket contains added rpc. - probe1.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS), - Messages.BucketStoreMessages.UpdateBucket.class); + + Map buckets = retrieveBuckets(registry1, mockBroker, nodeAddress); + verifyBucket(buckets.get(nodeAddress), addedRouteIds); + + Map versions = retrieveVersions(registry1, mockBroker); + Assert.assertEquals("Version for bucket " + nodeAddress, buckets.get(nodeAddress).getVersion(), + versions.get(nodeAddress)); // Now remove rpc - registry1.tell(getRemoveRouteMessage(), mockBroker.getRef()); + registry1.tell(new RemoveRoutes(addedRouteIds), mockBroker.getRef()); // Bucket store should get an update bucket message. Rpc is removed in the updated bucket - probe1.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS), - Messages.BucketStoreMessages.UpdateBucket.class); + + verifyEmptyBucket(mockBroker, registry1, nodeAddress); System.out.println("testAddRemoveRpcOnSameNode ending"); @@ -136,30 +144,52 @@ public class RpcRegistryTest { System.out.println("testRpcAddRemoveInCluster starting"); final JavaTestKit mockBroker1 = new JavaTestKit(node1); + final JavaTestKit mockBroker2 = new JavaTestKit(node2); + + List> addedRouteIds = createRouteIds(); - // install probe on node2's bucket store - final ActorPath bucketStorePath = new ChildActorPath(registry2.path(), "store"); - final JavaTestKit probe2 = createProbeForMessage(node2, bucketStorePath, - Messages.BucketStoreMessages.UpdateRemoteBuckets.class); + Address node1Address = node1.provider().getDefaultAddress(); // Add rpc on node 1 registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef()); - registry1.tell(getAddRouteMessage(), mockBroker1.getRef()); + registry1.tell(new AddOrUpdateRoutes(addedRouteIds), mockBroker1.getRef()); // Bucket store on node2 should get a message to update its local copy of remote buckets - probe2.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS), - Messages.BucketStoreMessages.UpdateRemoteBuckets.class); + + Map buckets = retrieveBuckets(registry2, mockBroker2, node1Address); + verifyBucket(buckets.get(node1Address), addedRouteIds); // Now remove - registry1.tell(getRemoveRouteMessage(), mockBroker1.getRef()); + registry1.tell(new RemoveRoutes(addedRouteIds), mockBroker1.getRef()); - // Bucket store on node2 should get a message to update its local copy of remote buckets - probe2.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS), - Messages.BucketStoreMessages.UpdateRemoteBuckets.class); + // Bucket store on node2 should get a message to update its local copy of remote buckets. + // Wait for the bucket for node1 to be empty. + + verifyEmptyBucket(mockBroker2, registry2, node1Address); System.out.println("testRpcAddRemoveInCluster ending"); } + private void verifyEmptyBucket(JavaTestKit testKit, ActorRef registry, Address address) + throws AssertionError { + Map buckets; + int nTries = 0; + while(true) { + buckets = retrieveBuckets(registry1, testKit, address); + + try { + verifyBucket(buckets.get(address), Collections.>emptyList()); + break; + } catch (AssertionError e) { + if(++nTries >= 50) { + throw e; + } + } + + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + } + } + /** * Three node cluster. Register rpc on 2 nodes. Ensure 3rd gets updated. * @@ -174,76 +204,142 @@ public class RpcRegistryTest { registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef()); - // install probe on node 3 - final ActorPath bucketStorePath = new ChildActorPath(registry3.path(), "store"); - final JavaTestKit probe3 = createProbeForMessage(node3, bucketStorePath, - Messages.BucketStoreMessages.UpdateRemoteBuckets.class); - // Add rpc on node 1 + List> addedRouteIds1 = createRouteIds(); registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef()); - registry1.tell(getAddRouteMessage(), mockBroker1.getRef()); - - probe3.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS), - Messages.BucketStoreMessages.UpdateRemoteBuckets.class); + registry1.tell(new AddOrUpdateRoutes(addedRouteIds1), mockBroker1.getRef()); - // Add same rpc on node 2 + // Add rpc on node 2 + List> addedRouteIds2 = createRouteIds(); registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef()); - registry2.tell(getAddRouteMessage(), mockBroker2.getRef()); + registry2.tell(new AddOrUpdateRoutes(addedRouteIds2), mockBroker2.getRef()); + + Address node1Address = node1.provider().getDefaultAddress(); + Address node2Address = node2.provider().getDefaultAddress(); + + Map buckets = retrieveBuckets(registry3, mockBroker3, node1Address, + node2Address); - probe3.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS), - Messages.BucketStoreMessages.UpdateRemoteBuckets.class); + verifyBucket(buckets.get(node1Address), addedRouteIds1); + verifyBucket(buckets.get(node2Address), addedRouteIds2); + + Map versions = retrieveVersions(registry3, mockBroker3); + Assert.assertEquals("Version for bucket " + node1Address, buckets.get(node1Address).getVersion(), + versions.get(node1Address)); + Assert.assertEquals("Version for bucket " + node2Address, buckets.get(node2Address).getVersion(), + versions.get(node2Address)); + + RouteIdentifier routeID = addedRouteIds1.get(0); + registry3.tell(new FindRouters(routeID), mockBroker3.getRef()); + + FindRoutersReply reply = mockBroker3.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), + FindRoutersReply.class); + + List> respList = reply.getRouterWithUpdateTime(); + Assert.assertEquals("getRouterWithUpdateTime size", 1, respList.size()); + + respList.get(0).first().tell("hello", ActorRef.noSender()); + mockBroker1.expectMsgEquals(Duration.create(3, TimeUnit.SECONDS), "hello"); } - private JavaTestKit createProbeForMessage(ActorSystem node, ActorPath subjectPath, final Class clazz) - throws Exception { - final JavaTestKit probe = new JavaTestKit(node); - - ConditionalProbe conditionalProbe = new ConditionalProbe(probe.getRef(), new Predicate() { - @Override - public boolean apply(@Nullable Object input) { - if (input != null) { - return clazz.equals(input.getClass()); - } else { - return false; - } + private Map retrieveVersions(ActorRef bucketStore, JavaTestKit testKit) { + bucketStore.tell(new GetBucketVersions(), testKit.getRef()); + GetBucketVersionsReply reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), + GetBucketVersionsReply.class); + return reply.getVersions(); + } + + private void verifyBucket(Bucket bucket, List> expRouteIds) { + RoutingTable table = bucket.getData(); + Assert.assertNotNull("Bucket RoutingTable is null", table); + for(RouteIdentifier r: expRouteIds) { + if(!table.contains(r)) { + Assert.fail("RoutingTable does not contain " + r + ". Actual: " + table); } - }); + } - FiniteDuration duration = Duration.create(3, TimeUnit.SECONDS); - Timeout timeout = new Timeout(duration); - int maxTries = 30; - int i = 0; - while(true) { - ActorSelection subject = node.actorSelection(subjectPath); - Future future = Patterns.ask(subject, conditionalProbe, timeout); + Assert.assertEquals("RoutingTable size", expRouteIds.size(), table.size()); + } - try { - Await.ready(future, duration); - break; - } catch (TimeoutException | InterruptedException e) { - if(++i > maxTries) { - throw e; + private Map retrieveBuckets(ActorRef bucketStore, JavaTestKit testKit, + Address... addresses) { + int nTries = 0; + while(true) { + bucketStore.tell(new GetAllBuckets(), testKit.getRef()); + GetAllBucketsReply reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), + GetAllBucketsReply.class); + + Map buckets = reply.getBuckets(); + boolean foundAll = true; + for(Address addr: addresses) { + Bucket bucket = buckets.get(addr); + if(bucket == null) { + foundAll = false; + break; } } - } - return probe; + if(foundAll) { + return buckets; + } - } + if(++nTries >= 50) { + Assert.fail("Missing expected buckets for addresses: " + Arrays.toString(addresses) + + ", Actual: " + buckets); + } - private AddOrUpdateRoutes getAddRouteMessage() throws URISyntaxException { - return new AddOrUpdateRoutes(createRouteIds()); + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + } } - private RemoveRoutes getRemoveRouteMessage() throws URISyntaxException { - return new RemoveRoutes(createRouteIds()); + @SuppressWarnings("unchecked") + @Test + public void testAddRoutesConcurrency() throws Exception { + final JavaTestKit testKit = new JavaTestKit(node1); + + registry1.tell(new SetLocalRouter(testKit.getRef()), ActorRef.noSender()); + + final int nRoutes = 500; + final RouteIdentifier[] added = new RouteIdentifier[nRoutes]; + for(int i = 0; i < nRoutes; i++) { + final RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, + new QName(new URI("/mockrpc"), "type" + i), null); + added[i] = routeId; + + //Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + registry1.tell(new AddOrUpdateRoutes(Arrays.>asList(routeId)), + ActorRef.noSender()); + } + + GetAllBuckets getAllBuckets = new GetAllBuckets(); + FiniteDuration duration = Duration.create(3, TimeUnit.SECONDS); + int nTries = 0; + while(true) { + registry1.tell(getAllBuckets, testKit.getRef()); + GetAllBucketsReply reply = testKit.expectMsgClass(duration, GetAllBucketsReply.class); + + Bucket localBucket = reply.getBuckets().values().iterator().next(); + RoutingTable table = localBucket.getData(); + if(table != null && table.size() == nRoutes) { + for(RouteIdentifier r: added) { + Assert.assertEquals("RoutingTable contains " + r, true, table.contains(r)); + } + + break; + } + + if(++nTries >= 50) { + Assert.fail("Expected # routes: " + nRoutes + ", Actual: " + table.size()); + } + + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + } } private List> createRouteIds() throws URISyntaxException { - QName type = new QName(new URI("/mockrpc"), "mockrpc"); + QName type = new QName(new URI("/mockrpc"), "mockrpc" + routeIdCounter++); List> routeIds = new ArrayList<>(); routeIds.add(new RouteIdentifierImpl(null, type, null)); return routeIds; } - } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java index 78fcbd3a14..ddd08a5f47 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java @@ -12,27 +12,23 @@ import akka.actor.Address; import akka.actor.Props; import akka.testkit.TestActorRef; import com.typesafe.config.ConfigFactory; +import java.util.HashMap; +import java.util.Map; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import org.opendaylight.controller.remote.rpc.TerminationMonitor; -import java.util.HashMap; -import java.util.Map; - public class BucketStoreTest { private static ActorSystem system; - private static BucketStore store; @BeforeClass public static void setup() { system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("unit-test")); system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor"); - - store = createStore(); } @AfterClass @@ -40,21 +36,6 @@ public class BucketStoreTest { system.shutdown(); } - /** - * Given a new local bucket - * Should replace - */ - @Test - public void testReceiveUpdateBucket(){ - Bucket bucket = new BucketImpl(); - Long expectedVersion = bucket.getVersion(); - - store.receiveUpdateBucket(bucket); - - Assert.assertEquals(bucket, store.getLocalBucket()); - Assert.assertEquals(expectedVersion, store.getLocalBucket().getVersion()); - } - /** * Given remote buckets * Should merge with local copy of remote buckets @@ -62,6 +43,8 @@ public class BucketStoreTest { @Test public void testReceiveUpdateRemoteBuckets(){ + BucketStore store = createStore(); + Address localAddress = system.provider().getDefaultAddress(); Bucket localBucket = new BucketImpl(); @@ -84,7 +67,7 @@ public class BucketStoreTest { //Should NOT contain local bucket //Should contain ONLY 3 entries i.e a1, a2, a3 - Map remoteBucketsInStore = store.getRemoteBuckets(); + Map> remoteBucketsInStore = store.getRemoteBuckets(); Assert.assertFalse("remote buckets contains local bucket", remoteBucketsInStore.containsKey(localAddress)); Assert.assertTrue(remoteBucketsInStore.size() == 3); @@ -122,11 +105,9 @@ public class BucketStoreTest { Assert.assertTrue(remoteBucketsInStore.size() == 4); //Should update versions map - //versions map contains versions for all remote buckets (4) + local bucket - //so it should have total 5. + //versions map contains versions for all remote buckets (4). Map versionsInStore = store.getVersions(); - Assert.assertTrue(String.format("Expected:%s, Actual:%s", 5, versionsInStore.size()), - versionsInStore.size() == 5); + Assert.assertEquals(4, versionsInStore.size()); Assert.assertEquals(b1.getVersion(), versionsInStore.get(a1)); Assert.assertEquals(b2.getVersion(), versionsInStore.get(a2)); Assert.assertEquals(b3_new.getVersion(), versionsInStore.get(a3)); -- 2.36.6