Teach sal-remoterpc-connector to route actions
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Address;
12 import akka.actor.Props;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Preconditions;
15 import com.google.common.collect.ImmutableList;
16 import com.google.common.collect.ImmutableMap;
17 import com.google.common.collect.ImmutableSet;
18 import java.util.Collection;
19 import java.util.HashMap;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.Map.Entry;
23 import java.util.Optional;
24 import java.util.Set;
25 import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
26 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
27 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
28 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
29 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess;
30 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor;
31 import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl;
32 import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
33
34 /**
35  * Registry to look up cluster nodes that have registered for a given RPC.
36  *
37  * <p>
38  * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor} to maintain this
39  * cluster wide information.
40  */
41 public class RpcRegistry extends BucketStoreActor<RoutingTable> {
42     private final ActorRef rpcRegistrar;
43     private final RemoteRpcRegistryMXBeanImpl mxBean;
44
45     public RpcRegistry(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
46         super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker, ImmutableSet.of()));
47         this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar);
48         this.mxBean = new RemoteRpcRegistryMXBeanImpl(new BucketStoreAccess(self(), getContext().dispatcher(),
49                 config.getAskDuration()), config.getAskDuration());
50     }
51
52     /**
53      * Create a new props instance for instantiating an RpcRegistry actor.
54      *
55      * @param config Provider configuration
56      * @param rpcRegistrar Local RPC provider interface, used to register routers to remote nodes
57      * @param rpcInvoker Actor handling RPC invocation requests from remote nodes
58      * @return A new {@link Props} instance
59      */
60     public static Props props(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker,
61                               final ActorRef rpcRegistrar) {
62         return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar);
63     }
64
65     @Override
66     public void postStop() {
67         super.postStop();
68         this.mxBean.unregister();
69     }
70
71     @Override
72     protected void handleCommand(final Object message) throws Exception {
73         if (message instanceof AddOrUpdateRoutes) {
74             receiveAddRoutes((AddOrUpdateRoutes) message);
75         } else if (message instanceof RemoveRoutes) {
76             receiveRemoveRoutes((RemoveRoutes) message);
77         } else {
78             super.handleCommand(message);
79         }
80     }
81
82     private void receiveAddRoutes(final AddOrUpdateRoutes msg) {
83         LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
84         updateLocalBucket(getLocalData().addRpcs(msg.getRouteIdentifiers()));
85     }
86
87     /**
88      * Processes a RemoveRoutes message.
89      *
90      * @param msg contains list of route ids to remove
91      */
92     private void receiveRemoveRoutes(final RemoveRoutes msg) {
93         LOG.debug("RemoveRoutes: {}", msg.getRouteIdentifiers());
94         updateLocalBucket(getLocalData().removeRpcs(msg.getRouteIdentifiers()));
95     }
96
97     @Override
98     protected void onBucketRemoved(final Address address, final Bucket<RoutingTable> bucket) {
99         rpcRegistrar.tell(new Messages.UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())),
100                 ActorRef.noSender());
101     }
102
103     @Override
104     protected void onBucketsUpdated(final Map<Address, Bucket<RoutingTable>> buckets) {
105         final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = new HashMap<>(buckets.size());
106
107         for (Entry<Address, Bucket<RoutingTable>> e : buckets.entrySet()) {
108             final RoutingTable table = e.getValue().getData();
109
110             final Collection<DOMRpcIdentifier> rpcs = table.getItems();
111             endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty()
112                     : Optional.of(new RemoteRpcEndpoint(table.getInvoker(), rpcs)));
113         }
114
115         if (!endpoints.isEmpty()) {
116             rpcRegistrar.tell(new Messages.UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
117         }
118     }
119
120     public static final class RemoteRpcEndpoint {
121         private final Set<DOMRpcIdentifier> rpcs;
122         private final ActorRef router;
123
124         @VisibleForTesting
125         public RemoteRpcEndpoint(final ActorRef router, final Collection<DOMRpcIdentifier> rpcs) {
126             this.router = Preconditions.checkNotNull(router);
127             this.rpcs = ImmutableSet.copyOf(rpcs);
128         }
129
130         public ActorRef getRouter() {
131             return router;
132         }
133
134         public Set<DOMRpcIdentifier> getRpcs() {
135             return rpcs;
136         }
137     }
138
139     /**
140      * All messages used by the RpcRegistry.
141      */
142     public static class Messages {
143         abstract static class AbstractRouteMessage {
144             final List<DOMRpcIdentifier> rpcRouteIdentifiers;
145
146             AbstractRouteMessage(final Collection<DOMRpcIdentifier> rpcRouteIdentifiers) {
147                 Preconditions.checkArgument(rpcRouteIdentifiers != null && !rpcRouteIdentifiers.isEmpty(),
148                         "Route Identifiers must be supplied");
149                 this.rpcRouteIdentifiers = ImmutableList.copyOf(rpcRouteIdentifiers);
150             }
151
152             List<DOMRpcIdentifier> getRouteIdentifiers() {
153                 return this.rpcRouteIdentifiers;
154             }
155
156             @Override
157             public String toString() {
158                 return "ContainsRoute{" + "routeIdentifiers=" + rpcRouteIdentifiers + '}';
159             }
160         }
161
162         public static final class AddOrUpdateRoutes extends Messages.AbstractRouteMessage {
163             public AddOrUpdateRoutes(final Collection<DOMRpcIdentifier> rpcRouteIdentifiers) {
164                 super(rpcRouteIdentifiers);
165             }
166
167         }
168
169         public static final class RemoveRoutes extends AbstractRouteMessage {
170             public RemoveRoutes(final Collection<DOMRpcIdentifier> rpcRouteIdentifiers) {
171                 super(rpcRouteIdentifiers);
172             }
173         }
174
175         public static final class UpdateRemoteEndpoints {
176             private final Map<Address, Optional<RemoteRpcEndpoint>> rpcEndpoints;
177
178
179             @VisibleForTesting
180             public UpdateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> rpcEndpoints) {
181                 this.rpcEndpoints = ImmutableMap.copyOf(rpcEndpoints);
182             }
183
184             public Map<Address, Optional<RemoteRpcEndpoint>> getRpcEndpoints() {
185                 return rpcEndpoints;
186             }
187         }
188     }
189 }