/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.remote.rpc.registry;
10 import akka.actor.ActorRef;
11 import akka.actor.Props;
12 import akka.japi.Creator;
13 import akka.japi.Option;
14 import akka.japi.Pair;
15 import com.google.common.base.Preconditions;
16 import java.util.ArrayList;
17 import java.util.List;
18 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
19 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
20 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
21 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
22 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
23 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
24 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
25 import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBean;
26 import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl;
27 import org.opendaylight.controller.sal.connector.api.RpcRouter;
28 import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
/**
 * Registry to look up cluster nodes that have registered for a given rpc.
 * <p>
 * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
 * cluster-wide information.
 */
36 public class RpcRegistry extends BucketStore<RoutingTable> {
/**
 * Creates the registry and stores a fresh, empty {@link RoutingTable} in the local bucket.
 *
 * @param config remote rpc provider configuration, forwarded to the superclass constructor
 */
public RpcRegistry(RemoteRpcProviderConfig config) {
    // NOTE(review): assumes BucketStore declares a constructor taking the provider config -- confirm.
    super(config);
    getLocalBucket().setData(new RoutingTable());
}
43 public static Props props(RemoteRpcProviderConfig config) {
44 return Props.create(new RpcRegistryCreator(config));
48 protected void handleReceive(Object message) throws Exception {
49 //TODO: if sender is remote, reject message
51 if (message instanceof SetLocalRouter) {
52 receiveSetLocalRouter((SetLocalRouter) message);
53 } else if (message instanceof AddOrUpdateRoutes) {
54 receiveAddRoutes((AddOrUpdateRoutes) message);
55 } else if (message instanceof RemoveRoutes) {
56 receiveRemoveRoutes((RemoveRoutes) message);
57 } else if (message instanceof Messages.FindRouters) {
58 receiveGetRouter((FindRouters) message);
60 super.handleReceive(message);
65 * Register's rpc broker
67 * @param message contains {@link akka.actor.ActorRef} for rpc broker
69 private void receiveSetLocalRouter(SetLocalRouter message) {
70 getLocalBucket().getData().setRouter(message.getRouter());
76 private void receiveAddRoutes(AddOrUpdateRoutes msg) {
78 log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
80 RoutingTable table = getLocalBucket().getData().copy();
81 for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
82 table.addRoute(routeId);
85 updateLocalBucket(table);
89 * @param msg contains list of route ids to remove
91 private void receiveRemoveRoutes(RemoveRoutes msg) {
93 RoutingTable table = getLocalBucket().getData().copy();
94 for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
95 table.removeRoute(routeId);
98 updateLocalBucket(table);
102 * Finds routers for the given rpc.
106 private void receiveGetRouter(FindRouters msg) {
107 List<Pair<ActorRef, Long>> routers = new ArrayList<>();
109 RouteIdentifier<?, ?, ?> routeId = msg.getRouteIdentifier();
110 findRoutes(getLocalBucket().getData(), routeId, routers);
112 for(Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
113 findRoutes(bucket.getData(), routeId, routers);
116 getSender().tell(new Messages.FindRoutersReply(routers), getSelf());
119 private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
120 List<Pair<ActorRef, Long>> routers) {
125 Option<Pair<ActorRef, Long>> routerWithUpdateTime = table.getRouterFor(routeId);
126 if(!routerWithUpdateTime.isEmpty()) {
127 routers.add(routerWithUpdateTime.get());
132 * All messages used by the RpcRegistry
134 public static class Messages {
137 public static class ContainsRoute {
138 final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
140 public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
141 Preconditions.checkArgument(routeIdentifiers != null &&
142 !routeIdentifiers.isEmpty(),
143 "Route Identifiers must be supplied");
144 this.routeIdentifiers = routeIdentifiers;
147 public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
148 return this.routeIdentifiers;
152 public String toString() {
153 return "ContainsRoute{" +
154 "routeIdentifiers=" + routeIdentifiers +
159 public static class AddOrUpdateRoutes extends ContainsRoute {
161 public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
162 super(routeIdentifiers);
166 public static class RemoveRoutes extends ContainsRoute {
168 public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
169 super(routeIdentifiers);
173 public static class SetLocalRouter {
174 private final ActorRef router;
176 public SetLocalRouter(ActorRef router) {
177 Preconditions.checkArgument(router != null, "Router must not be null");
178 this.router = router;
181 public ActorRef getRouter() {
186 public String toString() {
187 return "SetLocalRouter{" +
193 public static class FindRouters {
194 private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
196 public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
197 Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
198 this.routeIdentifier = routeIdentifier;
201 public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
202 return routeIdentifier;
206 public String toString() {
207 return "FindRouters{" +
208 "routeIdentifier=" + routeIdentifier +
213 public static class FindRoutersReply {
214 final List<Pair<ActorRef, Long>> routerWithUpdateTime;
216 public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
217 Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
218 this.routerWithUpdateTime = routerWithUpdateTime;
221 public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
222 return routerWithUpdateTime;
226 public String toString() {
227 return "FindRoutersReply{" +
228 "routerWithUpdateTime=" + routerWithUpdateTime +
234 private static class RpcRegistryCreator implements Creator<RpcRegistry> {
235 private static final long serialVersionUID = 1L;
236 private final RemoteRpcProviderConfig config;
238 private RpcRegistryCreator(RemoteRpcProviderConfig config) {
239 this.config = config;
243 public RpcRegistry create() throws Exception {
244 RpcRegistry registry = new RpcRegistry(config);
245 RemoteRpcRegistryMXBean mxBean = new RemoteRpcRegistryMXBeanImpl(registry);