1545eb00d2cc9af4bda80881d197bad132f0e667
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Address;
12 import akka.actor.Props;
13 import com.google.common.base.Preconditions;
14 import com.google.common.collect.ImmutableMap;
15 import com.google.common.collect.ImmutableSet;
16 import java.util.ArrayList;
17 import java.util.Collection;
18 import java.util.HashMap;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.Map.Entry;
22 import java.util.Optional;
23 import java.util.Set;
24 import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
25 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
26 import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
27 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
28 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
29 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
30 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
31 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
32 import org.opendaylight.controller.sal.connector.api.RpcRouter;
33 import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
34 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
35
36 /**
37  * Registry to look up cluster nodes that have registered for a given RPC.
38  *
39  * <p>
40  * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
41  * cluster wide information.
42  */
43 public class RpcRegistry extends BucketStore<RoutingTable> {
44     private final ActorRef rpcRegistrar;
45
46     public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
47         super(config, new RoutingTable(rpcInvoker));
48         this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar);
49     }
50
51     /**
52      * Create a new props instance for instantiating an RpcRegistry actor.
53      *
54      * @param config Provider configuration
55      * @param rpcRegistrar Local RPC provider interface, used to register routers to remote nodes
56      * @param rpcInvoker Actor handling RPC invocation requests from remote nodes
57      * @return A new {@link Props} instance
58      */
59     public static Props props(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker,
60             final ActorRef rpcRegistrar) {
61         return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar);
62     }
63
64     @Override
65     protected void handleReceive(final Object message) throws Exception {
66         if (message instanceof AddOrUpdateRoutes) {
67             receiveAddRoutes((AddOrUpdateRoutes) message);
68         } else if (message instanceof RemoveRoutes) {
69             receiveRemoveRoutes((RemoveRoutes) message);
70         } else {
71             super.handleReceive(message);
72         }
73     }
74
75     private void receiveAddRoutes(final AddOrUpdateRoutes msg) {
76         LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
77
78         RoutingTable table = getLocalBucket().getData().copy();
79         for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
80             table.addRoute(routeId);
81         }
82
83         updateLocalBucket(table);
84     }
85
86     /**
87      * Processes a RemoveRoutes message.
88      *
89      * @param msg contains list of route ids to remove
90      */
91     private void receiveRemoveRoutes(final RemoveRoutes msg) {
92         RoutingTable table = getLocalBucket().getData().copy();
93         for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
94             table.removeRoute(routeId);
95         }
96
97         updateLocalBucket(table);
98     }
99
100     @Override
101     protected void onBucketRemoved(final Address address, final Bucket<RoutingTable> bucket) {
102         rpcRegistrar.tell(new UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())), ActorRef.noSender());
103     }
104
105     @Override
106     protected void onBucketsUpdated(final Map<Address, Bucket<RoutingTable>> buckets) {
107         final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = new HashMap<>(buckets.size());
108
109         for (Entry<Address, Bucket<RoutingTable>> e : buckets.entrySet()) {
110             final RoutingTable table = e.getValue().getData();
111
112             final List<DOMRpcIdentifier> rpcs = new ArrayList<>(table.getRoutes().size());
113             for (RouteIdentifier<?, ?, ?> ri : table.getRoutes()) {
114                 if (ri instanceof RouteIdentifierImpl) {
115                     final RouteIdentifierImpl id = (RouteIdentifierImpl) ri;
116                     rpcs.add(DOMRpcIdentifier.create(SchemaPath.create(true, id.getType()), id.getRoute()));
117                 } else {
118                     LOG.warn("Skipping unsupported route {} from {}", ri, e.getKey());
119                 }
120             }
121
122             endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty()
123                     : Optional.of(new RemoteRpcEndpoint(table.getRouter(), rpcs)));
124         }
125
126         if (!endpoints.isEmpty()) {
127             rpcRegistrar.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
128         }
129     }
130
131     public static final class RemoteRpcEndpoint {
132         private final Set<DOMRpcIdentifier> rpcs;
133         private final ActorRef router;
134
135         RemoteRpcEndpoint(final ActorRef router, final Collection<DOMRpcIdentifier> rpcs) {
136             this.router = Preconditions.checkNotNull(router);
137             this.rpcs = ImmutableSet.copyOf(rpcs);
138         }
139
140         public ActorRef getRouter() {
141             return router;
142         }
143
144         public Set<DOMRpcIdentifier> getRpcs() {
145             return rpcs;
146         }
147     }
148
149     /**
150      * All messages used by the RpcRegistry.
151      */
152     public static class Messages {
153         abstract static class AbstractRouteMessage {
154             final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
155
156             AbstractRouteMessage(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
157                 Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(),
158                         "Route Identifiers must be supplied");
159                 this.routeIdentifiers = routeIdentifiers;
160             }
161
162             List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
163                 return this.routeIdentifiers;
164             }
165
166             @Override
167             public String toString() {
168                 return "ContainsRoute{" + "routeIdentifiers=" + routeIdentifiers + '}';
169             }
170         }
171
172         public static final class AddOrUpdateRoutes extends AbstractRouteMessage {
173             public AddOrUpdateRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
174                 super(routeIdentifiers);
175             }
176         }
177
178         public static final class RemoveRoutes extends AbstractRouteMessage {
179             public RemoveRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
180                 super(routeIdentifiers);
181             }
182         }
183
184         public static final class UpdateRemoteEndpoints {
185             private final Map<Address, Optional<RemoteRpcEndpoint>> endpoints;
186
187             UpdateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> endpoints) {
188                 this.endpoints = ImmutableMap.copyOf(endpoints);
189             }
190
191             public Map<Address, Optional<RemoteRpcEndpoint>> getEndpoints() {
192                 return endpoints;
193             }
194         }
195     }
196 }