import com.google.common.base.Preconditions;
import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
+import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import scala.concurrent.Future;
import java.util.List;
import java.util.Map;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoute;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoute;
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
/**
* Registry to look up cluster nodes that have registered for a given rpc.
- * <p>
+ * <p/>
* It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
* cluster wide information.
- *
*/
public class RpcRegistry extends UntypedActor {
public RpcRegistry() {
bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
+
+ log.info("Bucket store path = {}", bucketStore.path().toString());
}
public RpcRegistry(ActorRef bucketStore) {
if (message instanceof SetLocalRouter)
receiveSetLocalRouter((SetLocalRouter) message);
- if (message instanceof AddOrUpdateRoute)
- receiveAddRoute((AddOrUpdateRoute) message);
+ if (message instanceof AddOrUpdateRoutes)
+ receiveAddRoutes((AddOrUpdateRoutes) message);
- else if (message instanceof RemoveRoute)
- receiveRemoveRoute((RemoveRoute) message);
+ else if (message instanceof RemoveRoutes)
+ receiveRemoveRoutes((RemoveRoutes) message);
else if (message instanceof Messages.FindRouters)
- receiveGetRouter((Messages.FindRouters) message);
+ receiveGetRouter((FindRouters) message);
else
unhandled(message);
* @param message contains {@link akka.actor.ActorRef} for rpc broker
*/
private void receiveSetLocalRouter(SetLocalRouter message) {
- if (message == null || message.getRouter() == null)
- return;//ignore
-
localRouter = message.getRouter();
}
/**
- * //TODO: update this to accept multiple route registration
* @param msg
*/
- private void receiveAddRoute(AddOrUpdateRoute msg) {
- if (msg.getRouteIdentifier() == null)
- return;//ignore
+ private void receiveAddRoutes(AddOrUpdateRoutes msg) {
Preconditions.checkState(localRouter != null, "Router must be set first");
- Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000);
- futureReply.map(getMapperToAddRoute(msg.getRouteIdentifier()), getContext().dispatcher());
+ Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), ActorUtil.ASK_DURATION.toMillis());
+ futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
}
/**
- * //TODO: update this to accept multiple routes
- * @param msg
+ * @param msg contains list of route ids to remove
*/
- private void receiveRemoveRoute(RemoveRoute msg) {
- if (msg.getRouteIdentifier() == null)
- return;//ignore
+ private void receiveRemoveRoutes(RemoveRoutes msg) {
- Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000);
- futureReply.map(getMapperToRemoveRoute(msg.getRouteIdentifier()), getContext().dispatcher());
+ Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), ActorUtil.ASK_DURATION.toMillis());
+ futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
}
/**
* Finds routers for the given rpc.
+ *
* @param msg
*/
- private void receiveGetRouter(Messages.FindRouters msg) {
+ private void receiveGetRouter(FindRouters msg) {
final ActorRef sender = getSender();
- //if empty message, return empty list
- if (msg.getRouteIdentifier() == null) {
- sender.tell(createEmptyReply(), getSelf());
- return;
- }
-
- Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), 1000);
+ Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), ActorUtil.ASK_DURATION.toMillis());
futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
-
}
/**
/**
* Helper to create a reply when routers are found for the given rpc
+ *
* @param buckets
* @param routeId
* @return
private Messages.FindRoutersReply createReplyWithRouters(Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
List<Pair<ActorRef, Long>> routers = new ArrayList<>();
-
Option<Pair<ActorRef, Long>> routerWithUpdateTime = null;
for (Bucket bucket : buckets.values()) {
RoutingTable table = (RoutingTable) bucket.getData();
-
if (table == null)
continue;
routerWithUpdateTime = table.getRouterFor(routeId);
-
if (routerWithUpdateTime.isEmpty())
continue;
///
/**
- * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found
+ * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found
*
* @param routeId the rpc
* @param sender client who asked to find the routers.
* Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently,
* it updates the local bucket in bucket store.
*
- * @param routeId rpc to remote
+ * @param routeIds rpc to remote
* @return
*/
- private Mapper<Object, Void> getMapperToRemoveRoute(final RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
+ private Mapper<Object, Void> getMapperToRemoveRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
return new Mapper<Object, Void>() {
@Override
public Void apply(Object replyMessage) {
table = new RoutingTable();
table.setRouter(localRouter);
- table.removeRoute(routeId);
+ if (!table.isEmpty()) {
+ for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
+ table.removeRoute(routeId);
+ }
+ }
bucket.setData(table);
UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
* Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently,
* it updates the local bucket in bucket store.
*
- * @param routeId rpc to add
+ * @param routeIds rpc to add
* @return
*/
- private Mapper<Object, Void> getMapperToAddRoute(final RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
+ private Mapper<Object, Void> getMapperToAddRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
return new Mapper<Object, Void>() {
@Override
table = new RoutingTable();
table.setRouter(localRouter);
- table.addRoute(routeId);
+ for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
+ table.addRoute(routeId);
+ }
bucket.setData(table);
public static class ContainsRoute {
- final RpcRouter.RouteIdentifier<?,?,?> routeIdentifier;
+ final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
- public ContainsRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
- Preconditions.checkArgument(routeIdentifier != null);
- this.routeIdentifier = routeIdentifier;
+ public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+ Preconditions.checkArgument(routeIdentifiers != null &&
+ !routeIdentifiers.isEmpty(),
+ "Route Identifiers must be supplied");
+ this.routeIdentifiers = routeIdentifiers;
}
- public RpcRouter.RouteIdentifier<?,?,?> getRouteIdentifier(){
- return this.routeIdentifier;
+ public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
+ return this.routeIdentifiers;
}
@Override
public String toString() {
- return this.getClass().getSimpleName() + "{" +
- "routeIdentifier=" + routeIdentifier +
+ return "ContainsRoute{" +
+ "routeIdentifiers=" + routeIdentifiers +
'}';
}
}
- public static class AddOrUpdateRoute extends ContainsRoute{
+ public static class AddOrUpdateRoutes extends ContainsRoute {
- public AddOrUpdateRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
- super(routeIdentifier);
+ public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+ super(routeIdentifiers);
}
}
- public static class RemoveRoute extends ContainsRoute {
+ public static class RemoveRoutes extends ContainsRoute {
- public RemoveRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
- super(routeIdentifier);
+ public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+ super(routeIdentifiers);
}
}
- public static class SetLocalRouter{
+ public static class SetLocalRouter {
private final ActorRef router;
public SetLocalRouter(ActorRef router) {
+ Preconditions.checkArgument(router != null, "Router must not be null");
this.router = router;
}
- public ActorRef getRouter(){
+ public ActorRef getRouter() {
return this.router;
}
}
}
- public static class FindRouters extends ContainsRoute {
+ public static class FindRouters {
+ private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
+
public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
- super(routeIdentifier);
+ Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
+ this.routeIdentifier = routeIdentifier;
+ }
+
+ public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
+ return routeIdentifier;
+ }
+
+ @Override
+ public String toString() {
+ return "FindRouters{" +
+ "routeIdentifier=" + routeIdentifier +
+ '}';
}
}
final List<Pair<ActorRef, Long>> routerWithUpdateTime;
public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
+ Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
this.routerWithUpdateTime = routerWithUpdateTime;
}
- public List<Pair<ActorRef, Long>> getRouterWithUpdateTime(){
+ public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
return routerWithUpdateTime;
}