X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Frole%2FRoleManagerImpl.java;h=3ed9c45390a94cdd79a508e318b0421ebe6906af;hb=e077fe8509dd91a31bb9d0c109370cb2d78aeabf;hp=dfdf345f30dcd9cd3d292621bdfab231bd9c50d9;hpb=0101f06d582b42003ef6c6b216c455c0f4198a24;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/role/RoleManagerImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/role/RoleManagerImpl.java index dfdf345f30..3ed9c45390 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/role/RoleManagerImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/role/RoleManagerImpl.java @@ -7,17 +7,42 @@ */ package org.opendaylight.openflowplugin.impl.role; +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Verify; +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Semaphore; import javax.annotation.CheckForNull; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; +import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException; +import org.opendaylight.controller.md.sal.common.api.clustering.Entity; +import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange; +import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener; +import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration; import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService; -import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; -import org.opendaylight.openflowplugin.api.OFConstants; +import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; +import org.opendaylight.openflowplugin.api.openflow.device.DeviceState; import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler; +import org.opendaylight.openflowplugin.api.openflow.role.RoleChangeListener; import org.opendaylight.openflowplugin.api.openflow.role.RoleContext; import org.opendaylight.openflowplugin.api.openflow.role.RoleManager; +import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,70 +52,423 @@ import org.slf4j.LoggerFactory; * * Hands over to StatisticsManager at the end. */ -public class RoleManagerImpl implements RoleManager { +public class RoleManagerImpl implements RoleManager, EntityOwnershipListener { private static final Logger LOG = LoggerFactory.getLogger(RoleManagerImpl.class); private DeviceInitializationPhaseHandler deviceInitializationPhaseHandler; - private EntityOwnershipService entityOwnershipService; - private final RpcProviderRegistry rpcProviderRegistry; - private final ConcurrentHashMap contexts = new ConcurrentHashMap<>(); - private final OpenflowOwnershipListener openflowOwnershipListener; - - public RoleManagerImpl(RpcProviderRegistry rpcProviderRegistry, EntityOwnershipService entityOwnershipService) { - this.entityOwnershipService = entityOwnershipService; - this.rpcProviderRegistry = rpcProviderRegistry; - this.openflowOwnershipListener = new OpenflowOwnershipListener(entityOwnershipService); + private final DataBroker dataBroker; + private final EntityOwnershipService entityOwnershipService; + private final ConcurrentMap contexts = new ConcurrentHashMap<>(); + private final ConcurrentMap txContexts = new ConcurrentHashMap<>(); + private final EntityOwnershipListenerRegistration entityOwnershipListenerRegistration; + private final EntityOwnershipListenerRegistration txEntityOwnershipListenerRegistration; + private final boolean switchFeaturesMandatory; + + public RoleManagerImpl(final EntityOwnershipService entityOwnershipService, final DataBroker dataBroker, final boolean switchFeaturesMandatory) { + this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService); + this.dataBroker = Preconditions.checkNotNull(dataBroker); + this.switchFeaturesMandatory = switchFeaturesMandatory; + this.entityOwnershipListenerRegistration = Preconditions.checkNotNull(entityOwnershipService.registerListener(RoleManager.ENTITY_TYPE, this)); + this.txEntityOwnershipListenerRegistration = Preconditions.checkNotNull(entityOwnershipService.registerListener(TX_ENTITY_TYPE, this)); LOG.debug("Registering OpenflowOwnershipListener listening to all entity ownership changes"); - openflowOwnershipListener.init(); } @Override - public void setDeviceInitializationPhaseHandler(DeviceInitializationPhaseHandler handler) { + public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) { deviceInitializationPhaseHandler = handler; } @Override - public void onDeviceContextLevelUp(@CheckForNull final DeviceContext deviceContext) { + public void onDeviceContextLevelUp(@CheckForNull final DeviceContext deviceContext) throws Exception { LOG.debug("RoleManager called for device:{}", deviceContext.getPrimaryConnectionContext().getNodeId()); - if (deviceContext.getDeviceState().getFeatures().getVersion() < OFConstants.OFP_VERSION_1_3) { - // Roles are not supported before OF1.3, so move forward. + + final RoleContext roleContext = new RoleContextImpl(deviceContext, entityOwnershipService, + makeEntity(deviceContext.getDeviceState().getNodeId()), + makeTxEntity(deviceContext.getDeviceState().getNodeId())); + // if the device context gets closed (mostly on connection close), we would need to cleanup + deviceContext.addDeviceContextClosedHandler(this); + final RoleContext previousContext = contexts.putIfAbsent(roleContext.getEntity(), roleContext); + Verify.verify(previousContext == null, + "RoleCtx for master Node {} is still not close.", deviceContext.getDeviceState().getNodeId()); + + roleContext.initialization(); + } + + void getRoleContextLevelUp(final DeviceContext deviceContext) { + LOG.debug("Created role context for node {}", deviceContext.getDeviceState().getNodeId()); + LOG.debug("roleChangeFuture success for device:{}. Moving to StatisticsManager", deviceContext.getDeviceState().getNodeId()); + try { deviceInitializationPhaseHandler.onDeviceContextLevelUp(deviceContext); + } catch (final Exception e) { + LOG.info("failed to complete levelUp on next handler for device {}", + deviceContext.getDeviceState().getNodeId()); + deviceContext.close(); + return; } + } - RoleContext roleContext = new RoleContextImpl(deviceContext, rpcProviderRegistry, entityOwnershipService, openflowOwnershipListener); - contexts.put(deviceContext, roleContext); - LOG.debug("Created role context"); + @Override + public void close() throws Exception { + entityOwnershipListenerRegistration.close(); + txEntityOwnershipListenerRegistration.close(); + for (final Map.Entry roleContextEntry : contexts.entrySet()) { + // got here because last known role is LEADER and DS might need clearing up + final Entity entity = roleContextEntry.getKey(); + final Optional ownershipState = entityOwnershipService.getOwnershipState(entity); + final NodeId nodeId = roleContextEntry.getValue().getDeviceState().getNodeId(); + if (ownershipState.isPresent()) { + if ((!ownershipState.get().hasOwner())) { + LOG.trace("Last role is LEADER and ownershipService returned hasOwner=false for node: {}; " + + "cleaning DS as being probably the last owner", nodeId); + removeDeviceFromOperDS(roleContextEntry.getValue()); + } else { + // NOOP - there is another owner + LOG.debug("Last role is LEADER and ownershipService returned hasOwner=true for node: {}; " + + "leaving DS untouched", nodeId); + } + } else { + // TODO: is this safe? When could this happen? + LOG.warn("Last role is LEADER but ownershipService returned empty ownership info for node: {}; " + + "cleaning DS ANYWAY!", nodeId); + removeDeviceFromOperDS(roleContextEntry.getValue()); + } + } + contexts.clear(); + } - // if the device context gets closed (mostly on connection close), we would need to cleanup - deviceContext.addDeviceContextClosedHandler(roleContext); + @Override + public void onDeviceContextClosed(final DeviceContext deviceContext) { + final NodeId nodeId = deviceContext.getDeviceState().getNodeId(); + LOG.debug("onDeviceContextClosed for node {}", nodeId); + final Entity entity = makeEntity(nodeId); + final RoleContext roleContext = contexts.get(entity); + if (roleContext != null) { + LOG.debug("Found roleContext associated to deviceContext: {}, now closing the roleContext", nodeId); + final Optional actState = entityOwnershipService.getOwnershipState(entity); + if (actState.isPresent()) { + if (actState.get().isOwner()) { + if (!txContexts.containsKey(roleContext.getTxEntity())) { + try { + txContexts.putIfAbsent(roleContext.getTxEntity(), roleContext); + roleContext.setPropagatingRole(OfpRole.BECOMEMASTER); + roleContext.setupTxCandidate(); + // we'd like to wait for registration response + return; + } catch (final CandidateAlreadyRegisteredException e) { + // NOOP + } + } + } else { + LOG.debug("No DS commitment for device {} - LEADER is somewhere else", nodeId); + contexts.remove(entity, roleContext); + // TODO : is there a chance to have TxEntity ? + } + } else { + LOG.warn("EntityOwnershipService doesn't return state for entity: {} in close proces", entity); + } + roleContext.close(); + } + } + + private static Entity makeEntity(final NodeId nodeId) { + return new Entity(RoleManager.ENTITY_TYPE, nodeId.getValue()); + } + + private static Entity makeTxEntity(final NodeId nodeId) { + return new Entity(RoleManager.TX_ENTITY_TYPE, nodeId.getValue()); + } + + @Override + public void ownershipChanged(final EntityOwnershipChange ownershipChange) { + Preconditions.checkArgument(ownershipChange != null); + RoleContext roleCtxForClose = null; + try { + final RoleContext roleContext = contexts.get(ownershipChange.getEntity()); + if (roleContext != null) { + roleCtxForClose = roleContext; + changeForEntity(ownershipChange, roleContext); + return; + } + + final RoleContext txRoleContext = txContexts.get(ownershipChange.getEntity()); + if (txRoleContext != null) { + roleCtxForClose = txRoleContext; + changeForTxEntity(ownershipChange, txRoleContext); + return; + } + } catch (final InterruptedException e) { + LOG.warn("fail to acquire semaphore: {}", ownershipChange.getEntity()); + if (roleCtxForClose != null) { + roleCtxForClose.close(); + } + } + + LOG.debug("We are not able to find Entity {} ownershipChange {} - disregarding ownership notification", + ownershipChange.getEntity(), ownershipChange); + } + + private void changeForTxEntity(final EntityOwnershipChange ownershipChange, @Nonnull final RoleContext roleContext) + throws InterruptedException { + LOG.info("Received TX-EntityOwnershipChange:{}", ownershipChange); + final Semaphore txCandidateGuard = roleContext.getTxCandidateGuard(); + LOG.trace("txCandidate lock queue: " + txCandidateGuard.getQueueLength()); + txCandidateGuard.acquire(); + + ListenableFuture processingClosure; + final DeviceContext deviceContext = roleContext.getDeviceContext(); + final NodeId nodeId = roleContext.getDeviceState().getNodeId(); + + if (!roleContext.getDeviceState().isValid() + && RoleContext.ROLE_CONTEXT_STATE.WORKING.equals(roleContext.getState())) { + LOG.debug("Node {} ownership changed during closing process", roleContext.getDeviceState().getNodeId()); + roleContext.close(); + txCandidateGuard.release(); + return; + } + + if (!ownershipChange.wasOwner() && ownershipChange.isOwner()) { + // SLAVE -> MASTER - acquired transition lock + LOG.debug("Acquired tx-lock for entity {}", ownershipChange.getEntity()); + roleContext.setTxLockOwned(true); + final OfpRole role = roleContext.getDeviceState().getRole(); + Verify.verify(OfpRole.BECOMEMASTER.equals(roleContext.getPropagatingRole()), + "Acquired tx-lock but current role = {}", role); + + switch (roleContext.getState()) { + case STARTING: + processingClosure = roleContext.onRoleChanged(OfpRole.BECOMESLAVE, OfpRole.BECOMEMASTER); + // activate stats - accomplished automatically by chaging role in deviceState + // collect initial dynamic data from device + processingClosure = Futures.transform(processingClosure, new AsyncFunction() { + @Nullable + @Override + public ListenableFuture apply(@Nullable final Void aVoid) { + deviceContext.getDeviceState().setRole(OfpRole.BECOMEMASTER); + return DeviceInitializationUtils.initializeNodeInformation( + deviceContext, switchFeaturesMandatory); + } + }); + break; + case WORKING: + // activate txChainManager, activate rpcs + processingClosure = roleContext.onRoleChanged(OfpRole.BECOMESLAVE, OfpRole.BECOMEMASTER); + // activate stats - accomplished automatically by chaging role in deviceState + processingClosure = Futures.transform(processingClosure, new Function() { + @Nullable + @Override + public Void apply(@Nullable final Void aVoid) { + deviceContext.getDeviceState().setRole(OfpRole.BECOMEMASTER); + return null; + } + }); + break; + //case TEARING_DOWN: + default: + //TODO: reconsider if there is really nothing to do when tearing down + processingClosure = Futures.immediateFuture(null); + break; + } + } else if (ownershipChange.wasOwner() && !ownershipChange.isOwner()) { + // MASTER -> SLAVE - released tx-lock + LOG.debug("Released tx-lock for entity {}", ownershipChange.getEntity()); + roleContext.setTxLockOwned(false); + txContexts.remove(roleContext.getTxEntity(), roleContext); + processingClosure = Futures.immediateFuture(null); + } else { + LOG.debug("NOOP state transition for TxEntity {} ", roleContext.getTxEntity()); + processingClosure = Futures.immediateFuture(null); + } + + // handle result of executed steps + Futures.addCallback(processingClosure, new FutureCallback() + + { + @Override + public void onSuccess(@Nullable final Void aVoid) { + // propagating role must be BECOMEMASTER in order to run this processing + // removing it will disable redundand processing of BECOMEMASTER + roleContext.setPropagatingRole(null); - roleContext.facilitateRoleChange(new FutureCallback() { + txCandidateGuard.release(); + switch (roleContext.getState()) { + case STARTING: + LOG.debug("init steps protected by tx-lock for node {} are done.", nodeId); + roleContext.promoteStateToWorking(); + getRoleContextLevelUp(deviceContext); + break; + case WORKING: + LOG.debug("normal steps protected by tx-lock for node {} are done.", nodeId); + break; + case TEARING_DOWN: + LOG.debug("teardown steps protected by tx-lock for node {} are done.", nodeId); + break; + } + } + + @Override + public void onFailure(final Throwable throwable) { + LOG.warn("Unexpected error for Node {}, state={}, txLock={} -> terminating device context", + nodeId, roleContext.getState(), roleContext.isTxLockOwned(), throwable); + txCandidateGuard.release(); + deviceContext.close(); + } + } + + ); + } + + private static Function makeTxEntitySuspendCallback(final RoleContext roleChangeListener) { + return new Function() { @Override - public void onSuccess(Boolean aBoolean) { - LOG.debug("roleChangeFuture success for device:{}. Moving to StatisticsManager", deviceContext.getDeviceState().getNodeId()); - deviceInitializationPhaseHandler.onDeviceContextLevelUp(deviceContext); + public Void apply(final Void result) { + roleChangeListener.suspendTxCandidate(); + return null; } + }; + } + private Function makeTxEntitySetupCallback(final RoleContext roleContext) { + return new Function() { @Override - public void onFailure(Throwable throwable) { - LOG.error("RoleChange on device {} was not successful after several attempts. " + - "Closing the device Context, reconnect the device and start over", - deviceContext.getPrimaryConnectionContext().getNodeId().getValue(), throwable); + public Void apply(final Void result) { + final NodeId nodeId = roleContext.getDeviceState().getNodeId(); try { - deviceContext.close(); - } catch (Exception e) { - LOG.warn("Error closing device context for device:{}", - deviceContext.getPrimaryConnectionContext().getNodeId().getValue(), e); + LOG.debug("Node {} is marked as LEADER", nodeId); + Verify.verify(txContexts.putIfAbsent(roleContext.getTxEntity(), roleContext) == null, + "RoleCtx for TxEntity {} master Node {} is still not closed.", roleContext.getTxEntity(), nodeId); + roleContext.setPropagatingRole(OfpRole.BECOMEMASTER); + + // try to register tx-candidate via ownership service + roleContext.setupTxCandidate(); + } catch (final CandidateAlreadyRegisteredException e) { + LOG.warn("txCandidate registration failed {}", roleContext.getDeviceState().getNodeId(), e); + // --- CLEAN UP --- + // withdraw context from map in order to have it as before + txContexts.remove(roleContext.getTxEntity(), roleContext); + // no more propagating any role - there is no txCandidate lock approaching + roleContext.setPropagatingRole(null); + roleContext.getDeviceContext().close(); } + return null; } - }); + }; } - @Override - public void close() throws Exception { - for (Map.Entry roleContextEntry : contexts.entrySet()) { - roleContextEntry.getValue().close(); + private void changeForEntity(final EntityOwnershipChange ownershipChange, @Nonnull final RoleContext roleContext) throws InterruptedException { + final Semaphore mainCandidateGuard = roleContext.getMainCandidateGuard(); + LOG.trace("mainCandidate lock queue: " + mainCandidateGuard.getQueueLength()); + mainCandidateGuard.acquire(); + LOG.info("Received EntityOwnershipChange:{}", ownershipChange); + + if (roleContext.getDeviceState().isValid()) { + LOG.debug("RoleChange for entity {}", ownershipChange.getEntity()); + final OfpRole newRole = ownershipChange.isOwner() ? OfpRole.BECOMEMASTER : OfpRole.BECOMESLAVE; + final OfpRole oldRole = ownershipChange.wasOwner() ? OfpRole.BECOMEMASTER : OfpRole.BECOMESLAVE; + + // propagation start point + ListenableFuture rolePropagationFx = Futures.immediateFuture(null); + final Function txProcessCallback; + + if (ownershipChange.wasOwner() && !ownershipChange.isOwner() && ownershipChange.hasOwner()) { + // MASTER -> SLAVE + rolePropagationFx = roleContext.onRoleChanged(oldRole, newRole); + if (RoleContext.ROLE_CONTEXT_STATE.WORKING.equals(roleContext.getState())) { + txProcessCallback = makeTxEntitySuspendCallback(roleContext); + } else { + txProcessCallback = null; + } + } else if (!ownershipChange.wasOwner() && ownershipChange.isOwner() && ownershipChange.hasOwner()) { + // SLAVE -> MASTER + txProcessCallback = makeTxEntitySetupCallback(roleContext); + } else if (!ownershipChange.wasOwner() && !ownershipChange.isOwner() && ownershipChange.hasOwner()) { + if (RoleContext.ROLE_CONTEXT_STATE.STARTING.equals(roleContext.getState())) { + rolePropagationFx = roleContext.onRoleChanged(oldRole, newRole); + } + txProcessCallback = null; + } else { + LOG.trace("Main candidate role change case not covered: {} -> {} .. NOOP", oldRole, newRole); + txProcessCallback = null; + } + + if (txProcessCallback != null) { + rolePropagationFx = Futures.transform(rolePropagationFx, txProcessCallback); + } + + // catching result + Futures.addCallback(rolePropagationFx, new FutureCallback() { + @Override + public void onSuccess(@Nullable final Void aVoid) { + LOG.debug("Role of main candidate successfully propagated: {}, {} -> {}", + ownershipChange.getEntity(), oldRole, newRole); + roleContext.setPropagatingRole(newRole); + mainCandidateGuard.release(); + } + + @Override + public void onFailure(final Throwable throwable) { + LOG.warn("Main candidate role propagation FAILED for entity: {}, {} -> {}", + ownershipChange.getEntity(), oldRole, newRole); + mainCandidateGuard.release(); + roleContext.getDeviceContext().close(); + } + }); + + } else { + LOG.debug("We are closing connection for entity {}", ownershipChange.getEntity()); + mainCandidateGuard.release(); + // expecting that this roleContext will get closed in a moment + // FIXME: reconsider location of following cleanup logic + if (!ownershipChange.hasOwner() && !ownershipChange.isOwner() && ownershipChange.wasOwner()) { + unregistrationHelper(ownershipChange, roleContext); + } else if (ownershipChange.hasOwner() && !ownershipChange.isOwner() && ownershipChange.wasOwner()) { + contexts.remove(ownershipChange.getEntity(), roleContext); + roleContext.suspendTxCandidate(); + } else { + LOG.info("Unexpected role change msg {} for entity {}", ownershipChange, ownershipChange.getEntity()); + } } - this.openflowOwnershipListener.close(); + } + + private CheckedFuture removeDeviceFromOperDS( + final RoleChangeListener roleChangeListener) { + Preconditions.checkArgument(roleChangeListener != null); + final DeviceState deviceState = roleChangeListener.getDeviceState(); + final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction(); + delWtx.delete(LogicalDatastoreType.OPERATIONAL, deviceState.getNodeInstanceIdentifier()); + final CheckedFuture delFuture = delWtx.submit(); + Futures.addCallback(delFuture, new FutureCallback() { + + @Override + public void onSuccess(final Void result) { + LOG.debug("Delete Node {} was successful", deviceState.getNodeId()); + } + + @Override + public void onFailure(final Throwable t) { + LOG.warn("Delete Node {} fail.", deviceState.getNodeId(), t); + } + }); + return delFuture; + } + + + private void unregistrationHelper(final EntityOwnershipChange ownershipChange, final RoleContext roleContext) { + LOG.info("Initiate removal from operational. Possibly the last node to be disconnected for :{}. ", ownershipChange); + Futures.addCallback(removeDeviceFromOperDS(roleContext), new FutureCallback() { + @Override + public void onSuccess(@Nullable final Void aVoid) { + LOG.debug("Freeing roleContext slot for device: {}", roleContext.getDeviceState().getNodeId()); + contexts.remove(ownershipChange.getEntity(), roleContext); + roleContext.suspendTxCandidate(); + } + + @Override + public void onFailure(final Throwable throwable) { + LOG.warn("NOT freeing roleContext slot for device: {}, {}", roleContext.getDeviceState() + .getNodeId(), throwable.getMessage()); + contexts.remove(ownershipChange.getEntity(), roleContext); + roleContext.suspendTxCandidate(); + } + }); } }