X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FLocalProxyTransaction.java;h=2a81e1d0aa37714cf208beea5d9012cf893ff96c;hb=db3d7caeeb310f76a9a159f9a8d7e9beff89f645;hp=feba5fde5bb688c6b7654b2062a61da0560fc75a;hpb=31316f39aecc6bad171de539292ff5d7f4743419;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java index feba5fde5b..2a81e1d0aa 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java @@ -7,10 +7,9 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.FluentFuture; +import java.util.Optional; import java.util.function.Consumer; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -29,7 +28,7 @@ import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor; -import org.opendaylight.mdsal.common.api.ReadFailedException; +import org.opendaylight.yangtools.util.concurrent.FluentFutures; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -69,7 +68,8 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { return identifier; } - abstract @Nonnull DataTreeSnapshot readOnlyView(); + @Nonnull + abstract DataTreeSnapshot readOnlyView(); abstract void applyForwardedModifyTransactionRequest(ModifyTransactionRequest request, @Nullable Consumer> callback); @@ -78,13 +78,13 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { @Nullable Consumer> callback, long enqueuedTicks); @Override - final CheckedFuture doExists(final YangInstanceIdentifier path) { - return Futures.immediateCheckedFuture(readOnlyView().readNode(path).isPresent()); + final FluentFuture doExists(final YangInstanceIdentifier path) { + return FluentFutures.immediateFluentFuture(readOnlyView().readNode(path).isPresent()); } @Override - final CheckedFuture>, ReadFailedException> doRead(final YangInstanceIdentifier path) { - return Futures.immediateCheckedFuture(readOnlyView().readNode(path)); + final FluentFuture>> doRead(final YangInstanceIdentifier path) { + return FluentFutures.immediateFluentFuture(readOnlyView().readNode(path)); } @Override @@ -103,7 +103,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { } private boolean handleReadRequest(final TransactionRequest request, - final @Nullable Consumer> callback) { + @Nullable final Consumer> callback) { // Note we delay completion of read requests to limit the scope at which the client can run, as they have // listeners, which we do not want to execute while we are reconnecting. if (request instanceof ReadTransactionRequest) { @@ -133,7 +133,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { @Override void handleReplayedRemoteRequest(final TransactionRequest request, - final @Nullable Consumer> callback, final long enqueuedTicks) { + @Nullable final Consumer> callback, final long enqueuedTicks) { if (request instanceof ModifyTransactionRequest) { replayModifyTransactionRequest((ModifyTransactionRequest) request, callback, enqueuedTicks); } else if (handleReadRequest(request, callback)) { @@ -194,8 +194,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { } }); - successor.ensureSealed(); - + successor.sealOnly(); final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated()); successor.sendRequest(successorReq, callback); } else if (request instanceof AbortLocalTransactionRequest) { @@ -204,8 +203,10 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { } else if (request instanceof TransactionPurgeRequest) { LOG.debug("Forwarding purge {} to successor {}", request, successor); successor.enqueuePurge(callback); + } else if (request instanceof ModifyTransactionRequest) { + successor.handleForwardedRequest(request, callback); } else { - throw new IllegalArgumentException("Unhandled request" + request); + throwUnhandledRequest(request); } } @@ -217,12 +218,16 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { } else if (request instanceof TransactionPurgeRequest) { successor.enqueuePurge(callback); } else { - throw new IllegalArgumentException("Unhandled request" + request); + throwUnhandledRequest(request); } LOG.debug("Forwarded request {} to successor {}", request, successor); } + private static void throwUnhandledRequest(final TransactionRequest request) { + throw new IllegalArgumentException("Unhandled request" + request); + } + void sendAbort(final TransactionRequest request, final Consumer> callback) { sendRequest(request, callback); }