Merge "BUG 2954 : Thread local DocumentBuilder's for XmlUtil"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Props;
12 import akka.japi.Creator;
13 import akka.japi.Option;
14 import akka.japi.Pair;
15 import com.google.common.base.Preconditions;
16 import java.util.ArrayList;
17 import java.util.List;
18 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
19 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
20 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
21 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
22 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
23 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
24 import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBean;
25 import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl;
26 import org.opendaylight.controller.sal.connector.api.RpcRouter;
27 import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
28
29 /**
30  * Registry to look up cluster nodes that have registered for a given rpc.
 * <p>
32  * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
33  * cluster wide information.
34  */
35 public class RpcRegistry extends BucketStore<RoutingTable> {
36
37     public RpcRegistry() {
38         getLocalBucket().setData(new RoutingTable());
39     }
40
41     public static Props props() {
42         return Props.create(new RpcRegistryCreator());
43     }
44
45     @Override
46     protected void handleReceive(Object message) throws Exception {
47         //TODO: if sender is remote, reject message
48
49         if (message instanceof SetLocalRouter) {
50             receiveSetLocalRouter((SetLocalRouter) message);
51         } else if (message instanceof AddOrUpdateRoutes) {
52             receiveAddRoutes((AddOrUpdateRoutes) message);
53         } else if (message instanceof RemoveRoutes) {
54             receiveRemoveRoutes((RemoveRoutes) message);
55         } else if (message instanceof Messages.FindRouters) {
56             receiveGetRouter((FindRouters) message);
57         } else {
58             super.handleReceive(message);
59         }
60     }
61
62     /**
63      * Register's rpc broker
64      *
65      * @param message contains {@link akka.actor.ActorRef} for rpc broker
66      */
67     private void receiveSetLocalRouter(SetLocalRouter message) {
68         getLocalBucket().getData().setRouter(message.getRouter());
69     }
70
71     /**
72      * @param msg
73      */
74     private void receiveAddRoutes(AddOrUpdateRoutes msg) {
75
76         log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
77
78         RoutingTable table = getLocalBucket().getData().copy();
79         for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
80             table.addRoute(routeId);
81         }
82
83         updateLocalBucket(table);
84     }
85
86     /**
87      * @param msg contains list of route ids to remove
88      */
89     private void receiveRemoveRoutes(RemoveRoutes msg) {
90
91         RoutingTable table = getLocalBucket().getData().copy();
92         for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
93             table.removeRoute(routeId);
94         }
95
96         updateLocalBucket(table);
97     }
98
99     /**
100      * Finds routers for the given rpc.
101      *
102      * @param msg
103      */
104     private void receiveGetRouter(FindRouters msg) {
105         List<Pair<ActorRef, Long>> routers = new ArrayList<>();
106
107         RouteIdentifier<?, ?, ?> routeId = msg.getRouteIdentifier();
108         findRoutes(getLocalBucket().getData(), routeId, routers);
109
110         for(Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
111             findRoutes(bucket.getData(), routeId, routers);
112         }
113
114         getSender().tell(new Messages.FindRoutersReply(routers), getSelf());
115     }
116
117     private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
118             List<Pair<ActorRef, Long>> routers) {
119         if (table == null) {
120             return;
121         }
122
123         Option<Pair<ActorRef, Long>> routerWithUpdateTime = table.getRouterFor(routeId);
124         if(!routerWithUpdateTime.isEmpty()) {
125             routers.add(routerWithUpdateTime.get());
126         }
127     }
128
129     /**
130      * All messages used by the RpcRegistry
131      */
132     public static class Messages {
133
134
135         public static class ContainsRoute {
136             final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
137
138             public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
139                 Preconditions.checkArgument(routeIdentifiers != null &&
140                                             !routeIdentifiers.isEmpty(),
141                                             "Route Identifiers must be supplied");
142                 this.routeIdentifiers = routeIdentifiers;
143             }
144
145             public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
146                 return this.routeIdentifiers;
147             }
148
149             @Override
150             public String toString() {
151                 return "ContainsRoute{" +
152                         "routeIdentifiers=" + routeIdentifiers +
153                         '}';
154             }
155         }
156
157         public static class AddOrUpdateRoutes extends ContainsRoute {
158
159             public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
160                 super(routeIdentifiers);
161             }
162         }
163
164         public static class RemoveRoutes extends ContainsRoute {
165
166             public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
167                 super(routeIdentifiers);
168             }
169         }
170
171         public static class SetLocalRouter {
172             private final ActorRef router;
173
174             public SetLocalRouter(ActorRef router) {
175                 Preconditions.checkArgument(router != null, "Router must not be null");
176                 this.router = router;
177             }
178
179             public ActorRef getRouter() {
180                 return this.router;
181             }
182
183             @Override
184             public String toString() {
185                 return "SetLocalRouter{" +
186                         "router=" + router +
187                         '}';
188             }
189         }
190
191         public static class FindRouters {
192             private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
193
194             public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
195                 Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
196                 this.routeIdentifier = routeIdentifier;
197             }
198
199             public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
200                 return routeIdentifier;
201             }
202
203             @Override
204             public String toString() {
205                 return "FindRouters{" +
206                         "routeIdentifier=" + routeIdentifier +
207                         '}';
208             }
209         }
210
211         public static class FindRoutersReply {
212             final List<Pair<ActorRef, Long>> routerWithUpdateTime;
213
214             public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
215                 Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
216                 this.routerWithUpdateTime = routerWithUpdateTime;
217             }
218
219             public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
220                 return routerWithUpdateTime;
221             }
222
223             @Override
224             public String toString() {
225                 return "FindRoutersReply{" +
226                         "routerWithUpdateTime=" + routerWithUpdateTime +
227                         '}';
228             }
229         }
230     }
231
232     private static class RpcRegistryCreator implements Creator<RpcRegistry> {
233         private static final long serialVersionUID = 1L;
234
235         @Override
236         public RpcRegistry create() throws Exception {
237             RpcRegistry registry =  new RpcRegistry();
238             RemoteRpcRegistryMXBean mxBean = new RemoteRpcRegistryMXBeanImpl(registry);
239             return registry;
240         }
241     }
242 }