fe43a691d2d685ee0c9fa16f0251c3e925d1c5be
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Address;
12 import akka.actor.Props;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Preconditions;
15 import com.google.common.collect.ImmutableList;
16 import com.google.common.collect.ImmutableMap;
17 import com.google.common.collect.ImmutableSet;
18 import java.util.Collection;
19 import java.util.HashMap;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.Map.Entry;
23 import java.util.Optional;
24 import java.util.Set;
25 import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
26 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
27 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
28 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
29 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
30 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
31 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor;
32 import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl;
33
34 /**
35  * Registry to look up cluster nodes that have registered for a given RPC.
36  *
37  * <p>
38  * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor} to maintain this
39  * cluster wide information.
40  */
41 public class RpcRegistry extends BucketStoreActor<RoutingTable> {
42     private final ActorRef rpcRegistrar;
43     private final RemoteRpcRegistryMXBeanImpl mxBean;
44
45     public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
46         super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker, ImmutableSet.of()));
47         this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar);
48         this.mxBean = new RemoteRpcRegistryMXBeanImpl(this);
49     }
50
51     /**
52      * Create a new props instance for instantiating an RpcRegistry actor.
53      *
54      * @param config Provider configuration
55      * @param rpcRegistrar Local RPC provider interface, used to register routers to remote nodes
56      * @param rpcInvoker Actor handling RPC invocation requests from remote nodes
57      * @return A new {@link Props} instance
58      */
59     public static Props props(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker,
60             final ActorRef rpcRegistrar) {
61         return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar);
62     }
63
64     @Override
65     public void postStop() {
66         super.postStop();
67         this.mxBean.unregister();
68     }
69
70     @Override
71     protected void handleCommand(final Object message) throws Exception {
72         if (message instanceof AddOrUpdateRoutes) {
73             receiveAddRoutes((AddOrUpdateRoutes) message);
74         } else if (message instanceof RemoveRoutes) {
75             receiveRemoveRoutes((RemoveRoutes) message);
76         } else {
77             super.handleCommand(message);
78         }
79     }
80
81     private void receiveAddRoutes(final AddOrUpdateRoutes msg) {
82         LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
83         updateLocalBucket(getLocalData().addRpcs(msg.getRouteIdentifiers()));
84     }
85
86     /**
87      * Processes a RemoveRoutes message.
88      *
89      * @param msg contains list of route ids to remove
90      */
91     private void receiveRemoveRoutes(final RemoveRoutes msg) {
92         LOG.debug("RemoveRoutes: {}", msg.getRouteIdentifiers());
93         updateLocalBucket(getLocalData().removeRpcs(msg.getRouteIdentifiers()));
94     }
95
96     @Override
97     protected void onBucketRemoved(final Address address, final Bucket<RoutingTable> bucket) {
98         rpcRegistrar.tell(new UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())), ActorRef.noSender());
99     }
100
101     @Override
102     protected void onBucketsUpdated(final Map<Address, Bucket<RoutingTable>> buckets) {
103         final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = new HashMap<>(buckets.size());
104
105         for (Entry<Address, Bucket<RoutingTable>> e : buckets.entrySet()) {
106             final RoutingTable table = e.getValue().getData();
107
108             final Collection<DOMRpcIdentifier> rpcs = table.getRoutes();
109             endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty()
110                     : Optional.of(new RemoteRpcEndpoint(table.getRpcInvoker(), rpcs)));
111         }
112
113         if (!endpoints.isEmpty()) {
114             rpcRegistrar.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
115         }
116     }
117
118     public static final class RemoteRpcEndpoint {
119         private final Set<DOMRpcIdentifier> rpcs;
120         private final ActorRef router;
121
122         @VisibleForTesting
123         public RemoteRpcEndpoint(final ActorRef router, final Collection<DOMRpcIdentifier> rpcs) {
124             this.router = Preconditions.checkNotNull(router);
125             this.rpcs = ImmutableSet.copyOf(rpcs);
126         }
127
128         public ActorRef getRouter() {
129             return router;
130         }
131
132         public Set<DOMRpcIdentifier> getRpcs() {
133             return rpcs;
134         }
135     }
136
137     /**
138      * All messages used by the RpcRegistry.
139      */
140     public static class Messages {
141         abstract static class AbstractRouteMessage {
142             final List<DOMRpcIdentifier> routeIdentifiers;
143
144             AbstractRouteMessage(final Collection<DOMRpcIdentifier> routeIdentifiers) {
145                 Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(),
146                         "Route Identifiers must be supplied");
147                 this.routeIdentifiers = ImmutableList.copyOf(routeIdentifiers);
148             }
149
150             List<DOMRpcIdentifier> getRouteIdentifiers() {
151                 return this.routeIdentifiers;
152             }
153
154             @Override
155             public String toString() {
156                 return "ContainsRoute{" + "routeIdentifiers=" + routeIdentifiers + '}';
157             }
158         }
159
160         public static final class AddOrUpdateRoutes extends AbstractRouteMessage {
161             public AddOrUpdateRoutes(final Collection<DOMRpcIdentifier> routeIdentifiers) {
162                 super(routeIdentifiers);
163             }
164         }
165
166         public static final class RemoveRoutes extends AbstractRouteMessage {
167             public RemoveRoutes(final Collection<DOMRpcIdentifier> routeIdentifiers) {
168                 super(routeIdentifiers);
169             }
170         }
171
172         public static final class UpdateRemoteEndpoints {
173             private final Map<Address, Optional<RemoteRpcEndpoint>> endpoints;
174
175             @VisibleForTesting
176             public UpdateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> endpoints) {
177                 this.endpoints = ImmutableMap.copyOf(endpoints);
178             }
179
180             public Map<Address, Optional<RemoteRpcEndpoint>> getEndpoints() {
181                 return endpoints;
182             }
183         }
184     }
185 }