X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FFrontendReadWriteTransaction.java;h=ced91b5acd0cd7b5449a90f70f266c92379b4bc8;hb=5e7cf2452ef634dc934a3ea5a2dd95059fbab68c;hp=0dcaac233ab6c4ee6d6566b1ecaa74594d17e0d5;hpb=d6ed0a044d591d65847714451d97d80345154089;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java index 0dcaac233a..ced91b5acd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java @@ -101,8 +101,10 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now); return null; } else if (request instanceof TransactionAbortRequest) { - return handleTransactionAbort((TransactionAbortRequest) request, envelope, now); + handleTransactionAbort((TransactionAbortRequest) request, envelope, now); + return null; } else { + LOG.warn("Rejecting unsupported request {}", request); throw new UnsupportedRequestException(request); } } @@ -140,11 +142,12 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { }); } - private TransactionSuccess handleTransactionAbort(final TransactionAbortRequest request, + private void handleTransactionAbort(final TransactionAbortRequest request, final RequestEnvelope envelope, final long now) throws RequestException { if (readyCohort == null) { - openTransaction.abort(); - return new TransactionAbortSuccess(getIdentifier(), request.getSequence()); + openTransaction.abort(() -> recordAndSendSuccess(envelope, now, + new TransactionAbortSuccess(getIdentifier(), request.getSequence()))); + return; } readyCohort.abort(new FutureCallback() { @@ -163,7 +166,6 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure)); } }); - return null; } private void coordinatedCommit(final RequestEnvelope envelope, final long now) { @@ -214,7 +216,6 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { private void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) { readyCohort.commit(new FutureCallback() { - @Override public void onSuccess(final UnsignedLong result) { successfulCommit(envelope, startTime); @@ -236,16 +237,22 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request, final RequestEnvelope envelope, final long now) throws RequestException { - if (sealedModification.equals(request.getModification())) { + if (!sealedModification.equals(request.getModification())) { + LOG.warn("Expecting modification {}, commit request has {}", sealedModification, request.getModification()); + throw new UnsupportedRequestException(request); + } + + final java.util.Optional optFailure = request.getDelayedFailure(); + if (optFailure.isPresent()) { + readyCohort = history().createFailedCohort(getIdentifier(), sealedModification, optFailure.get()); + } else { readyCohort = history().createReadyCohort(getIdentifier(), sealedModification); + } - if (request.isCoordinated()) { - coordinatedCommit(envelope, now); - } else { - directCommit(envelope, now); - } + if (request.isCoordinated()) { + coordinatedCommit(envelope, now); } else { - throw new UnsupportedRequestException(request); + directCommit(envelope, now); } } @@ -293,9 +300,9 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { switch (maybeProto.get()) { case ABORT: - openTransaction.abort(); + openTransaction.abort(() -> replyModifySuccess(request.getSequence())); openTransaction = null; - return replyModifySuccess(request.getSequence()); + return null; case READY: ensureReady(); return replyModifySuccess(request.getSequence()); @@ -308,6 +315,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { coordinatedCommit(envelope, now); return null; default: + LOG.warn("{}: rejecting unsupported protocol {}", history().persistenceId(), maybeProto.get()); throw new UnsupportedRequestException(request); } }