Add support for reusable streaming
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / RpcRegistrar.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc;
9
10 import akka.actor.Address;
11 import akka.actor.Props;
12 import com.google.common.base.Preconditions;
13 import java.util.ArrayList;
14 import java.util.Collection;
15 import java.util.HashMap;
16 import java.util.Map;
17 import java.util.Map.Entry;
18 import java.util.Optional;
19 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
20 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
21 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.RemoteRpcEndpoint;
22 import org.opendaylight.mdsal.dom.api.DOMRpcImplementationRegistration;
23 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
24
25 /**
26  * Actor handling registration of RPCs available on remote nodes with the local {@link DOMRpcProviderService}.
27  *
28  * @author Robert Varga
29  */
30 final class RpcRegistrar extends AbstractUntypedActor {
31     private final Map<Address, DOMRpcImplementationRegistration<?>> regs = new HashMap<>();
32     private final DOMRpcProviderService rpcProviderService;
33     private final RemoteRpcProviderConfig config;
34
35     RpcRegistrar(final RemoteRpcProviderConfig config, final DOMRpcProviderService rpcProviderService) {
36         this.config = Preconditions.checkNotNull(config);
37         this.rpcProviderService = Preconditions.checkNotNull(rpcProviderService);
38     }
39
40     public static Props props(final RemoteRpcProviderConfig config, final DOMRpcProviderService rpcProviderService) {
41         Preconditions.checkNotNull(rpcProviderService, "DOMRpcProviderService cannot be null");
42         return Props.create(RpcRegistrar.class, config, rpcProviderService);
43     }
44
45     @Override
46     public void postStop() throws Exception {
47         regs.values().forEach(DOMRpcImplementationRegistration::close);
48         regs.clear();
49
50         super.postStop();
51     }
52
53     @Override
54     protected void handleReceive(final Object message) {
55         if (message instanceof UpdateRemoteEndpoints) {
56             updateRemoteEndpoints(((UpdateRemoteEndpoints) message).getEndpoints());
57         } else {
58             unknownMessage(message);
59         }
60     }
61
62     private void updateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> endpoints) {
63         /*
64          * Updating RPC providers is a two-step process. We first add the newly-discovered RPCs and then close
65          * the old registration. This minimizes churn observed by listeners, as they will not observe RPC
66          * unavailability which would occur if we were to do it the other way around.
67          *
68          * Note that when an RPC moves from one remote node to another, we also do not want to expose the gap,
69          * hence we register all new implementations before closing all registrations.
70          */
71         final Collection<DOMRpcImplementationRegistration<?>> prevRegs = new ArrayList<>(endpoints.size());
72
73         for (Entry<Address, Optional<RemoteRpcEndpoint>> e : endpoints.entrySet()) {
74             LOG.debug("Updating RPC registrations for {}", e.getKey());
75
76             final DOMRpcImplementationRegistration<?> prevReg;
77             final Optional<RemoteRpcEndpoint> maybeEndpoint = e.getValue();
78             if (maybeEndpoint.isPresent()) {
79                 final RemoteRpcEndpoint endpoint = maybeEndpoint.get();
80                 final RemoteRpcImplementation impl = new RemoteRpcImplementation(endpoint.getRouter(), config);
81                 prevReg = regs.put(e.getKey(), rpcProviderService.registerRpcImplementation(impl,
82                     endpoint.getRpcs()));
83             } else {
84                 prevReg = regs.remove(e.getKey());
85             }
86
87             if (prevReg != null) {
88                 prevRegs.add(prevReg);
89             }
90         }
91
92         for (DOMRpcImplementationRegistration<?> r : prevRegs) {
93             r.close();
94         }
95     }
96 }