2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.remote.rpc.registry;
10 import akka.actor.ActorRef;
11 import akka.japi.Option;
12 import akka.japi.Pair;
13 import com.google.common.base.Preconditions;
14 import java.util.ArrayList;
15 import java.util.List;
16 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
17 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
18 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
19 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
20 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
21 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
22 import org.opendaylight.controller.sal.connector.api.RpcRouter;
23 import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
26 * Registry to look up cluster nodes that have registered for a given rpc.
28 * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
29 * cluster wide information.
31 public class RpcRegistry extends BucketStore<RoutingTable> {
33 public RpcRegistry() {
34 getLocalBucket().setData(new RoutingTable());
38 protected void handleReceive(Object message) throws Exception {
39 //TODO: if sender is remote, reject message
41 if (message instanceof SetLocalRouter) {
42 receiveSetLocalRouter((SetLocalRouter) message);
43 } else if (message instanceof AddOrUpdateRoutes) {
44 receiveAddRoutes((AddOrUpdateRoutes) message);
45 } else if (message instanceof RemoveRoutes) {
46 receiveRemoveRoutes((RemoveRoutes) message);
47 } else if (message instanceof Messages.FindRouters) {
48 receiveGetRouter((FindRouters) message);
50 super.handleReceive(message);
55 * Register's rpc broker
57 * @param message contains {@link akka.actor.ActorRef} for rpc broker
59 private void receiveSetLocalRouter(SetLocalRouter message) {
60 getLocalBucket().getData().setRouter(message.getRouter());
66 private void receiveAddRoutes(AddOrUpdateRoutes msg) {
68 log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
70 RoutingTable table = getLocalBucket().getData().copy();
71 for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
72 table.addRoute(routeId);
75 updateLocalBucket(table);
79 * @param msg contains list of route ids to remove
81 private void receiveRemoveRoutes(RemoveRoutes msg) {
83 RoutingTable table = getLocalBucket().getData().copy();
84 for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
85 table.removeRoute(routeId);
88 updateLocalBucket(table);
92 * Finds routers for the given rpc.
96 private void receiveGetRouter(FindRouters msg) {
97 List<Pair<ActorRef, Long>> routers = new ArrayList<>();
99 RouteIdentifier<?, ?, ?> routeId = msg.getRouteIdentifier();
100 findRoutes(getLocalBucket().getData(), routeId, routers);
102 for(Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
103 findRoutes(bucket.getData(), routeId, routers);
106 getSender().tell(new Messages.FindRoutersReply(routers), getSelf());
109 private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
110 List<Pair<ActorRef, Long>> routers) {
115 Option<Pair<ActorRef, Long>> routerWithUpdateTime = table.getRouterFor(routeId);
116 if(!routerWithUpdateTime.isEmpty()) {
117 routers.add(routerWithUpdateTime.get());
122 * All messages used by the RpcRegistry
124 public static class Messages {
127 public static class ContainsRoute {
128 final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
130 public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
131 Preconditions.checkArgument(routeIdentifiers != null &&
132 !routeIdentifiers.isEmpty(),
133 "Route Identifiers must be supplied");
134 this.routeIdentifiers = routeIdentifiers;
137 public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
138 return this.routeIdentifiers;
142 public String toString() {
143 return "ContainsRoute{" +
144 "routeIdentifiers=" + routeIdentifiers +
149 public static class AddOrUpdateRoutes extends ContainsRoute {
151 public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
152 super(routeIdentifiers);
156 public static class RemoveRoutes extends ContainsRoute {
158 public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
159 super(routeIdentifiers);
163 public static class SetLocalRouter {
164 private final ActorRef router;
166 public SetLocalRouter(ActorRef router) {
167 Preconditions.checkArgument(router != null, "Router must not be null");
168 this.router = router;
171 public ActorRef getRouter() {
176 public String toString() {
177 return "SetLocalRouter{" +
183 public static class FindRouters {
184 private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
186 public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
187 Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
188 this.routeIdentifier = routeIdentifier;
191 public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
192 return routeIdentifier;
196 public String toString() {
197 return "FindRouters{" +
198 "routeIdentifier=" + routeIdentifier +
203 public static class FindRoutersReply {
204 final List<Pair<ActorRef, Long>> routerWithUpdateTime;
206 public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
207 Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
208 this.routerWithUpdateTime = routerWithUpdateTime;
211 public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
212 return routerWithUpdateTime;
216 public String toString() {
217 return "FindRoutersReply{" +
218 "routerWithUpdateTime=" + routerWithUpdateTime +