X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FLocalProxyTransaction.java;h=2a81e1d0aa37714cf208beea5d9012cf893ff96c;hb=db3d7caeeb310f76a9a159f9a8d7e9beff89f645;hp=5e78f44905a39d9f44e0f50a2d150eac95f002bc;hpb=e5d320150e9cb40c338bf27f31a636fa5207d3eb;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java index 5e78f44905..2a81e1d0aa 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java @@ -7,17 +7,19 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.FluentFuture; +import java.util.Optional; import java.util.function.Consumer; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess; @@ -26,7 +28,7 @@ import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor; -import org.opendaylight.mdsal.common.api.ReadFailedException; +import org.opendaylight.yangtools.util.concurrent.FluentFutures; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -56,8 +58,8 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { private final TransactionIdentifier identifier; - LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier) { - super(parent); + LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier, final boolean isDone) { + super(parent, isDone); this.identifier = Preconditions.checkNotNull(identifier); } @@ -66,51 +68,110 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { return identifier; } + @Nonnull abstract DataTreeSnapshot readOnlyView(); - abstract void applyModifyTransactionRequest(ModifyTransactionRequest request, + abstract void applyForwardedModifyTransactionRequest(ModifyTransactionRequest request, @Nullable Consumer> callback); + abstract void replayModifyTransactionRequest(ModifyTransactionRequest request, + @Nullable Consumer> callback, long enqueuedTicks); + @Override - final CheckedFuture doExists(final YangInstanceIdentifier path) { - return Futures.immediateCheckedFuture(readOnlyView().readNode(path).isPresent()); + final FluentFuture doExists(final YangInstanceIdentifier path) { + return FluentFutures.immediateFluentFuture(readOnlyView().readNode(path).isPresent()); } @Override - final CheckedFuture>, ReadFailedException> doRead(final YangInstanceIdentifier path) { - return Futures.immediateCheckedFuture(readOnlyView().readNode(path)); + final FluentFuture>> doRead(final YangInstanceIdentifier path) { + return FluentFutures.immediateFluentFuture(readOnlyView().readNode(path)); } @Override - final void doAbort() { - sendAbort(new AbortLocalTransactionRequest(identifier, localActor()), response -> { - LOG.debug("Transaction {} abort completed with {}", identifier, response); - }); + final AbortLocalTransactionRequest abortRequest() { + return new AbortLocalTransactionRequest(identifier, localActor()); } @Override - void handleForwardedRemoteRequest(final TransactionRequest request, - final @Nullable Consumer> callback) { - if (request instanceof ModifyTransactionRequest) { - applyModifyTransactionRequest((ModifyTransactionRequest) request, callback); - } else if (request instanceof ReadTransactionRequest) { + void handleReplayedLocalRequest(final AbstractLocalTransactionRequest request, + final Consumer> callback, final long enqueuedTicks) { + if (request instanceof AbortLocalTransactionRequest) { + enqueueAbort(request, callback, enqueuedTicks); + } else { + throw new IllegalArgumentException("Unhandled request" + request); + } + } + + private boolean handleReadRequest(final TransactionRequest request, + @Nullable final Consumer> callback) { + // Note we delay completion of read requests to limit the scope at which the client can run, as they have + // listeners, which we do not want to execute while we are reconnecting. + if (request instanceof ReadTransactionRequest) { final YangInstanceIdentifier path = ((ReadTransactionRequest) request).getPath(); final Optional> result = readOnlyView().readNode(path); - callback.accept(new ReadTransactionSuccess(request.getTarget(), request.getSequence(), result)); + if (callback != null) { + // XXX: FB does not see that callback is final, on stack and has be check for non-null. + final Consumer> fbIsStupid = Preconditions.checkNotNull(callback); + executeInActor(() -> fbIsStupid.accept(new ReadTransactionSuccess(request.getTarget(), + request.getSequence(), result))); + } + return true; } else if (request instanceof ExistsTransactionRequest) { final YangInstanceIdentifier path = ((ExistsTransactionRequest) request).getPath(); final boolean result = readOnlyView().readNode(path).isPresent(); - callback.accept(new ExistsTransactionSuccess(request.getTarget(), request.getSequence(), result)); + if (callback != null) { + // XXX: FB does not see that callback is final, on stack and has be check for non-null. + final Consumer> fbIsStupid = Preconditions.checkNotNull(callback); + executeInActor(() -> fbIsStupid.accept(new ExistsTransactionSuccess(request.getTarget(), + request.getSequence(), result))); + } + return true; + } else { + return false; + } + } + + @Override + void handleReplayedRemoteRequest(final TransactionRequest request, + @Nullable final Consumer> callback, final long enqueuedTicks) { + if (request instanceof ModifyTransactionRequest) { + replayModifyTransactionRequest((ModifyTransactionRequest) request, callback, enqueuedTicks); + } else if (handleReadRequest(request, callback)) { + // No-op + } else if (request instanceof TransactionPurgeRequest) { + enqueuePurge(callback, enqueuedTicks); + } else if (request instanceof IncrementTransactionSequenceRequest) { + // Local transactions do not have non-replayable requests which would be visible to the backend, + // hence we can skip sequence increments. + LOG.debug("Not replaying {}", request); + } else { + throw new IllegalArgumentException("Unhandled request " + request); + } + } + + /** + * Remote-to-local equivalent of {@link #handleReplayedRemoteRequest(TransactionRequest, Consumer, long)}, + * except it is invoked in the forwarding path from + * {@link RemoteProxyTransaction#forwardToLocal(LocalProxyTransaction, TransactionRequest, Consumer)}. + * + * @param request Forwarded request + * @param callback Callback to be invoked once the request completes + */ + void handleForwardedRemoteRequest(final TransactionRequest request, final Consumer> callback) { + if (request instanceof ModifyTransactionRequest) { + applyForwardedModifyTransactionRequest((ModifyTransactionRequest) request, callback); + } else if (handleReadRequest(request, callback)) { + // No-op } else if (request instanceof TransactionPurgeRequest) { - purge(); + enqueuePurge(callback); } else { throw new IllegalArgumentException("Unhandled request " + request); } } @Override - void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest request, - final Consumer> callback) { + final void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest request, + final Consumer> callback) { if (request instanceof CommitLocalTransactionRequest) { final CommitLocalTransactionRequest req = (CommitLocalTransactionRequest) request; final DataTreeModification mod = req.getModification(); @@ -133,15 +194,19 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { } }); - successor.ensureSealed(); - + successor.sealOnly(); final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated()); successor.sendRequest(successorReq, callback); } else if (request instanceof AbortLocalTransactionRequest) { LOG.debug("Forwarding abort {} to successor {}", request, successor); successor.abort(); + } else if (request instanceof TransactionPurgeRequest) { + LOG.debug("Forwarding purge {} to successor {}", request, successor); + successor.enqueuePurge(callback); + } else if (request instanceof ModifyTransactionRequest) { + successor.handleForwardedRequest(request, callback); } else { - throw new IllegalArgumentException("Unhandled request" + request); + throwUnhandledRequest(request); } } @@ -151,15 +216,24 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { if (request instanceof AbortLocalTransactionRequest) { successor.sendAbort(request, callback); } else if (request instanceof TransactionPurgeRequest) { - successor.purge(); + successor.enqueuePurge(callback); } else { - throw new IllegalArgumentException("Unhandled request" + request); + throwUnhandledRequest(request); } LOG.debug("Forwarded request {} to successor {}", request, successor); } + private static void throwUnhandledRequest(final TransactionRequest request) { + throw new IllegalArgumentException("Unhandled request" + request); + } + void sendAbort(final TransactionRequest request, final Consumer> callback) { sendRequest(request, callback); } + + void enqueueAbort(final TransactionRequest request, final Consumer> callback, + final long enqueuedTicks) { + enqueueRequest(request, callback, enqueuedTicks); + } }