Bug 5596 Created lifecycle service
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / role / RoleManagerImpl.java
index 98236fad1aeb3bed349f423a1642dc181d549c43..716a9b38db51703378414e955d81d5611589b1ba 100644 (file)
@@ -7,73 +7,90 @@
  */
 package org.opendaylight.openflowplugin.impl.role;
 
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
 import com.google.common.base.Verify;
-import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.collect.Iterators;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.Map;
+import io.netty.util.TimerTask;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Semaphore;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
-import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.openflowplugin.api.OFConstants;
+import org.opendaylight.openflowplugin.api.openflow.OFPContext;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
-import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
-import org.opendaylight.openflowplugin.api.openflow.role.RoleChangeListener;
+import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.RoleChangeListener;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.ServiceChangeListener;
 import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
-import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils;
+import org.opendaylight.openflowplugin.impl.services.SalRoleServiceImpl;
+import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
+import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Gets invoked from RpcManagerInitial, registers a candidate with EntityOwnershipService.
- * On receipt of the ownership notification, makes an rpc call to SalRoleSevice.
+ * On receipt of the ownership notification, makes an rpc call to SalRoleService.
  *
  * Hands over to StatisticsManager at the end.
  */
-public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
+public class RoleManagerImpl implements RoleManager, EntityOwnershipListener, ServiceChangeListener {
     private static final Logger LOG = LoggerFactory.getLogger(RoleManagerImpl.class);
 
+    // Maximum limit of timeout retries when cleaning DS, to prevent infinite recursive loops
+    private static final int MAX_CLEAN_DS_RETRIES = 3;
+
     private DeviceInitializationPhaseHandler deviceInitializationPhaseHandler;
+    private DeviceTerminationPhaseHandler deviceTerminationPhaseHandler;
     private final DataBroker dataBroker;
     private final EntityOwnershipService entityOwnershipService;
-    private final ConcurrentMap<Entity, RoleContext> contexts = new ConcurrentHashMap<>();
-    private final ConcurrentMap<Entity, RoleContext> txContexts = new ConcurrentHashMap<>();
+    private final ConcurrentMap<DeviceInfo, RoleContext> contexts = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Entity, RoleContext> watchingEntities = new ConcurrentHashMap<>();
     private final EntityOwnershipListenerRegistration entityOwnershipListenerRegistration;
     private final EntityOwnershipListenerRegistration txEntityOwnershipListenerRegistration;
-    private final boolean switchFeaturesMandatory;
+    private List<RoleChangeListener> listeners = new ArrayList<>();
+
+    private final LifecycleConductor conductor;
 
-    public RoleManagerImpl(final EntityOwnershipService entityOwnershipService, final DataBroker dataBroker, final boolean switchFeaturesMandatory) {
+    public RoleManagerImpl(final EntityOwnershipService entityOwnershipService, final DataBroker dataBroker, final LifecycleConductor lifecycleConductor) {
         this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
         this.dataBroker = Preconditions.checkNotNull(dataBroker);
-        this.switchFeaturesMandatory = switchFeaturesMandatory;
         this.entityOwnershipListenerRegistration = Preconditions.checkNotNull(entityOwnershipService.registerListener(RoleManager.ENTITY_TYPE, this));
         this.txEntityOwnershipListenerRegistration = Preconditions.checkNotNull(entityOwnershipService.registerListener(TX_ENTITY_TYPE, this));
-        LOG.debug("Registering OpenflowOwnershipListener listening to all entity ownership changes");
+        this.conductor = lifecycleConductor;
+        LOG.debug("Register OpenflowOwnershipListener to all entity ownership changes");
     }
 
     @Override
@@ -82,312 +99,326 @@ public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
     }
 
     @Override
-    public void onDeviceContextLevelUp(@CheckForNull final DeviceContext deviceContext) throws Exception {
-        LOG.debug("RoleManager called for device:{}", deviceContext.getPrimaryConnectionContext().getNodeId());
-        if (deviceContext.getDeviceState().getFeatures().getVersion() < OFConstants.OFP_VERSION_1_3) {
-            // Roles are not supported before OF1.3, so move forward.
-            deviceInitializationPhaseHandler.onDeviceContextLevelUp(deviceContext);
-            return;
-        }
-
-        final RoleContext roleContext = new RoleContextImpl(deviceContext, entityOwnershipService,
-                makeEntity(deviceContext.getDeviceState().getNodeId()),
-                makeTxEntity(deviceContext.getDeviceState().getNodeId()));
-        // if the device context gets closed (mostly on connection close), we would need to cleanup
-        deviceContext.addDeviceContextClosedHandler(this);
-        final RoleContext previousContext = contexts.putIfAbsent(roleContext.getEntity(), roleContext);
-        Verify.verify(previousContext == null,
-                "RoleCtx for master Node {} is still not close.", deviceContext.getDeviceState().getNodeId());
-
-        roleContext.initialization();
-        @Deprecated
-        final ListenableFuture<OfpRole> roleChangeFuture = SettableFuture.<OfpRole> create();
-
-        final ListenableFuture<Void> txFreeFuture = Futures.transform(roleChangeFuture, new AsyncFunction<OfpRole, Void>() {
-            @Override
-            public ListenableFuture<Void> apply(final OfpRole input) throws Exception {
-                final ListenableFuture<Void> nextFuture;
-                if (OfpRole.BECOMEMASTER.equals(input)) {
-                    LOG.debug("Node {} has marked as LEADER", deviceContext.getDeviceState().getNodeId());
-                    Verify.verify(txContexts.putIfAbsent(roleContext.getTxEntity(), roleContext) == null,
-                            "RoleCtx for TxEntity {} master Node {} is still not close.",
-                            roleContext.getTxEntity(), deviceContext.getDeviceState().getNodeId());
-//                    nextFuture = roleContext.setupTxCandidate();
-                            nextFuture = Futures.immediateFuture(null);
-                } else {
-                    LOG.debug("Node {} was marked as FOLLOWER", deviceContext.getDeviceState().getNodeId());
-                    nextFuture = Futures.immediateFuture(null);
-                }
-                return nextFuture;
-            }
-        });
-
-        final ListenableFuture<Void> initDeviceFuture = Futures.transform(txFreeFuture, new AsyncFunction<Void, Void>() {
-            @Override
-            public ListenableFuture<Void> apply(final Void input) throws Exception {
-                LOG.debug("Node {} will be initialized", deviceContext.getDeviceState().getNodeId());
-                return DeviceInitializationUtils.initializeNodeInformation(deviceContext, switchFeaturesMandatory);
-            }
-        });
-
-        Futures.addCallback(initDeviceFuture, new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(final Void result) {
-                LOG.debug("Initialization Node {} is done.", deviceContext.getDeviceState().getNodeId());
-                try {
-                    getRoleContextLevelUp(deviceContext);
-                } catch (final Exception e) {
-                    deviceContext.close();
-                }
-            }
-
-            @Override
-            public void onFailure(final Throwable t) {
-                LOG.warn("Unexpected error for Node {} initialization", deviceContext.getDeviceState().getNodeId(), t);
-                deviceContext.close();
-            }
-        });
-    }
-
-    void getRoleContextLevelUp(final DeviceContext deviceContext) throws Exception {
-        LOG.debug("Created role context for node {}", deviceContext.getDeviceState().getNodeId());
-        LOG.debug("roleChangeFuture success for device:{}. Moving to StatisticsManager", deviceContext.getDeviceState().getNodeId());
-        deviceInitializationPhaseHandler.onDeviceContextLevelUp(deviceContext);
+    public void onDeviceContextLevelUp(@CheckForNull final DeviceInfo deviceInfo) throws Exception {
+        final DeviceContext deviceContext = Preconditions.checkNotNull(conductor.getDeviceContext(deviceInfo));
+        final RoleContext roleContext = new RoleContextImpl(deviceInfo, entityOwnershipService, makeEntity(deviceInfo.getNodeId()), makeTxEntity(deviceInfo.getNodeId()), conductor);
+        roleContext.setSalRoleService(new SalRoleServiceImpl(roleContext, deviceContext));
+        Verify.verify(contexts.putIfAbsent(deviceInfo, roleContext) == null, "Role context for master Node %s is still not closed.", deviceInfo.getNodeId());
+        makeDeviceRoleChange(OfpRole.BECOMESLAVE, roleContext, true);
+        /* First start to watch entity so we don't miss any notification, and then try to register in EOS */
+        watchingEntities.put(roleContext.getEntity(), roleContext);
+        notifyListenersRoleInitializationDone(roleContext.getDeviceInfo(), roleContext.initialization());
+        deviceInitializationPhaseHandler.onDeviceContextLevelUp(deviceInfo);
     }
 
     @Override
-    public void close() throws Exception {
+    public void close() {
+        LOG.debug("Close method on role manager was called.");
         entityOwnershipListenerRegistration.close();
         txEntityOwnershipListenerRegistration.close();
-        for (final Map.Entry<Entity, RoleContext> roleContextEntry : contexts.entrySet()) {
+        for (final Iterator<RoleContext> iterator = Iterators.consumingIterator(contexts.values().iterator()); iterator.hasNext();) {
             // got here because last known role is LEADER and DS might need clearing up
-            final Entity entity = roleContextEntry.getKey();
-            final Optional<EntityOwnershipState> ownershipState = entityOwnershipService.getOwnershipState(entity);
-            final NodeId nodeId = roleContextEntry.getValue().getDeviceState().getNodeId();
-            if (ownershipState.isPresent()) {
-                if ((!ownershipState.get().hasOwner())) {
-                    LOG.trace("Last role is LEADER and ownershipService returned hasOwner=false for node: {}; " +
-                            "cleaning DS as being probably the last owner", nodeId);
-                    removeDeviceFromOperDS(roleContextEntry.getValue());
-                } else {
-                    // NOOP - there is another owner
-                    LOG.debug("Last role is LEADER and ownershipService returned hasOwner=true for node: {}; " +
-                            "leaving DS untouched", nodeId);
-                }
-            } else {
-                // TODO: is this safe? When could this happen?
-                LOG.warn("Last role is LEADER but ownershipService returned empty ownership info for node: {}; " +
-                        "cleaning DS ANYWAY!", nodeId);
-                removeDeviceFromOperDS(roleContextEntry.getValue());
+            final RoleContext roleContext = iterator.next();
+            watchingEntities.remove(roleContext.getEntity());
+            watchingEntities.remove(roleContext.getTxEntity());
+            contexts.remove(roleContext.getDeviceInfo());
+            if (roleContext.isTxCandidateRegistered()) {
+                LOG.info("Node {} was holder txEntity, so trying to remove device from operational DS.", roleContext.getDeviceInfo().getNodeId().getValue());
+                removeDeviceFromOperationalDS(roleContext.getDeviceInfo(), MAX_CLEAN_DS_RETRIES);
             }
+            roleContext.close();
         }
-        contexts.clear();
     }
 
     @Override
-    public void onDeviceContextClosed(final DeviceContext deviceContext) {
-        final NodeId nodeId = deviceContext.getDeviceState().getNodeId();
-        LOG.debug("onDeviceContextClosed for node {}", nodeId);
-        final Entity entity = makeEntity(nodeId);
-        final RoleContext roleContext = contexts.get(entity);
+    public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
+        LOG.trace("onDeviceContextLevelDown for node {}", deviceInfo.getNodeId());
+        final RoleContext roleContext = contexts.remove(deviceInfo);
         if (roleContext != null) {
-            LOG.debug("Found roleContext associated to deviceContext: {}, now closing the roleContext", nodeId);
-            final Optional<EntityOwnershipState> actState = entityOwnershipService.getOwnershipState(entity);
-            if (actState.isPresent()) {
-                if (!actState.get().isOwner()) {
-                    LOG.debug("No DS commitment for device {} - LEADER is somewhere else", nodeId);
-                    contexts.remove(entity, roleContext);
-                }
-            } else {
-                LOG.warn("EntityOwnershipService doesn't return state for entity: {} in close proces", entity);
+            LOG.debug("Found roleContext associated to deviceContext: {}, now trying close the roleContext", deviceInfo.getNodeId());
+            roleContext.setState(OFPContext.CONTEXT_STATE.TERMINATION);
+            roleContext.unregisterCandidate(roleContext.getEntity());
+            if (roleContext.isTxCandidateRegistered()) {
+                LOG.info("Node {} was holder txEntity, so trying to remove device from operational DS.", deviceInfo.getNodeId().getValue());
+                removeDeviceFromOperationalDS(roleContext.getDeviceInfo(), MAX_CLEAN_DS_RETRIES);
             }
             roleContext.close();
         }
+        deviceTerminationPhaseHandler.onDeviceContextLevelDown(deviceInfo);
     }
 
-    private static Entity makeEntity(final NodeId nodeId) {
+    @VisibleForTesting
+    static Entity makeEntity(final NodeId nodeId) {
         return new Entity(RoleManager.ENTITY_TYPE, nodeId.getValue());
     }
 
-    private static Entity makeTxEntity(final NodeId nodeId) {
+    @VisibleForTesting
+    static Entity makeTxEntity(final NodeId nodeId) {
         return new Entity(RoleManager.TX_ENTITY_TYPE, nodeId.getValue());
     }
 
     @Override
     public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
+
         Preconditions.checkArgument(ownershipChange != null);
-        try {
-            final RoleContext roleContext = contexts.get(ownershipChange.getEntity());
-            if (roleContext != null) {
-                changeForEntity(ownershipChange, roleContext);
-                return;
-            }
+        final RoleContext roleContext = watchingEntities.get(ownershipChange.getEntity());
+
+        if (Objects.nonNull(roleContext) && !roleContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
 
-            final RoleContext txRoleContext = txContexts.get(ownershipChange.getEntity());
-            if (txRoleContext != null) {
-                changeForTxEntity(ownershipChange, txRoleContext);
-                return;
+            LOG.debug("Received EOS message: wasOwner:{} isOwner:{} hasOwner:{} inJeopardy:{} for entity type {} and node {}",
+                    ownershipChange.wasOwner(), ownershipChange.isOwner(), ownershipChange.hasOwner(), ownershipChange.inJeopardy(),
+                    ownershipChange.getEntity().getType(),
+                    roleContext.getDeviceInfo().getNodeId());
+
+            if (ownershipChange.getEntity().equals(roleContext.getEntity())) {
+                changeOwnershipForMainEntity(ownershipChange, roleContext);
+            } else {
+                changeOwnershipForTxEntity(ownershipChange, roleContext);
             }
-        } catch (final InterruptedException e) {
-            LOG.warn("fail to acquire semaphore: {}", ownershipChange.getEntity());
-            // FIXME: consider forcibly closing this connection
+
+        } else {
+
+            LOG.debug("Role context for entity type {} is in state closing, disregarding ownership change notification.", ownershipChange.getEntity().getType());
+            watchingEntities.remove(ownershipChange.getEntity());
+
         }
 
-        LOG.debug("We are not able to find Entity {} ownershipChange {} - disregarding ownership notification",
-                ownershipChange.getEntity(), ownershipChange);
     }
 
-    private void changeForTxEntity(final EntityOwnershipChange ownershipChange, @Nonnull final RoleChangeListener roleTxChangeListener) throws InterruptedException {
-        LOG.info("Received EntityOwnershipChange:{}", ownershipChange);
-        final Semaphore txCandidateGuard = roleTxChangeListener.getTxCandidateGuard();
-        LOG.trace("txCandidate lock queue: " + txCandidateGuard.getQueueLength());
-        txCandidateGuard.acquire();
-
-        if (ownershipChange.wasOwner() && !ownershipChange.isOwner()) {
-            // MASTER -> SLAVE - left transition lock
-            txContexts.remove(roleTxChangeListener.getTxEntity(), roleTxChangeListener);
-            txCandidateGuard.release();
-        } else if (!ownershipChange.wasOwner() && ownershipChange.isOwner()) {
-            // SLAVE -> MASTER - acquired transition lock
-            LOG.debug("TxRoleChange for entity {}", ownershipChange.getEntity());
-            final OfpRole role = roleTxChangeListener.getDeviceState().getRole();
-            Verify.verify(OfpRole.BECOMEMASTER.equals(role),
-                    "Acquired txCandidate lock but current role = {}", role);
+    @VisibleForTesting
+    void changeOwnershipForMainEntity(final EntityOwnershipChange ownershipChange, final RoleContext roleContext) {
 
+        if (roleContext.isMainCandidateRegistered()) {
+            LOG.debug("Main-EntityOwnershipRegistration is active for entity type {} and node {}",
+                    ownershipChange.getEntity().getType(), roleContext.getDeviceInfo().getNodeId());
+            if (!ownershipChange.wasOwner() && ownershipChange.isOwner() && !ownershipChange.inJeopardy()) {
+                // SLAVE -> MASTER
+                LOG.debug("SLAVE to MASTER for node {}", roleContext.getDeviceInfo().getNodeId());
+                if (roleContext.registerCandidate(roleContext.getTxEntity())) {
+                    LOG.debug("Starting watching tx entity for node {}", roleContext.getDeviceInfo().getNodeId());
+                    watchingEntities.putIfAbsent(roleContext.getTxEntity(), roleContext);
+                }
+            } else if ((ownershipChange.wasOwner() && !ownershipChange.isOwner()) || (ownershipChange.inJeopardy())) {
+                // MASTER -> SLAVE
+                LOG.debug("MASTER to SLAVE for node {}", roleContext.getDeviceInfo().getNodeId());
+                conductor.addOneTimeListenerWhenServicesChangesDone(this, roleContext.getDeviceInfo());
+                makeDeviceRoleChange(OfpRole.BECOMESLAVE, roleContext, false);
+            }
         } else {
-            LOG.debug("NOOP state transition for TxEntity {} ", roleTxChangeListener.getTxEntity());
-            txCandidateGuard.release();
+            LOG.debug("Main-EntityOwnershipRegistration is not active for entity type {} and node {}",
+                    ownershipChange.getEntity(), roleContext.getDeviceInfo().getNodeId());
+            watchingEntities.remove(ownershipChange.getEntity(), roleContext);
+            if (roleContext.isTxCandidateRegistered()) {
+                LOG.debug("tx candidate still registered for node {}, probably connection lost, trying to unregister tx candidate", roleContext.getDeviceInfo().getNodeId());
+                roleContext.unregisterCandidate(roleContext.getTxEntity());
+                if (ownershipChange.wasOwner() && !ownershipChange.isOwner() && !ownershipChange.hasOwner()) {
+                    LOG.debug("Trying to remove from operational node: {}", roleContext.getDeviceInfo().getNodeId());
+                    removeDeviceFromOperationalDS(roleContext.getDeviceInfo(), MAX_CLEAN_DS_RETRIES);
+                }
+            } else {
+                contexts.remove(roleContext.getDeviceInfo(), roleContext);
+                roleContext.close();
+                conductor.closeConnection(roleContext.getDeviceInfo());
+            }
         }
     }
 
-    private static Function<Void, Void> makeTxEntitySuspendCallback(final RoleContext roleChangeListener) {
-        return new Function<Void, Void>() {
-            @Override
-            public Void apply(final Void result) {
-                roleChangeListener.suspendTxCandidate();
-                return null;
+    @VisibleForTesting
+    void changeOwnershipForTxEntity(final EntityOwnershipChange ownershipChange,
+            @Nonnull final RoleContext roleContext) {
+
+        if (roleContext.isTxCandidateRegistered()) {
+            LOG.debug("Tx-EntityOwnershipRegistration is active for entity type {} and node {}",
+                    ownershipChange.getEntity().getType(),
+                    roleContext.getDeviceInfo().getNodeId());
+            if (ownershipChange.inJeopardy()) {
+                LOG.warn("Getting 'inJeopardy' flag from EOS. Removing txCandidate and stopping watching txCandidate.");
+                watchingEntities.remove(roleContext.getTxEntity());
+                roleContext.unregisterCandidate(roleContext.getTxEntity());
+            } else {
+                if (!ownershipChange.wasOwner() && ownershipChange.isOwner()) {
+                    // SLAVE -> MASTER
+                    LOG.debug("SLAVE to MASTER for node {}", roleContext.getDeviceInfo().getNodeId());
+                    makeDeviceRoleChange(OfpRole.BECOMEMASTER, roleContext, false);
+                } else if (ownershipChange.wasOwner() && !ownershipChange.isOwner()) {
+                    // MASTER -> SLAVE
+                    LOG.debug("MASTER to SLAVE for node {}", roleContext.getDeviceInfo().getNodeId());
+                    LOG.warn("Tx-EntityOwnershipRegistration lost leadership entity type {} and node {}",
+                            ownershipChange.getEntity().getType(), roleContext.getDeviceInfo().getNodeId());
+                    watchingEntities.remove(roleContext.getTxEntity(), roleContext);
+                    watchingEntities.remove(roleContext.getEntity(), roleContext);
+                    roleContext.unregisterCandidate(roleContext.getEntity());
+                    roleContext.unregisterCandidate(roleContext.getTxEntity());
+                    if (!ownershipChange.hasOwner()) {
+                        LOG.debug("Trying to remove from operational node: {}", roleContext.getDeviceInfo().getNodeId());
+                        removeDeviceFromOperationalDS(roleContext.getDeviceInfo(), MAX_CLEAN_DS_RETRIES);
+                    } else {
+                        contexts.remove(roleContext.getDeviceInfo(), roleContext);
+                        roleContext.close();
+                        conductor.closeConnection(roleContext.getDeviceInfo());
+                    }
+                }
             }
-        };
+        } else {
+            LOG.debug("Tx-EntityOwnershipRegistration is not active for entity type {} and node {}", ownershipChange.getEntity().getType(), roleContext.getDeviceInfo().getNodeId());
+            watchingEntities.remove(roleContext.getTxEntity(), roleContext);
+            contexts.remove(roleContext.getDeviceInfo(), roleContext);
+            roleContext.close();
+            conductor.closeConnection(roleContext.getDeviceInfo());
+        }
     }
 
-    private static Function<Void, Void> makeTxEntitySetupCallback(final RoleContext roleChangeListener) {
-        return new Function<Void, Void>() {
+    @VisibleForTesting
+    void makeDeviceRoleChange(final OfpRole role, final RoleContext roleContext, final Boolean init) {
+        final ListenableFuture<RpcResult<SetRoleOutput>> roleChangeFuture = sendRoleChangeToDevice(role, roleContext);
+        Futures.addCallback(roleChangeFuture, new FutureCallback<RpcResult<SetRoleOutput>>() {
             @Override
-            public Void apply(final Void result) {
-                try {
-                    roleChangeListener.setupTxCandidate();
-                } catch (final CandidateAlreadyRegisteredException e) {
-                    LOG.debug("txCandidate registration failed");
-                    Throwables.propagate(e);
+            public void onSuccess(@Nullable final RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
+                LOG.info("Role {} successfully set on device {}", role, roleContext.getDeviceInfo().getNodeId());
+                if (!init) {
+                    notifyListenersRoleChangeOnDevice(roleContext.getDeviceInfo(), role);
                 }
-                return null;
             }
-        };
-    }
 
-    private void changeForEntity(final EntityOwnershipChange ownershipChange, @Nonnull final RoleContext roleChangeListener) throws InterruptedException {
-        final Semaphore mainCandidateGuard = roleChangeListener.getMainCandidateGuard();
-        LOG.trace("mainCandidate lock queue: " + mainCandidateGuard.getQueueLength());
-        mainCandidateGuard.acquire();
-        //FIXME : check again implementation for double candidate scenario
-        LOG.info("Received EntityOwnershipChange:{}", ownershipChange);
-
-        if (roleChangeListener.getDeviceState().isValid()) {
-            LOG.debug("RoleChange for entity {}", ownershipChange.getEntity());
-            final OfpRole newRole = ownershipChange.isOwner() ? OfpRole.BECOMEMASTER : OfpRole.BECOMESLAVE;
-            final OfpRole oldRole = ownershipChange.wasOwner() ? OfpRole.BECOMEMASTER : OfpRole.BECOMESLAVE;
-            // send even if they are same. we do the check for duplicates in SalRoleService and maintain a lastKnownRole
-            ListenableFuture<Void> rolePropagatedFx = roleChangeListener.onRoleChanged(oldRole, newRole);
-            final Function<Void, Void> txProcessCallback;
-            if (ownershipChange.wasOwner() && !ownershipChange.isOwner() && ownershipChange.hasOwner()) {
-                // MASTER -> SLAVE
-                txProcessCallback = makeTxEntitySuspendCallback(roleChangeListener);
-            } else if (!ownershipChange.wasOwner() && ownershipChange.isOwner() && ownershipChange.isOwner()) {
-                // FIXME : make different code path deviceContext.onClusterRoleChange(newRole); has to call from onTxRoleChange (for master)
-                // SLAVE -> MASTER
-                txProcessCallback = makeTxEntitySetupCallback(roleChangeListener);
-            } else {
-                LOG.trace("Main candidate role change case not covered: {} -> {} .. NOOP", oldRole, newRole);
-                txProcessCallback = null;
-            }
-
-            if (txProcessCallback != null) {
-                rolePropagatedFx = Futures.transform(rolePropagatedFx, txProcessCallback);
+            @Override
+            public void onFailure(@Nonnull final Throwable throwable) {
+                LOG.warn("Unable to set role {} on device {}", role, roleContext.getDeviceInfo().getNodeId());
+                conductor.closeConnection(roleContext.getDeviceInfo());
             }
+        });
+    }
 
-            Futures.addCallback(rolePropagatedFx, new FutureCallback<Void>() {
-                        @Override
-                        public void onSuccess(@Nullable final Void aVoid) {
-                            LOG.debug("Role of main candidate successfully propagated: {}, {} -> {}",
-                                    ownershipChange.getEntity(), oldRole, newRole);
-                            mainCandidateGuard.release();
-                        }
-
-                        @Override
-                        public void onFailure(final Throwable throwable) {
-                            LOG.warn("Main candidate role propagation failed for entity: {}, {} -> {}",
-                                    ownershipChange.getEntity(), oldRole, newRole);
-                            mainCandidateGuard.release();
-                            // FIXME: here we shall disconnect probably - in order to avoid inconsistent state
-                        }
-                    }
-            );
-
+    @VisibleForTesting
+    ListenableFuture<RpcResult<SetRoleOutput>> sendRoleChangeToDevice(final OfpRole newRole, final RoleContext roleContext) {
+        LOG.debug("Sending new role {} to device {}", newRole, roleContext.getDeviceInfo().getNodeId());
+        final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture;
+        final Short version = roleContext.getDeviceInfo().getVersion();
+        if (null == version) {
+            LOG.debug("Device version is null");
+            return Futures.immediateFuture(null);
+        }
+        if (version < OFConstants.OFP_VERSION_1_3) {
+            LOG.debug("Device version not support ROLE");
+            return Futures.immediateFuture(null);
         } else {
-            LOG.debug("We are closing connection for entity {}", ownershipChange.getEntity());
-            mainCandidateGuard.release();
-            // expecting that this roleContext will get closed in a moment
-            // FIXME: reconsider location of following cleanup logic
-            if (!ownershipChange.hasOwner() && !ownershipChange.isOwner() && ownershipChange.wasOwner()) {
-                unregistrationHelper(ownershipChange, roleChangeListener);
-            } else if (ownershipChange.hasOwner() && !ownershipChange.isOwner() && ownershipChange.wasOwner()) {
-                contexts.remove(ownershipChange.getEntity(), roleChangeListener);
-                roleChangeListener.suspendTxCandidate();
-            } else {
-                LOG.info("Unexpected role change msg {} for entity {}", ownershipChange, ownershipChange.getEntity());
-            }
+            final SetRoleInput setRoleInput = (new SetRoleInputBuilder()).setControllerRole(newRole)
+                    .setNode(new NodeRef(DeviceStateUtil.createNodeInstanceIdentifier(roleContext.getDeviceInfo().getNodeId()))).build();
+            setRoleOutputFuture = roleContext.getSalRoleService().setRole(setRoleInput);
+            final TimerTask timerTask = timeout -> {
+                if (!setRoleOutputFuture.isDone()) {
+                    LOG.warn("New role {} was not propagated to device {} during 10 sec", newRole, roleContext.getDeviceInfo().getNodeId());
+                    setRoleOutputFuture.cancel(true);
+                }
+            };
+            conductor.newTimeout(timerTask, 10, TimeUnit.SECONDS);
         }
+        return JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture);
     }
 
-    private CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperDS(
-            final RoleChangeListener roleChangeListener) {
-        Preconditions.checkArgument(roleChangeListener != null);
-        final DeviceState deviceState = roleChangeListener.getDeviceState();
+    @VisibleForTesting
+    CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final DeviceInfo deviceInfo, final int numRetries) {
         final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
-        delWtx.delete(LogicalDatastoreType.OPERATIONAL, deviceState.getNodeInstanceIdentifier());
+        delWtx.delete(LogicalDatastoreType.OPERATIONAL, DeviceStateUtil.createNodeInstanceIdentifier(deviceInfo.getNodeId()));
         final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
-        Futures.addCallback(delFuture, new FutureCallback<Void>() {
 
+        Futures.addCallback(delFuture, new FutureCallback<Void>() {
             @Override
             public void onSuccess(final Void result) {
-                LOG.debug("Delete Node {} was successful", deviceState.getNodeId());
+                LOG.debug("Delete Node {} was successful", deviceInfo);
+                final RoleContext roleContext = contexts.remove(deviceInfo);
+                if (roleContext != null) {
+                    roleContext.close();
+                }
             }
 
             @Override
-            public void onFailure(final Throwable t) {
-                LOG.warn("Delete Node {} fail.", deviceState.getNodeId(), t);
+            public void onFailure(@Nonnull final Throwable t) {
+                // If we have any retries left, we will try to clean the datastore again
+                if (numRetries > 0) {
+                    // We "used" one retry here, so decrement it
+                    final int curRetries = numRetries - 1;
+                    LOG.debug("Delete node {} failed with exception {}. Trying again (retries left: {})", deviceInfo.getNodeId(), t, curRetries);
+                    // Recursive call to this method with "one less" retry
+                    removeDeviceFromOperationalDS(deviceInfo, curRetries);
+                    return;
+                }
+
+                // No retries left, so we will just close the role context, and ignore datastore cleanup
+                LOG.warn("Delete node {} failed with exception {}. No retries left, aborting", deviceInfo.getNodeId(), t);
+                final RoleContext roleContext = contexts.remove(deviceInfo);
+                if (roleContext != null) {
+                    roleContext.close();
+                }
             }
         });
+
         return delFuture;
     }
 
-    private void unregistrationHelper(final EntityOwnershipChange ownershipChange, final RoleChangeListener roleChangeListener) {
-        LOG.info("Initiate removal from operational. Possibly the last node to be disconnected for :{}. ", ownershipChange);
-        Futures.addCallback(removeDeviceFromOperDS(roleChangeListener), new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(@Nullable final Void aVoid) {
-                LOG.debug("Freeing roleContext slot for device: {}", roleChangeListener.getDeviceState().getNodeId());
-                contexts.remove(ownershipChange.getEntity(), roleChangeListener);
-                ((RoleContext) roleChangeListener).suspendTxCandidate();
-            }
+    @Override
+    public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
+        deviceTerminationPhaseHandler = handler;
+    }
 
-            @Override
-            public void onFailure(final Throwable throwable) {
-                LOG.warn("NOT freeing roleContext slot for device: {}, {}", roleChangeListener.getDeviceState()
-                        .getNodeId(), throwable.getMessage());
-                contexts.remove(ownershipChange.getEntity(), roleChangeListener);
-                ((RoleContext) roleChangeListener).suspendTxCandidate();
-            }
-        });
+    @Override
+    public void servicesChangeDone(final DeviceInfo deviceInfo, final boolean success) {
+        LOG.debug("Services stopping done for node {} as " + (success ? "successful" : "unsuccessful"), deviceInfo);
+        final RoleContext roleContext = contexts.get(deviceInfo);
+        if (null != roleContext) {
+            /* Services stopped or failure */
+            roleContext.unregisterCandidate(roleContext.getTxEntity());
+        }
+    }
+
+    @VisibleForTesting
+    RoleContext getRoleContext(final DeviceInfo deviceInfo){
+        return contexts.get(deviceInfo);
+    }
+
+    /**
+     * This method is only for testing
+     */
+    @VisibleForTesting
+    void setRoleContext(DeviceInfo deviceInfo, RoleContext roleContext){
+        if(!contexts.containsKey(deviceInfo)) {
+            contexts.put(deviceInfo, roleContext);
+        }
+    }
+
+    @Override
+    public void addRoleChangeListener(final RoleChangeListener roleChangeListener) {
+        this.listeners.add(roleChangeListener);
+    }
+
+    /**
+     * Invoked when initialization phase is done
+     * @param deviceInfo node identification
+     * @param success true if initialization done ok, false otherwise
+     */
+    @VisibleForTesting
+    void notifyListenersRoleInitializationDone(final DeviceInfo deviceInfo, final boolean success){
+        LOG.debug("Notifying registered listeners for role initialization done, no. of listeners {}", listeners.size());
+        for (final RoleChangeListener listener : listeners) {
+            listener.roleInitializationDone(deviceInfo, success);
+        }
+    }
+
+    /**
+     * Notifies registered listener on role change. Role is the new role on device
+     * If initialization phase is true, we may skip service starting
+     * @param deviceInfo
+     * @param role new role meant to be set on device
+     */
+    @VisibleForTesting
+    void notifyListenersRoleChangeOnDevice(final DeviceInfo deviceInfo, final OfpRole role){
+        LOG.debug("Notifying registered listeners for role change, no. of listeners {}", listeners.size());
+        for (final RoleChangeListener listener : listeners) {
+            listener.roleChangeOnDevice(deviceInfo, role);
+        }
+    }
+
+    @Override
+    public <T extends OFPContext> T gainContext(final DeviceInfo deviceInfo) {
+        return (T) contexts.get(deviceInfo);
     }
 }