X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FRemoteProxyTransaction.java;h=1ba96426df75feb7e38e4db0f217c8c4f98708b8;hp=6e54695532a19f5eff87f66fc14662d683120c5b;hb=1e07329c0d800b8fea43ae0c4060aded5fd18739;hpb=20ece8c549211d1c453f1763132bb0a0ca7be0e0 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java index 6e54695532..1ba96426df 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java @@ -78,8 +78,8 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { private volatile Exception operationFailure; RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier, - final boolean snapshotOnly, final boolean sendReadyOnSeal) { - super(parent); + final boolean snapshotOnly, final boolean sendReadyOnSeal, final boolean isDone) { + super(parent, isDone); this.snapshotOnly = snapshotOnly; this.sendReadyOnSeal = sendReadyOnSeal; builder = new ModifyTransactionRequestBuilder(identifier, localActor()); @@ -146,9 +146,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } private void ensureFlushedBuider() { - if (builderBusy) { - flushBuilder(); - } + ensureFlushedBuider(Optional.absent()); } private void ensureFlushedBuider(final Optional enqueuedTicks) { @@ -157,10 +155,6 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } } - private void flushBuilder() { - flushBuilder(Optional.absent()); - } - private void flushBuilder(final Optional enqueuedTicks) { final ModifyTransactionRequest request = builder.build(); builderBusy = false; @@ -253,28 +247,33 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { ModifyTransactionRequest abortRequest() { ensureInitializedBuilder(); builder.setAbort(); - final ModifyTransactionRequest ret = builder.build(); builderBusy = false; - return ret; + return builder.build(); } @Override ModifyTransactionRequest commitRequest(final boolean coordinated) { ensureInitializedBuilder(); builder.setCommit(coordinated); + builderBusy = false; + return builder.build(); + } - final ModifyTransactionRequest ret = builder.build(); + private ModifyTransactionRequest readyRequest() { + ensureInitializedBuilder(); + builder.setReady(); builderBusy = false; - return ret; + return builder.build(); } @Override - void doSeal() { + boolean sealAndSend(final Optional enqueuedTicks) { if (sendReadyOnSeal) { ensureInitializedBuilder(); builder.setReady(); - flushBuilder(); + flushBuilder(enqueuedTicks); } + return super.sealAndSend(enqueuedTicks); } @Override @@ -300,7 +299,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { final java.util.Optional maybeProto = req.getPersistenceProtocol(); if (maybeProto.isPresent()) { - ensureSealed(); + // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions + // until we know what we are going to do. + if (markSealed()) { + sealOnly(); + } final TransactionRequest tmp; switch (maybeProto.get()) { @@ -326,7 +329,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { }); break; case READY: - //no op + tmp = readyRequest(); + sendRequest(tmp, resp -> { + recordSuccessfulRequest(tmp); + callback.accept(resp); + }); break; default: throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get()); @@ -424,7 +431,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { final java.util.Optional maybeProto = req.getPersistenceProtocol(); if (maybeProto.isPresent()) { - ensureSealed(); + // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions + // until we know what we are going to do. + if (markSealed()) { + sealOnly(); + } final TransactionRequest tmp; switch (maybeProto.get()) { @@ -450,7 +461,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { }, enqueuedTicks); break; case READY: - //no op + tmp = readyRequest(); + enqueueRequest(tmp, resp -> { + recordSuccessfulRequest(tmp); + cb.accept(resp); + }, enqueuedTicks); break; default: throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());