X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FLocalReadWriteProxyTransaction.java;h=cbf316d88194e7cbbf59dccc057cbf9d6fc5f5dd;hb=51a85b6c8fce1d9808285a6ad81dc7068afbf7c7;hp=720ada3191e7fd5d06892e0583dcabd55097ffa6;hpb=3ee40198347cfb53bd0ce12ffd625cff8ed2383b;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java index 720ada3191..cbf316d881 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java @@ -9,6 +9,8 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import com.google.common.base.Preconditions; import com.google.common.base.Verify; +import java.util.Optional; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Supplier; import javax.annotation.Nullable; @@ -201,8 +203,20 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { } @Override - void applyModifyTransactionRequest(final ModifyTransactionRequest request, + void applyForwardedModifyTransactionRequest(final ModifyTransactionRequest request, final @Nullable Consumer> callback) { + commonModifyTransactionRequest(request, callback, this::sendRequest); + } + + @Override + void replayModifyTransactionRequest(final ModifyTransactionRequest request, + final @Nullable Consumer> callback, final long enqueuedTicks) { + commonModifyTransactionRequest(request, callback, (req, cb) -> enqueueRequest(req, cb, enqueuedTicks)); + } + + private void commonModifyTransactionRequest(final ModifyTransactionRequest request, + final @Nullable Consumer> callback, + final BiConsumer, Consumer>> sendMethod) { for (final TransactionModification mod : request.getModifications()) { if (mod instanceof TransactionWrite) { write(mod.getPath(), ((TransactionWrite)mod).getData()); @@ -215,23 +229,23 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { } } - final java.util.Optional maybeProtocol = request.getPersistenceProtocol(); + final Optional maybeProtocol = request.getPersistenceProtocol(); if (maybeProtocol.isPresent()) { Verify.verify(callback != null, "Request {} has null callback", request); ensureSealed(); switch (maybeProtocol.get()) { case ABORT: - sendRequest(new AbortLocalTransactionRequest(getIdentifier(), localActor()), callback); + sendMethod.accept(new AbortLocalTransactionRequest(getIdentifier(), localActor()), callback); break; case READY: // No-op, as we have already issued a seal() break; case SIMPLE: - sendRequest(commitRequest(false), callback); + sendMethod.accept(commitRequest(false), callback); break; case THREE_PHASE: - sendRequest(commitRequest(true), callback); + sendMethod.accept(commitRequest(true), callback); break; default: throw new IllegalArgumentException("Unhandled protocol " + maybeProtocol.get()); @@ -240,18 +254,35 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { } @Override - void handleForwardedLocalRequest(final AbstractLocalTransactionRequest request, - final Consumer> callback) { + void handleReplayedLocalRequest(final AbstractLocalTransactionRequest request, + final Consumer> callback, final long now) { if (request instanceof CommitLocalTransactionRequest) { sendCommit((CommitLocalTransactionRequest) request, callback); } else { - super.handleForwardedLocalRequest(request, callback); + super.handleReplayedLocalRequest(request, callback, now); } } @Override - void handleForwardedRemoteRequest(final TransactionRequest request, - final @Nullable Consumer> callback) { + void handleReplayedRemoteRequest(final TransactionRequest request, + final @Nullable Consumer> callback, final long enqueuedTicks) { + LOG.debug("Applying replayed request {}", request); + + if (request instanceof TransactionPreCommitRequest) { + enqueueRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback, + enqueuedTicks); + } else if (request instanceof TransactionDoCommitRequest) { + enqueueRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback, + enqueuedTicks); + } else if (request instanceof TransactionAbortRequest) { + enqueueDoAbort(callback, enqueuedTicks); + } else { + super.handleReplayedRemoteRequest(request, callback, enqueuedTicks); + } + } + + @Override + void handleForwardedRemoteRequest(final TransactionRequest request, final Consumer> callback) { LOG.debug("Applying forwarded request {}", request); if (request instanceof TransactionPreCommitRequest) { @@ -259,7 +290,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { } else if (request instanceof TransactionDoCommitRequest) { sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback); } else if (request instanceof TransactionAbortRequest) { - sendAbort(callback); + sendDoAbort(callback); } else { super.handleForwardedRemoteRequest(request, callback); } @@ -283,6 +314,13 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { closedException = this::abortedException; } + @Override + void enqueueAbort(final TransactionRequest request, final Consumer> callback, + final long enqueuedTicks) { + super.enqueueAbort(request, callback, enqueuedTicks); + closedException = this::abortedException; + } + private CursorAwareDataTreeModification getModification() { if (closedException != null) { throw closedException.get();