X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FRemoteProxyTransaction.java;h=0095ec58a37512c66aefd250652980a9b4dce0fb;hp=09a8a605631c1f9d4b5cbe07e95a42a13077e85a;hb=b4e199c509cab1c9b838c6471a96ce3cb21fa913;hpb=e1c283de301355cb8fa3f7d4fa28a6dd0af501eb diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java index 09a8a60563..0095ec58a3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java @@ -20,6 +20,7 @@ import org.opendaylight.controller.cluster.access.commands.AbstractReadTransacti import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder; import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol; @@ -137,13 +138,6 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { isSnapshotOnly()), t -> completeRead(future, t), future); } - @Override - void doAbort() { - ensureInitializedBuilder(); - builder.setAbort(); - flushBuilder(); - } - private void ensureInitializedBuilder() { if (!builderBusy) { builder.setSequence(nextSequence()); @@ -239,7 +233,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { failFuture(future, response); } - recordFinishedRequest(); + recordFinishedRequest(response); } private void completeRead(final SettableFuture>> future, @@ -252,10 +246,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { failFuture(future, response); } - recordFinishedRequest(); + recordFinishedRequest(response); } - private ModifyTransactionRequest abortRequest() { + @Override + ModifyTransactionRequest abortRequest() { ensureInitializedBuilder(); builder.setAbort(); final ModifyTransactionRequest ret = builder.build(); @@ -341,14 +336,14 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { ensureFlushedBuider(); sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { - recordFinishedRequest(); + recordFinishedRequest(resp); callback.accept(resp); }); } else if (request instanceof ExistsTransactionRequest) { ensureFlushedBuider(); sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { - recordFinishedRequest(); + recordFinishedRequest(resp); callback.accept(resp); }); } else if (request instanceof TransactionPreCommitRequest) { @@ -364,9 +359,9 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback); } else if (request instanceof TransactionAbortRequest) { ensureFlushedBuider(); - sendAbort(callback); + sendDoAbort(callback); } else if (request instanceof TransactionPurgeRequest) { - sendPurge(); + sendPurge(callback); } else { throw new IllegalArgumentException("Unhandled request {}" + request); } @@ -465,14 +460,14 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { ensureFlushedBuider(optTicks); enqueueRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { - recordFinishedRequest(); + recordFinishedRequest(resp); cb.accept(resp); }, enqueuedTicks); } else if (request instanceof ExistsTransactionRequest) { ensureFlushedBuider(optTicks); enqueueRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { - recordFinishedRequest(); + recordFinishedRequest(resp); cb.accept(resp); }, enqueuedTicks); } else if (request instanceof TransactionPreCommitRequest) { @@ -489,9 +484,15 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { enqueuedTicks); } else if (request instanceof TransactionAbortRequest) { ensureFlushedBuider(optTicks); - enqueueAbort(callback, enqueuedTicks); + enqueueDoAbort(callback, enqueuedTicks); } else if (request instanceof TransactionPurgeRequest) { - enqueuePurge(enqueuedTicks); + enqueuePurge(callback, enqueuedTicks); + } else if (request instanceof IncrementTransactionSequenceRequest) { + final IncrementTransactionSequenceRequest req = (IncrementTransactionSequenceRequest) request; + ensureFlushedBuider(optTicks); + enqueueRequest(new IncrementTransactionSequenceRequest(getIdentifier(), nextSequence(), localActor(), + snapshotOnly, req.getIncrement()), callback, enqueuedTicks); + incrementSequence(req.getIncrement()); } else { throw new IllegalArgumentException("Unhandled request {}" + request); }