*/
package org.opendaylight.controller.remote.rpc.registry;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.Props;
-import akka.actor.UntypedActor;
-import akka.dispatch.Mapper;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import akka.japi.Option;
-import akka.japi.Pair;
-import akka.pattern.Patterns;
-import com.google.common.base.Preconditions;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
-import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
-import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import scala.concurrent.Future;
-
-import java.util.ArrayList;
-import java.util.Collections;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor;
+import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl;
+import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
/**
- * Registry to look up cluster nodes that have registered for a given rpc.
- * <p/>
- * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
+ * Registry to look up cluster nodes that have registered for a given RPC.
+ *
+ * <p>
+ * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor} to maintain this
* cluster wide information.
*/
-public class RpcRegistry extends UntypedActor {
-
- final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
-
- /**
- * Store to keep the registry. Bucket store sync's it across nodes in the cluster
- */
- private ActorRef bucketStore;
+public class RpcRegistry extends BucketStoreActor<RoutingTable> {
+ private final ActorRef rpcRegistrar;
+ private final RemoteRpcRegistryMXBeanImpl mxBean;
+
+ public RpcRegistry(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
+ super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker, ImmutableSet.of()));
+ this.rpcRegistrar = requireNonNull(rpcRegistrar);
+ this.mxBean = new RemoteRpcRegistryMXBeanImpl(new BucketStoreAccess(self(), getContext().dispatcher(),
+ config.getAskDuration()), config.getAskDuration());
+ }
/**
- * Rpc broker that would use the registry to route requests.
+ * Create a new props instance for instantiating an RpcRegistry actor.
+ *
+ * @param config Provider configuration
+ * @param rpcRegistrar Local RPC provider interface, used to register routers to remote nodes
+ * @param rpcInvoker Actor handling RPC invocation requests from remote nodes
+ * @return A new {@link Props} instance
*/
- private ActorRef localRouter;
-
- public RpcRegistry() {
- bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
-
- log.info("Bucket store path = {}", bucketStore.path().toString());
+ public static Props props(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker,
+ final ActorRef rpcRegistrar) {
+ return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar);
}
- public RpcRegistry(ActorRef bucketStore) {
- this.bucketStore = bucketStore;
+ @Override
+ public void postStop() throws Exception {
+ super.postStop();
+ this.mxBean.unregister();
}
@Override
- public void onReceive(Object message) throws Exception {
-
- log.debug("Received message: message [{}]", message);
-
- //TODO: if sender is remote, reject message
-
- if (message instanceof SetLocalRouter)
- receiveSetLocalRouter((SetLocalRouter) message);
-
- if (message instanceof AddOrUpdateRoutes)
+ protected void handleCommand(final Object message) throws Exception {
+ if (message instanceof AddOrUpdateRoutes) {
receiveAddRoutes((AddOrUpdateRoutes) message);
-
- else if (message instanceof RemoveRoutes)
+ } else if (message instanceof RemoveRoutes) {
receiveRemoveRoutes((RemoveRoutes) message);
-
- else if (message instanceof Messages.FindRouters)
- receiveGetRouter((FindRouters) message);
-
- else
- unhandled(message);
- }
-
- /**
- * Register's rpc broker
- *
- * @param message contains {@link akka.actor.ActorRef} for rpc broker
- */
- private void receiveSetLocalRouter(SetLocalRouter message) {
- localRouter = message.getRouter();
- }
-
- /**
- * @param msg
- */
- private void receiveAddRoutes(AddOrUpdateRoutes msg) {
-
- Preconditions.checkState(localRouter != null, "Router must be set first");
-
- Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), ActorUtil.ASK_DURATION.toMillis());
- futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
+ } else {
+ super.handleCommand(message);
+ }
}
- /**
- * @param msg contains list of route ids to remove
- */
- private void receiveRemoveRoutes(RemoveRoutes msg) {
-
- Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), ActorUtil.ASK_DURATION.toMillis());
- futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
-
+ private void receiveAddRoutes(final AddOrUpdateRoutes msg) {
+ LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
+ updateLocalBucket(getLocalData().addRpcs(msg.getRouteIdentifiers()));
}
/**
- * Finds routers for the given rpc.
+ * Processes a RemoveRoutes message.
*
- * @param msg
+ * @param msg contains list of route ids to remove
*/
- private void receiveGetRouter(FindRouters msg) {
- final ActorRef sender = getSender();
-
- Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), ActorUtil.ASK_DURATION.toMillis());
- futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
+ private void receiveRemoveRoutes(final RemoveRoutes msg) {
+ LOG.debug("RemoveRoutes: {}", msg.getRouteIdentifiers());
+ updateLocalBucket(getLocalData().removeRpcs(msg.getRouteIdentifiers()));
}
- /**
- * Helper to create empty reply when no routers are found
- *
- * @return
- */
- private Messages.FindRoutersReply createEmptyReply() {
- List<Pair<ActorRef, Long>> routerWithUpdateTime = Collections.emptyList();
- return new Messages.FindRoutersReply(routerWithUpdateTime);
+ @Override
+ protected void onBucketRemoved(final Address address, final Bucket<RoutingTable> bucket) {
+ rpcRegistrar.tell(new Messages.UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())),
+ ActorRef.noSender());
}
- /**
- * Helper to create a reply when routers are found for the given rpc
- *
- * @param buckets
- * @param routeId
- * @return
- */
- private Messages.FindRoutersReply createReplyWithRouters(Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
-
- List<Pair<ActorRef, Long>> routers = new ArrayList<>();
- Option<Pair<ActorRef, Long>> routerWithUpdateTime = null;
-
- for (Bucket bucket : buckets.values()) {
-
- RoutingTable table = (RoutingTable) bucket.getData();
- if (table == null)
- continue;
+ @Override
+ protected void onBucketsUpdated(final Map<Address, Bucket<RoutingTable>> buckets) {
+ final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = new HashMap<>(buckets.size());
- routerWithUpdateTime = table.getRouterFor(routeId);
- if (routerWithUpdateTime.isEmpty())
- continue;
+ for (Entry<Address, Bucket<RoutingTable>> e : buckets.entrySet()) {
+ final RoutingTable table = e.getValue().getData();
- routers.add(routerWithUpdateTime.get());
+ final Collection<DOMRpcIdentifier> rpcs = table.getItems();
+ endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty()
+ : Optional.of(new RemoteRpcEndpoint(table.getInvoker(), rpcs)));
}
- return new Messages.FindRoutersReply(routers);
- }
-
-
- ///
- ///private factories to create Mapper
- ///
-
- /**
- * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found
- *
- * @param routeId the rpc
- * @param sender client who asked to find the routers.
- * @return
- */
- private Mapper<Object, Void> getMapperToGetRouter(final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
- return new Mapper<Object, Void>() {
- @Override
- public Void apply(Object replyMessage) {
-
- if (replyMessage instanceof GetAllBucketsReply) {
-
- GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage;
- Map<Address, Bucket> buckets = reply.getBuckets();
-
- if (buckets == null || buckets.isEmpty()) {
- sender.tell(createEmptyReply(), getSelf());
- return null;
- }
-
- sender.tell(createReplyWithRouters(buckets, routeId), getSelf());
- }
- return null;
- }
- };
- }
-
- /**
- * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently,
- * it updates the local bucket in bucket store.
- *
- * @param routeIds rpc to remote
- * @return
- */
- private Mapper<Object, Void> getMapperToRemoveRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
- return new Mapper<Object, Void>() {
- @Override
- public Void apply(Object replyMessage) {
- if (replyMessage instanceof GetLocalBucketReply) {
-
- GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
- Bucket<RoutingTable> bucket = reply.getBucket();
-
- if (bucket == null) {
- log.debug("Local bucket is null");
- return null;
- }
-
- RoutingTable table = bucket.getData();
- if (table == null)
- table = new RoutingTable();
-
- table.setRouter(localRouter);
-
- if (!table.isEmpty()) {
- for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
- table.removeRoute(routeId);
- }
- }
- bucket.setData(table);
-
- UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
- bucketStore.tell(updateBucketMessage, getSelf());
- }
- return null;
- }
- };
+ if (!endpoints.isEmpty()) {
+ rpcRegistrar.tell(new Messages.UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
+ }
}
- /**
- * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently,
- * it updates the local bucket in bucket store.
- *
- * @param routeIds rpc to add
- * @return
- */
- private Mapper<Object, Void> getMapperToAddRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
+ public static final class RemoteRpcEndpoint {
+ private final Set<DOMRpcIdentifier> rpcs;
+ private final ActorRef router;
- return new Mapper<Object, Void>() {
- @Override
- public Void apply(Object replyMessage) {
- if (replyMessage instanceof GetLocalBucketReply) {
-
- GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
- Bucket<RoutingTable> bucket = reply.getBucket();
-
- if (bucket == null) {
- log.debug("Local bucket is null");
- return null;
- }
-
- RoutingTable table = bucket.getData();
- if (table == null)
- table = new RoutingTable();
-
- table.setRouter(localRouter);
- for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
- table.addRoute(routeId);
- }
-
- bucket.setData(table);
+ @VisibleForTesting
+ public RemoteRpcEndpoint(final ActorRef router, final Collection<DOMRpcIdentifier> rpcs) {
+ this.router = requireNonNull(router);
+ this.rpcs = ImmutableSet.copyOf(rpcs);
+ }
- UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
- bucketStore.tell(updateBucketMessage, getSelf());
- }
+ public ActorRef getRouter() {
+ return router;
+ }
- return null;
- }
- };
+ public Set<DOMRpcIdentifier> getRpcs() {
+ return rpcs;
+ }
}
/**
- * All messages used by the RpcRegistry
+ * All messages used by the RpcRegistry.
*/
public static class Messages {
+ abstract static class AbstractRouteMessage {
+ final List<DOMRpcIdentifier> rpcRouteIdentifiers;
-
- public static class ContainsRoute {
- final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
-
- public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
- Preconditions.checkArgument(routeIdentifiers != null &&
- !routeIdentifiers.isEmpty(),
- "Route Identifiers must be supplied");
- this.routeIdentifiers = routeIdentifiers;
+ AbstractRouteMessage(final Collection<DOMRpcIdentifier> rpcRouteIdentifiers) {
+ checkArgument(rpcRouteIdentifiers != null && !rpcRouteIdentifiers.isEmpty(),
+ "Route Identifiers must be supplied");
+ this.rpcRouteIdentifiers = ImmutableList.copyOf(rpcRouteIdentifiers);
}
- public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
- return this.routeIdentifiers;
+ List<DOMRpcIdentifier> getRouteIdentifiers() {
+ return this.rpcRouteIdentifiers;
}
@Override
public String toString() {
- return "ContainsRoute{" +
- "routeIdentifiers=" + routeIdentifiers +
- '}';
+ return "ContainsRoute{" + "routeIdentifiers=" + rpcRouteIdentifiers + '}';
}
}
- public static class AddOrUpdateRoutes extends ContainsRoute {
-
- public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
- super(routeIdentifiers);
+ public static final class AddOrUpdateRoutes extends Messages.AbstractRouteMessage {
+ public AddOrUpdateRoutes(final Collection<DOMRpcIdentifier> rpcRouteIdentifiers) {
+ super(rpcRouteIdentifiers);
}
- }
-
- public static class RemoveRoutes extends ContainsRoute {
- public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
- super(routeIdentifiers);
- }
}
- public static class SetLocalRouter {
- private final ActorRef router;
-
- public SetLocalRouter(ActorRef router) {
- Preconditions.checkArgument(router != null, "Router must not be null");
- this.router = router;
- }
-
- public ActorRef getRouter() {
- return this.router;
- }
-
- @Override
- public String toString() {
- return "SetLocalRouter{" +
- "router=" + router +
- '}';
- }
- }
-
- public static class FindRouters {
- private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
-
- public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
- Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
- this.routeIdentifier = routeIdentifier;
- }
-
- public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
- return routeIdentifier;
- }
-
- @Override
- public String toString() {
- return "FindRouters{" +
- "routeIdentifier=" + routeIdentifier +
- '}';
+ public static final class RemoveRoutes extends AbstractRouteMessage {
+ public RemoveRoutes(final Collection<DOMRpcIdentifier> rpcRouteIdentifiers) {
+ super(rpcRouteIdentifiers);
}
}
- public static class FindRoutersReply {
- final List<Pair<ActorRef, Long>> routerWithUpdateTime;
+ public static final class UpdateRemoteEndpoints {
+ private final Map<Address, Optional<RemoteRpcEndpoint>> rpcEndpoints;
- public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
- Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
- this.routerWithUpdateTime = routerWithUpdateTime;
- }
- public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
- return routerWithUpdateTime;
+ @VisibleForTesting
+ public UpdateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> rpcEndpoints) {
+ this.rpcEndpoints = ImmutableMap.copyOf(rpcEndpoints);
}
- @Override
- public String toString() {
- return "FindRoutersReply{" +
- "routerWithUpdateTime=" + routerWithUpdateTime +
- '}';
+ public Map<Address, Optional<RemoteRpcEndpoint>> getRpcEndpoints() {
+ return rpcEndpoints;
}
}
}