From 7f15e81c52f2efda779c670580f0697227557404 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Sun, 14 May 2017 20:36:09 +0200 Subject: [PATCH] BUG-8402: fix sequencing with read/exists requests When replaying successful requests, we do not issue read and exists requests, as they have already been satisfied, but account for their sequence numbers. This does not work in the case where we have a remote connection, the first request on a transaction is a read and after it is satisfied subsequent requests are replayed to a different backend leader. Since the initial request is not replayed, but subsequent requests account for it and the backend has no prior knowledge of the transaction, it sees an initial request with sequence != 0, and rejects all requests with an OutOfOrderRequestException. Fix this by introducing IncrementTransactionSequenceRequest, which the frontend enqueues as the first request instead of the initial read/exist request -- introducing the transaction to backend. Change-Id: Ia0f048e33d417e1fdc8d15bf319d6b8b33c2b1b1 Signed-off-by: Robert Varga --- .../IncrementTransactionSequenceRequest.java | 52 +++++++++++++++++++ ...mentTransactionSequenceRequestProxyV1.java | 50 ++++++++++++++++++ .../IncrementTransactionSequenceSuccess.java | 36 +++++++++++++ ...mentTransactionSequenceSuccessProxyV1.java | 38 ++++++++++++++ .../actors/dds/AbstractProxyTransaction.java | 23 ++++++-- .../actors/dds/LocalProxyTransaction.java | 5 ++ .../actors/dds/RemoteProxyTransaction.java | 19 ++++--- .../datastore/AbstractFrontendHistory.java | 3 +- .../datastore/FrontendTransaction.java | 10 ++++ 9 files changed, 224 insertions(+), 12 deletions(-) create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceRequest.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceRequestProxyV1.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceSuccess.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceSuccessProxyV1.java diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceRequest.java new file mode 100644 index 0000000000..59faae42cc --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceRequest.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import akka.actor.ActorRef; +import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.access.ABIVersion; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; + +/** + * A blank transaction request. This is used to provide backfill requests in converted retransmit scenarios, such as + * when a initial request to a transaction (such as a {@link ReadTransactionRequest}) is satisfied by the backend + * before the need to replay the transaction to a different remote backend. + * + * @author Robert Varga + */ +public final class IncrementTransactionSequenceRequest extends TransactionRequest { + private static final long serialVersionUID = 1L; + + private final long increment; + + public IncrementTransactionSequenceRequest(final TransactionIdentifier identifier, final long sequence, + final ActorRef replyTo, final long increment) { + super(identifier, sequence, replyTo); + Preconditions.checkArgument(increment >= 0); + this.increment = increment; + } + + @Override + protected IncrementTransactionSequenceRequestProxyV1 externalizableProxy(final ABIVersion version) { + return new IncrementTransactionSequenceRequestProxyV1(this); + } + + @Override + protected IncrementTransactionSequenceRequest cloneAsVersion(final ABIVersion targetVersion) { + return this; + } + + /** + * Return the sequence increment beyond this request's sequence. + * + * @return Sequence increment, guaranteed to be non-negative. + */ + public long getIncrement() { + return increment; + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceRequestProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceRequestProxyV1.java new file mode 100644 index 0000000000..f9f8854d3b --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceRequestProxyV1.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import akka.actor.ActorRef; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.yangtools.concepts.WritableObjects; + +final class IncrementTransactionSequenceRequestProxyV1 + extends AbstractTransactionRequestProxy { + private long increment; + + // checkstyle flags the public modifier as redundant however it is explicitly needed for Java serialization to + // be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public IncrementTransactionSequenceRequestProxyV1() { + // For Externalizable + } + + IncrementTransactionSequenceRequestProxyV1(final IncrementTransactionSequenceRequest request) { + super(request); + this.increment = request.getIncrement(); + } + + @Override + public void writeExternal(final ObjectOutput out) throws IOException { + super.writeExternal(out); + WritableObjects.writeLong(out, increment); + } + + @Override + public void readExternal(final ObjectInput in) throws ClassNotFoundException, IOException { + super.readExternal(in); + increment = WritableObjects.readLong(in); + } + + @Override + protected IncrementTransactionSequenceRequest createRequest(final TransactionIdentifier target, final long sequence, + final ActorRef replyToActor) { + return new IncrementTransactionSequenceRequest(target, sequence, replyToActor, increment); + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceSuccess.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceSuccess.java new file mode 100644 index 0000000000..80f4a0d5aa --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceSuccess.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import com.google.common.annotations.Beta; +import org.opendaylight.controller.cluster.access.ABIVersion; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; + +/** + * Successful reply to an {@link IncrementTransactionSequenceRequest}. + * + * @author Robert Varga + */ +@Beta +public final class IncrementTransactionSequenceSuccess extends TransactionSuccess { + private static final long serialVersionUID = 1L; + + public IncrementTransactionSequenceSuccess(final TransactionIdentifier target, final long sequence) { + super(target, sequence); + } + + @Override + protected IncrementTransactionSequenceSuccessProxyV1 externalizableProxy(final ABIVersion version) { + return new IncrementTransactionSequenceSuccessProxyV1(this); + } + + @Override + protected IncrementTransactionSequenceSuccess cloneAsVersion(final ABIVersion version) { + return this; + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceSuccessProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceSuccessProxyV1.java new file mode 100644 index 0000000000..a99faabcff --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceSuccessProxyV1.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; + +/** + * Externalizable proxy for use with {@link IncrementTransactionSequenceSuccess}. It implements the initial (Boron) + * serialization format. + * + * @author Robert Varga + */ +final class IncrementTransactionSequenceSuccessProxyV1 + extends AbstractTransactionSuccessProxy { + private static final long serialVersionUID = 1L; + + // checkstyle flags the public modifier as redundant however it is explicitly needed for Java serialization to + // be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public IncrementTransactionSequenceSuccessProxyV1() { + // For Externalizable + } + + IncrementTransactionSequenceSuccessProxyV1(final IncrementTransactionSequenceSuccess request) { + super(request); + } + + @Override + protected IncrementTransactionSequenceSuccess createSuccess(final TransactionIdentifier target, + final long sequence) { + return new IncrementTransactionSequenceSuccess(target, sequence); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java index 46af74a563..8e4c757d33 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java @@ -30,6 +30,7 @@ import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.client.ConnectionEntry; import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess; @@ -70,12 +71,21 @@ abstract class AbstractProxyTransaction implements Identifiable response) { final Object last = successfulRequests.peekLast(); if (last instanceof IncrementSequence) { ((IncrementSequence) last).incrementDelta(); } else { - successfulRequests.addLast(new IncrementSequence()); + successfulRequests.addLast(new IncrementSequence(response.getSequence())); } } @@ -525,7 +535,10 @@ abstract class AbstractProxyTransaction implements Identifiable) obj, resp -> { }, now); } else { Verify.verify(obj instanceof IncrementSequence); - successor.incrementSequence(((IncrementSequence) obj).getDelta()); + final IncrementSequence increment = (IncrementSequence) obj; + successor.replayRequest(new IncrementTransactionSequenceRequest(getIdentifier(), + increment.getSequence(), localActor(), increment.getDelta()), resp -> { }, now); + LOG.debug("Incrementing sequence {} to successor {}", obj, successor); } } LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java index b228293703..7c0ccd1c1b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java @@ -19,6 +19,7 @@ import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransact import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess; @@ -128,6 +129,10 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { // No-op } else if (request instanceof TransactionPurgeRequest) { enqueuePurge(enqueuedTicks); + } else if (request instanceof IncrementTransactionSequenceRequest) { + // Local transactions do not have non-replayable requests which would be visible to the backend, + // hence we can skip sequence increments. + LOG.debug("Not replaying {}", request); } else { throw new IllegalArgumentException("Unhandled request " + request); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java index 09a8a60563..a70e6c5b6c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java @@ -20,6 +20,7 @@ import org.opendaylight.controller.cluster.access.commands.AbstractReadTransacti import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder; import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol; @@ -239,7 +240,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { failFuture(future, response); } - recordFinishedRequest(); + recordFinishedRequest(response); } private void completeRead(final SettableFuture>> future, @@ -252,7 +253,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { failFuture(future, response); } - recordFinishedRequest(); + recordFinishedRequest(response); } private ModifyTransactionRequest abortRequest() { @@ -341,14 +342,14 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { ensureFlushedBuider(); sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { - recordFinishedRequest(); + recordFinishedRequest(resp); callback.accept(resp); }); } else if (request instanceof ExistsTransactionRequest) { ensureFlushedBuider(); sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { - recordFinishedRequest(); + recordFinishedRequest(resp); callback.accept(resp); }); } else if (request instanceof TransactionPreCommitRequest) { @@ -465,14 +466,14 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { ensureFlushedBuider(optTicks); enqueueRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { - recordFinishedRequest(); + recordFinishedRequest(resp); cb.accept(resp); }, enqueuedTicks); } else if (request instanceof ExistsTransactionRequest) { ensureFlushedBuider(optTicks); enqueueRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { - recordFinishedRequest(); + recordFinishedRequest(resp); cb.accept(resp); }, enqueuedTicks); } else if (request instanceof TransactionPreCommitRequest) { @@ -492,6 +493,12 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { enqueueAbort(callback, enqueuedTicks); } else if (request instanceof TransactionPurgeRequest) { enqueuePurge(enqueuedTicks); + } else if (request instanceof IncrementTransactionSequenceRequest) { + final IncrementTransactionSequenceRequest req = (IncrementTransactionSequenceRequest) request; + ensureFlushedBuider(optTicks); + enqueueRequest(new IncrementTransactionSequenceRequest(getIdentifier(), nextSequence(), localActor(), + req.getIncrement()), callback, enqueuedTicks); + incrementSequence(req.getIncrement()); } else { throw new IllegalArgumentException("Unhandled request {}" + request); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java index f2612949e0..ea43e22ebb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java @@ -21,6 +21,7 @@ import org.opendaylight.controller.cluster.access.commands.AbstractReadTransacti import org.opendaylight.controller.cluster.access.commands.ClosedTransactionException; import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.DeadTransactionException; +import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest; import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess; import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException; import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest; @@ -141,7 +142,7 @@ abstract class AbstractFrontendHistory implements Identifiable> maybeReplay = tx.replaySequence(request.getSequence()); if (maybeReplay.isPresent()) { final TransactionSuccess replay = maybeReplay.get(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java index c0249fd000..6c7ae07a3c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java @@ -15,6 +15,8 @@ import java.util.Iterator; import java.util.Queue; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; +import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest; +import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceSuccess; import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionSuccess; @@ -122,6 +124,14 @@ abstract class FrontendTransaction implements Identifiable handleRequest(final TransactionRequest request, final RequestEnvelope envelope, final long now) throws RequestException { + if (request instanceof IncrementTransactionSequenceRequest) { + final IncrementTransactionSequenceRequest incr = (IncrementTransactionSequenceRequest) request; + expectedSequence += incr.getIncrement(); + + return recordSuccess(incr.getSequence(), new IncrementTransactionSequenceSuccess(incr.getTarget(), + incr.getSequence())); + } + if (previousFailure != null) { LOG.debug("{}: Rejecting request {} due to previous failure", persistenceId(), request, previousFailure); throw previousFailure; -- 2.36.6