BUG-8402: fix sequencing with read/exists requests 78/57378/1
authorRobert Varga <robert.varga@pantheon.tech>
Sun, 14 May 2017 18:36:09 +0000 (20:36 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Thu, 18 May 2017 15:34:17 +0000 (17:34 +0200)
When replaying successful requests, we do not issue read and exists
requests, as they have already been satisfied, but account for their
sequence numbers.

This does not work in the case where we have a remote connection,
the first request on a transaction is a read and after it is
satisfied subsequent requests are replayed to a different backend
leader.

Since the initial request is not replayed, but subsequent requests
account for it and the backend has no prior knowledge of the
transaction, it sees an initial request with sequence != 0, and
rejects all requests with an OutOfOrderRequestException.

Fix this by introducing IncrementTransactionSequenceRequest, which
the frontend enqueues as the first request instead of the initial
read/exist request -- introducing the transaction to backend.

Change-Id: Ia0f048e33d417e1fdc8d15bf319d6b8b33c2b1b1
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 7f15e81c52f2efda779c670580f0697227557404)

opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceRequest.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceRequestProxyV1.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceSuccess.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceSuccessProxyV1.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java

diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceRequest.java
new file mode 100644 (file)
index 0000000..59faae4
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * A blank transaction request. This is used to provide backfill requests in converted retransmit scenarios, such as
+ * when a initial request to a transaction (such as a {@link ReadTransactionRequest}) is satisfied by the backend
+ * before the need to replay the transaction to a different remote backend.
+ *
+ * @author Robert Varga
+ */
+public final class IncrementTransactionSequenceRequest extends TransactionRequest<IncrementTransactionSequenceRequest> {
+    private static final long serialVersionUID = 1L;
+
+    private final long increment;
+
+    public IncrementTransactionSequenceRequest(final TransactionIdentifier identifier, final long sequence,
+            final ActorRef replyTo, final long increment) {
+        super(identifier, sequence, replyTo);
+        Preconditions.checkArgument(increment >= 0);
+        this.increment = increment;
+    }
+
+    @Override
+    protected IncrementTransactionSequenceRequestProxyV1 externalizableProxy(final ABIVersion version) {
+        return new IncrementTransactionSequenceRequestProxyV1(this);
+    }
+
+    @Override
+    protected IncrementTransactionSequenceRequest cloneAsVersion(final ABIVersion targetVersion) {
+        return this;
+    }
+
+    /**
+     * Return the sequence increment beyond this request's sequence.
+     *
+     * @return Sequence increment, guaranteed to be non-negative.
+     */
+    public long getIncrement() {
+        return increment;
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceRequestProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceRequestProxyV1.java
new file mode 100644 (file)
index 0000000..f9f8854
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.concepts.WritableObjects;
+
+final class IncrementTransactionSequenceRequestProxyV1
+        extends AbstractTransactionRequestProxy<IncrementTransactionSequenceRequest> {
+    private long increment;
+
+    // checkstyle flags the public modifier as redundant however it is explicitly needed for Java serialization to
+    // be able to create instances via reflection.
+    @SuppressWarnings("checkstyle:RedundantModifier")
+    public IncrementTransactionSequenceRequestProxyV1() {
+        // For Externalizable
+    }
+
+    IncrementTransactionSequenceRequestProxyV1(final IncrementTransactionSequenceRequest request) {
+        super(request);
+        this.increment = request.getIncrement();
+    }
+
+    @Override
+    public void writeExternal(final ObjectOutput out) throws IOException {
+        super.writeExternal(out);
+        WritableObjects.writeLong(out, increment);
+    }
+
+    @Override
+    public void readExternal(final ObjectInput in) throws ClassNotFoundException, IOException {
+        super.readExternal(in);
+        increment = WritableObjects.readLong(in);
+    }
+
+    @Override
+    protected IncrementTransactionSequenceRequest createRequest(final TransactionIdentifier target, final long sequence,
+            final ActorRef replyToActor) {
+        return new IncrementTransactionSequenceRequest(target, sequence, replyToActor, increment);
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceSuccess.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceSuccess.java
new file mode 100644 (file)
index 0000000..80f4a0d
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Successful reply to an {@link IncrementTransactionSequenceRequest}.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class IncrementTransactionSequenceSuccess extends TransactionSuccess<IncrementTransactionSequenceSuccess> {
+    private static final long serialVersionUID = 1L;
+
+    public IncrementTransactionSequenceSuccess(final TransactionIdentifier target, final long sequence) {
+        super(target, sequence);
+    }
+
+    @Override
+    protected IncrementTransactionSequenceSuccessProxyV1 externalizableProxy(final ABIVersion version) {
+        return new IncrementTransactionSequenceSuccessProxyV1(this);
+    }
+
+    @Override
+    protected IncrementTransactionSequenceSuccess cloneAsVersion(final ABIVersion version) {
+        return this;
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceSuccessProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceSuccessProxyV1.java
new file mode 100644 (file)
index 0000000..a99faab
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Externalizable proxy for use with {@link IncrementTransactionSequenceSuccess}. It implements the initial (Boron)
+ * serialization format.
+ *
+ * @author Robert Varga
+ */
+final class IncrementTransactionSequenceSuccessProxyV1
+        extends AbstractTransactionSuccessProxy<IncrementTransactionSequenceSuccess> {
+    private static final long serialVersionUID = 1L;
+
+    // checkstyle flags the public modifier as redundant however it is explicitly needed for Java serialization to
+    // be able to create instances via reflection.
+    @SuppressWarnings("checkstyle:RedundantModifier")
+    public IncrementTransactionSequenceSuccessProxyV1() {
+        // For Externalizable
+    }
+
+    IncrementTransactionSequenceSuccessProxyV1(final IncrementTransactionSequenceSuccess request) {
+        super(request);
+    }
+
+    @Override
+    protected IncrementTransactionSequenceSuccess createSuccess(final TransactionIdentifier target,
+            final long sequence) {
+        return new IncrementTransactionSequenceSuccess(target, sequence);
+    }
+}
index 46af74a..8e4c757 100644 (file)
@@ -30,6 +30,7 @@ import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
 import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
 import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
@@ -70,12 +71,21 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      */
     @NotThreadSafe
     private static final class IncrementSequence {
-        private long delta = 1;
+        private final long sequence;
+        private long delta = 0;
+
+        IncrementSequence(final long sequence) {
+            this.sequence = sequence;
+        }
 
         long getDelta() {
             return delta;
         }
 
+        long getSequence() {
+            return sequence;
+        }
+
         void incrementDelta() {
             delta++;
         }
@@ -191,7 +201,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         return parent.localActor();
     }
 
-    private void incrementSequence(final long delta) {
+    final void incrementSequence(final long delta) {
         sequence += delta;
         LOG.debug("Transaction {} incremented sequence to {}", this, sequence);
     }
@@ -298,12 +308,12 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         successfulRequests.add(Verify.verifyNotNull(req));
     }
 
-    final void recordFinishedRequest() {
+    final void recordFinishedRequest(final Response<?, ?> response) {
         final Object last = successfulRequests.peekLast();
         if (last instanceof IncrementSequence) {
             ((IncrementSequence) last).incrementDelta();
         } else {
-            successfulRequests.addLast(new IncrementSequence());
+            successfulRequests.addLast(new IncrementSequence(response.getSequence()));
         }
     }
 
@@ -525,7 +535,10 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
                     successor.replayRequest((TransactionRequest<?>) obj, resp -> { }, now);
                 } else {
                     Verify.verify(obj instanceof IncrementSequence);
-                    successor.incrementSequence(((IncrementSequence) obj).getDelta());
+                    final IncrementSequence increment = (IncrementSequence) obj;
+                    successor.replayRequest(new IncrementTransactionSequenceRequest(getIdentifier(),
+                        increment.getSequence(), localActor(), increment.getDelta()), resp -> { }, now);
+                    LOG.debug("Incrementing sequence {} to successor {}", obj, successor);
                 }
             }
             LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size());
index 00b29e4..9f4b18e 100644 (file)
@@ -19,6 +19,7 @@ import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransact
 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
@@ -127,6 +128,10 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
             // No-op
         } else if (request instanceof TransactionPurgeRequest) {
             enqueuePurge(enqueuedTicks);
+        } else if (request instanceof IncrementTransactionSequenceRequest) {
+            // Local transactions do not have non-replayable requests which would be visible to the backend,
+            // hence we can skip sequence increments.
+            LOG.debug("Not replaying {}", request);
         } else {
             throw new IllegalArgumentException("Unhandled request " + request);
         }
index 09a8a60..a70e6c5 100644 (file)
@@ -20,6 +20,7 @@ import org.opendaylight.controller.cluster.access.commands.AbstractReadTransacti
 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
@@ -239,7 +240,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             failFuture(future, response);
         }
 
-        recordFinishedRequest();
+        recordFinishedRequest(response);
     }
 
     private void completeRead(final SettableFuture<Optional<NormalizedNode<?, ?>>> future,
@@ -252,7 +253,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             failFuture(future, response);
         }
 
-        recordFinishedRequest();
+        recordFinishedRequest(response);
     }
 
     private ModifyTransactionRequest abortRequest() {
@@ -341,14 +342,14 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             ensureFlushedBuider();
             sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
                 ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
-                    recordFinishedRequest();
+                    recordFinishedRequest(resp);
                     callback.accept(resp);
                 });
         } else if (request instanceof ExistsTransactionRequest) {
             ensureFlushedBuider();
             sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
                 ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
-                    recordFinishedRequest();
+                    recordFinishedRequest(resp);
                     callback.accept(resp);
                 });
         } else if (request instanceof TransactionPreCommitRequest) {
@@ -465,14 +466,14 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             ensureFlushedBuider(optTicks);
             enqueueRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
                 ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
-                    recordFinishedRequest();
+                    recordFinishedRequest(resp);
                     cb.accept(resp);
                 }, enqueuedTicks);
         } else if (request instanceof ExistsTransactionRequest) {
             ensureFlushedBuider(optTicks);
             enqueueRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
                 ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
-                    recordFinishedRequest();
+                    recordFinishedRequest(resp);
                     cb.accept(resp);
                 }, enqueuedTicks);
         } else if (request instanceof TransactionPreCommitRequest) {
@@ -492,6 +493,12 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             enqueueAbort(callback, enqueuedTicks);
         } else if (request instanceof TransactionPurgeRequest) {
             enqueuePurge(enqueuedTicks);
+        } else if (request instanceof IncrementTransactionSequenceRequest) {
+            final IncrementTransactionSequenceRequest req = (IncrementTransactionSequenceRequest) request;
+            ensureFlushedBuider(optTicks);
+            enqueueRequest(new IncrementTransactionSequenceRequest(getIdentifier(), nextSequence(), localActor(),
+                req.getIncrement()), callback, enqueuedTicks);
+            incrementSequence(req.getIncrement());
         } else {
             throw new IllegalArgumentException("Unhandled request {}" + request);
         }
index 8262afa..851e500 100644 (file)
@@ -21,6 +21,7 @@ import org.opendaylight.controller.cluster.access.commands.AbstractReadTransacti
 import org.opendaylight.controller.cluster.access.commands.ClosedTransactionException;
 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.DeadTransactionException;
+import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
 import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
 import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
@@ -141,7 +142,7 @@ abstract class AbstractFrontendHistory implements Identifiable<LocalHistoryIdent
 
             tx = createTransaction(request, id);
             transactions.put(id, tx);
-        } else {
+        } else if (!(request instanceof IncrementTransactionSequenceRequest)) {
             final Optional<TransactionSuccess<?>> maybeReplay = tx.replaySequence(request.getSequence());
             if (maybeReplay.isPresent()) {
                 final TransactionSuccess<?> replay = maybeReplay.get();
index c0249fd..6c7ae07 100644 (file)
@@ -15,6 +15,8 @@ import java.util.Iterator;
 import java.util.Queue;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
+import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceSuccess;
 import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
@@ -122,6 +124,14 @@ abstract class FrontendTransaction implements Identifiable<TransactionIdentifier
     @SuppressWarnings("checkstyle:IllegalCatch")
     final @Nullable TransactionSuccess<?> handleRequest(final TransactionRequest<?> request,
             final RequestEnvelope envelope, final long now) throws RequestException {
+        if (request instanceof IncrementTransactionSequenceRequest) {
+            final IncrementTransactionSequenceRequest incr = (IncrementTransactionSequenceRequest) request;
+            expectedSequence += incr.getIncrement();
+
+            return recordSuccess(incr.getSequence(), new IncrementTransactionSequenceSuccess(incr.getTarget(),
+                incr.getSequence()));
+        }
+
         if (previousFailure != null) {
             LOG.debug("{}: Rejecting request {} due to previous failure", persistenceId(), request, previousFailure);
             throw previousFailure;

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.