*/
package org.opendaylight.openflowplugin.impl.role;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
import com.google.common.base.Verify;
-import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
-import java.util.Map;
+import io.netty.util.TimerTask;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Semaphore;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
-import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.openflowplugin.api.OFConstants;
+import org.opendaylight.openflowplugin.api.openflow.OFPContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
-import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
-import org.opendaylight.openflowplugin.api.openflow.role.RoleChangeListener;
+import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.RoleChangeListener;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.ServiceChangeListener;
import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
-import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils;
+import org.opendaylight.openflowplugin.impl.services.SalRoleServiceImpl;
+import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
+import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Gets invoked from RpcManagerInitial, registers a candidate with EntityOwnershipService.
- * On receipt of the ownership notification, makes an rpc call to SalRoleSevice.
+ * On receipt of the ownership notification, makes an rpc call to SalRoleService.
*
* Hands over to StatisticsManager at the end.
*/
-public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
+public class RoleManagerImpl implements RoleManager, EntityOwnershipListener, ServiceChangeListener {
private static final Logger LOG = LoggerFactory.getLogger(RoleManagerImpl.class);
+ // Maximum limit of timeout retries when cleaning DS, to prevent infinite recursive loops
+ private static final int MAX_CLEAN_DS_RETRIES = 3;
+
private DeviceInitializationPhaseHandler deviceInitializationPhaseHandler;
+ private DeviceTerminationPhaseHandler deviceTerminationPhaseHandler;
private final DataBroker dataBroker;
private final EntityOwnershipService entityOwnershipService;
- private final ConcurrentMap<Entity, RoleContext> contexts = new ConcurrentHashMap<>();
- private final ConcurrentMap<Entity, RoleContext> txContexts = new ConcurrentHashMap<>();
+ private final ConcurrentMap<DeviceInfo, RoleContext> contexts = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Entity, RoleContext> watchingEntities = new ConcurrentHashMap<>();
private final EntityOwnershipListenerRegistration entityOwnershipListenerRegistration;
private final EntityOwnershipListenerRegistration txEntityOwnershipListenerRegistration;
- private final boolean switchFeaturesMandatory;
+ private List<RoleChangeListener> listeners = new ArrayList<>();
+
+ private final LifecycleConductor conductor;
- public RoleManagerImpl(final EntityOwnershipService entityOwnershipService, final DataBroker dataBroker, final boolean switchFeaturesMandatory) {
+ public RoleManagerImpl(final EntityOwnershipService entityOwnershipService, final DataBroker dataBroker, final LifecycleConductor lifecycleConductor) {
this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
this.dataBroker = Preconditions.checkNotNull(dataBroker);
- this.switchFeaturesMandatory = switchFeaturesMandatory;
this.entityOwnershipListenerRegistration = Preconditions.checkNotNull(entityOwnershipService.registerListener(RoleManager.ENTITY_TYPE, this));
this.txEntityOwnershipListenerRegistration = Preconditions.checkNotNull(entityOwnershipService.registerListener(TX_ENTITY_TYPE, this));
- LOG.debug("Registering OpenflowOwnershipListener listening to all entity ownership changes");
+ this.conductor = lifecycleConductor;
+ LOG.debug("Register OpenflowOwnershipListener to all entity ownership changes");
}
@Override
}
@Override
- public void onDeviceContextLevelUp(@CheckForNull final DeviceContext deviceContext) throws Exception {
- LOG.debug("RoleManager called for device:{}", deviceContext.getPrimaryConnectionContext().getNodeId());
- if (deviceContext.getDeviceState().getFeatures().getVersion() < OFConstants.OFP_VERSION_1_3) {
- // Roles are not supported before OF1.3, so move forward.
- deviceInitializationPhaseHandler.onDeviceContextLevelUp(deviceContext);
- return;
- }
-
- final RoleContext roleContext = new RoleContextImpl(deviceContext, entityOwnershipService,
- makeEntity(deviceContext.getDeviceState().getNodeId()),
- makeTxEntity(deviceContext.getDeviceState().getNodeId()));
- // if the device context gets closed (mostly on connection close), we would need to cleanup
- deviceContext.addDeviceContextClosedHandler(this);
- final RoleContext previousContext = contexts.putIfAbsent(roleContext.getEntity(), roleContext);
- Verify.verify(previousContext == null,
- "RoleCtx for master Node {} is still not close.", deviceContext.getDeviceState().getNodeId());
-
- roleContext.initialization();
- }
-
- void getRoleContextLevelUp(final DeviceContext deviceContext) {
- LOG.debug("Created role context for node {}", deviceContext.getDeviceState().getNodeId());
- LOG.debug("roleChangeFuture success for device:{}. Moving to StatisticsManager", deviceContext.getDeviceState().getNodeId());
- try {
- deviceInitializationPhaseHandler.onDeviceContextLevelUp(deviceContext);
- } catch (final Exception e) {
- LOG.info("failed to complete levelUp on next handler for device {}",
- deviceContext.getDeviceState().getNodeId());
- deviceContext.close();
- return;
- }
+ public void onDeviceContextLevelUp(@CheckForNull final DeviceInfo deviceInfo) throws Exception {
+ final DeviceContext deviceContext = Preconditions.checkNotNull(conductor.getDeviceContext(deviceInfo));
+ final RoleContext roleContext = new RoleContextImpl(deviceInfo, entityOwnershipService, makeEntity(deviceInfo.getNodeId()), makeTxEntity(deviceInfo.getNodeId()), conductor);
+ roleContext.setSalRoleService(new SalRoleServiceImpl(roleContext, deviceContext));
+ Verify.verify(contexts.putIfAbsent(deviceInfo, roleContext) == null, "Role context for master Node %s is still not closed.", deviceInfo.getNodeId());
+ makeDeviceRoleChange(OfpRole.BECOMESLAVE, roleContext, true);
+ /* First start to watch entity so we don't miss any notification, and then try to register in EOS */
+ watchingEntities.put(roleContext.getEntity(), roleContext);
+ notifyListenersRoleInitializationDone(roleContext.getDeviceInfo(), roleContext.initialization());
+ deviceInitializationPhaseHandler.onDeviceContextLevelUp(deviceInfo);
}
@Override
- public void close() throws Exception {
+ public void close() {
+ LOG.debug("Close method on role manager was called.");
entityOwnershipListenerRegistration.close();
txEntityOwnershipListenerRegistration.close();
- for (final Map.Entry<Entity, RoleContext> roleContextEntry : contexts.entrySet()) {
+ for (final Iterator<RoleContext> iterator = Iterators.consumingIterator(contexts.values().iterator()); iterator.hasNext();) {
// got here because last known role is LEADER and DS might need clearing up
- final Entity entity = roleContextEntry.getKey();
- final Optional<EntityOwnershipState> ownershipState = entityOwnershipService.getOwnershipState(entity);
- final NodeId nodeId = roleContextEntry.getValue().getDeviceState().getNodeId();
- if (ownershipState.isPresent()) {
- if ((!ownershipState.get().hasOwner())) {
- LOG.trace("Last role is LEADER and ownershipService returned hasOwner=false for node: {}; " +
- "cleaning DS as being probably the last owner", nodeId);
- removeDeviceFromOperDS(roleContextEntry.getValue());
- } else {
- // NOOP - there is another owner
- LOG.debug("Last role is LEADER and ownershipService returned hasOwner=true for node: {}; " +
- "leaving DS untouched", nodeId);
- }
- } else {
- // TODO: is this safe? When could this happen?
- LOG.warn("Last role is LEADER but ownershipService returned empty ownership info for node: {}; " +
- "cleaning DS ANYWAY!", nodeId);
- removeDeviceFromOperDS(roleContextEntry.getValue());
+ final RoleContext roleContext = iterator.next();
+ watchingEntities.remove(roleContext.getEntity());
+ watchingEntities.remove(roleContext.getTxEntity());
+ contexts.remove(roleContext.getDeviceInfo());
+ if (roleContext.isTxCandidateRegistered()) {
+ LOG.info("Node {} was holder txEntity, so trying to remove device from operational DS.", roleContext.getDeviceInfo().getNodeId().getValue());
+ removeDeviceFromOperationalDS(roleContext.getDeviceInfo(), MAX_CLEAN_DS_RETRIES);
}
+ roleContext.close();
}
- contexts.clear();
}
@Override
- public void onDeviceContextClosed(final DeviceContext deviceContext) {
- final NodeId nodeId = deviceContext.getDeviceState().getNodeId();
- LOG.debug("onDeviceContextClosed for node {}", nodeId);
- final Entity entity = makeEntity(nodeId);
- final RoleContext roleContext = contexts.get(entity);
+ public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
+ LOG.trace("onDeviceContextLevelDown for node {}", deviceInfo.getNodeId());
+ final RoleContext roleContext = contexts.remove(deviceInfo);
if (roleContext != null) {
- LOG.debug("Found roleContext associated to deviceContext: {}, now closing the roleContext", nodeId);
- final Optional<EntityOwnershipState> actState = entityOwnershipService.getOwnershipState(entity);
- if (actState.isPresent()) {
- if (!actState.get().isOwner()) {
- LOG.debug("No DS commitment for device {} - LEADER is somewhere else", nodeId);
- contexts.remove(entity, roleContext);
- }
- } else {
- LOG.warn("EntityOwnershipService doesn't return state for entity: {} in close proces", entity);
+ LOG.debug("Found roleContext associated to deviceContext: {}, now trying close the roleContext", deviceInfo.getNodeId());
+ roleContext.setState(OFPContext.CONTEXT_STATE.TERMINATION);
+ roleContext.unregisterCandidate(roleContext.getEntity());
+ if (roleContext.isTxCandidateRegistered()) {
+ LOG.info("Node {} was holder txEntity, so trying to remove device from operational DS.", deviceInfo.getNodeId().getValue());
+ removeDeviceFromOperationalDS(roleContext.getDeviceInfo(), MAX_CLEAN_DS_RETRIES);
}
roleContext.close();
}
+ deviceTerminationPhaseHandler.onDeviceContextLevelDown(deviceInfo);
}
- private static Entity makeEntity(final NodeId nodeId) {
+ @VisibleForTesting
+ static Entity makeEntity(final NodeId nodeId) {
return new Entity(RoleManager.ENTITY_TYPE, nodeId.getValue());
}
- private static Entity makeTxEntity(final NodeId nodeId) {
+ @VisibleForTesting
+ static Entity makeTxEntity(final NodeId nodeId) {
return new Entity(RoleManager.TX_ENTITY_TYPE, nodeId.getValue());
}
@Override
public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
+
Preconditions.checkArgument(ownershipChange != null);
- try {
- final RoleContext roleContext = contexts.get(ownershipChange.getEntity());
- if (roleContext != null) {
- changeForEntity(ownershipChange, roleContext);
- return;
- }
+ final RoleContext roleContext = watchingEntities.get(ownershipChange.getEntity());
- final RoleContext txRoleContext = txContexts.get(ownershipChange.getEntity());
- if (txRoleContext != null) {
- changeForTxEntity(ownershipChange, txRoleContext);
- return;
- }
- } catch (final InterruptedException e) {
- LOG.warn("fail to acquire semaphore: {}", ownershipChange.getEntity());
- // FIXME: consider forcibly closing this connection
- }
+ if (Objects.nonNull(roleContext) && !roleContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
- LOG.debug("We are not able to find Entity {} ownershipChange {} - disregarding ownership notification",
- ownershipChange.getEntity(), ownershipChange);
- }
+ LOG.debug("Received EOS message: wasOwner:{} isOwner:{} hasOwner:{} inJeopardy:{} for entity type {} and node {}",
+ ownershipChange.wasOwner(), ownershipChange.isOwner(), ownershipChange.hasOwner(), ownershipChange.inJeopardy(),
+ ownershipChange.getEntity().getType(),
+ roleContext.getDeviceInfo().getNodeId());
- private void changeForTxEntity(final EntityOwnershipChange ownershipChange, @Nonnull final RoleContext roleContext)
- throws InterruptedException {
- LOG.info("Received TX-EntityOwnershipChange:{}", ownershipChange);
- final Semaphore txCandidateGuard = roleContext.getTxCandidateGuard();
- LOG.trace("txCandidate lock queue: " + txCandidateGuard.getQueueLength());
- txCandidateGuard.acquire();
-
- ListenableFuture<Void> processingClosure;
- final DeviceContext deviceContext = roleContext.getDeviceContext();
- final NodeId nodeId = roleContext.getDeviceState().getNodeId();
-
- if (!ownershipChange.wasOwner() && ownershipChange.isOwner()) {
- // SLAVE -> MASTER - acquired transition lock
- LOG.debug("Acquired tx-lock for entity {}", ownershipChange.getEntity());
- roleContext.setTxLockOwned(true);
- final OfpRole role = roleContext.getDeviceState().getRole();
- Verify.verify(OfpRole.BECOMEMASTER.equals(roleContext.getPropagatingRole()),
- "Acquired tx-lock but current role = {}", role);
-
- switch (roleContext.getState()) {
- case STARTING:
- processingClosure = roleContext.onRoleChanged(OfpRole.BECOMESLAVE, OfpRole.BECOMEMASTER);
- // activate stats - accomplished automatically by chaging role in deviceState
- // collect initial dynamic data from device
- processingClosure = Futures.transform(processingClosure, new AsyncFunction<Void, Void>() {
- @Nullable
- @Override
- public ListenableFuture<Void> apply(@Nullable final Void aVoid) {
- deviceContext.getDeviceState().setRole(OfpRole.BECOMEMASTER);
- return DeviceInitializationUtils.initializeNodeInformation(
- deviceContext, switchFeaturesMandatory);
- }
- });
- break;
- case WORKING:
- // activate txChainManager, activate rpcs
- processingClosure = roleContext.onRoleChanged(OfpRole.BECOMESLAVE, OfpRole.BECOMEMASTER);
- // activate stats - accomplished automatically by chaging role in deviceState
- processingClosure = Futures.transform(processingClosure, new Function<Void, Void>() {
- @Nullable
- @Override
- public Void apply(@Nullable final Void aVoid) {
- deviceContext.getDeviceState().setRole(OfpRole.BECOMEMASTER);
- return null;
- }
- });
- break;
- //case TEARING_DOWN:
- default:
- //TODO: reconsider if there is really nothing to do when tearing down
- processingClosure = Futures.immediateFuture(null);
- break;
+ if (ownershipChange.getEntity().equals(roleContext.getEntity())) {
+ changeOwnershipForMainEntity(ownershipChange, roleContext);
+ } else {
+ changeOwnershipForTxEntity(ownershipChange, roleContext);
}
- } else if (ownershipChange.wasOwner() && !ownershipChange.isOwner()) {
- // MASTER -> SLAVE - released tx-lock
- LOG.debug("Released tx-lock for entity {}", ownershipChange.getEntity());
- roleContext.setTxLockOwned(false);
- txContexts.remove(roleContext.getTxEntity(), roleContext);
- processingClosure = Futures.immediateFuture(null);
+
} else {
- LOG.debug("NOOP state transition for TxEntity {} ", roleContext.getTxEntity());
- processingClosure = Futures.immediateFuture(null);
- }
- // handle result of executed steps
- Futures.addCallback(processingClosure, new FutureCallback<Void>()
-
- {
- @Override
- public void onSuccess(@Nullable final Void aVoid) {
- // propagating role must be BECOMEMASTER in order to run this processing
- // removing it will disable redundand processing of BECOMEMASTER
- roleContext.setPropagatingRole(null);
-
- txCandidateGuard.release();
- switch (roleContext.getState()) {
- case STARTING:
- LOG.debug("init steps protected by tx-lock for node {} are done.", nodeId);
- roleContext.promoteStateToWorking();
- getRoleContextLevelUp(deviceContext);
- break;
- case WORKING:
- LOG.debug("normal steps protected by tx-lock for node {} are done.", nodeId);
- break;
- case TEARING_DOWN:
- LOG.debug("teardown steps protected by tx-lock for node {} are done.", nodeId);
- break;
- }
- }
+ LOG.debug("Role context for entity type {} is in state closing, disregarding ownership change notification.", ownershipChange.getEntity().getType());
+ watchingEntities.remove(ownershipChange.getEntity());
- @Override
- public void onFailure(final Throwable throwable) {
- LOG.warn("Unexpected error for Node {}, state={}, txLock={} -> terminating device context",
- nodeId, roleContext.getState(), roleContext.isTxLockOwned(), throwable);
- txCandidateGuard.release();
- deviceContext.close();
- }
- }
+ }
- );
}
- private static Function<Void, Void> makeTxEntitySuspendCallback(final RoleContext roleChangeListener) {
- return new Function<Void, Void>() {
- @Override
- public Void apply(final Void result) {
- roleChangeListener.suspendTxCandidate();
- return null;
+ @VisibleForTesting
+ void changeOwnershipForMainEntity(final EntityOwnershipChange ownershipChange, final RoleContext roleContext) {
+
+ if (roleContext.isMainCandidateRegistered()) {
+ LOG.debug("Main-EntityOwnershipRegistration is active for entity type {} and node {}",
+ ownershipChange.getEntity().getType(), roleContext.getDeviceInfo().getNodeId());
+ if (!ownershipChange.wasOwner() && ownershipChange.isOwner() && !ownershipChange.inJeopardy()) {
+ // SLAVE -> MASTER
+ LOG.debug("SLAVE to MASTER for node {}", roleContext.getDeviceInfo().getNodeId());
+ if (roleContext.registerCandidate(roleContext.getTxEntity())) {
+ LOG.debug("Starting watching tx entity for node {}", roleContext.getDeviceInfo().getNodeId());
+ watchingEntities.putIfAbsent(roleContext.getTxEntity(), roleContext);
+ }
+ } else if ((ownershipChange.wasOwner() && !ownershipChange.isOwner()) || (ownershipChange.inJeopardy())) {
+ // MASTER -> SLAVE
+ LOG.debug("MASTER to SLAVE for node {}", roleContext.getDeviceInfo().getNodeId());
+ conductor.addOneTimeListenerWhenServicesChangesDone(this, roleContext.getDeviceInfo());
+ makeDeviceRoleChange(OfpRole.BECOMESLAVE, roleContext, false);
+ }
+ } else {
+ LOG.debug("Main-EntityOwnershipRegistration is not active for entity type {} and node {}",
+ ownershipChange.getEntity(), roleContext.getDeviceInfo().getNodeId());
+ watchingEntities.remove(ownershipChange.getEntity(), roleContext);
+ if (roleContext.isTxCandidateRegistered()) {
+ LOG.debug("tx candidate still registered for node {}, probably connection lost, trying to unregister tx candidate", roleContext.getDeviceInfo().getNodeId());
+ roleContext.unregisterCandidate(roleContext.getTxEntity());
+ if (ownershipChange.wasOwner() && !ownershipChange.isOwner() && !ownershipChange.hasOwner()) {
+ LOG.debug("Trying to remove from operational node: {}", roleContext.getDeviceInfo().getNodeId());
+ removeDeviceFromOperationalDS(roleContext.getDeviceInfo(), MAX_CLEAN_DS_RETRIES);
+ }
+ } else {
+ contexts.remove(roleContext.getDeviceInfo(), roleContext);
+ roleContext.close();
+ conductor.closeConnection(roleContext.getDeviceInfo());
}
- };
+ }
}
- private Function<Void, Void> makeTxEntitySetupCallback(final RoleContext roleContext) {
- return new Function<Void, Void>() {
- @Override
- public Void apply(final Void result) {
- final NodeId nodeId = roleContext.getDeviceState().getNodeId();
- try {
- LOG.debug("Node {} is marked as LEADER", nodeId);
- Verify.verify(txContexts.putIfAbsent(roleContext.getTxEntity(), roleContext) == null,
- "RoleCtx for TxEntity {} master Node {} is still not closed.",
- roleContext.getTxEntity(), nodeId);
- roleContext.setPropagatingRole(OfpRole.BECOMEMASTER);
-
- // try to register tx-candidate via ownership service
- roleContext.setupTxCandidate();
- } catch (final CandidateAlreadyRegisteredException e) {
- LOG.debug("txCandidate registration failed");
- // --- CLEAN UP ---
- // withdraw context from map in order to have it as before
- txContexts.remove(roleContext.getTxEntity(), roleContext);
- // no more propagating any role - there is no txCandidate lock approaching
- roleContext.setPropagatingRole(null);
-
- Throwables.propagate(e);
+ @VisibleForTesting
+ void changeOwnershipForTxEntity(final EntityOwnershipChange ownershipChange,
+ @Nonnull final RoleContext roleContext) {
+
+ if (roleContext.isTxCandidateRegistered()) {
+ LOG.debug("Tx-EntityOwnershipRegistration is active for entity type {} and node {}",
+ ownershipChange.getEntity().getType(),
+ roleContext.getDeviceInfo().getNodeId());
+ if (ownershipChange.inJeopardy()) {
+ LOG.warn("Getting 'inJeopardy' flag from EOS. Removing txCandidate and stopping watching txCandidate.");
+ watchingEntities.remove(roleContext.getTxEntity());
+ roleContext.unregisterCandidate(roleContext.getTxEntity());
+ } else {
+ if (!ownershipChange.wasOwner() && ownershipChange.isOwner()) {
+ // SLAVE -> MASTER
+ LOG.debug("SLAVE to MASTER for node {}", roleContext.getDeviceInfo().getNodeId());
+ makeDeviceRoleChange(OfpRole.BECOMEMASTER, roleContext, false);
+ } else if (ownershipChange.wasOwner() && !ownershipChange.isOwner()) {
+ // MASTER -> SLAVE
+ LOG.debug("MASTER to SLAVE for node {}", roleContext.getDeviceInfo().getNodeId());
+ LOG.warn("Tx-EntityOwnershipRegistration lost leadership entity type {} and node {}",
+ ownershipChange.getEntity().getType(), roleContext.getDeviceInfo().getNodeId());
+ watchingEntities.remove(roleContext.getTxEntity(), roleContext);
+ watchingEntities.remove(roleContext.getEntity(), roleContext);
+ roleContext.unregisterCandidate(roleContext.getEntity());
+ roleContext.unregisterCandidate(roleContext.getTxEntity());
+ if (!ownershipChange.hasOwner()) {
+ LOG.debug("Trying to remove from operational node: {}", roleContext.getDeviceInfo().getNodeId());
+ removeDeviceFromOperationalDS(roleContext.getDeviceInfo(), MAX_CLEAN_DS_RETRIES);
+ } else {
+ contexts.remove(roleContext.getDeviceInfo(), roleContext);
+ roleContext.close();
+ conductor.closeConnection(roleContext.getDeviceInfo());
+ }
}
- return null;
}
- };
+ } else {
+ LOG.debug("Tx-EntityOwnershipRegistration is not active for entity type {} and node {}", ownershipChange.getEntity().getType(), roleContext.getDeviceInfo().getNodeId());
+ watchingEntities.remove(roleContext.getTxEntity(), roleContext);
+ contexts.remove(roleContext.getDeviceInfo(), roleContext);
+ roleContext.close();
+ conductor.closeConnection(roleContext.getDeviceInfo());
+ }
}
- private void changeForEntity(final EntityOwnershipChange ownershipChange, @Nonnull final RoleContext roleContext) throws InterruptedException {
- final Semaphore mainCandidateGuard = roleContext.getMainCandidateGuard();
- LOG.trace("mainCandidate lock queue: " + mainCandidateGuard.getQueueLength());
- mainCandidateGuard.acquire();
- //FIXME : check again implementation for double candidate scenario
- LOG.info("Received EntityOwnershipChange:{}", ownershipChange);
-
- if (roleContext.getDeviceState().isValid()) {
- LOG.debug("RoleChange for entity {}", ownershipChange.getEntity());
- final OfpRole newRole = ownershipChange.isOwner() ? OfpRole.BECOMEMASTER : OfpRole.BECOMESLAVE;
- final OfpRole oldRole = ownershipChange.wasOwner() ? OfpRole.BECOMEMASTER : OfpRole.BECOMESLAVE;
-
- // propagation start point
- ListenableFuture<Void> rolePropagationFx = Futures.immediateFuture(null);
- final Function<Void, Void> txProcessCallback;
-
- if (ownershipChange.wasOwner() && !ownershipChange.isOwner() && ownershipChange.hasOwner()) {
- // MASTER -> SLAVE
- rolePropagationFx = roleContext.onRoleChanged(oldRole, newRole);
- if (RoleContext.ROLE_CONTEXT_STATE.WORKING.equals(roleContext.getState())) {
- txProcessCallback = makeTxEntitySuspendCallback(roleContext);
- } else {
- txProcessCallback = null;
+ @VisibleForTesting
+ void makeDeviceRoleChange(final OfpRole role, final RoleContext roleContext, final Boolean init) {
+ final ListenableFuture<RpcResult<SetRoleOutput>> roleChangeFuture = sendRoleChangeToDevice(role, roleContext);
+ Futures.addCallback(roleChangeFuture, new FutureCallback<RpcResult<SetRoleOutput>>() {
+ @Override
+ public void onSuccess(@Nullable final RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
+ LOG.info("Role {} successfully set on device {}", role, roleContext.getDeviceInfo().getNodeId());
+ if (!init) {
+ notifyListenersRoleChangeOnDevice(roleContext.getDeviceInfo(), role);
}
- } else if (!ownershipChange.wasOwner() && ownershipChange.isOwner() && ownershipChange.isOwner()) {
- // SLAVE -> MASTER
- txProcessCallback = makeTxEntitySetupCallback(roleContext);
- } else {
- LOG.trace("Main candidate role change case not covered: {} -> {} .. NOOP", oldRole, newRole);
- txProcessCallback = null;
}
- if (txProcessCallback != null) {
- rolePropagationFx = Futures.transform(rolePropagationFx, txProcessCallback);
+ @Override
+ public void onFailure(@Nonnull final Throwable throwable) {
+ LOG.warn("Unable to set role {} on device {}", role, roleContext.getDeviceInfo().getNodeId());
+ conductor.closeConnection(roleContext.getDeviceInfo());
}
+ });
+ }
- // catching result
- Futures.addCallback(rolePropagationFx, new FutureCallback<Void>() {
- @Override
- public void onSuccess(@Nullable final Void aVoid) {
- LOG.debug("Role of main candidate successfully propagated: {}, {} -> {}",
- ownershipChange.getEntity(), oldRole, newRole);
- mainCandidateGuard.release();
- }
-
- @Override
- public void onFailure(final Throwable throwable) {
- LOG.warn("Main candidate role propagation FAILED for entity: {}, {} -> {}",
- ownershipChange.getEntity(), oldRole, newRole);
- mainCandidateGuard.release();
- roleContext.getDeviceContext().close();
- }
- }
- );
-
+ @VisibleForTesting
+ ListenableFuture<RpcResult<SetRoleOutput>> sendRoleChangeToDevice(final OfpRole newRole, final RoleContext roleContext) {
+ LOG.debug("Sending new role {} to device {}", newRole, roleContext.getDeviceInfo().getNodeId());
+ final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture;
+ final Short version = roleContext.getDeviceInfo().getVersion();
+ if (null == version) {
+ LOG.debug("Device version is null");
+ return Futures.immediateFuture(null);
+ }
+ if (version < OFConstants.OFP_VERSION_1_3) {
+ LOG.debug("Device version not support ROLE");
+ return Futures.immediateFuture(null);
} else {
- LOG.debug("We are closing connection for entity {}", ownershipChange.getEntity());
- mainCandidateGuard.release();
- // expecting that this roleContext will get closed in a moment
- // FIXME: reconsider location of following cleanup logic
- if (!ownershipChange.hasOwner() && !ownershipChange.isOwner() && ownershipChange.wasOwner()) {
- unregistrationHelper(ownershipChange, roleContext);
- } else if (ownershipChange.hasOwner() && !ownershipChange.isOwner() && ownershipChange.wasOwner()) {
- contexts.remove(ownershipChange.getEntity(), roleContext);
- roleContext.suspendTxCandidate();
- } else {
- LOG.info("Unexpected role change msg {} for entity {}", ownershipChange, ownershipChange.getEntity());
- }
+ final SetRoleInput setRoleInput = (new SetRoleInputBuilder()).setControllerRole(newRole)
+ .setNode(new NodeRef(DeviceStateUtil.createNodeInstanceIdentifier(roleContext.getDeviceInfo().getNodeId()))).build();
+ setRoleOutputFuture = roleContext.getSalRoleService().setRole(setRoleInput);
+ final TimerTask timerTask = timeout -> {
+ if (!setRoleOutputFuture.isDone()) {
+ LOG.warn("New role {} was not propagated to device {} during 10 sec", newRole, roleContext.getDeviceInfo().getNodeId());
+ setRoleOutputFuture.cancel(true);
+ }
+ };
+ conductor.newTimeout(timerTask, 10, TimeUnit.SECONDS);
}
+ return JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture);
}
- private CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperDS(
- final RoleChangeListener roleChangeListener) {
- Preconditions.checkArgument(roleChangeListener != null);
- final DeviceState deviceState = roleChangeListener.getDeviceState();
+ @VisibleForTesting
+ CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final DeviceInfo deviceInfo, final int numRetries) {
final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
- delWtx.delete(LogicalDatastoreType.OPERATIONAL, deviceState.getNodeInstanceIdentifier());
+ delWtx.delete(LogicalDatastoreType.OPERATIONAL, DeviceStateUtil.createNodeInstanceIdentifier(deviceInfo.getNodeId()));
final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
- Futures.addCallback(delFuture, new FutureCallback<Void>() {
+ Futures.addCallback(delFuture, new FutureCallback<Void>() {
@Override
public void onSuccess(final Void result) {
- LOG.debug("Delete Node {} was successful", deviceState.getNodeId());
+ LOG.debug("Delete Node {} was successful", deviceInfo);
+ final RoleContext roleContext = contexts.remove(deviceInfo);
+ if (roleContext != null) {
+ roleContext.close();
+ }
}
@Override
- public void onFailure(final Throwable t) {
- LOG.warn("Delete Node {} fail.", deviceState.getNodeId(), t);
+ public void onFailure(@Nonnull final Throwable t) {
+ // If we have any retries left, we will try to clean the datastore again
+ if (numRetries > 0) {
+ // We "used" one retry here, so decrement it
+ final int curRetries = numRetries - 1;
+ LOG.debug("Delete node {} failed with exception {}. Trying again (retries left: {})", deviceInfo.getNodeId(), t, curRetries);
+ // Recursive call to this method with "one less" retry
+ removeDeviceFromOperationalDS(deviceInfo, curRetries);
+ return;
+ }
+
+ // No retries left, so we will just close the role context, and ignore datastore cleanup
+ LOG.warn("Delete node {} failed with exception {}. No retries left, aborting", deviceInfo.getNodeId(), t);
+ final RoleContext roleContext = contexts.remove(deviceInfo);
+ if (roleContext != null) {
+ roleContext.close();
+ }
}
});
+
return delFuture;
}
- private void unregistrationHelper(final EntityOwnershipChange ownershipChange, final RoleChangeListener roleChangeListener) {
- LOG.info("Initiate removal from operational. Possibly the last node to be disconnected for :{}. ", ownershipChange);
- Futures.addCallback(removeDeviceFromOperDS(roleChangeListener), new FutureCallback<Void>() {
- @Override
- public void onSuccess(@Nullable final Void aVoid) {
- LOG.debug("Freeing roleContext slot for device: {}", roleChangeListener.getDeviceState().getNodeId());
- contexts.remove(ownershipChange.getEntity(), roleChangeListener);
- ((RoleContext) roleChangeListener).suspendTxCandidate();
- }
+ @Override
+ public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
+ deviceTerminationPhaseHandler = handler;
+ }
- @Override
- public void onFailure(final Throwable throwable) {
- LOG.warn("NOT freeing roleContext slot for device: {}, {}", roleChangeListener.getDeviceState()
- .getNodeId(), throwable.getMessage());
- contexts.remove(ownershipChange.getEntity(), roleChangeListener);
- ((RoleContext) roleChangeListener).suspendTxCandidate();
- }
- });
+ @Override
+ public void servicesChangeDone(final DeviceInfo deviceInfo, final boolean success) {
+ LOG.debug("Services stopping done for node {} as " + (success ? "successful" : "unsuccessful"), deviceInfo);
+ final RoleContext roleContext = contexts.get(deviceInfo);
+ if (null != roleContext) {
+ /* Services stopped or failure */
+ roleContext.unregisterCandidate(roleContext.getTxEntity());
+ }
+ }
+
+ @VisibleForTesting
+ RoleContext getRoleContext(final DeviceInfo deviceInfo){
+ return contexts.get(deviceInfo);
+ }
+
+ /**
+ * This method is only for testing
+ */
+ @VisibleForTesting
+ void setRoleContext(DeviceInfo deviceInfo, RoleContext roleContext){
+ if(!contexts.containsKey(deviceInfo)) {
+ contexts.put(deviceInfo, roleContext);
+ }
+ }
+
+ @Override
+ public void addRoleChangeListener(final RoleChangeListener roleChangeListener) {
+ this.listeners.add(roleChangeListener);
+ }
+
+ /**
+ * Invoked when initialization phase is done
+ * @param deviceInfo node identification
+ * @param success true if initialization done ok, false otherwise
+ */
+ @VisibleForTesting
+ void notifyListenersRoleInitializationDone(final DeviceInfo deviceInfo, final boolean success){
+ LOG.debug("Notifying registered listeners for role initialization done, no. of listeners {}", listeners.size());
+ for (final RoleChangeListener listener : listeners) {
+ listener.roleInitializationDone(deviceInfo, success);
+ }
+ }
+
+ /**
+ * Notifies registered listener on role change. Role is the new role on device
+ * If initialization phase is true, we may skip service starting
+ * @param deviceInfo
+ * @param role new role meant to be set on device
+ */
+ @VisibleForTesting
+ void notifyListenersRoleChangeOnDevice(final DeviceInfo deviceInfo, final OfpRole role){
+ LOG.debug("Notifying registered listeners for role change, no. of listeners {}", listeners.size());
+ for (final RoleChangeListener listener : listeners) {
+ listener.roleChangeOnDevice(deviceInfo, role);
+ }
+ }
+
+ @Override
+ public <T extends OFPContext> T gainContext(final DeviceInfo deviceInfo) {
+ return (T) contexts.get(deviceInfo);
}
}