X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FRemoteProxyTransaction.java;h=09a8a605631c1f9d4b5cbe07e95a42a13077e85a;hb=e1c283de301355cb8fa3f7d4fa28a6dd0af501eb;hp=e91bd6c8b031f935a08749272286ac565602d95c;hpb=1d7e8fd9d781f630dee9dfb1b509067dd7fb9caa;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java index e91bd6c8b0..09a8a60563 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java @@ -8,14 +8,16 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import com.google.common.base.Optional; -import com.google.common.base.Preconditions; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.function.Consumer; import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; @@ -36,10 +38,13 @@ import org.opendaylight.controller.cluster.access.commands.TransactionWrite; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor; import org.opendaylight.mdsal.common.api.ReadFailedException; import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,7 +76,6 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { private volatile Exception operationFailure; - RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier, final boolean snapshotOnly, final boolean sendReadyOnSeal) { super(parent); @@ -92,17 +96,17 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { @Override void doDelete(final YangInstanceIdentifier path) { - appendModification(new TransactionDelete(path)); + appendModification(new TransactionDelete(path), Optional.absent()); } @Override void doMerge(final YangInstanceIdentifier path, final NormalizedNode data) { - appendModification(new TransactionMerge(path, data)); + appendModification(new TransactionMerge(path, data), Optional.absent()); } @Override void doWrite(final YangInstanceIdentifier path, final NormalizedNode data) { - appendModification(new TransactionWrite(path, data)); + appendModification(new TransactionWrite(path, data), Optional.absent()); } private CheckedFuture sendReadRequest(final AbstractReadTransactionRequest request, @@ -153,48 +157,42 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } } + private void ensureFlushedBuider(final Optional enqueuedTicks) { + if (builderBusy) { + flushBuilder(enqueuedTicks); + } + } + private void flushBuilder() { + flushBuilder(Optional.absent()); + } + + private void flushBuilder(final Optional enqueuedTicks) { final ModifyTransactionRequest request = builder.build(); builderBusy = false; - sendModification(request); + sendModification(request, enqueuedTicks); } - private void sendModification(final TransactionRequest request) { - sendRequest(request, response -> completeModify(request, response)); - } - - @Override - void handleForwardedRemoteRequest(final TransactionRequest request, - final @Nullable Consumer> callback) { - nextSequence(); - - if (callback == null) { - sendModification(request); - return; + private void sendModification(final TransactionRequest request, final Optional enqueuedTicks) { + if (enqueuedTicks.isPresent()) { + enqueueRequest(request, response -> completeModify(request, response), enqueuedTicks.get().longValue()); + } else { + sendRequest(request, response -> completeModify(request, response)); } - - /* - * FindBugs is utterly stupid, as it does not recognize the fact that we have checked for null - * and reports NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE in the lambda below. - */ - final Consumer> findBugsIsStupid = callback; - - // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the - // period required to get into the queue. - sendRequest(request, response -> { - findBugsIsStupid.accept(Preconditions.checkNotNull(response)); - completeModify(request, response); - }); } private void appendModification(final TransactionModification modification) { + appendModification(modification, Optional.absent()); + } + + private void appendModification(final TransactionModification modification, final Optional enqueuedTicks) { if (operationFailure == null) { ensureInitializedBuilder(); builder.addModification(modification); if (builder.size() >= REQUEST_MAX_MODIFICATIONS) { - flushBuilder(); + flushBuilder(enqueuedTicks); } } else { LOG.debug("Transaction {} failed, not attempting further transactions", getIdentifier()); @@ -257,6 +255,14 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { recordFinishedRequest(); } + private ModifyTransactionRequest abortRequest() { + ensureInitializedBuilder(); + builder.setAbort(); + final ModifyTransactionRequest ret = builder.build(); + builderBusy = false; + return ret; + } + @Override ModifyTransactionRequest commitRequest(final boolean coordinated) { ensureInitializedBuilder(); @@ -281,7 +287,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { if (builderBusy) { final ModifyTransactionRequest request = builder.build(); builderBusy = false; - successor.handleForwardedRemoteRequest(request, null); + forwardToSuccessor(successor, request, null); } } @@ -301,15 +307,28 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { if (maybeProto.isPresent()) { ensureSealed(); + final TransactionRequest tmp; switch (maybeProto.get()) { case ABORT: - sendAbort(callback); + tmp = abortRequest(); + sendRequest(tmp, resp -> { + completeModify(tmp, resp); + callback.accept(resp); + }); break; case SIMPLE: - sendRequest(commitRequest(false), callback); + tmp = commitRequest(false); + sendRequest(tmp, resp -> { + completeModify(tmp, resp); + callback.accept(resp); + }); break; case THREE_PHASE: - sendRequest(commitRequest(true), callback); + tmp = commitRequest(true); + sendRequest(tmp, resp -> { + recordSuccessfulRequest(tmp); + callback.accept(resp); + }); break; case READY: //no op @@ -321,14 +340,25 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } else if (request instanceof ReadTransactionRequest) { ensureFlushedBuider(); sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), - ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), callback); + ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { + recordFinishedRequest(); + callback.accept(resp); + }); } else if (request instanceof ExistsTransactionRequest) { ensureFlushedBuider(); sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), - ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), callback); + ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { + recordFinishedRequest(); + callback.accept(resp); + }); } else if (request instanceof TransactionPreCommitRequest) { ensureFlushedBuider(); - sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback); + final TransactionRequest tmp = new TransactionPreCommitRequest(getIdentifier(), nextSequence(), + localActor()); + sendRequest(tmp, resp -> { + recordSuccessfulRequest(tmp); + callback.accept(resp); + }); } else if (request instanceof TransactionDoCommitRequest) { ensureFlushedBuider(); sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback); @@ -336,7 +366,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { ensureFlushedBuider(); sendAbort(callback); } else if (request instanceof TransactionPurgeRequest) { - purge(); + sendPurge(); } else { throw new IllegalArgumentException("Unhandled request {}" + request); } @@ -347,4 +377,123 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { final Consumer> callback) { successor.handleForwardedRemoteRequest(request, callback); } + + @Override + void handleReplayedLocalRequest(final AbstractLocalTransactionRequest request, + final Consumer> callback, final long enqueuedTicks) { + if (request instanceof CommitLocalTransactionRequest) { + replayLocalCommitRequest((CommitLocalTransactionRequest) request, callback, enqueuedTicks); + } else if (request instanceof AbortLocalTransactionRequest) { + enqueueRequest(abortRequest(), callback, enqueuedTicks); + } else { + throw new IllegalStateException("Unhandled request " + request); + } + } + + private void replayLocalCommitRequest(final CommitLocalTransactionRequest request, + final Consumer> callback, final long enqueuedTicks) { + final DataTreeModification mod = request.getModification(); + final Optional optTicks = Optional.of(Long.valueOf(enqueuedTicks)); + + mod.applyToCursor(new AbstractDataTreeModificationCursor() { + @Override + public void write(final PathArgument child, final NormalizedNode data) { + appendModification(new TransactionWrite(current().node(child), data), optTicks); + } + + @Override + public void merge(final PathArgument child, final NormalizedNode data) { + appendModification(new TransactionMerge(current().node(child), data), optTicks); + } + + @Override + public void delete(final PathArgument child) { + appendModification(new TransactionDelete(current().node(child)), optTicks); + } + }); + + enqueueRequest(commitRequest(request.isCoordinated()), callback, enqueuedTicks); + } + + @Override + void handleReplayedRemoteRequest(final TransactionRequest request, + final @Nullable Consumer> callback, final long enqueuedTicks) { + final Consumer> cb = callback != null ? callback : resp -> { }; + final Optional optTicks = Optional.of(Long.valueOf(enqueuedTicks)); + + if (request instanceof ModifyTransactionRequest) { + final ModifyTransactionRequest req = (ModifyTransactionRequest) request; + for (TransactionModification mod : req.getModifications()) { + appendModification(mod, optTicks); + } + + final java.util.Optional maybeProto = req.getPersistenceProtocol(); + if (maybeProto.isPresent()) { + ensureSealed(); + + final TransactionRequest tmp; + switch (maybeProto.get()) { + case ABORT: + tmp = abortRequest(); + enqueueRequest(tmp, resp -> { + completeModify(tmp, resp); + cb.accept(resp); + }, enqueuedTicks); + break; + case SIMPLE: + tmp = commitRequest(false); + enqueueRequest(tmp, resp -> { + completeModify(tmp, resp); + cb.accept(resp); + }, enqueuedTicks); + break; + case THREE_PHASE: + tmp = commitRequest(true); + enqueueRequest(tmp, resp -> { + recordSuccessfulRequest(tmp); + cb.accept(resp); + }, enqueuedTicks); + break; + case READY: + //no op + break; + default: + throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get()); + } + } + } else if (request instanceof ReadTransactionRequest) { + ensureFlushedBuider(optTicks); + enqueueRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), + ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { + recordFinishedRequest(); + cb.accept(resp); + }, enqueuedTicks); + } else if (request instanceof ExistsTransactionRequest) { + ensureFlushedBuider(optTicks); + enqueueRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), + ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { + recordFinishedRequest(); + cb.accept(resp); + }, enqueuedTicks); + } else if (request instanceof TransactionPreCommitRequest) { + ensureFlushedBuider(optTicks); + final TransactionRequest tmp = new TransactionPreCommitRequest(getIdentifier(), nextSequence(), + localActor()); + enqueueRequest(tmp, resp -> { + recordSuccessfulRequest(tmp); + cb.accept(resp); + }, enqueuedTicks); + } else if (request instanceof TransactionDoCommitRequest) { + ensureFlushedBuider(optTicks); + enqueueRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback, + enqueuedTicks); + } else if (request instanceof TransactionAbortRequest) { + ensureFlushedBuider(optTicks); + enqueueAbort(callback, enqueuedTicks); + } else if (request instanceof TransactionPurgeRequest) { + enqueuePurge(enqueuedTicks); + } else { + throw new IllegalArgumentException("Unhandled request {}" + request); + } + } }