X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FRemoteProxyTransaction.java;h=1ba96426df75feb7e38e4db0f217c8c4f98708b8;hp=26d718def5d267381f1ae1b5a0879fce7a2ca6e6;hb=1e07329c0d800b8fea43ae0c4060aded5fd18739;hpb=a510fba141230ce9fe8301f9eb0198cc09df46ca diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java index 26d718def5..1ba96426df 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java @@ -13,25 +13,39 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.function.Consumer; +import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder; +import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol; import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; import org.opendaylight.controller.cluster.access.commands.TransactionDelete; +import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest; import org.opendaylight.controller.cluster.access.commands.TransactionMerge; import org.opendaylight.controller.cluster.access.commands.TransactionModification; +import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionWrite; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor; +import org.opendaylight.mdsal.common.api.ReadFailedException; import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,17 +70,26 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { private static final int REQUEST_MAX_MODIFICATIONS = 1000; private final ModifyTransactionRequestBuilder builder; + private final boolean sendReadyOnSeal; + private final boolean snapshotOnly; private boolean builderBusy; private volatile Exception operationFailure; - RemoteProxyTransaction(final DistributedDataStoreClientBehavior client, - final TransactionIdentifier identifier) { - super(client); + RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier, + final boolean snapshotOnly, final boolean sendReadyOnSeal, final boolean isDone) { + super(parent, isDone); + this.snapshotOnly = snapshotOnly; + this.sendReadyOnSeal = sendReadyOnSeal; builder = new ModifyTransactionRequestBuilder(identifier, localActor()); } + @Override + boolean isSnapshotOnly() { + return snapshotOnly; + } + @Override public TransactionIdentifier getIdentifier() { return builder.getIdentifier(); @@ -74,17 +97,17 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { @Override void doDelete(final YangInstanceIdentifier path) { - appendModification(new TransactionDelete(path)); + appendModification(new TransactionDelete(path), Optional.absent()); } @Override void doMerge(final YangInstanceIdentifier path, final NormalizedNode data) { - appendModification(new TransactionMerge(path, data)); + appendModification(new TransactionMerge(path, data), Optional.absent()); } @Override void doWrite(final YangInstanceIdentifier path, final NormalizedNode data) { - appendModification(new TransactionWrite(path, data)); + appendModification(new TransactionWrite(path, data), Optional.absent()); } private CheckedFuture sendReadRequest(final AbstractReadTransactionRequest request, @@ -104,25 +127,18 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { @Override CheckedFuture doExists(final YangInstanceIdentifier path) { final SettableFuture future = SettableFuture.create(); - return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), path), - t -> completeExists(future, t), future); + return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), path, + isSnapshotOnly()), t -> completeExists(future, t), future); } @Override CheckedFuture>, ReadFailedException> doRead(final YangInstanceIdentifier path) { final SettableFuture>> future = SettableFuture.create(); - return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), path), - t -> completeRead(future, t), future); - } - - @Override - void doAbort() { - ensureInitializedBuider(); - builder.setAbort(); - flushBuilder(); + return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), path, + isSnapshotOnly()), t -> completeRead(future, t), future); } - private void ensureInitializedBuider() { + private void ensureInitializedBuilder() { if (!builderBusy) { builder.setSequence(nextSequence()); builderBusy = true; @@ -130,36 +146,53 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } private void ensureFlushedBuider() { + ensureFlushedBuider(Optional.absent()); + } + + private void ensureFlushedBuider(final Optional enqueuedTicks) { if (builderBusy) { - flushBuilder(); + flushBuilder(enqueuedTicks); } } - private void flushBuilder() { - final ModifyTransactionRequest message = builder.build(); + private void flushBuilder(final Optional enqueuedTicks) { + final ModifyTransactionRequest request = builder.build(); builderBusy = false; - sendRequest(message, this::completeModify); + sendModification(request, enqueuedTicks); + } + + private void sendModification(final TransactionRequest request, final Optional enqueuedTicks) { + if (enqueuedTicks.isPresent()) { + enqueueRequest(request, response -> completeModify(request, response), enqueuedTicks.get().longValue()); + } else { + sendRequest(request, response -> completeModify(request, response)); + } } private void appendModification(final TransactionModification modification) { + appendModification(modification, Optional.absent()); + } + + private void appendModification(final TransactionModification modification, final Optional enqueuedTicks) { if (operationFailure == null) { - ensureInitializedBuider(); + ensureInitializedBuilder(); builder.addModification(modification); if (builder.size() >= REQUEST_MAX_MODIFICATIONS) { - flushBuilder(); + flushBuilder(enqueuedTicks); } } else { LOG.debug("Transaction {} failed, not attempting further transactions", getIdentifier()); } } - private void completeModify(final Response response) { - LOG.debug("Modification request completed with {}", response); + private void completeModify(final TransactionRequest request, final Response response) { + LOG.debug("Modification request {} completed with {}", request, response); if (response instanceof TransactionSuccess) { - // Happy path no-op + // Happy path + recordSuccessfulRequest(request); } else { recordFailedResponse(response); } @@ -193,6 +226,8 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } else { failFuture(future, response); } + + recordFinishedRequest(response); } private void completeRead(final SettableFuture>> future, @@ -204,20 +239,277 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } else { failFuture(future, response); } + + recordFinishedRequest(response); + } + + @Override + ModifyTransactionRequest abortRequest() { + ensureInitializedBuilder(); + builder.setAbort(); + builderBusy = false; + return builder.build(); } @Override - ModifyTransactionRequest doCommit(final boolean coordinated) { - ensureInitializedBuider(); + ModifyTransactionRequest commitRequest(final boolean coordinated) { + ensureInitializedBuilder(); builder.setCommit(coordinated); + builderBusy = false; + return builder.build(); + } - final ModifyTransactionRequest ret = builder.build(); + private ModifyTransactionRequest readyRequest() { + ensureInitializedBuilder(); + builder.setReady(); builderBusy = false; - return ret; + return builder.build(); } @Override - void doSeal() { - // No-op + boolean sealAndSend(final Optional enqueuedTicks) { + if (sendReadyOnSeal) { + ensureInitializedBuilder(); + builder.setReady(); + flushBuilder(enqueuedTicks); + } + return super.sealAndSend(enqueuedTicks); + } + + @Override + void flushState(final AbstractProxyTransaction successor) { + if (builderBusy) { + final ModifyTransactionRequest request = builder.build(); + builderBusy = false; + forwardToSuccessor(successor, request, null); + } + } + + @Override + void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest request, + final Consumer> callback) { + successor.handleForwardedRequest(request, callback); + } + + private void handleForwardedRequest(final TransactionRequest request, final Consumer> callback) { + if (request instanceof ModifyTransactionRequest) { + final ModifyTransactionRequest req = (ModifyTransactionRequest) request; + + req.getModifications().forEach(this::appendModification); + + final java.util.Optional maybeProto = req.getPersistenceProtocol(); + if (maybeProto.isPresent()) { + // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions + // until we know what we are going to do. + if (markSealed()) { + sealOnly(); + } + + final TransactionRequest tmp; + switch (maybeProto.get()) { + case ABORT: + tmp = abortRequest(); + sendRequest(tmp, resp -> { + completeModify(tmp, resp); + callback.accept(resp); + }); + break; + case SIMPLE: + tmp = commitRequest(false); + sendRequest(tmp, resp -> { + completeModify(tmp, resp); + callback.accept(resp); + }); + break; + case THREE_PHASE: + tmp = commitRequest(true); + sendRequest(tmp, resp -> { + recordSuccessfulRequest(tmp); + callback.accept(resp); + }); + break; + case READY: + tmp = readyRequest(); + sendRequest(tmp, resp -> { + recordSuccessfulRequest(tmp); + callback.accept(resp); + }); + break; + default: + throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get()); + } + } + } else if (request instanceof ReadTransactionRequest) { + ensureFlushedBuider(); + sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), + ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { + recordFinishedRequest(resp); + callback.accept(resp); + }); + } else if (request instanceof ExistsTransactionRequest) { + ensureFlushedBuider(); + sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), + ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { + recordFinishedRequest(resp); + callback.accept(resp); + }); + } else if (request instanceof TransactionPreCommitRequest) { + ensureFlushedBuider(); + final TransactionRequest tmp = new TransactionPreCommitRequest(getIdentifier(), nextSequence(), + localActor()); + sendRequest(tmp, resp -> { + recordSuccessfulRequest(tmp); + callback.accept(resp); + }); + } else if (request instanceof TransactionDoCommitRequest) { + ensureFlushedBuider(); + sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback); + } else if (request instanceof TransactionAbortRequest) { + ensureFlushedBuider(); + sendDoAbort(callback); + } else if (request instanceof TransactionPurgeRequest) { + enqueuePurge(callback); + } else { + throw new IllegalArgumentException("Unhandled request {}" + request); + } + } + + @Override + void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest request, + final Consumer> callback) { + successor.handleForwardedRemoteRequest(request, callback); + } + + @Override + void handleReplayedLocalRequest(final AbstractLocalTransactionRequest request, + final Consumer> callback, final long enqueuedTicks) { + if (request instanceof CommitLocalTransactionRequest) { + replayLocalCommitRequest((CommitLocalTransactionRequest) request, callback, enqueuedTicks); + } else if (request instanceof AbortLocalTransactionRequest) { + enqueueRequest(abortRequest(), callback, enqueuedTicks); + } else { + throw new IllegalStateException("Unhandled request " + request); + } + } + + private void replayLocalCommitRequest(final CommitLocalTransactionRequest request, + final Consumer> callback, final long enqueuedTicks) { + final DataTreeModification mod = request.getModification(); + final Optional optTicks = Optional.of(Long.valueOf(enqueuedTicks)); + + mod.applyToCursor(new AbstractDataTreeModificationCursor() { + @Override + public void write(final PathArgument child, final NormalizedNode data) { + appendModification(new TransactionWrite(current().node(child), data), optTicks); + } + + @Override + public void merge(final PathArgument child, final NormalizedNode data) { + appendModification(new TransactionMerge(current().node(child), data), optTicks); + } + + @Override + public void delete(final PathArgument child) { + appendModification(new TransactionDelete(current().node(child)), optTicks); + } + }); + + enqueueRequest(commitRequest(request.isCoordinated()), callback, enqueuedTicks); + } + + @Override + void handleReplayedRemoteRequest(final TransactionRequest request, + final @Nullable Consumer> callback, final long enqueuedTicks) { + final Consumer> cb = callback != null ? callback : resp -> { }; + final Optional optTicks = Optional.of(Long.valueOf(enqueuedTicks)); + + if (request instanceof ModifyTransactionRequest) { + final ModifyTransactionRequest req = (ModifyTransactionRequest) request; + for (TransactionModification mod : req.getModifications()) { + appendModification(mod, optTicks); + } + + final java.util.Optional maybeProto = req.getPersistenceProtocol(); + if (maybeProto.isPresent()) { + // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions + // until we know what we are going to do. + if (markSealed()) { + sealOnly(); + } + + final TransactionRequest tmp; + switch (maybeProto.get()) { + case ABORT: + tmp = abortRequest(); + enqueueRequest(tmp, resp -> { + completeModify(tmp, resp); + cb.accept(resp); + }, enqueuedTicks); + break; + case SIMPLE: + tmp = commitRequest(false); + enqueueRequest(tmp, resp -> { + completeModify(tmp, resp); + cb.accept(resp); + }, enqueuedTicks); + break; + case THREE_PHASE: + tmp = commitRequest(true); + enqueueRequest(tmp, resp -> { + recordSuccessfulRequest(tmp); + cb.accept(resp); + }, enqueuedTicks); + break; + case READY: + tmp = readyRequest(); + enqueueRequest(tmp, resp -> { + recordSuccessfulRequest(tmp); + cb.accept(resp); + }, enqueuedTicks); + break; + default: + throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get()); + } + } + } else if (request instanceof ReadTransactionRequest) { + ensureFlushedBuider(optTicks); + enqueueRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), + ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { + recordFinishedRequest(resp); + cb.accept(resp); + }, enqueuedTicks); + } else if (request instanceof ExistsTransactionRequest) { + ensureFlushedBuider(optTicks); + enqueueRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), + ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { + recordFinishedRequest(resp); + cb.accept(resp); + }, enqueuedTicks); + } else if (request instanceof TransactionPreCommitRequest) { + ensureFlushedBuider(optTicks); + final TransactionRequest tmp = new TransactionPreCommitRequest(getIdentifier(), nextSequence(), + localActor()); + enqueueRequest(tmp, resp -> { + recordSuccessfulRequest(tmp); + cb.accept(resp); + }, enqueuedTicks); + } else if (request instanceof TransactionDoCommitRequest) { + ensureFlushedBuider(optTicks); + enqueueRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback, + enqueuedTicks); + } else if (request instanceof TransactionAbortRequest) { + ensureFlushedBuider(optTicks); + enqueueDoAbort(callback, enqueuedTicks); + } else if (request instanceof TransactionPurgeRequest) { + enqueuePurge(callback, enqueuedTicks); + } else if (request instanceof IncrementTransactionSequenceRequest) { + final IncrementTransactionSequenceRequest req = (IncrementTransactionSequenceRequest) request; + ensureFlushedBuider(optTicks); + enqueueRequest(new IncrementTransactionSequenceRequest(getIdentifier(), nextSequence(), localActor(), + snapshotOnly, req.getIncrement()), callback, enqueuedTicks); + incrementSequence(req.getIncrement()); + } else { + throw new IllegalArgumentException("Unhandled request {}" + request); + } } }