X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FLocalProxyTransaction.java;h=00b29e4c35e73ade56c3453f963375e4c20edd23;hp=cad6b5f16d53bc46dd886fbc791e86a600c72e60;hb=5e7cf2452ef634dc934a3ea5a2dd95059fbab68c;hpb=d6ed0a044d591d65847714451d97d80345154089 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java index cad6b5f16d..00b29e4c35 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java @@ -15,12 +15,14 @@ import java.util.function.Consumer; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; @@ -70,6 +72,9 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { abstract void applyModifyTransactionRequest(ModifyTransactionRequest request, @Nullable Consumer> callback); + abstract void replayModifyTransactionRequest(ModifyTransactionRequest request, + @Nullable Consumer> callback, long enqueuedTicks); + @Override final CheckedFuture doExists(final YangInstanceIdentifier path) { return Futures.immediateCheckedFuture(readOnlyView().readNode(path).isPresent()); @@ -82,32 +87,74 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { @Override final void doAbort() { - sendAbort(new AbortLocalTransactionRequest(identifier, localActor()), response -> { - LOG.debug("Transaction {} abort completed with {}", identifier, response); - }); + sendAbort(new AbortLocalTransactionRequest(identifier, localActor()), + response -> LOG.debug("Transaction {} abort completed with {}", identifier, response)); } @Override - void handleForwardedRemoteRequest(final TransactionRequest request, + void handleReplayedLocalRequest(final AbstractLocalTransactionRequest request, + final Consumer> callback, final long enqueuedTicks) { + if (request instanceof AbortLocalTransactionRequest) { + enqueueAbort(request, callback, enqueuedTicks); + } else { + throw new IllegalArgumentException("Unhandled request" + request); + } + } + + private boolean handleReadRequest(final TransactionRequest request, final @Nullable Consumer> callback) { - if (request instanceof ModifyTransactionRequest) { - applyModifyTransactionRequest((ModifyTransactionRequest) request, callback); - } else if (request instanceof ReadTransactionRequest) { + if (request instanceof ReadTransactionRequest) { final YangInstanceIdentifier path = ((ReadTransactionRequest) request).getPath(); final Optional> result = readOnlyView().readNode(path); callback.accept(new ReadTransactionSuccess(request.getTarget(), request.getSequence(), result)); + return true; } else if (request instanceof ExistsTransactionRequest) { final YangInstanceIdentifier path = ((ExistsTransactionRequest) request).getPath(); final boolean result = readOnlyView().readNode(path).isPresent(); callback.accept(new ExistsTransactionSuccess(request.getTarget(), request.getSequence(), result)); + return true; + } else { + return false; + } + } + + @Override + void handleReplayedRemoteRequest(final TransactionRequest request, + final @Nullable Consumer> callback, final long enqueuedTicks) { + if (request instanceof ModifyTransactionRequest) { + replayModifyTransactionRequest((ModifyTransactionRequest) request, callback, enqueuedTicks); + } else if (handleReadRequest(request, callback)) { + // No-op + } else if (request instanceof TransactionPurgeRequest) { + enqueuePurge(enqueuedTicks); + } else { + throw new IllegalArgumentException("Unhandled request " + request); + } + } + + /** + * Remote-to-local equivalent of {@link #handleReplayedRemoteRequest(TransactionRequest, Consumer, long)}, + * except it is invoked in the forwarding path from + * {@link RemoteProxyTransaction#forwardToLocal(LocalProxyTransaction, TransactionRequest, Consumer)}. + * + * @param request Forwarded request + * @param callback Callback to be invoked once the request completes + */ + void handleForwardedRemoteRequest(final TransactionRequest request, final Consumer> callback) { + if (request instanceof ModifyTransactionRequest) { + applyModifyTransactionRequest((ModifyTransactionRequest) request, callback); + } else if (handleReadRequest(request, callback)) { + // No-op + } else if (request instanceof TransactionPurgeRequest) { + sendPurge(); } else { throw new IllegalArgumentException("Unhandled request " + request); } } @Override - void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest request, - final Consumer> callback) { + final void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest request, + final Consumer> callback) { if (request instanceof CommitLocalTransactionRequest) { final CommitLocalTransactionRequest req = (CommitLocalTransactionRequest) request; final DataTreeModification mod = req.getModification(); @@ -137,6 +184,9 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { } else if (request instanceof AbortLocalTransactionRequest) { LOG.debug("Forwarding abort {} to successor {}", request, successor); successor.abort(); + } else if (request instanceof TransactionPurgeRequest) { + LOG.debug("Forwarding purge {} to successor {}", request, successor); + successor.sendPurge(); } else { throw new IllegalArgumentException("Unhandled request" + request); } @@ -147,6 +197,8 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { final Consumer> callback) { if (request instanceof AbortLocalTransactionRequest) { successor.sendAbort(request, callback); + } else if (request instanceof TransactionPurgeRequest) { + successor.sendPurge(); } else { throw new IllegalArgumentException("Unhandled request" + request); } @@ -157,4 +209,9 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { void sendAbort(final TransactionRequest request, final Consumer> callback) { sendRequest(request, callback); } + + void enqueueAbort(final TransactionRequest request, final Consumer> callback, + final long enqueuedTicks) { + enqueueRequest(request, callback, enqueuedTicks); + } }