805e47a0dd05e7b52082f705f523b8870cd5bb02
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Address;
12 import akka.actor.Props;
13 import com.google.common.base.Preconditions;
14 import com.google.common.collect.ImmutableList;
15 import com.google.common.collect.ImmutableMap;
16 import com.google.common.collect.ImmutableSet;
17 import java.util.Collection;
18 import java.util.HashMap;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.Map.Entry;
22 import java.util.Optional;
23 import java.util.Set;
24 import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
25 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
26 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
27 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
28 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
29 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
30 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor;
31
32 /**
33  * Registry to look up cluster nodes that have registered for a given RPC.
34  *
35  * <p>
36  * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor} to maintain this
37  * cluster wide information.
38  */
39 public class RpcRegistry extends BucketStoreActor<RoutingTable> {
40     private final ActorRef rpcRegistrar;
41
42     public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
43         super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker, ImmutableSet.of()));
44         this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar);
45     }
46
47     /**
48      * Create a new props instance for instantiating an RpcRegistry actor.
49      *
50      * @param config Provider configuration
51      * @param rpcRegistrar Local RPC provider interface, used to register routers to remote nodes
52      * @param rpcInvoker Actor handling RPC invocation requests from remote nodes
53      * @return A new {@link Props} instance
54      */
55     public static Props props(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker,
56             final ActorRef rpcRegistrar) {
57         return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar);
58     }
59
60     @Override
61     protected void handleCommand(final Object message) throws Exception {
62         if (message instanceof AddOrUpdateRoutes) {
63             receiveAddRoutes((AddOrUpdateRoutes) message);
64         } else if (message instanceof RemoveRoutes) {
65             receiveRemoveRoutes((RemoveRoutes) message);
66         } else {
67             super.handleCommand(message);
68         }
69     }
70
71     private void receiveAddRoutes(final AddOrUpdateRoutes msg) {
72         LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
73         updateLocalBucket(getLocalData().addRpcs(msg.getRouteIdentifiers()));
74     }
75
76     /**
77      * Processes a RemoveRoutes message.
78      *
79      * @param msg contains list of route ids to remove
80      */
81     private void receiveRemoveRoutes(final RemoveRoutes msg) {
82         LOG.debug("RemoveRoutes: {}", msg.getRouteIdentifiers());
83         updateLocalBucket(getLocalData().removeRpcs(msg.getRouteIdentifiers()));
84     }
85
86     @Override
87     protected void onBucketRemoved(final Address address, final Bucket<RoutingTable> bucket) {
88         rpcRegistrar.tell(new UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())), ActorRef.noSender());
89     }
90
91     @Override
92     protected void onBucketsUpdated(final Map<Address, Bucket<RoutingTable>> buckets) {
93         final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = new HashMap<>(buckets.size());
94
95         for (Entry<Address, Bucket<RoutingTable>> e : buckets.entrySet()) {
96             final RoutingTable table = e.getValue().getData();
97
98             final Collection<DOMRpcIdentifier> rpcs = table.getRoutes();
99             endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty()
100                     : Optional.of(new RemoteRpcEndpoint(table.getRpcInvoker(), rpcs)));
101         }
102
103         if (!endpoints.isEmpty()) {
104             rpcRegistrar.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
105         }
106     }
107
108     public static final class RemoteRpcEndpoint {
109         private final Set<DOMRpcIdentifier> rpcs;
110         private final ActorRef router;
111
112         RemoteRpcEndpoint(final ActorRef router, final Collection<DOMRpcIdentifier> rpcs) {
113             this.router = Preconditions.checkNotNull(router);
114             this.rpcs = ImmutableSet.copyOf(rpcs);
115         }
116
117         public ActorRef getRouter() {
118             return router;
119         }
120
121         public Set<DOMRpcIdentifier> getRpcs() {
122             return rpcs;
123         }
124     }
125
126     /**
127      * All messages used by the RpcRegistry.
128      */
129     public static class Messages {
130         abstract static class AbstractRouteMessage {
131             final List<DOMRpcIdentifier> routeIdentifiers;
132
133             AbstractRouteMessage(final Collection<DOMRpcIdentifier> routeIdentifiers) {
134                 Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(),
135                         "Route Identifiers must be supplied");
136                 this.routeIdentifiers = ImmutableList.copyOf(routeIdentifiers);
137             }
138
139             List<DOMRpcIdentifier> getRouteIdentifiers() {
140                 return this.routeIdentifiers;
141             }
142
143             @Override
144             public String toString() {
145                 return "ContainsRoute{" + "routeIdentifiers=" + routeIdentifiers + '}';
146             }
147         }
148
149         public static final class AddOrUpdateRoutes extends AbstractRouteMessage {
150             public AddOrUpdateRoutes(final Collection<DOMRpcIdentifier> routeIdentifiers) {
151                 super(routeIdentifiers);
152             }
153         }
154
155         public static final class RemoveRoutes extends AbstractRouteMessage {
156             public RemoveRoutes(final Collection<DOMRpcIdentifier> routeIdentifiers) {
157                 super(routeIdentifiers);
158             }
159         }
160
161         public static final class UpdateRemoteEndpoints {
162             private final Map<Address, Optional<RemoteRpcEndpoint>> endpoints;
163
164             UpdateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> endpoints) {
165                 this.endpoints = ImmutableMap.copyOf(endpoints);
166             }
167
168             public Map<Address, Optional<RemoteRpcEndpoint>> getEndpoints() {
169                 return endpoints;
170             }
171         }
172     }
173 }