Bump to odlparent-4.0.13/yangtools-2.1.13/mdsal-3.0.7
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Address;
12 import akka.actor.Props;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Preconditions;
15 import com.google.common.collect.ImmutableList;
16 import com.google.common.collect.ImmutableMap;
17 import com.google.common.collect.ImmutableSet;
18 import java.util.Collection;
19 import java.util.HashMap;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.Map.Entry;
23 import java.util.Optional;
24 import java.util.Set;
25 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
26 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
27 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
28 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
29 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
30 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess;
31 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor;
32 import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl;
33 import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
34
35 /**
36  * Registry to look up cluster nodes that have registered for a given RPC.
37  *
38  * <p>
39  * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor} to maintain this
40  * cluster wide information.
41  */
42 public class RpcRegistry extends BucketStoreActor<RoutingTable> {
43     private final ActorRef rpcRegistrar;
44     private final RemoteRpcRegistryMXBeanImpl mxBean;
45
46     public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
47         super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker, ImmutableSet.of()));
48         this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar);
49         this.mxBean = new RemoteRpcRegistryMXBeanImpl(new BucketStoreAccess(self(), getContext().dispatcher(),
50                 config.getAskDuration()), config.getAskDuration());
51     }
52
53     /**
54      * Create a new props instance for instantiating an RpcRegistry actor.
55      *
56      * @param config Provider configuration
57      * @param rpcRegistrar Local RPC provider interface, used to register routers to remote nodes
58      * @param rpcInvoker Actor handling RPC invocation requests from remote nodes
59      * @return A new {@link Props} instance
60      */
61     public static Props props(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker,
62             final ActorRef rpcRegistrar) {
63         return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar);
64     }
65
66     @Override
67     public void postStop() throws Exception {
68         try {
69             super.postStop();
70         } finally {
71             this.mxBean.unregister();
72         }
73     }
74
75     @Override
76     protected void handleCommand(final Object message) throws Exception {
77         if (message instanceof AddOrUpdateRoutes) {
78             receiveAddRoutes((AddOrUpdateRoutes) message);
79         } else if (message instanceof RemoveRoutes) {
80             receiveRemoveRoutes((RemoveRoutes) message);
81         } else {
82             super.handleCommand(message);
83         }
84     }
85
86     private void receiveAddRoutes(final AddOrUpdateRoutes msg) {
87         LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
88         updateLocalBucket(getLocalData().addRpcs(msg.getRouteIdentifiers()));
89     }
90
91     /**
92      * Processes a RemoveRoutes message.
93      *
94      * @param msg contains list of route ids to remove
95      */
96     private void receiveRemoveRoutes(final RemoveRoutes msg) {
97         LOG.debug("RemoveRoutes: {}", msg.getRouteIdentifiers());
98         updateLocalBucket(getLocalData().removeRpcs(msg.getRouteIdentifiers()));
99     }
100
101     @Override
102     protected void onBucketRemoved(final Address address, final Bucket<RoutingTable> bucket) {
103         rpcRegistrar.tell(new UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())), ActorRef.noSender());
104     }
105
106     @Override
107     protected void onBucketsUpdated(final Map<Address, Bucket<RoutingTable>> buckets) {
108         final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = new HashMap<>(buckets.size());
109
110         for (Entry<Address, Bucket<RoutingTable>> e : buckets.entrySet()) {
111             final RoutingTable table = e.getValue().getData();
112
113             final Collection<DOMRpcIdentifier> rpcs = table.getRoutes();
114             endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty()
115                     : Optional.of(new RemoteRpcEndpoint(table.getRpcInvoker(), rpcs)));
116         }
117
118         if (!endpoints.isEmpty()) {
119             rpcRegistrar.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
120         }
121     }
122
123     public static final class RemoteRpcEndpoint {
124         private final Set<DOMRpcIdentifier> rpcs;
125         private final ActorRef router;
126
127         @VisibleForTesting
128         public RemoteRpcEndpoint(final ActorRef router, final Collection<DOMRpcIdentifier> rpcs) {
129             this.router = Preconditions.checkNotNull(router);
130             this.rpcs = ImmutableSet.copyOf(rpcs);
131         }
132
133         public ActorRef getRouter() {
134             return router;
135         }
136
137         public Set<DOMRpcIdentifier> getRpcs() {
138             return rpcs;
139         }
140     }
141
142     /**
143      * All messages used by the RpcRegistry.
144      */
145     public static class Messages {
146         abstract static class AbstractRouteMessage {
147             final List<DOMRpcIdentifier> routeIdentifiers;
148
149             AbstractRouteMessage(final Collection<DOMRpcIdentifier> routeIdentifiers) {
150                 Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(),
151                         "Route Identifiers must be supplied");
152                 this.routeIdentifiers = ImmutableList.copyOf(routeIdentifiers);
153             }
154
155             List<DOMRpcIdentifier> getRouteIdentifiers() {
156                 return this.routeIdentifiers;
157             }
158
159             @Override
160             public String toString() {
161                 return "ContainsRoute{" + "routeIdentifiers=" + routeIdentifiers + '}';
162             }
163         }
164
165         public static final class AddOrUpdateRoutes extends AbstractRouteMessage {
166             public AddOrUpdateRoutes(final Collection<DOMRpcIdentifier> routeIdentifiers) {
167                 super(routeIdentifiers);
168             }
169         }
170
171         public static final class RemoveRoutes extends AbstractRouteMessage {
172             public RemoveRoutes(final Collection<DOMRpcIdentifier> routeIdentifiers) {
173                 super(routeIdentifiers);
174             }
175         }
176
177         public static final class UpdateRemoteEndpoints {
178             private final Map<Address, Optional<RemoteRpcEndpoint>> endpoints;
179
180             @VisibleForTesting
181             public UpdateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> endpoints) {
182                 this.endpoints = ImmutableMap.copyOf(endpoints);
183             }
184
185             public Map<Address, Optional<RemoteRpcEndpoint>> getEndpoints() {
186                 return endpoints;
187             }
188         }
189     }
190 }