Bump odlparent to 5.0.2
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
12
13 import akka.actor.ActorRef;
14 import akka.actor.Address;
15 import akka.actor.Props;
16 import com.google.common.annotations.VisibleForTesting;
17 import com.google.common.collect.ImmutableList;
18 import com.google.common.collect.ImmutableMap;
19 import com.google.common.collect.ImmutableSet;
20 import java.util.Collection;
21 import java.util.HashMap;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Map.Entry;
25 import java.util.Optional;
26 import java.util.Set;
27 import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
28 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
29 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
30 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
31 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess;
32 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor;
33 import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl;
34 import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
35
36 /**
37  * Registry to look up cluster nodes that have registered for a given RPC.
38  *
39  * <p>
40  * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor} to maintain this
41  * cluster wide information.
42  */
43 public class RpcRegistry extends BucketStoreActor<RoutingTable> {
44     private final ActorRef rpcRegistrar;
45     private final RemoteRpcRegistryMXBeanImpl mxBean;
46
47     public RpcRegistry(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
48         super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker, ImmutableSet.of()));
49         this.rpcRegistrar = requireNonNull(rpcRegistrar);
50         this.mxBean = new RemoteRpcRegistryMXBeanImpl(new BucketStoreAccess(self(), getContext().dispatcher(),
51                 config.getAskDuration()), config.getAskDuration());
52     }
53
54     /**
55      * Create a new props instance for instantiating an RpcRegistry actor.
56      *
57      * @param config Provider configuration
58      * @param rpcRegistrar Local RPC provider interface, used to register routers to remote nodes
59      * @param rpcInvoker Actor handling RPC invocation requests from remote nodes
60      * @return A new {@link Props} instance
61      */
62     public static Props props(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker,
63                               final ActorRef rpcRegistrar) {
64         return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar);
65     }
66
67     @Override
68     public void postStop() throws Exception {
69         super.postStop();
70         this.mxBean.unregister();
71     }
72
73     @Override
74     protected void handleCommand(final Object message) throws Exception {
75         if (message instanceof AddOrUpdateRoutes) {
76             receiveAddRoutes((AddOrUpdateRoutes) message);
77         } else if (message instanceof RemoveRoutes) {
78             receiveRemoveRoutes((RemoveRoutes) message);
79         } else {
80             super.handleCommand(message);
81         }
82     }
83
84     private void receiveAddRoutes(final AddOrUpdateRoutes msg) {
85         LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
86         updateLocalBucket(getLocalData().addRpcs(msg.getRouteIdentifiers()));
87     }
88
89     /**
90      * Processes a RemoveRoutes message.
91      *
92      * @param msg contains list of route ids to remove
93      */
94     private void receiveRemoveRoutes(final RemoveRoutes msg) {
95         LOG.debug("RemoveRoutes: {}", msg.getRouteIdentifiers());
96         updateLocalBucket(getLocalData().removeRpcs(msg.getRouteIdentifiers()));
97     }
98
99     @Override
100     protected void onBucketRemoved(final Address address, final Bucket<RoutingTable> bucket) {
101         rpcRegistrar.tell(new Messages.UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())),
102                 ActorRef.noSender());
103     }
104
105     @Override
106     protected void onBucketsUpdated(final Map<Address, Bucket<RoutingTable>> buckets) {
107         final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = new HashMap<>(buckets.size());
108
109         for (Entry<Address, Bucket<RoutingTable>> e : buckets.entrySet()) {
110             final RoutingTable table = e.getValue().getData();
111
112             final Collection<DOMRpcIdentifier> rpcs = table.getItems();
113             endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty()
114                     : Optional.of(new RemoteRpcEndpoint(table.getInvoker(), rpcs)));
115         }
116
117         if (!endpoints.isEmpty()) {
118             rpcRegistrar.tell(new Messages.UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
119         }
120     }
121
122     public static final class RemoteRpcEndpoint {
123         private final Set<DOMRpcIdentifier> rpcs;
124         private final ActorRef router;
125
126         @VisibleForTesting
127         public RemoteRpcEndpoint(final ActorRef router, final Collection<DOMRpcIdentifier> rpcs) {
128             this.router = requireNonNull(router);
129             this.rpcs = ImmutableSet.copyOf(rpcs);
130         }
131
132         public ActorRef getRouter() {
133             return router;
134         }
135
136         public Set<DOMRpcIdentifier> getRpcs() {
137             return rpcs;
138         }
139     }
140
141     /**
142      * All messages used by the RpcRegistry.
143      */
144     public static class Messages {
145         abstract static class AbstractRouteMessage {
146             final List<DOMRpcIdentifier> rpcRouteIdentifiers;
147
148             AbstractRouteMessage(final Collection<DOMRpcIdentifier> rpcRouteIdentifiers) {
149                 checkArgument(rpcRouteIdentifiers != null && !rpcRouteIdentifiers.isEmpty(),
150                         "Route Identifiers must be supplied");
151                 this.rpcRouteIdentifiers = ImmutableList.copyOf(rpcRouteIdentifiers);
152             }
153
154             List<DOMRpcIdentifier> getRouteIdentifiers() {
155                 return this.rpcRouteIdentifiers;
156             }
157
158             @Override
159             public String toString() {
160                 return "ContainsRoute{" + "routeIdentifiers=" + rpcRouteIdentifiers + '}';
161             }
162         }
163
164         public static final class AddOrUpdateRoutes extends Messages.AbstractRouteMessage {
165             public AddOrUpdateRoutes(final Collection<DOMRpcIdentifier> rpcRouteIdentifiers) {
166                 super(rpcRouteIdentifiers);
167             }
168
169         }
170
171         public static final class RemoveRoutes extends AbstractRouteMessage {
172             public RemoveRoutes(final Collection<DOMRpcIdentifier> rpcRouteIdentifiers) {
173                 super(rpcRouteIdentifiers);
174             }
175         }
176
177         public static final class UpdateRemoteEndpoints {
178             private final Map<Address, Optional<RemoteRpcEndpoint>> rpcEndpoints;
179
180
181             @VisibleForTesting
182             public UpdateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> rpcEndpoints) {
183                 this.rpcEndpoints = ImmutableMap.copyOf(rpcEndpoints);
184             }
185
186             public Map<Address, Optional<RemoteRpcEndpoint>> getRpcEndpoints() {
187                 return rpcEndpoints;
188             }
189         }
190     }
191 }