2 * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.remote.rpc;
10 import static java.util.Objects.requireNonNull;
12 import akka.actor.Address;
13 import akka.actor.Props;
14 import java.util.ArrayList;
15 import java.util.Collection;
16 import java.util.HashMap;
18 import java.util.Map.Entry;
19 import java.util.Optional;
20 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
21 import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.Messages.UpdateRemoteActionEndpoints;
22 import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.RemoteActionEndpoint;
23 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
24 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.RemoteRpcEndpoint;
25 import org.opendaylight.mdsal.dom.api.DOMActionImplementation;
26 import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
27 import org.opendaylight.mdsal.dom.api.DOMRpcImplementation;
28 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
29 import org.opendaylight.yangtools.concepts.ObjectRegistration;
32 * Actor handling registration of RPCs and Actions available on remote nodes with the local
33 * {@link DOMRpcProviderService} and {@link DOMActionProviderService}.
35 final class OpsRegistrar extends AbstractUntypedActor {
36 private final Map<Address, ObjectRegistration<DOMRpcImplementation>> rpcRegs = new HashMap<>();
37 private final Map<Address, ObjectRegistration<DOMActionImplementation>> actionRegs = new HashMap<>();
38 private final DOMRpcProviderService rpcProviderService;
39 private final RemoteOpsProviderConfig config;
40 private final DOMActionProviderService actionProviderService;
42 OpsRegistrar(final RemoteOpsProviderConfig config, final DOMRpcProviderService rpcProviderService,
43 final DOMActionProviderService actionProviderService) {
44 this.config = requireNonNull(config);
45 this.rpcProviderService = requireNonNull(rpcProviderService);
46 this.actionProviderService = requireNonNull(actionProviderService);
49 public static Props props(final RemoteOpsProviderConfig config, final DOMRpcProviderService rpcProviderService,
50 final DOMActionProviderService actionProviderService) {
51 return Props.create(OpsRegistrar.class, requireNonNull(config),
52 requireNonNull(rpcProviderService, "DOMRpcProviderService cannot be null"),
53 requireNonNull(actionProviderService, "DOMActionProviderService cannot be null"));
57 public void postStop() throws Exception {
58 rpcRegs.values().forEach(ObjectRegistration::close);
60 actionRegs.values().forEach(ObjectRegistration::close);
67 protected void handleReceive(final Object message) {
68 if (message instanceof UpdateRemoteEndpoints) {
69 LOG.debug("Handling updateRemoteEndpoints message");
70 updateRemoteRpcEndpoints(((UpdateRemoteEndpoints) message).getRpcEndpoints());
71 } else if (message instanceof UpdateRemoteActionEndpoints) {
72 LOG.debug("Handling updateRemoteActionEndpoints message");
73 updateRemoteActionEndpoints(((UpdateRemoteActionEndpoints) message).getActionEndpoints());
75 unknownMessage(message);
79 private void updateRemoteRpcEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> rpcEndpoints) {
81 * Updating RPC providers is a two-step process. We first add the newly-discovered RPCs and then close
82 * the old registration. This minimizes churn observed by listeners, as they will not observe RPC
83 * unavailability which would occur if we were to do it the other way around.
85 * Note that when an RPC moves from one remote node to another, we also do not want to expose the gap,
86 * hence we register all new implementations before closing all registrations.
88 final Collection<ObjectRegistration<?>> prevRegs = new ArrayList<>(rpcEndpoints.size());
90 for (Entry<Address, Optional<RemoteRpcEndpoint>> e : rpcEndpoints.entrySet()) {
91 LOG.debug("Updating RPC registrations for {}", e.getKey());
93 final ObjectRegistration<DOMRpcImplementation> prevReg;
94 final Optional<RemoteRpcEndpoint> maybeEndpoint = e.getValue();
95 if (maybeEndpoint.isPresent()) {
96 final RemoteRpcEndpoint endpoint = maybeEndpoint.get();
97 final RemoteRpcImplementation impl = new RemoteRpcImplementation(endpoint.getRouter(), config);
98 prevReg = rpcRegs.put(e.getKey(), rpcProviderService.registerRpcImplementation(impl,
101 prevReg = rpcRegs.remove(e.getKey());
104 if (prevReg != null) {
105 prevRegs.add(prevReg);
109 prevRegs.forEach(ObjectRegistration::close);
113 * Updates the action endpoints, Adding new registrations first before removing previous registrations.
115 private void updateRemoteActionEndpoints(final Map<Address, Optional<RemoteActionEndpoint>> actionEndpoints) {
117 * Updating Action providers is a two-step process. We first add the newly-discovered RPCs and then close
118 * the old registration. This minimizes churn observed by listeners, as they will not observe RPC
119 * unavailability which would occur if we were to do it the other way around.
121 * Note that when an Action moves from one remote node to another, we also do not want to expose the gap,
122 * hence we register all new implementations before closing all registrations.
124 final Collection<ObjectRegistration<?>> prevRegs = new ArrayList<>(actionEndpoints.size());
126 for (Entry<Address, Optional<RemoteActionEndpoint>> e : actionEndpoints.entrySet()) {
127 LOG.debug("Updating action registrations for {}", e.getKey());
129 final ObjectRegistration<DOMActionImplementation> prevReg;
130 final Optional<RemoteActionEndpoint> maybeEndpoint = e.getValue();
131 if (maybeEndpoint.isPresent()) {
132 final RemoteActionEndpoint endpoint = maybeEndpoint.get();
133 final RemoteActionImplementation impl = new RemoteActionImplementation(endpoint.getRouter(), config);
134 prevReg = actionRegs.put(e.getKey(), actionProviderService.registerActionImplementation(impl,
135 endpoint.getActions()));
137 prevReg = actionRegs.remove(e.getKey());
140 if (prevReg != null) {
141 prevRegs.add(prevReg);
145 prevRegs.forEach(ObjectRegistration::close);