Guard lifecycle of contexts
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / lifecycle / ContextChainImpl.java
index 6a17898ea5675a492b025b66477715b570a4c036..094338dd5639faeca0fd6a564460f2eaa3b9504b 100644 (file)
@@ -7,8 +7,6 @@
  */
 package org.opendaylight.openflowplugin.impl.lifecycle;
 
-import static org.opendaylight.openflowplugin.api.openflow.OFPContext.ContextState;
-
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -16,10 +14,9 @@ import com.google.common.util.concurrent.ListenableFuture;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -35,6 +32,7 @@ import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMaster
 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainStateListener;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.GuardedContext;
 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
 import org.opendaylight.yangtools.yang.common.RpcResult;
@@ -43,23 +41,22 @@ import org.slf4j.LoggerFactory;
 
 public class ContextChainImpl implements ContextChain {
     private static final Logger LOG = LoggerFactory.getLogger(ContextChainImpl.class);
+
     private final AtomicBoolean masterStateOnDevice = new AtomicBoolean(false);
     private final AtomicBoolean initialGathering = new AtomicBoolean(false);
     private final AtomicBoolean initialSubmitting = new AtomicBoolean(false);
     private final AtomicBoolean registryFilling = new AtomicBoolean(false);
     private final AtomicBoolean rpcRegistration = new AtomicBoolean(false);
     private final List<DeviceRemovedHandler> deviceRemovedHandlers = new CopyOnWriteArrayList<>();
-    private final List<OFPContext> contexts = new CopyOnWriteArrayList<>();
+    private final List<GuardedContext> contexts = new CopyOnWriteArrayList<>();
     private final List<ConnectionContext> auxiliaryConnections = new CopyOnWriteArrayList<>();
     private final ExecutorService executorService;
     private final ContextChainMastershipWatcher contextChainMastershipWatcher;
     private final DeviceInfo deviceInfo;
     private final ConnectionContext primaryConnection;
+    private final AtomicReference<ContextChainState> contextChainState =
+            new AtomicReference<>(ContextChainState.UNDEFINED);
     private AutoCloseable registration;
-    private ContextState state = ContextState.INITIALIZATION;
-    private Future<?> initFuture;
-
-    private volatile ContextChainState contextChainState = ContextChainState.UNDEFINED;
 
     ContextChainImpl(@Nonnull final ContextChainMastershipWatcher contextChainMastershipWatcher,
                      @Nonnull final ConnectionContext connectionContext,
@@ -72,25 +69,20 @@ public class ContextChainImpl implements ContextChain {
 
     @Override
     public <T extends OFPContext> void addContext(@Nonnull final T context) {
-        contexts.add(context);
+        contexts.add(new GuardedContextImpl(context));
     }
 
     @Override
     public void instantiateServiceInstance() {
         LOG.info("Starting clustering services for node {}", deviceInfo);
 
-        initFuture = executorService.submit(() -> {
-            try {
-                contexts.forEach(context -> {
-                    if (ConnectionContext.CONNECTION_STATE.WORKING.equals(primaryConnection.getConnectionState())) {
-                        context.instantiateServiceInstance();
-                    }
-                });
-                LOG.info("Started clustering services for node {}", deviceInfo);
-            } catch (final Exception ex) {
-                contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(deviceInfo, ex.getMessage());
-            }
-        });
+        try {
+            contexts.forEach(OFPContext::instantiateServiceInstance);
+            LOG.info("Started clustering services for node {}", deviceInfo);
+        } catch (final Exception ex) {
+            executorService.submit(() -> contextChainMastershipWatcher
+                    .onNotAbleToStartMastershipMandatory(deviceInfo, ex.toString()));
+        }
     }
 
     @Override
@@ -99,7 +91,7 @@ public class ContextChainImpl implements ContextChain {
         contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo);
 
         final ListenableFuture<List<Void>> servicesToBeClosed = Futures
-                .successfulAsList(Lists.reverse(contexts)
+                .allAsList(Lists.reverse(contexts)
                         .stream()
                         .map(OFPContext::closeServiceInstance)
                         .collect(Collectors.toList()));
@@ -118,35 +110,19 @@ public class ContextChainImpl implements ContextChain {
 
     @Override
     public void close() {
-        if (ContextState.TERMINATION.equals(state)) {
+        if (ContextChainState.CLOSED.equals(contextChainState.get())) {
             LOG.debug("ContextChain for node {} is already in TERMINATION state.", deviceInfo);
             return;
         }
 
-        state = ContextState.TERMINATION;
+        contextChainState.set(ContextChainState.CLOSED);
         contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo);
 
-        // If we somehow have initialization still running, cancel it
-        if (Objects.nonNull(initFuture)) {
-            if (!initFuture.isCancelled() && !initFuture.isDone()) {
-                LOG.info("Waiting for finishing the running initialization process for node {}", deviceInfo);
-
-                try {
-                    initFuture.get();
-                } catch (InterruptedException | ExecutionException e) {
-                    LOG.warn("Failed to await running initialization for node {}: {}", deviceInfo, e);
-                }
-            }
-
-            initFuture = null;
-        }
-
         // Close all connections to devices
         auxiliaryConnections.forEach(connectionContext -> connectionContext.closeConnection(false));
         auxiliaryConnections.clear();
         primaryConnection.closeConnection(true);
 
-
         // Close all contexts (device, statistics, rpc)
         contexts.forEach(OFPContext::close);
         contexts.clear();
@@ -172,13 +148,12 @@ public class ContextChainImpl implements ContextChain {
     @Override
     public void makeContextChainStateSlave() {
         unMasterMe();
-        changeState(ContextChainState.WORKING_SLAVE);
+        changeMastershipState(ContextChainState.WORKING_SLAVE);
     }
 
     @Override
     public void registerServices(final ClusterSingletonServiceProvider clusterSingletonServiceProvider) {
         LOG.info("Registering clustering services for node {}", deviceInfo);
-        state = ContextState.WORKING;
         registration = Objects.requireNonNull(clusterSingletonServiceProvider
                 .registerClusterSingletonService(this));
         LOG.info("Registered clustering services for node {}", deviceInfo);
@@ -188,15 +163,12 @@ public class ContextChainImpl implements ContextChain {
     public void makeDeviceSlave() {
         unMasterMe();
 
-        contexts.stream()
-                .filter(DeviceContext.class::isInstance)
-                .map(DeviceContext.class::cast)
-                .findAny()
-                .ifPresent(deviceContext -> Futures
-                        .addCallback(
-                                deviceContext.makeDeviceSlave(),
-                                new DeviceSlaveCallback(),
-                                executorService));
+        contexts.forEach(context -> {
+            if (context.map(DeviceContext.class::isInstance)) {
+                Futures.addCallback(context.map(DeviceContext.class::cast).makeDeviceSlave(),
+                        new DeviceSlaveCallback(), executorService);
+            }
+        });
     }
 
     @Override
@@ -235,7 +207,7 @@ public class ContextChainImpl implements ContextChain {
             LOG.info("Device {} is able to work as master{}",
                     deviceInfo,
                     registryFilling.get() ? "." : " WITHOUT flow registry !!!");
-            changeState(ContextChainState.WORKING_MASTER);
+            changeMastershipState(ContextChainState.WORKING_MASTER);
         }
 
         return result;
@@ -243,7 +215,7 @@ public class ContextChainImpl implements ContextChain {
 
     @Override
     public boolean isClosing() {
-        return ContextState.TERMINATION.equals(state);
+        return ContextChainState.CLOSED.equals(contextChainState.get());
     }
 
     @Override
@@ -255,12 +227,15 @@ public class ContextChainImpl implements ContextChain {
 
     @Override
     public boolean continueInitializationAfterReconciliation() {
-        return isMastered(ContextChainMastershipState.INITIAL_SUBMIT) && contexts.stream()
-                .filter(StatisticsContext.class::isInstance)
-                .map(StatisticsContext.class::cast)
-                .findAny()
-                .map(StatisticsContext::initialSubmitAfterReconciliation)
-                .orElse(false);
+        final AtomicBoolean initialSubmit = new AtomicBoolean(false);
+
+        contexts.forEach(context -> {
+            if (context.map(StatisticsContext.class::isInstance)) {
+                initialSubmit.set(context.map(StatisticsContext.class::cast).initialSubmitAfterReconciliation());
+            }
+        });
+
+        return initialSubmit.get() && isMastered(ContextChainMastershipState.INITIAL_SUBMIT);
     }
 
     @Override
@@ -280,15 +255,20 @@ public class ContextChainImpl implements ContextChain {
         deviceRemovedHandlers.add(deviceRemovedHandler);
     }
 
-    private void changeState(final ContextChainState contextChainState) {
-        boolean propagate = this.contextChainState == ContextChainState.UNDEFINED;
-        this.contextChainState = contextChainState;
+    private void changeMastershipState(final ContextChainState contextChainState) {
+        if (ContextChainState.CLOSED.equals(this.contextChainState.get())) {
+            return;
+        }
+
+        boolean propagate = ContextChainState.UNDEFINED.equals(this.contextChainState.get());
+        this.contextChainState.set(contextChainState);
 
         if (propagate) {
-            contexts.stream()
-                    .filter(ContextChainStateListener.class::isInstance)
-                    .map(ContextChainStateListener.class::cast)
-                    .forEach(listener -> listener.onStateAcquired(contextChainState));
+            contexts.forEach(context -> {
+                if (context.map(ContextChainStateListener.class::isInstance)) {
+                    context.map(ContextChainStateListener.class::cast).onStateAcquired(contextChainState);
+                }
+            });
         }
     }