X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FFrontendReadWriteTransaction.java;h=f9d9580214e41c2d28430c3a46c68b005ad6e8bc;hb=28e9832cc97a345d5ceb69262784e5c8fef77e37;hp=79d555c85695d7b437bf4831036ca6a8b9070f9b;hpb=5cb0787412ab63a3aa5dcc044511e1ce569662cf;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java index 79d555c856..f9d9580214 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java @@ -11,6 +11,7 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.FutureCallback; +import java.util.Collection; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; @@ -100,7 +101,8 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now); return null; } else if (request instanceof TransactionAbortRequest) { - return handleTransactionAbort((TransactionAbortRequest) request, envelope, now); + handleTransactionAbort((TransactionAbortRequest) request, envelope, now); + return null; } else { throw new UnsupportedRequestException(request); } @@ -139,11 +141,12 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { }); } - private TransactionSuccess handleTransactionAbort(final TransactionAbortRequest request, + private void handleTransactionAbort(final TransactionAbortRequest request, final RequestEnvelope envelope, final long now) throws RequestException { if (readyCohort == null) { - openTransaction.abort(); - return new TransactionAbortSuccess(getIdentifier(), request.getSequence()); + openTransaction.abort(() -> recordAndSendSuccess(envelope, now, + new TransactionAbortSuccess(getIdentifier(), request.getSequence()))); + return; } readyCohort.abort(new FutureCallback() { @@ -162,7 +165,6 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure)); } }); - return null; } private void coordinatedCommit(final RequestEnvelope envelope, final long now) { @@ -194,7 +196,6 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { readyCohort = null; } }); - } private void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) { @@ -270,16 +271,19 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { private @Nullable TransactionSuccess handleModifyTransaction(final ModifyTransactionRequest request, final RequestEnvelope envelope, final long now) throws RequestException { - final DataTreeModification modification = openTransaction.getSnapshot(); - for (TransactionModification m : request.getModifications()) { - if (m instanceof TransactionDelete) { - modification.delete(m.getPath()); - } else if (m instanceof TransactionWrite) { - modification.write(m.getPath(), ((TransactionWrite) m).getData()); - } else if (m instanceof TransactionMerge) { - modification.merge(m.getPath(), ((TransactionMerge) m).getData()); - } else { - LOG.warn("{}: ignoring unhandled modification {}", history().persistenceId(), m); + final Collection mods = request.getModifications(); + if (!mods.isEmpty()) { + final DataTreeModification modification = openTransaction.getSnapshot(); + for (TransactionModification m : mods) { + if (m instanceof TransactionDelete) { + modification.delete(m.getPath()); + } else if (m instanceof TransactionWrite) { + modification.write(m.getPath(), ((TransactionWrite) m).getData()); + } else if (m instanceof TransactionMerge) { + modification.merge(m.getPath(), ((TransactionMerge) m).getData()); + } else { + LOG.warn("{}: ignoring unhandled modification {}", history().persistenceId(), m); + } } } @@ -290,21 +294,32 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { switch (maybeProto.get()) { case ABORT: - openTransaction.abort(); + openTransaction.abort(() -> replyModifySuccess(request.getSequence())); openTransaction = null; + return null; + case READY: + ensureReady(); return replyModifySuccess(request.getSequence()); case SIMPLE: - readyCohort = openTransaction.ready(); - openTransaction = null; + ensureReady(); directCommit(envelope, now); return null; case THREE_PHASE: - readyCohort = openTransaction.ready(); - openTransaction = null; + ensureReady(); coordinatedCommit(envelope, now); return null; default: throw new UnsupportedRequestException(request); } } + + private void ensureReady() { + // We may have a combination of READY + SIMPLE/THREE_PHASE , in which case we want to ready the transaction + // only once. + if (readyCohort == null) { + readyCohort = openTransaction.ready(); + LOG.debug("{}: transitioned {} to ready", history().persistenceId(), openTransaction.getIdentifier()); + openTransaction = null; + } + } }