/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.remote.rpc.registry;
10 import akka.actor.ActorRef;
11 import akka.actor.Address;
12 import akka.actor.Props;
13 import com.google.common.base.Preconditions;
14 import com.google.common.collect.ImmutableMap;
15 import com.google.common.collect.ImmutableSet;
16 import java.util.ArrayList;
17 import java.util.Collection;
18 import java.util.HashMap;
19 import java.util.List;
21 import java.util.Map.Entry;
22 import java.util.Optional;
24 import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
25 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
26 import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
27 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
28 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
29 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
30 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
31 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
32 import org.opendaylight.controller.sal.connector.api.RpcRouter;
33 import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
34 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
/**
 * Registry to look up cluster nodes that have registered for a given RPC.
 *
 * <p>It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
 * cluster-wide information.
 */
43 public class RpcRegistry extends BucketStore<RoutingTable> {
/** Actor that is sent {@link UpdateRemoteEndpoints} messages by this registry. */
private final ActorRef rpcRegistrar;

/**
 * Builds the registry, passing the configuration and a new {@link RoutingTable} created from the
 * invoker actor to the superclass constructor.
 *
 * @param config Provider configuration
 * @param rpcInvoker Actor used to construct the initial routing table
 * @param rpcRegistrar Actor to be told about remote endpoint changes; must not be null
 */
public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
    super(config, new RoutingTable(rpcInvoker));
    Preconditions.checkNotNull(rpcRegistrar);
    this.rpcRegistrar = rpcRegistrar;
}
52 * Create a new props instance for instantiating an RpcRegistry actor.
54 * @param config Provider configuration
55 * @param rpcRegistrar Local RPC provider interface, used to register routers to remote nodes
56 * @param rpcInvoker Actor handling RPC invocation requests from remote nodes
57 * @return A new {@link Props} instance
59 public static Props props(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker,
60 final ActorRef rpcRegistrar) {
61 return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar);
65 protected void handleReceive(final Object message) throws Exception {
66 if (message instanceof AddOrUpdateRoutes) {
67 receiveAddRoutes((AddOrUpdateRoutes) message);
68 } else if (message instanceof RemoveRoutes) {
69 receiveRemoveRoutes((RemoveRoutes) message);
71 super.handleReceive(message);
75 private void receiveAddRoutes(final AddOrUpdateRoutes msg) {
76 LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
78 RoutingTable table = getLocalBucket().getData().copy();
79 for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
80 table.addRoute(routeId);
83 updateLocalBucket(table);
87 * @param msg contains list of route ids to remove
89 private void receiveRemoveRoutes(final RemoveRoutes msg) {
90 RoutingTable table = getLocalBucket().getData().copy();
91 for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
92 table.removeRoute(routeId);
95 updateLocalBucket(table);
99 protected void onBucketRemoved(final Address address, final Bucket<RoutingTable> bucket) {
100 rpcRegistrar.tell(new UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())), ActorRef.noSender());
104 protected void onBucketsUpdated(final Map<Address, Bucket<RoutingTable>> buckets) {
105 final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = new HashMap<>(buckets.size());
107 for (Entry<Address, Bucket<RoutingTable>> e : buckets.entrySet()) {
108 final RoutingTable table = e.getValue().getData();
110 final List<DOMRpcIdentifier> rpcs = new ArrayList<>(table.getRoutes().size());
111 for (RouteIdentifier<?, ?, ?> ri : table.getRoutes()) {
112 if (ri instanceof RouteIdentifierImpl) {
113 final RouteIdentifierImpl id = (RouteIdentifierImpl) ri;
114 rpcs.add(DOMRpcIdentifier.create(SchemaPath.create(true, id.getType()), id.getRoute()));
116 LOG.warn("Skipping unsupported route {} from {}", ri, e.getKey());
120 endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty()
121 : Optional.of(new RemoteRpcEndpoint(table.getRouter(), rpcs)));
124 if (!endpoints.isEmpty()) {
125 rpcRegistrar.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
/**
 * Pairs a router actor with a set of RPC identifiers. Instances are created in
 * onBucketsUpdated for buckets that yield at least one supported route.
 */
129 public static final class RemoteRpcEndpoint {
// Holds the ImmutableSet copy made by the constructor.
130 private final Set<DOMRpcIdentifier> rpcs;
// Never null: the constructor rejects a null router.
131 private final ActorRef router;
/**
 * @param router router actor for this endpoint; checked with Preconditions.checkNotNull
 * @param rpcs RPC identifiers of the endpoint; copied into an ImmutableSet
 */
133 RemoteRpcEndpoint(final ActorRef router, final Collection<DOMRpcIdentifier> rpcs) {
134 this.router = Preconditions.checkNotNull(router);
135 this.rpcs = ImmutableSet.copyOf(rpcs);
// NOTE(review): the bodies of the two accessors below are not visible in this view;
// presumably they return the corresponding fields - confirm against the full file.
138 public ActorRef getRouter() {
142 public Set<DOMRpcIdentifier> getRpcs() {
/**
 * All messages used by the RpcRegistry.
 */
150 public static class Messages {
151 abstract static class AbstractRouteMessage {
152 final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
154 AbstractRouteMessage(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
155 Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(),
156 "Route Identifiers must be supplied");
157 this.routeIdentifiers = routeIdentifiers;
160 List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
161 return this.routeIdentifiers;
165 public String toString() {
166 return "ContainsRoute{" +
167 "routeIdentifiers=" + routeIdentifiers +
172 public static final class AddOrUpdateRoutes extends AbstractRouteMessage {
173 public AddOrUpdateRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
174 super(routeIdentifiers);
178 public static final class RemoveRoutes extends AbstractRouteMessage {
179 public RemoveRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
180 super(routeIdentifiers);
184 public static final class UpdateRemoteEndpoints {
185 private final Map<Address, Optional<RemoteRpcEndpoint>> endpoints;
187 UpdateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> endpoints) {
188 this.endpoints = ImmutableMap.copyOf(endpoints);
191 public Map<Address, Optional<RemoteRpcEndpoint>> getEndpoints() {