Make sure registrations are closed 30/83530/4
authorEmmettCox <emmett.cox@est.tech>
Thu, 8 Aug 2019 11:01:42 +0000 (12:01 +0100)
committerRobert Varga <nite@hq.sk>
Tue, 13 Aug 2019 08:25:16 +0000 (08:25 +0000)
This ensures registrations are closed as soon as they are not
needed.

JIRA: CONTROLLER-1906
Change-Id: I3a391f202963852f47486b78748c8e2d7e97162a
Signed-off-by: EmmettCox <emmett.cox@est.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/OpsRegistrar.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/OpsRegistrarTest.java

index fdda197c3bf02f6dedf64859cf042438d483bf41..a16bbd4eb5236413d036bccc3a6cbf6c922dfb34 100644 (file)
@@ -11,12 +11,13 @@ import static java.util.Objects.requireNonNull;
 
 import akka.actor.Address;
 import akka.actor.Props;
 
 import akka.actor.Address;
 import akka.actor.Props;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-import org.opendaylight.controller.remote.rpc.registry.ActionRegistry;
 import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.Messages.UpdateRemoteActionEndpoints;
 import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.RemoteActionEndpoint;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
 import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.Messages.UpdateRemoteActionEndpoints;
 import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.RemoteActionEndpoint;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
@@ -54,7 +55,9 @@ final class OpsRegistrar extends AbstractUntypedActor {
 
     @Override
     public void postStop() throws Exception {
 
     @Override
     public void postStop() throws Exception {
+        rpcRegs.values().forEach(ObjectRegistration::close);
         rpcRegs.clear();
         rpcRegs.clear();
+        actionRegs.values().forEach(ObjectRegistration::close);
         actionRegs.clear();
 
         super.postStop();
         actionRegs.clear();
 
         super.postStop();
@@ -82,26 +85,34 @@ final class OpsRegistrar extends AbstractUntypedActor {
          * Note that when an RPC moves from one remote node to another, we also do not want to expose the gap,
          * hence we register all new implementations before closing all registrations.
          */
          * Note that when an RPC moves from one remote node to another, we also do not want to expose the gap,
          * hence we register all new implementations before closing all registrations.
          */
+        final Collection<ObjectRegistration<?>> prevRegs = new ArrayList<>(rpcEndpoints.size());
+
         for (Entry<Address, Optional<RemoteRpcEndpoint>> e : rpcEndpoints.entrySet()) {
             LOG.debug("Updating RPC registrations for {}", e.getKey());
 
         for (Entry<Address, Optional<RemoteRpcEndpoint>> e : rpcEndpoints.entrySet()) {
             LOG.debug("Updating RPC registrations for {}", e.getKey());
 
+            final ObjectRegistration<DOMRpcImplementation> prevReg;
             final Optional<RemoteRpcEndpoint> maybeEndpoint = e.getValue();
             if (maybeEndpoint.isPresent()) {
                 final RemoteRpcEndpoint endpoint = maybeEndpoint.get();
                 final RemoteRpcImplementation impl = new RemoteRpcImplementation(endpoint.getRouter(), config);
             final Optional<RemoteRpcEndpoint> maybeEndpoint = e.getValue();
             if (maybeEndpoint.isPresent()) {
                 final RemoteRpcEndpoint endpoint = maybeEndpoint.get();
                 final RemoteRpcImplementation impl = new RemoteRpcImplementation(endpoint.getRouter(), config);
-                rpcRegs.put(e.getKey(), rpcProviderService.registerRpcImplementation(impl,
-                        endpoint.getRpcs()));
+                prevReg = rpcRegs.put(e.getKey(), rpcProviderService.registerRpcImplementation(impl,
+                    endpoint.getRpcs()));
             } else {
             } else {
-                rpcRegs.remove(e.getKey());
+                prevReg = rpcRegs.remove(e.getKey());
+            }
+
+            if (prevReg != null) {
+                prevRegs.add(prevReg);
             }
         }
             }
         }
+
+        prevRegs.forEach(ObjectRegistration::close);
     }
 
     /**
      * Updates the action endpoints, Adding new registrations first before removing previous registrations.
      */
     }
 
     /**
      * Updates the action endpoints, Adding new registrations first before removing previous registrations.
      */
-    private void updateRemoteActionEndpoints(final Map<Address,
-            Optional<ActionRegistry.RemoteActionEndpoint>> actionEndpoints) {
+    private void updateRemoteActionEndpoints(final Map<Address, Optional<RemoteActionEndpoint>> actionEndpoints) {
         /*
          * Updating Action providers is a two-step process. We first add the newly-discovered RPCs and then close
          * the old registration. This minimizes churn observed by listeners, as they will not observe RPC
         /*
          * Updating Action providers is a two-step process. We first add the newly-discovered RPCs and then close
          * the old registration. This minimizes churn observed by listeners, as they will not observe RPC
@@ -110,18 +121,27 @@ final class OpsRegistrar extends AbstractUntypedActor {
          * Note that when an Action moves from one remote node to another, we also do not want to expose the gap,
          * hence we register all new implementations before closing all registrations.
          */
          * Note that when an Action moves from one remote node to another, we also do not want to expose the gap,
          * hence we register all new implementations before closing all registrations.
          */
+        final Collection<ObjectRegistration<?>> prevRegs = new ArrayList<>(actionEndpoints.size());
+
         for (Entry<Address, Optional<RemoteActionEndpoint>> e : actionEndpoints.entrySet()) {
         for (Entry<Address, Optional<RemoteActionEndpoint>> e : actionEndpoints.entrySet()) {
-            LOG.debug("Updating Action registrations for {}", e.getKey());
+            LOG.debug("Updating action registrations for {}", e.getKey());
 
 
+            final ObjectRegistration<DOMActionImplementation> prevReg;
             final Optional<RemoteActionEndpoint> maybeEndpoint = e.getValue();
             if (maybeEndpoint.isPresent()) {
                 final RemoteActionEndpoint endpoint = maybeEndpoint.get();
                 final RemoteActionImplementation impl = new RemoteActionImplementation(endpoint.getRouter(), config);
             final Optional<RemoteActionEndpoint> maybeEndpoint = e.getValue();
             if (maybeEndpoint.isPresent()) {
                 final RemoteActionEndpoint endpoint = maybeEndpoint.get();
                 final RemoteActionImplementation impl = new RemoteActionImplementation(endpoint.getRouter(), config);
-                actionRegs.put(e.getKey(),
-                        actionProviderService.registerActionImplementation(impl, endpoint.getActions()));
+                prevReg = actionRegs.put(e.getKey(), actionProviderService.registerActionImplementation(impl,
+                    endpoint.getActions()));
             } else {
             } else {
-                actionRegs.remove(e.getKey());
+                prevReg = actionRegs.remove(e.getKey());
+            }
+
+            if (prevReg != null) {
+                prevRegs.add(prevReg);
             }
         }
             }
         }
+
+        prevRegs.forEach(ObjectRegistration::close);
     }
 }
     }
 }
index 59db730d0152be3289361960bd3da96a98e1ccc8..1bbb22ed7a02874a9bd1acf2e893736ff4ee7d01 100644 (file)
@@ -154,6 +154,8 @@ public class OpsRegistrarTest {
         inOrder.verify(rpcService).registerRpcImplementation(any(RemoteRpcImplementation.class),
                 eq(secondEndpoint.getRpcs()));
 
         inOrder.verify(rpcService).registerRpcImplementation(any(RemoteRpcImplementation.class),
                 eq(secondEndpoint.getRpcs()));
 
+        // verify first registration is closed
+        inOrder.verify(oldReg).close();
 
         verifyNoMoreInteractions(rpcService, oldReg, newReg);
     }
 
         verifyNoMoreInteractions(rpcService, oldReg, newReg);
     }
@@ -175,7 +177,7 @@ public class OpsRegistrarTest {
                 eq(secondActionEndpoint.getActions()));
 
         // verify first registration is closed
                 eq(secondActionEndpoint.getActions()));
 
         // verify first registration is closed
-//        inOrder.verify(oldReg).close();
+        inOrder.verify(oldActionReg).close();
 
         verifyNoMoreInteractions(actionService, oldActionReg, newActionReg);
     }
 
         verifyNoMoreInteractions(actionService, oldActionReg, newActionReg);
     }