Make sure registrations are closed
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / OpsRegistrar.java
index fdda197..a16bbd4 100644 (file)
@@ -11,12 +11,13 @@ import static java.util.Objects.requireNonNull;
 
 import akka.actor.Address;
 import akka.actor.Props;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-import org.opendaylight.controller.remote.rpc.registry.ActionRegistry;
 import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.Messages.UpdateRemoteActionEndpoints;
 import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.RemoteActionEndpoint;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
@@ -54,7 +55,9 @@ final class OpsRegistrar extends AbstractUntypedActor {
 
     @Override
     public void postStop() throws Exception {
+        rpcRegs.values().forEach(ObjectRegistration::close);
         rpcRegs.clear();
+        actionRegs.values().forEach(ObjectRegistration::close);
         actionRegs.clear();
 
         super.postStop();
@@ -82,26 +85,34 @@ final class OpsRegistrar extends AbstractUntypedActor {
          * Note that when an RPC moves from one remote node to another, we also do not want to expose the gap,
          * hence we register all new implementations before closing all registrations.
          */
+        final Collection<ObjectRegistration<?>> prevRegs = new ArrayList<>(rpcEndpoints.size());
+
         for (Entry<Address, Optional<RemoteRpcEndpoint>> e : rpcEndpoints.entrySet()) {
             LOG.debug("Updating RPC registrations for {}", e.getKey());
 
+            final ObjectRegistration<DOMRpcImplementation> prevReg;
             final Optional<RemoteRpcEndpoint> maybeEndpoint = e.getValue();
             if (maybeEndpoint.isPresent()) {
                 final RemoteRpcEndpoint endpoint = maybeEndpoint.get();
                 final RemoteRpcImplementation impl = new RemoteRpcImplementation(endpoint.getRouter(), config);
-                rpcRegs.put(e.getKey(), rpcProviderService.registerRpcImplementation(impl,
-                        endpoint.getRpcs()));
+                prevReg = rpcRegs.put(e.getKey(), rpcProviderService.registerRpcImplementation(impl,
+                    endpoint.getRpcs()));
             } else {
-                rpcRegs.remove(e.getKey());
+                prevReg = rpcRegs.remove(e.getKey());
+            }
+
+            if (prevReg != null) {
+                prevRegs.add(prevReg);
             }
         }
+
+        prevRegs.forEach(ObjectRegistration::close);
     }
 
     /**
      * Updates the action endpoints, Adding new registrations first before removing previous registrations.
      */
-    private void updateRemoteActionEndpoints(final Map<Address,
-            Optional<ActionRegistry.RemoteActionEndpoint>> actionEndpoints) {
+    private void updateRemoteActionEndpoints(final Map<Address, Optional<RemoteActionEndpoint>> actionEndpoints) {
         /*
          * Updating Action providers is a two-step process. We first add the newly-discovered RPCs and then close
          * the old registration. This minimizes churn observed by listeners, as they will not observe RPC
@@ -110,18 +121,27 @@ final class OpsRegistrar extends AbstractUntypedActor {
          * Note that when an Action moves from one remote node to another, we also do not want to expose the gap,
          * hence we register all new implementations before closing all registrations.
          */
+        final Collection<ObjectRegistration<?>> prevRegs = new ArrayList<>(actionEndpoints.size());
+
         for (Entry<Address, Optional<RemoteActionEndpoint>> e : actionEndpoints.entrySet()) {
-            LOG.debug("Updating Action registrations for {}", e.getKey());
+            LOG.debug("Updating action registrations for {}", e.getKey());
 
+            final ObjectRegistration<DOMActionImplementation> prevReg;
             final Optional<RemoteActionEndpoint> maybeEndpoint = e.getValue();
             if (maybeEndpoint.isPresent()) {
                 final RemoteActionEndpoint endpoint = maybeEndpoint.get();
                 final RemoteActionImplementation impl = new RemoteActionImplementation(endpoint.getRouter(), config);
-                actionRegs.put(e.getKey(),
-                        actionProviderService.registerActionImplementation(impl, endpoint.getActions()));
+                prevReg = actionRegs.put(e.getKey(), actionProviderService.registerActionImplementation(impl,
+                    endpoint.getActions()));
             } else {
-                actionRegs.remove(e.getKey());
+                prevReg = actionRegs.remove(e.getKey());
+            }
+
+            if (prevReg != null) {
+                prevRegs.add(prevReg);
             }
         }
+
+        prevRegs.forEach(ObjectRegistration::close);
     }
 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.