Teach sal-remoterpc-connector to route actions
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / ActionRegistry.java
1 /*
2  * Copyright (c) 2019 Nordix Foundation.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Address;
12 import akka.actor.Props;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Preconditions;
15 import com.google.common.collect.ImmutableList;
16 import com.google.common.collect.ImmutableMap;
17 import com.google.common.collect.ImmutableSet;
18 import java.util.Collection;
19 import java.util.HashMap;
20 import java.util.Map;
21 import java.util.Optional;
22 import java.util.Set;
23 import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
24 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
25 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess;
26 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor;
27 import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteActionRegistryMXBeanImpl;
28 import org.opendaylight.mdsal.dom.api.DOMActionInstance;
29
30 /**
31  * Registry to look up cluster nodes that have registered for a given Action.
32  *
33  * <p>
34  * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor} to maintain this
35  * cluster wide information.
36  */
37 public class ActionRegistry extends BucketStoreActor<ActionRoutingTable> {
38     private final ActorRef rpcRegistrar;
39     private final RemoteActionRegistryMXBeanImpl mxBean;
40
41     public ActionRegistry(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker,
42                           final ActorRef rpcRegistrar) {
43         super(config, config.getRpcRegistryPersistenceId(), new ActionRoutingTable(rpcInvoker, ImmutableSet.of()));
44         this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar);
45         this.mxBean = new RemoteActionRegistryMXBeanImpl(new BucketStoreAccess(self(), getContext().dispatcher(),
46                 config.getAskDuration()), config.getAskDuration());
47     }
48
49     /**
50      * Create a new props instance for instantiating an ActionRegistry actor.
51      *
52      * @param config Provider configuration
53      * @param opsRegistrar Local RPC provider interface, used to register routers to remote nodes
54      * @param opsInvoker Actor handling RPC invocation requests from remote nodes
55      * @return A new {@link Props} instance
56      */
57     public static Props props(final RemoteOpsProviderConfig config, final ActorRef opsInvoker,
58                               final ActorRef opsRegistrar) {
59         return Props.create(ActionRegistry.class, config, opsInvoker, opsRegistrar);
60     }
61
62     @Override
63     public void postStop() {
64         super.postStop();
65         this.mxBean.unregister();
66     }
67
68     @Override
69     protected void handleCommand(final Object message) throws Exception {
70         if (message instanceof ActionRegistry.Messages.UpdateActions) {
71             LOG.debug("handling updatesActionRoutes message");
72             updatesActionRoutes((Messages.UpdateActions) message);
73         } else {
74             super.handleCommand(message);
75         }
76     }
77
78     private void updatesActionRoutes(final Messages.UpdateActions msg) {
79         LOG.debug("addedActions: {}", msg.getAddedActions());
80         LOG.debug("removedActions: {}", msg.getRemovedActions());
81         updateLocalBucket(getLocalData().updateActions(msg.getAddedActions(), msg.getRemovedActions()));
82     }
83
84     @Override
85     protected void onBucketRemoved(final Address address, final Bucket<ActionRoutingTable> bucket) {
86         rpcRegistrar.tell(new Messages.UpdateRemoteActionEndpoints(ImmutableMap.of(address, Optional.empty())),
87             ActorRef.noSender());
88     }
89
90     @Override
91     protected void onBucketsUpdated(final Map<Address, Bucket<ActionRoutingTable>> buckets) {
92         LOG.debug("Updating buckets for action registry");
93         final Map<Address, Optional<RemoteActionEndpoint>> endpoints = new HashMap<>(buckets.size());
94
95         for (Map.Entry<Address, Bucket<ActionRoutingTable>> e : buckets.entrySet()) {
96             final ActionRoutingTable table = e.getValue().getData();
97
98             final Collection<DOMActionInstance> actions = table.getItems();
99             endpoints.put(e.getKey(), actions.isEmpty() ? Optional.empty()
100                 : Optional.of(new RemoteActionEndpoint(table.getInvoker(), actions)));
101         }
102
103         if (!endpoints.isEmpty()) {
104             rpcRegistrar.tell(new Messages.UpdateRemoteActionEndpoints(endpoints), ActorRef.noSender());
105         }
106     }
107
108     public static final class RemoteActionEndpoint {
109         private final Set<DOMActionInstance> actions;
110         private final ActorRef router;
111
112         @VisibleForTesting
113         public RemoteActionEndpoint(final ActorRef router, final Collection<DOMActionInstance> actions) {
114             this.router = Preconditions.checkNotNull(router);
115             this.actions = ImmutableSet.copyOf(actions);
116         }
117
118         public ActorRef getRouter() {
119             return router;
120         }
121
122         public Set<DOMActionInstance> getActions() {
123             return actions;
124         }
125     }
126
127         /**
128          * All messages used by the ActionRegistry.
129          */
130     public static class Messages {
131         abstract static class AbstractActionRouteMessage {
132             final Collection<DOMActionInstance> addedActions;
133             final Collection<DOMActionInstance> removedActions;
134
135             AbstractActionRouteMessage(final Collection<DOMActionInstance> addedActions,
136                                        final Collection<DOMActionInstance> removedActions) {
137                 this.addedActions = ImmutableList.copyOf(addedActions);
138                 this.removedActions = ImmutableList.copyOf(removedActions);
139             }
140
141             Collection<DOMActionInstance> getAddedActions() {
142                 return this.addedActions;
143             }
144
145             Collection<DOMActionInstance> getRemovedActions() {
146                 return this.removedActions;
147             }
148
149
150             @Override
151             public String toString() {
152                 return "ContainsRoute{" + "addedActions=" + addedActions + " removedActions=" + removedActions + '}';
153             }
154         }
155
156
157         public static final class UpdateActions extends AbstractActionRouteMessage {
158             public UpdateActions(final Collection<DOMActionInstance> addedActions,
159                                  final Collection<DOMActionInstance> removedActions) {
160                 super(addedActions, removedActions);
161             }
162
163         }
164
165         public static final class UpdateRemoteActionEndpoints {
166             private final Map<Address, Optional<RemoteActionEndpoint>> actionEndpoints;
167
168             @VisibleForTesting
169             public UpdateRemoteActionEndpoints(final Map<Address, Optional<RemoteActionEndpoint>>
170                                                                    actionEndpoints) {
171                 this.actionEndpoints = ImmutableMap.copyOf(actionEndpoints);
172             }
173
174             public Map<Address, Optional<RemoteActionEndpoint>> getActionEndpoints() {
175                 return actionEndpoints;
176             }
177         }
178     }
179 }