Guard lifecycle of contexts
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / lifecycle / ContextChainImpl.java
index 153945c307746a327b54d333f1743fbe10d9340b..094338dd5639faeca0fd6a564460f2eaa3b9504b 100644 (file)
@@ -7,8 +7,6 @@
  */
 package org.opendaylight.openflowplugin.impl.lifecycle;
 
-import static org.opendaylight.openflowplugin.api.openflow.OFPContext.ContextState;
-
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -18,12 +16,12 @@ import java.util.Objects;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
-import org.opendaylight.openflowplugin.api.ConnectionException;
 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
@@ -34,6 +32,7 @@ import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMaster
 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainStateListener;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.GuardedContext;
 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
 import org.opendaylight.yangtools.yang.common.RpcResult;
@@ -42,22 +41,22 @@ import org.slf4j.LoggerFactory;
 
 public class ContextChainImpl implements ContextChain {
     private static final Logger LOG = LoggerFactory.getLogger(ContextChainImpl.class);
+
     private final AtomicBoolean masterStateOnDevice = new AtomicBoolean(false);
     private final AtomicBoolean initialGathering = new AtomicBoolean(false);
     private final AtomicBoolean initialSubmitting = new AtomicBoolean(false);
     private final AtomicBoolean registryFilling = new AtomicBoolean(false);
     private final AtomicBoolean rpcRegistration = new AtomicBoolean(false);
     private final List<DeviceRemovedHandler> deviceRemovedHandlers = new CopyOnWriteArrayList<>();
-    private final List<OFPContext> contexts = new CopyOnWriteArrayList<>();
+    private final List<GuardedContext> contexts = new CopyOnWriteArrayList<>();
     private final List<ConnectionContext> auxiliaryConnections = new CopyOnWriteArrayList<>();
     private final ExecutorService executorService;
     private final ContextChainMastershipWatcher contextChainMastershipWatcher;
     private final DeviceInfo deviceInfo;
     private final ConnectionContext primaryConnection;
+    private final AtomicReference<ContextChainState> contextChainState =
+            new AtomicReference<>(ContextChainState.UNDEFINED);
     private AutoCloseable registration;
-    private ContextState state = ContextState.INITIALIZATION;
-
-    private volatile ContextChainState contextChainState = ContextChainState.UNDEFINED;
 
     ContextChainImpl(@Nonnull final ContextChainMastershipWatcher contextChainMastershipWatcher,
                      @Nonnull final ConnectionContext connectionContext,
@@ -70,7 +69,7 @@ public class ContextChainImpl implements ContextChain {
 
     @Override
     public <T extends OFPContext> void addContext(@Nonnull final T context) {
-        contexts.add(context);
+        contexts.add(new GuardedContextImpl(context));
     }
 
     @Override
@@ -78,11 +77,11 @@ public class ContextChainImpl implements ContextChain {
         LOG.info("Starting clustering services for node {}", deviceInfo);
 
         try {
-            contexts.forEach(this::initializeContextService);
+            contexts.forEach(OFPContext::instantiateServiceInstance);
             LOG.info("Started clustering services for node {}", deviceInfo);
         } catch (final Exception ex) {
             executorService.submit(() -> contextChainMastershipWatcher
-                    .onNotAbleToStartMastershipMandatory(deviceInfo, ex.getMessage()));
+                    .onNotAbleToStartMastershipMandatory(deviceInfo, ex.toString()));
         }
     }
 
@@ -92,9 +91,9 @@ public class ContextChainImpl implements ContextChain {
         contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo);
 
         final ListenableFuture<List<Void>> servicesToBeClosed = Futures
-                .successfulAsList(Lists.reverse(contexts)
+                .allAsList(Lists.reverse(contexts)
                         .stream()
-                        .map(this::closeContextService)
+                        .map(OFPContext::closeServiceInstance)
                         .collect(Collectors.toList()));
 
         return Futures.transform(servicesToBeClosed, (input) -> {
@@ -111,12 +110,12 @@ public class ContextChainImpl implements ContextChain {
 
     @Override
     public void close() {
-        if (ContextState.TERMINATION.equals(state)) {
+        if (ContextChainState.CLOSED.equals(contextChainState.get())) {
             LOG.debug("ContextChain for node {} is already in TERMINATION state.", deviceInfo);
             return;
         }
 
-        state = ContextState.TERMINATION;
+        contextChainState.set(ContextChainState.CLOSED);
         contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo);
 
         // Close all connections to devices
@@ -149,13 +148,12 @@ public class ContextChainImpl implements ContextChain {
     @Override
     public void makeContextChainStateSlave() {
         unMasterMe();
-        changeState(ContextChainState.WORKING_SLAVE);
+        changeMastershipState(ContextChainState.WORKING_SLAVE);
     }
 
     @Override
     public void registerServices(final ClusterSingletonServiceProvider clusterSingletonServiceProvider) {
         LOG.info("Registering clustering services for node {}", deviceInfo);
-        state = ContextState.WORKING;
         registration = Objects.requireNonNull(clusterSingletonServiceProvider
                 .registerClusterSingletonService(this));
         LOG.info("Registered clustering services for node {}", deviceInfo);
@@ -165,15 +163,12 @@ public class ContextChainImpl implements ContextChain {
     public void makeDeviceSlave() {
         unMasterMe();
 
-        contexts.stream()
-                .filter(DeviceContext.class::isInstance)
-                .map(DeviceContext.class::cast)
-                .findAny()
-                .ifPresent(deviceContext -> Futures
-                        .addCallback(
-                                deviceContext.makeDeviceSlave(),
-                                new DeviceSlaveCallback(),
-                                executorService));
+        contexts.forEach(context -> {
+            if (context.map(DeviceContext.class::isInstance)) {
+                Futures.addCallback(context.map(DeviceContext.class::cast).makeDeviceSlave(),
+                        new DeviceSlaveCallback(), executorService);
+            }
+        });
     }
 
     @Override
@@ -212,7 +207,7 @@ public class ContextChainImpl implements ContextChain {
             LOG.info("Device {} is able to work as master{}",
                     deviceInfo,
                     registryFilling.get() ? "." : " WITHOUT flow registry !!!");
-            changeState(ContextChainState.WORKING_MASTER);
+            changeMastershipState(ContextChainState.WORKING_MASTER);
         }
 
         return result;
@@ -220,7 +215,7 @@ public class ContextChainImpl implements ContextChain {
 
     @Override
     public boolean isClosing() {
-        return ContextState.TERMINATION.equals(state);
+        return ContextChainState.CLOSED.equals(contextChainState.get());
     }
 
     @Override
@@ -232,13 +227,15 @@ public class ContextChainImpl implements ContextChain {
 
     @Override
     public boolean continueInitializationAfterReconciliation() {
-        return contexts.stream()
-                .filter(StatisticsContext.class::isInstance)
-                .map(StatisticsContext.class::cast)
-                .findAny()
-                .map(StatisticsContext::initialSubmitAfterReconciliation)
-                .orElse(false) &&
-        isMastered(ContextChainMastershipState.INITIAL_SUBMIT);
+        final AtomicBoolean initialSubmit = new AtomicBoolean(false);
+
+        contexts.forEach(context -> {
+            if (context.map(StatisticsContext.class::isInstance)) {
+                initialSubmit.set(context.map(StatisticsContext.class::cast).initialSubmitAfterReconciliation());
+            }
+        });
+
+        return initialSubmit.get() && isMastered(ContextChainMastershipState.INITIAL_SUBMIT);
     }
 
     @Override
@@ -258,39 +255,21 @@ public class ContextChainImpl implements ContextChain {
         deviceRemovedHandlers.add(deviceRemovedHandler);
     }
 
-    private void changeState(final ContextChainState contextChainState) {
-        boolean propagate = this.contextChainState == ContextChainState.UNDEFINED;
-        this.contextChainState = contextChainState;
-
-        if (propagate) {
-            contexts.stream()
-                    .filter(ContextChainStateListener.class::isInstance)
-                    .map(ContextChainStateListener.class::cast)
-                    .forEach(listener -> listener.onStateAcquired(contextChainState));
-        }
-    }
-
-    private void initializeContextService(final OFPContext context) {
-        if (ConnectionContext.CONNECTION_STATE.WORKING.equals(primaryConnection.getConnectionState())) {
-            context.instantiateServiceInstance();
-        } else {
-            LOG.warn("Device connection for node {} doesn't exist anymore. Primary connection status: {}",
-                    deviceInfo,
-                    primaryConnection.getConnectionState());
+    private void changeMastershipState(final ContextChainState contextChainState) {
+        if (ContextChainState.CLOSED.equals(this.contextChainState.get())) {
+            return;
         }
-    }
 
-    private ListenableFuture<Void> closeContextService(final OFPContext context) {
-        if (ConnectionContext.CONNECTION_STATE.RIP.equals(primaryConnection.getConnectionState())) {
-            final String errMsg = String
-                    .format("Device connection for node %s doesn't exist anymore. Primary connection status: %s",
-                            deviceInfo.toString(),
-                            primaryConnection.getConnectionState());
+        boolean propagate = ContextChainState.UNDEFINED.equals(this.contextChainState.get());
+        this.contextChainState.set(contextChainState);
 
-            return Futures.immediateFailedFuture(new ConnectionException(errMsg));
+        if (propagate) {
+            contexts.forEach(context -> {
+                if (context.map(ContextChainStateListener.class::isInstance)) {
+                    context.map(ContextChainStateListener.class::cast).onStateAcquired(contextChainState);
+                }
+            });
         }
-
-        return context.closeServiceInstance();
     }
 
     private void unMasterMe() {