X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FRemoteProxyTransaction.java;h=e91bd6c8b031f935a08749272286ac565602d95c;hb=63bca3841f0187b5127f62fd04e4edcdce3a63c1;hp=347c7eac033ae1bf4a63db2f3d4ba0d26b981d6c;hpb=320a4e5cd2d9d80468a3f82798744f2035488218;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java index 347c7eac03..e91bd6c8b0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java @@ -13,8 +13,6 @@ import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; -import java.util.ArrayList; -import java.util.Collection; import java.util.function.Consumer; import javax.annotation.Nullable; import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest; @@ -25,13 +23,16 @@ import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequ import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol; import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; import org.opendaylight.controller.cluster.access.commands.TransactionDelete; +import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest; import org.opendaylight.controller.cluster.access.commands.TransactionMerge; import org.opendaylight.controller.cluster.access.commands.TransactionModification; +import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionWrite; -import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; @@ -62,18 +63,28 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { // FIXME: make this tuneable private static final int REQUEST_MAX_MODIFICATIONS = 1000; - private final Collection> successfulRequests = new ArrayList<>(); private final ModifyTransactionRequestBuilder builder; + private final boolean sendReadyOnSeal; + private final boolean snapshotOnly; private boolean builderBusy; private volatile Exception operationFailure; - RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier) { + + RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier, + final boolean snapshotOnly, final boolean sendReadyOnSeal) { super(parent); + this.snapshotOnly = snapshotOnly; + this.sendReadyOnSeal = sendReadyOnSeal; builder = new ModifyTransactionRequestBuilder(identifier, localActor()); } + @Override + boolean isSnapshotOnly() { + return snapshotOnly; + } + @Override public TransactionIdentifier getIdentifier() { return builder.getIdentifier(); @@ -111,25 +122,25 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { @Override CheckedFuture doExists(final YangInstanceIdentifier path) { final SettableFuture future = SettableFuture.create(); - return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), path), - t -> completeExists(future, t), future); + return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), path, + isSnapshotOnly()), t -> completeExists(future, t), future); } @Override CheckedFuture>, ReadFailedException> doRead(final YangInstanceIdentifier path) { final SettableFuture>> future = SettableFuture.create(); - return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), path), - t -> completeRead(future, t), future); + return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), path, + isSnapshotOnly()), t -> completeRead(future, t), future); } @Override void doAbort() { - ensureInitializedBuider(); + ensureInitializedBuilder(); builder.setAbort(); flushBuilder(); } - private void ensureInitializedBuider() { + private void ensureInitializedBuilder() { if (!builderBusy) { builder.setSequence(nextSequence()); builderBusy = true; @@ -179,7 +190,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { private void appendModification(final TransactionModification modification) { if (operationFailure == null) { - ensureInitializedBuider(); + ensureInitializedBuilder(); builder.addModification(modification); if (builder.size() >= REQUEST_MAX_MODIFICATIONS) { @@ -195,7 +206,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { if (response instanceof TransactionSuccess) { // Happy path - successfulRequests.add(request); + recordSuccessfulRequest(request); } else { recordFailedResponse(response); } @@ -229,6 +240,8 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } else { failFuture(future, response); } + + recordFinishedRequest(); } private void completeRead(final SettableFuture>> future, @@ -240,11 +253,13 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } else { failFuture(future, response); } + + recordFinishedRequest(); } @Override ModifyTransactionRequest commitRequest(final boolean coordinated) { - ensureInitializedBuider(); + ensureInitializedBuilder(); builder.setCommit(coordinated); final ModifyTransactionRequest ret = builder.build(); @@ -254,28 +269,29 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { @Override void doSeal() { - // No-op + if (sendReadyOnSeal) { + ensureInitializedBuilder(); + builder.setReady(); + flushBuilder(); + } } @Override - void replaySuccessfulRequests(final AbstractProxyTransaction successor) { - super.replaySuccessfulRequests(successor); - - for (TransactionRequest req : successfulRequests) { - LOG.debug("Forwarding request {} to successor {}", req, successor); - successor.handleForwardedRemoteRequest(req, null); + void flushState(final AbstractProxyTransaction successor) { + if (builderBusy) { + final ModifyTransactionRequest request = builder.build(); + builderBusy = false; + successor.handleForwardedRemoteRequest(request, null); } - successfulRequests.clear(); } @Override void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest request, - final Consumer> callback) throws RequestException { + final Consumer> callback) { successor.handleForwardedRequest(request, callback); } - private void handleForwardedRequest(final TransactionRequest request, final Consumer> callback) - throws RequestException { + private void handleForwardedRequest(final TransactionRequest request, final Consumer> callback) { if (request instanceof ModifyTransactionRequest) { final ModifyTransactionRequest req = (ModifyTransactionRequest) request; @@ -283,7 +299,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { final java.util.Optional maybeProto = req.getPersistenceProtocol(); if (maybeProto.isPresent()) { - seal(); + ensureSealed(); switch (maybeProto.get()) { case ABORT: @@ -295,10 +311,32 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { case THREE_PHASE: sendRequest(commitRequest(true), callback); break; + case READY: + //no op + break; default: throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get()); } } + } else if (request instanceof ReadTransactionRequest) { + ensureFlushedBuider(); + sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), + ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), callback); + } else if (request instanceof ExistsTransactionRequest) { + ensureFlushedBuider(); + sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), + ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), callback); + } else if (request instanceof TransactionPreCommitRequest) { + ensureFlushedBuider(); + sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback); + } else if (request instanceof TransactionDoCommitRequest) { + ensureFlushedBuider(); + sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback); + } else if (request instanceof TransactionAbortRequest) { + ensureFlushedBuider(); + sendAbort(callback); + } else if (request instanceof TransactionPurgeRequest) { + purge(); } else { throw new IllegalArgumentException("Unhandled request {}" + request); } @@ -306,7 +344,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { @Override void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest request, - final Consumer> callback) throws RequestException { + final Consumer> callback) { successor.handleForwardedRemoteRequest(request, callback); } }