Fix intermittent RemoteRpcRegistryMXBeanImplTest failures 93/67593/4
authorTom Pantelis <tompantelis@gmail.com>
Fri, 26 Jan 2018 05:48:41 +0000 (00:48 -0500)
committerTom Pantelis <tompantelis@gmail.com>
Sat, 27 Jan 2018 15:47:15 +0000 (15:47 +0000)
testFindRpcByRoute(org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImplTest)  Time elapsed: 0.98 sec  <<< ERROR!
java.lang.IllegalStateException: Attempted to access local bucket before recovery completed
at com.google.common.base.Preconditions.checkState(Preconditions.java:501)
at org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.getLocalBucket(BucketStoreActor.java:384)
at org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.getLocalData(BucketStoreActor.java:110)
at org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl.findRpcByRoute(RemoteRpcRegistryMXBeanImpl.java:91)
at org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImplTest.testFindRpcByRoute(RemoteRpcRegistryMXBeanImplTest.java:142)

The problem is that the RemoteRpcRegistryMXBeanImpl access the enclosing
RpcRegistry Actor instance directly and violates actor encapsulation.
RemoteRpcRegistryMXBeanImpl should access the RpcRegistry via messages
sent to its ActorRef.

Change-Id: Icfd67c38e5d1bc3de283949207009d7aa34ab855
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreAccess.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreActor.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImpl.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImplTest.java

index fe43a69..cefe752 100644 (file)
@@ -28,6 +28,7 @@ import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddO
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess;
 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor;
 import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl;
 
@@ -45,7 +46,8 @@ public class RpcRegistry extends BucketStoreActor<RoutingTable> {
     public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
         super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker, ImmutableSet.of()));
         this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar);
-        this.mxBean = new RemoteRpcRegistryMXBeanImpl(this);
+        this.mxBean = new RemoteRpcRegistryMXBeanImpl(new BucketStoreAccess(self(), getContext().dispatcher(),
+                config.getAskDuration()), config.getAskDuration());
     }
 
     /**
index b0e1cde..e06e5fb 100644 (file)
@@ -8,10 +8,11 @@
 package org.opendaylight.controller.remote.rpc.registry.gossip;
 
 import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.getBucketsByMembersMessage;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.getLocalDataMessage;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.getRemoteBucketsMessage;
 import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.removeBucketMessage;
 import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.updateRemoteBucketsMessage;
 
-import akka.actor.ActorContext;
 import akka.actor.ActorRef;
 import akka.actor.Address;
 import akka.dispatch.OnComplete;
@@ -19,10 +20,12 @@ import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.google.common.annotations.Beta;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import java.util.Collection;
 import java.util.Map;
+import java.util.Objects;
 import java.util.function.Consumer;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
 
 /**
  * Convenience access to {@link BucketStoreActor}. Used mostly by {@link Gossiper}.
@@ -32,17 +35,19 @@ import java.util.function.Consumer;
 @Beta
 @VisibleForTesting
 public final class BucketStoreAccess {
-    private final ActorContext context;
+    private final ActorRef actorRef;
+    private final ExecutionContext dispatcher;
     private final Timeout timeout;
 
-    BucketStoreAccess(final ActorContext context, final Timeout timeout) {
-        this.context = Preconditions.checkNotNull(context);
-        this.timeout = Preconditions.checkNotNull(timeout);
+    public BucketStoreAccess(final ActorRef actorRef, final ExecutionContext dispatcher, final Timeout timeout) {
+        this.actorRef = Objects.requireNonNull(actorRef);
+        this.dispatcher = Objects.requireNonNull(dispatcher);
+        this.timeout = Objects.requireNonNull(timeout);
     }
 
     <T extends BucketData<T>> void getBucketsByMembers(final Collection<Address> members,
             final Consumer<Map<Address, Bucket<T>>> callback) {
-        Patterns.ask(context.parent(), getBucketsByMembersMessage(members), timeout)
+        Patterns.ask(actorRef, getBucketsByMembersMessage(members), timeout)
             .onComplete(new OnComplete<Object>() {
                 @SuppressWarnings("unchecked")
                 @Override
@@ -51,11 +56,11 @@ public final class BucketStoreAccess {
                         callback.accept((Map<Address, Bucket<T>>) success);
                     }
                 }
-            }, context.dispatcher());
+            }, dispatcher);
     }
 
     void getBucketVersions(final Consumer<Map<Address, Long>> callback) {
-        Patterns.ask(context.parent(), Singletons.GET_BUCKET_VERSIONS, timeout).onComplete(new OnComplete<Object>() {
+        Patterns.ask(actorRef, Singletons.GET_BUCKET_VERSIONS, timeout).onComplete(new OnComplete<Object>() {
             @SuppressWarnings("unchecked")
             @Override
             public void onComplete(final Throwable failure, final Object success) {
@@ -63,16 +68,31 @@ public final class BucketStoreAccess {
                     callback.accept((Map<Address, Long>) success);
                 }
             }
-        }, context.dispatcher());
+        }, dispatcher);
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public Future<Map<Address, Long>> getBucketVersions() {
+        return (Future) Patterns.ask(actorRef, Singletons.GET_BUCKET_VERSIONS, timeout);
     }
 
     @SuppressWarnings("unchecked")
     void updateRemoteBuckets(final Map<Address, ? extends Bucket<?>> buckets) {
-        context.parent().tell(updateRemoteBucketsMessage((Map<Address, Bucket<?>>) buckets), ActorRef.noSender());
+        actorRef.tell(updateRemoteBucketsMessage((Map<Address, Bucket<?>>) buckets), ActorRef.noSender());
     }
 
     void removeRemoteBucket(final Address addr) {
-        context.parent().tell(removeBucketMessage(addr), ActorRef.noSender());
+        actorRef.tell(removeBucketMessage(addr), ActorRef.noSender());
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public <T extends BucketData<T>> Future<T> getLocalData() {
+        return (Future) Patterns.ask(actorRef, getLocalDataMessage(), timeout);
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public <T extends BucketData<T>> Future<Map<Address, Bucket<T>>> getRemoteBuckets() {
+        return (Future) Patterns.ask(actorRef, getRemoteBucketsMessage(), timeout);
     }
 
     public enum Singletons {
index 02e8770..84a7042 100644 (file)
@@ -106,6 +106,14 @@ public abstract class BucketStoreActor<T extends BucketData<T>> extends
         return actor -> actor.updateRemoteBuckets(buckets);
     }
 
+    static ExecuteInActor getLocalDataMessage() {
+        return actor -> actor.getSender().tell(actor.getLocalData(), actor.getSelf());
+    }
+
+    static ExecuteInActor getRemoteBucketsMessage() {
+        return actor -> actor.getSender().tell(ImmutableMap.copyOf(actor.getRemoteBuckets()), actor.getSelf());
+    }
+
     public final T getLocalData() {
         return getLocalBucket().getData();
     }
index 275200f..2a4e3b7 100644 (file)
@@ -106,7 +106,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
         ActorRefProvider provider = getContext().provider();
         selfAddress = provider.getDefaultAddress();
 
-        bucketStore = new BucketStoreAccess(getContext(), config.getAskDuration());
+        bucketStore = new BucketStoreAccess(getContext().parent(), getContext().dispatcher(), config.getAskDuration());
 
         if (provider instanceof ClusterActorRefProvider) {
             cluster = Cluster.get(getContext().system());
index 58abcac..94a8d4a 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.remote.rpc.registry.mbeans;
 
 import akka.actor.Address;
+import akka.util.Timeout;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -17,10 +18,12 @@ import java.util.Set;
 import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
 import org.opendaylight.controller.remote.rpc.registry.RoutingTable;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
 
 
 public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements RemoteRpcRegistryMXBean {
@@ -33,17 +36,38 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot
 
     private static final String NAME_CONSTANT = " | name:";
 
-    private final RpcRegistry rpcRegistry;
+    private final BucketStoreAccess rpcRegistryAccess;
+    private final Timeout timeout;
 
-    public RemoteRpcRegistryMXBeanImpl(final RpcRegistry rpcRegistry) {
+    public RemoteRpcRegistryMXBeanImpl(final BucketStoreAccess rpcRegistryAccess, Timeout timeout) {
         super("RemoteRpcRegistry", "RemoteRpcBroker", null);
-        this.rpcRegistry = rpcRegistry;
+        this.rpcRegistryAccess = rpcRegistryAccess;
+        this.timeout = timeout;
         registerMBean();
     }
 
+    @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch", "rawtypes"})
+    private RoutingTable getLocalData() {
+        try {
+            return (RoutingTable) Await.result((Future) rpcRegistryAccess.getLocalData(), timeout.duration());
+        } catch (Exception e) {
+            throw new RuntimeException("getLocalData failed", e);
+        }
+    }
+
+    @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch", "rawtypes"})
+    private Map<Address, Bucket<RoutingTable>> getRemoteBuckets() {
+        try {
+            return (Map<Address, Bucket<RoutingTable>>) Await.result((Future)rpcRegistryAccess.getRemoteBuckets(),
+                    timeout.duration());
+        } catch (Exception e) {
+            throw new RuntimeException("getRemoteBuckets failed", e);
+        }
+    }
+
     @Override
     public Set<String> getGlobalRpc() {
-        RoutingTable table = rpcRegistry.getLocalData();
+        RoutingTable table = getLocalData();
         Set<String> globalRpc = new HashSet<>(table.getRoutes().size());
         for (DOMRpcIdentifier route : table.getRoutes()) {
             if (route.getContextReference().isEmpty()) {
@@ -57,7 +81,7 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot
 
     @Override
     public Set<String> getLocalRegisteredRoutedRpc() {
-        RoutingTable table = rpcRegistry.getLocalData();
+        RoutingTable table = getLocalData();
         Set<String> routedRpc = new HashSet<>(table.getRoutes().size());
         for (DOMRpcIdentifier route : table.getRoutes()) {
             if (!route.getContextReference().isEmpty()) {
@@ -71,12 +95,12 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot
 
     @Override
     public Map<String, String> findRpcByName(final String name) {
-        RoutingTable localTable = rpcRegistry.getLocalData();
+        RoutingTable localTable = getLocalData();
         // Get all RPCs from local bucket
         Map<String, String> rpcMap = new HashMap<>(getRpcMemberMapByName(localTable, name, LOCAL_CONSTANT));
 
         // Get all RPCs from remote bucket
-        Map<Address, Bucket<RoutingTable>> buckets = rpcRegistry.getRemoteBuckets();
+        Map<Address, Bucket<RoutingTable>> buckets = getRemoteBuckets();
         for (Entry<Address, Bucket<RoutingTable>> entry : buckets.entrySet()) {
             RoutingTable table = entry.getValue().getData();
             rpcMap.putAll(getRpcMemberMapByName(table, name, entry.getKey().toString()));
@@ -88,10 +112,10 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot
 
     @Override
     public Map<String, String> findRpcByRoute(final String routeId) {
-        RoutingTable localTable = rpcRegistry.getLocalData();
+        RoutingTable localTable = getLocalData();
         Map<String, String> rpcMap = new HashMap<>(getRpcMemberMapByRoute(localTable, routeId, LOCAL_CONSTANT));
 
-        Map<Address, Bucket<RoutingTable>> buckets = rpcRegistry.getRemoteBuckets();
+        Map<Address, Bucket<RoutingTable>> buckets = getRemoteBuckets();
         for (Entry<Address, Bucket<RoutingTable>> entry : buckets.entrySet()) {
             RoutingTable table = entry.getValue().getData();
             rpcMap.putAll(getRpcMemberMapByRoute(table, routeId, entry.getKey().toString()));
@@ -138,7 +162,12 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot
     }
 
     @Override
+    @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch", "rawtypes"})
     public String getBucketVersions() {
-        return rpcRegistry.getVersions().toString();
+        try {
+            return Await.result((Future)rpcRegistryAccess.getBucketVersions(), timeout.duration()).toString();
+        } catch (Exception e) {
+            throw new RuntimeException("getVersions failed", e);
+        }
     }
 }
index 8b23154..dea930b 100644 (file)
@@ -10,10 +10,11 @@ package org.opendaylight.controller.remote.rpc.registry.mbeans;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
+import akka.dispatch.Dispatchers;
 import akka.testkit.TestActorRef;
 import akka.testkit.javadsl.TestKit;
+import akka.util.Timeout;
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -26,6 +27,7 @@ import org.junit.Test;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
@@ -56,12 +58,13 @@ public class RemoteRpcRegistryMXBeanImplTest {
         final TestKit invoker = new TestKit(system);
         final TestKit registrar = new TestKit(system);
         final TestKit supervisor = new TestKit(system);
-        final Props props = RpcRegistry.props(config, invoker.getRef(), registrar.getRef());
+        final Props props = RpcRegistry.props(config, invoker.getRef(), registrar.getRef())
+                .withDispatcher(Dispatchers.DefaultDispatcherId());
         testActor = new TestActorRef<>(system, props, supervisor.getRef(), "testActor");
-        final RpcRegistry rpcRegistry = testActor.underlyingActor();
 
-        mxBean = new RemoteRpcRegistryMXBeanImpl(rpcRegistry);
-        Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+        final Timeout timeout = Timeout.apply(10, TimeUnit.SECONDS);
+        mxBean = new RemoteRpcRegistryMXBeanImpl(new BucketStoreAccess(testActor, system.dispatcher(), timeout),
+                timeout);
     }
 
     @After

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.