Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / OpsRegistrar.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.Address;
13 import akka.actor.Props;
14 import java.util.ArrayList;
15 import java.util.Collection;
16 import java.util.HashMap;
17 import java.util.Map;
18 import java.util.Map.Entry;
19 import java.util.Optional;
20 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
21 import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.Messages.UpdateRemoteActionEndpoints;
22 import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.RemoteActionEndpoint;
23 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
24 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.RemoteRpcEndpoint;
25 import org.opendaylight.mdsal.dom.api.DOMActionImplementation;
26 import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
27 import org.opendaylight.mdsal.dom.api.DOMRpcImplementation;
28 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
29 import org.opendaylight.yangtools.concepts.ObjectRegistration;
30
31 /**
32  * Actor handling registration of RPCs and Actions available on remote nodes with the local
33  * {@link DOMRpcProviderService} and {@link DOMActionProviderService}.
34  */
35 final class OpsRegistrar extends AbstractUntypedActor {
36     private final Map<Address, ObjectRegistration<DOMRpcImplementation>> rpcRegs = new HashMap<>();
37     private final Map<Address, ObjectRegistration<DOMActionImplementation>> actionRegs = new HashMap<>();
38     private final DOMRpcProviderService rpcProviderService;
39     private final RemoteOpsProviderConfig config;
40     private final DOMActionProviderService actionProviderService;
41
42     OpsRegistrar(final RemoteOpsProviderConfig config, final DOMRpcProviderService rpcProviderService,
43                  final DOMActionProviderService actionProviderService) {
44         this.config = requireNonNull(config);
45         this.rpcProviderService = requireNonNull(rpcProviderService);
46         this.actionProviderService = requireNonNull(actionProviderService);
47     }
48
49     public static Props props(final RemoteOpsProviderConfig config, final DOMRpcProviderService rpcProviderService,
50                               final DOMActionProviderService actionProviderService) {
51         return Props.create(OpsRegistrar.class, requireNonNull(config),
52             requireNonNull(rpcProviderService, "DOMRpcProviderService cannot be null"),
53             requireNonNull(actionProviderService, "DOMActionProviderService cannot be null"));
54     }
55
56     @Override
57     public void postStop() throws Exception {
58         rpcRegs.values().forEach(ObjectRegistration::close);
59         rpcRegs.clear();
60         actionRegs.values().forEach(ObjectRegistration::close);
61         actionRegs.clear();
62
63         super.postStop();
64     }
65
66     @Override
67     protected void handleReceive(final Object message) {
68         if (message instanceof UpdateRemoteEndpoints) {
69             LOG.debug("Handling updateRemoteEndpoints message");
70             updateRemoteRpcEndpoints(((UpdateRemoteEndpoints) message).getRpcEndpoints());
71         } else if (message instanceof UpdateRemoteActionEndpoints) {
72             LOG.debug("Handling updateRemoteActionEndpoints message");
73             updateRemoteActionEndpoints(((UpdateRemoteActionEndpoints) message).getActionEndpoints());
74         } else {
75             unknownMessage(message);
76         }
77     }
78
79     private void updateRemoteRpcEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> rpcEndpoints) {
80         /*
81          * Updating RPC providers is a two-step process. We first add the newly-discovered RPCs and then close
82          * the old registration. This minimizes churn observed by listeners, as they will not observe RPC
83          * unavailability which would occur if we were to do it the other way around.
84          *
85          * Note that when an RPC moves from one remote node to another, we also do not want to expose the gap,
86          * hence we register all new implementations before closing all registrations.
87          */
88         final Collection<ObjectRegistration<?>> prevRegs = new ArrayList<>(rpcEndpoints.size());
89
90         for (Entry<Address, Optional<RemoteRpcEndpoint>> e : rpcEndpoints.entrySet()) {
91             LOG.debug("Updating RPC registrations for {}", e.getKey());
92
93             final ObjectRegistration<DOMRpcImplementation> prevReg;
94             final Optional<RemoteRpcEndpoint> maybeEndpoint = e.getValue();
95             if (maybeEndpoint.isPresent()) {
96                 final RemoteRpcEndpoint endpoint = maybeEndpoint.get();
97                 final RemoteRpcImplementation impl = new RemoteRpcImplementation(endpoint.getRouter(), config);
98                 prevReg = rpcRegs.put(e.getKey(), rpcProviderService.registerRpcImplementation(impl,
99                     endpoint.getRpcs()));
100             } else {
101                 prevReg = rpcRegs.remove(e.getKey());
102             }
103
104             if (prevReg != null) {
105                 prevRegs.add(prevReg);
106             }
107         }
108
109         prevRegs.forEach(ObjectRegistration::close);
110     }
111
112     /**
113      * Updates the action endpoints, Adding new registrations first before removing previous registrations.
114      */
115     private void updateRemoteActionEndpoints(final Map<Address, Optional<RemoteActionEndpoint>> actionEndpoints) {
116         /*
117          * Updating Action providers is a two-step process. We first add the newly-discovered RPCs and then close
118          * the old registration. This minimizes churn observed by listeners, as they will not observe RPC
119          * unavailability which would occur if we were to do it the other way around.
120          *
121          * Note that when an Action moves from one remote node to another, we also do not want to expose the gap,
122          * hence we register all new implementations before closing all registrations.
123          */
124         final Collection<ObjectRegistration<?>> prevRegs = new ArrayList<>(actionEndpoints.size());
125
126         for (Entry<Address, Optional<RemoteActionEndpoint>> e : actionEndpoints.entrySet()) {
127             LOG.debug("Updating action registrations for {}", e.getKey());
128
129             final ObjectRegistration<DOMActionImplementation> prevReg;
130             final Optional<RemoteActionEndpoint> maybeEndpoint = e.getValue();
131             if (maybeEndpoint.isPresent()) {
132                 final RemoteActionEndpoint endpoint = maybeEndpoint.get();
133                 final RemoteActionImplementation impl = new RemoteActionImplementation(endpoint.getRouter(), config);
134                 prevReg = actionRegs.put(e.getKey(), actionProviderService.registerActionImplementation(impl,
135                     endpoint.getActions()));
136             } else {
137                 prevReg = actionRegs.remove(e.getKey());
138             }
139
140             if (prevReg != null) {
141                 prevRegs.add(prevReg);
142             }
143         }
144
145         prevRegs.forEach(ObjectRegistration::close);
146     }
147 }