X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FRemoteProxyTransaction.java;h=192205dc0a9c0f1df53b8177484f8bf3c0a2282f;hp=d14b7bda666790f7268875c4dead378d77e70b31;hb=3ee40198347cfb53bd0ce12ffd625cff8ed2383b;hpb=4036805b31f73c7e7e2b06e84c8da975b2e45263 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java index d14b7bda66..192205dc0a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java @@ -15,7 +15,10 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.function.Consumer; import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; @@ -36,10 +39,13 @@ import org.opendaylight.controller.cluster.access.commands.TransactionWrite; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor; import org.opendaylight.mdsal.common.api.ReadFailedException; import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -163,6 +169,41 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { sendRequest(request, response -> completeModify(request, response)); } + @Override + void handleForwardedLocalRequest(final AbstractLocalTransactionRequest request, + final Consumer> callback) { + if (request instanceof CommitLocalTransactionRequest) { + replayLocalCommitRequest((CommitLocalTransactionRequest) request, callback); + } else if (request instanceof AbortLocalTransactionRequest) { + sendRequest(abortRequest(), callback); + } else { + throw new IllegalStateException("Unhandled request " + request); + } + } + + private void replayLocalCommitRequest(final CommitLocalTransactionRequest request, + final Consumer> callback) { + final DataTreeModification mod = request.getModification(); + mod.applyToCursor(new AbstractDataTreeModificationCursor() { + @Override + public void write(final PathArgument child, final NormalizedNode data) { + doWrite(current().node(child), data); + } + + @Override + public void merge(final PathArgument child, final NormalizedNode data) { + doMerge(current().node(child), data); + } + + @Override + public void delete(final PathArgument child) { + doDelete(current().node(child)); + } + }); + + sendRequest(commitRequest(request.isCoordinated()), callback); + } + @Override void handleForwardedRemoteRequest(final TransactionRequest request, final @Nullable Consumer> callback) { @@ -256,6 +297,14 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { recordFinishedRequest(); } + private ModifyTransactionRequest abortRequest() { + ensureInitializedBuilder(); + builder.setAbort(); + final ModifyTransactionRequest ret = builder.build(); + builderBusy = false; + return ret; + } + @Override ModifyTransactionRequest commitRequest(final boolean coordinated) { ensureInitializedBuilder(); @@ -302,11 +351,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { switch (maybeProto.get()) { case ABORT: - ensureInitializedBuilder(); - builder.setAbort(); - final ModifyTransactionRequest newReq = builder.build(); - builderBusy = false; - sendRequest(newReq, callback); + sendRequest(abortRequest(), callback); break; case SIMPLE: sendRequest(commitRequest(false), callback);