X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FRemoteProxyTransaction.java;h=1ba96426df75feb7e38e4db0f217c8c4f98708b8;hp=09a8a605631c1f9d4b5cbe07e95a42a13077e85a;hb=18ddbfdc55a1faddf7aeb2df6b25481d34c820ab;hpb=e1c283de301355cb8fa3f7d4fa28a6dd0af501eb diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java index 09a8a60563..1ba96426df 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java @@ -20,6 +20,7 @@ import org.opendaylight.controller.cluster.access.commands.AbstractReadTransacti import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder; import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol; @@ -77,8 +78,8 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { private volatile Exception operationFailure; RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier, - final boolean snapshotOnly, final boolean sendReadyOnSeal) { - super(parent); + final boolean snapshotOnly, final boolean sendReadyOnSeal, final boolean isDone) { + super(parent, isDone); this.snapshotOnly = snapshotOnly; this.sendReadyOnSeal = sendReadyOnSeal; builder = new ModifyTransactionRequestBuilder(identifier, localActor()); @@ -137,13 +138,6 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { isSnapshotOnly()), t -> completeRead(future, t), future); } - @Override - void doAbort() { - ensureInitializedBuilder(); - builder.setAbort(); - flushBuilder(); - } - private void ensureInitializedBuilder() { if (!builderBusy) { builder.setSequence(nextSequence()); @@ -152,9 +146,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } private void ensureFlushedBuider() { - if (builderBusy) { - flushBuilder(); - } + ensureFlushedBuider(Optional.absent()); } private void ensureFlushedBuider(final Optional enqueuedTicks) { @@ -163,10 +155,6 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } } - private void flushBuilder() { - flushBuilder(Optional.absent()); - } - private void flushBuilder(final Optional enqueuedTicks) { final ModifyTransactionRequest request = builder.build(); builderBusy = false; @@ -239,7 +227,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { failFuture(future, response); } - recordFinishedRequest(); + recordFinishedRequest(response); } private void completeRead(final SettableFuture>> future, @@ -252,34 +240,40 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { failFuture(future, response); } - recordFinishedRequest(); + recordFinishedRequest(response); } - private ModifyTransactionRequest abortRequest() { + @Override + ModifyTransactionRequest abortRequest() { ensureInitializedBuilder(); builder.setAbort(); - final ModifyTransactionRequest ret = builder.build(); builderBusy = false; - return ret; + return builder.build(); } @Override ModifyTransactionRequest commitRequest(final boolean coordinated) { ensureInitializedBuilder(); builder.setCommit(coordinated); + builderBusy = false; + return builder.build(); + } - final ModifyTransactionRequest ret = builder.build(); + private ModifyTransactionRequest readyRequest() { + ensureInitializedBuilder(); + builder.setReady(); builderBusy = false; - return ret; + return builder.build(); } @Override - void doSeal() { + boolean sealAndSend(final Optional enqueuedTicks) { if (sendReadyOnSeal) { ensureInitializedBuilder(); builder.setReady(); - flushBuilder(); + flushBuilder(enqueuedTicks); } + return super.sealAndSend(enqueuedTicks); } @Override @@ -305,7 +299,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { final java.util.Optional maybeProto = req.getPersistenceProtocol(); if (maybeProto.isPresent()) { - ensureSealed(); + // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions + // until we know what we are going to do. + if (markSealed()) { + sealOnly(); + } final TransactionRequest tmp; switch (maybeProto.get()) { @@ -331,7 +329,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { }); break; case READY: - //no op + tmp = readyRequest(); + sendRequest(tmp, resp -> { + recordSuccessfulRequest(tmp); + callback.accept(resp); + }); break; default: throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get()); @@ -341,14 +343,14 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { ensureFlushedBuider(); sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { - recordFinishedRequest(); + recordFinishedRequest(resp); callback.accept(resp); }); } else if (request instanceof ExistsTransactionRequest) { ensureFlushedBuider(); sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { - recordFinishedRequest(); + recordFinishedRequest(resp); callback.accept(resp); }); } else if (request instanceof TransactionPreCommitRequest) { @@ -364,9 +366,9 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback); } else if (request instanceof TransactionAbortRequest) { ensureFlushedBuider(); - sendAbort(callback); + sendDoAbort(callback); } else if (request instanceof TransactionPurgeRequest) { - sendPurge(); + enqueuePurge(callback); } else { throw new IllegalArgumentException("Unhandled request {}" + request); } @@ -429,7 +431,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { final java.util.Optional maybeProto = req.getPersistenceProtocol(); if (maybeProto.isPresent()) { - ensureSealed(); + // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions + // until we know what we are going to do. + if (markSealed()) { + sealOnly(); + } final TransactionRequest tmp; switch (maybeProto.get()) { @@ -455,7 +461,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { }, enqueuedTicks); break; case READY: - //no op + tmp = readyRequest(); + enqueueRequest(tmp, resp -> { + recordSuccessfulRequest(tmp); + cb.accept(resp); + }, enqueuedTicks); break; default: throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get()); @@ -465,14 +475,14 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { ensureFlushedBuider(optTicks); enqueueRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { - recordFinishedRequest(); + recordFinishedRequest(resp); cb.accept(resp); }, enqueuedTicks); } else if (request instanceof ExistsTransactionRequest) { ensureFlushedBuider(optTicks); enqueueRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { - recordFinishedRequest(); + recordFinishedRequest(resp); cb.accept(resp); }, enqueuedTicks); } else if (request instanceof TransactionPreCommitRequest) { @@ -489,9 +499,15 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { enqueuedTicks); } else if (request instanceof TransactionAbortRequest) { ensureFlushedBuider(optTicks); - enqueueAbort(callback, enqueuedTicks); + enqueueDoAbort(callback, enqueuedTicks); } else if (request instanceof TransactionPurgeRequest) { - enqueuePurge(enqueuedTicks); + enqueuePurge(callback, enqueuedTicks); + } else if (request instanceof IncrementTransactionSequenceRequest) { + final IncrementTransactionSequenceRequest req = (IncrementTransactionSequenceRequest) request; + ensureFlushedBuider(optTicks); + enqueueRequest(new IncrementTransactionSequenceRequest(getIdentifier(), nextSequence(), localActor(), + snapshotOnly, req.getIncrement()), callback, enqueuedTicks); + incrementSequence(req.getIncrement()); } else { throw new IllegalArgumentException("Unhandled request {}" + request); }