return tx.handleRequest(request, envelope, now);
}
- void destroy(final long sequence, final RequestEnvelope envelope, final long now) {
+ final void destroy(final long sequence, final RequestEnvelope envelope, final long now) {
LOG.debug("{}: closing history {}", persistenceId(), getIdentifier());
tree.closeTransactionChain(getIdentifier(),
() -> envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now));
}
- void purge(final long sequence, final RequestEnvelope envelope, final long now) {
+ final void purge(final long sequence, final RequestEnvelope envelope, final long now) {
LOG.debug("{}: purging history {}", persistenceId(), getIdentifier());
tree.purgeTransactionChain(getIdentifier(),
() -> envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now));
}
+ final void retire() {
+ transactions.values().forEach(FrontendTransaction::retire);
+ tree.removeTransactionChain(getIdentifier());
+ }
+
private FrontendTransaction createTransaction(final TransactionRequest<?> request, final TransactionIdentifier id)
throws RequestException {
if (request instanceof CommitLocalTransactionRequest) {
}
}
+ @Override
+ void retire() {
+ // No-op
+ }
+
private void handleModifyTransaction(final ModifyTransactionRequest request, final RequestEnvelope envelope,
final long now) {
// The only valid request here is with abort protocol
}
}
+ /**
+ * Retired state, needed to catch and suppress callbacks after we have removed associated state.
+ */
+ private static final class Retired extends State {
+ private final String prevStateString;
+
+ Retired(final State prevState) {
+ prevStateString = prevState.toString();
+ }
+
+ @Override
+ public String toString() {
+ return "RETIRED (in " + prevStateString + ")";
+ }
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(FrontendReadWriteTransaction.class);
private static final State ABORTED = new State() {
@Override
}
}
+ @Override
+ void retire() {
+ state = new Retired(state);
+ }
+
private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
final RequestEnvelope envelope, final long now) throws RequestException {
throwIfFailed();
ready.readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
@Override
public void onSuccess(final DataTreeCandidate result) {
- LOG.debug("{}: Transaction {} completed preCommit", persistenceId(), getIdentifier());
- recordAndSendSuccess(envelope, now, new TransactionPreCommitSuccess(getIdentifier(),
- request.getSequence()));
- ready.stage = CommitStage.PRE_COMMIT_COMPLETE;
+ successfulPreCommit(envelope, now);
}
@Override
}
}
- private void failTransaction(final RequestEnvelope envelope, final long now, final RuntimeRequestException cause) {
+ void successfulPreCommit(final RequestEnvelope envelope, final long startTime) {
+ if (state instanceof Retired) {
+ LOG.debug("{}: Suppressing successful preCommit of retired transaction {}", persistenceId(),
+ getIdentifier());
+ return;
+ }
+
+ final Ready ready = checkReady();
+ LOG.debug("{}: Transaction {} completed preCommit", persistenceId(), getIdentifier());
+ recordAndSendSuccess(envelope, startTime, new TransactionPreCommitSuccess(getIdentifier(),
+ envelope.getMessage().getSequence()));
+ ready.stage = CommitStage.PRE_COMMIT_COMPLETE;
+ }
+
+ void failTransaction(final RequestEnvelope envelope, final long now, final RuntimeRequestException cause) {
+ if (state instanceof Retired) {
+ LOG.debug("{}: Suppressing failure of retired transaction {}", persistenceId(), getIdentifier(), cause);
+ return;
+ }
+
recordAndSendFailure(envelope, now, cause);
state = new Failed(cause);
LOG.debug("{}: Transaction {} failed", persistenceId(), getIdentifier(), cause);
checkReady().readyCohort.canCommit(new FutureCallback<Void>() {
@Override
public void onSuccess(final Void result) {
- recordAndSendSuccess(envelope, now, new TransactionCanCommitSuccess(getIdentifier(),
- envelope.getMessage().getSequence()));
- ready.stage = CommitStage.CAN_COMMIT_COMPLETE;
- LOG.debug("{}: Transaction {} completed canCommit", persistenceId(), getIdentifier());
+ successfulCanCommit(envelope, now);
}
@Override
}
}
+ void successfulCanCommit(final RequestEnvelope envelope, final long startTime) {
+ if (state instanceof Retired) {
+ LOG.debug("{}: Suppressing successful canCommit of retired transaction {}", persistenceId(),
+ getIdentifier());
+ return;
+ }
+
+ final Ready ready = checkReady();
+ recordAndSendSuccess(envelope, startTime, new TransactionCanCommitSuccess(getIdentifier(),
+ envelope.getMessage().getSequence()));
+ ready.stage = CommitStage.CAN_COMMIT_COMPLETE;
+ LOG.debug("{}: Transaction {} completed canCommit", persistenceId(), getIdentifier());
+ }
+
private void directCommit(final RequestEnvelope envelope, final long now) throws RequestException {
throwIfFailed();
}
void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
+ if (state instanceof Retired) {
+ LOG.debug("{}: Suppressing direct canCommit of retired transaction {}", persistenceId(), getIdentifier());
+ return;
+ }
+
final Ready ready = checkReady();
ready.stage = CommitStage.PRE_COMMIT_PENDING;
LOG.debug("{}: Transaction {} initiating direct preCommit", persistenceId(), getIdentifier());
}
void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
+ if (state instanceof Retired) {
+ LOG.debug("{}: Suppressing direct commit of retired transaction {}", persistenceId(), getIdentifier());
+ return;
+ }
+
final Ready ready = checkReady();
ready.stage = CommitStage.COMMIT_PENDING;
LOG.debug("{}: Transaction {} initiating direct commit", persistenceId(), getIdentifier());
}
void successfulCommit(final RequestEnvelope envelope, final long startTime) {
+ if (state instanceof Retired) {
+ LOG.debug("{}: Suppressing commit response on retired transaction {}", persistenceId(), getIdentifier());
+ return;
+ }
+
recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(getIdentifier(),
envelope.getMessage().getSequence()));
state = COMMITTED;
abstract TransactionSuccess<?> doHandleRequest(TransactionRequest<?> request, RequestEnvelope envelope,
long now) throws RequestException;
+ abstract void retire();
+
private void recordResponse(final long sequence, final Object response) {
if (replayQueue.isEmpty()) {
firstReplaySequence = sequence;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
+import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
import org.opendaylight.controller.cluster.datastore.utils.UnsignedLongRangeSet;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.slf4j.Logger;
}
void retire() {
- // FIXME: flush all state
+ // Hunt down any transactions associated with this frontend
+ final Iterator<SimpleShardDataTreeCohort> it = tree.cohortIterator();
+ while (it.hasNext()) {
+ final SimpleShardDataTreeCohort cohort = it.next();
+ if (clientId.equals(cohort.getIdentifier().getHistoryId().getClientId())) {
+ if (cohort.getState() != State.COMMIT_PENDING) {
+ LOG.debug("{}: Retiring transaction {}", persistenceId, cohort.getIdentifier());
+ it.remove();
+ } else {
+ LOG.debug("{}: Transaction {} already committing, not retiring it", persistenceId,
+ cohort.getIdentifier());
+ }
+ }
+ }
+
+ // Clear out all transaction chains
+ localHistories.values().forEach(AbstractFrontendHistory::retire);
+ localHistories.clear();
+ standaloneHistory.retire();
}
@Override
private final FrontendMetadata frontendMetadata;
private Map<FrontendIdentifier, LeaderFrontendState> knownFrontends = ImmutableMap.of();
+ private boolean paused;
private final MessageSlicer responseMessageSlicer;
private final Dispatchers dispatchers;
private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
throws RequestException {
// We are not the leader, hence we want to fail-fast.
- if (!isLeader() || !isLeaderActive()) {
- LOG.info("{}: not currently leader, rejecting request {}. isLeader: {}, isLeaderActive: {},"
- + "isLeadershipTransferInProgress: {}.",
- persistenceId(), envelope, isLeader(), isLeaderActive(), isLeadershipTransferInProgress());
+ if (!isLeader() || paused || !isLeaderActive()) {
+ LOG.debug("{}: not currently active leader, rejecting request {}. isLeader: {}, isLeaderActive: {},"
+ + "isLeadershipTransferInProgress: {}, paused: {}",
+ persistenceId(), envelope, isLeader(), isLeaderActive(), isLeadershipTransferInProgress(), paused);
throw new NotLeaderException(getSelf());
}
persistenceId(), getId());
}
+ paused = false;
store.purgeLeaderState();
}
@Override
protected void onLeaderChanged(final String oldLeader, final String newLeader) {
shardMBean.incrementLeadershipChangeCount();
+ paused = false;
- final boolean hasLeader = hasLeader();
- if (!hasLeader) {
- // No leader implies we are not the leader, lose frontend state if we have any. This also places
- // an explicit guard so the map will not get modified accidentally.
+ if (!isLeader()) {
if (!knownFrontends.isEmpty()) {
LOG.debug("{}: removing frontend state for {}", persistenceId(), knownFrontends.keySet());
knownFrontends = ImmutableMap.of();
}
- return;
- }
- if (!isLeader()) {
+ if (!hasLeader()) {
+ // No leader anywhere, nothing else to do
+ return;
+ }
+
// Another leader was elected. If we were the previous leader and had pending transactions, convert
// them to transaction messages and send to the new leader.
ActorSelection leader = getLeader();
@Override
protected void pauseLeader(final Runnable operation) {
LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation);
+ paused = true;
+
+ // Tell-based protocol can replay transaction state, so it is safe to blow it up when we are paused.
+ knownFrontends.values().forEach(LeaderFrontendState::retire);
+ knownFrontends = ImmutableMap.of();
+
store.setRunOnPendingTransactionsComplete(operation);
}
@Override
protected void unpauseLeader() {
LOG.debug("{}: In unpauseLeader", persistenceId());
+ paused = false;
+
store.setRunOnPendingTransactionsComplete(null);
+
+ // Restore tell-based protocol state as if we were becoming the leader
+ knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this));
}
@Override
ShardStats getStats() {
return shard.getShardMBean();
}
+
+ Iterator<SimpleShardDataTreeCohort> cohortIterator() {
+ return Iterables.transform(Iterables.concat(pendingFinishCommits, pendingCommits, pendingTransactions),
+ e -> e.cohort).iterator();
+ }
+
+ void removeTransactionChain(final LocalHistoryIdentifier id) {
+ if (transactionChains.remove(id) != null) {
+ LOG.debug("{}: Removed transaction chain {}", logContext, id);
+ }
+ }
}