testFindRpcByRoute(org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImplTest) Time elapsed: 0.98 sec <<< ERROR!
java.lang.IllegalStateException: Attempted to access local bucket before recovery completed
at com.google.common.base.Preconditions.checkState(Preconditions.java:501)
at org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.getLocalBucket(BucketStoreActor.java:384)
at org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.getLocalData(BucketStoreActor.java:110)
at org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl.findRpcByRoute(RemoteRpcRegistryMXBeanImpl.java:91)
at org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImplTest.testFindRpcByRoute(RemoteRpcRegistryMXBeanImplTest.java:142)
The problem is that the RemoteRpcRegistryMXBeanImpl access the enclosing
RpcRegistry Actor instance directly and violates actor encapsulation.
RemoteRpcRegistryMXBeanImpl should access the RpcRegistry via messages
sent to its ActorRef.
Change-Id: Icfd67c38e5d1bc3de283949207009d7aa34ab855
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess;
import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor;
import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl;
public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker, ImmutableSet.of()));
this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar);
- this.mxBean = new RemoteRpcRegistryMXBeanImpl(this);
+ this.mxBean = new RemoteRpcRegistryMXBeanImpl(new BucketStoreAccess(self(), getContext().dispatcher(),
+ config.getAskDuration()), config.getAskDuration());
}
/**
package org.opendaylight.controller.remote.rpc.registry.gossip;
import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.getBucketsByMembersMessage;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.getLocalDataMessage;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.getRemoteBucketsMessage;
import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.removeBucketMessage;
import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.updateRemoteBucketsMessage;
-import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.Address;
import akka.dispatch.OnComplete;
import akka.util.Timeout;
import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import java.util.Collection;
import java.util.Map;
+import java.util.Objects;
import java.util.function.Consumer;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
/**
* Convenience access to {@link BucketStoreActor}. Used mostly by {@link Gossiper}.
@Beta
@VisibleForTesting
public final class BucketStoreAccess {
- private final ActorContext context;
+ private final ActorRef actorRef;
+ private final ExecutionContext dispatcher;
private final Timeout timeout;
- BucketStoreAccess(final ActorContext context, final Timeout timeout) {
- this.context = Preconditions.checkNotNull(context);
- this.timeout = Preconditions.checkNotNull(timeout);
+ public BucketStoreAccess(final ActorRef actorRef, final ExecutionContext dispatcher, final Timeout timeout) {
+ this.actorRef = Objects.requireNonNull(actorRef);
+ this.dispatcher = Objects.requireNonNull(dispatcher);
+ this.timeout = Objects.requireNonNull(timeout);
}
<T extends BucketData<T>> void getBucketsByMembers(final Collection<Address> members,
final Consumer<Map<Address, Bucket<T>>> callback) {
- Patterns.ask(context.parent(), getBucketsByMembersMessage(members), timeout)
+ Patterns.ask(actorRef, getBucketsByMembersMessage(members), timeout)
.onComplete(new OnComplete<Object>() {
@SuppressWarnings("unchecked")
@Override
callback.accept((Map<Address, Bucket<T>>) success);
}
}
- }, context.dispatcher());
+ }, dispatcher);
}
void getBucketVersions(final Consumer<Map<Address, Long>> callback) {
- Patterns.ask(context.parent(), Singletons.GET_BUCKET_VERSIONS, timeout).onComplete(new OnComplete<Object>() {
+ Patterns.ask(actorRef, Singletons.GET_BUCKET_VERSIONS, timeout).onComplete(new OnComplete<Object>() {
@SuppressWarnings("unchecked")
@Override
public void onComplete(final Throwable failure, final Object success) {
callback.accept((Map<Address, Long>) success);
}
}
- }, context.dispatcher());
+ }, dispatcher);
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public Future<Map<Address, Long>> getBucketVersions() {
+ return (Future) Patterns.ask(actorRef, Singletons.GET_BUCKET_VERSIONS, timeout);
}
@SuppressWarnings("unchecked")
void updateRemoteBuckets(final Map<Address, ? extends Bucket<?>> buckets) {
- context.parent().tell(updateRemoteBucketsMessage((Map<Address, Bucket<?>>) buckets), ActorRef.noSender());
+ actorRef.tell(updateRemoteBucketsMessage((Map<Address, Bucket<?>>) buckets), ActorRef.noSender());
}
void removeRemoteBucket(final Address addr) {
- context.parent().tell(removeBucketMessage(addr), ActorRef.noSender());
+ actorRef.tell(removeBucketMessage(addr), ActorRef.noSender());
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public <T extends BucketData<T>> Future<T> getLocalData() {
+ return (Future) Patterns.ask(actorRef, getLocalDataMessage(), timeout);
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public <T extends BucketData<T>> Future<Map<Address, Bucket<T>>> getRemoteBuckets() {
+ return (Future) Patterns.ask(actorRef, getRemoteBucketsMessage(), timeout);
}
public enum Singletons {
return actor -> actor.updateRemoteBuckets(buckets);
}
+ static ExecuteInActor getLocalDataMessage() {
+ return actor -> actor.getSender().tell(actor.getLocalData(), actor.getSelf());
+ }
+
+ static ExecuteInActor getRemoteBucketsMessage() {
+ return actor -> actor.getSender().tell(ImmutableMap.copyOf(actor.getRemoteBuckets()), actor.getSelf());
+ }
+
public final T getLocalData() {
return getLocalBucket().getData();
}
ActorRefProvider provider = getContext().provider();
selfAddress = provider.getDefaultAddress();
- bucketStore = new BucketStoreAccess(getContext(), config.getAskDuration());
+ bucketStore = new BucketStoreAccess(getContext().parent(), getContext().dispatcher(), config.getAskDuration());
if (provider instanceof ClusterActorRefProvider) {
cluster = Cluster.get(getContext().system());
package org.opendaylight.controller.remote.rpc.registry.mbeans;
import akka.actor.Address;
+import akka.util.Timeout;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
import org.opendaylight.controller.remote.rpc.registry.RoutingTable;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements RemoteRpcRegistryMXBean {
private static final String NAME_CONSTANT = " | name:";
- private final RpcRegistry rpcRegistry;
+ private final BucketStoreAccess rpcRegistryAccess;
+ private final Timeout timeout;
- public RemoteRpcRegistryMXBeanImpl(final RpcRegistry rpcRegistry) {
+ public RemoteRpcRegistryMXBeanImpl(final BucketStoreAccess rpcRegistryAccess, Timeout timeout) {
super("RemoteRpcRegistry", "RemoteRpcBroker", null);
- this.rpcRegistry = rpcRegistry;
+ this.rpcRegistryAccess = rpcRegistryAccess;
+ this.timeout = timeout;
registerMBean();
}
+ @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch", "rawtypes"})
+ private RoutingTable getLocalData() {
+ try {
+ return (RoutingTable) Await.result((Future) rpcRegistryAccess.getLocalData(), timeout.duration());
+ } catch (Exception e) {
+ throw new RuntimeException("getLocalData failed", e);
+ }
+ }
+
+ @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch", "rawtypes"})
+ private Map<Address, Bucket<RoutingTable>> getRemoteBuckets() {
+ try {
+ return (Map<Address, Bucket<RoutingTable>>) Await.result((Future)rpcRegistryAccess.getRemoteBuckets(),
+ timeout.duration());
+ } catch (Exception e) {
+ throw new RuntimeException("getRemoteBuckets failed", e);
+ }
+ }
+
@Override
public Set<String> getGlobalRpc() {
- RoutingTable table = rpcRegistry.getLocalData();
+ RoutingTable table = getLocalData();
Set<String> globalRpc = new HashSet<>(table.getRoutes().size());
for (DOMRpcIdentifier route : table.getRoutes()) {
if (route.getContextReference().isEmpty()) {
@Override
public Set<String> getLocalRegisteredRoutedRpc() {
- RoutingTable table = rpcRegistry.getLocalData();
+ RoutingTable table = getLocalData();
Set<String> routedRpc = new HashSet<>(table.getRoutes().size());
for (DOMRpcIdentifier route : table.getRoutes()) {
if (!route.getContextReference().isEmpty()) {
@Override
public Map<String, String> findRpcByName(final String name) {
- RoutingTable localTable = rpcRegistry.getLocalData();
+ RoutingTable localTable = getLocalData();
// Get all RPCs from local bucket
Map<String, String> rpcMap = new HashMap<>(getRpcMemberMapByName(localTable, name, LOCAL_CONSTANT));
// Get all RPCs from remote bucket
- Map<Address, Bucket<RoutingTable>> buckets = rpcRegistry.getRemoteBuckets();
+ Map<Address, Bucket<RoutingTable>> buckets = getRemoteBuckets();
for (Entry<Address, Bucket<RoutingTable>> entry : buckets.entrySet()) {
RoutingTable table = entry.getValue().getData();
rpcMap.putAll(getRpcMemberMapByName(table, name, entry.getKey().toString()));
@Override
public Map<String, String> findRpcByRoute(final String routeId) {
- RoutingTable localTable = rpcRegistry.getLocalData();
+ RoutingTable localTable = getLocalData();
Map<String, String> rpcMap = new HashMap<>(getRpcMemberMapByRoute(localTable, routeId, LOCAL_CONSTANT));
- Map<Address, Bucket<RoutingTable>> buckets = rpcRegistry.getRemoteBuckets();
+ Map<Address, Bucket<RoutingTable>> buckets = getRemoteBuckets();
for (Entry<Address, Bucket<RoutingTable>> entry : buckets.entrySet()) {
RoutingTable table = entry.getValue().getData();
rpcMap.putAll(getRpcMemberMapByRoute(table, routeId, entry.getKey().toString()));
}
@Override
+ @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch", "rawtypes"})
public String getBucketVersions() {
- return rpcRegistry.getVersions().toString();
+ try {
+ return Await.result((Future)rpcRegistryAccess.getBucketVersions(), timeout.duration()).toString();
+ } catch (Exception e) {
+ throw new RuntimeException("getVersions failed", e);
+ }
}
}
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
+import akka.dispatch.Dispatchers;
import akka.testkit.TestActorRef;
import akka.testkit.javadsl.TestKit;
+import akka.util.Timeout;
import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
final TestKit invoker = new TestKit(system);
final TestKit registrar = new TestKit(system);
final TestKit supervisor = new TestKit(system);
- final Props props = RpcRegistry.props(config, invoker.getRef(), registrar.getRef());
+ final Props props = RpcRegistry.props(config, invoker.getRef(), registrar.getRef())
+ .withDispatcher(Dispatchers.DefaultDispatcherId());
testActor = new TestActorRef<>(system, props, supervisor.getRef(), "testActor");
- final RpcRegistry rpcRegistry = testActor.underlyingActor();
- mxBean = new RemoteRpcRegistryMXBeanImpl(rpcRegistry);
- Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ final Timeout timeout = Timeout.apply(10, TimeUnit.SECONDS);
+ mxBean = new RemoteRpcRegistryMXBeanImpl(new BucketStoreAccess(testActor, system.dispatcher(), timeout),
+ timeout);
}
@After