import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
-import io.netty.util.Timeout;
import io.netty.util.TimerTask;
-
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.openflowplugin.api.OFConstants;
+import org.opendaylight.openflowplugin.api.openflow.OFPContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
public class RoleManagerImpl implements RoleManager, EntityOwnershipListener, ServiceChangeListener {
private static final Logger LOG = LoggerFactory.getLogger(RoleManagerImpl.class);
+ // Maximum limit of timeout retries when cleaning DS, to prevent infinite recursive loops
+ private static final int MAX_CLEAN_DS_RETRIES = 3;
+
private DeviceInitializationPhaseHandler deviceInitializationPhaseHandler;
private DeviceTerminationPhaseHandler deviceTerminationPhaseHandler;
private final DataBroker dataBroker;
private final EntityOwnershipService entityOwnershipService;
- private final ConcurrentMap<NodeId, RoleContext> contexts = new ConcurrentHashMap<>();
+ private final ConcurrentMap<DeviceInfo, RoleContext> contexts = new ConcurrentHashMap<>();
private final ConcurrentMap<Entity, RoleContext> watchingEntities = new ConcurrentHashMap<>();
private final EntityOwnershipListenerRegistration entityOwnershipListenerRegistration;
private final EntityOwnershipListenerRegistration txEntityOwnershipListenerRegistration;
}
@Override
- public void onDeviceContextLevelUp(@CheckForNull final NodeId nodeId) throws Exception {
- final DeviceContext deviceContext = Preconditions.checkNotNull(conductor.getDeviceContext(nodeId));
- final RoleContext roleContext = new RoleContextImpl(nodeId, entityOwnershipService, makeEntity(nodeId), makeTxEntity(nodeId), conductor);
+ public void onDeviceContextLevelUp(@CheckForNull final DeviceInfo deviceInfo) throws Exception {
+ final DeviceContext deviceContext = Preconditions.checkNotNull(conductor.getDeviceContext(deviceInfo));
+ final RoleContext roleContext = new RoleContextImpl(deviceInfo, entityOwnershipService, makeEntity(deviceInfo.getNodeId()), makeTxEntity(deviceInfo.getNodeId()), conductor);
roleContext.setSalRoleService(new SalRoleServiceImpl(roleContext, deviceContext));
- Verify.verify(contexts.putIfAbsent(nodeId, roleContext) == null, "Role context for master Node %s is still not closed.", nodeId);
+ Verify.verify(contexts.putIfAbsent(deviceInfo, roleContext) == null, "Role context for master Node %s is still not closed.", deviceInfo.getNodeId());
makeDeviceRoleChange(OfpRole.BECOMESLAVE, roleContext, true);
- notifyListenersRoleInitializationDone(roleContext.getNodeId(), roleContext.initialization());
+ /* First start to watch entity so we don't miss any notification, and then try to register in EOS */
watchingEntities.put(roleContext.getEntity(), roleContext);
- deviceInitializationPhaseHandler.onDeviceContextLevelUp(nodeId);
+ notifyListenersRoleInitializationDone(roleContext.getDeviceInfo(), roleContext.initialization());
+ deviceInitializationPhaseHandler.onDeviceContextLevelUp(deviceInfo);
}
@Override
final RoleContext roleContext = iterator.next();
watchingEntities.remove(roleContext.getEntity());
watchingEntities.remove(roleContext.getTxEntity());
- contexts.remove(roleContext.getNodeId());
+ contexts.remove(roleContext.getDeviceInfo());
if (roleContext.isTxCandidateRegistered()) {
- LOG.info("Node {} was holder txEntity, so trying to remove device from operational DS.");
- removeDeviceFromOperationalDS(roleContext.getNodeId());
- } else {
- roleContext.close();
+ LOG.info("Node {} was holder txEntity, so trying to remove device from operational DS.", roleContext.getDeviceInfo().getNodeId().getValue());
+ removeDeviceFromOperationalDS(roleContext.getDeviceInfo(), MAX_CLEAN_DS_RETRIES);
}
+ roleContext.close();
}
}
@Override
- public void onDeviceContextLevelDown(final DeviceContext deviceContext) {
- final NodeId nodeId = deviceContext.getPrimaryConnectionContext().getNodeId();
- LOG.trace("onDeviceContextLevelDown for node {}", nodeId);
- final RoleContext roleContext = contexts.get(nodeId);
+ public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
+ LOG.trace("onDeviceContextLevelDown for node {}", deviceInfo.getNodeId());
+ final RoleContext roleContext = contexts.remove(deviceInfo);
if (roleContext != null) {
- LOG.debug("Found roleContext associated to deviceContext: {}, now trying close the roleContext", nodeId);
- if (roleContext.isMainCandidateRegistered()) {
- roleContext.unregisterCandidate(roleContext.getEntity());
- } else {
- contexts.remove(nodeId, roleContext);
- roleContext.close();
+ LOG.debug("Found roleContext associated to deviceContext: {}, now trying close the roleContext", deviceInfo.getNodeId());
+ roleContext.setState(OFPContext.CONTEXT_STATE.TERMINATION);
+ roleContext.unregisterCandidate(roleContext.getEntity());
+ if (roleContext.isTxCandidateRegistered()) {
+ LOG.info("Node {} was holder txEntity, so trying to remove device from operational DS.", deviceInfo.getNodeId().getValue());
+ removeDeviceFromOperationalDS(roleContext.getDeviceInfo(), MAX_CLEAN_DS_RETRIES);
}
+ roleContext.close();
}
- deviceTerminationPhaseHandler.onDeviceContextLevelDown(deviceContext);
+ deviceTerminationPhaseHandler.onDeviceContextLevelDown(deviceInfo);
}
@VisibleForTesting
Preconditions.checkArgument(ownershipChange != null);
final RoleContext roleContext = watchingEntities.get(ownershipChange.getEntity());
- LOG.debug("Received EOS message: wasOwner:{} isOwner:{} hasOwner:{} for entity type {} and node {}",
- ownershipChange.wasOwner(), ownershipChange.isOwner(), ownershipChange.hasOwner(),
- ownershipChange.getEntity().getType(),
- roleContext != null ? roleContext.getNodeId() : "-> no watching entity, disregarding notification <-");
+ if (Objects.nonNull(roleContext) && !roleContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
+
+ LOG.debug("Received EOS message: wasOwner:{} isOwner:{} hasOwner:{} inJeopardy:{} for entity type {} and node {}",
+ ownershipChange.wasOwner(), ownershipChange.isOwner(), ownershipChange.hasOwner(), ownershipChange.inJeopardy(),
+ ownershipChange.getEntity().getType(),
+ roleContext.getDeviceInfo().getNodeId());
- if (roleContext != null) {
if (ownershipChange.getEntity().equals(roleContext.getEntity())) {
changeOwnershipForMainEntity(ownershipChange, roleContext);
} else {
changeOwnershipForTxEntity(ownershipChange, roleContext);
}
+
} else {
- LOG.debug("OwnershipChange {}", ownershipChange);
+
+ LOG.debug("Role context for entity type {} is in state closing, disregarding ownership change notification.", ownershipChange.getEntity().getType());
+ watchingEntities.remove(ownershipChange.getEntity());
+
}
}
if (roleContext.isMainCandidateRegistered()) {
LOG.debug("Main-EntityOwnershipRegistration is active for entity type {} and node {}",
- ownershipChange.getEntity().getType(), roleContext.getNodeId());
- if (!ownershipChange.wasOwner() && ownershipChange.isOwner()) {
+ ownershipChange.getEntity().getType(), roleContext.getDeviceInfo().getNodeId());
+ if (!ownershipChange.wasOwner() && ownershipChange.isOwner() && !ownershipChange.inJeopardy()) {
// SLAVE -> MASTER
- LOG.debug("SLAVE to MASTER for node {}", roleContext.getNodeId());
+ LOG.debug("SLAVE to MASTER for node {}", roleContext.getDeviceInfo().getNodeId());
if (roleContext.registerCandidate(roleContext.getTxEntity())) {
- LOG.debug("Starting watching tx entity for node {}", roleContext.getNodeId());
+ LOG.debug("Starting watching tx entity for node {}", roleContext.getDeviceInfo().getNodeId());
watchingEntities.putIfAbsent(roleContext.getTxEntity(), roleContext);
}
- } else if (ownershipChange.wasOwner() && !ownershipChange.isOwner()) {
+ } else if ((ownershipChange.wasOwner() && !ownershipChange.isOwner()) || (ownershipChange.inJeopardy())) {
// MASTER -> SLAVE
- LOG.debug("MASTER to SLAVE for node {}", roleContext.getNodeId());
- conductor.addOneTimeListenerWhenServicesChangesDone(this, roleContext.getNodeId());
+ LOG.debug("MASTER to SLAVE for node {}", roleContext.getDeviceInfo().getNodeId());
+ conductor.addOneTimeListenerWhenServicesChangesDone(this, roleContext.getDeviceInfo());
makeDeviceRoleChange(OfpRole.BECOMESLAVE, roleContext, false);
}
} else {
LOG.debug("Main-EntityOwnershipRegistration is not active for entity type {} and node {}",
- ownershipChange.getEntity(), roleContext.getNodeId());
+ ownershipChange.getEntity(), roleContext.getDeviceInfo().getNodeId());
watchingEntities.remove(ownershipChange.getEntity(), roleContext);
if (roleContext.isTxCandidateRegistered()) {
- LOG.debug("tx candidate still registered for node {}, probably connection lost, trying to unregister tx candidate", roleContext.getNodeId());
+ LOG.debug("tx candidate still registered for node {}, probably connection lost, trying to unregister tx candidate", roleContext.getDeviceInfo().getNodeId());
roleContext.unregisterCandidate(roleContext.getTxEntity());
if (ownershipChange.wasOwner() && !ownershipChange.isOwner() && !ownershipChange.hasOwner()) {
- LOG.debug("Trying to remove from operational node: {}", roleContext.getNodeId());
- removeDeviceFromOperationalDS(roleContext.getNodeId());
+ LOG.debug("Trying to remove from operational node: {}", roleContext.getDeviceInfo().getNodeId());
+ removeDeviceFromOperationalDS(roleContext.getDeviceInfo(), MAX_CLEAN_DS_RETRIES);
}
} else {
- final NodeId nodeId = roleContext.getNodeId();
- contexts.remove(nodeId, roleContext);
+ contexts.remove(roleContext.getDeviceInfo(), roleContext);
roleContext.close();
- conductor.closeConnection(nodeId);
+ conductor.closeConnection(roleContext.getDeviceInfo());
}
}
}
if (roleContext.isTxCandidateRegistered()) {
LOG.debug("Tx-EntityOwnershipRegistration is active for entity type {} and node {}",
ownershipChange.getEntity().getType(),
- roleContext.getNodeId());
- if (!ownershipChange.wasOwner() && ownershipChange.isOwner()) {
- // SLAVE -> MASTER
- LOG.debug("SLAVE to MASTER for node {}", roleContext.getNodeId());
- makeDeviceRoleChange(OfpRole.BECOMEMASTER, roleContext,false);
- } else if (ownershipChange.wasOwner() && !ownershipChange.isOwner()) {
- // MASTER -> SLAVE
- LOG.debug("MASTER to SLAVE for node {}", roleContext.getNodeId());
- LOG.warn("Tx-EntityOwnershipRegistration lost leadership entity type {} and node {}",
- ownershipChange.getEntity().getType(),roleContext.getNodeId());
- watchingEntities.remove(roleContext.getTxEntity(), roleContext);
- watchingEntities.remove(roleContext.getEntity(), roleContext);
- roleContext.unregisterCandidate(roleContext.getEntity());
+ roleContext.getDeviceInfo().getNodeId());
+ if (ownershipChange.inJeopardy()) {
+ LOG.warn("Getting 'inJeopardy' flag from EOS. Removing txCandidate and stopping watching txCandidate.");
+ watchingEntities.remove(roleContext.getTxEntity());
roleContext.unregisterCandidate(roleContext.getTxEntity());
- if (!ownershipChange.hasOwner()) {
- LOG.debug("Trying to remove from operational node: {}", roleContext.getNodeId());
- removeDeviceFromOperationalDS(roleContext.getNodeId());
- } else {
- final NodeId nodeId = roleContext.getNodeId();
- contexts.remove(nodeId, roleContext);
- roleContext.close();
- conductor.closeConnection(nodeId);
+ } else {
+ if (!ownershipChange.wasOwner() && ownershipChange.isOwner()) {
+ // SLAVE -> MASTER
+ LOG.debug("SLAVE to MASTER for node {}", roleContext.getDeviceInfo().getNodeId());
+ makeDeviceRoleChange(OfpRole.BECOMEMASTER, roleContext, false);
+ } else if (ownershipChange.wasOwner() && !ownershipChange.isOwner()) {
+ // MASTER -> SLAVE
+ LOG.debug("MASTER to SLAVE for node {}", roleContext.getDeviceInfo().getNodeId());
+ LOG.warn("Tx-EntityOwnershipRegistration lost leadership entity type {} and node {}",
+ ownershipChange.getEntity().getType(), roleContext.getDeviceInfo().getNodeId());
+ watchingEntities.remove(roleContext.getTxEntity(), roleContext);
+ watchingEntities.remove(roleContext.getEntity(), roleContext);
+ roleContext.unregisterCandidate(roleContext.getEntity());
+ roleContext.unregisterCandidate(roleContext.getTxEntity());
+ if (!ownershipChange.hasOwner()) {
+ LOG.debug("Trying to remove from operational node: {}", roleContext.getDeviceInfo().getNodeId());
+ removeDeviceFromOperationalDS(roleContext.getDeviceInfo(), MAX_CLEAN_DS_RETRIES);
+ } else {
+ contexts.remove(roleContext.getDeviceInfo(), roleContext);
+ roleContext.close();
+ conductor.closeConnection(roleContext.getDeviceInfo());
+ }
}
}
} else {
- LOG.debug("Tx-EntityOwnershipRegistration is not active for entity {}", ownershipChange.getEntity().getType());
+ LOG.debug("Tx-EntityOwnershipRegistration is not active for entity type {} and node {}", ownershipChange.getEntity().getType(), roleContext.getDeviceInfo().getNodeId());
watchingEntities.remove(roleContext.getTxEntity(), roleContext);
- final NodeId nodeId = roleContext.getNodeId();
- contexts.remove(nodeId, roleContext);
+ contexts.remove(roleContext.getDeviceInfo(), roleContext);
roleContext.close();
- conductor.closeConnection(nodeId);
+ conductor.closeConnection(roleContext.getDeviceInfo());
}
}
Futures.addCallback(roleChangeFuture, new FutureCallback<RpcResult<SetRoleOutput>>() {
@Override
public void onSuccess(@Nullable final RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
- LOG.info("Role {} successfully set on device {}", role, roleContext.getNodeId());
- notifyListenersRoleChangeOnDevice(roleContext.getNodeId(), true, role, init);
+ LOG.info("Role {} successfully set on device {}", role, roleContext.getDeviceInfo().getNodeId());
+ if (!init) {
+ notifyListenersRoleChangeOnDevice(roleContext.getDeviceInfo(), role);
+ }
}
@Override
public void onFailure(@Nonnull final Throwable throwable) {
- LOG.warn("Unable to set role {} on device {}", role, roleContext.getNodeId());
- notifyListenersRoleChangeOnDevice(roleContext.getNodeId(), false, role, init);
+ LOG.warn("Unable to set role {} on device {}", role, roleContext.getDeviceInfo().getNodeId());
+ conductor.closeConnection(roleContext.getDeviceInfo());
}
});
}
-
- private ListenableFuture<RpcResult<SetRoleOutput>> sendRoleChangeToDevice(final OfpRole newRole, final RoleContext roleContext) {
- LOG.debug("Sending new role {} to device {}", newRole, roleContext.getNodeId());
+ @VisibleForTesting
+ ListenableFuture<RpcResult<SetRoleOutput>> sendRoleChangeToDevice(final OfpRole newRole, final RoleContext roleContext) {
+ LOG.debug("Sending new role {} to device {}", newRole, roleContext.getDeviceInfo().getNodeId());
final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture;
- final Short version = conductor.gainVersionSafely(roleContext.getNodeId());
+ final Short version = roleContext.getDeviceInfo().getVersion();
if (null == version) {
LOG.debug("Device version is null");
return Futures.immediateFuture(null);
return Futures.immediateFuture(null);
} else {
final SetRoleInput setRoleInput = (new SetRoleInputBuilder()).setControllerRole(newRole)
- .setNode(new NodeRef(DeviceStateUtil.createNodeInstanceIdentifier(roleContext.getNodeId()))).build();
+ .setNode(new NodeRef(DeviceStateUtil.createNodeInstanceIdentifier(roleContext.getDeviceInfo().getNodeId()))).build();
setRoleOutputFuture = roleContext.getSalRoleService().setRole(setRoleInput);
- final TimerTask timerTask = new TimerTask() {
-
- @Override
- public void run(final Timeout timeout) throws Exception {
- if (!setRoleOutputFuture.isDone()) {
- LOG.warn("New role {} was not propagated to device {} during 10 sec", newRole, roleContext.getNodeId());
- setRoleOutputFuture.cancel(true);
- }
+ final TimerTask timerTask = timeout -> {
+ if (!setRoleOutputFuture.isDone()) {
+ LOG.warn("New role {} was not propagated to device {} during 10 sec", newRole, roleContext.getDeviceInfo().getNodeId());
+ setRoleOutputFuture.cancel(true);
}
};
conductor.newTimeout(timerTask, 10, TimeUnit.SECONDS);
}
@VisibleForTesting
- CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final NodeId nodeId) {
-
+ CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final DeviceInfo deviceInfo, final int numRetries) {
final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
- delWtx.delete(LogicalDatastoreType.OPERATIONAL, DeviceStateUtil.createNodeInstanceIdentifier(nodeId));
+ delWtx.delete(LogicalDatastoreType.OPERATIONAL, DeviceStateUtil.createNodeInstanceIdentifier(deviceInfo.getNodeId()));
final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
- Futures.addCallback(delFuture, new FutureCallback<Void>() {
+ Futures.addCallback(delFuture, new FutureCallback<Void>() {
@Override
public void onSuccess(final Void result) {
- LOG.debug("Delete Node {} was successful", nodeId);
- final RoleContext roleContext = contexts.remove(nodeId);
+ LOG.debug("Delete Node {} was successful", deviceInfo);
+ final RoleContext roleContext = contexts.remove(deviceInfo);
if (roleContext != null) {
roleContext.close();
}
@Override
public void onFailure(@Nonnull final Throwable t) {
- LOG.warn("Delete Node {} failed. {}", nodeId, t);
- contexts.remove(nodeId);
- final RoleContext roleContext = contexts.remove(nodeId);
+ // If we have any retries left, we will try to clean the datastore again
+ if (numRetries > 0) {
+ // We "used" one retry here, so decrement it
+ final int curRetries = numRetries - 1;
+ LOG.debug("Delete node {} failed with exception {}. Trying again (retries left: {})", deviceInfo.getNodeId(), t, curRetries);
+ // Recursive call to this method with "one less" retry
+ removeDeviceFromOperationalDS(deviceInfo, curRetries);
+ return;
+ }
+
+ // No retries left, so we will just close the role context, and ignore datastore cleanup
+ LOG.warn("Delete node {} failed with exception {}. No retries left, aborting", deviceInfo.getNodeId(), t);
+ final RoleContext roleContext = contexts.remove(deviceInfo);
if (roleContext != null) {
roleContext.close();
}
}
});
+
return delFuture;
}
}
@Override
- public void servicesChangeDone(final NodeId nodeId, final boolean success) {
- LOG.debug("Services stopping done for node {} as " + (success ? "successful" : "unsuccessful"), nodeId);
- final RoleContext roleContext = contexts.get(nodeId);
+ public void servicesChangeDone(final DeviceInfo deviceInfo, final boolean success) {
+ LOG.debug("Services stopping done for node {} as " + (success ? "successful" : "unsuccessful"), deviceInfo);
+ final RoleContext roleContext = contexts.get(deviceInfo);
if (null != roleContext) {
/* Services stopped or failure */
roleContext.unregisterCandidate(roleContext.getTxEntity());
}
@VisibleForTesting
- RoleContext getRoleContext(final NodeId nodeId){
- return contexts.get(nodeId);
+ RoleContext getRoleContext(final DeviceInfo deviceInfo){
+ return contexts.get(deviceInfo);
+ }
+
+ /**
+ * This method is only for testing
+ */
+ @VisibleForTesting
+ void setRoleContext(DeviceInfo deviceInfo, RoleContext roleContext){
+ if(!contexts.containsKey(deviceInfo)) {
+ contexts.put(deviceInfo, roleContext);
+ }
}
@Override
/**
* Invoked when initialization phase is done
- * @param nodeId node identification
+ * @param deviceInfo node identification
* @param success true if initialization done ok, false otherwise
*/
@VisibleForTesting
- void notifyListenersRoleInitializationDone(final NodeId nodeId, final boolean success){
+ void notifyListenersRoleInitializationDone(final DeviceInfo deviceInfo, final boolean success){
LOG.debug("Notifying registered listeners for role initialization done, no. of listeners {}", listeners.size());
for (final RoleChangeListener listener : listeners) {
- listener.roleInitializationDone(nodeId, success);
+ listener.roleInitializationDone(deviceInfo, success);
}
}
/**
* Notifies registered listener on role change. Role is the new role on device
* If initialization phase is true, we may skip service starting
- * @param success true if role change on device done ok, false otherwise
+ * @param deviceInfo
* @param role new role meant to be set on device
- * @param initializationPhase if true, then skipp services start
*/
@VisibleForTesting
- void notifyListenersRoleChangeOnDevice(final NodeId nodeId, final boolean success, final OfpRole role, final boolean initializationPhase){
+ void notifyListenersRoleChangeOnDevice(final DeviceInfo deviceInfo, final OfpRole role){
LOG.debug("Notifying registered listeners for role change, no. of listeners {}", listeners.size());
for (final RoleChangeListener listener : listeners) {
- listener.roleChangeOnDevice(nodeId, success, role, initializationPhase);
+ listener.roleChangeOnDevice(deviceInfo, role);
}
}
+ @Override
+ public <T extends OFPContext> T gainContext(final DeviceInfo deviceInfo) {
+ return (T) contexts.get(deviceInfo);
+ }
}