X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FFrontendReadWriteTransaction.java;h=45edfc9333d9530b347e735756e48fc81c678b08;hp=79d555c85695d7b437bf4831036ca6a8b9070f9b;hb=d9e3fa9b7d4f1ad484931151c3b3ef237a674ab2;hpb=5cb0787412ab63a3aa5dcc044511e1ce569662cf diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java index 79d555c856..45edfc9333 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java @@ -11,8 +11,10 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.FutureCallback; +import java.util.Collection; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; +import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess; @@ -82,7 +84,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { // Sequence has already been checked @Override - @Nullable TransactionSuccess handleRequest(final TransactionRequest request, final RequestEnvelope envelope, + @Nullable TransactionSuccess doHandleRequest(final TransactionRequest request, final RequestEnvelope envelope, final long now) throws RequestException { if (request instanceof ModifyTransactionRequest) { return handleModifyTransaction((ModifyTransactionRequest) request, envelope, now); @@ -100,8 +102,13 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now); return null; } else if (request instanceof TransactionAbortRequest) { - return handleTransactionAbort((TransactionAbortRequest) request, envelope, now); + handleTransactionAbort(request.getSequence(), envelope, now); + return null; + } else if (request instanceof AbortLocalTransactionRequest) { + handleLocalTransactionAbort(request.getSequence(), envelope, now); + return null; } else { + LOG.warn("Rejecting unsupported request {}", request); throw new UnsupportedRequestException(request); } } @@ -139,19 +146,25 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { }); } - private TransactionSuccess handleTransactionAbort(final TransactionAbortRequest request, - final RequestEnvelope envelope, final long now) throws RequestException { + private void handleLocalTransactionAbort(final long sequence, final RequestEnvelope envelope, final long now) { + Preconditions.checkState(readyCohort == null, "Transaction {} encountered local abort with commit underway", + getIdentifier()); + openTransaction.abort(() -> recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(), + sequence))); + } + + private void handleTransactionAbort(final long sequence, final RequestEnvelope envelope, final long now) { if (readyCohort == null) { - openTransaction.abort(); - return new TransactionAbortSuccess(getIdentifier(), request.getSequence()); + openTransaction.abort(() -> recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(), + sequence))); + return; } readyCohort.abort(new FutureCallback() { @Override public void onSuccess(final Void result) { readyCohort = null; - recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(), - request.getSequence())); + recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(), sequence)); LOG.debug("Transaction {} aborted", getIdentifier()); } @@ -162,7 +175,6 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure)); } }); - return null; } private void coordinatedCommit(final RequestEnvelope envelope, final long now) { @@ -194,10 +206,9 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { readyCohort = null; } }); - } - private void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) { + void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) { readyCohort.preCommit(new FutureCallback() { @Override public void onSuccess(final DataTreeCandidate result) { @@ -212,9 +223,8 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { }); } - private void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) { + void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) { readyCohort.commit(new FutureCallback() { - @Override public void onSuccess(final UnsignedLong result) { successfulCommit(envelope, startTime); @@ -228,7 +238,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { }); } - private void successfulCommit(final RequestEnvelope envelope, final long startTime) { + void successfulCommit(final RequestEnvelope envelope, final long startTime) { recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(readyCohort.getIdentifier(), envelope.getMessage().getSequence())); readyCohort = null; @@ -236,16 +246,22 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request, final RequestEnvelope envelope, final long now) throws RequestException { - if (sealedModification.equals(request.getModification())) { + if (!sealedModification.equals(request.getModification())) { + LOG.warn("Expecting modification {}, commit request has {}", sealedModification, request.getModification()); + throw new UnsupportedRequestException(request); + } + + final java.util.Optional optFailure = request.getDelayedFailure(); + if (optFailure.isPresent()) { + readyCohort = history().createFailedCohort(getIdentifier(), sealedModification, optFailure.get()); + } else { readyCohort = history().createReadyCohort(getIdentifier(), sealedModification); + } - if (request.isCoordinated()) { - coordinatedCommit(envelope, now); - } else { - directCommit(envelope, now); - } + if (request.isCoordinated()) { + coordinatedCommit(envelope, now); } else { - throw new UnsupportedRequestException(request); + directCommit(envelope, now); } } @@ -270,16 +286,19 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { private @Nullable TransactionSuccess handleModifyTransaction(final ModifyTransactionRequest request, final RequestEnvelope envelope, final long now) throws RequestException { - final DataTreeModification modification = openTransaction.getSnapshot(); - for (TransactionModification m : request.getModifications()) { - if (m instanceof TransactionDelete) { - modification.delete(m.getPath()); - } else if (m instanceof TransactionWrite) { - modification.write(m.getPath(), ((TransactionWrite) m).getData()); - } else if (m instanceof TransactionMerge) { - modification.merge(m.getPath(), ((TransactionMerge) m).getData()); - } else { - LOG.warn("{}: ignoring unhandled modification {}", history().persistenceId(), m); + final Collection mods = request.getModifications(); + if (!mods.isEmpty()) { + final DataTreeModification modification = openTransaction.getSnapshot(); + for (TransactionModification m : mods) { + if (m instanceof TransactionDelete) { + modification.delete(m.getPath()); + } else if (m instanceof TransactionWrite) { + modification.write(m.getPath(), ((TransactionWrite) m).getData()); + } else if (m instanceof TransactionMerge) { + modification.merge(m.getPath(), ((TransactionMerge) m).getData()); + } else { + LOG.warn("{}: ignoring unhandled modification {}", persistenceId(), m); + } } } @@ -290,21 +309,33 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { switch (maybeProto.get()) { case ABORT: - openTransaction.abort(); + openTransaction.abort(() -> replyModifySuccess(request.getSequence())); openTransaction = null; + return null; + case READY: + ensureReady(); return replyModifySuccess(request.getSequence()); case SIMPLE: - readyCohort = openTransaction.ready(); - openTransaction = null; + ensureReady(); directCommit(envelope, now); return null; case THREE_PHASE: - readyCohort = openTransaction.ready(); - openTransaction = null; + ensureReady(); coordinatedCommit(envelope, now); return null; default: + LOG.warn("{}: rejecting unsupported protocol {}", persistenceId(), maybeProto.get()); throw new UnsupportedRequestException(request); } } + + private void ensureReady() { + // We may have a combination of READY + SIMPLE/THREE_PHASE , in which case we want to ready the transaction + // only once. + if (readyCohort == null) { + readyCohort = openTransaction.ready(); + LOG.debug("{}: transitioned {} to ready", persistenceId(), openTransaction.getIdentifier()); + openTransaction = null; + } + } }