X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2FRpcRegistry.java;h=095d70926b90d3838a777cc14a6976b31ccd9c97;hb=dd281c0e33267296ad3babbffd03e1122cdb127e;hp=51609870cc4aad1c8789dfdcd0b68f04563b5cdf;hpb=ee146664ac8ae45439c14a84fe769633c3ebf847;p=controller.git diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java index 51609870cc..095d70926b 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java @@ -10,7 +10,6 @@ package org.opendaylight.controller.remote.rpc.registry; import akka.actor.ActorRef; import akka.actor.Address; import akka.actor.Props; -import akka.actor.UntypedActor; import akka.dispatch.Mapper; import akka.event.Logging; import akka.event.LoggingAdapter; @@ -18,6 +17,8 @@ import akka.japi.Option; import akka.japi.Pair; import akka.pattern.Patterns; import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; +import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore; import org.opendaylight.controller.sal.connector.api.RpcRouter; @@ -28,8 +29,9 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoute; -import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoute; +import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; +import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters; +import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter; import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets; import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply; @@ -39,12 +41,11 @@ import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.Bu /** * Registry to look up cluster nodes that have registered for a given rpc. - *

+ *

* It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this * cluster wide information. - * */ -public class RpcRegistry extends UntypedActor { +public class RpcRegistry extends AbstractUntypedActorWithMetering { final LoggingAdapter log = Logging.getLogger(getContext().system(), this); @@ -58,32 +59,34 @@ public class RpcRegistry extends UntypedActor { */ private ActorRef localRouter; + private RemoteRpcProviderConfig config; + public RpcRegistry() { bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store"); + this.config = new RemoteRpcProviderConfig(getContext().system().settings().config()); + log.info("Bucket store path = {}", bucketStore.path().toString()); } public RpcRegistry(ActorRef bucketStore) { this.bucketStore = bucketStore; } - @Override - public void onReceive(Object message) throws Exception { - - log.debug("Received message: message [{}]", message); + @Override + protected void handleReceive(Object message) throws Exception { //TODO: if sender is remote, reject message if (message instanceof SetLocalRouter) receiveSetLocalRouter((SetLocalRouter) message); - if (message instanceof AddOrUpdateRoute) - receiveAddRoute((AddOrUpdateRoute) message); + if (message instanceof AddOrUpdateRoutes) + receiveAddRoutes((AddOrUpdateRoutes) message); - else if (message instanceof RemoveRoute) - receiveRemoveRoute((RemoveRoute) message); + else if (message instanceof RemoveRoutes) + receiveRemoveRoutes((RemoveRoutes) message); else if (message instanceof Messages.FindRouters) - receiveGetRouter((Messages.FindRouters) message); + receiveGetRouter((FindRouters) message); else unhandled(message); @@ -95,55 +98,40 @@ public class RpcRegistry extends UntypedActor { * @param message contains {@link akka.actor.ActorRef} for rpc broker */ private void receiveSetLocalRouter(SetLocalRouter message) { - if (message == null || message.getRouter() == null) - return;//ignore - localRouter = message.getRouter(); } /** - * //TODO: update this to accept multiple route registration * @param msg */ - private void receiveAddRoute(AddOrUpdateRoute msg) { - if (msg.getRouteIdentifier() == null) - return;//ignore + private void receiveAddRoutes(AddOrUpdateRoutes msg) { Preconditions.checkState(localRouter != null, "Router must be set first"); - Future futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000); - futureReply.map(getMapperToAddRoute(msg.getRouteIdentifier()), getContext().dispatcher()); + Future futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration()); + futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher()); } /** - * //TODO: update this to accept multiple routes - * @param msg + * @param msg contains list of route ids to remove */ - private void receiveRemoveRoute(RemoveRoute msg) { - if (msg.getRouteIdentifier() == null) - return;//ignore + private void receiveRemoveRoutes(RemoveRoutes msg) { - Future futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000); - futureReply.map(getMapperToRemoveRoute(msg.getRouteIdentifier()), getContext().dispatcher()); + Future futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration()); + futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher()); } /** * Finds routers for the given rpc. + * * @param msg */ - private void receiveGetRouter(Messages.FindRouters msg) { + private void receiveGetRouter(FindRouters msg) { final ActorRef sender = getSender(); - //if empty message, return empty list - if (msg.getRouteIdentifier() == null) { - sender.tell(createEmptyReply(), getSelf()); - return; - } - - Future futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), 1000); + Future futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), config.getAskDuration()); futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher()); - } /** @@ -158,25 +146,24 @@ public class RpcRegistry extends UntypedActor { /** * Helper to create a reply when routers are found for the given rpc + * * @param buckets * @param routeId * @return */ - private Messages.FindRoutersReply createReplyWithRouters(Map buckets, RpcRouter.RouteIdentifier routeId) { + private Messages.FindRoutersReply createReplyWithRouters( + Map buckets, RpcRouter.RouteIdentifier routeId) { List> routers = new ArrayList<>(); - Option> routerWithUpdateTime = null; for (Bucket bucket : buckets.values()) { RoutingTable table = (RoutingTable) bucket.getData(); - if (table == null) continue; routerWithUpdateTime = table.getRouterFor(routeId); - if (routerWithUpdateTime.isEmpty()) continue; @@ -192,13 +179,14 @@ public class RpcRegistry extends UntypedActor { /// /** - * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found + * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found * * @param routeId the rpc * @param sender client who asked to find the routers. * @return */ - private Mapper getMapperToGetRouter(final RpcRouter.RouteIdentifier routeId, final ActorRef sender) { + private Mapper getMapperToGetRouter( + final RpcRouter.RouteIdentifier routeId, final ActorRef sender) { return new Mapper() { @Override public Void apply(Object replyMessage) { @@ -224,10 +212,10 @@ public class RpcRegistry extends UntypedActor { * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently, * it updates the local bucket in bucket store. * - * @param routeId rpc to remote + * @param routeIds rpc to remote * @return */ - private Mapper getMapperToRemoveRoute(final RpcRouter.RouteIdentifier routeId) { + private Mapper getMapperToRemoveRoutes(final List> routeIds) { return new Mapper() { @Override public Void apply(Object replyMessage) { @@ -246,8 +234,12 @@ public class RpcRegistry extends UntypedActor { table = new RoutingTable(); table.setRouter(localRouter); - table.removeRoute(routeId); + if (!table.isEmpty()) { + for (RpcRouter.RouteIdentifier routeId : routeIds) { + table.removeRoute(routeId); + } + } bucket.setData(table); UpdateBucket updateBucketMessage = new UpdateBucket(bucket); @@ -262,10 +254,10 @@ public class RpcRegistry extends UntypedActor { * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently, * it updates the local bucket in bucket store. * - * @param routeId rpc to add + * @param routeIds rpc to add * @return */ - private Mapper getMapperToAddRoute(final RpcRouter.RouteIdentifier routeId) { + private Mapper getMapperToAddRoutes(final List> routeIds) { return new Mapper() { @Override @@ -285,7 +277,9 @@ public class RpcRegistry extends UntypedActor { table = new RoutingTable(); table.setRouter(localRouter); - table.addRoute(routeId); + for (RpcRouter.RouteIdentifier routeId : routeIds) { + table.addRoute(routeId); + } bucket.setData(table); @@ -305,47 +299,50 @@ public class RpcRegistry extends UntypedActor { public static class ContainsRoute { - final RpcRouter.RouteIdentifier routeIdentifier; + final List> routeIdentifiers; - public ContainsRoute(RpcRouter.RouteIdentifier routeIdentifier) { - Preconditions.checkArgument(routeIdentifier != null); - this.routeIdentifier = routeIdentifier; + public ContainsRoute(List> routeIdentifiers) { + Preconditions.checkArgument(routeIdentifiers != null && + !routeIdentifiers.isEmpty(), + "Route Identifiers must be supplied"); + this.routeIdentifiers = routeIdentifiers; } - public RpcRouter.RouteIdentifier getRouteIdentifier(){ - return this.routeIdentifier; + public List> getRouteIdentifiers() { + return this.routeIdentifiers; } @Override public String toString() { - return this.getClass().getSimpleName() + "{" + - "routeIdentifier=" + routeIdentifier + + return "ContainsRoute{" + + "routeIdentifiers=" + routeIdentifiers + '}'; } } - public static class AddOrUpdateRoute extends ContainsRoute{ + public static class AddOrUpdateRoutes extends ContainsRoute { - public AddOrUpdateRoute(RpcRouter.RouteIdentifier routeIdentifier) { - super(routeIdentifier); + public AddOrUpdateRoutes(List> routeIdentifiers) { + super(routeIdentifiers); } } - public static class RemoveRoute extends ContainsRoute { + public static class RemoveRoutes extends ContainsRoute { - public RemoveRoute(RpcRouter.RouteIdentifier routeIdentifier) { - super(routeIdentifier); + public RemoveRoutes(List> routeIdentifiers) { + super(routeIdentifiers); } } - public static class SetLocalRouter{ + public static class SetLocalRouter { private final ActorRef router; public SetLocalRouter(ActorRef router) { + Preconditions.checkArgument(router != null, "Router must not be null"); this.router = router; } - public ActorRef getRouter(){ + public ActorRef getRouter() { return this.router; } @@ -357,9 +354,23 @@ public class RpcRegistry extends UntypedActor { } } - public static class FindRouters extends ContainsRoute { + public static class FindRouters { + private final RpcRouter.RouteIdentifier routeIdentifier; + public FindRouters(RpcRouter.RouteIdentifier routeIdentifier) { - super(routeIdentifier); + Preconditions.checkArgument(routeIdentifier != null, "Route must not be null"); + this.routeIdentifier = routeIdentifier; + } + + public RpcRouter.RouteIdentifier getRouteIdentifier() { + return routeIdentifier; + } + + @Override + public String toString() { + return "FindRouters{" + + "routeIdentifier=" + routeIdentifier + + '}'; } } @@ -367,10 +378,11 @@ public class RpcRegistry extends UntypedActor { final List> routerWithUpdateTime; public FindRoutersReply(List> routerWithUpdateTime) { + Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null"); this.routerWithUpdateTime = routerWithUpdateTime; } - public List> getRouterWithUpdateTime(){ + public List> getRouterWithUpdateTime() { return routerWithUpdateTime; }