From d6ed0a044d591d65847714451d97d80345154089 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Mon, 12 Dec 2016 12:43:31 +0100 Subject: [PATCH] BUG-5280: add READY protocol In order to make chained transactions work with remoting we need a way for the frontend to propagate the ready state to the backend. This patch adds a READY protocol, which acts as a preparatory stage before the 'real' protocol kicks in. With that the backend state is properly updated to reflect state transitions on the frontend and we do not need to play weird future-based delays in the frontend, which would be exceedingly complex. Change-Id: I51a0ca0c2b900e3c6522426e5897a4fca1b9da19 Signed-off-by: Robert Varga --- .../ModifyTransactionRequestBuilder.java | 15 +++++-- .../access/commands/PersistenceProtocol.java | 13 +++++- .../actors/dds/AbstractProxyTransaction.java | 14 +++++- .../actors/dds/LocalProxyTransaction.java | 2 +- .../dds/LocalReadOnlyProxyTransaction.java | 2 +- .../dds/LocalReadWriteProxyTransaction.java | 9 ++-- .../databroker/actors/dds/ProxyHistory.java | 19 +++++--- .../actors/dds/RemoteProxyTransaction.java | 21 ++++++--- .../FrontendReadWriteTransaction.java | 44 ++++++++++++------- 9 files changed, 99 insertions(+), 40 deletions(-) diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequestBuilder.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequestBuilder.java index 4e2a8cec2a..9312e4c7ac 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequestBuilder.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequestBuilder.java @@ -30,8 +30,10 @@ public final class ModifyTransactionRequestBuilder implements Builder modifications = new ArrayList<>(1); private final TransactionIdentifier identifier; private final ActorRef replyTo; + private PersistenceProtocol protocol; - private Long sequence; + private boolean haveSequence; + private long sequence; public ModifyTransactionRequestBuilder(final TransactionIdentifier identifier, final ActorRef replyTo) { this.identifier = Preconditions.checkNotNull(identifier); @@ -53,7 +55,9 @@ public final class ModifyTransactionRequestBuilder implements Builder maybeProtocol = request.getPersistenceProtocol(); if (maybeProtocol.isPresent()) { - seal(); Verify.verify(callback != null, "Request {} has null callback", request); + ensureSealed(); switch (maybeProtocol.get()) { case ABORT: sendAbort(callback); break; + case READY: + // No-op, as we have already issued a seal() + break; case SIMPLE: sendRequest(commitRequest(false), callback); break; @@ -215,7 +218,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { } }); - successor.seal(); + successor.ensureSealed(); final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated()); successor.sendRequest(successorReq, callback); @@ -251,7 +254,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { request.getModification().applyToCursor(cursor); } - seal(); + ensureSealed(); sendRequest(commitRequest(request.isCoordinated()), callback); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java index 846f5c37cf..d6aa3d3f3f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java @@ -62,13 +62,6 @@ abstract class ProxyHistory implements Identifiable { final LocalHistoryIdentifier identifier) { super(connection, identifier); } - - @Override - final AbstractProxyTransaction doCreateTransactionProxy( - final AbstractClientConnection connection, final TransactionIdentifier txId, - final boolean snapshotOnly) { - return new RemoteProxyTransaction(this, txId, snapshotOnly); - } } private static final class Local extends AbstractLocal { @@ -164,6 +157,12 @@ abstract class ProxyHistory implements Identifiable { super(connection, identifier); } + @Override + AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection connection, + final TransactionIdentifier txId, final boolean snapshotOnly) { + return new RemoteProxyTransaction(this, txId, snapshotOnly, true); + } + @Override ProxyHistory createSuccessor(final AbstractClientConnection connection) { return createClient(connection, getIdentifier()); @@ -176,6 +175,12 @@ abstract class ProxyHistory implements Identifiable { super(connection, identifier); } + @Override + AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection connection, + final TransactionIdentifier txId, final boolean snapshotOnly) { + return new RemoteProxyTransaction(this, txId, snapshotOnly, false); + } + @Override ProxyHistory createSuccessor(final AbstractClientConnection connection) { return createSingle(connection, getIdentifier()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java index 1429ec5a78..e777156b3e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java @@ -63,16 +63,19 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { private static final int REQUEST_MAX_MODIFICATIONS = 1000; private final ModifyTransactionRequestBuilder builder; + private final boolean sendReadyOnSeal; private final boolean snapshotOnly; private boolean builderBusy; private volatile Exception operationFailure; + RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier, - final boolean snapshotOnly) { + final boolean snapshotOnly, final boolean sendReadyOnSeal) { super(parent); this.snapshotOnly = snapshotOnly; + this.sendReadyOnSeal = sendReadyOnSeal; builder = new ModifyTransactionRequestBuilder(identifier, localActor()); } @@ -131,12 +134,12 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { @Override void doAbort() { - ensureInitializedBuider(); + ensureInitializedBuilder(); builder.setAbort(); flushBuilder(); } - private void ensureInitializedBuider() { + private void ensureInitializedBuilder() { if (!builderBusy) { builder.setSequence(nextSequence()); builderBusy = true; @@ -186,7 +189,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { private void appendModification(final TransactionModification modification) { if (operationFailure == null) { - ensureInitializedBuider(); + ensureInitializedBuilder(); builder.addModification(modification); if (builder.size() >= REQUEST_MAX_MODIFICATIONS) { @@ -255,7 +258,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { @Override ModifyTransactionRequest commitRequest(final boolean coordinated) { - ensureInitializedBuider(); + ensureInitializedBuilder(); builder.setCommit(coordinated); final ModifyTransactionRequest ret = builder.build(); @@ -265,7 +268,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { @Override void doSeal() { - // No-op + if (sendReadyOnSeal) { + ensureInitializedBuilder(); + builder.setReady(); + flushBuilder(); + } } @Override @@ -291,7 +298,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { final java.util.Optional maybeProto = req.getPersistenceProtocol(); if (maybeProto.isPresent()) { - seal(); + ensureSealed(); switch (maybeProto.get()) { case ABORT: diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java index 79d555c856..0dcaac233a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java @@ -11,6 +11,7 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.FutureCallback; +import java.util.Collection; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; @@ -194,7 +195,6 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { readyCohort = null; } }); - } private void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) { @@ -270,16 +270,19 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { private @Nullable TransactionSuccess handleModifyTransaction(final ModifyTransactionRequest request, final RequestEnvelope envelope, final long now) throws RequestException { - final DataTreeModification modification = openTransaction.getSnapshot(); - for (TransactionModification m : request.getModifications()) { - if (m instanceof TransactionDelete) { - modification.delete(m.getPath()); - } else if (m instanceof TransactionWrite) { - modification.write(m.getPath(), ((TransactionWrite) m).getData()); - } else if (m instanceof TransactionMerge) { - modification.merge(m.getPath(), ((TransactionMerge) m).getData()); - } else { - LOG.warn("{}: ignoring unhandled modification {}", history().persistenceId(), m); + final Collection mods = request.getModifications(); + if (!mods.isEmpty()) { + final DataTreeModification modification = openTransaction.getSnapshot(); + for (TransactionModification m : mods) { + if (m instanceof TransactionDelete) { + modification.delete(m.getPath()); + } else if (m instanceof TransactionWrite) { + modification.write(m.getPath(), ((TransactionWrite) m).getData()); + } else if (m instanceof TransactionMerge) { + modification.merge(m.getPath(), ((TransactionMerge) m).getData()); + } else { + LOG.warn("{}: ignoring unhandled modification {}", history().persistenceId(), m); + } } } @@ -293,18 +296,29 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { openTransaction.abort(); openTransaction = null; return replyModifySuccess(request.getSequence()); + case READY: + ensureReady(); + return replyModifySuccess(request.getSequence()); case SIMPLE: - readyCohort = openTransaction.ready(); - openTransaction = null; + ensureReady(); directCommit(envelope, now); return null; case THREE_PHASE: - readyCohort = openTransaction.ready(); - openTransaction = null; + ensureReady(); coordinatedCommit(envelope, now); return null; default: throw new UnsupportedRequestException(request); } } + + private void ensureReady() { + // We may have a combination of READY + SIMPLE/THREE_PHASE , in which case we want to ready the transaction + // only once. + if (readyCohort == null) { + readyCohort = openTransaction.ready(); + LOG.debug("{}: transitioned {} to ready", history().persistenceId(), openTransaction.getIdentifier()); + openTransaction = null; + } + } } -- 2.36.6