Bug 2526: Race condition may cause missing routes in RPC BucketStore
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry;
9
10 import akka.actor.ActorRef;
11 import akka.event.Logging;
12 import akka.event.LoggingAdapter;
13 import akka.japi.Option;
14 import akka.japi.Pair;
15 import com.google.common.base.Preconditions;
16 import java.util.ArrayList;
17 import java.util.List;
18 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
19 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
20 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
21 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
22 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
23 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
24 import org.opendaylight.controller.sal.connector.api.RpcRouter;
25 import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
26
27 /**
28  * Registry to look up cluster nodes that have registered for a given rpc.
29  * <p/>
30  * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
31  * cluster wide information.
32  */
33 public class RpcRegistry extends BucketStore<RoutingTable> {
34
35     final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
36
37     public RpcRegistry() {
38         getLocalBucket().setData(new RoutingTable());
39     }
40
41     @Override
42     protected void handleReceive(Object message) throws Exception {
43         //TODO: if sender is remote, reject message
44
45         if (message instanceof SetLocalRouter) {
46             receiveSetLocalRouter((SetLocalRouter) message);
47         } else if (message instanceof AddOrUpdateRoutes) {
48             receiveAddRoutes((AddOrUpdateRoutes) message);
49         } else if (message instanceof RemoveRoutes) {
50             receiveRemoveRoutes((RemoveRoutes) message);
51         } else if (message instanceof Messages.FindRouters) {
52             receiveGetRouter((FindRouters) message);
53         } else {
54             super.handleReceive(message);
55         }
56     }
57
58     /**
59      * Register's rpc broker
60      *
61      * @param message contains {@link akka.actor.ActorRef} for rpc broker
62      */
63     private void receiveSetLocalRouter(SetLocalRouter message) {
64         getLocalBucket().getData().setRouter(message.getRouter());
65     }
66
67     /**
68      * @param msg
69      */
70     private void receiveAddRoutes(AddOrUpdateRoutes msg) {
71
72         log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
73
74         RoutingTable table = getLocalBucket().getData().copy();
75         for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
76             table.addRoute(routeId);
77         }
78
79         updateLocalBucket(table);
80     }
81
82     /**
83      * @param msg contains list of route ids to remove
84      */
85     private void receiveRemoveRoutes(RemoveRoutes msg) {
86
87         RoutingTable table = getLocalBucket().getData().copy();
88         for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
89             table.removeRoute(routeId);
90         }
91
92         updateLocalBucket(table);
93     }
94
95     /**
96      * Finds routers for the given rpc.
97      *
98      * @param msg
99      */
100     private void receiveGetRouter(FindRouters msg) {
101         List<Pair<ActorRef, Long>> routers = new ArrayList<>();
102
103         RouteIdentifier<?, ?, ?> routeId = msg.getRouteIdentifier();
104         findRoutes(getLocalBucket().getData(), routeId, routers);
105
106         for(Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
107             findRoutes(bucket.getData(), routeId, routers);
108         }
109
110         getSender().tell(new Messages.FindRoutersReply(routers), getSelf());
111     }
112
113     private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
114             List<Pair<ActorRef, Long>> routers) {
115         if (table == null) {
116             return;
117         }
118
119         Option<Pair<ActorRef, Long>> routerWithUpdateTime = table.getRouterFor(routeId);
120         if(!routerWithUpdateTime.isEmpty()) {
121             routers.add(routerWithUpdateTime.get());
122         }
123     }
124
125     /**
126      * All messages used by the RpcRegistry
127      */
128     public static class Messages {
129
130
131         public static class ContainsRoute {
132             final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
133
134             public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
135                 Preconditions.checkArgument(routeIdentifiers != null &&
136                                             !routeIdentifiers.isEmpty(),
137                                             "Route Identifiers must be supplied");
138                 this.routeIdentifiers = routeIdentifiers;
139             }
140
141             public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
142                 return this.routeIdentifiers;
143             }
144
145             @Override
146             public String toString() {
147                 return "ContainsRoute{" +
148                         "routeIdentifiers=" + routeIdentifiers +
149                         '}';
150             }
151         }
152
153         public static class AddOrUpdateRoutes extends ContainsRoute {
154
155             public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
156                 super(routeIdentifiers);
157             }
158         }
159
160         public static class RemoveRoutes extends ContainsRoute {
161
162             public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
163                 super(routeIdentifiers);
164             }
165         }
166
167         public static class SetLocalRouter {
168             private final ActorRef router;
169
170             public SetLocalRouter(ActorRef router) {
171                 Preconditions.checkArgument(router != null, "Router must not be null");
172                 this.router = router;
173             }
174
175             public ActorRef getRouter() {
176                 return this.router;
177             }
178
179             @Override
180             public String toString() {
181                 return "SetLocalRouter{" +
182                         "router=" + router +
183                         '}';
184             }
185         }
186
187         public static class FindRouters {
188             private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
189
190             public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
191                 Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
192                 this.routeIdentifier = routeIdentifier;
193             }
194
195             public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
196                 return routeIdentifier;
197             }
198
199             @Override
200             public String toString() {
201                 return "FindRouters{" +
202                         "routeIdentifier=" + routeIdentifier +
203                         '}';
204             }
205         }
206
207         public static class FindRoutersReply {
208             final List<Pair<ActorRef, Long>> routerWithUpdateTime;
209
210             public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
211                 Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
212                 this.routerWithUpdateTime = routerWithUpdateTime;
213             }
214
215             public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
216                 return routerWithUpdateTime;
217             }
218
219             @Override
220             public String toString() {
221                 return "FindRoutersReply{" +
222                         "routerWithUpdateTime=" + routerWithUpdateTime +
223                         '}';
224             }
225         }
226     }
227 }