BUG 4151 : Create a shared actor system
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry;
9
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.japi.Creator;
import akka.japi.Option;
import akka.japi.Pair;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBean;
import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
29
/**
 * Registry to look up cluster nodes that have registered for a given rpc.
 * <p>
 * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
 * cluster-wide information.
 */
36 public class RpcRegistry extends BucketStore<RoutingTable> {
37
38     public RpcRegistry(RemoteRpcProviderConfig config) {
39         super(config);
40         getLocalBucket().setData(new RoutingTable());
41     }
42
43     public static Props props(RemoteRpcProviderConfig config) {
44         return Props.create(new RpcRegistryCreator(config));
45     }
46
47     @Override
48     protected void handleReceive(Object message) throws Exception {
49         //TODO: if sender is remote, reject message
50
51         if (message instanceof SetLocalRouter) {
52             receiveSetLocalRouter((SetLocalRouter) message);
53         } else if (message instanceof AddOrUpdateRoutes) {
54             receiveAddRoutes((AddOrUpdateRoutes) message);
55         } else if (message instanceof RemoveRoutes) {
56             receiveRemoveRoutes((RemoveRoutes) message);
57         } else if (message instanceof Messages.FindRouters) {
58             receiveGetRouter((FindRouters) message);
59         } else {
60             super.handleReceive(message);
61         }
62     }
63
64     /**
65      * Register's rpc broker
66      *
67      * @param message contains {@link akka.actor.ActorRef} for rpc broker
68      */
69     private void receiveSetLocalRouter(SetLocalRouter message) {
70         getLocalBucket().getData().setRouter(message.getRouter());
71     }
72
73     /**
74      * @param msg
75      */
76     private void receiveAddRoutes(AddOrUpdateRoutes msg) {
77
78         log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
79
80         RoutingTable table = getLocalBucket().getData().copy();
81         for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
82             table.addRoute(routeId);
83         }
84
85         updateLocalBucket(table);
86     }
87
88     /**
89      * @param msg contains list of route ids to remove
90      */
91     private void receiveRemoveRoutes(RemoveRoutes msg) {
92
93         RoutingTable table = getLocalBucket().getData().copy();
94         for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
95             table.removeRoute(routeId);
96         }
97
98         updateLocalBucket(table);
99     }
100
101     /**
102      * Finds routers for the given rpc.
103      *
104      * @param msg
105      */
106     private void receiveGetRouter(FindRouters msg) {
107         List<Pair<ActorRef, Long>> routers = new ArrayList<>();
108
109         RouteIdentifier<?, ?, ?> routeId = msg.getRouteIdentifier();
110         findRoutes(getLocalBucket().getData(), routeId, routers);
111
112         for(Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
113             findRoutes(bucket.getData(), routeId, routers);
114         }
115
116         getSender().tell(new Messages.FindRoutersReply(routers), getSelf());
117     }
118
119     private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
120             List<Pair<ActorRef, Long>> routers) {
121         if (table == null) {
122             return;
123         }
124
125         Option<Pair<ActorRef, Long>> routerWithUpdateTime = table.getRouterFor(routeId);
126         if(!routerWithUpdateTime.isEmpty()) {
127             routers.add(routerWithUpdateTime.get());
128         }
129     }
130
131     /**
132      * All messages used by the RpcRegistry
133      */
134     public static class Messages {
135
136
137         public static class ContainsRoute {
138             final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
139
140             public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
141                 Preconditions.checkArgument(routeIdentifiers != null &&
142                                             !routeIdentifiers.isEmpty(),
143                                             "Route Identifiers must be supplied");
144                 this.routeIdentifiers = routeIdentifiers;
145             }
146
147             public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
148                 return this.routeIdentifiers;
149             }
150
151             @Override
152             public String toString() {
153                 return "ContainsRoute{" +
154                         "routeIdentifiers=" + routeIdentifiers +
155                         '}';
156             }
157         }
158
159         public static class AddOrUpdateRoutes extends ContainsRoute {
160
161             public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
162                 super(routeIdentifiers);
163             }
164         }
165
166         public static class RemoveRoutes extends ContainsRoute {
167
168             public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
169                 super(routeIdentifiers);
170             }
171         }
172
173         public static class SetLocalRouter {
174             private final ActorRef router;
175
176             public SetLocalRouter(ActorRef router) {
177                 Preconditions.checkArgument(router != null, "Router must not be null");
178                 this.router = router;
179             }
180
181             public ActorRef getRouter() {
182                 return this.router;
183             }
184
185             @Override
186             public String toString() {
187                 return "SetLocalRouter{" +
188                         "router=" + router +
189                         '}';
190             }
191         }
192
193         public static class FindRouters {
194             private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
195
196             public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
197                 Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
198                 this.routeIdentifier = routeIdentifier;
199             }
200
201             public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
202                 return routeIdentifier;
203             }
204
205             @Override
206             public String toString() {
207                 return "FindRouters{" +
208                         "routeIdentifier=" + routeIdentifier +
209                         '}';
210             }
211         }
212
213         public static class FindRoutersReply {
214             final List<Pair<ActorRef, Long>> routerWithUpdateTime;
215
216             public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
217                 Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
218                 this.routerWithUpdateTime = routerWithUpdateTime;
219             }
220
221             public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
222                 return routerWithUpdateTime;
223             }
224
225             @Override
226             public String toString() {
227                 return "FindRoutersReply{" +
228                         "routerWithUpdateTime=" + routerWithUpdateTime +
229                         '}';
230             }
231         }
232     }
233
234     private static class RpcRegistryCreator implements Creator<RpcRegistry> {
235         private static final long serialVersionUID = 1L;
236         private final RemoteRpcProviderConfig config;
237
238         private RpcRegistryCreator(RemoteRpcProviderConfig config) {
239             this.config = config;
240         }
241
242         @Override
243         public RpcRegistry create() throws Exception {
244             RpcRegistry registry =  new RpcRegistry(config);
245             RemoteRpcRegistryMXBean mxBean = new RemoteRpcRegistryMXBeanImpl(registry);
246             return registry;
247         }
248     }
249 }