Improve cleanup after device disconnected event
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / lifecycle / LifecycleServiceImpl.java
index 885438df440cd099277c3ab7f47c517c88680ce8..7522fe54b08f6c1d8339a449e3d4cb2b3dbe3e95 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.openflowplugin.impl.lifecycle;
 
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
+import com.google.common.base.Verify;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -21,8 +22,11 @@ import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvid
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
+import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext.CONNECTION_STATE;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
+import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceRemovedHandler;
 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
@@ -33,57 +37,43 @@ import org.slf4j.LoggerFactory;
 public class LifecycleServiceImpl implements LifecycleService {
 
     private static final Logger LOG = LoggerFactory.getLogger(LifecycleServiceImpl.class);
-
-    private boolean inClosing = false;
     private DeviceContext deviceContext;
     private RpcContext rpcContext;
     private StatisticsContext statContext;
     private ClusterSingletonServiceRegistration registration;
     private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
+    private final List<DeviceRemovedHandler> deviceRemovedHandlers = new ArrayList<>();
+    private volatile CONTEXT_STATE state = CONTEXT_STATE.INITIALIZATION;
 
 
     @Override
     public void instantiateServiceInstance() {
-        LOG.info("Starting clustering MASTER services for node {}", this.deviceContext.getDeviceInfo().getLOGValue());
+        LOG.info("Starting clustering MASTER services for node {}", getDeviceInfo().getLOGValue());
 
-        if (!this.clusterInitializationPhaseHandler.onContextInstantiateService(null)) {
-            this.closeConnection();
+        if (!clusterInitializationPhaseHandler.onContextInstantiateService(null)) {
+            closeConnection();
         }
-
     }
 
     @Override
     public ListenableFuture<Void> closeServiceInstance() {
-        LOG.info("Closing clustering MASTER services for node {}", this.deviceContext.getDeviceInfo().getLOGValue());
-
         final boolean connectionInterrupted =
                 this.deviceContext
                         .getPrimaryConnectionContext()
                         .getConnectionState()
                         .equals(ConnectionContext.CONNECTION_STATE.RIP);
 
-        // If connection was interrupted and we are not trying to close service, then we received something
-        // we do not wanted to receive, so do not continue
-        if (connectionInterrupted && !inClosing) {
-            LOG.warn("Failed to close clustering MASTER services for node {} because they are already closed",
-                    LifecycleServiceImpl.this.deviceContext.getDeviceInfo().getLOGValue());
-
-            return Futures.immediateCancelledFuture();
-        }
-
         // Chain all jobs that will stop our services
         final List<ListenableFuture<Void>> futureList = new ArrayList<>();
         futureList.add(statContext.stopClusterServices(connectionInterrupted));
         futureList.add(rpcContext.stopClusterServices(connectionInterrupted));
         futureList.add(deviceContext.stopClusterServices(connectionInterrupted));
 
-        // When we stopped all jobs then we are not in closing state anymore (at least from plugin perspective)
         return Futures.transform(Futures.successfulAsList(futureList), new Function<List<Void>, Void>() {
             @Nullable
             @Override
             public Void apply(@Nullable List<Void> input) {
-                LOG.debug("Closed clustering MASTER services for node {}",
-                        LifecycleServiceImpl.this.deviceContext.getDeviceInfo().getLOGValue());
+                LOG.debug("Closed clustering MASTER services for node {}", getDeviceInfo().getLOGValue());
                 return null;
             }
         });
@@ -91,33 +81,76 @@ public class LifecycleServiceImpl implements LifecycleService {
 
     @Override
     public ServiceGroupIdentifier getIdentifier() {
+        return getServiceIdentifier();
+    }
+
+    @Override
+    public CONTEXT_STATE getState() {
+        return this.state;
+    }
+
+    @Override
+    public ServiceGroupIdentifier getServiceIdentifier() {
         return deviceContext.getServiceIdentifier();
     }
 
+    @Override
+    public DeviceInfo getDeviceInfo() {
+        return deviceContext.getDeviceInfo();
+    }
 
     @Override
-    public void close() throws Exception {
-        // If we are still registered and we are not already closing, then close the registration
-        if (Objects.nonNull(registration) && !inClosing) {
-            inClosing = true;
-            registration.close();
-            registration = null;
+    public void close() {
+        if (CONTEXT_STATE.TERMINATION.equals(getState())){
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("LifecycleService is already in TERMINATION state.");
+            }
+        } else {
+            this.state = CONTEXT_STATE.TERMINATION;
+
+            // We are closing, so cleanup all managers now
+            deviceRemovedHandlers.forEach(h -> h.onDeviceRemoved(getDeviceInfo()));
+
+            // If we are still registered and we are not already closing, then close the registration
+            if (Objects.nonNull(registration)) {
+                try {
+                    LOG.debug("Closing clustering MASTER services for node {}", getDeviceInfo().getLOGValue());
+                    registration.close();
+                } catch (Exception e) {
+                    LOG.debug("Failed to close clustering MASTER services for node {} with exception: ",
+                            getDeviceInfo().getLOGValue(), e);
+                }
+            }
         }
     }
 
     @Override
     public void registerService(final ClusterSingletonServiceProvider singletonServiceProvider) {
-        LOG.info("Registering clustering MASTER services for node {}", this.deviceContext.getDeviceInfo().getLOGValue());
+        LOG.debug("Registered clustering MASTER services for node {}", getDeviceInfo().getLOGValue());
 
-        //lifecycle service -> device context -> statistics context -> rpc context -> role context -> lifecycle service
+        // lifecycle service -> device context -> statistics context -> rpc context -> role context -> lifecycle service
         this.clusterInitializationPhaseHandler = deviceContext;
         this.deviceContext.setLifecycleInitializationPhaseHandler(this.statContext);
         this.statContext.setLifecycleInitializationPhaseHandler(this.rpcContext);
         this.rpcContext.setLifecycleInitializationPhaseHandler(this);
         //Set initial submit handler
         this.statContext.setInitialSubmitHandler(this.deviceContext);
-        //Register cluster singleton service
-        this.registration = singletonServiceProvider.registerClusterSingletonService(this);
+
+        // Register cluster singleton service
+        try {
+            this.registration = Verify.verifyNotNull(singletonServiceProvider.registerClusterSingletonService(this));
+            LOG.info("Registered clustering MASTER services for node {}", getDeviceInfo().getLOGValue());
+        } catch (Exception e) {
+            LOG.warn("Failed to register cluster singleton service for node {}, with exception: {}", getDeviceInfo(), e);
+            closeConnection();
+        }
+    }
+
+    @Override
+    public void registerDeviceRemovedHandler(final DeviceRemovedHandler deviceRemovedHandler) {
+        if (!deviceRemovedHandlers.contains(deviceRemovedHandler)) {
+            deviceRemovedHandlers.add(deviceRemovedHandler);
+        }
     }
 
     @Override
@@ -142,6 +175,10 @@ public class LifecycleServiceImpl implements LifecycleService {
 
     @Override
     public void closeConnection() {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Closing connection for node {}.", getDeviceInfo().getLOGValue());
+        }
+
         this.deviceContext.shutdownConnection();
     }
 
@@ -157,11 +194,11 @@ public class LifecycleServiceImpl implements LifecycleService {
 
     @Override
     public boolean onContextInstantiateService(final ConnectionContext connectionContext) {
-
-        if (ConnectionContext.CONNECTION_STATE.RIP.equals(connectionContext.getConnectionState())) {
+        if (CONNECTION_STATE.RIP.equals(connectionContext.getConnectionState())) {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Connection to the device {} was interrupted.", this.deviceContext.getDeviceInfo().getLOGValue());
+                LOG.debug("Connection to the device {} was interrupted.", getDeviceInfo().getLOGValue());
             }
+
             return false;
         }
 
@@ -172,7 +209,7 @@ public class LifecycleServiceImpl implements LifecycleService {
     private class DeviceFlowRegistryCallback implements FutureCallback<List<Optional<FlowCapableNode>>> {
         private final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill;
 
-        public DeviceFlowRegistryCallback(ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill) {
+        DeviceFlowRegistryCallback(ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill) {
             this.deviceFlowRegistryFill = deviceFlowRegistryFill;
         }
 
@@ -195,7 +232,7 @@ public class LifecycleServiceImpl implements LifecycleService {
                         .filter(Objects::nonNull)
                         .count();
 
-                LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceContext.getDeviceInfo().getLOGValue());
+                LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, getDeviceInfo().getLOGValue());
             }
         }
 
@@ -203,10 +240,10 @@ public class LifecycleServiceImpl implements LifecycleService {
         public void onFailure(Throwable t) {
             if (deviceFlowRegistryFill.isCancelled()) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceContext.getDeviceInfo().getLOGValue());
+                    LOG.debug("Cancelled filling flow registry with flows for node: {}", getDeviceInfo().getLOGValue());
                 }
             } else {
-                LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceContext.getDeviceInfo().getLOGValue(), t);
+                LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", getDeviceInfo().getLOGValue(), t);
             }
         }
     }