BUG-7556: update version tracking
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry;
9
import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.Props;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
34
35 /**
36  * Registry to look up cluster nodes that have registered for a given RPC.
37  *
38  * <p>
39  * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
40  * cluster wide information.
41  */
42 public class RpcRegistry extends BucketStore<RoutingTable> {
43     private final ActorRef rpcRegistrar;
44
45     public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
46         super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker));
47         this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar);
48     }
49
50     /**
51      * Create a new props instance for instantiating an RpcRegistry actor.
52      *
53      * @param config Provider configuration
54      * @param rpcRegistrar Local RPC provider interface, used to register routers to remote nodes
55      * @param rpcInvoker Actor handling RPC invocation requests from remote nodes
56      * @return A new {@link Props} instance
57      */
58     public static Props props(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker,
59             final ActorRef rpcRegistrar) {
60         return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar);
61     }
62
63     @Override
64     protected void handleCommand(final Object message) throws Exception {
65         if (message instanceof AddOrUpdateRoutes) {
66             receiveAddRoutes((AddOrUpdateRoutes) message);
67         } else if (message instanceof RemoveRoutes) {
68             receiveRemoveRoutes((RemoveRoutes) message);
69         } else {
70             super.handleCommand(message);
71         }
72     }
73
74     private void receiveAddRoutes(final AddOrUpdateRoutes msg) {
75         LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
76         updateLocalBucket(getLocalData().addRpcs(msg.getRouteIdentifiers()));
77     }
78
79     /**
80      * Processes a RemoveRoutes message.
81      *
82      * @param msg contains list of route ids to remove
83      */
84     private void receiveRemoveRoutes(final RemoveRoutes msg) {
85         LOG.debug("RemoveRoutes: {}", msg.getRouteIdentifiers());
86         updateLocalBucket(getLocalData().removeRpcs(msg.getRouteIdentifiers()));
87     }
88
89     @Override
90     protected void onBucketRemoved(final Address address, final Bucket<RoutingTable> bucket) {
91         rpcRegistrar.tell(new UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())), ActorRef.noSender());
92     }
93
94     @Override
95     protected void onBucketsUpdated(final Map<Address, Bucket<RoutingTable>> buckets) {
96         final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = new HashMap<>(buckets.size());
97
98         for (Entry<Address, Bucket<RoutingTable>> e : buckets.entrySet()) {
99             final RoutingTable table = e.getValue().getData();
100
101             final List<DOMRpcIdentifier> rpcs = new ArrayList<>(table.getRoutes().size());
102             for (RouteIdentifier<?, ?, ?> ri : table.getRoutes()) {
103                 if (ri instanceof RouteIdentifierImpl) {
104                     final RouteIdentifierImpl id = (RouteIdentifierImpl) ri;
105                     rpcs.add(DOMRpcIdentifier.create(SchemaPath.create(true, id.getType()), id.getRoute()));
106                 } else {
107                     LOG.warn("Skipping unsupported route {} from {}", ri, e.getKey());
108                 }
109             }
110
111             endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty()
112                     : Optional.of(new RemoteRpcEndpoint(table.getRpcInvoker(), rpcs)));
113         }
114
115         if (!endpoints.isEmpty()) {
116             rpcRegistrar.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
117         }
118     }
119
120     public static final class RemoteRpcEndpoint {
121         private final Set<DOMRpcIdentifier> rpcs;
122         private final ActorRef router;
123
124         RemoteRpcEndpoint(final ActorRef router, final Collection<DOMRpcIdentifier> rpcs) {
125             this.router = Preconditions.checkNotNull(router);
126             this.rpcs = ImmutableSet.copyOf(rpcs);
127         }
128
129         public ActorRef getRouter() {
130             return router;
131         }
132
133         public Set<DOMRpcIdentifier> getRpcs() {
134             return rpcs;
135         }
136     }
137
138     /**
139      * All messages used by the RpcRegistry.
140      */
141     public static class Messages {
142         abstract static class AbstractRouteMessage {
143             final List<RouteIdentifier<?, ?, ?>> routeIdentifiers;
144
145             AbstractRouteMessage(final List<RouteIdentifier<?, ?, ?>> routeIdentifiers) {
146                 Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(),
147                         "Route Identifiers must be supplied");
148                 this.routeIdentifiers = routeIdentifiers;
149             }
150
151             List<RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
152                 return this.routeIdentifiers;
153             }
154
155             @Override
156             public String toString() {
157                 return "ContainsRoute{" + "routeIdentifiers=" + routeIdentifiers + '}';
158             }
159         }
160
161         public static final class AddOrUpdateRoutes extends AbstractRouteMessage {
162             public AddOrUpdateRoutes(final List<RouteIdentifier<?, ?, ?>> routeIdentifiers) {
163                 super(routeIdentifiers);
164             }
165         }
166
167         public static final class RemoveRoutes extends AbstractRouteMessage {
168             public RemoveRoutes(final List<RouteIdentifier<?, ?, ?>> routeIdentifiers) {
169                 super(routeIdentifiers);
170             }
171         }
172
173         public static final class UpdateRemoteEndpoints {
174             private final Map<Address, Optional<RemoteRpcEndpoint>> endpoints;
175
176             UpdateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> endpoints) {
177                 this.endpoints = ImmutableMap.copyOf(endpoints);
178             }
179
180             public Map<Address, Optional<RemoteRpcEndpoint>> getEndpoints() {
181                 return endpoints;
182             }
183         }
184     }
185 }