This ensures registrations are closed as soon as they are not
needed.
JIRA: CONTROLLER-1906
Change-Id: I3a391f202963852f47486b78748c8e2d7e97162a
Signed-off-by: EmmettCox <emmett.cox@est.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
import akka.actor.Address;
import akka.actor.Props;
import akka.actor.Address;
import akka.actor.Props;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-import org.opendaylight.controller.remote.rpc.registry.ActionRegistry;
import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.Messages.UpdateRemoteActionEndpoints;
import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.RemoteActionEndpoint;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.Messages.UpdateRemoteActionEndpoints;
import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.RemoteActionEndpoint;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
@Override
public void postStop() throws Exception {
@Override
public void postStop() throws Exception {
+ rpcRegs.values().forEach(ObjectRegistration::close);
+ actionRegs.values().forEach(ObjectRegistration::close);
actionRegs.clear();
super.postStop();
actionRegs.clear();
super.postStop();
* Note that when an RPC moves from one remote node to another, we also do not want to expose the gap,
* hence we register all new implementations before closing all registrations.
*/
* Note that when an RPC moves from one remote node to another, we also do not want to expose the gap,
* hence we register all new implementations before closing all registrations.
*/
+ final Collection<ObjectRegistration<?>> prevRegs = new ArrayList<>(rpcEndpoints.size());
+
for (Entry<Address, Optional<RemoteRpcEndpoint>> e : rpcEndpoints.entrySet()) {
LOG.debug("Updating RPC registrations for {}", e.getKey());
for (Entry<Address, Optional<RemoteRpcEndpoint>> e : rpcEndpoints.entrySet()) {
LOG.debug("Updating RPC registrations for {}", e.getKey());
+ final ObjectRegistration<DOMRpcImplementation> prevReg;
final Optional<RemoteRpcEndpoint> maybeEndpoint = e.getValue();
if (maybeEndpoint.isPresent()) {
final RemoteRpcEndpoint endpoint = maybeEndpoint.get();
final RemoteRpcImplementation impl = new RemoteRpcImplementation(endpoint.getRouter(), config);
final Optional<RemoteRpcEndpoint> maybeEndpoint = e.getValue();
if (maybeEndpoint.isPresent()) {
final RemoteRpcEndpoint endpoint = maybeEndpoint.get();
final RemoteRpcImplementation impl = new RemoteRpcImplementation(endpoint.getRouter(), config);
- rpcRegs.put(e.getKey(), rpcProviderService.registerRpcImplementation(impl,
- endpoint.getRpcs()));
+ prevReg = rpcRegs.put(e.getKey(), rpcProviderService.registerRpcImplementation(impl,
+ endpoint.getRpcs()));
- rpcRegs.remove(e.getKey());
+ prevReg = rpcRegs.remove(e.getKey());
+ }
+
+ if (prevReg != null) {
+ prevRegs.add(prevReg);
+
+ prevRegs.forEach(ObjectRegistration::close);
}
/**
* Updates the action endpoints, Adding new registrations first before removing previous registrations.
*/
}
/**
* Updates the action endpoints, Adding new registrations first before removing previous registrations.
*/
- private void updateRemoteActionEndpoints(final Map<Address,
- Optional<ActionRegistry.RemoteActionEndpoint>> actionEndpoints) {
+ private void updateRemoteActionEndpoints(final Map<Address, Optional<RemoteActionEndpoint>> actionEndpoints) {
/*
* Updating Action providers is a two-step process. We first add the newly-discovered RPCs and then close
* the old registration. This minimizes churn observed by listeners, as they will not observe RPC
/*
* Updating Action providers is a two-step process. We first add the newly-discovered RPCs and then close
* the old registration. This minimizes churn observed by listeners, as they will not observe RPC
* Note that when an Action moves from one remote node to another, we also do not want to expose the gap,
* hence we register all new implementations before closing all registrations.
*/
* Note that when an Action moves from one remote node to another, we also do not want to expose the gap,
* hence we register all new implementations before closing all registrations.
*/
+ final Collection<ObjectRegistration<?>> prevRegs = new ArrayList<>(actionEndpoints.size());
+
for (Entry<Address, Optional<RemoteActionEndpoint>> e : actionEndpoints.entrySet()) {
for (Entry<Address, Optional<RemoteActionEndpoint>> e : actionEndpoints.entrySet()) {
- LOG.debug("Updating Action registrations for {}", e.getKey());
+ LOG.debug("Updating action registrations for {}", e.getKey());
+ final ObjectRegistration<DOMActionImplementation> prevReg;
final Optional<RemoteActionEndpoint> maybeEndpoint = e.getValue();
if (maybeEndpoint.isPresent()) {
final RemoteActionEndpoint endpoint = maybeEndpoint.get();
final RemoteActionImplementation impl = new RemoteActionImplementation(endpoint.getRouter(), config);
final Optional<RemoteActionEndpoint> maybeEndpoint = e.getValue();
if (maybeEndpoint.isPresent()) {
final RemoteActionEndpoint endpoint = maybeEndpoint.get();
final RemoteActionImplementation impl = new RemoteActionImplementation(endpoint.getRouter(), config);
- actionRegs.put(e.getKey(),
- actionProviderService.registerActionImplementation(impl, endpoint.getActions()));
+ prevReg = actionRegs.put(e.getKey(), actionProviderService.registerActionImplementation(impl,
+ endpoint.getActions()));
- actionRegs.remove(e.getKey());
+ prevReg = actionRegs.remove(e.getKey());
+ }
+
+ if (prevReg != null) {
+ prevRegs.add(prevReg);
+
+ prevRegs.forEach(ObjectRegistration::close);
inOrder.verify(rpcService).registerRpcImplementation(any(RemoteRpcImplementation.class),
eq(secondEndpoint.getRpcs()));
inOrder.verify(rpcService).registerRpcImplementation(any(RemoteRpcImplementation.class),
eq(secondEndpoint.getRpcs()));
+ // verify first registration is closed
+ inOrder.verify(oldReg).close();
verifyNoMoreInteractions(rpcService, oldReg, newReg);
}
verifyNoMoreInteractions(rpcService, oldReg, newReg);
}
eq(secondActionEndpoint.getActions()));
// verify first registration is closed
eq(secondActionEndpoint.getActions()));
// verify first registration is closed
-// inOrder.verify(oldReg).close();
+ inOrder.verify(oldActionReg).close();
verifyNoMoreInteractions(actionService, oldActionReg, newActionReg);
}
verifyNoMoreInteractions(actionService, oldActionReg, newActionReg);
}