import com.google.common.base.Preconditions;
import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.FutureCallback;
+import java.util.Collection;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now);
return null;
} else if (request instanceof TransactionAbortRequest) {
- return handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
+ handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
+ return null;
} else {
throw new UnsupportedRequestException(request);
}
});
}
- private TransactionSuccess<?> handleTransactionAbort(final TransactionAbortRequest request,
+ private void handleTransactionAbort(final TransactionAbortRequest request,
final RequestEnvelope envelope, final long now) throws RequestException {
if (readyCohort == null) {
- openTransaction.abort();
- return new TransactionAbortSuccess(getIdentifier(), request.getSequence());
+ openTransaction.abort(() -> recordAndSendSuccess(envelope, now,
+ new TransactionAbortSuccess(getIdentifier(), request.getSequence())));
+ return;
}
readyCohort.abort(new FutureCallback<Void>() {
recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure));
}
});
- return null;
}
private void coordinatedCommit(final RequestEnvelope envelope, final long now) {
readyCohort = null;
}
});
-
}
private void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
private @Nullable TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
final RequestEnvelope envelope, final long now) throws RequestException {
- final DataTreeModification modification = openTransaction.getSnapshot();
- for (TransactionModification m : request.getModifications()) {
- if (m instanceof TransactionDelete) {
- modification.delete(m.getPath());
- } else if (m instanceof TransactionWrite) {
- modification.write(m.getPath(), ((TransactionWrite) m).getData());
- } else if (m instanceof TransactionMerge) {
- modification.merge(m.getPath(), ((TransactionMerge) m).getData());
- } else {
- LOG.warn("{}: ignoring unhandled modification {}", history().persistenceId(), m);
+ final Collection<TransactionModification> mods = request.getModifications();
+ if (!mods.isEmpty()) {
+ final DataTreeModification modification = openTransaction.getSnapshot();
+ for (TransactionModification m : mods) {
+ if (m instanceof TransactionDelete) {
+ modification.delete(m.getPath());
+ } else if (m instanceof TransactionWrite) {
+ modification.write(m.getPath(), ((TransactionWrite) m).getData());
+ } else if (m instanceof TransactionMerge) {
+ modification.merge(m.getPath(), ((TransactionMerge) m).getData());
+ } else {
+ LOG.warn("{}: ignoring unhandled modification {}", history().persistenceId(), m);
+ }
}
}
switch (maybeProto.get()) {
case ABORT:
- openTransaction.abort();
+ openTransaction.abort(() -> replyModifySuccess(request.getSequence()));
openTransaction = null;
+ return null;
+ case READY:
+ ensureReady();
return replyModifySuccess(request.getSequence());
case SIMPLE:
- readyCohort = openTransaction.ready();
- openTransaction = null;
+ ensureReady();
directCommit(envelope, now);
return null;
case THREE_PHASE:
- readyCohort = openTransaction.ready();
- openTransaction = null;
+ ensureReady();
coordinatedCommit(envelope, now);
return null;
default:
throw new UnsupportedRequestException(request);
}
}
+
+ private void ensureReady() {
+ // We may have a combination of READY + SIMPLE/THREE_PHASE , in which case we want to ready the transaction
+ // only once.
+ if (readyCohort == null) {
+ readyCohort = openTransaction.ready();
+ LOG.debug("{}: transitioned {} to ready", history().persistenceId(), openTransaction.getIdentifier());
+ openTransaction = null;
+ }
+ }
}