Merge "Bug-4957 Cluster Role change fix"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / role / RoleManagerImpl.java
index adf584e4b0936430df08ef5444da8506e32a3899..3ed9c45390a94cdd79a508e318b0421ebe6906af 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.openflowplugin.impl.role;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
 import com.google.common.base.Verify;
 import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.CheckedFuture;
@@ -35,7 +34,6 @@ import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipS
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.openflowplugin.api.OFConstants;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
@@ -83,11 +81,6 @@ public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
     @Override
     public void onDeviceContextLevelUp(@CheckForNull final DeviceContext deviceContext) throws Exception {
         LOG.debug("RoleManager called for device:{}", deviceContext.getPrimaryConnectionContext().getNodeId());
-        if (deviceContext.getDeviceState().getFeatures().getVersion() < OFConstants.OFP_VERSION_1_3) {
-            // Roles are not supported before OF1.3, so move forward.
-            deviceInitializationPhaseHandler.onDeviceContextLevelUp(deviceContext);
-            return;
-        }
 
         final RoleContext roleContext = new RoleContextImpl(deviceContext, entityOwnershipService,
                 makeEntity(deviceContext.getDeviceState().getNodeId()),
@@ -153,9 +146,22 @@ public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
             LOG.debug("Found roleContext associated to deviceContext: {}, now closing the roleContext", nodeId);
             final Optional<EntityOwnershipState> actState = entityOwnershipService.getOwnershipState(entity);
             if (actState.isPresent()) {
-                if (!actState.get().isOwner()) {
+                if (actState.get().isOwner()) {
+                    if (!txContexts.containsKey(roleContext.getTxEntity())) {
+                        try {
+                            txContexts.putIfAbsent(roleContext.getTxEntity(), roleContext);
+                            roleContext.setPropagatingRole(OfpRole.BECOMEMASTER);
+                            roleContext.setupTxCandidate();
+                            // we'd like to wait for registration response
+                            return;
+                        } catch (final CandidateAlreadyRegisteredException e) {
+                            // NOOP
+                        }
+                    }
+                } else {
                     LOG.debug("No DS commitment for device {} - LEADER is somewhere else", nodeId);
                     contexts.remove(entity, roleContext);
+                    // TODO : is there a chance to have TxEntity ?
                 }
             } else {
                 LOG.warn("EntityOwnershipService doesn't return state for entity: {} in close proces", entity);
@@ -175,21 +181,26 @@ public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
     @Override
     public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
         Preconditions.checkArgument(ownershipChange != null);
+        RoleContext roleCtxForClose = null;
         try {
             final RoleContext roleContext = contexts.get(ownershipChange.getEntity());
             if (roleContext != null) {
+                roleCtxForClose = roleContext;
                 changeForEntity(ownershipChange, roleContext);
                 return;
             }
 
             final RoleContext txRoleContext = txContexts.get(ownershipChange.getEntity());
             if (txRoleContext != null) {
+                roleCtxForClose = txRoleContext;
                 changeForTxEntity(ownershipChange, txRoleContext);
                 return;
             }
         } catch (final InterruptedException e) {
             LOG.warn("fail to acquire semaphore: {}", ownershipChange.getEntity());
-            // FIXME: consider forcibly closing this connection
+            if (roleCtxForClose != null) {
+                roleCtxForClose.close();
+            }
         }
 
         LOG.debug("We are not able to find Entity {} ownershipChange {} - disregarding ownership notification",
@@ -207,6 +218,14 @@ public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
         final DeviceContext deviceContext = roleContext.getDeviceContext();
         final NodeId nodeId = roleContext.getDeviceState().getNodeId();
 
+        if (!roleContext.getDeviceState().isValid()
+                && RoleContext.ROLE_CONTEXT_STATE.WORKING.equals(roleContext.getState())) {
+            LOG.debug("Node {} ownership changed during closing process", roleContext.getDeviceState().getNodeId());
+            roleContext.close();
+            txCandidateGuard.release();
+            return;
+        }
+
         if (!ownershipChange.wasOwner() && ownershipChange.isOwner()) {
             // SLAVE -> MASTER - acquired transition lock
             LOG.debug("Acquired tx-lock for entity {}", ownershipChange.getEntity());
@@ -316,21 +335,19 @@ public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
                 try {
                     LOG.debug("Node {} is marked as LEADER", nodeId);
                     Verify.verify(txContexts.putIfAbsent(roleContext.getTxEntity(), roleContext) == null,
-                            "RoleCtx for TxEntity {} master Node {} is still not closed.",
-                            roleContext.getTxEntity(), nodeId);
+                            "RoleCtx for TxEntity {} master Node {} is still not closed.", roleContext.getTxEntity(), nodeId);
                     roleContext.setPropagatingRole(OfpRole.BECOMEMASTER);
 
                     // try to register tx-candidate via ownership service
                     roleContext.setupTxCandidate();
                 } catch (final CandidateAlreadyRegisteredException e) {
-                    LOG.debug("txCandidate registration failed");
+                    LOG.warn("txCandidate registration failed {}", roleContext.getDeviceState().getNodeId(), e);
                     // --- CLEAN UP ---
                     // withdraw context from map in order to have it as before
                     txContexts.remove(roleContext.getTxEntity(), roleContext);
                     // no more propagating any role - there is no txCandidate lock approaching
                     roleContext.setPropagatingRole(null);
-
-                    Throwables.propagate(e);
+                    roleContext.getDeviceContext().close();
                 }
                 return null;
             }
@@ -341,7 +358,6 @@ public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
         final Semaphore mainCandidateGuard = roleContext.getMainCandidateGuard();
         LOG.trace("mainCandidate lock queue: " + mainCandidateGuard.getQueueLength());
         mainCandidateGuard.acquire();
-        //FIXME : check again implementation for double candidate scenario
         LOG.info("Received EntityOwnershipChange:{}", ownershipChange);
 
         if (roleContext.getDeviceState().isValid()) {
@@ -361,9 +377,14 @@ public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
                 } else {
                     txProcessCallback = null;
                 }
-            } else if (!ownershipChange.wasOwner() && ownershipChange.isOwner() && ownershipChange.isOwner()) {
+            } else if (!ownershipChange.wasOwner() && ownershipChange.isOwner() && ownershipChange.hasOwner()) {
                 // SLAVE -> MASTER
                 txProcessCallback = makeTxEntitySetupCallback(roleContext);
+            } else if (!ownershipChange.wasOwner() && !ownershipChange.isOwner() && ownershipChange.hasOwner()) {
+                if (RoleContext.ROLE_CONTEXT_STATE.STARTING.equals(roleContext.getState())) {
+                    rolePropagationFx = roleContext.onRoleChanged(oldRole, newRole);
+                }
+                txProcessCallback = null;
             } else {
                 LOG.trace("Main candidate role change case not covered: {} -> {} .. NOOP", oldRole, newRole);
                 txProcessCallback = null;
@@ -375,22 +396,22 @@ public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
 
             // catching result
             Futures.addCallback(rolePropagationFx, new FutureCallback<Void>() {
-                        @Override
-                        public void onSuccess(@Nullable final Void aVoid) {
-                            LOG.debug("Role of main candidate successfully propagated: {}, {} -> {}",
-                                    ownershipChange.getEntity(), oldRole, newRole);
-                            mainCandidateGuard.release();
-                        }
+                @Override
+                public void onSuccess(@Nullable final Void aVoid) {
+                    LOG.debug("Role of main candidate successfully propagated: {}, {} -> {}",
+                            ownershipChange.getEntity(), oldRole, newRole);
+                    roleContext.setPropagatingRole(newRole);
+                    mainCandidateGuard.release();
+                }
 
-                        @Override
-                        public void onFailure(final Throwable throwable) {
-                            LOG.warn("Main candidate role propagation FAILED for entity: {}, {} -> {}",
-                                    ownershipChange.getEntity(), oldRole, newRole);
-                            mainCandidateGuard.release();
-                            roleContext.getDeviceContext().close();
-                        }
-                    }
-            );
+                @Override
+                public void onFailure(final Throwable throwable) {
+                    LOG.warn("Main candidate role propagation FAILED for entity: {}, {} -> {}",
+                            ownershipChange.getEntity(), oldRole, newRole);
+                    mainCandidateGuard.release();
+                    roleContext.getDeviceContext().close();
+                }
+            });
 
         } else {
             LOG.debug("We are closing connection for entity {}", ownershipChange.getEntity());
@@ -430,22 +451,23 @@ public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
         return delFuture;
     }
 
-    private void unregistrationHelper(final EntityOwnershipChange ownershipChange, final RoleChangeListener roleChangeListener) {
+
+    private void unregistrationHelper(final EntityOwnershipChange ownershipChange, final RoleContext roleContext) {
         LOG.info("Initiate removal from operational. Possibly the last node to be disconnected for :{}. ", ownershipChange);
-        Futures.addCallback(removeDeviceFromOperDS(roleChangeListener), new FutureCallback<Void>() {
+        Futures.addCallback(removeDeviceFromOperDS(roleContext), new FutureCallback<Void>() {
             @Override
             public void onSuccess(@Nullable final Void aVoid) {
-                LOG.debug("Freeing roleContext slot for device: {}", roleChangeListener.getDeviceState().getNodeId());
-                contexts.remove(ownershipChange.getEntity(), roleChangeListener);
-                ((RoleContext) roleChangeListener).suspendTxCandidate();
+                LOG.debug("Freeing roleContext slot for device: {}", roleContext.getDeviceState().getNodeId());
+                contexts.remove(ownershipChange.getEntity(), roleContext);
+                roleContext.suspendTxCandidate();
             }
 
             @Override
             public void onFailure(final Throwable throwable) {
-                LOG.warn("NOT freeing roleContext slot for device: {}, {}", roleChangeListener.getDeviceState()
+                LOG.warn("NOT freeing roleContext slot for device: {}, {}", roleContext.getDeviceState()
                         .getNodeId(), throwable.getMessage());
-                contexts.remove(ownershipChange.getEntity(), roleChangeListener);
-                ((RoleContext) roleChangeListener).suspendTxCandidate();
+                contexts.remove(ownershipChange.getEntity(), roleContext);
+                roleContext.suspendTxCandidate();
             }
         });
     }