X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FLocalProxyTransaction.java;h=cad6b5f16d53bc46dd886fbc791e86a600c72e60;hp=1dec84631a2904f6c7b528b737ec14579d7c6010;hb=d6ed0a044d591d65847714451d97d80345154089;hpb=98d1c5606bad9633ce5549bcd691a98c75abdf6a diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java index 1dec84631a..cad6b5f16d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java @@ -12,13 +12,22 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; import java.util.function.Consumer; +import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor; +import org.opendaylight.mdsal.common.api.ReadFailedException; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; @@ -29,78 +38,123 @@ import org.slf4j.LoggerFactory; * An {@link AbstractProxyTransaction} for dispatching a transaction towards a shard leader which is co-located with * the client instance. * + *

* It requires a {@link DataTreeSnapshot}, which is used to instantiated a new {@link DataTreeModification}. Operations * are then performed on this modification and once the transaction is submitted, the modification is sent to the shard * leader. * + *

* This class is not thread-safe as usual with transactions. Since it does not interact with the backend until the * transaction is submitted, at which point this class gets out of the picture, this is not a cause for concern. * * @author Robert Varga */ @NotThreadSafe -final class LocalProxyTransaction extends AbstractProxyTransaction { +abstract class LocalProxyTransaction extends AbstractProxyTransaction { private static final Logger LOG = LoggerFactory.getLogger(LocalProxyTransaction.class); - private static final Consumer> ABORT_COMPLETER = response -> { - LOG.debug("Abort completed with {}", response); - }; private final TransactionIdentifier identifier; - private DataTreeModification modification; - LocalProxyTransaction(final DistributedDataStoreClientBehavior client, - final TransactionIdentifier identifier, final DataTreeSnapshot snapshot) { - super(client); + LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier) { + super(parent); this.identifier = Preconditions.checkNotNull(identifier); - this.modification = snapshot.newModification(); } @Override - public TransactionIdentifier getIdentifier() { + public final TransactionIdentifier getIdentifier() { return identifier; } - @Override - void doDelete(final YangInstanceIdentifier path) { - modification.delete(path); - } + abstract DataTreeSnapshot readOnlyView(); + + abstract void applyModifyTransactionRequest(ModifyTransactionRequest request, + @Nullable Consumer> callback); @Override - void doMerge(final YangInstanceIdentifier path, final NormalizedNode data) { - modification.merge(path, data); + final CheckedFuture doExists(final YangInstanceIdentifier path) { + return Futures.immediateCheckedFuture(readOnlyView().readNode(path).isPresent()); } @Override - void doWrite(final YangInstanceIdentifier path, final NormalizedNode data) { - modification.write(path, data); + final CheckedFuture>, ReadFailedException> doRead(final YangInstanceIdentifier path) { + return Futures.immediateCheckedFuture(readOnlyView().readNode(path)); } @Override - CheckedFuture doExists(final YangInstanceIdentifier path) { - return Futures.immediateCheckedFuture(modification.readNode(path).isPresent()); + final void doAbort() { + sendAbort(new AbortLocalTransactionRequest(identifier, localActor()), response -> { + LOG.debug("Transaction {} abort completed with {}", identifier, response); + }); } @Override - CheckedFuture>, ReadFailedException> doRead(final YangInstanceIdentifier path) { - return Futures.immediateCheckedFuture(modification.readNode(path)); + void handleForwardedRemoteRequest(final TransactionRequest request, + final @Nullable Consumer> callback) { + if (request instanceof ModifyTransactionRequest) { + applyModifyTransactionRequest((ModifyTransactionRequest) request, callback); + } else if (request instanceof ReadTransactionRequest) { + final YangInstanceIdentifier path = ((ReadTransactionRequest) request).getPath(); + final Optional> result = readOnlyView().readNode(path); + callback.accept(new ReadTransactionSuccess(request.getTarget(), request.getSequence(), result)); + } else if (request instanceof ExistsTransactionRequest) { + final YangInstanceIdentifier path = ((ExistsTransactionRequest) request).getPath(); + final boolean result = readOnlyView().readNode(path).isPresent(); + callback.accept(new ExistsTransactionSuccess(request.getTarget(), request.getSequence(), result)); + } else { + throw new IllegalArgumentException("Unhandled request " + request); + } } @Override - void doAbort() { - client().sendRequest(nextSequence(), new AbortLocalTransactionRequest(identifier, client().self()), ABORT_COMPLETER); - modification = new FailedDataTreeModification(() -> new IllegalStateException("Tracker has been aborted")); + void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest request, + final Consumer> callback) { + if (request instanceof CommitLocalTransactionRequest) { + final CommitLocalTransactionRequest req = (CommitLocalTransactionRequest) request; + final DataTreeModification mod = req.getModification(); + + LOG.debug("Applying modification {} to successor {}", mod, successor); + mod.applyToCursor(new AbstractDataTreeModificationCursor() { + @Override + public void write(final PathArgument child, final NormalizedNode data) { + successor.write(current().node(child), data); + } + + @Override + public void merge(final PathArgument child, final NormalizedNode data) { + successor.merge(current().node(child), data); + } + + @Override + public void delete(final PathArgument child) { + successor.delete(current().node(child)); + } + }); + + successor.ensureSealed(); + + final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated()); + successor.sendRequest(successorReq, callback); + } else if (request instanceof AbortLocalTransactionRequest) { + LOG.debug("Forwarding abort {} to successor {}", request, successor); + successor.abort(); + } else { + throw new IllegalArgumentException("Unhandled request" + request); + } } @Override - CommitLocalTransactionRequest doCommit(final boolean coordinated) { - final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(identifier, client().self(), - modification, coordinated); - modification = new FailedDataTreeModification(() -> new IllegalStateException("Tracker has been submitted")); - return ret; + void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest request, + final Consumer> callback) { + if (request instanceof AbortLocalTransactionRequest) { + successor.sendAbort(request, callback); + } else { + throw new IllegalArgumentException("Unhandled request" + request); + } + + LOG.debug("Forwarded request {} to successor {}", request, successor); } - @Override - void doSeal() { - modification.ready(); + void sendAbort(final TransactionRequest request, final Consumer> callback) { + sendRequest(request, callback); } }