X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FLocalProxyTransaction.java;h=7b652f474b80aa8a0fbdd8f6e48214d5d7487370;hb=320a4e5cd2d9d80468a3f82798744f2035488218;hp=576fa67ed467afc05fe871c1db424aa2a45eca17;hpb=5fd8e6506248cc34da72281a1662612f6c2b2f9a;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java index 576fa67ed4..7b652f474b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java @@ -9,18 +9,32 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Verify; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; import java.util.function.Consumer; +import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol; +import org.opendaylight.controller.cluster.access.commands.TransactionDelete; +import org.opendaylight.controller.cluster.access.commands.TransactionMerge; +import org.opendaylight.controller.cluster.access.commands.TransactionModification; +import org.opendaylight.controller.cluster.access.commands.TransactionRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionWrite; +import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor; +import org.opendaylight.mdsal.common.api.ReadFailedException; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,18 +57,16 @@ import org.slf4j.LoggerFactory; @NotThreadSafe final class LocalProxyTransaction extends AbstractProxyTransaction { private static final Logger LOG = LoggerFactory.getLogger(LocalProxyTransaction.class); - private static final Consumer> ABORT_COMPLETER = response -> { - LOG.debug("Abort completed with {}", response); - }; private final TransactionIdentifier identifier; - private DataTreeModification modification; - LocalProxyTransaction(final DistributedDataStoreClientBehavior client, - final TransactionIdentifier identifier, final DataTreeSnapshot snapshot) { - super(client); + private CursorAwareDataTreeModification modification; + + LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier, + final CursorAwareDataTreeModification modification) { + super(parent); this.identifier = Preconditions.checkNotNull(identifier); - this.modification = snapshot.newModification(); + this.modification = Preconditions.checkNotNull(modification); } @Override @@ -87,17 +99,26 @@ final class LocalProxyTransaction extends AbstractProxyTransaction { return Futures.immediateCheckedFuture(modification.readNode(path)); } + private RuntimeException abortedException() { + return new IllegalStateException("Tracker " + identifier + " has been aborted"); + } + + private RuntimeException submittedException() { + return new IllegalStateException("Tracker " + identifier + " has been submitted"); + } + @Override void doAbort() { - sendRequest(new AbortLocalTransactionRequest(identifier, localActor()), ABORT_COMPLETER); - modification = new FailedDataTreeModification(() -> new IllegalStateException("Tracker has been aborted")); + sendAbort(new AbortLocalTransactionRequest(identifier, localActor()), response -> { + LOG.debug("Transaction {} abort completed with {}", identifier, response); + }); } @Override - CommitLocalTransactionRequest doCommit(final boolean coordinated) { + CommitLocalTransactionRequest commitRequest(final boolean coordinated) { final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(identifier, localActor(), modification, coordinated); - modification = new FailedDataTreeModification(() -> new IllegalStateException("Tracker has been submitted")); + modification = new FailedDataTreeModification(this::submittedException); return ret; } @@ -105,4 +126,121 @@ final class LocalProxyTransaction extends AbstractProxyTransaction { void doSeal() { modification.ready(); } + + DataTreeSnapshot getSnapshot() { + return modification; + } + + private void applyModifyTransactionRequest(final ModifyTransactionRequest request, + final @Nullable Consumer> callback) { + for (TransactionModification mod : request.getModifications()) { + if (mod instanceof TransactionWrite) { + modification.write(mod.getPath(), ((TransactionWrite)mod).getData()); + } else if (mod instanceof TransactionMerge) { + modification.merge(mod.getPath(), ((TransactionMerge)mod).getData()); + } else if (mod instanceof TransactionDelete) { + modification.delete(mod.getPath()); + } else { + throw new IllegalArgumentException("Unsupported modification " + mod); + } + } + + final java.util.Optional maybeProtocol = request.getPersistenceProtocol(); + if (maybeProtocol.isPresent()) { + seal(); + Verify.verify(callback != null, "Request {} has null callback", request); + + switch (maybeProtocol.get()) { + case ABORT: + sendAbort(callback); + break; + case SIMPLE: + sendRequest(commitRequest(false), callback); + break; + case THREE_PHASE: + sendRequest(commitRequest(true), callback); + break; + default: + throw new IllegalArgumentException("Unhandled protocol " + maybeProtocol.get()); + } + } + } + + @Override + void handleForwardedRemoteRequest(final TransactionRequest request, + final @Nullable Consumer> callback) { + LOG.debug("Applying forwaded request {}", request); + + if (request instanceof ModifyTransactionRequest) { + applyModifyTransactionRequest((ModifyTransactionRequest) request, callback); + } else { + throw new IllegalArgumentException("Unhandled request " + request); + } + } + + @Override + void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest request, + final Consumer> callback) throws RequestException { + if (request instanceof CommitLocalTransactionRequest) { + final CommitLocalTransactionRequest req = (CommitLocalTransactionRequest) request; + final DataTreeModification mod = req.getModification(); + + LOG.debug("Applying modification {} to successor {}", mod, successor); + mod.applyToCursor(new AbstractDataTreeModificationCursor() { + @Override + public void write(final PathArgument child, final NormalizedNode data) { + successor.write(current().node(child), data); + } + + @Override + public void merge(final PathArgument child, final NormalizedNode data) { + successor.merge(current().node(child), data); + } + + @Override + public void delete(final PathArgument child) { + successor.delete(current().node(child)); + } + }); + + successor.seal(); + + final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated()); + successor.sendRequest(successorReq, callback); + } else if (request instanceof AbortLocalTransactionRequest) { + LOG.debug("Forwarding abort {} to successor {}", request, successor); + successor.abort(); + } else { + throw new IllegalArgumentException("Unhandled request" + request); + } + } + + @Override + void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest request, + final Consumer> callback) throws RequestException { + if (request instanceof AbortLocalTransactionRequest) { + successor.sendAbort(request, callback); + } else if (request instanceof CommitLocalTransactionRequest) { + successor.sendCommit((CommitLocalTransactionRequest)request, callback); + } else { + throw new IllegalArgumentException("Unhandled request" + request); + } + + LOG.debug("Forwarded request {} to successor {}", request, successor); + } + + private void sendAbort(final TransactionRequest request, final Consumer> callback) { + sendRequest(request, callback); + modification = new FailedDataTreeModification(this::abortedException); + } + + private void sendCommit(final CommitLocalTransactionRequest request, final Consumer> callback) { + // Rebase old modification on new data tree. + try (DataTreeModificationCursor cursor = modification.createCursor(YangInstanceIdentifier.EMPTY)) { + request.getModification().applyToCursor(cursor); + } + + seal(); + sendRequest(commitRequest(request.isCoordinated()), callback); + } }