import java.util.Collection;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
// Sequence has already been checked
@Override
- @Nullable TransactionSuccess<?> handleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope,
+ @Nullable TransactionSuccess<?> doHandleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope,
final long now) throws RequestException {
if (request instanceof ModifyTransactionRequest) {
return handleModifyTransaction((ModifyTransactionRequest) request, envelope, now);
handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now);
return null;
} else if (request instanceof TransactionAbortRequest) {
- handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
+ handleTransactionAbort(request.getSequence(), envelope, now);
+ return null;
+ } else if (request instanceof AbortLocalTransactionRequest) {
+ handleLocalTransactionAbort(request.getSequence(), envelope, now);
return null;
} else {
+ LOG.warn("Rejecting unsupported request {}", request);
throw new UnsupportedRequestException(request);
}
}
- @Override
- void purge(final Runnable callback) {
- openTransaction.purge(callback);
- }
-
private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
final RequestEnvelope envelope, final long now) throws RequestException {
readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
});
}
- private void handleTransactionAbort(final TransactionAbortRequest request,
- final RequestEnvelope envelope, final long now) throws RequestException {
+ private void handleLocalTransactionAbort(final long sequence, final RequestEnvelope envelope, final long now) {
+ Preconditions.checkState(readyCohort == null, "Transaction {} encountered local abort with commit underway",
+ getIdentifier());
+ openTransaction.abort(() -> recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(),
+ sequence)));
+ }
+
+ private void handleTransactionAbort(final long sequence, final RequestEnvelope envelope, final long now) {
if (readyCohort == null) {
- openTransaction.abort(() -> recordAndSendSuccess(envelope, now,
- new TransactionAbortSuccess(getIdentifier(), request.getSequence())));
+ openTransaction.abort(() -> recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(),
+ sequence)));
return;
}
@Override
public void onSuccess(final Void result) {
readyCohort = null;
- recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(),
- request.getSequence()));
+ recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(), sequence));
LOG.debug("Transaction {} aborted", getIdentifier());
}
});
}
- private void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
+ void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
@Override
public void onSuccess(final DataTreeCandidate result) {
});
}
- private void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
+ void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
readyCohort.commit(new FutureCallback<UnsignedLong>() {
-
@Override
public void onSuccess(final UnsignedLong result) {
successfulCommit(envelope, startTime);
});
}
- private void successfulCommit(final RequestEnvelope envelope, final long startTime) {
+ void successfulCommit(final RequestEnvelope envelope, final long startTime) {
recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(readyCohort.getIdentifier(),
envelope.getMessage().getSequence()));
readyCohort = null;
private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request,
final RequestEnvelope envelope, final long now) throws RequestException {
- if (sealedModification.equals(request.getModification())) {
+ if (!sealedModification.equals(request.getModification())) {
+ LOG.warn("Expecting modification {}, commit request has {}", sealedModification, request.getModification());
+ throw new UnsupportedRequestException(request);
+ }
+
+ final java.util.Optional<Exception> optFailure = request.getDelayedFailure();
+ if (optFailure.isPresent()) {
+ readyCohort = history().createFailedCohort(getIdentifier(), sealedModification, optFailure.get());
+ } else {
readyCohort = history().createReadyCohort(getIdentifier(), sealedModification);
+ }
- if (request.isCoordinated()) {
- coordinatedCommit(envelope, now);
- } else {
- directCommit(envelope, now);
- }
+ if (request.isCoordinated()) {
+ coordinatedCommit(envelope, now);
} else {
- throw new UnsupportedRequestException(request);
+ directCommit(envelope, now);
}
}
} else if (m instanceof TransactionMerge) {
modification.merge(m.getPath(), ((TransactionMerge) m).getData());
} else {
- LOG.warn("{}: ignoring unhandled modification {}", history().persistenceId(), m);
+ LOG.warn("{}: ignoring unhandled modification {}", persistenceId(), m);
}
}
}
coordinatedCommit(envelope, now);
return null;
default:
+ LOG.warn("{}: rejecting unsupported protocol {}", persistenceId(), maybeProto.get());
throw new UnsupportedRequestException(request);
}
}
// only once.
if (readyCohort == null) {
readyCohort = openTransaction.ready();
- LOG.debug("{}: transitioned {} to ready", history().persistenceId(), openTransaction.getIdentifier());
+ LOG.debug("{}: transitioned {} to ready", persistenceId(), openTransaction.getIdentifier());
openTransaction = null;
}
}