X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FRemoteProxyTransaction.java;h=284be4a45793699576e412c88d559d15bc916bba;hb=refs%2Fchanges%2F10%2F78310%2F5;hp=09a8a605631c1f9d4b5cbe07e95a42a13077e85a;hpb=e1c283de301355cb8fa3f7d4fa28a6dd0af501eb;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java index 09a8a60563..284be4a457 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java @@ -7,19 +7,20 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; -import com.google.common.base.Optional; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import java.util.Optional; import java.util.function.Consumer; import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.access.client.RequestTimeoutException; import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest; import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder; import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol; @@ -35,12 +36,14 @@ import org.opendaylight.controller.cluster.access.commands.TransactionPurgeReque import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionWrite; +import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor; +import org.opendaylight.mdsal.common.api.DataStoreUnavailableException; import org.opendaylight.mdsal.common.api.ReadFailedException; -import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; +import org.opendaylight.yangtools.util.concurrent.FluentFutures; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -77,8 +80,8 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { private volatile Exception operationFailure; RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier, - final boolean snapshotOnly, final boolean sendReadyOnSeal) { - super(parent); + final boolean snapshotOnly, final boolean sendReadyOnSeal, final boolean isDone) { + super(parent, isDone); this.snapshotOnly = snapshotOnly; this.sendReadyOnSeal = sendReadyOnSeal; builder = new ModifyTransactionRequestBuilder(identifier, localActor()); @@ -96,52 +99,46 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { @Override void doDelete(final YangInstanceIdentifier path) { - appendModification(new TransactionDelete(path), Optional.absent()); + appendModification(new TransactionDelete(path), Optional.empty()); } @Override void doMerge(final YangInstanceIdentifier path, final NormalizedNode data) { - appendModification(new TransactionMerge(path, data), Optional.absent()); + appendModification(new TransactionMerge(path, data), Optional.empty()); } @Override void doWrite(final YangInstanceIdentifier path, final NormalizedNode data) { - appendModification(new TransactionWrite(path, data), Optional.absent()); + appendModification(new TransactionWrite(path, data), Optional.empty()); } - private CheckedFuture sendReadRequest(final AbstractReadTransactionRequest request, + private FluentFuture sendReadRequest(final AbstractReadTransactionRequest request, final Consumer> completer, final ListenableFuture future) { // Check if a previous operation failed. If it has, do not bother sending anything and report a failure final Exception local = operationFailure; if (local != null) { - return Futures.immediateFailedCheckedFuture(new ReadFailedException("Previous operation failed", local)); + return FluentFutures.immediateFailedFluentFuture( + new ReadFailedException("Previous operation failed", local)); } // Make sure we send any modifications before issuing a read ensureFlushedBuider(); sendRequest(request, completer); - return MappingCheckedFuture.create(future, ReadFailedException.MAPPER); + return FluentFuture.from(future); } @Override - CheckedFuture doExists(final YangInstanceIdentifier path) { + FluentFuture doExists(final YangInstanceIdentifier path) { final SettableFuture future = SettableFuture.create(); return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), path, - isSnapshotOnly()), t -> completeExists(future, t), future); + isSnapshotOnly()), t -> completeExists(path, future, t), future); } @Override - CheckedFuture>, ReadFailedException> doRead(final YangInstanceIdentifier path) { + FluentFuture>> doRead(final YangInstanceIdentifier path) { final SettableFuture>> future = SettableFuture.create(); return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), path, - isSnapshotOnly()), t -> completeRead(future, t), future); - } - - @Override - void doAbort() { - ensureInitializedBuilder(); - builder.setAbort(); - flushBuilder(); + isSnapshotOnly()), t -> completeRead(path, future, t), future); } private void ensureInitializedBuilder() { @@ -152,9 +149,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } private void ensureFlushedBuider() { - if (builderBusy) { - flushBuilder(); - } + ensureFlushedBuider(Optional.empty()); } private void ensureFlushedBuider(final Optional enqueuedTicks) { @@ -163,10 +158,6 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } } - private void flushBuilder() { - flushBuilder(Optional.absent()); - } - private void flushBuilder(final Optional enqueuedTicks) { final ModifyTransactionRequest request = builder.build(); builderBusy = false; @@ -183,7 +174,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } private void appendModification(final TransactionModification modification) { - appendModification(modification, Optional.absent()); + appendModification(modification, Optional.empty()); } private void appendModification(final TransactionModification modification, final Optional enqueuedTicks) { @@ -213,7 +204,9 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { private Exception recordFailedResponse(final Response response) { final Exception failure; if (response instanceof RequestFailure) { - failure = ((RequestFailure) response).getCause(); + final RequestException cause = ((RequestFailure) response).getCause(); + failure = cause instanceof RequestTimeoutException + ? new DataStoreUnavailableException(cause.getMessage(), cause) : cause; } else { LOG.warn("Unhandled response {}", response); failure = new IllegalArgumentException("Unhandled response " + response.getClass()); @@ -226,69 +219,79 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { return failure; } - private void failFuture(final SettableFuture future, final Response response) { - future.setException(recordFailedResponse(response)); + private void failReadFuture(final SettableFuture future, final String message, + final Response response) { + future.setException(new ReadFailedException(message, recordFailedResponse(response))); } - private void completeExists(final SettableFuture future, final Response response) { - LOG.debug("Exists request completed with {}", response); + private void completeExists(final YangInstanceIdentifier path, final SettableFuture future, + final Response response) { + LOG.debug("Exists request for {} completed with {}", path, response); if (response instanceof ExistsTransactionSuccess) { future.set(((ExistsTransactionSuccess) response).getExists()); } else { - failFuture(future, response); + failReadFuture(future, "Error executing exists request for path " + path, response); } - recordFinishedRequest(); + recordFinishedRequest(response); } - private void completeRead(final SettableFuture>> future, - final Response response) { - LOG.debug("Read request completed with {}", response); + private void completeRead(final YangInstanceIdentifier path, + final SettableFuture>> future, final Response response) { + LOG.debug("Read request for {} completed with {}", path, response); if (response instanceof ReadTransactionSuccess) { future.set(((ReadTransactionSuccess) response).getData()); } else { - failFuture(future, response); + failReadFuture(future, "Error reading data for path " + path, response); } - recordFinishedRequest(); + recordFinishedRequest(response); } - private ModifyTransactionRequest abortRequest() { + @Override + ModifyTransactionRequest abortRequest() { ensureInitializedBuilder(); builder.setAbort(); - final ModifyTransactionRequest ret = builder.build(); builderBusy = false; - return ret; + return builder.build(); } @Override ModifyTransactionRequest commitRequest(final boolean coordinated) { ensureInitializedBuilder(); builder.setCommit(coordinated); + builderBusy = false; + return builder.build(); + } - final ModifyTransactionRequest ret = builder.build(); + private ModifyTransactionRequest readyRequest() { + ensureInitializedBuilder(); + builder.setReady(); builderBusy = false; - return ret; + return builder.build(); } @Override - void doSeal() { + boolean sealAndSend(final Optional enqueuedTicks) { if (sendReadyOnSeal) { ensureInitializedBuilder(); builder.setReady(); - flushBuilder(); + flushBuilder(enqueuedTicks); } + return super.sealAndSend(enqueuedTicks); } @Override - void flushState(final AbstractProxyTransaction successor) { - if (builderBusy) { - final ModifyTransactionRequest request = builder.build(); - builderBusy = false; - forwardToSuccessor(successor, request, null); + java.util.Optional flushState() { + if (!builderBusy) { + return java.util.Optional.empty(); } + + final ModifyTransactionRequest request = builder.build(); + builderBusy = false; + return java.util.Optional.of(request); } @Override @@ -297,58 +300,21 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { successor.handleForwardedRequest(request, callback); } - private void handleForwardedRequest(final TransactionRequest request, final Consumer> callback) { + void handleForwardedRequest(final TransactionRequest request, final Consumer> callback) { if (request instanceof ModifyTransactionRequest) { - final ModifyTransactionRequest req = (ModifyTransactionRequest) request; - - req.getModifications().forEach(this::appendModification); - - final java.util.Optional maybeProto = req.getPersistenceProtocol(); - if (maybeProto.isPresent()) { - ensureSealed(); - - final TransactionRequest tmp; - switch (maybeProto.get()) { - case ABORT: - tmp = abortRequest(); - sendRequest(tmp, resp -> { - completeModify(tmp, resp); - callback.accept(resp); - }); - break; - case SIMPLE: - tmp = commitRequest(false); - sendRequest(tmp, resp -> { - completeModify(tmp, resp); - callback.accept(resp); - }); - break; - case THREE_PHASE: - tmp = commitRequest(true); - sendRequest(tmp, resp -> { - recordSuccessfulRequest(tmp); - callback.accept(resp); - }); - break; - case READY: - //no op - break; - default: - throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get()); - } - } + handleForwardedModifyTransactionRequest(callback, (ModifyTransactionRequest) request); } else if (request instanceof ReadTransactionRequest) { ensureFlushedBuider(); sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { - recordFinishedRequest(); + recordFinishedRequest(resp); callback.accept(resp); }); } else if (request instanceof ExistsTransactionRequest) { ensureFlushedBuider(); sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { - recordFinishedRequest(); + recordFinishedRequest(resp); callback.accept(resp); }); } else if (request instanceof TransactionPreCommitRequest) { @@ -364,14 +330,62 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback); } else if (request instanceof TransactionAbortRequest) { ensureFlushedBuider(); - sendAbort(callback); + sendDoAbort(callback); } else if (request instanceof TransactionPurgeRequest) { - sendPurge(); + enqueuePurge(callback); } else { throw new IllegalArgumentException("Unhandled request {}" + request); } } + private void handleForwardedModifyTransactionRequest(final Consumer> callback, + final ModifyTransactionRequest req) { + req.getModifications().forEach(this::appendModification); + + final java.util.Optional maybeProto = req.getPersistenceProtocol(); + if (maybeProto.isPresent()) { + // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions + // until we know what we are going to do. + if (markSealed()) { + sealOnly(); + } + + final TransactionRequest tmp; + switch (maybeProto.get()) { + case ABORT: + tmp = abortRequest(); + sendRequest(tmp, resp -> { + completeModify(tmp, resp); + callback.accept(resp); + }); + break; + case SIMPLE: + tmp = commitRequest(false); + sendRequest(tmp, resp -> { + completeModify(tmp, resp); + callback.accept(resp); + }); + break; + case THREE_PHASE: + tmp = commitRequest(true); + sendRequest(tmp, resp -> { + recordSuccessfulRequest(tmp); + callback.accept(resp); + }); + break; + case READY: + tmp = readyRequest(); + sendRequest(tmp, resp -> { + recordSuccessfulRequest(tmp); + callback.accept(resp); + }); + break; + default: + throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get()); + } + } + } + @Override void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest request, final Consumer> callback) { @@ -417,62 +431,24 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { @Override void handleReplayedRemoteRequest(final TransactionRequest request, - final @Nullable Consumer> callback, final long enqueuedTicks) { - final Consumer> cb = callback != null ? callback : resp -> { }; + @Nullable final Consumer> callback, final long enqueuedTicks) { + final Consumer> cb = callback != null ? callback : resp -> { /* NOOP */ }; final Optional optTicks = Optional.of(Long.valueOf(enqueuedTicks)); if (request instanceof ModifyTransactionRequest) { - final ModifyTransactionRequest req = (ModifyTransactionRequest) request; - for (TransactionModification mod : req.getModifications()) { - appendModification(mod, optTicks); - } - - final java.util.Optional maybeProto = req.getPersistenceProtocol(); - if (maybeProto.isPresent()) { - ensureSealed(); - - final TransactionRequest tmp; - switch (maybeProto.get()) { - case ABORT: - tmp = abortRequest(); - enqueueRequest(tmp, resp -> { - completeModify(tmp, resp); - cb.accept(resp); - }, enqueuedTicks); - break; - case SIMPLE: - tmp = commitRequest(false); - enqueueRequest(tmp, resp -> { - completeModify(tmp, resp); - cb.accept(resp); - }, enqueuedTicks); - break; - case THREE_PHASE: - tmp = commitRequest(true); - enqueueRequest(tmp, resp -> { - recordSuccessfulRequest(tmp); - cb.accept(resp); - }, enqueuedTicks); - break; - case READY: - //no op - break; - default: - throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get()); - } - } + handleReplayedModifyTransactionRequest(enqueuedTicks, cb, (ModifyTransactionRequest) request); } else if (request instanceof ReadTransactionRequest) { ensureFlushedBuider(optTicks); enqueueRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { - recordFinishedRequest(); + recordFinishedRequest(resp); cb.accept(resp); }, enqueuedTicks); } else if (request instanceof ExistsTransactionRequest) { ensureFlushedBuider(optTicks); enqueueRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { - recordFinishedRequest(); + recordFinishedRequest(resp); cb.accept(resp); }, enqueuedTicks); } else if (request instanceof TransactionPreCommitRequest) { @@ -489,11 +465,65 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { enqueuedTicks); } else if (request instanceof TransactionAbortRequest) { ensureFlushedBuider(optTicks); - enqueueAbort(callback, enqueuedTicks); + enqueueDoAbort(callback, enqueuedTicks); } else if (request instanceof TransactionPurgeRequest) { - enqueuePurge(enqueuedTicks); + enqueuePurge(callback, enqueuedTicks); + } else if (request instanceof IncrementTransactionSequenceRequest) { + final IncrementTransactionSequenceRequest req = (IncrementTransactionSequenceRequest) request; + ensureFlushedBuider(optTicks); + enqueueRequest(new IncrementTransactionSequenceRequest(getIdentifier(), nextSequence(), localActor(), + snapshotOnly, req.getIncrement()), callback, enqueuedTicks); + incrementSequence(req.getIncrement()); } else { throw new IllegalArgumentException("Unhandled request {}" + request); } } + + private void handleReplayedModifyTransactionRequest(final long enqueuedTicks, final Consumer> cb, + final ModifyTransactionRequest req) { + req.getModifications().forEach(this::appendModification); + + final java.util.Optional maybeProto = req.getPersistenceProtocol(); + if (maybeProto.isPresent()) { + // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions + // until we know what we are going to do. + if (markSealed()) { + sealOnly(); + } + + final TransactionRequest tmp; + switch (maybeProto.get()) { + case ABORT: + tmp = abortRequest(); + enqueueRequest(tmp, resp -> { + completeModify(tmp, resp); + cb.accept(resp); + }, enqueuedTicks); + break; + case SIMPLE: + tmp = commitRequest(false); + enqueueRequest(tmp, resp -> { + completeModify(tmp, resp); + cb.accept(resp); + }, enqueuedTicks); + break; + case THREE_PHASE: + tmp = commitRequest(true); + enqueueRequest(tmp, resp -> { + recordSuccessfulRequest(tmp); + cb.accept(resp); + }, enqueuedTicks); + break; + case READY: + tmp = readyRequest(); + enqueueRequest(tmp, resp -> { + recordSuccessfulRequest(tmp); + cb.accept(resp); + }, enqueuedTicks); + break; + default: + throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get()); + } + } + } }