BUG-8402: fix sequencing with read/exists requests 28/57028/17
authorRobert Varga <robert.varga@pantheon.tech>
Sun, 14 May 2017 18:36:09 +0000 (20:36 +0200)
committerThanh Ha <thanh.ha@linuxfoundation.org>
Thu, 18 May 2017 15:12:33 +0000 (15:12 +0000)
When replaying successful requests, we do not issue read and exists
requests, as they have already been satisfied, but account for their
sequence numbers.

This does not work in the case where we have a remote connection,
the first request on a transaction is a read and after it is
satisfied subsequent requests are replayed to a different backend
leader.

Since the initial request is not replayed, but subsequent requests
account for it and the backend has no prior knowledge of the
transaction, it sees an initial request with sequence != 0, and
rejects all requests with an OutOfOrderRequestException.

Fix this by introducing IncrementTransactionSequenceRequest, which
the frontend enqueues as the first request instead of the initial
read/exist request -- introducing the transaction to backend.

Change-Id: Ia0f048e33d417e1fdc8d15bf319d6b8b33c2b1b1
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceRequest.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceRequestProxyV1.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceSuccess.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceSuccessProxyV1.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java

diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceRequest.java
new file mode 100644 (file)
index 0000000..59faae4
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * A blank transaction request. This is used to provide backfill requests in converted retransmit scenarios, such as
+ * when a initial request to a transaction (such as a {@link ReadTransactionRequest}) is satisfied by the backend
+ * before the need to replay the transaction to a different remote backend.
+ *
+ * @author Robert Varga
+ */
+public final class IncrementTransactionSequenceRequest extends TransactionRequest<IncrementTransactionSequenceRequest> {
+    private static final long serialVersionUID = 1L;
+
+    private final long increment;
+
+    public IncrementTransactionSequenceRequest(final TransactionIdentifier identifier, final long sequence,
+            final ActorRef replyTo, final long increment) {
+        super(identifier, sequence, replyTo);
+        Preconditions.checkArgument(increment >= 0);
+        this.increment = increment;
+    }
+
+    @Override
+    protected IncrementTransactionSequenceRequestProxyV1 externalizableProxy(final ABIVersion version) {
+        return new IncrementTransactionSequenceRequestProxyV1(this);
+    }
+
+    @Override
+    protected IncrementTransactionSequenceRequest cloneAsVersion(final ABIVersion targetVersion) {
+        return this;
+    }
+
+    /**
+     * Return the sequence increment beyond this request's sequence.
+     *
+     * @return Sequence increment, guaranteed to be non-negative.
+     */
+    public long getIncrement() {
+        return increment;
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceRequestProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceRequestProxyV1.java
new file mode 100644 (file)
index 0000000..f9f8854
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.concepts.WritableObjects;
+
+final class IncrementTransactionSequenceRequestProxyV1
+        extends AbstractTransactionRequestProxy<IncrementTransactionSequenceRequest> {
+    private long increment;
+
+    // checkstyle flags the public modifier as redundant however it is explicitly needed for Java serialization to
+    // be able to create instances via reflection.
+    @SuppressWarnings("checkstyle:RedundantModifier")
+    public IncrementTransactionSequenceRequestProxyV1() {
+        // For Externalizable
+    }
+
+    IncrementTransactionSequenceRequestProxyV1(final IncrementTransactionSequenceRequest request) {
+        super(request);
+        this.increment = request.getIncrement();
+    }
+
+    @Override
+    public void writeExternal(final ObjectOutput out) throws IOException {
+        super.writeExternal(out);
+        WritableObjects.writeLong(out, increment);
+    }
+
+    @Override
+    public void readExternal(final ObjectInput in) throws ClassNotFoundException, IOException {
+        super.readExternal(in);
+        increment = WritableObjects.readLong(in);
+    }
+
+    @Override
+    protected IncrementTransactionSequenceRequest createRequest(final TransactionIdentifier target, final long sequence,
+            final ActorRef replyToActor) {
+        return new IncrementTransactionSequenceRequest(target, sequence, replyToActor, increment);
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceSuccess.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceSuccess.java
new file mode 100644 (file)
index 0000000..80f4a0d
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Successful reply to an {@link IncrementTransactionSequenceRequest}.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class IncrementTransactionSequenceSuccess extends TransactionSuccess<IncrementTransactionSequenceSuccess> {
+    private static final long serialVersionUID = 1L;
+
+    public IncrementTransactionSequenceSuccess(final TransactionIdentifier target, final long sequence) {
+        super(target, sequence);
+    }
+
+    @Override
+    protected IncrementTransactionSequenceSuccessProxyV1 externalizableProxy(final ABIVersion version) {
+        return new IncrementTransactionSequenceSuccessProxyV1(this);
+    }
+
+    @Override
+    protected IncrementTransactionSequenceSuccess cloneAsVersion(final ABIVersion version) {
+        return this;
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceSuccessProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/IncrementTransactionSequenceSuccessProxyV1.java
new file mode 100644 (file)
index 0000000..a99faab
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Externalizable proxy for use with {@link IncrementTransactionSequenceSuccess}. It implements the initial (Boron)
+ * serialization format.
+ *
+ * @author Robert Varga
+ */
+final class IncrementTransactionSequenceSuccessProxyV1
+        extends AbstractTransactionSuccessProxy<IncrementTransactionSequenceSuccess> {
+    private static final long serialVersionUID = 1L;
+
+    // checkstyle flags the public modifier as redundant however it is explicitly needed for Java serialization to
+    // be able to create instances via reflection.
+    @SuppressWarnings("checkstyle:RedundantModifier")
+    public IncrementTransactionSequenceSuccessProxyV1() {
+        // For Externalizable
+    }
+
+    IncrementTransactionSequenceSuccessProxyV1(final IncrementTransactionSequenceSuccess request) {
+        super(request);
+    }
+
+    @Override
+    protected IncrementTransactionSequenceSuccess createSuccess(final TransactionIdentifier target,
+            final long sequence) {
+        return new IncrementTransactionSequenceSuccess(target, sequence);
+    }
+}
index 46af74a5630999e0c75f430955875754c274dcb5..8e4c757d33767877bb2065a3de45ab201d1bfaf7 100644 (file)
@@ -30,6 +30,7 @@ import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
 import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
 import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
@@ -70,12 +71,21 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      */
     @NotThreadSafe
     private static final class IncrementSequence {
-        private long delta = 1;
+        private final long sequence;
+        private long delta = 0;
+
+        IncrementSequence(final long sequence) {
+            this.sequence = sequence;
+        }
 
         long getDelta() {
             return delta;
         }
 
+        long getSequence() {
+            return sequence;
+        }
+
         void incrementDelta() {
             delta++;
         }
@@ -191,7 +201,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         return parent.localActor();
     }
 
-    private void incrementSequence(final long delta) {
+    final void incrementSequence(final long delta) {
         sequence += delta;
         LOG.debug("Transaction {} incremented sequence to {}", this, sequence);
     }
@@ -298,12 +308,12 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         successfulRequests.add(Verify.verifyNotNull(req));
     }
 
-    final void recordFinishedRequest() {
+    final void recordFinishedRequest(final Response<?, ?> response) {
         final Object last = successfulRequests.peekLast();
         if (last instanceof IncrementSequence) {
             ((IncrementSequence) last).incrementDelta();
         } else {
-            successfulRequests.addLast(new IncrementSequence());
+            successfulRequests.addLast(new IncrementSequence(response.getSequence()));
         }
     }
 
@@ -525,7 +535,10 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
                     successor.replayRequest((TransactionRequest<?>) obj, resp -> { }, now);
                 } else {
                     Verify.verify(obj instanceof IncrementSequence);
-                    successor.incrementSequence(((IncrementSequence) obj).getDelta());
+                    final IncrementSequence increment = (IncrementSequence) obj;
+                    successor.replayRequest(new IncrementTransactionSequenceRequest(getIdentifier(),
+                        increment.getSequence(), localActor(), increment.getDelta()), resp -> { }, now);
+                    LOG.debug("Incrementing sequence {} to successor {}", obj, successor);
                 }
             }
             LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size());
index b22829370336e26f0da06c9c344ba1fba9c8add0..7c0ccd1c1b6a4d345447985165c4956cff8ece20 100644 (file)
@@ -19,6 +19,7 @@ import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransact
 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
@@ -128,6 +129,10 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
             // No-op
         } else if (request instanceof TransactionPurgeRequest) {
             enqueuePurge(enqueuedTicks);
+        } else if (request instanceof IncrementTransactionSequenceRequest) {
+            // Local transactions do not have non-replayable requests which would be visible to the backend,
+            // hence we can skip sequence increments.
+            LOG.debug("Not replaying {}", request);
         } else {
             throw new IllegalArgumentException("Unhandled request " + request);
         }
index 09a8a605631c1f9d4b5cbe07e95a42a13077e85a..a70e6c5b6cb2f238e20239626de37fed6b0c56c9 100644 (file)
@@ -20,6 +20,7 @@ import org.opendaylight.controller.cluster.access.commands.AbstractReadTransacti
 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
@@ -239,7 +240,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             failFuture(future, response);
         }
 
-        recordFinishedRequest();
+        recordFinishedRequest(response);
     }
 
     private void completeRead(final SettableFuture<Optional<NormalizedNode<?, ?>>> future,
@@ -252,7 +253,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             failFuture(future, response);
         }
 
-        recordFinishedRequest();
+        recordFinishedRequest(response);
     }
 
     private ModifyTransactionRequest abortRequest() {
@@ -341,14 +342,14 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             ensureFlushedBuider();
             sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
                 ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
-                    recordFinishedRequest();
+                    recordFinishedRequest(resp);
                     callback.accept(resp);
                 });
         } else if (request instanceof ExistsTransactionRequest) {
             ensureFlushedBuider();
             sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
                 ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
-                    recordFinishedRequest();
+                    recordFinishedRequest(resp);
                     callback.accept(resp);
                 });
         } else if (request instanceof TransactionPreCommitRequest) {
@@ -465,14 +466,14 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             ensureFlushedBuider(optTicks);
             enqueueRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
                 ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
-                    recordFinishedRequest();
+                    recordFinishedRequest(resp);
                     cb.accept(resp);
                 }, enqueuedTicks);
         } else if (request instanceof ExistsTransactionRequest) {
             ensureFlushedBuider(optTicks);
             enqueueRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
                 ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
-                    recordFinishedRequest();
+                    recordFinishedRequest(resp);
                     cb.accept(resp);
                 }, enqueuedTicks);
         } else if (request instanceof TransactionPreCommitRequest) {
@@ -492,6 +493,12 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             enqueueAbort(callback, enqueuedTicks);
         } else if (request instanceof TransactionPurgeRequest) {
             enqueuePurge(enqueuedTicks);
+        } else if (request instanceof IncrementTransactionSequenceRequest) {
+            final IncrementTransactionSequenceRequest req = (IncrementTransactionSequenceRequest) request;
+            ensureFlushedBuider(optTicks);
+            enqueueRequest(new IncrementTransactionSequenceRequest(getIdentifier(), nextSequence(), localActor(),
+                req.getIncrement()), callback, enqueuedTicks);
+            incrementSequence(req.getIncrement());
         } else {
             throw new IllegalArgumentException("Unhandled request {}" + request);
         }
index f2612949e0bf602cc3d70eb33fc667cfd4c76159..ea43e22ebbf0c828ded54bdbc3e9daafa7b082ef 100644 (file)
@@ -21,6 +21,7 @@ import org.opendaylight.controller.cluster.access.commands.AbstractReadTransacti
 import org.opendaylight.controller.cluster.access.commands.ClosedTransactionException;
 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.DeadTransactionException;
+import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
 import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
 import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
@@ -141,7 +142,7 @@ abstract class AbstractFrontendHistory implements Identifiable<LocalHistoryIdent
 
             tx = createTransaction(request, id);
             transactions.put(id, tx);
-        } else {
+        } else if (!(request instanceof IncrementTransactionSequenceRequest)) {
             final Optional<TransactionSuccess<?>> maybeReplay = tx.replaySequence(request.getSequence());
             if (maybeReplay.isPresent()) {
                 final TransactionSuccess<?> replay = maybeReplay.get();
index c0249fd00089d1bd48abe9793ddb8564fb937a94..6c7ae07a3cf890c8cba9db65eba9e3fec349a405 100644 (file)
@@ -15,6 +15,8 @@ import java.util.Iterator;
 import java.util.Queue;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
+import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceSuccess;
 import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
@@ -122,6 +124,14 @@ abstract class FrontendTransaction implements Identifiable<TransactionIdentifier
     @SuppressWarnings("checkstyle:IllegalCatch")
     final @Nullable TransactionSuccess<?> handleRequest(final TransactionRequest<?> request,
             final RequestEnvelope envelope, final long now) throws RequestException {
+        if (request instanceof IncrementTransactionSequenceRequest) {
+            final IncrementTransactionSequenceRequest incr = (IncrementTransactionSequenceRequest) request;
+            expectedSequence += incr.getIncrement();
+
+            return recordSuccess(incr.getSequence(), new IncrementTransactionSequenceSuccess(incr.getTarget(),
+                incr.getSequence()));
+        }
+
         if (previousFailure != null) {
             LOG.debug("{}: Rejecting request {} due to previous failure", persistenceId(), request, previousFailure);
             throw previousFailure;