2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.remote.rpc.registry;
10 import akka.actor.ActorRef;
11 import akka.actor.Props;
12 import akka.japi.Creator;
13 import akka.japi.Option;
14 import akka.japi.Pair;
15 import com.google.common.base.Preconditions;
16 import java.util.ArrayList;
17 import java.util.List;
18 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
19 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
20 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
21 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
22 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
23 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
24 import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBean;
25 import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl;
26 import org.opendaylight.controller.sal.connector.api.RpcRouter;
27 import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
30 * Registry to look up cluster nodes that have registered for a given rpc.
32 * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
33 * cluster wide information.
35 public class RpcRegistry extends BucketStore<RoutingTable> {
37 public RpcRegistry() {
38 getLocalBucket().setData(new RoutingTable());
41 public static Props props() {
42 return Props.create(new RpcRegistryCreator());
46 protected void handleReceive(Object message) throws Exception {
47 //TODO: if sender is remote, reject message
49 if (message instanceof SetLocalRouter) {
50 receiveSetLocalRouter((SetLocalRouter) message);
51 } else if (message instanceof AddOrUpdateRoutes) {
52 receiveAddRoutes((AddOrUpdateRoutes) message);
53 } else if (message instanceof RemoveRoutes) {
54 receiveRemoveRoutes((RemoveRoutes) message);
55 } else if (message instanceof Messages.FindRouters) {
56 receiveGetRouter((FindRouters) message);
58 super.handleReceive(message);
63 * Register's rpc broker
65 * @param message contains {@link akka.actor.ActorRef} for rpc broker
67 private void receiveSetLocalRouter(SetLocalRouter message) {
68 getLocalBucket().getData().setRouter(message.getRouter());
74 private void receiveAddRoutes(AddOrUpdateRoutes msg) {
76 log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
78 RoutingTable table = getLocalBucket().getData().copy();
79 for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
80 table.addRoute(routeId);
83 updateLocalBucket(table);
87 * @param msg contains list of route ids to remove
89 private void receiveRemoveRoutes(RemoveRoutes msg) {
91 RoutingTable table = getLocalBucket().getData().copy();
92 for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
93 table.removeRoute(routeId);
96 updateLocalBucket(table);
100 * Finds routers for the given rpc.
104 private void receiveGetRouter(FindRouters msg) {
105 List<Pair<ActorRef, Long>> routers = new ArrayList<>();
107 RouteIdentifier<?, ?, ?> routeId = msg.getRouteIdentifier();
108 findRoutes(getLocalBucket().getData(), routeId, routers);
110 for(Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
111 findRoutes(bucket.getData(), routeId, routers);
114 getSender().tell(new Messages.FindRoutersReply(routers), getSelf());
117 private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
118 List<Pair<ActorRef, Long>> routers) {
123 Option<Pair<ActorRef, Long>> routerWithUpdateTime = table.getRouterFor(routeId);
124 if(!routerWithUpdateTime.isEmpty()) {
125 routers.add(routerWithUpdateTime.get());
130 * All messages used by the RpcRegistry
132 public static class Messages {
135 public static class ContainsRoute {
136 final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
138 public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
139 Preconditions.checkArgument(routeIdentifiers != null &&
140 !routeIdentifiers.isEmpty(),
141 "Route Identifiers must be supplied");
142 this.routeIdentifiers = routeIdentifiers;
145 public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
146 return this.routeIdentifiers;
150 public String toString() {
151 return "ContainsRoute{" +
152 "routeIdentifiers=" + routeIdentifiers +
157 public static class AddOrUpdateRoutes extends ContainsRoute {
159 public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
160 super(routeIdentifiers);
164 public static class RemoveRoutes extends ContainsRoute {
166 public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
167 super(routeIdentifiers);
171 public static class SetLocalRouter {
172 private final ActorRef router;
174 public SetLocalRouter(ActorRef router) {
175 Preconditions.checkArgument(router != null, "Router must not be null");
176 this.router = router;
179 public ActorRef getRouter() {
184 public String toString() {
185 return "SetLocalRouter{" +
191 public static class FindRouters {
192 private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
194 public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
195 Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
196 this.routeIdentifier = routeIdentifier;
199 public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
200 return routeIdentifier;
204 public String toString() {
205 return "FindRouters{" +
206 "routeIdentifier=" + routeIdentifier +
211 public static class FindRoutersReply {
212 final List<Pair<ActorRef, Long>> routerWithUpdateTime;
214 public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
215 Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
216 this.routerWithUpdateTime = routerWithUpdateTime;
219 public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
220 return routerWithUpdateTime;
224 public String toString() {
225 return "FindRoutersReply{" +
226 "routerWithUpdateTime=" + routerWithUpdateTime +
232 private static class RpcRegistryCreator implements Creator<RpcRegistry> {
233 private static final long serialVersionUID = 1L;
236 public RpcRegistry create() throws Exception {
237 RpcRegistry registry = new RpcRegistry();
238 RemoteRpcRegistryMXBean mxBean = new RemoteRpcRegistryMXBeanImpl(registry);