import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
import com.google.common.base.Verify;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.CheckedFuture;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.openflowplugin.api.OFConstants;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
@Override
public void onDeviceContextLevelUp(@CheckForNull final DeviceContext deviceContext) throws Exception {
LOG.debug("RoleManager called for device:{}", deviceContext.getPrimaryConnectionContext().getNodeId());
- if (deviceContext.getDeviceState().getFeatures().getVersion() < OFConstants.OFP_VERSION_1_3) {
- // Roles are not supported before OF1.3, so move forward.
- deviceInitializationPhaseHandler.onDeviceContextLevelUp(deviceContext);
- return;
- }
final RoleContext roleContext = new RoleContextImpl(deviceContext, entityOwnershipService,
makeEntity(deviceContext.getDeviceState().getNodeId()),
LOG.debug("Found roleContext associated to deviceContext: {}, now closing the roleContext", nodeId);
final Optional<EntityOwnershipState> actState = entityOwnershipService.getOwnershipState(entity);
if (actState.isPresent()) {
- if (!actState.get().isOwner()) {
+ if (actState.get().isOwner()) {
+ if (!txContexts.containsKey(roleContext.getTxEntity())) {
+ try {
+ txContexts.putIfAbsent(roleContext.getTxEntity(), roleContext);
+ roleContext.setPropagatingRole(OfpRole.BECOMEMASTER);
+ roleContext.setupTxCandidate();
+ // we'd like to wait for registration response
+ return;
+ } catch (final CandidateAlreadyRegisteredException e) {
+ // NOOP
+ }
+ }
+ } else {
LOG.debug("No DS commitment for device {} - LEADER is somewhere else", nodeId);
contexts.remove(entity, roleContext);
+ // TODO : is there a chance to have TxEntity ?
}
} else {
LOG.warn("EntityOwnershipService doesn't return state for entity: {} in close proces", entity);
@Override
public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
Preconditions.checkArgument(ownershipChange != null);
+ RoleContext roleCtxForClose = null;
try {
final RoleContext roleContext = contexts.get(ownershipChange.getEntity());
if (roleContext != null) {
+ roleCtxForClose = roleContext;
changeForEntity(ownershipChange, roleContext);
return;
}
final RoleContext txRoleContext = txContexts.get(ownershipChange.getEntity());
if (txRoleContext != null) {
+ roleCtxForClose = txRoleContext;
changeForTxEntity(ownershipChange, txRoleContext);
return;
}
} catch (final InterruptedException e) {
LOG.warn("fail to acquire semaphore: {}", ownershipChange.getEntity());
- // FIXME: consider forcibly closing this connection
+ if (roleCtxForClose != null) {
+ roleCtxForClose.close();
+ }
}
LOG.debug("We are not able to find Entity {} ownershipChange {} - disregarding ownership notification",
final DeviceContext deviceContext = roleContext.getDeviceContext();
final NodeId nodeId = roleContext.getDeviceState().getNodeId();
+ if (!roleContext.getDeviceState().isValid()
+ && RoleContext.ROLE_CONTEXT_STATE.WORKING.equals(roleContext.getState())) {
+ LOG.debug("Node {} ownership changed during closing process", roleContext.getDeviceState().getNodeId());
+ roleContext.close();
+ txCandidateGuard.release();
+ return;
+ }
+
if (!ownershipChange.wasOwner() && ownershipChange.isOwner()) {
// SLAVE -> MASTER - acquired transition lock
LOG.debug("Acquired tx-lock for entity {}", ownershipChange.getEntity());
try {
LOG.debug("Node {} is marked as LEADER", nodeId);
Verify.verify(txContexts.putIfAbsent(roleContext.getTxEntity(), roleContext) == null,
- "RoleCtx for TxEntity {} master Node {} is still not closed.",
- roleContext.getTxEntity(), nodeId);
+ "RoleCtx for TxEntity {} master Node {} is still not closed.", roleContext.getTxEntity(), nodeId);
roleContext.setPropagatingRole(OfpRole.BECOMEMASTER);
// try to register tx-candidate via ownership service
roleContext.setupTxCandidate();
} catch (final CandidateAlreadyRegisteredException e) {
- LOG.debug("txCandidate registration failed");
+ LOG.warn("txCandidate registration failed {}", roleContext.getDeviceState().getNodeId(), e);
// --- CLEAN UP ---
// withdraw context from map in order to have it as before
txContexts.remove(roleContext.getTxEntity(), roleContext);
// no more propagating any role - there is no txCandidate lock approaching
roleContext.setPropagatingRole(null);
-
- Throwables.propagate(e);
+ roleContext.getDeviceContext().close();
}
return null;
}
final Semaphore mainCandidateGuard = roleContext.getMainCandidateGuard();
LOG.trace("mainCandidate lock queue: " + mainCandidateGuard.getQueueLength());
mainCandidateGuard.acquire();
- //FIXME : check again implementation for double candidate scenario
LOG.info("Received EntityOwnershipChange:{}", ownershipChange);
if (roleContext.getDeviceState().isValid()) {
} else {
txProcessCallback = null;
}
- } else if (!ownershipChange.wasOwner() && ownershipChange.isOwner() && ownershipChange.isOwner()) {
+ } else if (!ownershipChange.wasOwner() && ownershipChange.isOwner() && ownershipChange.hasOwner()) {
// SLAVE -> MASTER
txProcessCallback = makeTxEntitySetupCallback(roleContext);
+ } else if (!ownershipChange.wasOwner() && !ownershipChange.isOwner() && ownershipChange.hasOwner()) {
+ if (RoleContext.ROLE_CONTEXT_STATE.STARTING.equals(roleContext.getState())) {
+ rolePropagationFx = roleContext.onRoleChanged(oldRole, newRole);
+ }
+ txProcessCallback = null;
} else {
LOG.trace("Main candidate role change case not covered: {} -> {} .. NOOP", oldRole, newRole);
txProcessCallback = null;
// catching result
Futures.addCallback(rolePropagationFx, new FutureCallback<Void>() {
- @Override
- public void onSuccess(@Nullable final Void aVoid) {
- LOG.debug("Role of main candidate successfully propagated: {}, {} -> {}",
- ownershipChange.getEntity(), oldRole, newRole);
- mainCandidateGuard.release();
- }
+ @Override
+ public void onSuccess(@Nullable final Void aVoid) {
+ LOG.debug("Role of main candidate successfully propagated: {}, {} -> {}",
+ ownershipChange.getEntity(), oldRole, newRole);
+ roleContext.setPropagatingRole(newRole);
+ mainCandidateGuard.release();
+ }
- @Override
- public void onFailure(final Throwable throwable) {
- LOG.warn("Main candidate role propagation FAILED for entity: {}, {} -> {}",
- ownershipChange.getEntity(), oldRole, newRole);
- mainCandidateGuard.release();
- roleContext.getDeviceContext().close();
- }
- }
- );
+ @Override
+ public void onFailure(final Throwable throwable) {
+ LOG.warn("Main candidate role propagation FAILED for entity: {}, {} -> {}",
+ ownershipChange.getEntity(), oldRole, newRole);
+ mainCandidateGuard.release();
+ roleContext.getDeviceContext().close();
+ }
+ });
} else {
LOG.debug("We are closing connection for entity {}", ownershipChange.getEntity());
return delFuture;
}
- private void unregistrationHelper(final EntityOwnershipChange ownershipChange, final RoleChangeListener roleChangeListener) {
+
+ private void unregistrationHelper(final EntityOwnershipChange ownershipChange, final RoleContext roleContext) {
LOG.info("Initiate removal from operational. Possibly the last node to be disconnected for :{}. ", ownershipChange);
- Futures.addCallback(removeDeviceFromOperDS(roleChangeListener), new FutureCallback<Void>() {
+ Futures.addCallback(removeDeviceFromOperDS(roleContext), new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable final Void aVoid) {
- LOG.debug("Freeing roleContext slot for device: {}", roleChangeListener.getDeviceState().getNodeId());
- contexts.remove(ownershipChange.getEntity(), roleChangeListener);
- ((RoleContext) roleChangeListener).suspendTxCandidate();
+ LOG.debug("Freeing roleContext slot for device: {}", roleContext.getDeviceState().getNodeId());
+ contexts.remove(ownershipChange.getEntity(), roleContext);
+ roleContext.suspendTxCandidate();
}
@Override
public void onFailure(final Throwable throwable) {
- LOG.warn("NOT freeing roleContext slot for device: {}, {}", roleChangeListener.getDeviceState()
+ LOG.warn("NOT freeing roleContext slot for device: {}, {}", roleContext.getDeviceState()
.getNodeId(), throwable.getMessage());
- contexts.remove(ownershipChange.getEntity(), roleChangeListener);
- ((RoleContext) roleChangeListener).suspendTxCandidate();
+ contexts.remove(ownershipChange.getEntity(), roleContext);
+ roleContext.suspendTxCandidate();
}
});
}