From 3ee40198347cfb53bd0ce12ffd625cff8ed2383b Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Fri, 5 May 2017 14:35:32 +0200 Subject: [PATCH] BUG-8372: fix AbstractProxyTransaction.replayMessages() This method made assumptions on whan requests can be present in the queue -- notably that local requests are never encountered. This is not true, as local requests can be present here due to being in-flight when reconnect occurs. Change-Id: Ia5b6ec442c014329046bf384a0f5ea97666a2c4a Signed-off-by: Robert Varga --- .../AbstractLocalTransactionRequest.java | 2 +- .../actors/dds/AbstractProxyTransaction.java | 59 ++++++++++++++----- .../actors/dds/LocalProxyTransaction.java | 11 ++++ .../dds/LocalReadOnlyProxyTransaction.java | 1 - .../dds/LocalReadWriteProxyTransaction.java | 11 ++++ .../databroker/actors/dds/ProxyHistory.java | 2 + .../actors/dds/RemoteProxyTransaction.java | 55 +++++++++++++++-- .../dds/AbstractProxyTransactionTest.java | 16 +++-- 8 files changed, 130 insertions(+), 27 deletions(-) diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractLocalTransactionRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractLocalTransactionRequest.java index 87f59c5bea..efc0e856b2 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractLocalTransactionRequest.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractLocalTransactionRequest.java @@ -20,7 +20,7 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier * * @param Message type */ -abstract class AbstractLocalTransactionRequest> +public abstract class AbstractLocalTransactionRequest> extends TransactionRequest { private static final long serialVersionUID = 1L; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java index ed34b3e8e4..83ba07b69a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java @@ -28,6 +28,7 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.client.ConnectionEntry; +import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess; @@ -491,7 +492,7 @@ abstract class AbstractProxyTransaction implements Identifiable) obj, response -> { }); + successor.replay((TransactionRequest) obj, response -> { }); } else { Verify.verify(obj instanceof IncrementSequence); successor.incrementSequence(((IncrementSequence) obj).getDelta()); @@ -509,7 +510,7 @@ abstract class AbstractProxyTransaction implements Identifiable) req, e.getCallback()); + successor.replay((TransactionRequest) req, e.getCallback()); it.remove(); } } @@ -527,6 +528,24 @@ abstract class AbstractProxyTransaction implements Identifiable + * Note: this method is invoked by the predecessor on the successor. + * + * @param request Request which needs to be forwarded + * @param callback Callback to be invoked once the request completes + */ + private void replay(TransactionRequest request, Consumer> callback) { + if (request instanceof AbstractLocalTransactionRequest) { + handleForwardedLocalRequest((AbstractLocalTransactionRequest) request, callback); + } else { + handleForwardedRemoteRequest(request, callback); + } + } + // Called with the connection locked final void finishReconnect() { final SuccessorState local = getSuccessorState(); @@ -577,9 +596,19 @@ abstract class AbstractProxyTransaction implements Identifiable commitRequest(boolean coordinated); /** - * Invoked from {@link RemoteProxyTransaction} when it replays its successful requests to its successor. There is - * no equivalent of this call from {@link LocalProxyTransaction} because it does not send a request until all - * operations are packaged in the message. + * Replay a request originating in this proxy to a successor remote proxy. + */ + abstract void forwardToRemote(RemoteProxyTransaction successor, TransactionRequest request, + Consumer> callback); + + /** + * Replay a request originating in this proxy to a successor local proxy. + */ + abstract void forwardToLocal(LocalProxyTransaction successor, TransactionRequest request, + Consumer> callback); + + /** + * Invoked from {@link LocalProxyTransaction} when it replays its successful requests to its successor. * *

* Note: this method is invoked by the predecessor on the successor. @@ -587,20 +616,20 @@ abstract class AbstractProxyTransaction implements Identifiable request, + abstract void handleForwardedLocalRequest(AbstractLocalTransactionRequest request, @Nullable Consumer> callback); /** - * Replay a request originating in this proxy to a successor remote proxy. - */ - abstract void forwardToRemote(RemoteProxyTransaction successor, TransactionRequest request, - Consumer> callback); - - /** - * Replay a request originating in this proxy to a successor local proxy. + * Invoked from {@link RemoteProxyTransaction} when it replays its successful requests to its successor. + * + *

+ * Note: this method is invoked by the predecessor on the successor. + * + * @param request Request which needs to be forwarded + * @param callback Callback to be invoked once the request completes */ - abstract void forwardToLocal(LocalProxyTransaction successor, TransactionRequest request, - Consumer> callback); + abstract void handleForwardedRemoteRequest(TransactionRequest request, + @Nullable Consumer> callback); @Override public final String toString() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java index 3aed0dcdaa..7facc5160a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java @@ -15,6 +15,7 @@ import java.util.function.Consumer; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess; @@ -88,6 +89,16 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { }); } + @Override + void handleForwardedLocalRequest(final AbstractLocalTransactionRequest request, + final Consumer> callback) { + if (request instanceof AbortLocalTransactionRequest) { + sendAbort(request, callback); + } else { + throw new IllegalArgumentException("Unhandled request" + request); + } + } + @Override void handleForwardedRemoteRequest(final TransactionRequest request, final @Nullable Consumer> callback) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java index 65e6bf6b49..0f2d007716 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java @@ -85,5 +85,4 @@ final class LocalReadOnlyProxyTransaction extends LocalProxyTransaction { Verify.verify(protocol == PersistenceProtocol.ABORT); abort(); } - } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java index 9eba0bf4e9..720ada3191 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java @@ -14,6 +14,7 @@ import java.util.function.Supplier; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol; @@ -238,6 +239,16 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { } } + @Override + void handleForwardedLocalRequest(final AbstractLocalTransactionRequest request, + final Consumer> callback) { + if (request instanceof CommitLocalTransactionRequest) { + sendCommit((CommitLocalTransactionRequest) request, callback); + } else { + super.handleForwardedLocalRequest(request, callback); + } + } + @Override void handleForwardedRemoteRequest(final TransactionRequest request, final @Nullable Consumer> callback) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java index 827c19e526..2a21b8e858 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java @@ -259,6 +259,8 @@ abstract class ProxyHistory implements Identifiable { @Override void forwardRequest(final Request request, final Consumer> callback, final BiConsumer, Consumer>> forwardTo) throws RequestException { + // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the + // period required to get into the queue. if (request instanceof TransactionRequest) { forwardTransactionRequest((TransactionRequest) request, callback); } else if (request instanceof LocalHistoryRequest) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java index d14b7bda66..192205dc0a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java @@ -15,7 +15,10 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.function.Consumer; import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; @@ -36,10 +39,13 @@ import org.opendaylight.controller.cluster.access.commands.TransactionWrite; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor; import org.opendaylight.mdsal.common.api.ReadFailedException; import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -163,6 +169,41 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { sendRequest(request, response -> completeModify(request, response)); } + @Override + void handleForwardedLocalRequest(final AbstractLocalTransactionRequest request, + final Consumer> callback) { + if (request instanceof CommitLocalTransactionRequest) { + replayLocalCommitRequest((CommitLocalTransactionRequest) request, callback); + } else if (request instanceof AbortLocalTransactionRequest) { + sendRequest(abortRequest(), callback); + } else { + throw new IllegalStateException("Unhandled request " + request); + } + } + + private void replayLocalCommitRequest(final CommitLocalTransactionRequest request, + final Consumer> callback) { + final DataTreeModification mod = request.getModification(); + mod.applyToCursor(new AbstractDataTreeModificationCursor() { + @Override + public void write(final PathArgument child, final NormalizedNode data) { + doWrite(current().node(child), data); + } + + @Override + public void merge(final PathArgument child, final NormalizedNode data) { + doMerge(current().node(child), data); + } + + @Override + public void delete(final PathArgument child) { + doDelete(current().node(child)); + } + }); + + sendRequest(commitRequest(request.isCoordinated()), callback); + } + @Override void handleForwardedRemoteRequest(final TransactionRequest request, final @Nullable Consumer> callback) { @@ -256,6 +297,14 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { recordFinishedRequest(); } + private ModifyTransactionRequest abortRequest() { + ensureInitializedBuilder(); + builder.setAbort(); + final ModifyTransactionRequest ret = builder.build(); + builderBusy = false; + return ret; + } + @Override ModifyTransactionRequest commitRequest(final boolean coordinated) { ensureInitializedBuilder(); @@ -302,11 +351,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { switch (maybeProto.get()) { case ABORT: - ensureInitializedBuilder(); - builder.setAbort(); - final ModifyTransactionRequest newReq = builder.build(); - builderBusy = false; - sendRequest(newReq, callback); + sendRequest(abortRequest(), callback); break; case SIMPLE: sendRequest(commitRequest(false), callback); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransactionTest.java index 6b37bc0743..ac24304e34 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransactionTest.java @@ -39,6 +39,7 @@ import org.opendaylight.controller.cluster.access.client.ConnectionEntry; import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol; import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess; @@ -181,7 +182,12 @@ public abstract class AbstractProxyTransactionTest modifications = modifyRequest.getModifications(); Assert.assertEquals(3, modifications.size()); - Assert.assertThat(modifications, hasItem(both(isA(TransactionWrite.class)).and((hasPath(PATH_1))))); - Assert.assertThat(modifications, hasItem(both(isA(TransactionMerge.class)).and((hasPath(PATH_2))))); - Assert.assertThat(modifications, hasItem(both(isA(TransactionDelete.class)).and((hasPath(PATH_3))))); + Assert.assertThat(modifications, hasItem(both(isA(TransactionWrite.class)).and(hasPath(PATH_1)))); + Assert.assertThat(modifications, hasItem(both(isA(TransactionMerge.class)).and(hasPath(PATH_2)))); + Assert.assertThat(modifications, hasItem(both(isA(TransactionDelete.class)).and(hasPath(PATH_3)))); } protected void testRequestResponse(final Consumer consumer, @@ -233,7 +239,7 @@ public abstract class AbstractProxyTransactionTest Consumer createCallbackMock() { - return (Consumer) mock(Consumer.class); + return mock(Consumer.class); } protected static BaseMatcher hasPath(final YangInstanceIdentifier path) { -- 2.36.6