From: Tom Pantelis Date: Fri, 26 Jan 2018 05:48:41 +0000 (-0500) Subject: Fix intermittent RemoteRpcRegistryMXBeanImplTest failures X-Git-Tag: release/oxygen~19 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=10427641dc0a75ec62e78ecfc4a7a0a7d438d462 Fix intermittent RemoteRpcRegistryMXBeanImplTest failures testFindRpcByRoute(org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImplTest) Time elapsed: 0.98 sec <<< ERROR! java.lang.IllegalStateException: Attempted to access local bucket before recovery completed at com.google.common.base.Preconditions.checkState(Preconditions.java:501) at org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.getLocalBucket(BucketStoreActor.java:384) at org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.getLocalData(BucketStoreActor.java:110) at org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl.findRpcByRoute(RemoteRpcRegistryMXBeanImpl.java:91) at org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImplTest.testFindRpcByRoute(RemoteRpcRegistryMXBeanImplTest.java:142) The problem is that the RemoteRpcRegistryMXBeanImpl access the enclosing RpcRegistry Actor instance directly and violates actor encapsulation. RemoteRpcRegistryMXBeanImpl should access the RpcRegistry via messages sent to its ActorRef. Change-Id: Icfd67c38e5d1bc3de283949207009d7aa34ab855 Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java index fe43a691d2..cefe7525f1 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java @@ -28,6 +28,7 @@ import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddO import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints; import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; +import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess; import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor; import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl; @@ -45,7 +46,8 @@ public class RpcRegistry extends BucketStoreActor { public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) { super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker, ImmutableSet.of())); this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar); - this.mxBean = new RemoteRpcRegistryMXBeanImpl(this); + this.mxBean = new RemoteRpcRegistryMXBeanImpl(new BucketStoreAccess(self(), getContext().dispatcher(), + config.getAskDuration()), config.getAskDuration()); } /** diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreAccess.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreAccess.java index b0e1cde3e7..e06e5fb15b 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreAccess.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreAccess.java @@ -8,10 +8,11 @@ package org.opendaylight.controller.remote.rpc.registry.gossip; import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.getBucketsByMembersMessage; +import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.getLocalDataMessage; +import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.getRemoteBucketsMessage; import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.removeBucketMessage; import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.updateRemoteBucketsMessage; -import akka.actor.ActorContext; import akka.actor.ActorRef; import akka.actor.Address; import akka.dispatch.OnComplete; @@ -19,10 +20,12 @@ import akka.pattern.Patterns; import akka.util.Timeout; import com.google.common.annotations.Beta; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import java.util.Collection; import java.util.Map; +import java.util.Objects; import java.util.function.Consumer; +import scala.concurrent.ExecutionContext; +import scala.concurrent.Future; /** * Convenience access to {@link BucketStoreActor}. Used mostly by {@link Gossiper}. @@ -32,17 +35,19 @@ import java.util.function.Consumer; @Beta @VisibleForTesting public final class BucketStoreAccess { - private final ActorContext context; + private final ActorRef actorRef; + private final ExecutionContext dispatcher; private final Timeout timeout; - BucketStoreAccess(final ActorContext context, final Timeout timeout) { - this.context = Preconditions.checkNotNull(context); - this.timeout = Preconditions.checkNotNull(timeout); + public BucketStoreAccess(final ActorRef actorRef, final ExecutionContext dispatcher, final Timeout timeout) { + this.actorRef = Objects.requireNonNull(actorRef); + this.dispatcher = Objects.requireNonNull(dispatcher); + this.timeout = Objects.requireNonNull(timeout); } > void getBucketsByMembers(final Collection
members, final Consumer>> callback) { - Patterns.ask(context.parent(), getBucketsByMembersMessage(members), timeout) + Patterns.ask(actorRef, getBucketsByMembersMessage(members), timeout) .onComplete(new OnComplete() { @SuppressWarnings("unchecked") @Override @@ -51,11 +56,11 @@ public final class BucketStoreAccess { callback.accept((Map>) success); } } - }, context.dispatcher()); + }, dispatcher); } void getBucketVersions(final Consumer> callback) { - Patterns.ask(context.parent(), Singletons.GET_BUCKET_VERSIONS, timeout).onComplete(new OnComplete() { + Patterns.ask(actorRef, Singletons.GET_BUCKET_VERSIONS, timeout).onComplete(new OnComplete() { @SuppressWarnings("unchecked") @Override public void onComplete(final Throwable failure, final Object success) { @@ -63,16 +68,31 @@ public final class BucketStoreAccess { callback.accept((Map) success); } } - }, context.dispatcher()); + }, dispatcher); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public Future> getBucketVersions() { + return (Future) Patterns.ask(actorRef, Singletons.GET_BUCKET_VERSIONS, timeout); } @SuppressWarnings("unchecked") void updateRemoteBuckets(final Map> buckets) { - context.parent().tell(updateRemoteBucketsMessage((Map>) buckets), ActorRef.noSender()); + actorRef.tell(updateRemoteBucketsMessage((Map>) buckets), ActorRef.noSender()); } void removeRemoteBucket(final Address addr) { - context.parent().tell(removeBucketMessage(addr), ActorRef.noSender()); + actorRef.tell(removeBucketMessage(addr), ActorRef.noSender()); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public > Future getLocalData() { + return (Future) Patterns.ask(actorRef, getLocalDataMessage(), timeout); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public > Future>> getRemoteBuckets() { + return (Future) Patterns.ask(actorRef, getRemoteBucketsMessage(), timeout); } public enum Singletons { diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreActor.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreActor.java index 02e8770337..84a7042513 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreActor.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreActor.java @@ -106,6 +106,14 @@ public abstract class BucketStoreActor> extends return actor -> actor.updateRemoteBuckets(buckets); } + static ExecuteInActor getLocalDataMessage() { + return actor -> actor.getSender().tell(actor.getLocalData(), actor.getSelf()); + } + + static ExecuteInActor getRemoteBucketsMessage() { + return actor -> actor.getSender().tell(ImmutableMap.copyOf(actor.getRemoteBuckets()), actor.getSelf()); + } + public final T getLocalData() { return getLocalBucket().getData(); } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java index 275200f8d5..2a4e3b7f93 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java @@ -106,7 +106,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering { ActorRefProvider provider = getContext().provider(); selfAddress = provider.getDefaultAddress(); - bucketStore = new BucketStoreAccess(getContext(), config.getAskDuration()); + bucketStore = new BucketStoreAccess(getContext().parent(), getContext().dispatcher(), config.getAskDuration()); if (provider instanceof ClusterActorRefProvider) { cluster = Cluster.get(getContext().system()); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImpl.java index 58abcac0d5..94a8d4a3e8 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImpl.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImpl.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.remote.rpc.registry.mbeans; import akka.actor.Address; +import akka.util.Timeout; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -17,10 +18,12 @@ import java.util.Set; import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean; import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier; import org.opendaylight.controller.remote.rpc.registry.RoutingTable; -import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; +import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Await; +import scala.concurrent.Future; public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements RemoteRpcRegistryMXBean { @@ -33,17 +36,38 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot private static final String NAME_CONSTANT = " | name:"; - private final RpcRegistry rpcRegistry; + private final BucketStoreAccess rpcRegistryAccess; + private final Timeout timeout; - public RemoteRpcRegistryMXBeanImpl(final RpcRegistry rpcRegistry) { + public RemoteRpcRegistryMXBeanImpl(final BucketStoreAccess rpcRegistryAccess, Timeout timeout) { super("RemoteRpcRegistry", "RemoteRpcBroker", null); - this.rpcRegistry = rpcRegistry; + this.rpcRegistryAccess = rpcRegistryAccess; + this.timeout = timeout; registerMBean(); } + @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch", "rawtypes"}) + private RoutingTable getLocalData() { + try { + return (RoutingTable) Await.result((Future) rpcRegistryAccess.getLocalData(), timeout.duration()); + } catch (Exception e) { + throw new RuntimeException("getLocalData failed", e); + } + } + + @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch", "rawtypes"}) + private Map> getRemoteBuckets() { + try { + return (Map>) Await.result((Future)rpcRegistryAccess.getRemoteBuckets(), + timeout.duration()); + } catch (Exception e) { + throw new RuntimeException("getRemoteBuckets failed", e); + } + } + @Override public Set getGlobalRpc() { - RoutingTable table = rpcRegistry.getLocalData(); + RoutingTable table = getLocalData(); Set globalRpc = new HashSet<>(table.getRoutes().size()); for (DOMRpcIdentifier route : table.getRoutes()) { if (route.getContextReference().isEmpty()) { @@ -57,7 +81,7 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot @Override public Set getLocalRegisteredRoutedRpc() { - RoutingTable table = rpcRegistry.getLocalData(); + RoutingTable table = getLocalData(); Set routedRpc = new HashSet<>(table.getRoutes().size()); for (DOMRpcIdentifier route : table.getRoutes()) { if (!route.getContextReference().isEmpty()) { @@ -71,12 +95,12 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot @Override public Map findRpcByName(final String name) { - RoutingTable localTable = rpcRegistry.getLocalData(); + RoutingTable localTable = getLocalData(); // Get all RPCs from local bucket Map rpcMap = new HashMap<>(getRpcMemberMapByName(localTable, name, LOCAL_CONSTANT)); // Get all RPCs from remote bucket - Map> buckets = rpcRegistry.getRemoteBuckets(); + Map> buckets = getRemoteBuckets(); for (Entry> entry : buckets.entrySet()) { RoutingTable table = entry.getValue().getData(); rpcMap.putAll(getRpcMemberMapByName(table, name, entry.getKey().toString())); @@ -88,10 +112,10 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot @Override public Map findRpcByRoute(final String routeId) { - RoutingTable localTable = rpcRegistry.getLocalData(); + RoutingTable localTable = getLocalData(); Map rpcMap = new HashMap<>(getRpcMemberMapByRoute(localTable, routeId, LOCAL_CONSTANT)); - Map> buckets = rpcRegistry.getRemoteBuckets(); + Map> buckets = getRemoteBuckets(); for (Entry> entry : buckets.entrySet()) { RoutingTable table = entry.getValue().getData(); rpcMap.putAll(getRpcMemberMapByRoute(table, routeId, entry.getKey().toString())); @@ -138,7 +162,12 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot } @Override + @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch", "rawtypes"}) public String getBucketVersions() { - return rpcRegistry.getVersions().toString(); + try { + return Await.result((Future)rpcRegistryAccess.getBucketVersions(), timeout.duration()).toString(); + } catch (Exception e) { + throw new RuntimeException("getVersions failed", e); + } } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImplTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImplTest.java index 8b23154232..dea930bf85 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImplTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImplTest.java @@ -10,10 +10,11 @@ package org.opendaylight.controller.remote.rpc.registry.mbeans; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; +import akka.dispatch.Dispatchers; import akka.testkit.TestActorRef; import akka.testkit.javadsl.TestKit; +import akka.util.Timeout; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.Uninterruptibles; import java.util.Collections; import java.util.List; import java.util.Map; @@ -26,6 +27,7 @@ import org.junit.Test; import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier; import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; +import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaPath; @@ -56,12 +58,13 @@ public class RemoteRpcRegistryMXBeanImplTest { final TestKit invoker = new TestKit(system); final TestKit registrar = new TestKit(system); final TestKit supervisor = new TestKit(system); - final Props props = RpcRegistry.props(config, invoker.getRef(), registrar.getRef()); + final Props props = RpcRegistry.props(config, invoker.getRef(), registrar.getRef()) + .withDispatcher(Dispatchers.DefaultDispatcherId()); testActor = new TestActorRef<>(system, props, supervisor.getRef(), "testActor"); - final RpcRegistry rpcRegistry = testActor.underlyingActor(); - mxBean = new RemoteRpcRegistryMXBeanImpl(rpcRegistry); - Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + final Timeout timeout = Timeout.apply(10, TimeUnit.SECONDS); + mxBean = new RemoteRpcRegistryMXBeanImpl(new BucketStoreAccess(testActor, system.dispatcher(), timeout), + timeout); } @After