X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FLocalProxyTransaction.java;h=5e78f44905a39d9f44e0f50a2d150eac95f002bc;hb=refs%2Fchanges%2F12%2F52212%2F3;hp=e9941179c7aa7c599435defda9b2f6a9165ff862;hpb=b5444f8c2c10ded63d6a9e890db61b0f3aa2095e;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java index e9941179c7..5e78f44905 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java @@ -9,7 +9,6 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.base.Verify; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; import java.util.function.Consumer; @@ -20,17 +19,10 @@ import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactio import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; -import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol; import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess; -import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; -import org.opendaylight.controller.cluster.access.commands.TransactionDelete; -import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest; -import org.opendaylight.controller.cluster.access.commands.TransactionMerge; -import org.opendaylight.controller.cluster.access.commands.TransactionModification; -import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; -import org.opendaylight.controller.cluster.access.commands.TransactionWrite; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor; @@ -38,9 +30,7 @@ import org.opendaylight.mdsal.common.api.ReadFailedException; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,161 +51,58 @@ import org.slf4j.LoggerFactory; * @author Robert Varga */ @NotThreadSafe -final class LocalProxyTransaction extends AbstractProxyTransaction { +abstract class LocalProxyTransaction extends AbstractProxyTransaction { private static final Logger LOG = LoggerFactory.getLogger(LocalProxyTransaction.class); private final TransactionIdentifier identifier; - private CursorAwareDataTreeModification modification; - private CursorAwareDataTreeModification sealedModification; - - LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier, - final CursorAwareDataTreeModification modification) { + LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier) { super(parent); this.identifier = Preconditions.checkNotNull(identifier); - this.modification = Preconditions.checkNotNull(modification); } @Override - public TransactionIdentifier getIdentifier() { + public final TransactionIdentifier getIdentifier() { return identifier; } - @Override - void doDelete(final YangInstanceIdentifier path) { - modification.delete(path); - } + abstract DataTreeSnapshot readOnlyView(); - @Override - void doMerge(final YangInstanceIdentifier path, final NormalizedNode data) { - modification.merge(path, data); - } + abstract void applyModifyTransactionRequest(ModifyTransactionRequest request, + @Nullable Consumer> callback); @Override - void doWrite(final YangInstanceIdentifier path, final NormalizedNode data) { - modification.write(path, data); + final CheckedFuture doExists(final YangInstanceIdentifier path) { + return Futures.immediateCheckedFuture(readOnlyView().readNode(path).isPresent()); } @Override - CheckedFuture doExists(final YangInstanceIdentifier path) { - return Futures.immediateCheckedFuture(modification.readNode(path).isPresent()); + final CheckedFuture>, ReadFailedException> doRead(final YangInstanceIdentifier path) { + return Futures.immediateCheckedFuture(readOnlyView().readNode(path)); } @Override - CheckedFuture>, ReadFailedException> doRead(final YangInstanceIdentifier path) { - return Futures.immediateCheckedFuture(modification.readNode(path)); - } - - private RuntimeException abortedException() { - return new IllegalStateException("Tracker " + identifier + " has been aborted"); - } - - private RuntimeException submittedException() { - return new IllegalStateException("Tracker " + identifier + " has been submitted"); - } - - @Override - void doAbort() { + final void doAbort() { sendAbort(new AbortLocalTransactionRequest(identifier, localActor()), response -> { LOG.debug("Transaction {} abort completed with {}", identifier, response); }); } - @Override - CommitLocalTransactionRequest commitRequest(final boolean coordinated) { - final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(identifier, nextSequence(), - localActor(), modification, coordinated); - modification = new FailedDataTreeModification(this::submittedException); - return ret; - } - - @Override - void doSeal() { - modification.ready(); - sealedModification = modification; - } - - @Override - void flushState(final AbstractProxyTransaction successor) { - sealedModification.applyToCursor(new AbstractDataTreeModificationCursor() { - @Override - public void write(final PathArgument child, final NormalizedNode data) { - successor.write(current().node(child), data); - } - - @Override - public void merge(final PathArgument child, final NormalizedNode data) { - successor.merge(current().node(child), data); - } - - @Override - public void delete(final PathArgument child) { - successor.delete(current().node(child)); - } - }); - } - - DataTreeSnapshot getSnapshot() { - Preconditions.checkState(sealedModification != null, "Proxy %s is not sealed yet", identifier); - return sealedModification; - } - - private void applyModifyTransactionRequest(final ModifyTransactionRequest request, - final @Nullable Consumer> callback) { - for (TransactionModification mod : request.getModifications()) { - if (mod instanceof TransactionWrite) { - modification.write(mod.getPath(), ((TransactionWrite)mod).getData()); - } else if (mod instanceof TransactionMerge) { - modification.merge(mod.getPath(), ((TransactionMerge)mod).getData()); - } else if (mod instanceof TransactionDelete) { - modification.delete(mod.getPath()); - } else { - throw new IllegalArgumentException("Unsupported modification " + mod); - } - } - - final java.util.Optional maybeProtocol = request.getPersistenceProtocol(); - if (maybeProtocol.isPresent()) { - seal(); - Verify.verify(callback != null, "Request {} has null callback", request); - - switch (maybeProtocol.get()) { - case ABORT: - sendAbort(callback); - break; - case SIMPLE: - sendRequest(commitRequest(false), callback); - break; - case THREE_PHASE: - sendRequest(commitRequest(true), callback); - break; - default: - throw new IllegalArgumentException("Unhandled protocol " + maybeProtocol.get()); - } - } - } - @Override void handleForwardedRemoteRequest(final TransactionRequest request, final @Nullable Consumer> callback) { - LOG.debug("Applying forwarded request {}", request); - if (request instanceof ModifyTransactionRequest) { applyModifyTransactionRequest((ModifyTransactionRequest) request, callback); } else if (request instanceof ReadTransactionRequest) { final YangInstanceIdentifier path = ((ReadTransactionRequest) request).getPath(); - final Optional> result = modification.readNode(path); + final Optional> result = readOnlyView().readNode(path); callback.accept(new ReadTransactionSuccess(request.getTarget(), request.getSequence(), result)); } else if (request instanceof ExistsTransactionRequest) { final YangInstanceIdentifier path = ((ExistsTransactionRequest) request).getPath(); - final boolean result = modification.readNode(path).isPresent(); + final boolean result = readOnlyView().readNode(path).isPresent(); callback.accept(new ExistsTransactionSuccess(request.getTarget(), request.getSequence(), result)); - } else if (request instanceof TransactionPreCommitRequest) { - sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback); - } else if (request instanceof TransactionDoCommitRequest) { - sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback); - } else if (request instanceof TransactionAbortRequest) { - sendAbort(callback); + } else if (request instanceof TransactionPurgeRequest) { + purge(); } else { throw new IllegalArgumentException("Unhandled request " + request); } @@ -246,7 +133,7 @@ final class LocalProxyTransaction extends AbstractProxyTransaction { } }); - successor.seal(); + successor.ensureSealed(); final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated()); successor.sendRequest(successorReq, callback); @@ -263,8 +150,8 @@ final class LocalProxyTransaction extends AbstractProxyTransaction { final Consumer> callback) { if (request instanceof AbortLocalTransactionRequest) { successor.sendAbort(request, callback); - } else if (request instanceof CommitLocalTransactionRequest) { - successor.sendCommit((CommitLocalTransactionRequest)request, callback); + } else if (request instanceof TransactionPurgeRequest) { + successor.purge(); } else { throw new IllegalArgumentException("Unhandled request" + request); } @@ -272,18 +159,7 @@ final class LocalProxyTransaction extends AbstractProxyTransaction { LOG.debug("Forwarded request {} to successor {}", request, successor); } - private void sendAbort(final TransactionRequest request, final Consumer> callback) { + void sendAbort(final TransactionRequest request, final Consumer> callback) { sendRequest(request, callback); - modification = new FailedDataTreeModification(this::abortedException); - } - - private void sendCommit(final CommitLocalTransactionRequest request, final Consumer> callback) { - // Rebase old modification on new data tree. - try (DataTreeModificationCursor cursor = modification.createCursor(YangInstanceIdentifier.EMPTY)) { - request.getModification().applyToCursor(cursor); - } - - seal(); - sendRequest(commitRequest(request.isCoordinated()), callback); } }