package org.opendaylight.controller.remote.rpc.registry;
import akka.actor.ActorRef;
-import akka.actor.Address;
-import akka.actor.Props;
-import akka.dispatch.Mapper;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Option;
import akka.japi.Pair;
-import akka.pattern.Patterns;
import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import java.util.ArrayList;
+import java.util.List;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import scala.concurrent.Future;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
+import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
/**
* Registry to look up cluster nodes that have registered for a given rpc.
* It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
* cluster wide information.
*/
-public class RpcRegistry extends AbstractUntypedActorWithMetering {
+public class RpcRegistry extends BucketStore<RoutingTable> {
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
- /**
- * Store to keep the registry. Bucket store sync's it across nodes in the cluster
- */
- private ActorRef bucketStore;
-
- /**
- * Rpc broker that would use the registry to route requests.
- */
- private ActorRef localRouter;
-
- private RemoteRpcProviderConfig config;
-
public RpcRegistry() {
- bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
- this.config = new RemoteRpcProviderConfig(getContext().system().settings().config());
- log.info("Bucket store path = {}", bucketStore.path().toString());
+ getLocalBucket().setData(new RoutingTable());
}
- public RpcRegistry(ActorRef bucketStore) {
- this.bucketStore = bucketStore;
- }
-
-
@Override
protected void handleReceive(Object message) throws Exception {
//TODO: if sender is remote, reject message
- if (message instanceof SetLocalRouter)
+ if (message instanceof SetLocalRouter) {
receiveSetLocalRouter((SetLocalRouter) message);
-
- if (message instanceof AddOrUpdateRoutes)
+ } else if (message instanceof AddOrUpdateRoutes) {
receiveAddRoutes((AddOrUpdateRoutes) message);
-
- else if (message instanceof RemoveRoutes)
+ } else if (message instanceof RemoveRoutes) {
receiveRemoveRoutes((RemoveRoutes) message);
-
- else if (message instanceof Messages.FindRouters)
+ } else if (message instanceof Messages.FindRouters) {
receiveGetRouter((FindRouters) message);
-
- else
- unhandled(message);
+ } else {
+ super.handleReceive(message);
+ }
}
/**
* @param message contains {@link akka.actor.ActorRef} for rpc broker
*/
private void receiveSetLocalRouter(SetLocalRouter message) {
- localRouter = message.getRouter();
+ getLocalBucket().getData().setRouter(message.getRouter());
}
/**
*/
private void receiveAddRoutes(AddOrUpdateRoutes msg) {
- Preconditions.checkState(localRouter != null, "Router must be set first");
+ log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
+
+ RoutingTable table = getLocalBucket().getData().copy();
+ for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
+ table.addRoute(routeId);
+ }
- Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration());
- futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
+ updateLocalBucket(table);
}
/**
*/
private void receiveRemoveRoutes(RemoveRoutes msg) {
- Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration());
- futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
+ RoutingTable table = getLocalBucket().getData().copy();
+ for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
+ table.removeRoute(routeId);
+ }
+ updateLocalBucket(table);
}
/**
* @param msg
*/
private void receiveGetRouter(FindRouters msg) {
- final ActorRef sender = getSender();
-
- Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), config.getAskDuration());
- futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
- }
-
- /**
- * Helper to create empty reply when no routers are found
- *
- * @return
- */
- private Messages.FindRoutersReply createEmptyReply() {
- List<Pair<ActorRef, Long>> routerWithUpdateTime = Collections.emptyList();
- return new Messages.FindRoutersReply(routerWithUpdateTime);
- }
-
- /**
- * Helper to create a reply when routers are found for the given rpc
- *
- * @param buckets
- * @param routeId
- * @return
- */
- private Messages.FindRoutersReply createReplyWithRouters(
- Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
-
List<Pair<ActorRef, Long>> routers = new ArrayList<>();
- Option<Pair<ActorRef, Long>> routerWithUpdateTime = null;
-
- for (Bucket bucket : buckets.values()) {
-
- RoutingTable table = (RoutingTable) bucket.getData();
- if (table == null)
- continue;
- routerWithUpdateTime = table.getRouterFor(routeId);
- if (routerWithUpdateTime.isEmpty())
- continue;
+ RouteIdentifier<?, ?, ?> routeId = msg.getRouteIdentifier();
+ findRoutes(getLocalBucket().getData(), routeId, routers);
- routers.add(routerWithUpdateTime.get());
+ for(Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
+ findRoutes(bucket.getData(), routeId, routers);
}
- return new Messages.FindRoutersReply(routers);
- }
-
-
- ///
- ///private factories to create Mapper
- ///
-
- /**
- * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found
- *
- * @param routeId the rpc
- * @param sender client who asked to find the routers.
- * @return
- */
- private Mapper<Object, Void> getMapperToGetRouter(
- final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
- return new Mapper<Object, Void>() {
- @Override
- public Void apply(Object replyMessage) {
-
- if (replyMessage instanceof GetAllBucketsReply) {
-
- GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage;
- Map<Address, Bucket> buckets = reply.getBuckets();
-
- if (buckets == null || buckets.isEmpty()) {
- sender.tell(createEmptyReply(), getSelf());
- return null;
- }
-
- sender.tell(createReplyWithRouters(buckets, routeId), getSelf());
- }
- return null;
- }
- };
- }
-
- /**
- * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently,
- * it updates the local bucket in bucket store.
- *
- * @param routeIds rpc to remote
- * @return
- */
- private Mapper<Object, Void> getMapperToRemoveRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
- return new Mapper<Object, Void>() {
- @Override
- public Void apply(Object replyMessage) {
- if (replyMessage instanceof GetLocalBucketReply) {
-
- GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
- Bucket<RoutingTable> bucket = reply.getBucket();
-
- if (bucket == null) {
- log.debug("Local bucket is null");
- return null;
- }
-
- RoutingTable table = bucket.getData();
- if (table == null)
- table = new RoutingTable();
-
- table.setRouter(localRouter);
-
- if (!table.isEmpty()) {
- for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
- table.removeRoute(routeId);
- }
- }
- bucket.setData(table);
-
- UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
- bucketStore.tell(updateBucketMessage, getSelf());
- }
- return null;
- }
- };
+ getSender().tell(new Messages.FindRoutersReply(routers), getSelf());
}
- /**
- * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently,
- * it updates the local bucket in bucket store.
- *
- * @param routeIds rpc to add
- * @return
- */
- private Mapper<Object, Void> getMapperToAddRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
-
- return new Mapper<Object, Void>() {
- @Override
- public Void apply(Object replyMessage) {
- if (replyMessage instanceof GetLocalBucketReply) {
-
- GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
- Bucket<RoutingTable> bucket = reply.getBucket();
-
- if (bucket == null) {
- log.debug("Local bucket is null");
- return null;
- }
-
- RoutingTable table = bucket.getData();
- if (table == null)
- table = new RoutingTable();
-
- table.setRouter(localRouter);
- for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
- table.addRoute(routeId);
- }
-
- bucket.setData(table);
-
- UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
- bucketStore.tell(updateBucketMessage, getSelf());
- }
+ private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
+ List<Pair<ActorRef, Long>> routers) {
+ if (table == null) {
+ return;
+ }
- return null;
- }
- };
+ Option<Pair<ActorRef, Long>> routerWithUpdateTime = table.getRouterFor(routeId);
+ if(!routerWithUpdateTime.isEmpty()) {
+ routers.add(routerWithUpdateTime.get());
+ }
}
/**