X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FLocalProxyTransaction.java;h=89402091fc5173996bbe25f2d711408999902537;hp=1dec84631a2904f6c7b528b737ec14579d7c6010;hb=20a32e6459fd1e27e7669bf1ebc7742b96787b94;hpb=98d1c5606bad9633ce5549bcd691a98c75abdf6a diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java index 1dec84631a..89402091fc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java @@ -12,13 +12,26 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; import java.util.function.Consumer; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest; +import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor; +import org.opendaylight.mdsal.common.api.ReadFailedException; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; @@ -29,78 +42,199 @@ import org.slf4j.LoggerFactory; * An {@link AbstractProxyTransaction} for dispatching a transaction towards a shard leader which is co-located with * the client instance. * + *

* It requires a {@link DataTreeSnapshot}, which is used to instantiated a new {@link DataTreeModification}. Operations * are then performed on this modification and once the transaction is submitted, the modification is sent to the shard * leader. * + *

* This class is not thread-safe as usual with transactions. Since it does not interact with the backend until the * transaction is submitted, at which point this class gets out of the picture, this is not a cause for concern. * * @author Robert Varga */ @NotThreadSafe -final class LocalProxyTransaction extends AbstractProxyTransaction { +abstract class LocalProxyTransaction extends AbstractProxyTransaction { private static final Logger LOG = LoggerFactory.getLogger(LocalProxyTransaction.class); - private static final Consumer> ABORT_COMPLETER = response -> { - LOG.debug("Abort completed with {}", response); - }; private final TransactionIdentifier identifier; - private DataTreeModification modification; - LocalProxyTransaction(final DistributedDataStoreClientBehavior client, - final TransactionIdentifier identifier, final DataTreeSnapshot snapshot) { - super(client); + LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier, final boolean isDone) { + super(parent, isDone); this.identifier = Preconditions.checkNotNull(identifier); - this.modification = snapshot.newModification(); } @Override - public TransactionIdentifier getIdentifier() { + public final TransactionIdentifier getIdentifier() { return identifier; } + @Nonnull + abstract DataTreeSnapshot readOnlyView(); + + abstract void applyForwardedModifyTransactionRequest(ModifyTransactionRequest request, + @Nullable Consumer> callback); + + abstract void replayModifyTransactionRequest(ModifyTransactionRequest request, + @Nullable Consumer> callback, long enqueuedTicks); + @Override - void doDelete(final YangInstanceIdentifier path) { - modification.delete(path); + final CheckedFuture doExists(final YangInstanceIdentifier path) { + return Futures.immediateCheckedFuture(readOnlyView().readNode(path).isPresent()); } @Override - void doMerge(final YangInstanceIdentifier path, final NormalizedNode data) { - modification.merge(path, data); + final CheckedFuture>, ReadFailedException> doRead(final YangInstanceIdentifier path) { + return Futures.immediateCheckedFuture(Optional.fromJavaUtil(readOnlyView().readNode(path))); } @Override - void doWrite(final YangInstanceIdentifier path, final NormalizedNode data) { - modification.write(path, data); + final AbortLocalTransactionRequest abortRequest() { + return new AbortLocalTransactionRequest(identifier, localActor()); } @Override - CheckedFuture doExists(final YangInstanceIdentifier path) { - return Futures.immediateCheckedFuture(modification.readNode(path).isPresent()); + void handleReplayedLocalRequest(final AbstractLocalTransactionRequest request, + final Consumer> callback, final long enqueuedTicks) { + if (request instanceof AbortLocalTransactionRequest) { + enqueueAbort(request, callback, enqueuedTicks); + } else { + throw new IllegalArgumentException("Unhandled request" + request); + } } - @Override - CheckedFuture>, ReadFailedException> doRead(final YangInstanceIdentifier path) { - return Futures.immediateCheckedFuture(modification.readNode(path)); + private boolean handleReadRequest(final TransactionRequest request, + @Nullable final Consumer> callback) { + // Note we delay completion of read requests to limit the scope at which the client can run, as they have + // listeners, which we do not want to execute while we are reconnecting. + if (request instanceof ReadTransactionRequest) { + final YangInstanceIdentifier path = ((ReadTransactionRequest) request).getPath(); + final Optional> result = Optional.fromJavaUtil(readOnlyView().readNode(path)); + if (callback != null) { + // XXX: FB does not see that callback is final, on stack and has be check for non-null. + final Consumer> fbIsStupid = Preconditions.checkNotNull(callback); + executeInActor(() -> fbIsStupid.accept(new ReadTransactionSuccess(request.getTarget(), + request.getSequence(), result))); + } + return true; + } else if (request instanceof ExistsTransactionRequest) { + final YangInstanceIdentifier path = ((ExistsTransactionRequest) request).getPath(); + final boolean result = readOnlyView().readNode(path).isPresent(); + if (callback != null) { + // XXX: FB does not see that callback is final, on stack and has be check for non-null. + final Consumer> fbIsStupid = Preconditions.checkNotNull(callback); + executeInActor(() -> fbIsStupid.accept(new ExistsTransactionSuccess(request.getTarget(), + request.getSequence(), result))); + } + return true; + } else { + return false; + } } @Override - void doAbort() { - client().sendRequest(nextSequence(), new AbortLocalTransactionRequest(identifier, client().self()), ABORT_COMPLETER); - modification = new FailedDataTreeModification(() -> new IllegalStateException("Tracker has been aborted")); + void handleReplayedRemoteRequest(final TransactionRequest request, + @Nullable final Consumer> callback, final long enqueuedTicks) { + if (request instanceof ModifyTransactionRequest) { + replayModifyTransactionRequest((ModifyTransactionRequest) request, callback, enqueuedTicks); + } else if (handleReadRequest(request, callback)) { + // No-op + } else if (request instanceof TransactionPurgeRequest) { + enqueuePurge(callback, enqueuedTicks); + } else if (request instanceof IncrementTransactionSequenceRequest) { + // Local transactions do not have non-replayable requests which would be visible to the backend, + // hence we can skip sequence increments. + LOG.debug("Not replaying {}", request); + } else { + throw new IllegalArgumentException("Unhandled request " + request); + } + } + + /** + * Remote-to-local equivalent of {@link #handleReplayedRemoteRequest(TransactionRequest, Consumer, long)}, + * except it is invoked in the forwarding path from + * {@link RemoteProxyTransaction#forwardToLocal(LocalProxyTransaction, TransactionRequest, Consumer)}. + * + * @param request Forwarded request + * @param callback Callback to be invoked once the request completes + */ + void handleForwardedRemoteRequest(final TransactionRequest request, final Consumer> callback) { + if (request instanceof ModifyTransactionRequest) { + applyForwardedModifyTransactionRequest((ModifyTransactionRequest) request, callback); + } else if (handleReadRequest(request, callback)) { + // No-op + } else if (request instanceof TransactionPurgeRequest) { + enqueuePurge(callback); + } else { + throw new IllegalArgumentException("Unhandled request " + request); + } } @Override - CommitLocalTransactionRequest doCommit(final boolean coordinated) { - final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(identifier, client().self(), - modification, coordinated); - modification = new FailedDataTreeModification(() -> new IllegalStateException("Tracker has been submitted")); - return ret; + final void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest request, + final Consumer> callback) { + if (request instanceof CommitLocalTransactionRequest) { + final CommitLocalTransactionRequest req = (CommitLocalTransactionRequest) request; + final DataTreeModification mod = req.getModification(); + + LOG.debug("Applying modification {} to successor {}", mod, successor); + mod.applyToCursor(new AbstractDataTreeModificationCursor() { + @Override + public void write(final PathArgument child, final NormalizedNode data) { + successor.write(current().node(child), data); + } + + @Override + public void merge(final PathArgument child, final NormalizedNode data) { + successor.merge(current().node(child), data); + } + + @Override + public void delete(final PathArgument child) { + successor.delete(current().node(child)); + } + }); + + successor.sealOnly(); + final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated()); + successor.sendRequest(successorReq, callback); + } else if (request instanceof AbortLocalTransactionRequest) { + LOG.debug("Forwarding abort {} to successor {}", request, successor); + successor.abort(); + } else if (request instanceof TransactionPurgeRequest) { + LOG.debug("Forwarding purge {} to successor {}", request, successor); + successor.enqueuePurge(callback); + } else if (request instanceof ModifyTransactionRequest) { + successor.handleForwardedRequest(request, callback); + } else { + throwUnhandledRequest(request); + } } @Override - void doSeal() { - modification.ready(); + void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest request, + final Consumer> callback) { + if (request instanceof AbortLocalTransactionRequest) { + successor.sendAbort(request, callback); + } else if (request instanceof TransactionPurgeRequest) { + successor.enqueuePurge(callback); + } else { + throwUnhandledRequest(request); + } + + LOG.debug("Forwarded request {} to successor {}", request, successor); + } + + private static void throwUnhandledRequest(final TransactionRequest request) { + throw new IllegalArgumentException("Unhandled request" + request); + } + + void sendAbort(final TransactionRequest request, final Consumer> callback) { + sendRequest(request, callback); + } + + void enqueueAbort(final TransactionRequest request, final Consumer> callback, + final long enqueuedTicks) { + enqueueRequest(request, callback, enqueuedTicks); } }