X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FLocalReadWriteProxyTransaction.java;h=c44e70eb304f4458aeb2d9782b9f9581abcbfa81;hp=424a9ea0db220752c979dbdae89672bfcb505790;hb=18ddbfdc55a1faddf7aeb2df6b25481d34c820ab;hpb=e1c283de301355cb8fa3f7d4fa28a6dd0af501eb diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java index 424a9ea0db..c44e70eb30 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java @@ -13,6 +13,7 @@ import java.util.Optional; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Supplier; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; @@ -84,10 +85,16 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { LocalReadWriteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier, final DataTreeSnapshot snapshot) { - super(parent, identifier); + super(parent, identifier, false); this.modification = (CursorAwareDataTreeModification) snapshot.newModification(); } + LocalReadWriteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier) { + super(parent, identifier, true); + // This is DONE transaction, this should never be touched + this.modification = null; + } + @Override boolean isSnapshotOnly() { return false; @@ -169,14 +176,25 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { return ret; } - @Override - void doSeal() { - Preconditions.checkState(sealedModification == null, "Transaction %s is already sealed", getIdentifier()); + private void sealModification() { + Preconditions.checkState(sealedModification == null, "Transaction %s is already sealed", this); final CursorAwareDataTreeModification mod = getModification(); mod.ready(); sealedModification = mod; } + @Override + void sealOnly() { + sealModification(); + super.sealOnly(); + } + + @Override + boolean sealAndSend(final com.google.common.base.Optional enqueuedTicks) { + sealModification(); + return super.sealAndSend(enqueuedTicks); + } + @Override void flushState(final AbstractProxyTransaction successor) { sealedModification.applyToCursor(new AbstractDataTreeModificationCursor() { @@ -203,7 +221,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { } @Override - void applyModifyTransactionRequest(final ModifyTransactionRequest request, + void applyForwardedModifyTransactionRequest(final ModifyTransactionRequest request, final @Nullable Consumer> callback) { commonModifyTransactionRequest(request, callback, this::sendRequest); } @@ -232,14 +250,16 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { final Optional maybeProtocol = request.getPersistenceProtocol(); if (maybeProtocol.isPresent()) { Verify.verify(callback != null, "Request {} has null callback", request); - ensureSealed(); + if (markSealed()) { + sealOnly(); + } switch (maybeProtocol.get()) { case ABORT: sendMethod.accept(new AbortLocalTransactionRequest(getIdentifier(), localActor()), callback); break; case READY: - // No-op, as we have already issued a seal() + // No-op, as we have already issued a sealOnly() and we are not transmitting anything break; case SIMPLE: sendMethod.accept(commitRequest(false), callback); @@ -257,7 +277,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { void handleReplayedLocalRequest(final AbstractLocalTransactionRequest request, final Consumer> callback, final long now) { if (request instanceof CommitLocalTransactionRequest) { - sendCommit((CommitLocalTransactionRequest) request, callback); + enqueueRequest(rebaseCommit((CommitLocalTransactionRequest)request), callback, now); } else { super.handleReplayedLocalRequest(request, callback, now); } @@ -275,7 +295,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { enqueueRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback, enqueuedTicks); } else if (request instanceof TransactionAbortRequest) { - enqueueAbort(callback, enqueuedTicks); + enqueueDoAbort(callback, enqueuedTicks); } else { super.handleReplayedRemoteRequest(request, callback, enqueuedTicks); } @@ -290,7 +310,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { } else if (request instanceof TransactionDoCommitRequest) { sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback); } else if (request instanceof TransactionAbortRequest) { - sendAbort(callback); + sendDoAbort(callback); } else { super.handleForwardedRemoteRequest(request, callback); } @@ -301,7 +321,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { final Consumer> callback) { if (request instanceof CommitLocalTransactionRequest) { Verify.verify(successor instanceof LocalReadWriteProxyTransaction); - ((LocalReadWriteProxyTransaction) successor).sendCommit((CommitLocalTransactionRequest)request, callback); + ((LocalReadWriteProxyTransaction) successor).sendRebased((CommitLocalTransactionRequest)request, callback); LOG.debug("Forwarded request {} to successor {}", request, successor); } else { super.forwardToLocal(successor, request, callback); @@ -321,15 +341,19 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { closedException = this::abortedException; } - private CursorAwareDataTreeModification getModification() { + private @Nonnull CursorAwareDataTreeModification getModification() { if (closedException != null) { throw closedException.get(); } - return modification; + return Preconditions.checkNotNull(modification, "Transaction %s is DONE", getIdentifier()); } - private void sendCommit(final CommitLocalTransactionRequest request, final Consumer> callback) { + private void sendRebased(final CommitLocalTransactionRequest request, final Consumer> callback) { + sendRequest(rebaseCommit(request), callback); + } + + private CommitLocalTransactionRequest rebaseCommit(final CommitLocalTransactionRequest request) { // Rebase old modification on new data tree. final CursorAwareDataTreeModification mod = getModification(); @@ -337,7 +361,10 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { request.getModification().applyToCursor(cursor); } - ensureSealed(); - sendRequest(commitRequest(request.isCoordinated()), callback); + if (markSealed()) { + sealOnly(); + } + + return commitRequest(request.isCoordinated()); } }