2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.remote.rpc.registry;
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
13 import akka.actor.ActorRef;
14 import akka.actor.Address;
15 import akka.actor.Props;
16 import com.google.common.annotations.VisibleForTesting;
17 import com.google.common.collect.ImmutableList;
18 import com.google.common.collect.ImmutableMap;
19 import com.google.common.collect.ImmutableSet;
20 import java.util.Collection;
21 import java.util.HashMap;
22 import java.util.List;
24 import java.util.Map.Entry;
25 import java.util.Optional;
27 import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
28 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
29 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
30 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
31 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess;
32 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor;
33 import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl;
34 import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
37 * Registry to look up cluster nodes that have registered for a given RPC.
40 * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor} to maintain this
41 * cluster wide information.
43 public class RpcRegistry extends BucketStoreActor<RoutingTable> {
44 private final ActorRef rpcRegistrar;
45 private final RemoteRpcRegistryMXBeanImpl mxBean;
47 public RpcRegistry(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
48 super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker, ImmutableSet.of()));
49 this.rpcRegistrar = requireNonNull(rpcRegistrar);
50 this.mxBean = new RemoteRpcRegistryMXBeanImpl(new BucketStoreAccess(self(), getContext().dispatcher(),
51 config.getAskDuration()), config.getAskDuration());
55 * Create a new props instance for instantiating an RpcRegistry actor.
57 * @param config Provider configuration
58 * @param rpcRegistrar Local RPC provider interface, used to register routers to remote nodes
59 * @param rpcInvoker Actor handling RPC invocation requests from remote nodes
60 * @return A new {@link Props} instance
62 public static Props props(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker,
63 final ActorRef rpcRegistrar) {
64 return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar);
68 public void postStop() {
70 this.mxBean.unregister();
74 protected void handleCommand(final Object message) throws Exception {
75 if (message instanceof AddOrUpdateRoutes) {
76 receiveAddRoutes((AddOrUpdateRoutes) message);
77 } else if (message instanceof RemoveRoutes) {
78 receiveRemoveRoutes((RemoveRoutes) message);
80 super.handleCommand(message);
84 private void receiveAddRoutes(final AddOrUpdateRoutes msg) {
85 LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
86 updateLocalBucket(getLocalData().addRpcs(msg.getRouteIdentifiers()));
90 * Processes a RemoveRoutes message.
92 * @param msg contains list of route ids to remove
94 private void receiveRemoveRoutes(final RemoveRoutes msg) {
95 LOG.debug("RemoveRoutes: {}", msg.getRouteIdentifiers());
96 updateLocalBucket(getLocalData().removeRpcs(msg.getRouteIdentifiers()));
100 protected void onBucketRemoved(final Address address, final Bucket<RoutingTable> bucket) {
101 rpcRegistrar.tell(new Messages.UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())),
102 ActorRef.noSender());
106 protected void onBucketsUpdated(final Map<Address, Bucket<RoutingTable>> buckets) {
107 final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = new HashMap<>(buckets.size());
109 for (Entry<Address, Bucket<RoutingTable>> e : buckets.entrySet()) {
110 final RoutingTable table = e.getValue().getData();
112 final Collection<DOMRpcIdentifier> rpcs = table.getItems();
113 endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty()
114 : Optional.of(new RemoteRpcEndpoint(table.getInvoker(), rpcs)));
117 if (!endpoints.isEmpty()) {
118 rpcRegistrar.tell(new Messages.UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
122 public static final class RemoteRpcEndpoint {
123 private final Set<DOMRpcIdentifier> rpcs;
124 private final ActorRef router;
127 public RemoteRpcEndpoint(final ActorRef router, final Collection<DOMRpcIdentifier> rpcs) {
128 this.router = requireNonNull(router);
129 this.rpcs = ImmutableSet.copyOf(rpcs);
132 public ActorRef getRouter() {
136 public Set<DOMRpcIdentifier> getRpcs() {
142 * All messages used by the RpcRegistry.
144 public static class Messages {
145 abstract static class AbstractRouteMessage {
146 final List<DOMRpcIdentifier> rpcRouteIdentifiers;
148 AbstractRouteMessage(final Collection<DOMRpcIdentifier> rpcRouteIdentifiers) {
149 checkArgument(rpcRouteIdentifiers != null && !rpcRouteIdentifiers.isEmpty(),
150 "Route Identifiers must be supplied");
151 this.rpcRouteIdentifiers = ImmutableList.copyOf(rpcRouteIdentifiers);
154 List<DOMRpcIdentifier> getRouteIdentifiers() {
155 return this.rpcRouteIdentifiers;
159 public String toString() {
160 return "ContainsRoute{" + "routeIdentifiers=" + rpcRouteIdentifiers + '}';
164 public static final class AddOrUpdateRoutes extends Messages.AbstractRouteMessage {
165 public AddOrUpdateRoutes(final Collection<DOMRpcIdentifier> rpcRouteIdentifiers) {
166 super(rpcRouteIdentifiers);
171 public static final class RemoveRoutes extends AbstractRouteMessage {
172 public RemoveRoutes(final Collection<DOMRpcIdentifier> rpcRouteIdentifiers) {
173 super(rpcRouteIdentifiers);
177 public static final class UpdateRemoteEndpoints {
178 private final Map<Address, Optional<RemoteRpcEndpoint>> rpcEndpoints;
182 public UpdateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> rpcEndpoints) {
183 this.rpcEndpoints = ImmutableMap.copyOf(rpcEndpoints);
186 public Map<Address, Optional<RemoteRpcEndpoint>> getRpcEndpoints() {