Bug-4957 Cluster Role change fix
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceContextImpl.java
index c9a17b66ff0ad51d79760821b3823975eaf4db1e..70bd4532cce3e29254d2b34225474eca487ae99f 100644 (file)
@@ -9,7 +9,9 @@ package org.opendaylight.openflowplugin.impl.device;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -33,6 +35,7 @@ import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService
 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
@@ -58,6 +61,7 @@ import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRe
 import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleKeeper;
 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
+import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
@@ -71,6 +75,7 @@ import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl
 import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory;
 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
+import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils;
 import org.opendaylight.openflowplugin.impl.util.MdSalRegistratorUtils;
 import org.opendaylight.openflowplugin.openflow.md.core.session.SwitchConnectionCookieOFImpl;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
@@ -148,6 +153,9 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
     private RpcContext rpcContext;
     private ExtensionConverterProvider extensionConverterProvider;
 
+    private final boolean switchFeaturesMandatory;
+    private StatisticsContext statCtx;
+
 
     @VisibleForTesting
     DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
@@ -156,7 +164,9 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
                       @Nonnull final HashedWheelTimer hashedWheelTimer,
                       @Nonnull final MessageSpy _messageSpy,
                       @Nonnull final OutboundQueueProvider outboundQueueProvider,
-            @Nonnull final TranslatorLibrary translatorLibrary) {
+                      @Nonnull final TranslatorLibrary translatorLibrary,
+                      final boolean switchFeaturesMandatory) {
+        this.switchFeaturesMandatory = switchFeaturesMandatory;
         this.primaryConnectionContext = Preconditions.checkNotNull(primaryConnectionContext);
         this.deviceState = Preconditions.checkNotNull(deviceState);
         this.dataBroker = Preconditions.checkNotNull(dataBroker);
@@ -198,7 +208,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
     }
 
     @Override
-    public Long getReservedXid() {
+    public Long reservedXidForDeviceMessage() {
         return outboundQueueProvider.reserveEntry();
     }
 
@@ -228,35 +238,99 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
     }
 
     @Override
-    public ListenableFuture<Void> onClusterRoleChange(@CheckForNull final OfpRole role) {
-        LOG.debug("onClusterRoleChange {} for node:", role, deviceState.getNodeId());
+    public ListenableFuture<Void> onClusterRoleChange(final OfpRole oldRole, @CheckForNull final OfpRole role) {
+        LOG.trace("onClusterRoleChange {} for node:", role, deviceState.getNodeId());
         Preconditions.checkArgument(role != null);
-        if (rpcContext != null) {
-            // TODO : implement interface for onClusterRoleChange method
-            MdSalRegistratorUtils.registerServices(rpcContext, this, role);
+        if (role.equals(oldRole)) {
+            LOG.debug("Demanded role change for device {} is not change OldRole: {}, NewRole {}", deviceState.getNodeId(), oldRole, role);
+            return Futures.immediateFuture(null);
         }
-
-        final ListenableFuture<Void> nextStepFuture;
         if (OfpRole.BECOMEMASTER.equals(role)) {
-            transactionChainManager.activateTransactionManager();
-            nextStepFuture = Futures.immediateCheckedFuture(null);
-            getDeviceState().setRole(role);
+            if (!deviceState.deviceSynchronized()) {
+                LOG.debug("Setup Device Ctx {} for Master Role", getDeviceState().getNodeId());
+                transactionChainManager.activateTransactionManager();
+                return Futures.immediateCheckedFuture(null);
+            }
+            /* Relevant for no initial Slave-to-Master scenario in cluster */
+            return asyncClusterRoleChange(role);
         } else if (OfpRole.BECOMESLAVE.equals(role)) {
-            nextStepFuture = transactionChainManager.deactivateTransactionManager();
-            Futures.transform(nextStepFuture, new Function<Void, Void>() {
-                @Nullable
-                @Override
-                public Void apply(@Nullable final Void aVoid) {
-                    getDeviceState().setRole(role);
-                    return null;
-                }
-            });
+            if (rpcContext != null) {
+                MdSalRegistratorUtils.registerSlaveServices(rpcContext, role);
+            }
+            return transactionChainManager.deactivateTransactionManager();
         } else {
-            LOG.warn("Unknow OFCluster Role {} for Node {}", role, deviceState.getNodeId());
-            nextStepFuture = Futures.immediateCheckedFuture(null);
+            LOG.warn("Unknown OFCluster Role {} for Node {}", role, deviceState.getNodeId());
+            if (rpcContext != null) {
+                MdSalRegistratorUtils.unregisterServices(rpcContext);
+            }
+            return transactionChainManager.deactivateTransactionManager();
+        }
+    }
+
+    /*
+     * we don't have active TxManager so anything will not be stored to DS yet, but we have
+     * check all NodeInformation for statistics otherwise statistics will not contains
+     * all possible MultipartTypes for polling in StatTypeList
+     */
+    private ListenableFuture<Void> asyncClusterRoleChange(final OfpRole role) {
+        if (statCtx == null) {
+            final String errMsg = String.format("DeviceCtx {} is up but we are missing StatisticsContext", deviceState.getNodeId());
+            LOG.warn(errMsg);
+            return Futures.immediateFailedFuture(new IllegalStateException(errMsg));
         }
 
-        return nextStepFuture;
+        final InstanceIdentifier<FlowCapableNode> ofNodeII = deviceState.getNodeInstanceIdentifier()
+                .augmentation(FlowCapableNode.class);
+        final ReadOnlyTransaction readTx = getReadTransaction();
+        final CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> readOfNodeFuture = readTx.read(
+                LogicalDatastoreType.OPERATIONAL, ofNodeII);
+
+        final ListenableFuture<Void> nodeInitInfoFuture = Futures.transform(readOfNodeFuture,
+                new AsyncFunction<Optional<FlowCapableNode>, Void>() {
+                    @Override
+                    public ListenableFuture<Void> apply(final Optional<FlowCapableNode> input) throws Exception {
+                        if (!input.isPresent() || input.get().getTable().isEmpty()) {
+                            /* Last master close fail scenario so we would like to activate TxManager */
+                            LOG.debug("Operational DS for Device {} has to be replaced", deviceState.getNodeId());
+                            getDeviceState().setDeviceSynchronized(false);
+                            transactionChainManager.activateTransactionManager();
+                        }
+                        return DeviceInitializationUtils.initializeNodeInformation(DeviceContextImpl.this, switchFeaturesMandatory);
+                    }
+                });
+
+        final ListenableFuture<Boolean> statPollFuture = Futures.transform(nodeInitInfoFuture,
+                new AsyncFunction<Void, Boolean>() {
+
+                    @Override
+                    public ListenableFuture<Boolean> apply(final Void input) throws Exception {
+                        getStatisticsContext().statListForCollectingInitialization();
+                        if (getDeviceState().deviceSynchronized()) {
+                            return Futures.immediateFuture(Boolean.TRUE);
+                        }
+                        return getStatisticsContext().gatherDynamicData();
+                    }
+                });
+
+        return Futures.transform(statPollFuture, new Function<Boolean, Void>() {
+
+            @Override
+            public Void apply(final Boolean input) {
+                if (!input.booleanValue()) {
+                    LOG.warn("Get Initial Device {} information fails", getDeviceState().getNodeId());
+                    DeviceContextImpl.this.close();
+                    return null;
+                }
+                LOG.debug("Get Initial Device {} information is successful", getDeviceState().getNodeId());
+                if (null != getRpcContext()) {
+                    MdSalRegistratorUtils.registerMasterServices(getRpcContext(), DeviceContextImpl.this, role);
+                }
+                getDeviceState().setDeviceSynchronized(true);
+                getDeviceState().setStatisticsPollingEnabledProp(true);
+                transactionChainManager.activateTransactionManager();
+                return null;
+            }
+        });
     }
 
     @Override
@@ -482,13 +556,13 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
 
                 @Override
                 public void onSuccess(final Void result) {
-                    LOG.info("Delete Node {} was successfull.", deviceState.getNodeId());
+                    LOG.info("TxChain {} was shutdown successfull.", deviceState.getNodeId());
                     tearDownClean();
                 }
 
                 @Override
                 public void onFailure(final Throwable t) {
-                    LOG.warn("Delete Node {} fail.", deviceState.getNodeId(), t);
+                    LOG.warn("Shutdown TxChain {} fail.", deviceState.getNodeId(), t);
                     tearDownClean();
                 }
             });
@@ -609,4 +683,14 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
     public ExtensionConverterProvider getExtensionConverterProvider() {
         return extensionConverterProvider;
     }
+
+    @Override
+    public void setStatisticsContext(final StatisticsContext statisticsContext) {
+        this.statCtx = statisticsContext;
+    }
+
+    @Override
+    public StatisticsContext getStatisticsContext() {
+        return statCtx;
+    }
 }