BUG-5280: add a transction purge step 30/51930/5
authorRobert Varga <rovarga@cisco.com>
Wed, 15 Feb 2017 22:34:48 +0000 (23:34 +0100)
committerTom Pantelis <tpanteli@brocade.com>
Fri, 17 Feb 2017 12:24:41 +0000 (12:24 +0000)
Backend needs to be informed about the client completing the transaction,
so it can remove any state trakcing the result of the transaction.

Since the frontend has no remaining state to transfer, there is no legal
way for the frontend to ever reference it (aside the purge request itself),
which always succeeds.

Change-Id: Ia957f0879114eede394b76184620a38cd5967955
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPurgeRequest.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPurgeRequestProxyV1.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPurgeResponse.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPurgeResponseProxyV1.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadOnlyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java

diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPurgeRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPurgeRequest.java
new file mode 100644 (file)
index 0000000..a0fab70
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * A transaction request to perform the final transaction transition, which is purging it from the protocol view,
+ * meaning the frontend has no further knowledge of the transaction. The backend is free to purge any state related
+ * to the transaction and responds with a {@link TransactionPurgeResponse}.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class TransactionPurgeRequest extends TransactionRequest<TransactionPurgeRequest> {
+    private static final long serialVersionUID = 1L;
+
+    public TransactionPurgeRequest(final TransactionIdentifier target, final long sequence, final ActorRef replyTo) {
+        super(target, sequence, replyTo);
+    }
+
+    @Override
+    protected TransactionPurgeRequestProxyV1 externalizableProxy(final ABIVersion version) {
+        return new TransactionPurgeRequestProxyV1(this);
+    }
+
+    @Override
+    protected TransactionPurgeRequest cloneAsVersion(final ABIVersion version) {
+        return this;
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPurgeRequestProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPurgeRequestProxyV1.java
new file mode 100644 (file)
index 0000000..ee56b4c
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Externalizable proxy for use with {@link TransactionPurgeRequest}. It implements the initial (Boron)
+ * serialization format.
+ *
+ * @author Robert Varga
+ */
+final class TransactionPurgeRequestProxyV1 extends AbstractTransactionRequestProxy<TransactionPurgeRequest> {
+    private static final long serialVersionUID = 1L;
+
+    // checkstyle flags the public modifier as redundant however it is explicitly needed for Java serialization to
+    // be able to create instances via reflection.
+    @SuppressWarnings("checkstyle:RedundantModifier")
+    public TransactionPurgeRequestProxyV1() {
+        // For Externalizable
+    }
+
+    TransactionPurgeRequestProxyV1(final TransactionPurgeRequest request) {
+        super(request);
+    }
+
+    @Override
+    protected TransactionPurgeRequest createRequest(final TransactionIdentifier target, final long sequence,
+            final ActorRef replyTo) {
+        return new TransactionPurgeRequest(target, sequence, replyTo);
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPurgeResponse.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPurgeResponse.java
new file mode 100644 (file)
index 0000000..5471014
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Successful reply to a {@link TransactionPurgeRequest}.
+ *
+ * @author Robert Varga
+ */
+public final class TransactionPurgeResponse extends TransactionSuccess<TransactionPurgeResponse> {
+    private static final long serialVersionUID = 1L;
+
+    public TransactionPurgeResponse(final TransactionIdentifier identifier, final long sequence) {
+        super(identifier, sequence);
+    }
+
+    @Override
+    protected AbstractTransactionSuccessProxy<TransactionPurgeResponse> externalizableProxy(
+            final ABIVersion version) {
+        return new TransactionPurgeResponseProxyV1(this);
+    }
+
+    @Override
+    protected TransactionPurgeResponse cloneAsVersion(final ABIVersion version) {
+        return this;
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPurgeResponseProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPurgeResponseProxyV1.java
new file mode 100644 (file)
index 0000000..d15d729
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Externalizable proxy for use with {@link TransactionPurgeResponse}. It implements the initial (Boron)
+ * serialization format.
+ *
+ * @author Robert Varga
+ */
+final class TransactionPurgeResponseProxyV1 extends AbstractTransactionSuccessProxy<TransactionPurgeResponse> {
+    private static final long serialVersionUID = 1L;
+
+    // checkstyle flags the public modifier as redundant however it is explicitly needed for Java serialization to
+    // be able to create instances via reflection.
+    @SuppressWarnings("checkstyle:RedundantModifier")
+    public TransactionPurgeResponseProxyV1() {
+        // For Externalizable
+    }
+
+    TransactionPurgeResponseProxyV1(final TransactionPurgeResponse success) {
+        super(success);
+    }
+
+    @Override
+    protected TransactionPurgeResponse createSuccess(final TransactionIdentifier target, final long sequence) {
+        return new TransactionPurgeResponse(target, sequence);
+    }
+}
index e6313d3..cb5a6cf 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import akka.actor.ActorRef;
+import com.google.common.base.MoreObjects;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
@@ -34,6 +35,7 @@ import org.opendaylight.controller.cluster.access.commands.TransactionCommitSucc
 import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
@@ -321,7 +323,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
 
             // This is a terminal request, hence we do not need to record it
             LOG.debug("Transaction {} abort completed", this);
-            parent.completeTransaction(this);
+            purge();
         });
     }
 
@@ -354,7 +356,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
 
                     // This is a terminal request, hence we do not need to record it
                     LOG.debug("Transaction {} directCommit completed", this);
-                    parent.completeTransaction(this);
+                    purge();
                 });
 
                 return ret;
@@ -414,11 +416,26 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
                 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
             }
 
-            recordSuccessfulRequest(req);
-            LOG.debug("Transaction {} preCommit completed", this);
+            onPreCommitComplete(req);
         });
     }
 
+    private void onPreCommitComplete(final TransactionRequest<?> req) {
+        /*
+         * The backend has agreed that the transaction has entered PRE_COMMIT phase, meaning it will be committed
+         * to storage after the timeout completes.
+         *
+         * All state has been replicated to the backend, hence we do not need to keep it around. Retain only
+         * the precommit request, so we know which request to use for resync.
+         */
+        LOG.debug("Transaction {} preCommit completed, clearing successfulRequests", this);
+        successfulRequests.clear();
+
+        // TODO: this works, but can contain some useless state (like batched operations). Create an empty
+        //       equivalent of this request and store that.
+        recordSuccessfulRequest(req);
+    }
+
     final void doCommit(final VotingFuture<?> ret) {
         checkReadWrite();
         checkSealed();
@@ -433,6 +450,16 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
             }
 
             LOG.debug("Transaction {} doCommit completed", this);
+            purge();
+        });
+    }
+
+    private void purge() {
+        successfulRequests.clear();
+
+        final TransactionRequest<?> req = new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor());
+        sendRequest(req, t -> {
+            LOG.debug("Transaction {} purge completed", this);
             parent.completeTransaction(this);
         });
     }
@@ -574,4 +601,9 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      */
     abstract void forwardToLocal(LocalProxyTransaction successor, TransactionRequest<?> request,
             Consumer<Response<?, ?>> callback);
+
+    @Override
+    public final String toString() {
+        return MoreObjects.toStringHelper(this).add("identifier", getIdentifier()).add("state", state).toString();
+    }
 }
index d7e3c65..dbea3a1 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Ticker;
 import java.util.HashMap;
@@ -16,6 +17,8 @@ import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
+import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
@@ -56,26 +59,34 @@ abstract class AbstractFrontendHistory implements Identifiable<LocalHistoryIdent
 
     final @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
             final RequestEnvelope envelope, final long now) throws RequestException {
-
-        // FIXME: handle purging of transactions
-
         final TransactionIdentifier id = request.getTarget();
-        FrontendTransaction tx = transactions.get(id);
-        if (tx == null) {
-            // The transaction does not exist and we are about to create it, check sequence number
-            if (request.getSequence() != 0) {
-                LOG.debug("{}: no transaction state present, unexpected request {}", persistenceId(), request);
-                throw UNSEQUENCED_START;
-            }
 
-            tx = createTransaction(request, id);
-            transactions.put(id, tx);
+        FrontendTransaction tx;
+        if (request instanceof TransactionPurgeRequest) {
+            tx = transactions.remove(id);
+            if (tx == null) {
+                // We have no record of the transaction, nothing to do
+                LOG.debug("{}: no state for transaction {}, purge is complete", persistenceId(), id);
+                return new TransactionPurgeResponse(id, request.getSequence());
+            }
         } else {
-            final Optional<TransactionSuccess<?>> maybeReplay = tx.replaySequence(request.getSequence());
-            if (maybeReplay.isPresent()) {
-                final TransactionSuccess<?> replay = maybeReplay.get();
-                LOG.debug("{}: envelope {} replaying response {}", persistenceId(), envelope, replay);
-                return replay;
+            tx = transactions.get(id);
+            if (tx == null) {
+                // The transaction does not exist and we are about to create it, check sequence number
+                if (request.getSequence() != 0) {
+                    LOG.debug("{}: no transaction state present, unexpected request {}", persistenceId(), request);
+                    throw UNSEQUENCED_START;
+                }
+
+                tx = createTransaction(request, id);
+                transactions.put(id, tx);
+            } else {
+                final Optional<TransactionSuccess<?>> maybeReplay = tx.replaySequence(request.getSequence());
+                if (maybeReplay.isPresent()) {
+                    final TransactionSuccess<?> replay = maybeReplay.get();
+                    LOG.debug("{}: envelope {} replaying response {}", persistenceId(), envelope, replay);
+                    return replay;
+                }
             }
         }
 
@@ -107,4 +118,10 @@ abstract class AbstractFrontendHistory implements Identifiable<LocalHistoryIdent
         throws RequestException;
 
     abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier id, DataTreeModification mod);
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this).omitNullValues().add("identifier", getIdentifier())
+                .add("persistenceId", persistenceId).add("transactions", transactions).toString();
+    }
 }
index 36a876b..465cfcb 100644 (file)
@@ -17,6 +17,8 @@ import org.opendaylight.controller.cluster.access.commands.ReadTransactionReques
 import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
@@ -54,6 +56,9 @@ final class FrontendReadOnlyTransaction extends FrontendTransaction {
             return handleReadTransaction((ReadTransactionRequest) request);
         } else if (request instanceof TransactionAbortRequest) {
             return handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
+        } else if (request instanceof TransactionPurgeRequest) {
+            // No-op for now
+            return new TransactionPurgeResponse(request.getTarget(), request.getSequence());
         } else {
             throw new UnsupportedRequestException(request);
         }
index 0dcaac2..2c2a287 100644 (file)
@@ -32,6 +32,8 @@ import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
 import org.opendaylight.controller.cluster.access.commands.TransactionModification;
 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
@@ -102,6 +104,9 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
             return null;
         } else if (request instanceof TransactionAbortRequest) {
             return handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
+        } else if (request instanceof TransactionPurgeRequest) {
+            // No-op for now
+            return new TransactionPurgeResponse(request.getTarget(), request.getSequence());
         } else {
             throw new UnsupportedRequestException(request);
         }
index 16c180e..1ef775e 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 import java.util.ArrayDeque;
@@ -139,4 +140,12 @@ abstract class FrontendTransaction implements Identifiable<TransactionIdentifier
         recordResponse(envelope.getMessage().getSequence(), failure);
         envelope.sendFailure(failure, executionTime(startTime));
     }
+
+    @Override
+    public final String toString() {
+        return MoreObjects.toStringHelper(this).omitNullValues().add("identifier", getIdentifier())
+                .add("expectedSequence", expectedSequence).add("firstReplaySequence", firstReplaySequence)
+                .add("lastPurgedSequence", lastPurgedSequence)
+                .toString();
+    }
 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.