2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.remote.rpc.registry;
10 import akka.actor.ActorRef;
11 import akka.actor.Address;
12 import akka.actor.Props;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Preconditions;
15 import com.google.common.collect.ImmutableList;
16 import com.google.common.collect.ImmutableMap;
17 import com.google.common.collect.ImmutableSet;
18 import java.util.Collection;
19 import java.util.HashMap;
20 import java.util.List;
22 import java.util.Map.Entry;
23 import java.util.Optional;
25 import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
26 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
27 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
28 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
29 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
30 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
31 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor;
34 * Registry to look up cluster nodes that have registered for a given RPC.
37 * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor} to maintain this
38 * cluster wide information.
40 public class RpcRegistry extends BucketStoreActor<RoutingTable> {
41 private final ActorRef rpcRegistrar;
43 public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
44 super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker, ImmutableSet.of()));
45 this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar);
49 * Create a new props instance for instantiating an RpcRegistry actor.
51 * @param config Provider configuration
52 * @param rpcRegistrar Local RPC provider interface, used to register routers to remote nodes
53 * @param rpcInvoker Actor handling RPC invocation requests from remote nodes
54 * @return A new {@link Props} instance
56 public static Props props(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker,
57 final ActorRef rpcRegistrar) {
58 return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar);
62 protected void handleCommand(final Object message) throws Exception {
63 if (message instanceof AddOrUpdateRoutes) {
64 receiveAddRoutes((AddOrUpdateRoutes) message);
65 } else if (message instanceof RemoveRoutes) {
66 receiveRemoveRoutes((RemoveRoutes) message);
68 super.handleCommand(message);
72 private void receiveAddRoutes(final AddOrUpdateRoutes msg) {
73 LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
74 updateLocalBucket(getLocalData().addRpcs(msg.getRouteIdentifiers()));
78 * Processes a RemoveRoutes message.
80 * @param msg contains list of route ids to remove
82 private void receiveRemoveRoutes(final RemoveRoutes msg) {
83 LOG.debug("RemoveRoutes: {}", msg.getRouteIdentifiers());
84 updateLocalBucket(getLocalData().removeRpcs(msg.getRouteIdentifiers()));
88 protected void onBucketRemoved(final Address address, final Bucket<RoutingTable> bucket) {
89 rpcRegistrar.tell(new UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())), ActorRef.noSender());
93 protected void onBucketsUpdated(final Map<Address, Bucket<RoutingTable>> buckets) {
94 final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = new HashMap<>(buckets.size());
96 for (Entry<Address, Bucket<RoutingTable>> e : buckets.entrySet()) {
97 final RoutingTable table = e.getValue().getData();
99 final Collection<DOMRpcIdentifier> rpcs = table.getRoutes();
100 endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty()
101 : Optional.of(new RemoteRpcEndpoint(table.getRpcInvoker(), rpcs)));
104 if (!endpoints.isEmpty()) {
105 rpcRegistrar.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
109 public static final class RemoteRpcEndpoint {
110 private final Set<DOMRpcIdentifier> rpcs;
111 private final ActorRef router;
114 public RemoteRpcEndpoint(final ActorRef router, final Collection<DOMRpcIdentifier> rpcs) {
115 this.router = Preconditions.checkNotNull(router);
116 this.rpcs = ImmutableSet.copyOf(rpcs);
119 public ActorRef getRouter() {
123 public Set<DOMRpcIdentifier> getRpcs() {
129 * All messages used by the RpcRegistry.
131 public static class Messages {
132 abstract static class AbstractRouteMessage {
133 final List<DOMRpcIdentifier> routeIdentifiers;
135 AbstractRouteMessage(final Collection<DOMRpcIdentifier> routeIdentifiers) {
136 Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(),
137 "Route Identifiers must be supplied");
138 this.routeIdentifiers = ImmutableList.copyOf(routeIdentifiers);
141 List<DOMRpcIdentifier> getRouteIdentifiers() {
142 return this.routeIdentifiers;
146 public String toString() {
147 return "ContainsRoute{" + "routeIdentifiers=" + routeIdentifiers + '}';
151 public static final class AddOrUpdateRoutes extends AbstractRouteMessage {
152 public AddOrUpdateRoutes(final Collection<DOMRpcIdentifier> routeIdentifiers) {
153 super(routeIdentifiers);
157 public static final class RemoveRoutes extends AbstractRouteMessage {
158 public RemoveRoutes(final Collection<DOMRpcIdentifier> routeIdentifiers) {
159 super(routeIdentifiers);
163 public static final class UpdateRemoteEndpoints {
164 private final Map<Address, Optional<RemoteRpcEndpoint>> endpoints;
167 public UpdateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> endpoints) {
168 this.endpoints = ImmutableMap.copyOf(endpoints);
171 public Map<Address, Optional<RemoteRpcEndpoint>> getEndpoints() {