BUG-3128: rework sal-remoterpc-connector 88/50488/26
authorRobert Varga <rovarga@cisco.com>
Fri, 13 Jan 2017 17:30:22 +0000 (18:30 +0100)
committerRobert Varga <rovarga@cisco.com>
Mon, 30 Jan 2017 13:52:44 +0000 (14:52 +0100)
This patch reworks the implementation to take advantage
of the services provided by DOMRpcService, notifying us
of locally-available services.

Previously we have registered all routed RPCs known in
the SchemaContext for global routing context, which has
causes lookups for routed RPCs not otherwise bound to
call back to the remote connector, which then performed
a router lookup.

This approach is slow as each RPC invocation incurs
an additional round-trip to the RpcRegistry to lookup
the appropriate router before the request is sent to it.
It also does not work for global RPCs, because they
only ever have a global context -- hence the routing
decision needs to be made solely in DOMRpcService.

With this patch we maintain a single higher-cost
implementation registered towards each remote node,
handling RPCs discovered via gossiping RoutingTables.
The implementation dispatches requests directly to that
node's RpcInvoker (formerly known as RpcBroker). That
way DOMRpcService will perform internal routing based
on cost and invoke our service only if there are no
local implementations registered.

RpcRegistry no longer performs delayed router discovery
and instead dispatches RoutingTable bucket updates to
a new actor, RpcRegistrator, whose sole job is to maintain
registrations of RemoteRpcImplementation instances for
each remote node with DOMRpcProviderService.

Because of DOMRpcService's ability to filter registration
notifications, our RpcListener will never be notified
of our registrations, which precludes routing loops. We
can therefore remote RemoteRpcInput, whose sole purpose
was to act as a loop detector.

Futher cleanup is done to RpcManager and RemoteRpcProvider
lifecycle, as these now correctly terminate their children
and remove registrations on both restarts and shutdowns.
RpcManager's children startup is also moved from the
constructor to preStart(), so as correctly plug into
the actor lifecycle.

Gossiper is updated to forward node removals to the BucketStore,
so that buckets from unreachable nodes are removed as soon
as possible.

BucketStore is updated to pass down changed buckets
to the subclass whenever a bucket is removed or updated.
It also requires the subclass to provide the initial bucket
data item -- which makes it obvious that a bucket's data
can never be null. This was previously achieved by sending
updating the bucket data from the subclass constructor.

BucketStore/Gossiper messages are updated to be immutable,
which simplifies their instantiation and ensures that they
do not contain nulls (which is required anyway).

Change-Id: I4efb3ddd8ea46ae5be1eb59f1d4fe508f2bc5763
Signed-off-by: Robert Varga <rovarga@cisco.com>
24 files changed:
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcInput.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfig.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcInvoker.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcRegistrar.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/UpdateSchemaContext.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossiperTest.java

index 8d2cefb..067386e 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -8,81 +8,45 @@
 
 package org.opendaylight.controller.remote.rpc;
 
-import static akka.pattern.Patterns.ask;
-
 import akka.actor.ActorRef;
-import akka.dispatch.OnComplete;
-import akka.japi.Pair;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import java.util.List;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementation;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
 import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRoutersReply;
-import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
 
-public class RemoteRpcImplementation implements DOMRpcImplementation {
-    private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcImplementation.class);
+/**
+ * A {@link DOMRpcImplementation} which routes invocation requests to a remote invoker actor.
+ *
+ * @author Robert Varga
+ */
+final class RemoteRpcImplementation implements DOMRpcImplementation {
+    // 0 for local, 1 for binding, 2 for remote
+    private static final long COST = 2;
 
-    private final ActorRef rpcRegistry;
-    private final RemoteRpcProviderConfig config;
+    private final ActorRef remoteInvoker;
+    private final Timeout askDuration;
 
-    public RemoteRpcImplementation(final ActorRef rpcRegistry, final RemoteRpcProviderConfig config) {
-        this.config = config;
-        this.rpcRegistry = rpcRegistry;
+    RemoteRpcImplementation(final ActorRef remoteInvoker, final RemoteRpcProviderConfig config) {
+        this.remoteInvoker = Preconditions.checkNotNull(remoteInvoker);
+        this.askDuration = config.getAskDuration();
     }
 
     @Override
     public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(final DOMRpcIdentifier rpc,
             final NormalizedNode<?, ?> input) {
-        if (input instanceof RemoteRpcInput) {
-            LOG.warn("Rpc {} was removed during execution or there is loop present. Failing received rpc.", rpc);
-            return Futures.<DOMRpcResult, DOMRpcException>immediateFailedCheckedFuture(
-                    new DOMRpcImplementationNotAvailableException(
-                            "Rpc implementation for %s was removed during processing.", rpc));
-        }
-        final RemoteDOMRpcFuture frontEndFuture = RemoteDOMRpcFuture.create(rpc.getType().getLastComponent());
-        findRouteAsync(rpc).onComplete(new OnComplete<FindRoutersReply>() {
-
-            @Override
-            public void onComplete(final Throwable error, final FindRoutersReply routes) throws Throwable {
-                if (error != null) {
-                    frontEndFuture.failNow(error);
-                } else {
-                    final List<Pair<ActorRef, Long>> routePairs = routes.getRouterWithUpdateTime();
-                    if (routePairs == null || routePairs.isEmpty()) {
-                        frontEndFuture.failNow(new DOMRpcImplementationNotAvailableException(
-                                "No local or remote implementation available for rpc %s", rpc.getType()));
-                    } else {
-                        final ActorRef remoteImplRef = new LatestEntryRoutingLogic(routePairs).select();
-                        final Object executeRpcMessage = ExecuteRpc.from(rpc, input);
-                        LOG.debug("Found remote actor {} for rpc {} - sending {}", remoteImplRef, rpc.getType(),
-                                executeRpcMessage);
-                        frontEndFuture.completeWith(ask(remoteImplRef, executeRpcMessage, config.getAskDuration()));
-                    }
-                }
-            }
-        }, ExecutionContext.Implicits$.MODULE$.global());
-        return frontEndFuture;
+        final RemoteDOMRpcFuture ret = RemoteDOMRpcFuture.create(rpc.getType().getLastComponent());
+        ret.completeWith(Patterns.ask(remoteInvoker, ExecuteRpc.from(rpc, input), askDuration));
+        return ret;
     }
 
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    private Future<FindRoutersReply> findRouteAsync(final DOMRpcIdentifier rpc) {
-        // FIXME: Refactor routeId and message to use DOMRpcIdentifier directly.
-        final RpcRouter.RouteIdentifier<?, ?, ?> routeId =
-                new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), rpc.getContextReference());
-        final RpcRegistry.Messages.FindRouters findMsg = new RpcRegistry.Messages.FindRouters(routeId);
-        return (Future) ask(rpcRegistry, findMsg, config.getAskDuration());
+    @Override
+    public long invocationCost() {
+        return COST;
     }
 }
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcInput.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcInput.java
deleted file mode 100644 (file)
index 59528fd..0000000
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.remote.rpc;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import java.util.Collection;
-import java.util.Map;
-import javax.annotation.Nullable;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
-import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-class RemoteRpcInput implements ContainerNode {
-
-    private final ContainerNode delegate;
-
-    private RemoteRpcInput(final ContainerNode delegate) {
-        this.delegate = delegate;
-    }
-
-    protected static RemoteRpcInput from(@Nullable final NormalizedNode<?, ?> node) {
-        if (node == null) {
-            return null;
-        }
-
-        Preconditions.checkArgument(node instanceof ContainerNode);
-        return new RemoteRpcInput((ContainerNode) node);
-    }
-
-    ContainerNode delegate() {
-        return delegate;
-    }
-
-    @Override
-    public Map<QName, String> getAttributes() {
-        return delegate().getAttributes();
-    }
-
-    @Override
-    public Object getAttributeValue(final QName name) {
-        return delegate().getAttributeValue(name);
-    }
-
-    @Override
-    public QName getNodeType() {
-        return delegate().getNodeType();
-    }
-
-    @Override
-    public Collection<DataContainerChild<? extends PathArgument, ?>> getValue() {
-        return delegate().getValue();
-    }
-
-    @Override
-    public NodeIdentifier getIdentifier() {
-        return delegate().getIdentifier();
-    }
-
-    @Override
-    public Optional<DataContainerChild<? extends PathArgument, ?>> getChild(final PathArgument child) {
-        return delegate().getChild(child);
-    }
-
-    @Override
-    public int hashCode() {
-        return delegate().hashCode();
-    }
-
-    @Override
-    public boolean equals(final Object obj) {
-        return delegate().equals(obj);
-    }
-}
index 52f803d..a0e1f87 100644 (file)
@@ -8,20 +8,17 @@
 
 package org.opendaylight.controller.remote.rpc;
 
-
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 import java.util.Collection;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
-import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
 import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.controller.sal.core.api.Provider;
 import org.opendaylight.controller.sal.core.api.model.SchemaService;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -29,19 +26,17 @@ import org.slf4j.LoggerFactory;
  * This is the base class which initialize all the actors, listeners and
  * default RPc implementation so remote invocation of rpcs.
  */
-public class RemoteRpcProvider implements AutoCloseable, Provider, SchemaContextListener {
+public class RemoteRpcProvider implements AutoCloseable, Provider {
 
     private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcProvider.class);
 
     private final DOMRpcProviderService rpcProvisionRegistry;
-
-    private ListenerRegistration<SchemaContextListener> schemaListenerRegistration;
+    private final RemoteRpcProviderConfig config;
     private final ActorSystem actorSystem;
-    private SchemaService schemaService;
+
     private DOMRpcService rpcService;
-    private SchemaContext schemaContext;
+    private SchemaService schemaService;
     private ActorRef rpcManager;
-    private final RemoteRpcProviderConfig config;
 
     public RemoteRpcProvider(final ActorSystem actorSystem, final DOMRpcProviderService rpcProvisionRegistry,
             final RemoteRpcProviderConfig config) {
@@ -50,47 +45,38 @@ public class RemoteRpcProvider implements AutoCloseable, Provider, SchemaContext
         this.config = Preconditions.checkNotNull(config);
     }
 
-    public void setRpcService(DOMRpcService rpcService) {
+    public void setRpcService(final DOMRpcService rpcService) {
         this.rpcService = rpcService;
     }
 
-    public void setSchemaService(SchemaService schemaService) {
+    public void setSchemaService(final SchemaService schemaService) {
         this.schemaService = schemaService;
     }
 
     @Override
-    public void close() throws Exception {
-        if (schemaListenerRegistration != null) {
-            schemaListenerRegistration.close();
-            schemaListenerRegistration = null;
+    public void close() {
+        if (rpcManager != null) {
+            LOG.info("Stopping RPC Manager at {}", rpcManager);
+            rpcManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            rpcManager = null;
         }
     }
 
     @Override
     public void onSessionInitiated(final Broker.ProviderSession session) {
-        schemaService = session.getService(SchemaService.class);
         rpcService = session.getService(DOMRpcService.class);
         start();
     }
 
     @Override
     public Collection<ProviderFunctionality> getProviderFunctionality() {
-        return null;
+        return ImmutableSet.of();
     }
 
     public void start() {
-        LOG.info("Starting remote rpc service...");
-
-        schemaContext = schemaService.getGlobalContext();
-        rpcManager = actorSystem.actorOf(RpcManager.props(schemaContext, rpcProvisionRegistry, rpcService, config),
+        LOG.info("Starting Remote RPC service...");
+        rpcManager = actorSystem.actorOf(RpcManager.props(rpcProvisionRegistry, rpcService, config),
                 config.getRpcManagerName());
-        schemaListenerRegistration = schemaService.registerSchemaContextListener(this);
-        LOG.debug("rpc manager started");
-    }
-
-    @Override
-    public void onGlobalContextUpdated(final SchemaContext newSchemaContext) {
-        this.schemaContext = newSchemaContext;
-        rpcManager.tell(new UpdateSchemaContext(newSchemaContext), null);
+        LOG.debug("RPC Manager started at {}", rpcManager);
     }
 }
index b08df44..c1846b8 100644 (file)
@@ -17,6 +17,7 @@ import scala.concurrent.duration.FiniteDuration;
 public class RemoteRpcProviderConfig extends CommonConfig {
 
     protected static final String TAG_RPC_BROKER_NAME = "rpc-broker-name";
+    protected static final String TAG_RPC_REGISTRAR_NAME = "rpc-registrar-name";
     protected static final String TAG_RPC_REGISTRY_NAME = "registry-name";
     protected static final String TAG_RPC_MGR_NAME = "rpc-manager-name";
     protected static final String TAG_RPC_BROKER_PATH = "rpc-broker-path";
@@ -29,7 +30,7 @@ public class RemoteRpcProviderConfig extends CommonConfig {
     private Timeout cachedAskDuration;
     private FiniteDuration cachedGossipTickInterval;
 
-    public RemoteRpcProviderConfig(Config config) {
+    public RemoteRpcProviderConfig(final Config config) {
         super(config);
     }
 
@@ -37,6 +38,10 @@ public class RemoteRpcProviderConfig extends CommonConfig {
         return get().getString(TAG_RPC_BROKER_NAME);
     }
 
+    public String getRpcRegistrarName() {
+        return get().getString(TAG_RPC_REGISTRAR_NAME);
+    }
+
     public String getRpcRegistryName() {
         return get().getString(TAG_RPC_REGISTRY_NAME);
     }
@@ -87,19 +92,20 @@ public class RemoteRpcProviderConfig extends CommonConfig {
             justification = "Findbugs flags this as an unconfirmed cast of return value but the build method clearly "
                 + "returns RemoteRpcProviderConfig. Perhaps it's confused b/c the build method is overloaded and "
                 + "and differs in return type from the base class.")
-    public static RemoteRpcProviderConfig newInstance(String actorSystemName, boolean metricCaptureEnabled,
-            int mailboxCapacity) {
+    public static RemoteRpcProviderConfig newInstance(final String actorSystemName, final boolean metricCaptureEnabled,
+            final int mailboxCapacity) {
         return new Builder(actorSystemName).metricCaptureEnabled(metricCaptureEnabled)
                 .mailboxCapacity(mailboxCapacity).build();
     }
 
     public static class Builder extends CommonConfig.Builder<Builder> {
 
-        public Builder(String actorSystemName) {
+        public Builder(final String actorSystemName) {
             super(actorSystemName);
 
             //Actor names
             configHolder.put(TAG_RPC_BROKER_NAME, "broker");
+            configHolder.put(TAG_RPC_REGISTRAR_NAME, "registrar");
             configHolder.put(TAG_RPC_REGISTRY_NAME, "registry");
             configHolder.put(TAG_RPC_MGR_NAME, "rpc");
 
@@ -114,7 +120,7 @@ public class RemoteRpcProviderConfig extends CommonConfig {
 
         }
 
-        public Builder gossipTickInterval(String interval) {
+        public Builder gossipTickInterval(final String interval) {
             configHolder.put(TAG_GOSSIP_TICK_INTERVAL, interval);
             return this;
         }
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java
deleted file mode 100644 (file)
index 3d870c9..0000000
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.remote.rpc;
-
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
-import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
-import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaPath;
-
-/**
- * Actor to initiate execution of remote RPC on other nodes of the cluster.
- */
-
-public class RpcBroker extends AbstractUntypedActor {
-    private final DOMRpcService rpcService;
-
-    private RpcBroker(final DOMRpcService rpcService) {
-        this.rpcService = rpcService;
-    }
-
-    public static Props props(final DOMRpcService rpcService) {
-        Preconditions.checkNotNull(rpcService, "DOMRpcService can not be null");
-        return Props.create(RpcBroker.class, rpcService);
-    }
-
-    @Override
-    protected void handleReceive(final Object message) throws Exception {
-        if (message instanceof ExecuteRpc) {
-            executeRpc((ExecuteRpc) message);
-        }
-    }
-
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    private void executeRpc(final ExecuteRpc msg) {
-        LOG.debug("Executing rpc {}", msg.getRpc());
-        final NormalizedNode<?, ?> input = RemoteRpcInput.from(msg.getInputNormalizedNode());
-        final SchemaPath schemaPath = SchemaPath.create(true, msg.getRpc());
-        final ActorRef sender = getSender();
-        final ActorRef self = self();
-
-        try {
-            final CheckedFuture<DOMRpcResult, DOMRpcException> future = rpcService.invokeRpc(schemaPath, input);
-
-            Futures.addCallback(future, new FutureCallback<DOMRpcResult>() {
-                @Override
-                public void onSuccess(final DOMRpcResult result) {
-                    if (result == null) {
-                        // This shouldn't happen but the FutureCallback annotates the result param with Nullable so
-                        // handle null here to avoid FindBugs warning.
-                        LOG.debug("Got null DOMRpcResult - sending null response for execute rpc : {}", msg.getRpc());
-                        sender.tell(new RpcResponse(null), self);
-                        return;
-                    }
-
-                    if (!result.getErrors().isEmpty()) {
-                        final String message = String.format("Execution of RPC %s failed", msg.getRpc());
-                        sender.tell(new akka.actor.Status.Failure(new RpcErrorsException(message,
-                                result.getErrors())), self);
-                    } else {
-                        LOG.debug("Sending response for execute rpc : {}", msg.getRpc());
-
-                        sender.tell(new RpcResponse(result.getResult()), self);
-                    }
-                }
-
-                @Override
-                public void onFailure(final Throwable failure) {
-                    LOG.error(
-                        "executeRpc for {} failed with root cause: {}. For exception details, enable Debug logging.",
-                        msg.getRpc(), Throwables.getRootCause(failure));
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Detailed exception for execute RPC failure :{}", failure);
-                    }
-                    sender.tell(new akka.actor.Status.Failure(failure), self);
-                }
-            });
-        } catch (final RuntimeException e) {
-            sender.tell(new akka.actor.Status.Failure(e), sender);
-        }
-    }
-}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcInvoker.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcInvoker.java
new file mode 100644 (file)
index 0000000..7bb4b93
--- /dev/null
@@ -0,0 +1,98 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
+import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+/**
+ * Actor receiving invocation requests from remote nodes, routing them to
+ * {@link DOMRpcService#invokeRpc(SchemaPath, NormalizedNode)}.
+ */
+final class RpcInvoker extends AbstractUntypedActor {
+    private final DOMRpcService rpcService;
+
+    private RpcInvoker(final DOMRpcService rpcService) {
+        this.rpcService = Preconditions.checkNotNull(rpcService);
+    }
+
+    public static Props props(final DOMRpcService rpcService) {
+        Preconditions.checkNotNull(rpcService, "DOMRpcService can not be null");
+        return Props.create(RpcInvoker.class, rpcService);
+    }
+
+    @Override
+    protected void handleReceive(final Object message) {
+        if (message instanceof ExecuteRpc) {
+            executeRpc((ExecuteRpc) message);
+        } else {
+            unknownMessage(message);
+        }
+    }
+
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private void executeRpc(final ExecuteRpc msg) {
+        LOG.debug("Executing rpc {}", msg.getRpc());
+        final SchemaPath schemaPath = SchemaPath.create(true, msg.getRpc());
+        final ActorRef sender = getSender();
+        final ActorRef self = self();
+
+        final CheckedFuture<DOMRpcResult, DOMRpcException> future;
+        try {
+            future = rpcService.invokeRpc(schemaPath, msg.getInputNormalizedNode());
+        } catch (final RuntimeException e) {
+            LOG.debug("Failed to invoke RPC {}", msg.getRpc(), e);
+            sender.tell(new akka.actor.Status.Failure(e), sender);
+            return;
+        }
+
+        Futures.addCallback(future, new FutureCallback<DOMRpcResult>() {
+            @Override
+            public void onSuccess(final DOMRpcResult result) {
+                if (result == null) {
+                    // This shouldn't happen but the FutureCallback annotates the result param with Nullable so
+                    // handle null here to avoid FindBugs warning.
+                    LOG.debug("Got null DOMRpcResult - sending null response for execute rpc : {}", msg.getRpc());
+                    sender.tell(new RpcResponse(null), self);
+                    return;
+                }
+
+                if (!result.getErrors().isEmpty()) {
+                    final String message = String.format("Execution of RPC %s failed", msg.getRpc());
+                    sender.tell(new akka.actor.Status.Failure(new RpcErrorsException(message, result.getErrors())),
+                        self);
+                } else {
+                    LOG.debug("Sending response for execute rpc : {}", msg.getRpc());
+                    sender.tell(new RpcResponse(result.getResult()), self);
+                }
+            }
+
+            @Override
+            public void onFailure(final Throwable failure) {
+                LOG.debug("Failed to execute RPC {}", msg.getRpc(), failure);
+                LOG.error("Failed to execute RPC {} due to {}. More details are available on DEBUG level.",
+                    msg.getRpc(), Throwables.getRootCause(failure));
+                sender.tell(new akka.actor.Status.Failure(failure), self);
+            }
+        });
+    }
+}
index 5b1c554..3ff58f0 100644 (file)
@@ -5,10 +5,8 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.remote.rpc;
 
-
 import akka.actor.ActorRef;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
@@ -17,35 +15,42 @@ import java.util.List;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcAvailabilityListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementation;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
+import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class RpcListener implements DOMRpcAvailabilityListener {
-
+/**
+ * A {@link DOMRpcAvailabilityListener} reacting to RPC implementations different than {@link RemoteRpcImplementation}.
+ * The knowledge of such implementations is forwarded to {@link RpcRegistry}, which is responsible for advertising
+ * their presence to other nodes.
+ */
+final class RpcListener implements DOMRpcAvailabilityListener {
     private static final Logger LOG = LoggerFactory.getLogger(RpcListener.class);
+
     private final ActorRef rpcRegistry;
 
-    public RpcListener(final ActorRef rpcRegistry) {
-        this.rpcRegistry = rpcRegistry;
+    RpcListener(final ActorRef rpcRegistry) {
+        this.rpcRegistry = Preconditions.checkNotNull(rpcRegistry);
     }
 
     @Override
     public void onRpcAvailable(@Nonnull final Collection<DOMRpcIdentifier> rpcs) {
         Preconditions.checkArgument(rpcs != null, "Input Collection of DOMRpcIdentifier can not be null.");
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Adding registration for [{}]", rpcs);
-        }
-        final List<RpcRouter.RouteIdentifier<?,?,?>> routeIds = new ArrayList<>();
+        LOG.debug("Adding registration for [{}]", rpcs);
+
+        final List<RouteIdentifier<?,?,?>> routeIds = new ArrayList<>(rpcs.size());
 
         for (final DOMRpcIdentifier rpc : rpcs) {
-            final RpcRouter.RouteIdentifier<?,?,?> routeId =
+            // FIXME: Refactor routeId and message to use DOMRpcIdentifier directly.
+            final RouteIdentifier<?,?,?> routeId =
                     new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), rpc.getContextReference());
             routeIds.add(routeId);
         }
-        final RpcRegistry.Messages.AddOrUpdateRoutes addRpcMsg = new RpcRegistry.Messages.AddOrUpdateRoutes(routeIds);
-        rpcRegistry.tell(addRpcMsg, ActorRef.noSender());
+        rpcRegistry.tell(new AddOrUpdateRoutes(routeIds), ActorRef.noSender());
     }
 
     @Override
@@ -54,13 +59,15 @@ public class RpcListener implements DOMRpcAvailabilityListener {
 
         LOG.debug("Removing registration for [{}]", rpcs);
 
-        final List<RpcRouter.RouteIdentifier<?,?,?>> routeIds = new ArrayList<>();
+        final List<RouteIdentifier<?,?,?>> routeIds = new ArrayList<>(rpcs.size());
         for (final DOMRpcIdentifier rpc : rpcs) {
-            final RpcRouter.RouteIdentifier<?,?,?> routeId =
-                    new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), rpc.getContextReference());
-            routeIds.add(routeId);
+            routeIds.add(new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), rpc.getContextReference()));
         }
-        final RpcRegistry.Messages.RemoveRoutes removeRpcMsg = new RpcRegistry.Messages.RemoveRoutes(routeIds);
-        rpcRegistry.tell(removeRpcMsg, ActorRef.noSender());
+        rpcRegistry.tell(new RemoveRoutes(routeIds), ActorRef.noSender());
+    }
+
+    @Override
+    public boolean acceptsImplementation(final DOMRpcImplementation impl) {
+        return !(impl instanceof RemoteRpcImplementation);
     }
 }
index 8e53bcb..6f0e3b8 100644 (file)
 
 package org.opendaylight.controller.remote.rpc;
 
-
 import akka.actor.ActorRef;
 import akka.actor.OneForOneStrategy;
 import akka.actor.Props;
 import akka.actor.SupervisorStrategy;
-import akka.actor.SupervisorStrategy.Directive;
-import akka.japi.Function;
 import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
-import org.opendaylight.controller.md.sal.dom.broker.spi.rpc.RpcRoutingStrategy;
-import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.model.api.Module;
-import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import scala.concurrent.duration.Duration;
 
 /**
- * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown. It also starts
- * the rpc listeners
+ * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown. It also registers
+ * {@link RpcListener} with the local {@link DOMRpcService}.
  */
-
 public class RpcManager extends AbstractUntypedActor {
-    private SchemaContext schemaContext;
-    private ActorRef rpcBroker;
-    private ActorRef rpcRegistry;
-    private final RemoteRpcProviderConfig config;
-    private RpcListener rpcListener;
-    private RemoteRpcImplementation rpcImplementation;
     private final DOMRpcProviderService rpcProvisionRegistry;
+    private final RemoteRpcProviderConfig config;
     private final DOMRpcService rpcServices;
 
-    private RpcManager(final SchemaContext schemaContext,
-                       final DOMRpcProviderService rpcProvisionRegistry,
-                       final DOMRpcService rpcSevices,
-                       final RemoteRpcProviderConfig config) {
-        this.schemaContext = schemaContext;
-        this.rpcProvisionRegistry = rpcProvisionRegistry;
-        rpcServices = rpcSevices;
-        this.config = config;
+    private ListenerRegistration<RpcListener> listenerReg;
+    private ActorRef rpcInvoker;
+    private ActorRef rpcRegistry;
+    private ActorRef rpcRegistrar;
 
-        createRpcActors();
-        startListeners();
+    private RpcManager(final DOMRpcProviderService rpcProvisionRegistry,
+                       final DOMRpcService rpcServices,
+                       final RemoteRpcProviderConfig config) {
+        this.rpcProvisionRegistry = Preconditions.checkNotNull(rpcProvisionRegistry);
+        this.rpcServices = Preconditions.checkNotNull(rpcServices);
+        this.config = Preconditions.checkNotNull(config);
     }
 
-
-    public static Props props(final SchemaContext schemaContext, final DOMRpcProviderService rpcProvisionRegistry,
-            final DOMRpcService rpcServices, final RemoteRpcProviderConfig config) {
-        Preconditions.checkNotNull(schemaContext, "SchemaContext can not be null!");
+    public static Props props(final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices,
+            final RemoteRpcProviderConfig config) {
         Preconditions.checkNotNull(rpcProvisionRegistry, "RpcProviderService can not be null!");
         Preconditions.checkNotNull(rpcServices, "RpcService can not be null!");
-        return Props.create(RpcManager.class, schemaContext, rpcProvisionRegistry, rpcServices, config);
-    }
-
-    private void createRpcActors() {
-        LOG.debug("Create rpc registry and broker actors");
-
-        rpcRegistry = getContext().actorOf(RpcRegistry.props(config)
-                .withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
-
-        rpcBroker = getContext().actorOf(RpcBroker.props(rpcServices)
-                .withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
-
-        final RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
-        rpcRegistry.tell(localRouter, self());
+        Preconditions.checkNotNull(config, "RemoteRpcProviderConfig can not be null!");
+        return Props.create(RpcManager.class, rpcProvisionRegistry, rpcServices, config);
     }
 
-    private void startListeners() {
-        LOG.debug("Registers rpc listeners");
+    @Override
+    public void preStart() throws Exception {
+        super.preStart();
 
-        rpcListener = new RpcListener(rpcRegistry);
-        rpcImplementation = new RemoteRpcImplementation(rpcRegistry, config);
+        rpcInvoker = getContext().actorOf(RpcInvoker.props(rpcServices)
+            .withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
+        LOG.debug("Listening for RPC invocation requests with {}", rpcInvoker);
 
-        rpcServices.registerRpcListener(rpcListener);
+        rpcRegistrar = getContext().actorOf(RpcRegistrar.props(config, rpcProvisionRegistry)
+            .withMailbox(config.getMailBoxName()), config.getRpcRegistrarName());
+        LOG.debug("Registering remote RPCs with {}", rpcRegistrar);
 
-        registerRoutedRpcDelegate();
-        announceSupportedRpcs();
-    }
+        rpcRegistry = getContext().actorOf(RpcRegistry.props(config, rpcInvoker, rpcRegistrar)
+                .withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
+        LOG.debug("Propagating RPC information with {}", rpcRegistry);
 
-    private void registerRoutedRpcDelegate() {
-        final Set<DOMRpcIdentifier> rpcIdentifiers = new HashSet<>();
-        final Set<Module> modules = schemaContext.getModules();
-        for (final Module module : modules) {
-            for (final RpcDefinition rpcDefinition : module.getRpcs()) {
-                if (RpcRoutingStrategy.from(rpcDefinition).isContextBasedRouted()) {
-                    LOG.debug("Adding routed rpcDefinition for path {}", rpcDefinition.getPath());
-                    rpcIdentifiers.add(DOMRpcIdentifier.create(rpcDefinition.getPath(), YangInstanceIdentifier.EMPTY));
-                }
-            }
-        }
-        rpcProvisionRegistry.registerRpcImplementation(rpcImplementation, rpcIdentifiers);
+        final RpcListener rpcListener = new RpcListener(rpcRegistry);
+        LOG.debug("Registering local availabitility listener {}", rpcListener);
+        listenerReg = rpcServices.registerRpcListener(rpcListener);
     }
 
-    /**
-     * Add all the locally registered RPCs in the clustered routing table.
-     */
-    private void announceSupportedRpcs() {
-        LOG.debug("Adding all supported rpcs to routing table");
-        final Set<RpcDefinition> currentlySupportedRpc = schemaContext.getOperations();
-        final List<DOMRpcIdentifier> rpcs = new ArrayList<>();
-        for (final RpcDefinition rpcDef : currentlySupportedRpc) {
-            rpcs.add(DOMRpcIdentifier.create(rpcDef.getPath()));
+    @Override
+    public void postStop() throws Exception {
+        if (listenerReg != null) {
+            listenerReg.close();
+            listenerReg = null;
         }
 
-        if (!rpcs.isEmpty()) {
-            rpcListener.onRpcAvailable(rpcs);
-        }
+        super.postStop();
     }
 
-
     @Override
-    protected void handleReceive(final Object message) throws Exception {
-        if (message instanceof UpdateSchemaContext) {
-            updateSchemaContext((UpdateSchemaContext) message);
-        }
-    }
-
-    private void updateSchemaContext(final UpdateSchemaContext message) {
-        schemaContext = message.getSchemaContext();
-        registerRoutedRpcDelegate();
-        rpcBroker.tell(message, ActorRef.noSender());
+    protected void handleReceive(final Object message) {
+        unknownMessage(message);
     }
 
     @Override
     public SupervisorStrategy supervisorStrategy() {
-        return new OneForOneStrategy(10, Duration.create("1 minute"), (Function<Throwable, Directive>) t -> {
+        return new OneForOneStrategy(10, Duration.create("1 minute"), t -> {
             LOG.error("An exception happened actor will be resumed", t);
-
             return SupervisorStrategy.resume();
         });
     }
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcRegistrar.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcRegistrar.java
new file mode 100644 (file)
index 0000000..6499974
--- /dev/null
@@ -0,0 +1,96 @@
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import akka.actor.Address;
+import akka.actor.Props;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.RemoteRpcEndpoint;
+
+/**
+ * Actor handling registration of RPCs available on remote nodes with the local {@link DOMRpcProviderService}.
+ *
+ * @author Robert Varga
+ */
+final class RpcRegistrar extends AbstractUntypedActor {
+    private final Map<Address, DOMRpcImplementationRegistration<?>> regs = new HashMap<>();
+    private final DOMRpcProviderService rpcProviderService;
+    private final RemoteRpcProviderConfig config;
+
+    RpcRegistrar(final RemoteRpcProviderConfig config, final DOMRpcProviderService rpcProviderService) {
+        this.config = Preconditions.checkNotNull(config);
+        this.rpcProviderService = Preconditions.checkNotNull(rpcProviderService);
+    }
+
+    public static Props props(final RemoteRpcProviderConfig config, final DOMRpcProviderService rpcProviderService) {
+        Preconditions.checkNotNull(rpcProviderService, "DOMRpcProviderService cannot be null");
+        return Props.create(RpcRegistrar.class, config, rpcProviderService);
+    }
+
+    @Override
+    public void postStop() throws Exception {
+        regs.values().forEach(DOMRpcImplementationRegistration::close);
+        regs.clear();
+
+        super.postStop();
+    }
+
+    @Override
+    protected void handleReceive(final Object message) throws Exception {
+        if (message instanceof UpdateRemoteEndpoints) {
+            updateRemoteEndpoints(((UpdateRemoteEndpoints) message).getEndpoints());
+        } else {
+            unknownMessage(message);
+        }
+    }
+
+    private void updateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> endpoints) {
+        /*
+         * Updating RPC providers is a two-step process. We first add the newly-discovered RPCs and then close
+         * the old registration. This minimizes churn observed by listeners, as they will not observe RPC
+         * unavailability which would occur if we were to do it the other way around.
+         *
+         * Note that when an RPC moves from one remote node to another, we also do not want to expose the gap,
+         * hence we register all new implementations before closing all registrations.
+         */
+        final Collection<DOMRpcImplementationRegistration<?>> prevRegs = new ArrayList<>(endpoints.size());
+
+        for (Entry<Address, Optional<RemoteRpcEndpoint>> e : endpoints.entrySet()) {
+            LOG.debug("Updating RPC registrations for {}", e.getKey());
+
+            final DOMRpcImplementationRegistration<?> prevReg;
+            final Optional<RemoteRpcEndpoint> maybeEndpoint = e.getValue();
+            if (maybeEndpoint.isPresent()) {
+                final RemoteRpcEndpoint endpoint = maybeEndpoint.get();
+                final RemoteRpcImplementation impl = new RemoteRpcImplementation(endpoint.getRouter(), config);
+                prevReg = regs.put(e.getKey(), rpcProviderService.registerRpcImplementation(impl,
+                    endpoint.getRpcs()));
+            } else {
+                prevReg = regs.remove(e.getKey());
+            }
+
+            if (prevReg != null) {
+                prevRegs.add(prevReg);
+            }
+        }
+
+        for (DOMRpcImplementationRegistration<?> r : prevRegs) {
+            r.close();
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/UpdateSchemaContext.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/UpdateSchemaContext.java
deleted file mode 100644 (file)
index 68302c5..0000000
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.remote.rpc.messages;
-
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-
-public class UpdateSchemaContext {
-    private final SchemaContext schemaContext;
-
-    public UpdateSchemaContext(final SchemaContext schemaContext) {
-        this.schemaContext = schemaContext;
-    }
-
-    public SchemaContext getSchemaContext() {
-        return schemaContext;
-    }
-}
index 09a987f..0c68b55 100644 (file)
@@ -8,53 +8,48 @@
 package org.opendaylight.controller.remote.rpc.registry;
 
 import akka.actor.ActorRef;
-import akka.japi.Option;
-import akka.japi.Pair;
+import com.google.common.base.Preconditions;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Copier;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
 
 public class RoutingTable implements Copier<RoutingTable>, Serializable {
     private static final long serialVersionUID = 5592610415175278760L;
 
-    private final Map<RpcRouter.RouteIdentifier<?, ?, ?>, Long> table = new HashMap<>();
-    private ActorRef router;
+    private final Map<RouteIdentifier<?, ?, ?>, Long> table;
+    private final ActorRef router;
 
-    @Override
-    public RoutingTable copy() {
-        RoutingTable copy = new RoutingTable();
-        copy.table.putAll(table);
-        copy.setRouter(this.getRouter());
-
-        return copy;
+    private RoutingTable(final ActorRef router, final Map<RouteIdentifier<?, ?, ?>, Long> table) {
+        this.router = Preconditions.checkNotNull(router);
+        this.table = Preconditions.checkNotNull(table);
     }
 
-    public Option<Pair<ActorRef, Long>> getRouterFor(RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
-        Long updatedTime = table.get(routeId);
+    RoutingTable(final ActorRef router) {
+        this(router, new HashMap<>());
+    }
 
-        if (updatedTime == null || router == null) {
-            return Option.none();
-        } else {
-            return Option.option(new Pair<>(router, updatedTime));
-        }
+    @Override
+    public RoutingTable copy() {
+        return new RoutingTable(router, new HashMap<>(table));
     }
 
     public Set<RpcRouter.RouteIdentifier<?, ?, ?>> getRoutes() {
         return table.keySet();
     }
 
-    public void addRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
+    public void addRoute(final RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
         table.put(routeId, System.currentTimeMillis());
     }
 
-    public void removeRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
+    public void removeRoute(final RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
         table.remove(routeId);
     }
 
-    public boolean contains(RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
+    public boolean contains(final RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
         return table.containsKey(routeId);
     }
 
@@ -70,10 +65,6 @@ public class RoutingTable implements Copier<RoutingTable>, Serializable {
         return router;
     }
 
-    public void setRouter(ActorRef router) {
-        this.router = router;
-    }
-
     @Override
     public String toString() {
         return "RoutingTable{" + "table=" + table + ", router=" + router + '}';
index fc5b618..1545eb0 100644 (file)
@@ -8,80 +8,72 @@
 package org.opendaylight.controller.remote.rpc.registry;
 
 import akka.actor.ActorRef;
-import akka.actor.Cancellable;
+import akka.actor.Address;
 import akka.actor.Props;
-import akka.japi.Option;
-import akka.japi.Pair;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
-import scala.concurrent.duration.FiniteDuration;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 
 /**
- * Registry to look up cluster nodes that have registered for a given rpc.
+ * Registry to look up cluster nodes that have registered for a given RPC.
  *
  * <p>
  * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
  * cluster wide information.
  */
 public class RpcRegistry extends BucketStore<RoutingTable> {
-    private final Set<Runnable> routesUpdatedCallbacks = new HashSet<>();
-    private final FiniteDuration findRouterTimeout;
+    private final ActorRef rpcRegistrar;
 
-    public RpcRegistry(RemoteRpcProviderConfig config) {
-        super(config);
-        getLocalBucket().setData(new RoutingTable());
-        findRouterTimeout = getConfig().getGossipTickInterval().$times(10);
+    public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
+        super(config, new RoutingTable(rpcInvoker));
+        this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar);
     }
 
-    public static Props props(RemoteRpcProviderConfig config) {
-        return Props.create(RpcRegistry.class, config);
+    /**
+     * Create a new props instance for instantiating an RpcRegistry actor.
+     *
+     * @param config Provider configuration
+     * @param rpcRegistrar Local RPC provider interface, used to register routers to remote nodes
+     * @param rpcInvoker Actor handling RPC invocation requests from remote nodes
+     * @return A new {@link Props} instance
+     */
+    public static Props props(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker,
+            final ActorRef rpcRegistrar) {
+        return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar);
     }
 
     @Override
-    protected void handleReceive(Object message) throws Exception {
-        //TODO: if sender is remote, reject message
-
-        if (message instanceof SetLocalRouter) {
-            receiveSetLocalRouter((SetLocalRouter) message);
-        } else if (message instanceof AddOrUpdateRoutes) {
+    protected void handleReceive(final Object message) throws Exception {
+        if (message instanceof AddOrUpdateRoutes) {
             receiveAddRoutes((AddOrUpdateRoutes) message);
         } else if (message instanceof RemoveRoutes) {
             receiveRemoveRoutes((RemoveRoutes) message);
-        } else if (message instanceof Messages.FindRouters) {
-            receiveGetRouter((FindRouters) message);
-        } else if (message instanceof Runnable) {
-            ((Runnable)message).run();
         } else {
             super.handleReceive(message);
         }
     }
 
-    /**
-     * Registers a rpc broker.
-     *
-     * @param message contains {@link akka.actor.ActorRef} for rpc broker
-     */
-    private void receiveSetLocalRouter(SetLocalRouter message) {
-        getLocalBucket().getData().setRouter(message.getRouter());
-    }
-
-    private void receiveAddRoutes(AddOrUpdateRoutes msg) {
-
-        log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
+    private void receiveAddRoutes(final AddOrUpdateRoutes msg) {
+        LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
 
         RoutingTable table = getLocalBucket().getData().copy();
         for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
@@ -89,8 +81,6 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
         }
 
         updateLocalBucket(table);
-
-        onBucketsUpdated();
     }
 
     /**
@@ -98,8 +88,7 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
      *
      * @param msg contains list of route ids to remove
      */
-    private void receiveRemoveRoutes(RemoveRoutes msg) {
-
+    private void receiveRemoveRoutes(final RemoveRoutes msg) {
         RoutingTable table = getLocalBucket().getData().copy();
         for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
             table.removeRoute(routeId);
@@ -108,85 +97,52 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
         updateLocalBucket(table);
     }
 
-    /**
-     * Finds routers for the given rpc.
-     *
-     * @param findRouters the FindRouters request
-     */
-    private void receiveGetRouter(final FindRouters findRouters) {
-        log.debug("receiveGetRouter for {}", findRouters.getRouteIdentifier());
-
-        final ActorRef sender = getSender();
-        if (!findRouters(findRouters, sender)) {
-            log.debug("No routers found for {} - scheduling {} ms timer", findRouters.getRouteIdentifier(),
-                    findRouterTimeout.toMillis());
-
-            final AtomicReference<Cancellable> timer = new AtomicReference<>();
-            final Runnable routesUpdatedRunnable = new Runnable() {
-                @Override
-                public void run() {
-                    if (findRouters(findRouters, sender)) {
-                        routesUpdatedCallbacks.remove(this);
-                        timer.get().cancel();
-                    }
-                }
-            };
-
-            routesUpdatedCallbacks.add(routesUpdatedRunnable);
-
-            Runnable timerRunnable = () -> {
-                log.warn("Timed out finding routers for {}", findRouters.getRouteIdentifier());
-
-                routesUpdatedCallbacks.remove(routesUpdatedRunnable);
-                sender.tell(new Messages.FindRoutersReply(
-                        Collections.<Pair<ActorRef, Long>>emptyList()), self());
-            };
-
-            timer.set(getContext().system().scheduler().scheduleOnce(findRouterTimeout, self(), timerRunnable,
-                    getContext().dispatcher(), self()));
-        }
+    @Override
+    protected void onBucketRemoved(final Address address, final Bucket<RoutingTable> bucket) {
+        rpcRegistrar.tell(new UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())), ActorRef.noSender());
     }
 
-    private boolean findRouters(FindRouters findRouters, ActorRef sender) {
-        List<Pair<ActorRef, Long>> routers = new ArrayList<>();
-
-        RouteIdentifier<?, ?, ?> routeId = findRouters.getRouteIdentifier();
-        findRoutes(getLocalBucket().getData(), routeId, routers);
+    @Override
+    protected void onBucketsUpdated(final Map<Address, Bucket<RoutingTable>> buckets) {
+        final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = new HashMap<>(buckets.size());
+
+        for (Entry<Address, Bucket<RoutingTable>> e : buckets.entrySet()) {
+            final RoutingTable table = e.getValue().getData();
+
+            final List<DOMRpcIdentifier> rpcs = new ArrayList<>(table.getRoutes().size());
+            for (RouteIdentifier<?, ?, ?> ri : table.getRoutes()) {
+                if (ri instanceof RouteIdentifierImpl) {
+                    final RouteIdentifierImpl id = (RouteIdentifierImpl) ri;
+                    rpcs.add(DOMRpcIdentifier.create(SchemaPath.create(true, id.getType()), id.getRoute()));
+                } else {
+                    LOG.warn("Skipping unsupported route {} from {}", ri, e.getKey());
+                }
+            }
 
-        for (Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
-            findRoutes(bucket.getData(), routeId, routers);
+            endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty()
+                    : Optional.of(new RemoteRpcEndpoint(table.getRouter(), rpcs)));
         }
 
-        log.debug("Found {} routers for {}", routers.size(), findRouters.getRouteIdentifier());
-
-        boolean foundRouters = !routers.isEmpty();
-        if (foundRouters) {
-            sender.tell(new Messages.FindRoutersReply(routers), getSelf());
+        if (!endpoints.isEmpty()) {
+            rpcRegistrar.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
         }
-
-        return foundRouters;
     }
 
-    private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
-            List<Pair<ActorRef, Long>> routers) {
-        if (table == null) {
-            return;
-        }
+    public static final class RemoteRpcEndpoint {
+        private final Set<DOMRpcIdentifier> rpcs;
+        private final ActorRef router;
 
-        Option<Pair<ActorRef, Long>> routerWithUpdateTime = table.getRouterFor(routeId);
-        if (!routerWithUpdateTime.isEmpty()) {
-            routers.add(routerWithUpdateTime.get());
+        RemoteRpcEndpoint(final ActorRef router, final Collection<DOMRpcIdentifier> rpcs) {
+            this.router = Preconditions.checkNotNull(router);
+            this.rpcs = ImmutableSet.copyOf(rpcs);
         }
-    }
 
-    @Override
-    protected void onBucketsUpdated() {
-        if (routesUpdatedCallbacks.isEmpty()) {
-            return;
+        public ActorRef getRouter() {
+            return router;
         }
 
-        for (Runnable callBack: routesUpdatedCallbacks.toArray(new Runnable[routesUpdatedCallbacks.size()])) {
-            callBack.run();
+        public Set<DOMRpcIdentifier> getRpcs() {
+            return rpcs;
         }
     }
 
@@ -194,18 +150,16 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
      * All messages used by the RpcRegistry.
      */
     public static class Messages {
-
-
-        public static class ContainsRoute {
+        abstract static class AbstractRouteMessage {
             final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
 
-            public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+            AbstractRouteMessage(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
                 Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(),
                         "Route Identifiers must be supplied");
                 this.routeIdentifiers = routeIdentifiers;
             }
 
-            public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
+            List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
                 return this.routeIdentifiers;
             }
 
@@ -215,71 +169,27 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
             }
         }
 
-        public static class AddOrUpdateRoutes extends ContainsRoute {
-
-            public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+        public static final class AddOrUpdateRoutes extends AbstractRouteMessage {
+            public AddOrUpdateRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
                 super(routeIdentifiers);
             }
         }
 
-        public static class RemoveRoutes extends ContainsRoute {
-
-            public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+        public static final class RemoveRoutes extends AbstractRouteMessage {
+            public RemoveRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
                 super(routeIdentifiers);
             }
         }
 
-        public static class SetLocalRouter {
-            private final ActorRef router;
-
-            public SetLocalRouter(ActorRef router) {
-                Preconditions.checkArgument(router != null, "Router must not be null");
-                this.router = router;
-            }
-
-            public ActorRef getRouter() {
-                return this.router;
-            }
-
-            @Override
-            public String toString() {
-                return "SetLocalRouter{" + "router=" + router + '}';
-            }
-        }
-
-        public static class FindRouters {
-            private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
-
-            public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
-                Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
-                this.routeIdentifier = routeIdentifier;
-            }
-
-            public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
-                return routeIdentifier;
-            }
+        public static final class UpdateRemoteEndpoints {
+            private final Map<Address, Optional<RemoteRpcEndpoint>> endpoints;
 
-            @Override
-            public String toString() {
-                return "FindRouters{" + "routeIdentifier=" + routeIdentifier + '}';
+            UpdateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> endpoints) {
+                this.endpoints = ImmutableMap.copyOf(endpoints);
             }
-        }
 
-        public static class FindRoutersReply {
-            final List<Pair<ActorRef, Long>> routerWithUpdateTime;
-
-            public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
-                Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
-                this.routerWithUpdateTime = routerWithUpdateTime;
-            }
-
-            public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
-                return routerWithUpdateTime;
-            }
-
-            @Override
-            public String toString() {
-                return "FindRoutersReply{" + "routerWithUpdateTime=" + routerWithUpdateTime + '}';
+            public Map<Address, Optional<RemoteRpcEndpoint>> getEndpoints() {
+                return endpoints;
             }
         }
     }
index 444faed..0031962 100644 (file)
@@ -9,33 +9,30 @@ package org.opendaylight.controller.remote.rpc.registry.gossip;
 
 import java.io.Serializable;
 
-public class BucketImpl<T extends Copier<T>> implements Bucket<T>, Serializable {
+public final class BucketImpl<T extends Copier<T>> implements Bucket<T>, Serializable {
     private static final long serialVersionUID = 294779770032719196L;
 
     private Long version = System.currentTimeMillis();
 
     private T data;
 
-    public BucketImpl() {
-    }
-
-    public BucketImpl(T data) {
+    public BucketImpl(final T data) {
         this.data = data;
     }
 
-    public BucketImpl(Bucket<T> other) {
+    public BucketImpl(final Bucket<T> other) {
         this.version = other.getVersion();
         this.data = other.getData();
     }
 
-    public void setData(T data) {
+    public void setData(final T data) {
         this.data = data;
         this.version = System.currentTimeMillis() + 1;
     }
 
     @Override
-    public Long getVersion() {
-        return version;
+    public long getVersion() {
+        return version.longValue();
     }
 
     @Override
index 81e6a9c..7ae091d 100644 (file)
@@ -11,11 +11,11 @@ package org.opendaylight.controller.remote.rpc.registry.gossip;
 import akka.actor.ActorRef;
 import akka.actor.ActorRefProvider;
 import akka.actor.Address;
-import akka.actor.Props;
 import akka.cluster.ClusterActorRefProvider;
 import com.google.common.base.Preconditions;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
@@ -25,10 +25,9 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketSto
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
 import org.opendaylight.controller.utils.ConditionalProbe;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * A store that syncs its data across nodes in the cluster.
@@ -41,18 +40,13 @@ import org.slf4j.LoggerFactory;
  *
  */
 public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMetering {
-
-    private static final Long NO_VERSION = -1L;
-
-    protected final Logger log = LoggerFactory.getLogger(getClass());
-
     /**
      * Bucket owned by the node.
      */
-    private final BucketImpl<T> localBucket = new BucketImpl<>();
+    private final BucketImpl<T> localBucket;
 
     /**
-     * Buckets ownded by other known nodes in the cluster.
+     * Buckets owned by other known nodes in the cluster.
      */
     private final Map<Address, Bucket<T>> remoteBuckets = new HashMap<>();
 
@@ -66,12 +60,14 @@ public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMe
      */
     private Address selfAddress;
 
+    // FIXME: should be part of test-specific subclass
     private ConditionalProbe probe;
 
     private final RemoteRpcProviderConfig config;
 
-    public BucketStore(RemoteRpcProviderConfig config) {
+    public BucketStore(final RemoteRpcProviderConfig config, final T initialData) {
         this.config = Preconditions.checkNotNull(config);
+        this.localBucket = new BucketImpl<>(initialData);
     }
 
     @Override
@@ -79,34 +75,37 @@ public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMe
         ActorRefProvider provider = getContext().provider();
         selfAddress = provider.getDefaultAddress();
 
-        if ( provider instanceof ClusterActorRefProvider) {
-            getContext().actorOf(Props.create(Gossiper.class, config).withMailbox(config.getMailBoxName()), "gossiper");
+        if (provider instanceof ClusterActorRefProvider) {
+            getContext().actorOf(Gossiper.props(config).withMailbox(config.getMailBoxName()), "gossiper");
         }
     }
 
     @SuppressWarnings("unchecked")
     @Override
-    protected void handleReceive(Object message) throws Exception {
+    protected void handleReceive(final Object message) throws Exception {
         if (probe != null) {
             probe.tell(message, getSelf());
         }
 
-        if (message instanceof ConditionalProbe) {
-            // The ConditionalProbe is only used for unit tests.
-            log.info("Received probe {} {}", getSelf(), message);
-            probe = (ConditionalProbe) message;
-            // Send back any message to tell the caller we got the probe.
-            getSender().tell("Got it", getSelf());
-        } else if (message instanceof GetAllBuckets) {
-            receiveGetAllBuckets();
-        } else if (message instanceof GetBucketsByMembers) {
+        if (message instanceof GetBucketsByMembers) {
             receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
         } else if (message instanceof GetBucketVersions) {
             receiveGetBucketVersions();
         } else if (message instanceof UpdateRemoteBuckets) {
             receiveUpdateRemoteBuckets(((UpdateRemoteBuckets<T>) message).getBuckets());
+        } else if (message instanceof RemoveRemoteBucket) {
+            removeBucket(((RemoveRemoteBucket) message).getAddress());
+        } else if (message instanceof GetAllBuckets) {
+            // GetAllBuckets is used only for unit tests.
+            receiveGetAllBuckets();
+        } else if (message instanceof ConditionalProbe) {
+            // The ConditionalProbe is only used for unit tests.
+            LOG.info("Received probe {} {}", getSelf(), message);
+            probe = (ConditionalProbe) message;
+            // Send back any message to tell the caller we got the probe.
+            getSender().tell("Got it", getSelf());
         } else {
-            log.debug("Unhandled message [{}]", message);
+            LOG.debug("Unhandled message [{}]", message);
             unhandled(message);
         }
     }
@@ -145,7 +144,7 @@ public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMe
      *
      * @param members requested members
      */
-    void receiveGetBucketsByMembers(Set<Address> members) {
+    void receiveGetBucketsByMembers(final Set<Address> members) {
         final ActorRef sender = getSender();
         Map<Address, Bucket<T>> buckets = getBucketsByMembers(members);
         sender.tell(new GetBucketsByMembersReply<>(buckets), getSelf());
@@ -157,7 +156,7 @@ public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMe
      * @param members requested members
      * @return buckets for requested members
      */
-    Map<Address, Bucket<T>> getBucketsByMembers(Set<Address> members) {
+    Map<Address, Bucket<T>> getBucketsByMembers(final Set<Address> members) {
         Map<Address, Bucket<T>> buckets = new HashMap<>();
 
         //first add the local bucket if asked
@@ -190,53 +189,77 @@ public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMe
      * @param receivedBuckets buckets sent by remote
      *                        {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
      */
-    void receiveUpdateRemoteBuckets(Map<Address, Bucket<T>> receivedBuckets) {
-        log.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
+    void receiveUpdateRemoteBuckets(final Map<Address, Bucket<T>> receivedBuckets) {
+        LOG.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
         if (receivedBuckets == null || receivedBuckets.isEmpty()) {
-            return; //nothing to do
+            //nothing to do
+            return;
         }
 
-        //Remote cant update self's bucket
-        receivedBuckets.remove(selfAddress);
-
-        for (Map.Entry<Address, Bucket<T>> entry : receivedBuckets.entrySet()) {
-
-            Long localVersion = versions.get(entry.getKey());
-            if (localVersion == null) {
-                localVersion = NO_VERSION;
+        final Map<Address, Bucket<T>> newBuckets = new HashMap<>(receivedBuckets.size());
+        for (Entry<Address, Bucket<T>> entry : receivedBuckets.entrySet()) {
+            if (selfAddress.equals(entry.getKey())) {
+                // Remote cannot update our bucket
+                continue;
             }
 
-            Bucket<T> receivedBucket = entry.getValue();
-
+            final Bucket<T> receivedBucket = entry.getValue();
             if (receivedBucket == null) {
+                LOG.debug("Ignoring null bucket from {}", entry.getKey());
                 continue;
             }
 
-            Long remoteVersion = receivedBucket.getVersion();
-            if (remoteVersion == null) {
-                remoteVersion = NO_VERSION;
+            // update only if remote version is newer
+            final long remoteVersion = receivedBucket.getVersion();
+            final Long localVersion = versions.get(entry.getKey());
+            if (localVersion != null && remoteVersion <= localVersion.longValue()) {
+                LOG.debug("Ignoring down-versioned bucket from {} ({} local {} remote)", entry.getKey(), localVersion,
+                    remoteVersion);
+                continue;
             }
 
-            //update only if remote version is newer
-            if ( remoteVersion.longValue() > localVersion.longValue() ) {
-                remoteBuckets.put(entry.getKey(), receivedBucket);
-                versions.put(entry.getKey(), remoteVersion);
-            }
+            newBuckets.put(entry.getKey(), receivedBucket);
+            remoteBuckets.put(entry.getKey(), receivedBucket);
+            versions.put(entry.getKey(), remoteVersion);
+            LOG.debug("Updating bucket from {} to version {}", entry.getKey(), remoteVersion);
         }
 
-        log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
+        LOG.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
+
+        onBucketsUpdated(newBuckets);
+    }
+
+    private void removeBucket(final Address address) {
+        final Bucket<T> bucket = remoteBuckets.remove(address);
+        if (bucket != null) {
+            onBucketRemoved(address, bucket);
+        }
+    }
 
-        onBucketsUpdated();
+    /**
+     * Callback to subclasses invoked when a bucket is removed.
+     *
+     * @param address Remote address
+     * @param bucket Bucket removed
+     */
+    protected void onBucketRemoved(final Address address, final Bucket<T> bucket) {
+        // Default noop
     }
 
-    protected void onBucketsUpdated() {
+    /**
+     * Callback to subclasses invoked when the set of remote buckets is updated.
+     *
+     * @param newBuckets Map of address to new bucket. Never null, but can be empty.
+     */
+    protected void onBucketsUpdated(final Map<Address, Bucket<T>> newBuckets) {
+        // Default noop
     }
 
     public BucketImpl<T> getLocalBucket() {
         return localBucket;
     }
 
-    protected void updateLocalBucket(T data) {
+    protected void updateLocalBucket(final T data) {
         localBucket.setData(data);
         versions.put(selfAddress, localBucket.getVersion());
     }
index 04c767e..db50041 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.ActorRefProvider;
 import akka.actor.ActorSelection;
 import akka.actor.Address;
 import akka.actor.Cancellable;
+import akka.actor.Props;
 import akka.cluster.Cluster;
 import akka.cluster.ClusterActorRefProvider;
 import akka.cluster.ClusterEvent;
@@ -21,6 +22,7 @@ import akka.pattern.Patterns;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -33,12 +35,11 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketSto
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -61,41 +62,38 @@ import scala.concurrent.duration.FiniteDuration;
  * for update.
  */
 public class Gossiper extends AbstractUntypedActorWithMetering {
+    private final boolean autoStartGossipTicks;
+    private final RemoteRpcProviderConfig config;
 
-    private final Logger log = LoggerFactory.getLogger(getClass());
-
-    private Cluster cluster;
+    /**
+     * All known cluster members.
+     */
+    private final List<Address> clusterMembers = new ArrayList<>();
 
     /**
      * ActorSystem's address for the current cluster node.
      */
     private Address selfAddress;
 
-    /**
-     * All known cluster members.
-     */
-    private List<Address> clusterMembers = new ArrayList<>();
+    private Cluster cluster;
 
     private Cancellable gossipTask;
 
-    private Boolean autoStartGossipTicks = true;
+    Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) {
+        this.config = Preconditions.checkNotNull(config);
+        this.autoStartGossipTicks = autoStartGossipTicks.booleanValue();
+    }
 
-    private final RemoteRpcProviderConfig config;
+    Gossiper(final RemoteRpcProviderConfig config) {
+        this(config, Boolean.TRUE);
+    }
 
-    public Gossiper(RemoteRpcProviderConfig config) {
-        this.config = Preconditions.checkNotNull(config);
+    public static Props props(final RemoteRpcProviderConfig config) {
+        return Props.create(Gossiper.class, config);
     }
 
-    /**
-     * Constructor for testing.
-     *
-     * @param autoStartGossipTicks used for turning off gossip ticks during testing.
-     *                             Gossip tick can be manually sent.
-     */
-    @VisibleForTesting
-    public Gossiper(Boolean autoStartGossipTicks, RemoteRpcProviderConfig config) {
-        this(config);
-        this.autoStartGossipTicks = autoStartGossipTicks;
+    static Props testProps(final RemoteRpcProviderConfig config) {
+        return Props.create(Gossiper.class, config, Boolean.FALSE);
     }
 
     @Override
@@ -103,7 +101,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
         ActorRefProvider provider = getContext().provider();
         selfAddress = provider.getDefaultAddress();
 
-        if ( provider instanceof ClusterActorRefProvider ) {
+        if (provider instanceof ClusterActorRefProvider ) {
             cluster = Cluster.get(getContext().system());
             cluster.subscribe(getSelf(),
                     ClusterEvent.initialStateAsEvents(),
@@ -116,10 +114,10 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
             gossipTask = getContext().system().scheduler().schedule(
                     new FiniteDuration(1, TimeUnit.SECONDS),        //initial delay
                     config.getGossipTickInterval(),                 //interval
-                    getSelf(),                                       //target
-                    new Messages.GossiperMessages.GossipTick(),      //message
-                    getContext().dispatcher(),                       //execution context
-                    getSelf()                                        //sender
+                    getSelf(),                                      //target
+                    new Messages.GossiperMessages.GossipTick(),     //message
+                    getContext().dispatcher(),                      //execution context
+                    getSelf()                                       //sender
             );
         }
     }
@@ -136,7 +134,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
     @Override
-    protected void handleReceive(Object message) throws Exception {
+    protected void handleReceive(final Object message) throws Exception {
         //Usually sent by self via gossip task defined above. But its not enforced.
         //These ticks can be sent by another actor as well which is esp. useful while testing
         if (message instanceof GossipTick) {
@@ -158,7 +156,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
         } else if (message instanceof ClusterEvent.MemberRemoved) {
             receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
 
-        } else if ( message instanceof ClusterEvent.UnreachableMember) {
+        } else if (message instanceof ClusterEvent.UnreachableMember) {
             receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
 
         } else {
@@ -171,7 +169,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      *
      * @param member who went down
      */
-    void receiveMemberRemoveOrUnreachable(Member member) {
+    private void receiveMemberRemoveOrUnreachable(final Member member) {
         //if its self, then stop itself
         if (selfAddress.equals(member.address())) {
             getContext().stop(getSelf());
@@ -179,7 +177,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
         }
 
         clusterMembers.remove(member.address());
-        log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+        LOG.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+
+        getContext().parent().tell(new RemoveRemoteBucket(member.address()), ActorRef.noSender());
     }
 
     /**
@@ -187,10 +187,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      *
      * @param member the member to add
      */
-    void receiveMemberUpOrReachable(final Member member) {
-
+    private void receiveMemberUpOrReachable(final Member member) {
+        //ignore up notification for self
         if (selfAddress.equals(member.address())) {
-            //ignore up notification for self
             return;
         }
 
@@ -198,7 +197,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
             clusterMembers.add(member.address());
         }
 
-        log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
+        LOG.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
     }
 
     /**
@@ -208,21 +207,23 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br>
      * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
      */
+    @VisibleForTesting
     void receiveGossipTick() {
-        if (clusterMembers.size() == 0) {
-            return; //no members to send gossip status to
-        }
-
-        Address remoteMemberToGossipTo;
-
-        if (clusterMembers.size() == 1) {
-            remoteMemberToGossipTo = clusterMembers.get(0);
-        } else {
-            Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
-            remoteMemberToGossipTo = clusterMembers.get(randomIndex);
+        final Address remoteMemberToGossipTo;
+        switch (clusterMembers.size()) {
+            case 0:
+                //no members to send gossip status to
+                return;
+            case 1:
+                remoteMemberToGossipTo = clusterMembers.get(0);
+                break;
+            default:
+                final int randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
+                remoteMemberToGossipTo = clusterMembers.get(randomIndex);
+                break;
         }
 
-        log.trace("Gossiping to [{}]", remoteMemberToGossipTo);
+        LOG.trace("Gossiping to [{}]", remoteMemberToGossipTo);
         getLocalStatusAndSendTo(remoteMemberToGossipTo);
     }
 
@@ -239,7 +240,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      *
      * @param status bucket versions from a remote member
      */
-    void receiveGossipStatus(GossipStatus status) {
+    @VisibleForTesting
+    void receiveGossipStatus(final GossipStatus status) {
         //Don't accept messages from non-members
         if (!clusterMembers.contains(status.from())) {
             return;
@@ -250,7 +252,6 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
                 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
 
         futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
-
     }
 
     /**
@@ -258,18 +259,15 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      *
      * @param envelope contains buckets from a remote gossiper
      */
-    <T extends Copier<T>> void receiveGossip(GossipEnvelope<T> envelope) {
+    @VisibleForTesting
+    <T extends Copier<T>> void receiveGossip(final GossipEnvelope<T> envelope) {
         //TODO: Add more validations
         if (!selfAddress.equals(envelope.to())) {
-            if (log.isTraceEnabled()) {
-                log.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(),
-                        envelope.to());
-            }
+            LOG.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
             return;
         }
 
         updateRemoteBuckets(envelope.getBuckets());
-
     }
 
     /**
@@ -277,9 +275,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      *
      * @param buckets map of Buckets to update
      */
-    <T extends Copier<T>> void updateRemoteBuckets(Map<Address, Bucket<T>> buckets) {
-        UpdateRemoteBuckets<T> updateRemoteBuckets = new UpdateRemoteBuckets<>(buckets);
-        getContext().parent().tell(updateRemoteBuckets, getSelf());
+    @VisibleForTesting
+    <T extends Copier<T>> void updateRemoteBuckets(final Map<Address, Bucket<T>> buckets) {
+        getContext().parent().tell(new UpdateRemoteBuckets<>(buckets), getSelf());
     }
 
     /**
@@ -300,7 +298,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      *
      * @param remoteActorSystemAddress remote gossiper to send to
      */
-    void getLocalStatusAndSendTo(Address remoteActorSystemAddress) {
+    @VisibleForTesting
+    void getLocalStatusAndSendTo(final Address remoteActorSystemAddress) {
 
         //Get local status from bucket store and send to remote
         Future<Object> futureReply =
@@ -310,10 +309,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
         ActorSelection remoteRef = getContext().system().actorSelection(
                 remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
 
-        log.trace("Sending bucket versions to [{}]", remoteRef);
+        LOG.trace("Sending bucket versions to [{}]", remoteRef);
 
         futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
-
     }
 
     /**
@@ -322,13 +320,13 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      * @param remote        remote gossiper to send versions to
      * @param localVersions bucket versions received from local store
      */
-    void sendGossipStatusTo(ActorRef remote, Map<Address, Long> localVersions) {
+    void sendGossipStatusTo(final ActorRef remote, final Map<Address, Long> localVersions) {
 
         GossipStatus status = new GossipStatus(selfAddress, localVersions);
         remote.tell(status, getSelf());
     }
 
-    void sendGossipStatusTo(ActorSelection remote, Map<Address, Long> localVersions) {
+    void sendGossipStatusTo(final ActorSelection remote, final Map<Address, Long> localVersions) {
 
         GossipStatus status = new GossipStatus(selfAddress, localVersions);
         remote.tell(status, getSelf());
@@ -342,7 +340,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
 
         return new Mapper<Object, Void>() {
             @Override
-            public Void apply(Object replyMessage) {
+            public Void apply(final Object replyMessage) {
                 if (replyMessage instanceof GetBucketVersionsReply) {
                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
                     Map<Address, Long> localVersions = reply.getVersions();
@@ -379,7 +377,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
 
         return new Mapper<Object, Void>() {
             @Override
-            public Void apply(Object replyMessage) {
+            public Void apply(final Object replyMessage) {
                 if (replyMessage instanceof GetBucketVersionsReply) {
                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
                     Map<Address, Long> localVersions = reply.getVersions();
@@ -400,7 +398,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
                         Long remoteVersion = entry.getValue();
                         Long localVersion = localVersions.get(address);
                         if (localVersion == null || remoteVersion == null) {
-                            continue; //this condition is taken care of by above diffs
+                            //this condition is taken care of by above diffs
+                            continue;
                         }
 
                         if (localVersion < remoteVersion) {
@@ -411,13 +410,13 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
                     }
 
                     if (!localIsOlder.isEmpty()) {
-                        sendGossipStatusTo(sender, localVersions );
+                        sendGossipStatusTo(sender, localVersions);
                     }
 
                     if (!localIsNewer.isEmpty()) {
-                        sendGossipTo(sender, localIsNewer);//send newer buckets to remote
+                        //send newer buckets to remote
+                        sendGossipTo(sender, localIsNewer);
                     }
-
                 }
                 return null;
             }
@@ -441,10 +440,10 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
         return new Mapper<Object, Void>() {
             @SuppressWarnings({ "rawtypes", "unchecked" })
             @Override
-            public Void apply(Object msg) {
+            public Void apply(final Object msg) {
                 if (msg instanceof GetBucketsByMembersReply) {
                     Map<Address, Bucket<?>> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
-                    log.trace("Buckets to send from {}: {}", selfAddress, buckets);
+                    LOG.trace("Buckets to send from {}: {}", selfAddress, buckets);
                     GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
                     sender.tell(envelope, getSelf());
                 }
@@ -456,19 +455,10 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
     ///
     ///Getter Setters
     ///
-    List<Address> getClusterMembers() {
-        return clusterMembers;
-    }
 
-    void setClusterMembers(List<Address> clusterMembers) {
-        this.clusterMembers = clusterMembers;
-    }
-
-    Address getSelfAddress() {
-        return selfAddress;
-    }
-
-    void setSelfAddress(Address selfAddress) {
-        this.selfAddress = selfAddress;
+    @VisibleForTesting
+    void setClusterMembers(final Address... members) {
+        clusterMembers.clear();
+        clusterMembers.addAll(Arrays.asList(members));
     }
 }
index 4f18584..cc64ba4 100644 (file)
@@ -9,16 +9,14 @@ package org.opendaylight.controller.remote.rpc.registry.gossip;
 
 import akka.actor.Address;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import java.io.Serializable;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBucketVersions;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBuckets;
 
-
 /**
  * These messages are used by {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} and
  * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper} actors.
@@ -27,21 +25,21 @@ public class Messages {
 
     public static class BucketStoreMessages {
 
-        public static class GetAllBuckets implements Serializable {
+        public static final class GetAllBuckets implements Serializable {
             private static final long serialVersionUID = 1L;
         }
 
-        public static class GetBucketsByMembers implements Serializable {
+        public static final class GetBucketsByMembers implements Serializable {
             private static final long serialVersionUID = 1L;
             private final Set<Address> members;
 
-            public GetBucketsByMembers(Set<Address> members) {
+            public GetBucketsByMembers(final Set<Address> members) {
                 Preconditions.checkArgument(members != null, "members can not be null");
-                this.members = members;
+                this.members = ImmutableSet.copyOf(members);
             }
 
             public Set<Address> getMembers() {
-                return new HashSet<>(members);
+                return members;
             }
         }
 
@@ -50,42 +48,33 @@ public class Messages {
 
             private final Map<Address, Bucket<T>> buckets;
 
-            public ContainsBuckets(Map<Address, Bucket<T>> buckets) {
+            protected ContainsBuckets(final Map<Address, Bucket<T>> buckets) {
                 Preconditions.checkArgument(buckets != null, "buckets can not be null");
-                this.buckets = buckets;
+                this.buckets = ImmutableMap.copyOf(buckets);
             }
 
-            public Map<Address, Bucket<T>> getBuckets() {
-                Map<Address, Bucket<T>> copy = new HashMap<>(buckets.size());
-
-                for (Map.Entry<Address, Bucket<T>> entry : buckets.entrySet()) {
-                    //ignore null entries
-                    if ( entry.getKey() == null || entry.getValue() == null ) {
-                        continue;
-                    }
-                    copy.put(entry.getKey(), entry.getValue());
-                }
-                return copy;
+            public final Map<Address, Bucket<T>> getBuckets() {
+                return buckets;
             }
         }
 
-        public static class GetAllBucketsReply<T extends Copier<T>> extends ContainsBuckets<T> {
+        public static final class GetAllBucketsReply<T extends Copier<T>> extends ContainsBuckets<T> {
             private static final long serialVersionUID = 1L;
 
-            public GetAllBucketsReply(Map<Address, Bucket<T>> buckets) {
+            public GetAllBucketsReply(final Map<Address, Bucket<T>> buckets) {
                 super(buckets);
             }
         }
 
-        public static class GetBucketsByMembersReply<T extends Copier<T>> extends ContainsBuckets<T>  {
+        public static final class GetBucketsByMembersReply<T extends Copier<T>> extends ContainsBuckets<T>  {
             private static final long serialVersionUID = 1L;
 
-            public GetBucketsByMembersReply(Map<Address, Bucket<T>> buckets) {
+            public GetBucketsByMembersReply(final Map<Address, Bucket<T>> buckets) {
                 super(buckets);
             }
         }
 
-        public static class GetBucketVersions implements Serializable {
+        public static final class GetBucketVersions implements Serializable {
             private static final long serialVersionUID = 1L;
         }
 
@@ -94,33 +83,48 @@ public class Messages {
 
             Map<Address, Long> versions;
 
-            public ContainsBucketVersions(Map<Address, Long> versions) {
+            public ContainsBucketVersions(final Map<Address, Long> versions) {
                 Preconditions.checkArgument(versions != null, "versions can not be null or empty");
 
-                this.versions = versions;
+                this.versions = ImmutableMap.copyOf(versions);
             }
 
             public Map<Address, Long> getVersions() {
-                return Collections.unmodifiableMap(versions);
+                return versions;
             }
-
         }
 
-        public static class GetBucketVersionsReply extends ContainsBucketVersions {
+        public static final class GetBucketVersionsReply extends ContainsBucketVersions {
             private static final long serialVersionUID = 1L;
 
-            public GetBucketVersionsReply(Map<Address, Long> versions) {
+            public GetBucketVersionsReply(final Map<Address, Long> versions) {
                 super(versions);
             }
         }
 
-        public static class UpdateRemoteBuckets<T extends Copier<T>> extends ContainsBuckets<T> {
+        public static final class UpdateRemoteBuckets<T extends Copier<T>> extends ContainsBuckets<T> {
             private static final long serialVersionUID = 1L;
 
-            public UpdateRemoteBuckets(Map<Address, Bucket<T>> buckets) {
+            public UpdateRemoteBuckets(final Map<Address, Bucket<T>> buckets) {
                 super(buckets);
             }
         }
+
+        /**
+         * Message sent from the gossiper to its parent, therefore not Serializable, requesting removal
+         * of a bucket corresponding to an address.
+         */
+        public static final class RemoveRemoteBucket {
+            private final Address address;
+
+            public RemoveRemoteBucket(final Address address) {
+                this.address = Preconditions.checkNotNull(address);
+            }
+
+            public Address getAddress() {
+                return address;
+            }
+        }
     }
 
     public static class GossiperMessages {
@@ -137,7 +141,7 @@ public class Messages {
 
             private final Address from;
 
-            public GossipStatus(Address from, Map<Address, Long> versions) {
+            public GossipStatus(final Address from, final Map<Address, Long> versions) {
                 super(versions);
                 this.from = from;
             }
@@ -153,7 +157,7 @@ public class Messages {
             private final Address from;
             private final Address to;
 
-            public GossipEnvelope(Address from, Address to, Map<Address, Bucket<T>> buckets) {
+            public GossipEnvelope(final Address from, final Address to, final Map<Address, Bucket<T>> buckets) {
                 super(buckets);
                 Preconditions.checkArgument(to != null, "Recipient of message must not be null");
                 this.to = to;
index 02dd857..438c742 100644 (file)
@@ -16,15 +16,13 @@ import static org.junit.Assert.assertTrue;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
-import java.io.InputStream;
 import java.net.URI;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.mockito.Mockito;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
@@ -70,15 +68,18 @@ public class AbstractRpcTest {
     static RemoteRpcProviderConfig config1;
     static RemoteRpcProviderConfig config2;
 
-    protected ActorRef rpcBroker1;
+    protected ActorRef rpcInvoker1;
     protected JavaTestKit rpcRegistry1Probe;
-    protected ActorRef rpcBroker2;
+    protected ActorRef rpcInvoker2;
     protected JavaTestKit rpcRegistry2Probe;
     protected Broker.ProviderSession brokerSession;
     protected SchemaContext schemaContext;
     protected RemoteRpcImplementation remoteRpcImpl1;
     protected RemoteRpcImplementation remoteRpcImpl2;
+
+    @Mock
     protected DOMRpcService domRpcService1;
+    @Mock
     protected DOMRpcService domRpcService2;
 
     @BeforeClass
@@ -98,25 +99,17 @@ public class AbstractRpcTest {
     }
 
     @Before
-    public void setUp() throws Exception {
-        final List<InputStream> sources = Collections.singletonList(
-            AbstractRpcTest.this.getClass().getResourceAsStream("/test-rpc.yang"));
-
-        try {
-            schemaContext = YangParserTestUtils.parseYangStreams(sources);
-        } catch (ReactorException e) {
-            throw new RuntimeException("Unable to build schema context from " + sources, e);
-        }
+    public void setUp() throws ReactorException {
+        schemaContext = YangParserTestUtils.parseYangResources(AbstractRpcTest.class, "/test-rpc.yang");
+
+        MockitoAnnotations.initMocks(this);
 
-        domRpcService1 = Mockito.mock(DOMRpcService.class);
-        domRpcService2 = Mockito.mock(DOMRpcService.class);
         rpcRegistry1Probe = new JavaTestKit(node1);
-        rpcBroker1 = node1.actorOf(RpcBroker.props(domRpcService1));
+        rpcInvoker1 = node1.actorOf(RpcInvoker.props(domRpcService1));
         rpcRegistry2Probe = new JavaTestKit(node2);
-        rpcBroker2 = node2.actorOf(RpcBroker.props(domRpcService2));
-        remoteRpcImpl1 = new RemoteRpcImplementation(rpcRegistry1Probe.getRef(), config1);
-        remoteRpcImpl2 = new RemoteRpcImplementation(rpcRegistry2Probe.getRef(), config2);
-
+        rpcInvoker2 = node2.actorOf(RpcInvoker.props(domRpcService2));
+        remoteRpcImpl1 = new RemoteRpcImplementation(rpcInvoker2, config1);
+        remoteRpcImpl2 = new RemoteRpcImplementation(rpcInvoker1, config2);
     }
 
     static void assertRpcErrorEquals(final RpcError rpcError, final ErrorSeverity severity,
index 05ee9f5..0fe9dfe 100644 (file)
@@ -11,30 +11,23 @@ package org.opendaylight.controller.remote.rpc;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.when;
 
-import akka.actor.ActorRef;
-import akka.actor.Status;
-import akka.japi.Pair;
-import akka.testkit.JavaTestKit;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
 import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
-import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 
 /**
  * Unit tests for RemoteRpcImplementation.
@@ -43,19 +36,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
  */
 public class RemoteRpcImplementationTest extends AbstractRpcTest {
 
-
-
-    @Test(expected = DOMRpcImplementationNotAvailableException.class)
-    public void testInvokeRpcWithNoRemoteActor() throws Exception {
-        final ContainerNode input = makeRPCInput("foo");
-        final CheckedFuture<DOMRpcResult, DOMRpcException> failedFuture = remoteRpcImpl1.invokeRpc(TEST_RPC_ID, input);
-        rpcRegistry1Probe.expectMsgClass(JavaTestKit.duration("5 seconds"), RpcRegistry.Messages.FindRouters.class);
-        rpcRegistry1Probe
-                .reply(new RpcRegistry.Messages.FindRoutersReply(Collections.<Pair<ActorRef, Long>>emptyList()));
-        failedFuture.checkedGet(5, TimeUnit.SECONDS);
-    }
-
-
     /**
      * This test method invokes and executes the remote rpc.
      */
@@ -75,13 +55,6 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest {
         final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
                 remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
         assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
-        final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
-        final RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
-        assertEquals("getType", TEST_RPC, routeIdentifier.getType());
-        assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute());
-
-        rpcRegistry1Probe.reply(new RpcRegistry.Messages.FindRoutersReply(Arrays.asList(new Pair<>(
-                rpcBroker2, 200L))));
 
         final DOMRpcResult result = frontEndFuture.checkedGet(5, TimeUnit.SECONDS);
         assertEquals(rpcOutput, result.getResult());
@@ -105,19 +78,11 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest {
         final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
                 remoteRpcImpl1.invokeRpc(TEST_RPC_ID, null);
         assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
-        final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
-        final RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
-        assertEquals("getType", TEST_RPC, routeIdentifier.getType());
-        assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute());
-
-        rpcRegistry1Probe.reply(new RpcRegistry.Messages.FindRoutersReply(Arrays.asList(new Pair<>(
-                rpcBroker2, 200L))));
 
         final DOMRpcResult result = frontEndFuture.checkedGet(5, TimeUnit.SECONDS);
         assertEquals(rpcOutput, result.getResult());
     }
 
-
     /**
      * This test method invokes and executes the remote rpc.
      */
@@ -137,19 +102,11 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest {
         final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
                 remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
         assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
-        final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
-        final RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
-        assertEquals("getType", TEST_RPC, routeIdentifier.getType());
-        assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute());
-
-        rpcRegistry1Probe.reply(new RpcRegistry.Messages.FindRoutersReply(Arrays.asList(new Pair<>(
-                rpcBroker2, 200L))));
 
         final DOMRpcResult result = frontEndFuture.checkedGet(5, TimeUnit.SECONDS);
         assertNull(result.getResult());
     }
 
-
     /**
      * This test method invokes and executes the remote rpc.
      */
@@ -167,13 +124,6 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest {
         final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
                 remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
         assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
-        final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
-        final RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
-        assertEquals("getType", TEST_RPC, routeIdentifier.getType());
-        assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute());
-
-        rpcRegistry1Probe.reply(new RpcRegistry.Messages.FindRoutersReply(Arrays.asList(new Pair<>(
-                rpcBroker2, 200L))));
         frontEndFuture.checkedGet(5, TimeUnit.SECONDS);
     }
 
@@ -188,10 +138,6 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest {
         final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
                 remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
         assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
-        final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
-        final RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
-        assertEquals("getType", TEST_RPC, routeIdentifier.getType());
-        assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute());
 
         frontEndFuture.checkedGet(20, TimeUnit.SECONDS);
     }
@@ -203,25 +149,13 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest {
     @Test(expected = DOMRpcException.class)
     public void testInvokeRpcWithLookupException() throws Exception {
         final NormalizedNode<?, ?> invokeRpcInput = makeRPCInput("foo");
-        final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
-                remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
-        assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
-        final FindRouters findRouters = rpcRegistry1Probe.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
-        final RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
-        assertEquals("getType", TEST_RPC, routeIdentifier.getType());
-        assertEquals("getRoute", TEST_PATH, routeIdentifier.getRoute());
-        rpcRegistry1Probe.reply( new Status.Failure(new RuntimeException("test")));
-        frontEndFuture.checkedGet(5, TimeUnit.SECONDS);
-    }
 
-    /**
-     * This test method invokes and executes the remote rpc.
-     */
-    @Test(expected = DOMRpcImplementationNotAvailableException.class)
-    public void testInvokeRpcWithLoopException() throws Exception {
-        final NormalizedNode<?, ?> invokeRpcInput = RemoteRpcInput.from(makeRPCInput("foo"));
+        doThrow(new RuntimeException("test")).when(domRpcService2).invokeRpc(any(SchemaPath.class),
+            any(NormalizedNode.class));
+
         final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
                 remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
+        assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
 
         frontEndFuture.checkedGet(5, TimeUnit.SECONDS);
     }
index e8ea373..06781b4 100644 (file)
@@ -6,10 +6,8 @@
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
 
-
 package org.opendaylight.controller.remote.rpc;
 
-
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -25,8 +23,6 @@ import org.junit.Test;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sal.core.api.model.SchemaService;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.duration.Duration;
 
@@ -53,9 +49,6 @@ public class RemoteRpcProviderTest {
         try (final RemoteRpcProvider rpcProvider = new RemoteRpcProvider(system, mock(DOMRpcProviderService.class),
                 new RemoteRpcProviderConfig(system.settings().config()))) {
             final Broker.ProviderSession session = mock(Broker.ProviderSession.class);
-            final SchemaService schemaService = mock(SchemaService.class);
-            when(schemaService.getGlobalContext()).thenReturn(mock(SchemaContext.class));
-            when(session.getService(SchemaService.class)).thenReturn(schemaService);
             when(session.getService(DOMRpcService.class)).thenReturn(mock(DOMRpcService.class));
 
             rpcProvider.onSessionInitiated(session);
index a553ab9..4fb0083 100644 (file)
@@ -42,7 +42,7 @@ public class RpcBrokerTest extends AbstractRpcTest {
 
                 final ExecuteRpc executeMsg = ExecuteRpc.from(TEST_RPC_ID, null);
 
-                rpcBroker1.tell(executeMsg, getRef());
+                rpcInvoker1.tell(executeMsg, getRef());
 
                 final RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class);
 
@@ -61,7 +61,7 @@ public class RpcBrokerTest extends AbstractRpcTest {
 
                 final ExecuteRpc executeMsg = ExecuteRpc.from(TEST_RPC_ID, null);
 
-                rpcBroker1.tell(executeMsg, getRef());
+                rpcInvoker1.tell(executeMsg, getRef());
 
                 final Failure rpcResponse = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
 
index a937093..3ceb5c0 100644 (file)
@@ -13,13 +13,11 @@ import static org.junit.Assert.fail;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
-import akka.actor.Props;
 import akka.cluster.Cluster;
 import akka.cluster.ClusterEvent.CurrentClusterState;
 import akka.cluster.Member;
 import akka.cluster.MemberStatus;
 import akka.cluster.UniqueAddress;
-import akka.japi.Pair;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Sets;
@@ -32,6 +30,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
@@ -44,10 +43,9 @@ import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
 import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRoutersReply;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.RemoteRpcEndpoint;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
@@ -68,6 +66,12 @@ public class RpcRegistryTest {
     private static ActorSystem node2;
     private static ActorSystem node3;
 
+    private JavaTestKit invoker1;
+    private JavaTestKit invoker2;
+    private JavaTestKit invoker3;
+    private JavaTestKit registrar1;
+    private JavaTestKit registrar2;
+    private JavaTestKit registrar3;
     private ActorRef registry1;
     private ActorRef registry2;
     private ActorRef registry3;
@@ -92,7 +96,7 @@ public class RpcRegistryTest {
         waitForMembersUp(node2, Cluster.get(node1).selfUniqueAddress(), Cluster.get(node3).selfUniqueAddress());
     }
 
-    static void waitForMembersUp(ActorSystem node, UniqueAddress... addresses) {
+    static void waitForMembersUp(final ActorSystem node, final UniqueAddress... addresses) {
         Set<UniqueAddress> otherMembersSet = Sets.newHashSet(addresses);
         Stopwatch sw = Stopwatch.createStarted();
         while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
@@ -119,12 +123,18 @@ public class RpcRegistryTest {
 
     @Before
     public void setup() {
-        registry1 = node1.actorOf(Props.create(RpcRegistry.class, config(node1)));
-        registry2 = node2.actorOf(Props.create(RpcRegistry.class, config(node2)));
-        registry3 = node3.actorOf(Props.create(RpcRegistry.class, config(node3)));
+        invoker1 = new JavaTestKit(node1);
+        registrar1 = new JavaTestKit(node1);
+        registry1 = node1.actorOf(RpcRegistry.props(config(node1), invoker1.getRef(), registrar1.getRef()));
+        invoker2 = new JavaTestKit(node2);
+        registrar2 = new JavaTestKit(node2);
+        registry2 = node2.actorOf(RpcRegistry.props(config(node2), invoker2.getRef(), registrar2.getRef()));
+        invoker3 = new JavaTestKit(node3);
+        registrar3 = new JavaTestKit(node3);
+        registry3 = node3.actorOf(RpcRegistry.props(config(node3), invoker3.getRef(), registrar3.getRef()));
     }
 
-    private RemoteRpcProviderConfig config(ActorSystem node) {
+    private static RemoteRpcProviderConfig config(final ActorSystem node) {
         return new RemoteRpcProviderConfig(node.settings().config());
     }
 
@@ -139,6 +149,26 @@ public class RpcRegistryTest {
         if (registry3 != null) {
             node3.stop(registry3);
         }
+
+        if (invoker1 != null) {
+            node1.stop(invoker1.getRef());
+        }
+        if (invoker2 != null) {
+            node2.stop(invoker2.getRef());
+        }
+        if (invoker3 != null) {
+            node3.stop(invoker3.getRef());
+        }
+
+        if (registrar1 != null) {
+            node1.stop(registrar1.getRef());
+        }
+        if (registrar2 != null) {
+            node2.stop(registrar2.getRef());
+        }
+        if (registrar3 != null) {
+            node3.stop(registrar3.getRef());
+        }
     }
 
     /**
@@ -149,32 +179,30 @@ public class RpcRegistryTest {
     public void testAddRemoveRpcOnSameNode() throws Exception {
         LOG.info("testAddRemoveRpcOnSameNode starting");
 
-        final JavaTestKit mockBroker = new JavaTestKit(node1);
-
         Address nodeAddress = node1.provider().getDefaultAddress();
 
         // Add rpc on node 1
-        registry1.tell(new SetLocalRouter(mockBroker.getRef()), mockBroker.getRef());
 
         List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds = createRouteIds();
 
-        registry1.tell(new AddOrUpdateRoutes(addedRouteIds), mockBroker.getRef());
+        registry1.tell(new AddOrUpdateRoutes(addedRouteIds), ActorRef.noSender());
 
         // Bucket store should get an update bucket message. Updated bucket contains added rpc.
+        final JavaTestKit testKit = new JavaTestKit(node1);
 
-        Map<Address, Bucket<RoutingTable>> buckets = retrieveBuckets(registry1, mockBroker, nodeAddress);
+        Map<Address, Bucket<RoutingTable>> buckets = retrieveBuckets(registry1, testKit, nodeAddress);
         verifyBucket(buckets.get(nodeAddress), addedRouteIds);
 
-        Map<Address, Long> versions = retrieveVersions(registry1, mockBroker);
-        Assert.assertEquals("Version for bucket " + nodeAddress, buckets.get(nodeAddress).getVersion(),
+        Map<Address, Long> versions = retrieveVersions(registry1, testKit);
+        Assert.assertEquals("Version for bucket " + nodeAddress, (Long) buckets.get(nodeAddress).getVersion(),
                 versions.get(nodeAddress));
 
         // Now remove rpc
-        registry1.tell(new RemoveRoutes(addedRouteIds), mockBroker.getRef());
+        registry1.tell(new RemoveRoutes(addedRouteIds), ActorRef.noSender());
 
         // Bucket store should get an update bucket message. Rpc is removed in the updated bucket
 
-        verifyEmptyBucket(mockBroker, registry1, nodeAddress);
+        verifyEmptyBucket(testKit, registry1, nodeAddress);
 
         LOG.info("testAddRemoveRpcOnSameNode ending");
 
@@ -189,34 +217,31 @@ public class RpcRegistryTest {
 
         LOG.info("testRpcAddRemoveInCluster starting");
 
-        final JavaTestKit mockBroker1 = new JavaTestKit(node1);
-        final JavaTestKit mockBroker2 = new JavaTestKit(node2);
-
         List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds = createRouteIds();
 
         Address node1Address = node1.provider().getDefaultAddress();
 
         // Add rpc on node 1
-        registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
-        registry1.tell(new AddOrUpdateRoutes(addedRouteIds), mockBroker1.getRef());
+        registry1.tell(new AddOrUpdateRoutes(addedRouteIds), ActorRef.noSender());
 
         // Bucket store on node2 should get a message to update its local copy of remote buckets
+        final JavaTestKit testKit = new JavaTestKit(node2);
 
-        Map<Address, Bucket<RoutingTable>> buckets = retrieveBuckets(registry2, mockBroker2, node1Address);
+        Map<Address, Bucket<RoutingTable>> buckets = retrieveBuckets(registry2, testKit, node1Address);
         verifyBucket(buckets.get(node1Address), addedRouteIds);
 
         // Now remove
-        registry1.tell(new RemoveRoutes(addedRouteIds), mockBroker1.getRef());
+        registry1.tell(new RemoveRoutes(addedRouteIds), ActorRef.noSender());
 
         // Bucket store on node2 should get a message to update its local copy of remote buckets.
         // Wait for the bucket for node1 to be empty.
 
-        verifyEmptyBucket(mockBroker2, registry2, node1Address);
+        verifyEmptyBucket(testKit, registry2, node1Address);
 
         LOG.info("testRpcAddRemoveInCluster ending");
     }
 
-    private void verifyEmptyBucket(JavaTestKit testKit, ActorRef registry, Address address)
+    private void verifyEmptyBucket(final JavaTestKit testKit, final ActorRef registry, final Address address)
             throws AssertionError {
         Map<Address, Bucket<RoutingTable>> buckets;
         int numTries = 0;
@@ -241,59 +266,68 @@ public class RpcRegistryTest {
      */
     @Test
     public void testRpcAddedOnMultiNodes() throws Exception {
-
-        final JavaTestKit mockBroker1 = new JavaTestKit(node1);
-        final JavaTestKit mockBroker2 = new JavaTestKit(node2);
-        final JavaTestKit mockBroker3 = new JavaTestKit(node3);
-
-        registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef());
+        final JavaTestKit testKit = new JavaTestKit(node3);
 
         // Add rpc on node 1
         List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds1 = createRouteIds();
-        registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
-        registry1.tell(new AddOrUpdateRoutes(addedRouteIds1), mockBroker1.getRef());
+        registry1.tell(new AddOrUpdateRoutes(addedRouteIds1), ActorRef.noSender());
+
+        final UpdateRemoteEndpoints req1 = registrar3.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
+            UpdateRemoteEndpoints.class);
 
         // Add rpc on node 2
         List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds2 = createRouteIds();
-        registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef());
-        registry2.tell(new AddOrUpdateRoutes(addedRouteIds2), mockBroker2.getRef());
+        registry2.tell(new AddOrUpdateRoutes(addedRouteIds2), ActorRef.noSender());
 
-        Address node1Address = node1.provider().getDefaultAddress();
+        final UpdateRemoteEndpoints req2 = registrar3.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
+            UpdateRemoteEndpoints.class);
         Address node2Address = node2.provider().getDefaultAddress();
+        Address node1Address = node1.provider().getDefaultAddress();
 
-        Map<Address, Bucket<RoutingTable>> buckets = retrieveBuckets(registry3, mockBroker3, node1Address,
+        Map<Address, Bucket<RoutingTable>> buckets = retrieveBuckets(registry3, testKit, node1Address,
                 node2Address);
 
         verifyBucket(buckets.get(node1Address), addedRouteIds1);
         verifyBucket(buckets.get(node2Address), addedRouteIds2);
 
-        Map<Address, Long> versions = retrieveVersions(registry3, mockBroker3);
-        Assert.assertEquals("Version for bucket " + node1Address, buckets.get(node1Address).getVersion(),
+        Map<Address, Long> versions = retrieveVersions(registry3, testKit);
+        Assert.assertEquals("Version for bucket " + node1Address, (Long) buckets.get(node1Address).getVersion(),
                 versions.get(node1Address));
-        Assert.assertEquals("Version for bucket " + node2Address, buckets.get(node2Address).getVersion(),
+        Assert.assertEquals("Version for bucket " + node2Address, (Long) buckets.get(node2Address).getVersion(),
                 versions.get(node2Address));
 
-        RouteIdentifier<?, ?, ?> routeID = addedRouteIds1.get(0);
-        registry3.tell(new FindRouters(routeID), mockBroker3.getRef());
+        assertEndpoints(req1, node1Address, invoker1);
+        assertEndpoints(req2, node2Address, invoker2);
+
+    }
+
+    private static void assertEndpoints(final UpdateRemoteEndpoints msg, final Address address,
+            final JavaTestKit invoker) {
+        final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = msg.getEndpoints();
+        Assert.assertEquals(1, endpoints.size());
 
-        FindRoutersReply reply = mockBroker3.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
-                FindRoutersReply.class);
+        final Optional<RemoteRpcEndpoint> maybeEndpoint = endpoints.get(address);
+        Assert.assertNotNull(maybeEndpoint);
+        Assert.assertTrue(maybeEndpoint.isPresent());
 
-        List<Pair<ActorRef, Long>> respList = reply.getRouterWithUpdateTime();
-        Assert.assertEquals("getRouterWithUpdateTime size", 1, respList.size());
+        final RemoteRpcEndpoint endpoint = maybeEndpoint.get();
+        final ActorRef router = endpoint.getRouter();
+        Assert.assertNotNull(router);
 
-        respList.get(0).first().tell("hello", ActorRef.noSender());
-        mockBroker1.expectMsgEquals(Duration.create(3, TimeUnit.SECONDS), "hello");
+        router.tell("hello", ActorRef.noSender());
+        final String s = invoker.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), String.class);
+        Assert.assertEquals("hello", s);
     }
 
-    private Map<Address, Long> retrieveVersions(ActorRef bucketStore, JavaTestKit testKit) {
+    private static Map<Address, Long> retrieveVersions(final ActorRef bucketStore, final JavaTestKit testKit) {
         bucketStore.tell(new GetBucketVersions(), testKit.getRef());
         GetBucketVersionsReply reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
                 GetBucketVersionsReply.class);
         return reply.getVersions();
     }
 
-    private void verifyBucket(Bucket<RoutingTable> bucket, List<RouteIdentifier<?, ?, ?>> expRouteIds) {
+    private static void verifyBucket(final Bucket<RoutingTable> bucket,
+            final List<RouteIdentifier<?, ?, ?>> expRouteIds) {
         RoutingTable table = bucket.getData();
         Assert.assertNotNull("Bucket RoutingTable is null", table);
         for (RouteIdentifier<?, ?, ?> r : expRouteIds) {
@@ -305,8 +339,8 @@ public class RpcRegistryTest {
         Assert.assertEquals("RoutingTable size", expRouteIds.size(), table.size());
     }
 
-    private Map<Address, Bucket<RoutingTable>> retrieveBuckets(ActorRef bucketStore, JavaTestKit testKit,
-            Address... addresses) {
+    private static Map<Address, Bucket<RoutingTable>> retrieveBuckets(final ActorRef bucketStore,
+            final JavaTestKit testKit, final Address... addresses) {
         int numTries = 0;
         while (true) {
             bucketStore.tell(new GetAllBuckets(), testKit.getRef());
@@ -341,8 +375,6 @@ public class RpcRegistryTest {
     public void testAddRoutesConcurrency() throws Exception {
         final JavaTestKit testKit = new JavaTestKit(node1);
 
-        registry1.tell(new SetLocalRouter(testKit.getRef()), ActorRef.noSender());
-
         final int nRoutes = 500;
         final RouteIdentifier<?, ?, ?>[] added = new RouteIdentifier<?, ?, ?>[nRoutes];
         for (int i = 0; i < nRoutes; i++) {
@@ -383,53 +415,8 @@ public class RpcRegistryTest {
 
     private List<RpcRouter.RouteIdentifier<?, ?, ?>> createRouteIds() throws URISyntaxException {
         QName type = new QName(new URI("/mockrpc"), "mockrpc" + routeIdCounter++);
-        List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new ArrayList<>();
+        List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new ArrayList<>(1);
         routeIds.add(new RouteIdentifierImpl(null, type, null));
         return routeIds;
     }
-
-    @Test
-    public void testFindRoutersNotPresentInitially() throws Exception {
-
-        final JavaTestKit mockBroker1 = new JavaTestKit(node1);
-        final JavaTestKit mockBroker2 = new JavaTestKit(node2);
-
-        registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
-        registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef());
-
-        List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = createRouteIds();
-        routeIds.addAll(createRouteIds());
-
-        JavaTestKit replyKit1 = new JavaTestKit(node1);
-        registry1.tell(new FindRouters(routeIds.get(0)), replyKit1.getRef());
-        JavaTestKit replyKit2 = new JavaTestKit(node1);
-        registry1.tell(new FindRouters(routeIds.get(1)), replyKit2.getRef());
-
-        registry2.tell(new AddOrUpdateRoutes(routeIds), mockBroker2.getRef());
-
-        FindRoutersReply reply = replyKit1.expectMsgClass(Duration.create(7, TimeUnit.SECONDS),
-                FindRoutersReply.class);
-        Assert.assertEquals("getRouterWithUpdateTime size", 1, reply.getRouterWithUpdateTime().size());
-
-        reply = replyKit2.expectMsgClass(Duration.create(7, TimeUnit.SECONDS),
-                FindRoutersReply.class);
-        Assert.assertEquals("getRouterWithUpdateTime size", 1, reply.getRouterWithUpdateTime().size());
-    }
-
-    @Test
-    public void testFindRoutersNonExistent() throws Exception {
-
-        final JavaTestKit mockBroker1 = new JavaTestKit(node1);
-
-        registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
-
-        List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = createRouteIds();
-
-        registry1.tell(new FindRouters(routeIds.get(0)), mockBroker1.getRef());
-
-        FindRoutersReply reply = mockBroker1.expectMsgClass(Duration.create(7, TimeUnit.SECONDS),
-                FindRoutersReply.class);
-        List<Pair<ActorRef, Long>> respList = reply.getRouterWithUpdateTime();
-        Assert.assertEquals("getRouterWithUpdateTime size", 0, respList.size());
-    }
 }
index 39bcd76..ff7365c 100644 (file)
@@ -59,15 +59,15 @@ public class BucketStoreTest {
         final BucketStore<T> store = createStore();
 
         Address localAddress = system.provider().getDefaultAddress();
-        Bucket<T> localBucket = new BucketImpl<>();
+        Bucket<T> localBucket = new BucketImpl<>(new T());
 
         Address a1 = new Address("tcp", "system1");
         Address a2 = new Address("tcp", "system2");
         Address a3 = new Address("tcp", "system3");
 
-        Bucket<T> b1 = new BucketImpl<>();
-        Bucket<T> b2 = new BucketImpl<>();
-        Bucket<T> b3 = new BucketImpl<>();
+        Bucket<T> b1 = new BucketImpl<>(new T());
+        Bucket<T> b2 = new BucketImpl<>(new T());
+        Bucket<T> b3 = new BucketImpl<>(new T());
 
         Map<Address, Bucket<T>> remoteBuckets = new HashMap<>(3);
         remoteBuckets.put(a1, b1);
@@ -86,7 +86,7 @@ public class BucketStoreTest {
 
         //Add a new remote bucket
         Address a4 = new Address("tcp", "system4");
-        Bucket<T> b4 = new BucketImpl<>();
+        Bucket<T> b4 = new BucketImpl<>(new T());
         remoteBuckets.clear();
         remoteBuckets.put(a4, b4);
         store.receiveUpdateRemoteBuckets(remoteBuckets);
@@ -98,7 +98,7 @@ public class BucketStoreTest {
         Assert.assertTrue(remoteBucketsInStore.size() == 4);
 
         //Update a bucket
-        Bucket<T> b3New = new BucketImpl<>();
+        Bucket<T> b3New = new BucketImpl<>(new T());
         remoteBuckets.clear();
         remoteBuckets.put(a3, b3New);
         remoteBuckets.put(a1, null);
@@ -121,10 +121,10 @@ public class BucketStoreTest {
         //versions map contains versions for all remote buckets (4).
         Map<Address, Long> versionsInStore = store.getVersions();
         Assert.assertEquals(4, versionsInStore.size());
-        Assert.assertEquals(b1.getVersion(), versionsInStore.get(a1));
-        Assert.assertEquals(b2.getVersion(), versionsInStore.get(a2));
-        Assert.assertEquals(b3New.getVersion(), versionsInStore.get(a3));
-        Assert.assertEquals(b4.getVersion(), versionsInStore.get(a4));
+        Assert.assertEquals((Long)b1.getVersion(), versionsInStore.get(a1));
+        Assert.assertEquals((Long)b2.getVersion(), versionsInStore.get(a2));
+        Assert.assertEquals((Long)b3New.getVersion(), versionsInStore.get(a3));
+        Assert.assertEquals((Long)b4.getVersion(), versionsInStore.get(a4));
 
         //Send older version of bucket
         remoteBuckets.clear();
@@ -134,7 +134,7 @@ public class BucketStoreTest {
         //Should NOT update a3
         remoteBucketsInStore = store.getRemoteBuckets();
         b3InStore = remoteBucketsInStore.get(a3);
-        Assert.assertTrue(b3InStore.getVersion().longValue() == b3New.getVersion().longValue());
+        Assert.assertEquals(b3InStore.getVersion(), b3New.getVersion());
 
     }
 
@@ -144,7 +144,8 @@ public class BucketStoreTest {
      * @return instance of BucketStore class
      */
     private static BucketStore<T> createStore() {
-        final Props props = Props.create(BucketStore.class, new RemoteRpcProviderConfig(system.settings().config()));
+        final Props props = Props.create(BucketStore.class, new RemoteRpcProviderConfig(system.settings().config()),
+            new T());
         final TestActorRef<BucketStore<T>> testRef = TestActorRef.create(system, props, "testStore");
         return testRef.underlyingActor();
     }
index 6425340..cd4f6d5 100644 (file)
@@ -22,9 +22,6 @@ import akka.actor.Props;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import com.typesafe.config.ConfigFactory;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -70,8 +67,7 @@ public class GossiperTest {
 
     @Test
     public void testReceiveGossipTick_WhenNoRemoteMemberShouldIgnore() {
-
-        mockGossiper.setClusterMembers(Collections.<Address>emptyList());
+        mockGossiper.setClusterMembers();
         doNothing().when(mockGossiper).getLocalStatusAndSendTo(any(Address.class));
         mockGossiper.receiveGossipTick();
         verify(mockGossiper, times(0)).getLocalStatusAndSendTo(any(Address.class));
@@ -79,11 +75,7 @@ public class GossiperTest {
 
     @Test
     public void testReceiveGossipTick_WhenRemoteMemberExistsShouldSendStatus() {
-        List<Address> members = new ArrayList<>();
-        Address remote = new Address("tcp", "member");
-        members.add(remote);
-
-        mockGossiper.setClusterMembers(members);
+        mockGossiper.setClusterMembers(new Address("tcp", "member"));
         doNothing().when(mockGossiper).getLocalStatusAndSendTo(any(Address.class));
         mockGossiper.receiveGossipTick();
         verify(mockGossiper, times(1)).getLocalStatusAndSendTo(any(Address.class));
@@ -97,10 +89,7 @@ public class GossiperTest {
         GossipStatus remoteStatus = new GossipStatus(nonMember, mock(Map.class));
 
         //add a member
-        List<Address> members = new ArrayList<>();
-        members.add(new Address("tcp", "member"));
-
-        mockGossiper.setClusterMembers(members);
+        mockGossiper.setClusterMembers(new Address("tcp", "member"));
         mockGossiper.receiveGossipStatus(remoteStatus);
         verify(mockGossiper, times(0)).getSender();
     }
@@ -122,8 +111,7 @@ public class GossiperTest {
      * @return instance of Gossiper class
      */
     private static Gossiper createGossiper() {
-        final Props props = Props.create(Gossiper.class, false,
-                new RemoteRpcProviderConfig(system.settings().config()));
+        final Props props = Gossiper.testProps(new RemoteRpcProviderConfig(system.settings().config()));
         final TestActorRef<Gossiper> testRef = TestActorRef.create(system, props, "testGossiper");
 
         return testRef.underlyingActor();