BUG-5280: add frontend state lifecycle 65/49265/36
author: Robert Varga <rovarga@cisco.com>
Mon, 12 Dec 2016 18:34:38 +0000 (19:34 +0100)
committer: Tom Pantelis <tpanteli@brocade.com>
Mon, 20 Mar 2017 05:11:53 +0000 (05:11 +0000)
When transitioning between roles we need to take care of proper
handling of state known about frontend. This patch adds
the leader/non-leader transitions, creating the state
from FrontendMetadata and forgetting it when transactions are
committed.

Our replicated log needs to grow more entries to accurately
replicate the state of the conversation between the frontend
and backend, so if a member becomes the leader it has
an understanding of which transactions and transaction
chains have been completed (aborted, committed, purged). These
are replicated before a response is sent to the frontend, so
if a leader fails before they replicate successfully, the frontend
will see them as a timeout and retry them (and be routed to the
new leader).

Both leader and followers are expected to keep the metadata
handy: the leader keeps it for the purpose of being able to generate
a summarized snapshot. The followers keep it so their metadata
view is consistent with the contents of the data tree.

Change-Id: I72eea91ee84716cdd8a6a3521b42cca9a9393aff
Signed-off-by: Robert Varga <rovarga@cisco.com>
31 files changed:
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ClosedTransactionException.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DeadHistoryException.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DeadTransactionException.java
opendaylight/md-sal/cds-access-api/src/test/java/org/opendaylight/controller/cluster/access/commands/ConnectClientRequestTest.java
opendaylight/md-sal/cds-access-api/src/test/java/org/opendaylight/controller/cluster/access/commands/DeadHistoryExceptionTest.java
opendaylight/md-sal/cds-access-api/src/test/java/org/opendaylight/controller/cluster/access/commands/DeadTransactionExceptionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendClientMetadataBuilder.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendHistoryMetadataBuilder.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendMetadata.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadOnlyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadOnlyShardDataTreeTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMetadata.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/StandaloneFrontendHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/FrontendHistoryMetadata.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PurgeTransactionPayload.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayloadTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/FrontendShardDataTreeSnapshotMetadataTest.java

diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ClosedTransactionException.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ClosedTransactionException.java
new file mode 100644 (file)
index 0000000..ece4720
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+
+/**
+ * A {@link RequestException} indicating that the backend has received a request for a transaction which has already
+ * been closed, either via a successful commit or abort (which is indicated via {@link #isSuccessful()}). This can
+ * happen if the corresponding journal record is replicated, but the message to the frontend gets lost and the backend
+ * leader moved before the frontend retried the corresponding request.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class ClosedTransactionException extends RequestException {
+    private static final long serialVersionUID = 1L;
+
+    private final boolean successful;
+
+    public ClosedTransactionException(final boolean successful) {
+        super("Transaction has been " + (successful ? "committed" : "aborted"));
+        this.successful = successful;
+    }
+
+    @Override
+    public boolean isRetriable() {
+        return false;
+    }
+
+    public boolean isSuccessful() {
+        return successful;
+    }
+}
index bf86c8b9969c7057913f26241d09d30f71f0e402..d4293c4741a4de010e2a89e6ee9d1f975039c6d3 100644 (file)
@@ -26,6 +26,6 @@ public final class DeadHistoryException extends RequestException {
 
     @Override
     public boolean isRetriable() {
-        return true;
+        return false;
     }
 }
index 562c0e59b923c27e1611925aab41a61604840777..fee439984ac53c15fbdc37c09c618ba410bda150 100644 (file)
@@ -8,6 +8,9 @@
 package org.opendaylight.controller.cluster.access.commands;
 
 import com.google.common.annotations.Beta;
+import com.google.common.collect.ImmutableRangeSet;
+import com.google.common.collect.RangeSet;
+import com.google.common.primitives.UnsignedLong;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 
 /**
@@ -20,12 +23,19 @@ import org.opendaylight.controller.cluster.access.concepts.RequestException;
 public final class DeadTransactionException extends RequestException {
     private static final long serialVersionUID = 1L;
 
-    public DeadTransactionException(final long lastSeenTransaction) {
-        super("Transaction up to " + Long.toUnsignedString(lastSeenTransaction) + " are accounted for");
+    private final RangeSet<UnsignedLong> purgedIdentifiers;
+
+    public DeadTransactionException(final RangeSet<UnsignedLong> purgedIdentifiers) {
+        super("Transactions " + purgedIdentifiers + " have been purged");
+        this.purgedIdentifiers = ImmutableRangeSet.copyOf(purgedIdentifiers);
     }
 
     @Override
     public boolean isRetriable() {
-        return true;
+        return false;
+    }
+
+    public RangeSet<UnsignedLong> getPurgedIdentifier() {
+        return purgedIdentifiers;
     }
 }
index b4da36f47269907f47320a703da03726e152ee4a..02832ae2848cdb4a1709d3c0ad50fa30bcc27637 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.access.commands;
 
 import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableRangeSet;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.access.ABIVersion;
@@ -46,7 +47,7 @@ public class ConnectClientRequestTest extends AbstractRequestTest<ConnectClientR
 
     @Test
     public void toRequestFailureTest() throws Exception {
-        final RequestException exception = new DeadTransactionException(0);
+        final RequestException exception = new DeadTransactionException(ImmutableRangeSet.of());
         final ConnectClientFailure failure = OBJECT.toRequestFailure(exception);
         Assert.assertNotNull(failure);
     }
index 595ef1902b55b62138910906bffb892d72ca0de9..f4d88ec4a8f975b86c85a5441319f1245d1eaa09 100644 (file)
@@ -7,8 +7,9 @@
  */
 package org.opendaylight.controller.cluster.access.commands;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 
 import org.opendaylight.controller.cluster.access.concepts.RequestExceptionTest;
 
@@ -18,13 +19,13 @@ public class DeadHistoryExceptionTest extends RequestExceptionTest<DeadHistoryEx
 
     @Override
     protected void isRetriable() {
-        assertTrue(OBJECT.isRetriable());
+        assertFalse(OBJECT.isRetriable());
     }
 
     @Override
     protected void checkMessage() {
         final String message = OBJECT.getMessage();
-        assertTrue("Histories up to 100 are accounted for".equals(message));
+        assertEquals("Histories up to 100 are accounted for", message);
         assertNull(OBJECT.getCause());
     }
 
index a6103306948f931fed64d4f9fc34f7c1aade94c4..8b6e52c949e8ada35fc9e260af29ef331bd03bac 100644 (file)
@@ -7,25 +7,27 @@
  */
 package org.opendaylight.controller.cluster.access.commands;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 
+import com.google.common.collect.ImmutableRangeSet;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.RequestExceptionTest;
 
 public class DeadTransactionExceptionTest extends RequestExceptionTest<DeadTransactionException> {
 
-    private static final RequestException OBJECT = new DeadTransactionException(100);
+    private static final RequestException OBJECT = new DeadTransactionException(ImmutableRangeSet.of());
 
     @Override
     protected void isRetriable() {
-        assertTrue(OBJECT.isRetriable());
+        assertFalse(OBJECT.isRetriable());
     }
 
     @Override
     protected void checkMessage() {
         final String message = OBJECT.getMessage();
-        assertTrue("Transaction up to 100 are accounted for".equals(message));
+        assertEquals("Transactions [] have been purged", message);
         assertNull(OBJECT.getCause());
     }
 
index dbea3a1cbb5cd00058c0bcf38dff6ce7f0b1e1b1..f23a8ffe98abcec519fd9ed73f006c22a2417553 100644 (file)
@@ -9,13 +9,19 @@ package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Ticker;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.primitives.UnsignedLong;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ClosedTransactionException;
 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.DeadTransactionException;
+import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
 import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
@@ -41,12 +47,22 @@ abstract class AbstractFrontendHistory implements Identifiable<LocalHistoryIdent
     private static final OutOfOrderRequestException UNSEQUENCED_START = new OutOfOrderRequestException(0);
 
     private final Map<TransactionIdentifier, FrontendTransaction> transactions = new HashMap<>();
+    private final RangeSet<UnsignedLong> purgedTransactions;
     private final String persistenceId;
-    private final Ticker ticker;
+    private final ShardDataTree tree;
 
-    AbstractFrontendHistory(final String persistenceId, final Ticker ticker) {
+    /**
+     * Transactions closed by the previous leader. Boolean indicates whether the transaction was committed (true) or
+     * aborted (false). We only ever shrink these.
+     */
+    private Map<UnsignedLong, Boolean> closedTransactions;
+
+    AbstractFrontendHistory(final String persistenceId, final ShardDataTree tree,
+        final Map<UnsignedLong, Boolean> closedTransactions, final RangeSet<UnsignedLong> purgedTransactions) {
         this.persistenceId = Preconditions.checkNotNull(persistenceId);
-        this.ticker = Preconditions.checkNotNull(ticker);
+        this.tree = Preconditions.checkNotNull(tree);
+        this.closedTransactions = Preconditions.checkNotNull(closedTransactions);
+        this.purgedTransactions = Preconditions.checkNotNull(purgedTransactions);
     }
 
     final String persistenceId() {
@@ -54,45 +70,104 @@ abstract class AbstractFrontendHistory implements Identifiable<LocalHistoryIdent
     }
 
     final long readTime() {
-        return ticker.read();
+        return tree.ticker().read();
     }
 
     final @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
             final RequestEnvelope envelope, final long now) throws RequestException {
         final TransactionIdentifier id = request.getTarget();
+        final UnsignedLong ul = UnsignedLong.fromLongBits(id.getTransactionId());
 
-        FrontendTransaction tx;
         if (request instanceof TransactionPurgeRequest) {
-            tx = transactions.remove(id);
+            if (purgedTransactions.contains(ul)) {
+                // Retransmitted purge request: nothing to do
+                LOG.debug("{}: transaction {} already purged", persistenceId, id);
+                return new TransactionPurgeResponse(id, request.getSequence());
+            }
+
+            // We perform two lookups instead of a straight remove, because once the map becomes empty we switch it
+            // to an ImmutableMap, which does not allow remove().
+            if (closedTransactions.containsKey(ul)) {
+                tree.purgeTransaction(id, () -> {
+                    closedTransactions.remove(ul);
+                    if (closedTransactions.isEmpty()) {
+                        closedTransactions = ImmutableMap.of();
+                    }
+
+                    purgedTransactions.add(Range.singleton(ul));
+                    LOG.debug("{}: finished purging inherited transaction {}", persistenceId(), id);
+                    envelope.sendSuccess(new TransactionPurgeResponse(id, request.getSequence()), readTime() - now);
+                });
+                return null;
+            }
+
+            final FrontendTransaction tx = transactions.get(id);
             if (tx == null) {
-                // We have no record of the transaction, nothing to do
-                LOG.debug("{}: no state for transaction {}, purge is complete", persistenceId(), id);
+                // This should never happen because the purge callback removes the transaction and puts it into
+                // purged transactions in one go. If it does, we warn about the situation and recover by
+                // recording the transaction as purged.
+                LOG.warn("{}: transaction {} not tracked in {}, but not present in active transactions", persistenceId,
+                    id, purgedTransactions);
+                purgedTransactions.add(Range.singleton(ul));
                 return new TransactionPurgeResponse(id, request.getSequence());
             }
+
+            tx.purge(() -> {
+                purgedTransactions.add(Range.singleton(ul));
+                transactions.remove(id);
+                LOG.debug("{}: finished purging transaction {}", persistenceId(), id);
+                envelope.sendSuccess(new TransactionPurgeResponse(id, request.getSequence()), readTime() - now);
+            });
+            return null;
+        }
+
+        if (purgedTransactions.contains(ul)) {
+            LOG.warn("{}: Request {} is contained in purged transactions {}", persistenceId, request, purgedTransactions);
+            throw new DeadTransactionException(purgedTransactions);
+        }
+        final Boolean closed = closedTransactions.get(ul);
+        if (closed != null) {
+            final boolean successful = closed.booleanValue();
+            LOG.debug("{}: Request {} refers to a {} transaction", persistenceId, request, successful ? "successful"
+                    : "failed");
+            throw new ClosedTransactionException(successful);
+        }
+
+        FrontendTransaction tx = transactions.get(id);
+        if (tx == null) {
+            // The transaction does not exist and we are about to create it, check sequence number
+            if (request.getSequence() != 0) {
+                LOG.debug("{}: no transaction state present, unexpected request {}", persistenceId(), request);
+                throw UNSEQUENCED_START;
+            }
+
+            tx = createTransaction(request, id);
+            transactions.put(id, tx);
         } else {
-            tx = transactions.get(id);
-            if (tx == null) {
-                // The transaction does not exist and we are about to create it, check sequence number
-                if (request.getSequence() != 0) {
-                    LOG.debug("{}: no transaction state present, unexpected request {}", persistenceId(), request);
-                    throw UNSEQUENCED_START;
-                }
-
-                tx = createTransaction(request, id);
-                transactions.put(id, tx);
-            } else {
-                final Optional<TransactionSuccess<?>> maybeReplay = tx.replaySequence(request.getSequence());
-                if (maybeReplay.isPresent()) {
-                    final TransactionSuccess<?> replay = maybeReplay.get();
-                    LOG.debug("{}: envelope {} replaying response {}", persistenceId(), envelope, replay);
-                    return replay;
-                }
+            final Optional<TransactionSuccess<?>> maybeReplay = tx.replaySequence(request.getSequence());
+            if (maybeReplay.isPresent()) {
+                final TransactionSuccess<?> replay = maybeReplay.get();
+                LOG.debug("{}: envelope {} replaying response {}", persistenceId(), envelope, replay);
+                return replay;
             }
         }
 
         return tx.handleRequest(request, envelope, now);
     }
 
+    void destroy(final long sequence, final RequestEnvelope envelope, final long now) {
+        LOG.debug("{}: closing history {}", persistenceId(), getIdentifier());
+        tree.closeTransactionChain(getIdentifier(), () -> {
+            envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now);
+        });
+    }
+
+    void purge(final long sequence, final RequestEnvelope envelope, final long now) {
+        LOG.debug("{}: purging history {}", persistenceId(), getIdentifier());
+        tree.purgeTransactionChain(getIdentifier(), () -> {
+            envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now);
+        });
+    }
+
     private FrontendTransaction createTransaction(final TransactionRequest<?> request, final TransactionIdentifier id)
             throws RequestException {
         if (request instanceof CommitLocalTransactionRequest) {
index 222280cf3b0ade880f0486c56f2517dfb90b0620..56e11c124984c6042f269cced30b06322d9e9475 100644 (file)
@@ -13,6 +13,8 @@ import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Abstract base for transactions running on SharrdDataTree.
@@ -22,12 +24,17 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 @NotThreadSafe
 abstract class AbstractShardDataTreeTransaction<T extends DataTreeSnapshot>
         implements Identifiable<TransactionIdentifier> {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractShardDataTreeTransaction.class);
+
+    private final ShardDataTreeTransactionParent parent;
     private final TransactionIdentifier id;
     private final T snapshot;
 
     private boolean closed;
 
-    AbstractShardDataTreeTransaction(final TransactionIdentifier id, final T snapshot) {
+    AbstractShardDataTreeTransaction(final ShardDataTreeTransactionParent parent, final TransactionIdentifier id,
+        final T snapshot) {
+        this.parent = Preconditions.checkNotNull(parent);
         this.snapshot = Preconditions.checkNotNull(snapshot);
         this.id = Preconditions.checkNotNull(id);
     }
@@ -37,6 +44,10 @@ abstract class AbstractShardDataTreeTransaction<T extends DataTreeSnapshot>
         return id;
     }
 
+    final ShardDataTreeTransactionParent getParent() {
+        return parent;
+    }
+
     final T getSnapshot() {
         return snapshot;
     }
@@ -59,11 +70,21 @@ abstract class AbstractShardDataTreeTransaction<T extends DataTreeSnapshot>
         return true;
     }
 
+    final void abort(final Runnable callback) {
+        Preconditions.checkState(close(), "Transaction is already closed");
+        parent.abortTransaction(this, callback);
+    }
+
+    final void purge(final Runnable callback) {
+        if (!closed) {
+            LOG.warn("Purging unclosed transaction {}", id);
+        }
+        parent.purgeTransaction(id, callback);
+    }
+
     @Override
     public final String toString() {
         return MoreObjects.toStringHelper(this).add("id", id).add("closed", closed).add("snapshot", snapshot)
                 .toString();
     }
-
-    abstract void abort();
 }
index cd1235b46072ce68a269fb393375795c9bed0d0e..8a0ce605dfd18bb7bcd205c8cdebce9b6691f356 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.Range;
 import com.google.common.collect.RangeSet;
@@ -15,6 +16,8 @@ import com.google.common.collect.TreeRangeSet;
 import com.google.common.primitives.UnsignedLong;
 import java.util.HashMap;
 import java.util.Map;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
@@ -22,8 +25,13 @@ import org.opendaylight.controller.cluster.datastore.persisted.FrontendClientMet
 import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata;
 import org.opendaylight.yangtools.concepts.Builder;
 import org.opendaylight.yangtools.concepts.Identifiable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+@NotThreadSafe
 final class FrontendClientMetadataBuilder implements Builder<FrontendClientMetadata>, Identifiable<ClientIdentifier> {
+    private static final Logger LOG = LoggerFactory.getLogger(FrontendClientMetadataBuilder.class);
+
     private final Map<LocalHistoryIdentifier, FrontendHistoryMetadataBuilder> currentHistories = new HashMap<>();
     private final RangeSet<UnsignedLong> purgedHistories;
     private final ClientIdentifier identifier;
@@ -55,25 +63,100 @@ final class FrontendClientMetadataBuilder implements Builder<FrontendClientMetad
     }
 
     void onHistoryCreated(final LocalHistoryIdentifier historyId) {
-        // TODO Auto-generated method stub
-
+        final FrontendHistoryMetadataBuilder newMeta = new FrontendHistoryMetadataBuilder(historyId);
+        final FrontendHistoryMetadataBuilder oldMeta = currentHistories.putIfAbsent(historyId, newMeta);
+        if (oldMeta != null) {
+            // This should not be happening, warn about it
+            LOG.warn("Reused local history {}", historyId);
+        } else {
+            LOG.debug("Created local history {}", historyId);
+        }
     }
 
     void onHistoryClosed(final LocalHistoryIdentifier historyId) {
-        ensureHistory(historyId).onHistoryClosed();
+        final FrontendHistoryMetadataBuilder builder = currentHistories.get(historyId);
+        if (builder != null) {
+            builder.onHistoryClosed();
+            LOG.debug("Closed history {}", historyId);
+        } else {
+            LOG.warn("Closed unknown history {}, ignoring", historyId);
+        }
     }
 
     void onHistoryPurged(final LocalHistoryIdentifier historyId) {
-        currentHistories.remove(historyId);
+        final FrontendHistoryMetadataBuilder history = currentHistories.remove(historyId);
+        if (history == null) {
+            LOG.warn("Purging unknown history {}", historyId);
+        }
+
         // XXX: do we need to account for cookies?
         purgedHistories.add(Range.singleton(UnsignedLong.fromLongBits(historyId.getHistoryId())));
+        LOG.debug("Purged history {}", historyId);
+    }
+
+    void onTransactionAborted(final TransactionIdentifier txId) {
+        final FrontendHistoryMetadataBuilder history = getHistory(txId);
+        if (history != null) {
+            history.onTransactionAborted(txId);
+            LOG.debug("Aborted transaction {}", txId);
+        } else {
+            LOG.warn("Unknown history for aborted transaction {}, ignoring", txId);
+        }
     }
 
     void onTransactionCommitted(final TransactionIdentifier txId) {
-        ensureHistory(txId.getHistoryId()).onTransactionCommitted(txId);
+        final FrontendHistoryMetadataBuilder history = getHistory(txId);
+        if (history != null) {
+            history.onTransactionCommitted(txId);
+            LOG.debug("Committed transaction {}", txId);
+        } else {
+            LOG.warn("Unknown history for committed transaction {}, ignoring", txId);
+        }
+    }
+
+    void onTransactionPurged(final TransactionIdentifier txId) {
+        final FrontendHistoryMetadataBuilder history = getHistory(txId);
+        if (history != null) {
+            history.onTransactionPurged(txId);
+            LOG.debug("Purged transaction {}", txId);
+        } else {
+            LOG.warn("Unknown history for purged transaction {}, ignoring", txId);
+        }
+    }
+
+    /**
+     * Transform frontend metadata for a particular client into its {@link LeaderFrontendState} counterpart.
+     *
+     * @param shard parent shard
+     * @return Leader frontend state
+     */
+    @Nonnull LeaderFrontendState toLeaderState(@Nonnull final Shard shard) {
+        // Note: we have to make sure to *copy* all current state and not leak any views, otherwise leader/follower
+        //       interactions would get intertwined leading to inconsistencies.
+        final Map<LocalHistoryIdentifier, LocalFrontendHistory> histories = new HashMap<>();
+        for (FrontendHistoryMetadataBuilder e : currentHistories.values()) {
+            if (e.getIdentifier().getHistoryId() != 0) {
+                final AbstractFrontendHistory state = e.toLeaderState(shard);
+                Verify.verify(state instanceof LocalFrontendHistory);
+                histories.put(e.getIdentifier(), (LocalFrontendHistory) state);
+            }
+        }
+
+        final AbstractFrontendHistory singleHistory;
+        final FrontendHistoryMetadataBuilder singleHistoryMeta = currentHistories.get(
+            new LocalHistoryIdentifier(identifier, 0));
+        if (singleHistoryMeta == null) {
+            final ShardDataTree tree = shard.getDataStore();
+            singleHistory = StandaloneFrontendHistory.create(shard.persistenceId(), getIdentifier(), tree);
+        } else {
+            singleHistory = singleHistoryMeta.toLeaderState(shard);
+        }
+
+        return new LeaderFrontendState(shard.persistenceId(), getIdentifier(), shard.getDataStore(),
+            TreeRangeSet.create(purgedHistories), singleHistory, histories);
     }
 
-    private FrontendHistoryMetadataBuilder ensureHistory(final LocalHistoryIdentifier historyId) {
-        return currentHistories.computeIfAbsent(historyId, FrontendHistoryMetadataBuilder::new);
+    private FrontendHistoryMetadataBuilder getHistory(final TransactionIdentifier txId) {
+        return currentHistories.get(txId.getHistoryId());
     }
 }
index 6b901b4fa85988cc14ce4557d865a09d4a28d034..beed765a690590bd8174210861a06c7a2b78f09c 100644 (file)
@@ -8,6 +8,13 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.TreeRangeSet;
+import com.google.common.primitives.UnsignedLong;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
@@ -17,18 +24,23 @@ import org.opendaylight.yangtools.concepts.Identifiable;
 
 final class FrontendHistoryMetadataBuilder implements Builder<FrontendHistoryMetadata>,
         Identifiable<LocalHistoryIdentifier> {
+
+    private final Map<UnsignedLong, Boolean> closedTransactions;
+    private final RangeSet<UnsignedLong> purgedTransactions;
     private final LocalHistoryIdentifier identifier;
 
-    private long nextTransaction;
     private boolean closed;
 
     FrontendHistoryMetadataBuilder(final LocalHistoryIdentifier identifier) {
         this.identifier = Preconditions.checkNotNull(identifier);
+        this.purgedTransactions = TreeRangeSet.create();
+        this.closedTransactions = new HashMap<>(2);
     }
 
     FrontendHistoryMetadataBuilder(final ClientIdentifier clientId, final FrontendHistoryMetadata meta) {
         identifier = new LocalHistoryIdentifier(clientId, meta.getHistoryId(), meta.getCookie());
-        nextTransaction = meta.getNextTransaction();
+        closedTransactions = new HashMap<>(meta.getClosedTransactions());
+        purgedTransactions = TreeRangeSet.create(meta.getPurgedTransactions());
         closed = meta.isClosed();
     }
 
@@ -39,14 +51,42 @@ final class FrontendHistoryMetadataBuilder implements Builder<FrontendHistoryMet
 
     @Override
     public FrontendHistoryMetadata build() {
-        return new FrontendHistoryMetadata(identifier.getHistoryId(), identifier.getCookie(), nextTransaction, closed);
+        return new FrontendHistoryMetadata(identifier.getHistoryId(), identifier.getCookie(), closed,
+            closedTransactions, purgedTransactions);
     }
 
     void onHistoryClosed() {
+        Preconditions.checkState(identifier.getHistoryId() != 0);
         closed = true;
     }
 
+    void onTransactionAborted(final TransactionIdentifier txId) {
+        closedTransactions.put(UnsignedLong.fromLongBits(txId.getTransactionId()), Boolean.FALSE);
+    }
+
     void onTransactionCommitted(final TransactionIdentifier txId) {
-        nextTransaction = txId.getTransactionId() + 1;
+        closedTransactions.put(UnsignedLong.fromLongBits(txId.getTransactionId()), Boolean.TRUE);
+    }
+
+    void onTransactionPurged(final TransactionIdentifier txId) {
+        final UnsignedLong id = UnsignedLong.fromLongBits(txId.getTransactionId());
+        closedTransactions.remove(id);
+        purgedTransactions.add(Range.singleton(id));
+    }
+
+    /**
+     * Transform frontend metadata for a particular client history into its {@link LocalFrontendHistory} counterpart.
+     *
+     * @param shard parent shard
+     * @return Leader history state
+     */
+    @Nonnull AbstractFrontendHistory toLeaderState(@Nonnull final Shard shard) {
+        if (identifier.getHistoryId() == 0) {
+            return StandaloneFrontendHistory.recreate(shard.persistenceId(), identifier.getClientId(),
+                shard.getDataStore(), closedTransactions, purgedTransactions);
+        }
+
+        return LocalFrontendHistory.recreate(shard.persistenceId(), shard.getDataStore(),
+            shard.getDataStore().recreateTransactionChain(identifier, closed), closedTransactions, purgedTransactions);
     }
 }
index 1788f994c872503e73ebe64e64eb33175e2493bd..5f86125523cb37fd0be72dc5157947d21a316f7c 100644 (file)
@@ -7,9 +7,12 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Collections2;
+import com.google.common.collect.Maps;
 import java.util.HashMap;
 import java.util.Map;
+import javax.annotation.Nonnull;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
@@ -32,6 +35,11 @@ final class FrontendMetadata extends ShardDataTreeMetadata<FrontendShardDataTree
     private static final Logger LOG = LoggerFactory.getLogger(FrontendMetadata.class);
 
     private final Map<FrontendIdentifier, FrontendClientMetadataBuilder> clients = new HashMap<>();
+    private final String shardName;
+
+    FrontendMetadata(final String shardName) {
+        this.shardName = Preconditions.checkNotNull(shardName);
+    }
 
     @Override
     Class<FrontendShardDataTreeSnapshotMetadata> getSupportedType() {
@@ -67,9 +75,9 @@ final class FrontendMetadata extends ShardDataTreeMetadata<FrontendShardDataTree
         final FrontendClientMetadataBuilder client = new FrontendClientMetadataBuilder(id);
         final FrontendClientMetadataBuilder previous = clients.put(id.getFrontendId(), client);
         if (previous != null) {
-            LOG.debug("Replaced client {} with {}", previous, client);
+            LOG.debug("{}: Replaced client {} with {}", shardName, previous, client);
         } else {
-            LOG.debug("Added client {}", client);
+            LOG.debug("{}: Added client {}", shardName, client);
         }
         return client;
     }
@@ -89,8 +97,27 @@ final class FrontendMetadata extends ShardDataTreeMetadata<FrontendShardDataTree
         ensureClient(historyId.getClientId()).onHistoryPurged(historyId);
     }
 
+    @Override
+    void onTransactionAborted(final TransactionIdentifier txId) {
+        ensureClient(txId.getHistoryId().getClientId()).onTransactionAborted(txId);
+    }
+
     @Override
     void onTransactionCommitted(final TransactionIdentifier txId) {
         ensureClient(txId.getHistoryId().getClientId()).onTransactionCommitted(txId);
     }
+
+    @Override
+    void onTransactionPurged(final TransactionIdentifier txId) {
+        ensureClient(txId.getHistoryId().getClientId()).onTransactionPurged(txId);
+    }
+
+    /**
+     * Transform frontend metadata into an active leader state map.
+     *
+     * @return Leader frontend state
+     */
+    @Nonnull Map<FrontendIdentifier, LeaderFrontendState> toLeaderState(@Nonnull final Shard shard) {
+        return new HashMap<>(Maps.transformValues(clients, meta -> meta.toLeaderState(shard)));
+    }
 }
index 465cfcbee10af4a97b47e9cee17e84869433bcd6..5c0d91396347065375beb3e9eb583ba6ad8eaf7c 100644 (file)
@@ -17,8 +17,6 @@ import org.opendaylight.controller.cluster.access.commands.ReadTransactionReques
 import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
-import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
-import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
@@ -55,19 +53,22 @@ final class FrontendReadOnlyTransaction extends FrontendTransaction {
         } else if (request instanceof ReadTransactionRequest) {
             return handleReadTransaction((ReadTransactionRequest) request);
         } else if (request instanceof TransactionAbortRequest) {
-            return handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
-        } else if (request instanceof TransactionPurgeRequest) {
-            // No-op for now
-            return new TransactionPurgeResponse(request.getTarget(), request.getSequence());
+            handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
+            return null;
         } else {
             throw new UnsupportedRequestException(request);
         }
     }
 
-    private TransactionSuccess<?> handleTransactionAbort(final TransactionAbortRequest request,
-            final RequestEnvelope envelope, final long now) throws RequestException {
-        openTransaction.abort();
-        return new TransactionAbortSuccess(openTransaction.getIdentifier(), request.getSequence());
+    @Override
+    void purge(final Runnable callback) {
+        openTransaction.purge(callback);
+    }
+
+    private void handleTransactionAbort(final TransactionAbortRequest request, final RequestEnvelope envelope,
+            final long now) throws RequestException {
+        openTransaction.abort(() -> recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(request.getTarget(),
+            request.getSequence())));
     }
 
     private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request)
index 2c2a287670f2e2676d42721c31641e181569a0e4..9c4d4ddbca9f1cd1bdd81bb3654fc76d156b7fee 100644 (file)
@@ -32,8 +32,6 @@ import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
 import org.opendaylight.controller.cluster.access.commands.TransactionModification;
 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
-import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
-import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
@@ -103,15 +101,18 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
             handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now);
             return null;
         } else if (request instanceof TransactionAbortRequest) {
-            return handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
-        } else if (request instanceof TransactionPurgeRequest) {
-            // No-op for now
-            return new TransactionPurgeResponse(request.getTarget(), request.getSequence());
+            handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
+            return null;
         } else {
             throw new UnsupportedRequestException(request);
         }
     }
 
+    @Override
+    void purge(final Runnable callback) {
+        openTransaction.purge(callback);
+    }
+
     private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
             final RequestEnvelope envelope, final long now) throws RequestException {
         readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
@@ -145,11 +146,12 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
         });
     }
 
-    private TransactionSuccess<?> handleTransactionAbort(final TransactionAbortRequest request,
+    private void handleTransactionAbort(final TransactionAbortRequest request,
             final RequestEnvelope envelope, final long now) throws RequestException {
         if (readyCohort == null) {
-            openTransaction.abort();
-            return new TransactionAbortSuccess(getIdentifier(), request.getSequence());
+            openTransaction.abort(() -> recordAndSendSuccess(envelope, now,
+                new TransactionAbortSuccess(getIdentifier(), request.getSequence())));
+            return;
         }
 
         readyCohort.abort(new FutureCallback<Void>() {
@@ -168,7 +170,6 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
                 recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure));
             }
         });
-        return null;
     }
 
     private void coordinatedCommit(final RequestEnvelope envelope, final long now) {
@@ -298,9 +299,9 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
 
         switch (maybeProto.get()) {
             case ABORT:
-                openTransaction.abort();
+                openTransaction.abort(() -> replyModifySuccess(request.getSequence()));
                 openTransaction = null;
-                return replyModifySuccess(request.getSequence());
+                return null;
             case READY:
                 ensureReady();
                 return replyModifySuccess(request.getSequence());
index 1ef775e041d6cc51222db6852d285cc1627ef597..fccf3bf4a1d321384dafb49cf213cbea3f313922 100644 (file)
@@ -112,6 +112,9 @@ abstract class FrontendTransaction implements Identifiable<TransactionIdentifier
     abstract @Nullable TransactionSuccess<?> handleRequest(TransactionRequest<?> request,
             RequestEnvelope envelope, long now) throws RequestException;
 
+    // Final request, needs routing to the data tree, so it can persist a tombstone
+    abstract void purge(Runnable callback);
+
     private void recordResponse(final long sequence, final Object response) {
         if (replayQueue.isEmpty()) {
             firstReplaySequence = sequence;
index deca605a5b97ec18d041d050d5fe0c467feb436e..409268501a4f5e6914b0ca61f7aadcebe8c35037 100644 (file)
@@ -47,10 +47,10 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
     private static final Logger LOG = LoggerFactory.getLogger(LeaderFrontendState.class);
 
     // Histories which have not been purged
-    private final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories = new HashMap<>();
+    private final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories;
 
     // RangeSet performs automatic merging, hence we keep minimal state tracking information
-    private final RangeSet<UnsignedLong> purgedHistories = TreeRangeSet.create();
+    private final RangeSet<UnsignedLong> purgedHistories;
 
     // Used for all standalone transactions
     private final AbstractFrontendHistory standaloneHistory;
@@ -61,7 +61,6 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
     private long expectedTxSequence;
     private Long lastSeenHistory = null;
 
-
     // TODO: explicit failover notification
     //       Record the ActorRef for the originating actor and when we switch to being a leader send a notification
     //       to the frontend client -- that way it can immediately start sending requests
@@ -72,10 +71,19 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
     // - per-RequestException throw counters
 
     LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
+        this(persistenceId, clientId, tree, TreeRangeSet.create(), StandaloneFrontendHistory.create(persistenceId,
+            clientId, tree), new HashMap<>());
+    }
+
+    LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree,
+        final RangeSet<UnsignedLong> purgedHistories, final AbstractFrontendHistory standaloneHistory,
+        final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories) {
         this.persistenceId = Preconditions.checkNotNull(persistenceId);
         this.clientId = Preconditions.checkNotNull(clientId);
         this.tree = Preconditions.checkNotNull(tree);
-        standaloneHistory = new StandaloneFrontendHistory(persistenceId, tree.ticker(), clientId, tree);
+        this.purgedHistories = Preconditions.checkNotNull(purgedHistories);
+        this.standaloneHistory = Preconditions.checkNotNull(standaloneHistory);
+        this.localHistories = Preconditions.checkNotNull(localHistories);
     }
 
     @Override
@@ -133,7 +141,7 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
             lastSeenHistory = id.getHistoryId();
         }
 
-        localHistories.put(id, new LocalFrontendHistory(persistenceId, tree, tree.ensureTransactionChain(id)));
+        localHistories.put(id, LocalFrontendHistory.create(persistenceId, tree, id));
         LOG.debug("{}: created history {}", persistenceId, id);
         return new LocalHistorySuccess(id, request.getSequence());
     }
index 8e32c76ba794679dc3cdd84004946f1b57087ade..94c0965c007178828cdc04cdeea3170962abdbf8 100644 (file)
@@ -8,15 +8,16 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.access.commands.DeadTransactionException;
-import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.TreeRangeSet;
+import com.google.common.primitives.UnsignedLong;
+import java.util.HashMap;
+import java.util.Map;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
@@ -25,20 +26,28 @@ import org.slf4j.LoggerFactory;
  * @author Robert Varga
  */
 final class LocalFrontendHistory extends AbstractFrontendHistory {
-    private static final Logger LOG = LoggerFactory.getLogger(LocalFrontendHistory.class);
-
     private final ShardDataTreeTransactionChain chain;
-    private final ShardDataTree tree;
-
-    private Long lastSeenTransaction;
 
-    LocalFrontendHistory(final String persistenceId, final ShardDataTree tree,
-            final ShardDataTreeTransactionChain chain) {
-        super(persistenceId, tree.ticker());
-        this.tree = Preconditions.checkNotNull(tree);
+    private LocalFrontendHistory(final String persistenceId, final ShardDataTree tree,
+            final ShardDataTreeTransactionChain chain, final Map<UnsignedLong, Boolean> closedTransactions,
+            final RangeSet<UnsignedLong> purgedTransactions) {
+        super(persistenceId, tree, closedTransactions, purgedTransactions);
         this.chain = Preconditions.checkNotNull(chain);
     }
 
+    static LocalFrontendHistory create(final String persistenceId, final ShardDataTree tree,
+            final LocalHistoryIdentifier historyId) {
+        return new LocalFrontendHistory(persistenceId, tree, tree.ensureTransactionChain(historyId), ImmutableMap.of(),
+            TreeRangeSet.create());
+    }
+
+    static LocalFrontendHistory recreate(final String persistenceId, final ShardDataTree tree,
+            final ShardDataTreeTransactionChain chain, final Map<UnsignedLong, Boolean> closedTransactions,
+            final RangeSet<UnsignedLong> purgedTransactions) {
+        return new LocalFrontendHistory(persistenceId, tree, chain, new HashMap<>(closedTransactions),
+            TreeRangeSet.create(purgedTransactions));
+    }
+
     @Override
     public LocalHistoryIdentifier getIdentifier() {
         return chain.getIdentifier();
@@ -46,23 +55,17 @@ final class LocalFrontendHistory extends AbstractFrontendHistory {
 
     @Override
     FrontendTransaction createOpenSnapshot(final TransactionIdentifier id) throws RequestException {
-        checkDeadTransaction(id);
-        lastSeenTransaction = id.getTransactionId();
         return FrontendReadOnlyTransaction.create(this, chain.newReadOnlyTransaction(id));
     }
 
     @Override
     FrontendTransaction createOpenTransaction(final TransactionIdentifier id) throws RequestException {
-        checkDeadTransaction(id);
-        lastSeenTransaction = id.getTransactionId();
         return FrontendReadWriteTransaction.createOpen(this, chain.newReadWriteTransaction(id));
     }
 
     @Override
     FrontendTransaction createReadyTransaction(final TransactionIdentifier id, final DataTreeModification mod)
             throws RequestException {
-        checkDeadTransaction(id);
-        lastSeenTransaction = id.getTransactionId();
         return FrontendReadWriteTransaction.createReady(this, id, mod);
     }
 
@@ -70,29 +73,4 @@ final class LocalFrontendHistory extends AbstractFrontendHistory {
     ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod) {
         return chain.createReadyCohort(id, mod);
     }
-
-    void destroy(final long sequence, final RequestEnvelope envelope, final long now)
-            throws RequestException {
-        LOG.debug("{}: closing history {}", persistenceId(), getIdentifier());
-        tree.closeTransactionChain(getIdentifier(), () -> {
-            envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now);
-        });
-    }
-
-    void purge(final long sequence, final RequestEnvelope envelope, final long now) {
-        LOG.debug("{}: purging history {}", persistenceId(), getIdentifier());
-        tree.purgeTransactionChain(getIdentifier(), () -> {
-            envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now);
-        });
-    }
-
-    private void checkDeadTransaction(final TransactionIdentifier id) throws RequestException {
-        // FIXME: check if this history is still open
-        // FIXME: check if the last transaction has been submitted
-
-        // Transaction identifiers within a local history have to have increasing IDs
-        if (lastSeenTransaction != null && Long.compareUnsigned(lastSeenTransaction, id.getTransactionId()) >= 0) {
-            throw new DeadTransactionException(lastSeenTransaction);
-        }
-    }
 }
index 040a652a0ffbb6c18c1563fc0fdb9b0d304777a5..4df1352b1904467f15aa9d4df846884f7cb1745a 100644 (file)
@@ -11,12 +11,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 
 final class ReadOnlyShardDataTreeTransaction extends AbstractShardDataTreeTransaction<DataTreeSnapshot> {
-    ReadOnlyShardDataTreeTransaction(final TransactionIdentifier id, final DataTreeSnapshot snapshot) {
-        super(id, snapshot);
-    }
-
-    @Override
-    void abort() {
-        close();
+    ReadOnlyShardDataTreeTransaction(final ShardDataTreeTransactionParent parent, final TransactionIdentifier id,
+        final DataTreeSnapshot snapshot) {
+        super(parent, id, snapshot);
     }
 }
index 492f6ec9f62efa2314ada5b8e27cb9fa3c94cb25..fe5dba8a38a57b60ec0503de68372fd5ba1cad55 100644 (file)
@@ -12,24 +12,14 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 
 public final class ReadWriteShardDataTreeTransaction extends AbstractShardDataTreeTransaction<DataTreeModification> {
-    private final ShardDataTreeTransactionParent parent;
 
-    ReadWriteShardDataTreeTransaction(final ShardDataTreeTransactionParent parent,
-            final TransactionIdentifier id, final DataTreeModification modification) {
-        super(id, modification);
-        this.parent = Preconditions.checkNotNull(parent);
-    }
-
-    @Override
-    void abort() {
-        Preconditions.checkState(close(), "Transaction is already closed");
-
-        parent.abortTransaction(this);
+    ReadWriteShardDataTreeTransaction(final ShardDataTreeTransactionParent parent, final TransactionIdentifier id,
+        final DataTreeModification modification) {
+        super(parent, id, modification);
     }
 
     ShardDataTreeCohort ready() {
         Preconditions.checkState(close(), "Transaction is already closed");
-
-        return parent.finishTransaction(this);
+        return getParent().finishTransaction(this);
     }
 }
index 7ca79fc2349add0e9c19236a945aaed53676b45d..7915a6286ed8746149f62dc349bfb4d79a3ce29d 100644 (file)
@@ -18,13 +18,14 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Ticker;
+import com.google.common.base.Verify;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Range;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
@@ -164,8 +165,8 @@ public class Shard extends RaftActor {
 
     private final ShardTransactionMessageRetrySupport messageRetrySupport;
 
-    private final FrontendMetadata frontendMetadata = new FrontendMetadata();
-    private final Map<FrontendIdentifier, LeaderFrontendState> knownFrontends = new HashMap<>();
+    private final FrontendMetadata frontendMetadata;
+    private Map<FrontendIdentifier, LeaderFrontendState> knownFrontends = ImmutableMap.of();
 
     protected Shard(final AbstractBuilder<?, ?> builder) {
         super(builder.getId().toString(), builder.getPeerAddresses(),
@@ -174,6 +175,7 @@ public class Shard extends RaftActor {
         this.name = builder.getId().toString();
         this.datastoreContext = builder.getDatastoreContext();
         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
+        this.frontendMetadata = new FrontendMetadata(name);
 
         setPersistence(datastoreContext.isPersistent());
 
@@ -733,7 +735,7 @@ public class Shard extends RaftActor {
                     persistenceId(), getId());
             }
 
-            store.closeAllTransactionChains();
+            store.purgeLeaderState();
         }
 
         if (hasLeader && !isIsolatedLeader()) {
@@ -745,8 +747,18 @@ public class Shard extends RaftActor {
     protected void onLeaderChanged(final String oldLeader, final String newLeader) {
         shardMBean.incrementLeadershipChangeCount();
 
-        boolean hasLeader = hasLeader();
-        if (hasLeader && !isLeader()) {
+        final boolean hasLeader = hasLeader();
+        if (!hasLeader) {
+            // No leader implies we are not the leader, lose frontend state if we have any. This also places
+            // an explicit guard so the map will not get modified accidentally.
+            if (!knownFrontends.isEmpty()) {
+                LOG.debug("{}: removing frontend state for {}", persistenceId(), knownFrontends.keySet());
+                knownFrontends = ImmutableMap.of();
+            }
+            return;
+        }
+
+        if (!isLeader()) {
             // Another leader was elected. If we were the previous leader and had pending transactions, convert
             // them to transaction messages and send to the new leader.
             ActorSelection leader = getLeader();
@@ -765,9 +777,13 @@ public class Shard extends RaftActor {
                 commitCoordinator.abortPendingTransactions("The transacton was aborted due to inflight leadership "
                         + "change and the leader address isn't available.", this);
             }
+        } else {
+            // We have become the leader, we need to reconstruct frontend state
+            knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this));
+            LOG.debug("{}: became leader with frontend state for {}", persistenceId(), knownFrontends.keySet());
         }
 
-        if (hasLeader && !isIsolatedLeader()) {
+        if (!isIsolatedLeader()) {
             messageRetrySupport.retryMessages();
         }
     }
index 613f9adbc9355c4cabb8a776f31069bdfba6660c..e8469d439d7df19b6fc5828574492beadf5109f1 100644 (file)
@@ -45,12 +45,14 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
 import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
+import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.AbstractIdentifiablePayload;
 import org.opendaylight.controller.cluster.datastore.persisted.CloseLocalHistoryPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.CreateLocalHistoryPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.PurgeLocalHistoryPayload;
+import org.opendaylight.controller.cluster.datastore.persisted.PurgeTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata;
 import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
@@ -323,6 +325,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                     ((CommitTransactionPayload) payload).getCandidate();
             applyRecoveryCandidate(e.getValue());
             allMetadataCommittedTransaction(e.getKey());
+        } else if (payload instanceof AbortTransactionPayload) {
+            allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier());
+        } else if (payload instanceof PurgeTransactionPayload) {
+            allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier());
         } else if (payload instanceof CreateLocalHistoryPayload) {
             allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier());
         } else if (payload instanceof CloseLocalHistoryPayload) {
@@ -384,6 +390,18 @@
                 Verify.verify(identifier instanceof TransactionIdentifier);
                 payloadReplicationComplete((TransactionIdentifier) identifier);
             }
+        } else if (payload instanceof AbortTransactionPayload) {
+            if (identifier != null) {
+                payloadReplicationComplete((AbortTransactionPayload) payload);
+            } else {
+                allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier());
+            }
+        } else if (payload instanceof PurgeTransactionPayload) {
+            if (identifier != null) {
+                payloadReplicationComplete((PurgeTransactionPayload) payload);
+            } else {
+                allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier());
+            }
         } else if (payload instanceof CloseLocalHistoryPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((CloseLocalHistoryPayload) payload);
@@ -440,12 +464,24 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         finishCommit(current.cohort);
     }
 
+    private void allMetadataAbortedTransaction(final TransactionIdentifier txId) {
+        for (ShardDataTreeMetadata<?> m : metadata) {
+            m.onTransactionAborted(txId);
+        }
+    }
+
     private void allMetadataCommittedTransaction(final TransactionIdentifier txId) {
         for (ShardDataTreeMetadata<?> m : metadata) {
             m.onTransactionCommitted(txId);
         }
     }
 
+    private void allMetadataPurgedTransaction(final TransactionIdentifier txId) {
+        for (ShardDataTreeMetadata<?> m : metadata) {
+            m.onTransactionPurged(txId);
+        }
+    }
+
     private void allMetadataCreatedLocalHistory(final LocalHistoryIdentifier historyId) {
         for (ShardDataTreeMetadata<?> m : metadata) {
             m.onHistoryCreated(historyId);
@@ -464,6 +500,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
+    /**
+     * Create a transaction chain for specified history. Unlike {@link #ensureTransactionChain(LocalHistoryIdentifier)},
+     * this method is used for re-establishing state when we are taking over leadership.
+     *
+     * @param historyId Local history identifier
+     * @param closed True if the chain should be created in closed state (i.e. pending purge)
+     * @return Transaction chain handle
+     */
+    ShardDataTreeTransactionChain recreateTransactionChain(final LocalHistoryIdentifier historyId,
+            final boolean closed) {
+        final ShardDataTreeTransactionChain ret = new ShardDataTreeTransactionChain(historyId, this);
+        final ShardDataTreeTransactionChain existing = transactionChains.putIfAbsent(historyId, ret);
+        Preconditions.checkState(existing == null, "Attempted to recreate chain %s, but %s already exists", historyId,
+                existing);
+        return ret;
+    }
+
     ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId) {
         ShardDataTreeTransactionChain chain = transactionChains.get(historyId);
         if (chain == null) {
@@ -477,7 +530,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) {
         if (txId.getHistoryId().getHistoryId() == 0) {
-            return new ReadOnlyShardDataTreeTransaction(txId, dataTree.takeSnapshot());
+            return new ReadOnlyShardDataTreeTransaction(this, txId, dataTree.takeSnapshot());
         }
 
         return ensureTransactionChain(txId.getHistoryId()).newReadOnlyTransaction(txId);
@@ -518,14 +571,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     /**
-     * Immediately close all transaction chains.
+     * Immediately purge all state relevant to the leader. This includes all transaction chains and any scheduled
+     * replication callbacks.
      */
-    void closeAllTransactionChains() {
+    void purgeLeaderState() {
         for (ShardDataTreeTransactionChain chain : transactionChains.values()) {
             chain.close();
         }
 
         transactionChains.clear();
+        replicationCallbacks.clear();
     }
 
     /**
@@ -597,8 +652,17 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     @Override
-    void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction) {
-        // Intentional no-op
+    void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction, final Runnable callback) {
+        final TransactionIdentifier id = transaction.getIdentifier();
+        LOG.debug("{}: aborting transaction {}", logContext, id);
+        replicatePayload(id, AbortTransactionPayload.create(id), callback);
+    }
+
+
+    @Override
+    void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
+        LOG.debug("{}: purging transaction {}", logContext, id);
+        replicatePayload(id, PurgeTransactionPayload.create(id), callback);
     }
 
     @Override
index 47d07c0892f4e73f5c03dd6a8bb617862aaedd8f..7db3a228c8023f16f9ab975c26f4c76effe7e44c 100644 (file)
@@ -15,23 +15,52 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata;
 
 abstract class ShardDataTreeMetadata<T extends ShardDataTreeSnapshotMetadata<T>> {
+    /**
+     * Apply a recovered metadata snapshot.
+     *
+     * @param snapshot Metadata snapshot
+     */
     final void applySnapshot(@Nonnull final ShardDataTreeSnapshotMetadata<?> snapshot) {
         Verify.verify(getSupportedType().isInstance(snapshot), "Snapshot %s misrouted to handler of %s", snapshot,
             getSupportedType());
         doApplySnapshot(getSupportedType().cast(snapshot));
     }
 
+    /**
+     * Reset metadata to empty state.
+     */
     abstract void reset();
 
+    /**
+     * Apply a recovered metadata snapshot. This is not a public entrypoint, just an interface between the base class
+     * and its subclasses.
+     *
+     * @param snapshot Metadata snapshot
+     */
     abstract void doApplySnapshot(@Nonnull T snapshot);
 
+    /**
+     * Return the type of metadata snapshot this object supports.
+     *
+     * @return Metadata type
+     */
     abstract @Nonnull Class<T> getSupportedType();
 
+    /**
+     * Take a snapshot of current metadata state.
+     *
+     * @return Metadata snapshot, or null if the metadata is empty.
+     */
     abstract @Nullable T toSnapshot();
 
     // Lifecycle events
+
+    abstract void onTransactionAborted(TransactionIdentifier txId);
+
     abstract void onTransactionCommitted(TransactionIdentifier txId);
 
+    abstract void onTransactionPurged(TransactionIdentifier txId);
+
     abstract void onHistoryCreated(LocalHistoryIdentifier historyId);
 
     abstract void onHistoryClosed(LocalHistoryIdentifier historyId);
index 2554151dd73c6e4a0442169474530f21d0bcd586..dfd6680045e2d997744e903861dea7c6bc8fb24f 100644 (file)
@@ -55,7 +55,7 @@ final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent
         final DataTreeSnapshot snapshot = getSnapshot();
         LOG.debug("Allocated read-only transaction {} snapshot {}", txId, snapshot);
 
-        return new ReadOnlyShardDataTreeTransaction(txId, snapshot);
+        return new ReadOnlyShardDataTreeTransaction(this, txId, snapshot);
     }
 
     ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
@@ -72,17 +72,24 @@ final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent
     }
 
     @Override
-    protected void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction) {
+    void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction, final Runnable callback) {
         if (transaction instanceof ReadWriteShardDataTreeTransaction) {
             Preconditions.checkState(openTransaction != null,
                     "Attempted to abort transaction %s while none is outstanding", transaction);
-            LOG.debug("Aborted transaction {}", transaction);
+            LOG.debug("Aborted open transaction {}", transaction);
             openTransaction = null;
         }
+
+        dataTree.abortTransaction(transaction, callback);
+    }
+
+    @Override
+    void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
+        dataTree.purgeTransaction(id, callback);
     }
 
     @Override
-    protected ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
+    ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
         Preconditions.checkState(openTransaction != null,
                 "Attempted to finish transaction %s while none is outstanding", transaction);
 
index 20f50156f8fdd8ef9ea16c34393a586982adaca3..2b0b74d02c3b9209a7d166a894b3b15cc0953aba 100644 (file)
@@ -12,9 +12,12 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification
 
 abstract class ShardDataTreeTransactionParent {
 
-    abstract void abortTransaction(AbstractShardDataTreeTransaction<?> transaction);
+    abstract void abortTransaction(AbstractShardDataTreeTransaction<?> transaction, Runnable callback);
+
+    abstract void purgeTransaction(TransactionIdentifier id, Runnable callback);
 
     abstract ShardDataTreeCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction);
 
     abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier id, DataTreeModification mod);
+
 }
index e3fbb82ee805b610d0d02c782ff47f5b55da5c3f..b4fee8e9c859cb5a0a076806e1bf9f32f9aa38d3 100644 (file)
@@ -37,15 +37,16 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
     private final ShardStats shardStats;
     private final TransactionIdentifier transactionId;
 
-    protected ShardTransaction(ActorRef shardActor, ShardStats shardStats, TransactionIdentifier transactionId) {
+    protected ShardTransaction(final ActorRef shardActor, final ShardStats shardStats,
+            final TransactionIdentifier transactionId) {
         super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
         this.shardActor = shardActor;
         this.shardStats = shardStats;
         this.transactionId = Preconditions.checkNotNull(transactionId);
     }
 
-    public static Props props(TransactionType type, AbstractShardDataTreeTransaction<?> transaction,
-            ActorRef shardActor, DatastoreContext datastoreContext, ShardStats shardStats) {
+    public static Props props(final TransactionType type, final AbstractShardDataTreeTransaction<?> transaction,
+            final ActorRef shardActor, final DatastoreContext datastoreContext, final ShardStats shardStats) {
         return Props.create(new ShardTransactionCreator(type, transaction, shardActor, datastoreContext, shardStats));
     }
 
@@ -60,7 +61,7 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
     }
 
     @Override
-    public void handleReceive(Object message) {
+    public void handleReceive(final Object message) {
         if (CloseTransaction.isSerializedType(message)) {
             closeTransaction(true);
         } else if (message instanceof ReceiveTimeout) {
@@ -75,8 +76,8 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         return true;
     }
 
-    private void closeTransaction(boolean sendReply) {
-        getDOMStoreTransaction().abort();
+    private void closeTransaction(final boolean sendReply) {
+        getDOMStoreTransaction().abort(null);
 
         if (sendReply && returnCloseTransactionReply()) {
             getSender().tell(new CloseTransactionReply(), getSelf());
@@ -85,7 +86,7 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         getSelf().tell(PoisonPill.getInstance(), getSelf());
     }
 
-    private boolean checkClosed(AbstractShardDataTreeTransaction<?> transaction) {
+    private boolean checkClosed(final AbstractShardDataTreeTransaction<?> transaction) {
         final boolean ret = transaction.isClosed();
         if (ret) {
             shardStats.incrementFailedReadTransactionsCount();
@@ -95,7 +96,7 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         return ret;
     }
 
-    protected void readData(AbstractShardDataTreeTransaction<?> transaction, ReadData message) {
+    protected void readData(final AbstractShardDataTreeTransaction<?> transaction, final ReadData message) {
         if (checkClosed(transaction)) {
             return;
         }
@@ -106,7 +107,7 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         sender().tell(readDataReply.toSerializable(), self());
     }
 
-    protected void dataExists(AbstractShardDataTreeTransaction<?> transaction, DataExists message) {
+    protected void dataExists(final AbstractShardDataTreeTransaction<?> transaction, final DataExists message) {
         if (checkClosed(transaction)) {
             return;
         }
@@ -128,8 +129,8 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         final ShardStats shardStats;
         final TransactionType type;
 
-        ShardTransactionCreator(TransactionType type, AbstractShardDataTreeTransaction<?> transaction,
-                ActorRef shardActor, DatastoreContext datastoreContext, ShardStats shardStats) {
+        ShardTransactionCreator(final TransactionType type, final AbstractShardDataTreeTransaction<?> transaction,
+                final ActorRef shardActor, final DatastoreContext datastoreContext, final ShardStats shardStats) {
             this.transaction = Preconditions.checkNotNull(transaction);
             this.shardActor = shardActor;
             this.shardStats = shardStats;
index fe2588d5772289419664126f69fe0726902fbfbc..f08ff4a445b2adbdd2821e63311a6ba5b84728c7 100644 (file)
@@ -8,7 +8,12 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Ticker;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.TreeRangeSet;
+import com.google.common.primitives.UnsignedLong;
+import java.util.HashMap;
+import java.util.Map;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
@@ -25,13 +30,27 @@ final class StandaloneFrontendHistory extends AbstractFrontendHistory {
     private final LocalHistoryIdentifier identifier;
     private final ShardDataTree tree;
 
-    StandaloneFrontendHistory(final String persistenceId, final Ticker ticker, final ClientIdentifier clientId,
-            final ShardDataTree tree) {
-        super(persistenceId, ticker);
+    private StandaloneFrontendHistory(final String persistenceId, final ClientIdentifier clientId,
+            final ShardDataTree tree, final Map<UnsignedLong, Boolean> closedTransactions,
+            final RangeSet<UnsignedLong> purgedTransactions) {
+        super(persistenceId, tree, closedTransactions, purgedTransactions);
         this.identifier = new LocalHistoryIdentifier(clientId, 0);
         this.tree = Preconditions.checkNotNull(tree);
     }
 
+    static StandaloneFrontendHistory create(final String persistenceId, final ClientIdentifier clientId,
+            final ShardDataTree tree) {
+        return new StandaloneFrontendHistory(persistenceId, clientId, tree, ImmutableMap.of(),
+            TreeRangeSet.create());
+    }
+
+    static StandaloneFrontendHistory recreate(final String persistenceId, final ClientIdentifier clientId,
+            final ShardDataTree tree, final Map<UnsignedLong, Boolean> closedTransactions,
+            final RangeSet<UnsignedLong> purgedTransactions) {
+        return new StandaloneFrontendHistory(persistenceId, clientId, tree, new HashMap<>(closedTransactions),
+            purgedTransactions);
+    }
+
     @Override
     public LocalHistoryIdentifier getIdentifier() {
         return identifier;
index b553bf92a2739cbb1e27474fd775fd7692281718..751f5b21ba504ada3c0a5878df7aac9f2d7b7714 100644 (file)
@@ -7,11 +7,14 @@
  */
 package org.opendaylight.controller.cluster.datastore.persisted;
 
+import com.google.common.base.Throwables;
 import com.google.common.io.ByteArrayDataOutput;
 import com.google.common.io.ByteStreams;
 import java.io.DataInput;
 import java.io.IOException;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Payload persisted when a transaction is aborted. It contains the transaction identifier.
@@ -45,15 +48,22 @@ public final class AbortTransactionPayload extends AbstractIdentifiablePayload<T
         }
     }
 
+    private static final Logger LOG = LoggerFactory.getLogger(AbortTransactionPayload.class);
     private static final long serialVersionUID = 1L;
 
     AbortTransactionPayload(final TransactionIdentifier transactionId, final byte[] serialized) {
         super(transactionId, serialized);
     }
 
-    public static AbortTransactionPayload create(final TransactionIdentifier transactionId) throws IOException {
+    public static AbortTransactionPayload create(final TransactionIdentifier transactionId) {
         final ByteArrayDataOutput out = ByteStreams.newDataOutput();
-        transactionId.writeTo(out);
+        try {
+            transactionId.writeTo(out);
+        } catch (IOException e) {
+            // This should never happen
+            LOG.error("Failed to serialize {}", transactionId, e);
+            throw Throwables.propagate(e);
+        }
         return new AbortTransactionPayload(transactionId, out.toByteArray());
     }
 
index 2a52eccb94e84c1035590004b9a051a3b57c15d2..5429fd19122f01ec574cd27fe7c5f7a30cfacae1 100644 (file)
@@ -8,24 +8,37 @@
 package org.opendaylight.controller.cluster.datastore.persisted;
 
 import com.google.common.base.MoreObjects;
+import com.google.common.base.Verify;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableRangeSet;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.TreeRangeSet;
+import com.google.common.primitives.UnsignedLong;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 import org.opendaylight.yangtools.concepts.WritableObject;
 import org.opendaylight.yangtools.concepts.WritableObjects;
 
 public final class FrontendHistoryMetadata implements WritableObject {
+    private final RangeSet<UnsignedLong> purgedTransactions;
+    private final Map<UnsignedLong, Boolean> closedTransactions;
     private final long historyId;
     private final long cookie;
-    private final long nextTransaction;
     private final boolean closed;
 
-    public FrontendHistoryMetadata(final long historyId, final long cookie, final long nextTransaction,
-            final boolean closed) {
+    public FrontendHistoryMetadata(final long historyId, final long cookie, final boolean closed,
+            final Map<UnsignedLong, Boolean> closedTransactions, final RangeSet<UnsignedLong> purgedTransactions) {
         this.historyId = historyId;
         this.cookie = cookie;
-        this.nextTransaction = nextTransaction;
         this.closed = closed;
+        this.closedTransactions = ImmutableMap.copyOf(closedTransactions);
+        this.purgedTransactions = ImmutableRangeSet.copyOf(purgedTransactions);
     }
 
     public long getHistoryId() {
@@ -36,34 +49,70 @@ public final class FrontendHistoryMetadata implements WritableObject {
         return cookie;
     }
 
-    public long getNextTransaction() {
-        return nextTransaction;
-    }
-
     public boolean isClosed() {
         return closed;
     }
 
+    public Map<UnsignedLong, Boolean> getClosedTransactions() {
+        return closedTransactions;
+    }
+
+    public RangeSet<UnsignedLong> getPurgedTransactions() {
+        return purgedTransactions;
+    }
+
     @Override
     public void writeTo(final DataOutput out) throws IOException {
         WritableObjects.writeLongs(out, historyId, cookie);
-        WritableObjects.writeLong(out, nextTransaction);
         out.writeBoolean(closed);
+
+        final Set<Range<UnsignedLong>> purgedRanges = purgedTransactions.asRanges();
+        WritableObjects.writeLongs(out, closedTransactions.size(), purgedRanges.size());
+        for (Entry<UnsignedLong, Boolean> e : closedTransactions.entrySet()) {
+            WritableObjects.writeLong(out, e.getKey().longValue());
+            out.writeBoolean(e.getValue().booleanValue());
+        }
+        for (Range<UnsignedLong> r : purgedRanges) {
+            WritableObjects.writeLongs(out, r.lowerEndpoint().longValue(), r.upperEndpoint().longValue());
+        }
     }
 
     public static FrontendHistoryMetadata readFrom(final DataInput in) throws IOException {
-        final byte header = WritableObjects.readLongHeader(in);
+        byte header = WritableObjects.readLongHeader(in);
         final long historyId = WritableObjects.readFirstLong(in, header);
         final long cookie = WritableObjects.readSecondLong(in, header);
-        final long nextTransaction = WritableObjects.readLong(in);
         final boolean closed = in.readBoolean();
 
-        return new FrontendHistoryMetadata(historyId, cookie, nextTransaction, closed);
+        header = WritableObjects.readLongHeader(in);
+        long ls = WritableObjects.readFirstLong(in, header);
+        Verify.verify(ls >= 0 && ls <= Integer.MAX_VALUE);
+        final int csize = (int) ls;
+
+        ls = WritableObjects.readSecondLong(in, header);
+        Verify.verify(ls >= 0 && ls <= Integer.MAX_VALUE);
+        final int psize = (int) ls;
+
+        final Map<UnsignedLong, Boolean> closedTransactions = new HashMap<>(csize);
+        for (int i = 0; i < csize; ++i) {
+            final UnsignedLong key = UnsignedLong.fromLongBits(WritableObjects.readLong(in));
+            final Boolean value = Boolean.valueOf(in.readBoolean());
+            closedTransactions.put(key, value);
+        }
+        final RangeSet<UnsignedLong> purgedTransactions = TreeRangeSet.create();
+        for (int i = 0; i < psize; ++i) {
+            final byte h = WritableObjects.readLongHeader(in);
+            final UnsignedLong l = UnsignedLong.fromLongBits(WritableObjects.readFirstLong(in, h));
+            final UnsignedLong u = UnsignedLong.fromLongBits(WritableObjects.readSecondLong(in, h));
+            purgedTransactions.add(Range.closed(l, u));
+        }
+
+        return new FrontendHistoryMetadata(historyId, cookie, closed, closedTransactions, purgedTransactions);
     }
 
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(FrontendHistoryMetadata.class).add("historiId", historyId)
-                .add("cookie", cookie).add("nextTransaction", nextTransaction).add("closed", closed).toString();
+                .add("cookie", cookie).add("closed", closed).add("closedTransactions", closedTransactions)
+                .add("purgedTransactions", purgedTransactions).toString();
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PurgeTransactionPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PurgeTransactionPayload.java
new file mode 100644 (file)
index 0000000..71e794b
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import com.google.common.base.Throwables;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import java.io.DataInput;
+import java.io.IOException;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Payload persisted when a transaction is purged from the frontend. It contains the transaction identifier.
+ *
+ * @author Robert Varga
+ */
+public final class PurgeTransactionPayload extends AbstractIdentifiablePayload<TransactionIdentifier> {
+    private static final class Proxy extends AbstractProxy<TransactionIdentifier> {
+        private static final long serialVersionUID = 1L;
+
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy() {
+            // For Externalizable
+        }
+
+        Proxy(final byte[] serialized) {
+            super(serialized);
+        }
+
+        @Override
+        protected TransactionIdentifier readIdentifier(final DataInput in) throws IOException {
+            return TransactionIdentifier.readFrom(in);
+        }
+
+        @Override
+        protected PurgeTransactionPayload createObject(final TransactionIdentifier identifier,
+                final byte[] serialized) {
+            return new PurgeTransactionPayload(identifier, serialized);
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(PurgeTransactionPayload.class);
+    private static final long serialVersionUID = 1L;
+
+    PurgeTransactionPayload(final TransactionIdentifier transactionId, final byte[] serialized) {
+        super(transactionId, serialized);
+    }
+
+    public static PurgeTransactionPayload create(final TransactionIdentifier transactionId) {
+        final ByteArrayDataOutput out = ByteStreams.newDataOutput();
+        try {
+            transactionId.writeTo(out);
+        } catch (IOException e) {
+            // This should never happen
+            LOG.error("Failed to serialize {}", transactionId, e);
+            throw Throwables.propagate(e);
+        }
+        return new PurgeTransactionPayload(transactionId, out.toByteArray());
+    }
+
+    @Override
+    protected Proxy externalizableProxy(final byte[] serialized) {
+        return new Proxy(serialized);
+    }
+}
index 6ccede2776bc6a58b8857e79c17c747c3e6745e6..00087e37dfeefafc9d1852d9934ea9c34a39ce5d 100644 (file)
@@ -70,7 +70,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
                 new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
-        subject.underlyingActor().getDOMStoreTransaction().abort();
+        subject.underlyingActor().getDOMStoreTransaction().abort(null);
 
         future = akka.pattern.Patterns.ask(subject, new ReadData(YangInstanceIdentifier.EMPTY,
                 DataStoreVersions.CURRENT_VERSION), 3000);
@@ -92,7 +92,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
                 new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
-        subject.underlyingActor().getDOMStoreTransaction().abort();
+        subject.underlyingActor().getDOMStoreTransaction().abort(null);
 
         future = akka.pattern.Patterns.ask(subject, new ReadData(YangInstanceIdentifier.EMPTY,
                 DataStoreVersions.CURRENT_VERSION), 3000);
@@ -113,7 +113,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
                 new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
-        subject.underlyingActor().getDOMStoreTransaction().abort();
+        subject.underlyingActor().getDOMStoreTransaction().abort(null);
 
         future = akka.pattern.Patterns.ask(subject,
                 new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);
index aa2a9dbe1096d3c2e3ea7495da3aa5bd71f94851..43b2c9eaa99725d932178d52c84e84b914fbbde2 100644 (file)
@@ -9,14 +9,13 @@ package org.opendaylight.controller.cluster.datastore.persisted;
 
 import static org.junit.Assert.assertEquals;
 
-import java.io.IOException;
 import org.apache.commons.lang3.SerializationUtils;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.AbstractTest;
 
 public class AbortTransactionPayloadTest extends AbstractTest {
     @Test
-    public void testPayloadSerDes() throws IOException {
+    public void testPayloadSerDes() {
         final AbortTransactionPayload template = AbortTransactionPayload.create(nextTransactionId());
         final AbortTransactionPayload cloned = SerializationUtils.clone(template);
         assertEquals(template.getIdentifier(), cloned.getIdentifier());
index e175a09eb41ff50f0b5efeba613d94f1afdd96a0..f8734850a5f4528286d653103f720c2f7eaab1df 100644 (file)
@@ -12,6 +12,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Range;
 import com.google.common.collect.RangeSet;
 import com.google.common.collect.TreeRangeSet;
@@ -90,7 +91,7 @@ public class FrontendShardDataTreeSnapshotMetadataTest {
     }
 
     private static FrontendShardDataTreeSnapshotMetadata createMetadataSnapshot(final int size) {
-        final List<FrontendClientMetadata> clients = new ArrayList<>();
+        final List<FrontendClientMetadata> clients = new ArrayList<>(size);
         for (long i = 0; i < size; i++) {
             clients.add(createFrontedClientMetadata(i));
         }
@@ -107,8 +108,9 @@ public class FrontendShardDataTreeSnapshotMetadataTest {
         final RangeSet<UnsignedLong> purgedHistories = TreeRangeSet.create();
         purgedHistories.add(Range.closed(UnsignedLong.ZERO, UnsignedLong.ONE));
 
-        final Collection<FrontendHistoryMetadata> currentHistories = Collections
-                .singleton(new FrontendHistoryMetadata(num, num, num, true));
+        final Collection<FrontendHistoryMetadata> currentHistories = Collections.singleton(
+            new FrontendHistoryMetadata(num, num, true, ImmutableMap.of(UnsignedLong.ZERO, Boolean.TRUE),
+                purgedHistories));
 
         return new FrontendClientMetadata(clientIdentifier, purgedHistories, currentHistories);
     }