X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2FRpcRegistry.java;h=1545eb00d2cc9af4bda80881d197bad132f0e667;hb=8426e7a67b1235e8ecc67b1a98a5bd096c88e729;hp=5109d316446b13158e3739824e653e0259929135;hpb=641dd738c5c64121b7f7816dcef3b766051d1196;p=controller.git
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
index 5109d31644..1545eb00d2 100644
--- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
+++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
@@ -10,385 +10,186 @@ package org.opendaylight.controller.remote.rpc.registry;
import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.Props;
-import akka.actor.UntypedActor;
-import akka.dispatch.Mapper;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import akka.japi.Option;
-import akka.japi.Pair;
-import akka.pattern.Patterns;
import com.google.common.base.Preconditions;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
-import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
-import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import scala.concurrent.Future;
-
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
/**
- * Registry to look up cluster nodes that have registered for a given rpc.
- *
+ * Registry to look up cluster nodes that have registered for a given RPC.
+ *
+ *
* It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
* cluster wide information.
*/
-public class RpcRegistry extends UntypedActor {
-
- final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+public class RpcRegistry extends BucketStore {
+ private final ActorRef rpcRegistrar;
- /**
- * Store to keep the registry. Bucket store sync's it across nodes in the cluster
- */
- private ActorRef bucketStore;
+ public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
+ super(config, new RoutingTable(rpcInvoker));
+ this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar);
+ }
/**
- * Rpc broker that would use the registry to route requests.
+ * Create a new props instance for instantiating an RpcRegistry actor.
+ *
+ * @param config Provider configuration
+ * @param rpcRegistrar Local RPC provider interface, used to register routers to remote nodes
+ * @param rpcInvoker Actor handling RPC invocation requests from remote nodes
+ * @return A new {@link Props} instance
*/
- private ActorRef localRouter;
-
- public RpcRegistry() {
- bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
-
- log.info("Bucket store path = {}", bucketStore.path().toString());
- }
-
- public RpcRegistry(ActorRef bucketStore) {
- this.bucketStore = bucketStore;
+ public static Props props(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker,
+ final ActorRef rpcRegistrar) {
+ return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar);
}
@Override
- public void onReceive(Object message) throws Exception {
-
- log.debug("Received message: message [{}]", message);
-
- //TODO: if sender is remote, reject message
-
- if (message instanceof SetLocalRouter)
- receiveSetLocalRouter((SetLocalRouter) message);
-
- if (message instanceof AddOrUpdateRoutes)
+ protected void handleReceive(final Object message) throws Exception {
+ if (message instanceof AddOrUpdateRoutes) {
receiveAddRoutes((AddOrUpdateRoutes) message);
-
- else if (message instanceof RemoveRoutes)
+ } else if (message instanceof RemoveRoutes) {
receiveRemoveRoutes((RemoveRoutes) message);
-
- else if (message instanceof Messages.FindRouters)
- receiveGetRouter((FindRouters) message);
-
- else
- unhandled(message);
- }
-
- /**
- * Register's rpc broker
- *
- * @param message contains {@link akka.actor.ActorRef} for rpc broker
- */
- private void receiveSetLocalRouter(SetLocalRouter message) {
- localRouter = message.getRouter();
- }
-
- /**
- * @param msg
- */
- private void receiveAddRoutes(AddOrUpdateRoutes msg) {
-
- Preconditions.checkState(localRouter != null, "Router must be set first");
-
- Future