Migrate Assert.assertThat()
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
12
13 import akka.actor.ActorRef;
14 import akka.actor.Address;
15 import akka.actor.Props;
16 import com.google.common.annotations.VisibleForTesting;
17 import com.google.common.collect.ImmutableList;
18 import com.google.common.collect.ImmutableMap;
19 import com.google.common.collect.ImmutableSet;
20 import java.util.Collection;
21 import java.util.HashMap;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Map.Entry;
25 import java.util.Optional;
26 import java.util.Set;
27 import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
28 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
29 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
30 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
31 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess;
32 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor;
33 import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl;
34 import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
35
36 /**
37  * Registry to look up cluster nodes that have registered for a given RPC.
38  *
39  * <p>
40  * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor} to maintain this
41  * cluster wide information.
42  */
43 public class RpcRegistry extends BucketStoreActor<RoutingTable> {
44     private final ActorRef rpcRegistrar;
45     private RemoteRpcRegistryMXBeanImpl mxBean;
46
47     public RpcRegistry(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
48         super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker, ImmutableSet.of()));
49         this.rpcRegistrar = requireNonNull(rpcRegistrar);
50
51     }
52
53     /**
54      * Create a new props instance for instantiating an RpcRegistry actor.
55      *
56      * @param config Provider configuration
57      * @param rpcRegistrar Local RPC provider interface, used to register routers to remote nodes
58      * @param rpcInvoker Actor handling RPC invocation requests from remote nodes
59      * @return A new {@link Props} instance
60      */
61     public static Props props(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker,
62                               final ActorRef rpcRegistrar) {
63         return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar);
64     }
65
66     @Override
67     public void preStart() {
68         super.preStart();
69         mxBean = new RemoteRpcRegistryMXBeanImpl(new BucketStoreAccess(self(), getContext().dispatcher(),
70             getConfig().getAskDuration()), getConfig().getAskDuration());
71     }
72
73     @Override
74     public void postStop() throws Exception {
75         if (mxBean != null) {
76             mxBean.unregister();
77             mxBean = null;
78         }
79         super.postStop();
80     }
81
82     @Override
83     protected void handleCommand(final Object message) throws Exception {
84         if (message instanceof AddOrUpdateRoutes) {
85             receiveAddRoutes((AddOrUpdateRoutes) message);
86         } else if (message instanceof RemoveRoutes) {
87             receiveRemoveRoutes((RemoveRoutes) message);
88         } else {
89             super.handleCommand(message);
90         }
91     }
92
93     private void receiveAddRoutes(final AddOrUpdateRoutes msg) {
94         LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
95         updateLocalBucket(getLocalData().addRpcs(msg.getRouteIdentifiers()));
96     }
97
98     /**
99      * Processes a RemoveRoutes message.
100      *
101      * @param msg contains list of route ids to remove
102      */
103     private void receiveRemoveRoutes(final RemoveRoutes msg) {
104         LOG.debug("RemoveRoutes: {}", msg.getRouteIdentifiers());
105         updateLocalBucket(getLocalData().removeRpcs(msg.getRouteIdentifiers()));
106     }
107
108     @Override
109     protected void onBucketRemoved(final Address address, final Bucket<RoutingTable> bucket) {
110         rpcRegistrar.tell(new Messages.UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())),
111                 ActorRef.noSender());
112     }
113
114     @Override
115     protected void onBucketsUpdated(final Map<Address, Bucket<RoutingTable>> buckets) {
116         final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = new HashMap<>(buckets.size());
117
118         for (Entry<Address, Bucket<RoutingTable>> e : buckets.entrySet()) {
119             final RoutingTable table = e.getValue().getData();
120
121             final Collection<DOMRpcIdentifier> rpcs = table.getItems();
122             endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty()
123                     : Optional.of(new RemoteRpcEndpoint(table.getInvoker(), rpcs)));
124         }
125
126         if (!endpoints.isEmpty()) {
127             rpcRegistrar.tell(new Messages.UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
128         }
129     }
130
131     public static final class RemoteRpcEndpoint {
132         private final Set<DOMRpcIdentifier> rpcs;
133         private final ActorRef router;
134
135         @VisibleForTesting
136         public RemoteRpcEndpoint(final ActorRef router, final Collection<DOMRpcIdentifier> rpcs) {
137             this.router = requireNonNull(router);
138             this.rpcs = ImmutableSet.copyOf(rpcs);
139         }
140
141         public ActorRef getRouter() {
142             return router;
143         }
144
145         public Set<DOMRpcIdentifier> getRpcs() {
146             return rpcs;
147         }
148     }
149
150     /**
151      * All messages used by the RpcRegistry.
152      */
153     public static class Messages {
154         abstract static class AbstractRouteMessage {
155             final List<DOMRpcIdentifier> rpcRouteIdentifiers;
156
157             AbstractRouteMessage(final Collection<DOMRpcIdentifier> rpcRouteIdentifiers) {
158                 checkArgument(rpcRouteIdentifiers != null && !rpcRouteIdentifiers.isEmpty(),
159                         "Route Identifiers must be supplied");
160                 this.rpcRouteIdentifiers = ImmutableList.copyOf(rpcRouteIdentifiers);
161             }
162
163             List<DOMRpcIdentifier> getRouteIdentifiers() {
164                 return this.rpcRouteIdentifiers;
165             }
166
167             @Override
168             public String toString() {
169                 return "ContainsRoute{" + "routeIdentifiers=" + rpcRouteIdentifiers + '}';
170             }
171         }
172
173         public static final class AddOrUpdateRoutes extends Messages.AbstractRouteMessage {
174             public AddOrUpdateRoutes(final Collection<DOMRpcIdentifier> rpcRouteIdentifiers) {
175                 super(rpcRouteIdentifiers);
176             }
177
178         }
179
180         public static final class RemoveRoutes extends AbstractRouteMessage {
181             public RemoveRoutes(final Collection<DOMRpcIdentifier> rpcRouteIdentifiers) {
182                 super(rpcRouteIdentifiers);
183             }
184         }
185
186         public static final class UpdateRemoteEndpoints {
187             private final Map<Address, Optional<RemoteRpcEndpoint>> rpcEndpoints;
188
189
190             @VisibleForTesting
191             public UpdateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> rpcEndpoints) {
192                 this.rpcEndpoints = ImmutableMap.copyOf(rpcEndpoints);
193             }
194
195             public Map<Address, Optional<RemoteRpcEndpoint>> getRpcEndpoints() {
196                 return rpcEndpoints;
197             }
198         }
199     }
200 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.