BUG-5280: add frontend state lifecycle 65/49265/36
authorRobert Varga <rovarga@cisco.com>
Mon, 12 Dec 2016 18:34:38 +0000 (19:34 +0100)
committerTom Pantelis <tpanteli@brocade.com>
Mon, 20 Mar 2017 05:11:53 +0000 (05:11 +0000)
When transitioning between roles we need to take care of proper
handling of state known about frontend. This patch adds
the leader/non-leader transitions, creating the state
from FrontendMetadata and forgetting it when transactions are
committed.

Our replicated log needs to grow more entries to accurately
replicate the state of the conversation between the frontend
and backend, so if a member becomes the leader it has
an understanding of which transactions and transaction
chains have been completed (aborted, committed, purged). These
are replicated before a response is sent to the frontend, so
if a leader before they replicate successfully, the frontend
will see them as a timeout and retry them (and be routed to the
new leader).

Both leader and followers are expected to keep the metadata
handy: the leader keeps for the purpose of being able to generate
a summarized snapshot. The followers keep it so their metadata
view is consistent with the contents of the data tree.

Change-Id: I72eea91ee84716cdd8a6a3521b42cca9a9393aff
Signed-off-by: Robert Varga <rovarga@cisco.com>
31 files changed:
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ClosedTransactionException.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DeadHistoryException.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DeadTransactionException.java
opendaylight/md-sal/cds-access-api/src/test/java/org/opendaylight/controller/cluster/access/commands/ConnectClientRequestTest.java
opendaylight/md-sal/cds-access-api/src/test/java/org/opendaylight/controller/cluster/access/commands/DeadHistoryExceptionTest.java
opendaylight/md-sal/cds-access-api/src/test/java/org/opendaylight/controller/cluster/access/commands/DeadTransactionExceptionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendClientMetadataBuilder.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendHistoryMetadataBuilder.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendMetadata.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadOnlyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadOnlyShardDataTreeTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMetadata.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/StandaloneFrontendHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/FrontendHistoryMetadata.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PurgeTransactionPayload.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayloadTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/FrontendShardDataTreeSnapshotMetadataTest.java

diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ClosedTransactionException.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ClosedTransactionException.java
new file mode 100644 (file)
index 0000000..ece4720
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+
+/**
+ * A {@link RequestException} indicating that the backend has received a request for a transaction which has already
+ * been closed, either via a successful commit or abort (which is indicated via {@link #isSuccessful()}. This can
+ * happen if the corresponding journal record is replicated, but the message to the frontend gets lost and the backed
+ * leader moved before the frontend retried the corresponding request.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class ClosedTransactionException extends RequestException {
+    private static final long serialVersionUID = 1L;
+
+    private final boolean successful;
+
+    public ClosedTransactionException(final boolean successful) {
+        super("Transaction has been " + (successful ? "committed" : "aborted"));
+        this.successful = successful;
+    }
+
+    @Override
+    public boolean isRetriable() {
+        return false;
+    }
+
+    public boolean isSuccessful() {
+        return successful;
+    }
+}
index 562c0e5..fee4399 100644 (file)
@@ -8,6 +8,9 @@
 package org.opendaylight.controller.cluster.access.commands;
 
 import com.google.common.annotations.Beta;
+import com.google.common.collect.ImmutableRangeSet;
+import com.google.common.collect.RangeSet;
+import com.google.common.primitives.UnsignedLong;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 
 /**
@@ -20,12 +23,19 @@ import org.opendaylight.controller.cluster.access.concepts.RequestException;
 public final class DeadTransactionException extends RequestException {
     private static final long serialVersionUID = 1L;
 
-    public DeadTransactionException(final long lastSeenTransaction) {
-        super("Transaction up to " + Long.toUnsignedString(lastSeenTransaction) + " are accounted for");
+    private final RangeSet<UnsignedLong> purgedIdentifiers;
+
+    public DeadTransactionException(final RangeSet<UnsignedLong> purgedIdentifiers) {
+        super("Transactions " + purgedIdentifiers + " have been purged");
+        this.purgedIdentifiers = ImmutableRangeSet.copyOf(purgedIdentifiers);
     }
 
     @Override
     public boolean isRetriable() {
-        return true;
+        return false;
+    }
+
+    public RangeSet<UnsignedLong> getPurgedIdentifier() {
+        return purgedIdentifiers;
     }
 }
index b4da36f..02832ae 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.access.commands;
 
 import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableRangeSet;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.access.ABIVersion;
@@ -46,7 +47,7 @@ public class ConnectClientRequestTest extends AbstractRequestTest<ConnectClientR
 
     @Test
     public void toRequestFailureTest() throws Exception {
-        final RequestException exception = new DeadTransactionException(0);
+        final RequestException exception = new DeadTransactionException(ImmutableRangeSet.of());
         final ConnectClientFailure failure = OBJECT.toRequestFailure(exception);
         Assert.assertNotNull(failure);
     }
index 595ef19..f4d88ec 100644 (file)
@@ -7,8 +7,9 @@
  */
 package org.opendaylight.controller.cluster.access.commands;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 
 import org.opendaylight.controller.cluster.access.concepts.RequestExceptionTest;
 
@@ -18,13 +19,13 @@ public class DeadHistoryExceptionTest extends RequestExceptionTest<DeadHistoryEx
 
     @Override
     protected void isRetriable() {
-        assertTrue(OBJECT.isRetriable());
+        assertFalse(OBJECT.isRetriable());
     }
 
     @Override
     protected void checkMessage() {
         final String message = OBJECT.getMessage();
-        assertTrue("Histories up to 100 are accounted for".equals(message));
+        assertEquals("Histories up to 100 are accounted for", message);
         assertNull(OBJECT.getCause());
     }
 
index a610330..8b6e52c 100644 (file)
@@ -7,25 +7,27 @@
  */
 package org.opendaylight.controller.cluster.access.commands;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 
+import com.google.common.collect.ImmutableRangeSet;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.RequestExceptionTest;
 
 public class DeadTransactionExceptionTest extends RequestExceptionTest<DeadTransactionException> {
 
-    private static final RequestException OBJECT = new DeadTransactionException(100);
+    private static final RequestException OBJECT = new DeadTransactionException(ImmutableRangeSet.of());
 
     @Override
     protected void isRetriable() {
-        assertTrue(OBJECT.isRetriable());
+        assertFalse(OBJECT.isRetriable());
     }
 
     @Override
     protected void checkMessage() {
         final String message = OBJECT.getMessage();
-        assertTrue("Transaction up to 100 are accounted for".equals(message));
+        assertEquals("Transactions [] have been purged", message);
         assertNull(OBJECT.getCause());
     }
 
index dbea3a1..f23a8ff 100644 (file)
@@ -9,13 +9,19 @@ package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Ticker;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.primitives.UnsignedLong;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ClosedTransactionException;
 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.DeadTransactionException;
+import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
 import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
@@ -41,12 +47,22 @@ abstract class AbstractFrontendHistory implements Identifiable<LocalHistoryIdent
     private static final OutOfOrderRequestException UNSEQUENCED_START = new OutOfOrderRequestException(0);
 
     private final Map<TransactionIdentifier, FrontendTransaction> transactions = new HashMap<>();
+    private final RangeSet<UnsignedLong> purgedTransactions;
     private final String persistenceId;
-    private final Ticker ticker;
+    private final ShardDataTree tree;
 
-    AbstractFrontendHistory(final String persistenceId, final Ticker ticker) {
+    /**
+     * Transactions closed by the previous leader. Boolean indicates whether the transaction was committed (true) or
+     * aborted (false). We only ever shrink these.
+     */
+    private Map<UnsignedLong, Boolean> closedTransactions;
+
+    AbstractFrontendHistory(final String persistenceId, final ShardDataTree tree,
+        final Map<UnsignedLong, Boolean> closedTransactions, final RangeSet<UnsignedLong> purgedTransactions) {
         this.persistenceId = Preconditions.checkNotNull(persistenceId);
-        this.ticker = Preconditions.checkNotNull(ticker);
+        this.tree = Preconditions.checkNotNull(tree);
+        this.closedTransactions = Preconditions.checkNotNull(closedTransactions);
+        this.purgedTransactions = Preconditions.checkNotNull(purgedTransactions);
     }
 
     final String persistenceId() {
@@ -54,45 +70,104 @@ abstract class AbstractFrontendHistory implements Identifiable<LocalHistoryIdent
     }
 
     final long readTime() {
-        return ticker.read();
+        return tree.ticker().read();
     }
 
     final @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
             final RequestEnvelope envelope, final long now) throws RequestException {
         final TransactionIdentifier id = request.getTarget();
+        final UnsignedLong ul = UnsignedLong.fromLongBits(id.getTransactionId());
 
-        FrontendTransaction tx;
         if (request instanceof TransactionPurgeRequest) {
-            tx = transactions.remove(id);
+            if (purgedTransactions.contains(ul)) {
+                // Retransmitted purge request: nothing to do
+                LOG.debug("{}: transaction {} already purged", persistenceId, id);
+                return new TransactionPurgeResponse(id, request.getSequence());
+            }
+
+            // We perform two lookups instead of a straight remove, because once the map becomes empty we switch it
+            // to an ImmutableMap, which does not allow remove().
+            if (closedTransactions.containsKey(ul)) {
+                tree.purgeTransaction(id, () -> {
+                    closedTransactions.remove(ul);
+                    if (closedTransactions.isEmpty()) {
+                        closedTransactions = ImmutableMap.of();
+                    }
+
+                    purgedTransactions.add(Range.singleton(ul));
+                    LOG.debug("{}: finished purging inherited transaction {}", persistenceId(), id);
+                    envelope.sendSuccess(new TransactionPurgeResponse(id, request.getSequence()), readTime() - now);
+                });
+                return null;
+            }
+
+            final FrontendTransaction tx = transactions.get(id);
             if (tx == null) {
-                // We have no record of the transaction, nothing to do
-                LOG.debug("{}: no state for transaction {}, purge is complete", persistenceId(), id);
+                // This should never happen because the purge callback removes the transaction and puts it into
+                // purged transactions in one go. If it does, we warn about the situation and
+                LOG.warn("{}: transaction {} not tracked in {}, but not present in active transactions", persistenceId,
+                    id, purgedTransactions);
+                purgedTransactions.add(Range.singleton(ul));
                 return new TransactionPurgeResponse(id, request.getSequence());
             }
+
+            tx.purge(() -> {
+                purgedTransactions.add(Range.singleton(ul));
+                transactions.remove(id);
+                LOG.debug("{}: finished purging transaction {}", persistenceId(), id);
+                envelope.sendSuccess(new TransactionPurgeResponse(id, request.getSequence()), readTime() - now);
+            });
+            return null;
+        }
+
+        if (purgedTransactions.contains(ul)) {
+            LOG.warn("{}: Request {} is contained purged transactions {}", persistenceId, request, purgedTransactions);
+            throw new DeadTransactionException(purgedTransactions);
+        }
+        final Boolean closed = closedTransactions.get(ul);
+        if (closed != null) {
+            final boolean successful = closed.booleanValue();
+            LOG.debug("{}: Request {} refers to a {} transaction", persistenceId, request, successful ? "successful"
+                    : "failed");
+            throw new ClosedTransactionException(successful);
+        }
+
+        FrontendTransaction tx = transactions.get(id);
+        if (tx == null) {
+            // The transaction does not exist and we are about to create it, check sequence number
+            if (request.getSequence() != 0) {
+                LOG.debug("{}: no transaction state present, unexpected request {}", persistenceId(), request);
+                throw UNSEQUENCED_START;
+            }
+
+            tx = createTransaction(request, id);
+            transactions.put(id, tx);
         } else {
-            tx = transactions.get(id);
-            if (tx == null) {
-                // The transaction does not exist and we are about to create it, check sequence number
-                if (request.getSequence() != 0) {
-                    LOG.debug("{}: no transaction state present, unexpected request {}", persistenceId(), request);
-                    throw UNSEQUENCED_START;
-                }
-
-                tx = createTransaction(request, id);
-                transactions.put(id, tx);
-            } else {
-                final Optional<TransactionSuccess<?>> maybeReplay = tx.replaySequence(request.getSequence());
-                if (maybeReplay.isPresent()) {
-                    final TransactionSuccess<?> replay = maybeReplay.get();
-                    LOG.debug("{}: envelope {} replaying response {}", persistenceId(), envelope, replay);
-                    return replay;
-                }
+            final Optional<TransactionSuccess<?>> maybeReplay = tx.replaySequence(request.getSequence());
+            if (maybeReplay.isPresent()) {
+                final TransactionSuccess<?> replay = maybeReplay.get();
+                LOG.debug("{}: envelope {} replaying response {}", persistenceId(), envelope, replay);
+                return replay;
             }
         }
 
         return tx.handleRequest(request, envelope, now);
     }
 
+    void destroy(final long sequence, final RequestEnvelope envelope, final long now) {
+        LOG.debug("{}: closing history {}", persistenceId(), getIdentifier());
+        tree.closeTransactionChain(getIdentifier(), () -> {
+            envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now);
+        });
+    }
+
+    void purge(final long sequence, final RequestEnvelope envelope, final long now) {
+        LOG.debug("{}: purging history {}", persistenceId(), getIdentifier());
+        tree.purgeTransactionChain(getIdentifier(), () -> {
+            envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now);
+        });
+    }
+
     private FrontendTransaction createTransaction(final TransactionRequest<?> request, final TransactionIdentifier id)
             throws RequestException {
         if (request instanceof CommitLocalTransactionRequest) {
index 222280c..56e11c1 100644 (file)
@@ -13,6 +13,8 @@ import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Abstract base for transactions running on SharrdDataTree.
@@ -22,12 +24,17 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 @NotThreadSafe
 abstract class AbstractShardDataTreeTransaction<T extends DataTreeSnapshot>
         implements Identifiable<TransactionIdentifier> {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractShardDataTreeTransaction.class);
+
+    private final ShardDataTreeTransactionParent parent;
     private final TransactionIdentifier id;
     private final T snapshot;
 
     private boolean closed;
 
-    AbstractShardDataTreeTransaction(final TransactionIdentifier id, final T snapshot) {
+    AbstractShardDataTreeTransaction(final ShardDataTreeTransactionParent parent, final TransactionIdentifier id,
+        final T snapshot) {
+        this.parent = Preconditions.checkNotNull(parent);
         this.snapshot = Preconditions.checkNotNull(snapshot);
         this.id = Preconditions.checkNotNull(id);
     }
@@ -37,6 +44,10 @@ abstract class AbstractShardDataTreeTransaction<T extends DataTreeSnapshot>
         return id;
     }
 
+    final ShardDataTreeTransactionParent getParent() {
+        return parent;
+    }
+
     final T getSnapshot() {
         return snapshot;
     }
@@ -59,11 +70,21 @@ abstract class AbstractShardDataTreeTransaction<T extends DataTreeSnapshot>
         return true;
     }
 
+    final void abort(final Runnable callback) {
+        Preconditions.checkState(close(), "Transaction is already closed");
+        parent.abortTransaction(this, callback);
+    }
+
+    final void purge(final Runnable callback) {
+        if (!closed) {
+            LOG.warn("Purging unclosed transaction {}", id);
+        }
+        parent.purgeTransaction(id, callback);
+    }
+
     @Override
     public final String toString() {
         return MoreObjects.toStringHelper(this).add("id", id).add("closed", closed).add("snapshot", snapshot)
                 .toString();
     }
-
-    abstract void abort();
 }
index cd1235b..8a0ce60 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.Range;
 import com.google.common.collect.RangeSet;
@@ -15,6 +16,8 @@ import com.google.common.collect.TreeRangeSet;
 import com.google.common.primitives.UnsignedLong;
 import java.util.HashMap;
 import java.util.Map;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
@@ -22,8 +25,13 @@ import org.opendaylight.controller.cluster.datastore.persisted.FrontendClientMet
 import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata;
 import org.opendaylight.yangtools.concepts.Builder;
 import org.opendaylight.yangtools.concepts.Identifiable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+@NotThreadSafe
 final class FrontendClientMetadataBuilder implements Builder<FrontendClientMetadata>, Identifiable<ClientIdentifier> {
+    private static final Logger LOG = LoggerFactory.getLogger(FrontendClientMetadataBuilder.class);
+
     private final Map<LocalHistoryIdentifier, FrontendHistoryMetadataBuilder> currentHistories = new HashMap<>();
     private final RangeSet<UnsignedLong> purgedHistories;
     private final ClientIdentifier identifier;
@@ -55,25 +63,100 @@ final class FrontendClientMetadataBuilder implements Builder<FrontendClientMetad
     }
 
     void onHistoryCreated(final LocalHistoryIdentifier historyId) {
-        // TODO Auto-generated method stub
-
+        final FrontendHistoryMetadataBuilder newMeta = new FrontendHistoryMetadataBuilder(historyId);
+        final FrontendHistoryMetadataBuilder oldMeta = currentHistories.putIfAbsent(historyId, newMeta);
+        if (oldMeta != null) {
+            // This should not be happening, warn about it
+            LOG.warn("Reused local history {}", historyId);
+        } else {
+            LOG.debug("Created local history {}", historyId);
+        }
     }
 
     void onHistoryClosed(final LocalHistoryIdentifier historyId) {
-        ensureHistory(historyId).onHistoryClosed();
+        final FrontendHistoryMetadataBuilder builder = currentHistories.get(historyId);
+        if (builder != null) {
+            builder.onHistoryClosed();
+            LOG.debug("Closed history {}", historyId);
+        } else {
+            LOG.warn("Closed unknown history {}, ignoring", historyId);
+        }
     }
 
     void onHistoryPurged(final LocalHistoryIdentifier historyId) {
-        currentHistories.remove(historyId);
+        final FrontendHistoryMetadataBuilder history = currentHistories.remove(historyId);
+        if (history == null) {
+            LOG.warn("Purging unknown history {}", historyId);
+        }
+
         // XXX: do we need to account for cookies?
         purgedHistories.add(Range.singleton(UnsignedLong.fromLongBits(historyId.getHistoryId())));
+        LOG.debug("Purged history {}", historyId);
+    }
+
+    void onTransactionAborted(final TransactionIdentifier txId) {
+        final FrontendHistoryMetadataBuilder history = getHistory(txId);
+        if (history != null) {
+            history.onTransactionAborted(txId);
+            LOG.debug("Committed transaction {}", txId);
+        } else {
+            LOG.warn("Unknown history for aborted transaction {}, ignoring", txId);
+        }
     }
 
     void onTransactionCommitted(final TransactionIdentifier txId) {
-        ensureHistory(txId.getHistoryId()).onTransactionCommitted(txId);
+        final FrontendHistoryMetadataBuilder history = getHistory(txId);
+        if (history != null) {
+            history.onTransactionCommitted(txId);
+            LOG.debug("Aborted transaction {}", txId);
+        } else {
+            LOG.warn("Unknown history for commited transaction {}, ignoring", txId);
+        }
+    }
+
+    void onTransactionPurged(final TransactionIdentifier txId) {
+        final FrontendHistoryMetadataBuilder history = getHistory(txId);
+        if (history != null) {
+            history.onTransactionPurged(txId);
+            LOG.debug("Purged transaction {}", txId);
+        } else {
+            LOG.warn("Unknown history for purged transaction {}, ignoring", txId);
+        }
+    }
+
+    /**
+     * Transform frontend metadata for a particular client into its {@link LeaderFrontendState} counterpart.
+     *
+     * @param shard parent shard
+     * @return Leader frontend state
+     */
+    @Nonnull LeaderFrontendState toLeaderState(@Nonnull final Shard shard) {
+        // Note: we have to make sure to *copy* all current state and not leak any views, otherwise leader/follower
+        //       interactions would get intertwined leading to inconsistencies.
+        final Map<LocalHistoryIdentifier, LocalFrontendHistory> histories = new HashMap<>();
+        for (FrontendHistoryMetadataBuilder e : currentHistories.values()) {
+            if (e.getIdentifier().getHistoryId() != 0) {
+                final AbstractFrontendHistory state = e.toLeaderState(shard);
+                Verify.verify(state instanceof LocalFrontendHistory);
+                histories.put(e.getIdentifier(), (LocalFrontendHistory) state);
+            }
+        }
+
+        final AbstractFrontendHistory singleHistory;
+        final FrontendHistoryMetadataBuilder singleHistoryMeta = currentHistories.get(
+            new LocalHistoryIdentifier(identifier, 0));
+        if (singleHistoryMeta == null) {
+            final ShardDataTree tree = shard.getDataStore();
+            singleHistory = StandaloneFrontendHistory.create(shard.persistenceId(), getIdentifier(), tree);
+        } else {
+            singleHistory = singleHistoryMeta.toLeaderState(shard);
+        }
+
+        return new LeaderFrontendState(shard.persistenceId(), getIdentifier(), shard.getDataStore(),
+            TreeRangeSet.create(purgedHistories), singleHistory, histories);
     }
 
-    private FrontendHistoryMetadataBuilder ensureHistory(final LocalHistoryIdentifier historyId) {
-        return currentHistories.computeIfAbsent(historyId, FrontendHistoryMetadataBuilder::new);
+    private FrontendHistoryMetadataBuilder getHistory(final TransactionIdentifier txId) {
+        return currentHistories.get(txId.getHistoryId());
     }
 }
index 6b901b4..beed765 100644 (file)
@@ -8,6 +8,13 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.TreeRangeSet;
+import com.google.common.primitives.UnsignedLong;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
@@ -17,18 +24,23 @@ import org.opendaylight.yangtools.concepts.Identifiable;
 
 final class FrontendHistoryMetadataBuilder implements Builder<FrontendHistoryMetadata>,
         Identifiable<LocalHistoryIdentifier> {
+
+    private final Map<UnsignedLong, Boolean> closedTransactions;
+    private final RangeSet<UnsignedLong> purgedTransactions;
     private final LocalHistoryIdentifier identifier;
 
-    private long nextTransaction;
     private boolean closed;
 
     FrontendHistoryMetadataBuilder(final LocalHistoryIdentifier identifier) {
         this.identifier = Preconditions.checkNotNull(identifier);
+        this.purgedTransactions = TreeRangeSet.create();
+        this.closedTransactions = new HashMap<>(2);
     }
 
     FrontendHistoryMetadataBuilder(final ClientIdentifier clientId, final FrontendHistoryMetadata meta) {
         identifier = new LocalHistoryIdentifier(clientId, meta.getHistoryId(), meta.getCookie());
-        nextTransaction = meta.getNextTransaction();
+        closedTransactions = new HashMap<>(meta.getClosedTransactions());
+        purgedTransactions = TreeRangeSet.create(meta.getPurgedTransactions());
         closed = meta.isClosed();
     }
 
@@ -39,14 +51,42 @@ final class FrontendHistoryMetadataBuilder implements Builder<FrontendHistoryMet
 
     @Override
     public FrontendHistoryMetadata build() {
-        return new FrontendHistoryMetadata(identifier.getHistoryId(), identifier.getCookie(), nextTransaction, closed);
+        return new FrontendHistoryMetadata(identifier.getHistoryId(), identifier.getCookie(), closed,
+            closedTransactions, purgedTransactions);
     }
 
     void onHistoryClosed() {
+        Preconditions.checkState(identifier.getHistoryId() != 0);
         closed = true;
     }
 
+    void onTransactionAborted(final TransactionIdentifier txId) {
+        closedTransactions.put(UnsignedLong.fromLongBits(txId.getTransactionId()), Boolean.FALSE);
+    }
+
     void onTransactionCommitted(final TransactionIdentifier txId) {
-        nextTransaction = txId.getTransactionId() + 1;
+        closedTransactions.put(UnsignedLong.fromLongBits(txId.getTransactionId()), Boolean.TRUE);
+    }
+
+    void onTransactionPurged(final TransactionIdentifier txId) {
+        final UnsignedLong id = UnsignedLong.fromLongBits(txId.getTransactionId());
+        closedTransactions.remove(id);
+        purgedTransactions.add(Range.singleton(id));
+    }
+
+    /**
+     * Transform frontend metadata for a particular client history into its {@link LocalFrontendHistory} counterpart.
+     *
+     * @param shard parent shard
+     * @return Leader history state
+     */
+    @Nonnull AbstractFrontendHistory toLeaderState(@Nonnull final Shard shard) {
+        if (identifier.getHistoryId() == 0) {
+            return StandaloneFrontendHistory.recreate(shard.persistenceId(), identifier.getClientId(),
+                shard.getDataStore(), closedTransactions, purgedTransactions);
+        }
+
+        return LocalFrontendHistory.recreate(shard.persistenceId(), shard.getDataStore(),
+            shard.getDataStore().recreateTransactionChain(identifier, closed), closedTransactions, purgedTransactions);
     }
 }
index 1788f99..5f86125 100644 (file)
@@ -7,9 +7,12 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Collections2;
+import com.google.common.collect.Maps;
 import java.util.HashMap;
 import java.util.Map;
+import javax.annotation.Nonnull;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
@@ -32,6 +35,11 @@ final class FrontendMetadata extends ShardDataTreeMetadata<FrontendShardDataTree
     private static final Logger LOG = LoggerFactory.getLogger(FrontendMetadata.class);
 
     private final Map<FrontendIdentifier, FrontendClientMetadataBuilder> clients = new HashMap<>();
+    private final String shardName;
+
+    FrontendMetadata(final String shardName) {
+        this.shardName = Preconditions.checkNotNull(shardName);
+    }
 
     @Override
     Class<FrontendShardDataTreeSnapshotMetadata> getSupportedType() {
@@ -67,9 +75,9 @@ final class FrontendMetadata extends ShardDataTreeMetadata<FrontendShardDataTree
         final FrontendClientMetadataBuilder client = new FrontendClientMetadataBuilder(id);
         final FrontendClientMetadataBuilder previous = clients.put(id.getFrontendId(), client);
         if (previous != null) {
-            LOG.debug("Replaced client {} with {}", previous, client);
+            LOG.debug("{}: Replaced client {} with {}", shardName, previous, client);
         } else {
-            LOG.debug("Added client {}", client);
+            LOG.debug("{}: Added client {}", shardName, client);
         }
         return client;
     }
@@ -89,8 +97,27 @@ final class FrontendMetadata extends ShardDataTreeMetadata<FrontendShardDataTree
         ensureClient(historyId.getClientId()).onHistoryPurged(historyId);
     }
 
+    @Override
+    void onTransactionAborted(final TransactionIdentifier txId) {
+        ensureClient(txId.getHistoryId().getClientId()).onTransactionAborted(txId);
+    }
+
     @Override
     void onTransactionCommitted(final TransactionIdentifier txId) {
         ensureClient(txId.getHistoryId().getClientId()).onTransactionCommitted(txId);
     }
+
+    @Override
+    void onTransactionPurged(final TransactionIdentifier txId) {
+        ensureClient(txId.getHistoryId().getClientId()).onTransactionPurged(txId);
+    }
+
+    /**
+     * Transform frontend metadata into an active leader state map.
+     *
+     * @return Leader frontend state
+     */
+    @Nonnull Map<FrontendIdentifier, LeaderFrontendState> toLeaderState(@Nonnull final Shard shard) {
+        return new HashMap<>(Maps.transformValues(clients, meta -> meta.toLeaderState(shard)));
+    }
 }
index 465cfcb..5c0d913 100644 (file)
@@ -17,8 +17,6 @@ import org.opendaylight.controller.cluster.access.commands.ReadTransactionReques
 import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
-import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
-import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
@@ -55,19 +53,22 @@ final class FrontendReadOnlyTransaction extends FrontendTransaction {
         } else if (request instanceof ReadTransactionRequest) {
             return handleReadTransaction((ReadTransactionRequest) request);
         } else if (request instanceof TransactionAbortRequest) {
-            return handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
-        } else if (request instanceof TransactionPurgeRequest) {
-            // No-op for now
-            return new TransactionPurgeResponse(request.getTarget(), request.getSequence());
+            handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
+            return null;
         } else {
             throw new UnsupportedRequestException(request);
         }
     }
 
-    private TransactionSuccess<?> handleTransactionAbort(final TransactionAbortRequest request,
-            final RequestEnvelope envelope, final long now) throws RequestException {
-        openTransaction.abort();
-        return new TransactionAbortSuccess(openTransaction.getIdentifier(), request.getSequence());
+    @Override
+    void purge(final Runnable callback) {
+        openTransaction.purge(callback);
+    }
+
+    private void handleTransactionAbort(final TransactionAbortRequest request, final RequestEnvelope envelope,
+            final long now) throws RequestException {
+        openTransaction.abort(() -> recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(request.getTarget(),
+            request.getSequence())));
     }
 
     private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request)
index 2c2a287..9c4d4dd 100644 (file)
@@ -32,8 +32,6 @@ import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
 import org.opendaylight.controller.cluster.access.commands.TransactionModification;
 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
-import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
-import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
@@ -103,15 +101,18 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
             handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now);
             return null;
         } else if (request instanceof TransactionAbortRequest) {
-            return handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
-        } else if (request instanceof TransactionPurgeRequest) {
-            // No-op for now
-            return new TransactionPurgeResponse(request.getTarget(), request.getSequence());
+            handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
+            return null;
         } else {
             throw new UnsupportedRequestException(request);
         }
     }
 
+    @Override
+    void purge(final Runnable callback) {
+        openTransaction.purge(callback);
+    }
+
     private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
             final RequestEnvelope envelope, final long now) throws RequestException {
         readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
@@ -145,11 +146,12 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
         });
     }
 
-    private TransactionSuccess<?> handleTransactionAbort(final TransactionAbortRequest request,
+    private void handleTransactionAbort(final TransactionAbortRequest request,
             final RequestEnvelope envelope, final long now) throws RequestException {
         if (readyCohort == null) {
-            openTransaction.abort();
-            return new TransactionAbortSuccess(getIdentifier(), request.getSequence());
+            openTransaction.abort(() -> recordAndSendSuccess(envelope, now,
+                new TransactionAbortSuccess(getIdentifier(), request.getSequence())));
+            return;
         }
 
         readyCohort.abort(new FutureCallback<Void>() {
@@ -168,7 +170,6 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
                 recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure));
             }
         });
-        return null;
     }
 
     private void coordinatedCommit(final RequestEnvelope envelope, final long now) {
@@ -298,9 +299,9 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
 
         switch (maybeProto.get()) {
             case ABORT:
-                openTransaction.abort();
+                openTransaction.abort(() -> replyModifySuccess(request.getSequence()));
                 openTransaction = null;
-                return replyModifySuccess(request.getSequence());
+                return null;
             case READY:
                 ensureReady();
                 return replyModifySuccess(request.getSequence());
index 1ef775e..fccf3bf 100644 (file)
@@ -112,6 +112,9 @@ abstract class FrontendTransaction implements Identifiable<TransactionIdentifier
     abstract @Nullable TransactionSuccess<?> handleRequest(TransactionRequest<?> request,
             RequestEnvelope envelope, long now) throws RequestException;
 
+    // Final request, needs routing to the data tree, so it can persist a tombstone
+    abstract void purge(Runnable callback);
+
     private void recordResponse(final long sequence, final Object response) {
         if (replayQueue.isEmpty()) {
             firstReplaySequence = sequence;
@@ -148,4 +151,5 @@ abstract class FrontendTransaction implements Identifiable<TransactionIdentifier
                 .add("lastPurgedSequence", lastPurgedSequence)
                 .toString();
     }
+
 }
index deca605..4092685 100644 (file)
@@ -47,10 +47,10 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
     private static final Logger LOG = LoggerFactory.getLogger(LeaderFrontendState.class);
 
     // Histories which have not been purged
-    private final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories = new HashMap<>();
+    private final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories;
 
     // RangeSet performs automatic merging, hence we keep minimal state tracking information
-    private final RangeSet<UnsignedLong> purgedHistories = TreeRangeSet.create();
+    private final RangeSet<UnsignedLong> purgedHistories;
 
     // Used for all standalone transactions
     private final AbstractFrontendHistory standaloneHistory;
@@ -61,7 +61,6 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
     private long expectedTxSequence;
     private Long lastSeenHistory = null;
 
-
     // TODO: explicit failover notification
     //       Record the ActorRef for the originating actor and when we switch to being a leader send a notification
     //       to the frontend client -- that way it can immediately start sending requests
@@ -72,10 +71,19 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
     // - per-RequestException throw counters
 
     LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
+        this(persistenceId, clientId, tree, TreeRangeSet.create(), StandaloneFrontendHistory.create(persistenceId,
+            clientId, tree), new HashMap<>());
+    }
+
+    LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree,
+        final RangeSet<UnsignedLong> purgedHistories, final AbstractFrontendHistory standaloneHistory,
+        final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories) {
         this.persistenceId = Preconditions.checkNotNull(persistenceId);
         this.clientId = Preconditions.checkNotNull(clientId);
         this.tree = Preconditions.checkNotNull(tree);
-        standaloneHistory = new StandaloneFrontendHistory(persistenceId, tree.ticker(), clientId, tree);
+        this.purgedHistories = Preconditions.checkNotNull(purgedHistories);
+        this.standaloneHistory = Preconditions.checkNotNull(standaloneHistory);
+        this.localHistories = Preconditions.checkNotNull(localHistories);
     }
 
     @Override
@@ -133,7 +141,7 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
             lastSeenHistory = id.getHistoryId();
         }
 
-        localHistories.put(id, new LocalFrontendHistory(persistenceId, tree, tree.ensureTransactionChain(id)));
+        localHistories.put(id, LocalFrontendHistory.create(persistenceId, tree, id));
         LOG.debug("{}: created history {}", persistenceId, id);
         return new LocalHistorySuccess(id, request.getSequence());
     }
index 8e32c76..94c0965 100644 (file)
@@ -8,15 +8,16 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.access.commands.DeadTransactionException;
-import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.TreeRangeSet;
+import com.google.common.primitives.UnsignedLong;
+import java.util.HashMap;
+import java.util.Map;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
@@ -25,20 +26,28 @@ import org.slf4j.LoggerFactory;
  * @author Robert Varga
  */
 final class LocalFrontendHistory extends AbstractFrontendHistory {
-    private static final Logger LOG = LoggerFactory.getLogger(LocalFrontendHistory.class);
-
     private final ShardDataTreeTransactionChain chain;
-    private final ShardDataTree tree;
-
-    private Long lastSeenTransaction;
 
-    LocalFrontendHistory(final String persistenceId, final ShardDataTree tree,
-            final ShardDataTreeTransactionChain chain) {
-        super(persistenceId, tree.ticker());
-        this.tree = Preconditions.checkNotNull(tree);
+    private LocalFrontendHistory(final String persistenceId, final ShardDataTree tree,
+            final ShardDataTreeTransactionChain chain, final Map<UnsignedLong, Boolean> closedTransactions,
+            final RangeSet<UnsignedLong> purgedTransactions) {
+        super(persistenceId, tree, closedTransactions, purgedTransactions);
         this.chain = Preconditions.checkNotNull(chain);
     }
 
+    static LocalFrontendHistory create(final String persistenceId, final ShardDataTree tree,
+            final LocalHistoryIdentifier historyId) {
+        return new LocalFrontendHistory(persistenceId, tree, tree.ensureTransactionChain(historyId), ImmutableMap.of(),
+            TreeRangeSet.create());
+    }
+
+    static LocalFrontendHistory recreate(final String persistenceId, final ShardDataTree tree,
+            final ShardDataTreeTransactionChain chain, final Map<UnsignedLong, Boolean> closedTransactions,
+            final RangeSet<UnsignedLong> purgedTransactions) {
+        return new LocalFrontendHistory(persistenceId, tree, chain, new HashMap<>(closedTransactions),
+            TreeRangeSet.create(purgedTransactions));
+    }
+
     @Override
     public LocalHistoryIdentifier getIdentifier() {
         return chain.getIdentifier();
@@ -46,23 +55,17 @@ final class LocalFrontendHistory extends AbstractFrontendHistory {
 
     @Override
     FrontendTransaction createOpenSnapshot(final TransactionIdentifier id) throws RequestException {
-        checkDeadTransaction(id);
-        lastSeenTransaction = id.getTransactionId();
         return FrontendReadOnlyTransaction.create(this, chain.newReadOnlyTransaction(id));
     }
 
     @Override
     FrontendTransaction createOpenTransaction(final TransactionIdentifier id) throws RequestException {
-        checkDeadTransaction(id);
-        lastSeenTransaction = id.getTransactionId();
         return FrontendReadWriteTransaction.createOpen(this, chain.newReadWriteTransaction(id));
     }
 
     @Override
     FrontendTransaction createReadyTransaction(final TransactionIdentifier id, final DataTreeModification mod)
             throws RequestException {
-        checkDeadTransaction(id);
-        lastSeenTransaction = id.getTransactionId();
         return FrontendReadWriteTransaction.createReady(this, id, mod);
     }
 
@@ -70,29 +73,4 @@ final class LocalFrontendHistory extends AbstractFrontendHistory {
     ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod) {
         return chain.createReadyCohort(id, mod);
     }
-
-    void destroy(final long sequence, final RequestEnvelope envelope, final long now)
-            throws RequestException {
-        LOG.debug("{}: closing history {}", persistenceId(), getIdentifier());
-        tree.closeTransactionChain(getIdentifier(), () -> {
-            envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now);
-        });
-    }
-
-    void purge(final long sequence, final RequestEnvelope envelope, final long now) {
-        LOG.debug("{}: purging history {}", persistenceId(), getIdentifier());
-        tree.purgeTransactionChain(getIdentifier(), () -> {
-            envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now);
-        });
-    }
-
-    private void checkDeadTransaction(final TransactionIdentifier id) throws RequestException {
-        // FIXME: check if this history is still open
-        // FIXME: check if the last transaction has been submitted
-
-        // Transaction identifiers within a local history have to have increasing IDs
-        if (lastSeenTransaction != null && Long.compareUnsigned(lastSeenTransaction, id.getTransactionId()) >= 0) {
-            throw new DeadTransactionException(lastSeenTransaction);
-        }
-    }
 }
index 040a652..4df1352 100644 (file)
@@ -11,12 +11,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 
 final class ReadOnlyShardDataTreeTransaction extends AbstractShardDataTreeTransaction<DataTreeSnapshot> {
-    ReadOnlyShardDataTreeTransaction(final TransactionIdentifier id, final DataTreeSnapshot snapshot) {
-        super(id, snapshot);
-    }
-
-    @Override
-    void abort() {
-        close();
+    ReadOnlyShardDataTreeTransaction(final ShardDataTreeTransactionParent parent, final TransactionIdentifier id,
+        final DataTreeSnapshot snapshot) {
+        super(parent, id, snapshot);
     }
 }
index 492f6ec..fe5dba8 100644 (file)
@@ -12,24 +12,14 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 
 public final class ReadWriteShardDataTreeTransaction extends AbstractShardDataTreeTransaction<DataTreeModification> {
-    private final ShardDataTreeTransactionParent parent;
 
-    ReadWriteShardDataTreeTransaction(final ShardDataTreeTransactionParent parent,
-            final TransactionIdentifier id, final DataTreeModification modification) {
-        super(id, modification);
-        this.parent = Preconditions.checkNotNull(parent);
-    }
-
-    @Override
-    void abort() {
-        Preconditions.checkState(close(), "Transaction is already closed");
-
-        parent.abortTransaction(this);
+    ReadWriteShardDataTreeTransaction(final ShardDataTreeTransactionParent parent, final TransactionIdentifier id,
+        final DataTreeModification modification) {
+        super(parent, id, modification);
     }
 
     ShardDataTreeCohort ready() {
         Preconditions.checkState(close(), "Transaction is already closed");
-
-        return parent.finishTransaction(this);
+        return getParent().finishTransaction(this);
     }
 }
index 7ca79fc..7915a62 100644 (file)
@@ -18,13 +18,14 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Ticker;
+import com.google.common.base.Verify;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Range;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
@@ -164,8 +165,8 @@ public class Shard extends RaftActor {
 
     private final ShardTransactionMessageRetrySupport messageRetrySupport;
 
-    private final FrontendMetadata frontendMetadata = new FrontendMetadata();
-    private final Map<FrontendIdentifier, LeaderFrontendState> knownFrontends = new HashMap<>();
+    private final FrontendMetadata frontendMetadata;
+    private Map<FrontendIdentifier, LeaderFrontendState> knownFrontends = ImmutableMap.of();
 
     protected Shard(final AbstractBuilder<?, ?> builder) {
         super(builder.getId().toString(), builder.getPeerAddresses(),
@@ -174,6 +175,7 @@ public class Shard extends RaftActor {
         this.name = builder.getId().toString();
         this.datastoreContext = builder.getDatastoreContext();
         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
+        this.frontendMetadata = new FrontendMetadata(name);
 
         setPersistence(datastoreContext.isPersistent());
 
@@ -733,7 +735,7 @@ public class Shard extends RaftActor {
                     persistenceId(), getId());
             }
 
-            store.closeAllTransactionChains();
+            store.purgeLeaderState();
         }
 
         if (hasLeader && !isIsolatedLeader()) {
@@ -745,8 +747,18 @@ public class Shard extends RaftActor {
     protected void onLeaderChanged(final String oldLeader, final String newLeader) {
         shardMBean.incrementLeadershipChangeCount();
 
-        boolean hasLeader = hasLeader();
-        if (hasLeader && !isLeader()) {
+        final boolean hasLeader = hasLeader();
+        if (!hasLeader) {
+            // No leader implies we are not the leader, lose frontend state if we have any. This also places
+            // an explicit guard so the map will not get modified accidentally.
+            if (!knownFrontends.isEmpty()) {
+                LOG.debug("{}: removing frontend state for {}", persistenceId(), knownFrontends.keySet());
+                knownFrontends = ImmutableMap.of();
+            }
+            return;
+        }
+
+        if (!isLeader()) {
             // Another leader was elected. If we were the previous leader and had pending transactions, convert
             // them to transaction messages and send to the new leader.
             ActorSelection leader = getLeader();
@@ -765,9 +777,13 @@ public class Shard extends RaftActor {
                 commitCoordinator.abortPendingTransactions("The transacton was aborted due to inflight leadership "
                         + "change and the leader address isn't available.", this);
             }
+        } else {
+            // We have become the leader, we need to reconstruct frontend state
+            knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this));
+            LOG.debug("{}: became leader with frontend state for {}", persistenceId(), knownFrontends.keySet());
         }
 
-        if (hasLeader && !isIsolatedLeader()) {
+        if (!isIsolatedLeader()) {
             messageRetrySupport.retryMessages();
         }
     }
index 613f9ad..e8469d4 100644 (file)
@@ -45,12 +45,14 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
 import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
+import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.AbstractIdentifiablePayload;
 import org.opendaylight.controller.cluster.datastore.persisted.CloseLocalHistoryPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.CreateLocalHistoryPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.PurgeLocalHistoryPayload;
+import org.opendaylight.controller.cluster.datastore.persisted.PurgeTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata;
 import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
@@ -323,6 +325,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                     ((CommitTransactionPayload) payload).getCandidate();
             applyRecoveryCandidate(e.getValue());
             allMetadataCommittedTransaction(e.getKey());
+        } else if (payload instanceof AbortTransactionPayload) {
+            allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier());
+        } else if (payload instanceof PurgeTransactionPayload) {
+            allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier());
         } else if (payload instanceof CreateLocalHistoryPayload) {
             allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier());
         } else if (payload instanceof CloseLocalHistoryPayload) {
@@ -384,6 +390,24 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                 Verify.verify(identifier instanceof TransactionIdentifier);
                 payloadReplicationComplete((TransactionIdentifier) identifier);
             }
+        } else if (payload instanceof AbortTransactionPayload) {
+            if (identifier != null) {
+                payloadReplicationComplete((AbortTransactionPayload) payload);
+            } else {
+                allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier());
+            }
+        } else if (payload instanceof PurgeTransactionPayload) {
+            if (identifier != null) {
+                payloadReplicationComplete((PurgeTransactionPayload) payload);
+            } else {
+                allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier());
+            }
+        } else if (payload instanceof CloseLocalHistoryPayload) {
+            if (identifier != null) {
+                payloadReplicationComplete((CloseLocalHistoryPayload) payload);
+            } else {
+                allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
+            }
         } else if (payload instanceof CloseLocalHistoryPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((CloseLocalHistoryPayload) payload);
@@ -440,12 +464,24 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         finishCommit(current.cohort);
     }
 
+    private void allMetadataAbortedTransaction(final TransactionIdentifier txId) {
+        for (ShardDataTreeMetadata<?> m : metadata) {
+            m.onTransactionAborted(txId);
+        }
+    }
+
     private void allMetadataCommittedTransaction(final TransactionIdentifier txId) {
         for (ShardDataTreeMetadata<?> m : metadata) {
             m.onTransactionCommitted(txId);
         }
     }
 
+    private void allMetadataPurgedTransaction(final TransactionIdentifier txId) {
+        for (ShardDataTreeMetadata<?> m : metadata) {
+            m.onTransactionPurged(txId);
+        }
+    }
+
     private void allMetadataCreatedLocalHistory(final LocalHistoryIdentifier historyId) {
         for (ShardDataTreeMetadata<?> m : metadata) {
             m.onHistoryCreated(historyId);
@@ -464,6 +500,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
+    /**
+     * Create a transaction chain for specified history. Unlike {@link #ensureTransactionChain(LocalHistoryIdentifier)},
+     * this method is used for re-establishing state when we are taking over
+     *
+     * @param historyId Local history identifier
+     * @param closed True if the chain should be created in closed state (i.e. pending purge)
+     * @return Transaction chain handle
+     */
+    ShardDataTreeTransactionChain recreateTransactionChain(final LocalHistoryIdentifier historyId,
+            final boolean closed) {
+        final ShardDataTreeTransactionChain ret = new ShardDataTreeTransactionChain(historyId, this);
+        final ShardDataTreeTransactionChain existing = transactionChains.putIfAbsent(historyId, ret);
+        Preconditions.checkState(existing == null, "Attempted to recreate chain %s, but %s already exists", historyId,
+                existing);
+        return ret;
+    }
+
     ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId) {
         ShardDataTreeTransactionChain chain = transactionChains.get(historyId);
         if (chain == null) {
@@ -477,7 +530,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) {
         if (txId.getHistoryId().getHistoryId() == 0) {
-            return new ReadOnlyShardDataTreeTransaction(txId, dataTree.takeSnapshot());
+            return new ReadOnlyShardDataTreeTransaction(this, txId, dataTree.takeSnapshot());
         }
 
         return ensureTransactionChain(txId.getHistoryId()).newReadOnlyTransaction(txId);
@@ -518,14 +571,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     /**
-     * Immediately close all transaction chains.
+     * Immediately purge all state relevant to leader. This includes all transaction chains and any scheduled
+     * replication callbacks.
      */
-    void closeAllTransactionChains() {
+    void purgeLeaderState() {
         for (ShardDataTreeTransactionChain chain : transactionChains.values()) {
             chain.close();
         }
 
         transactionChains.clear();
+        replicationCallbacks.clear();
     }
 
     /**
@@ -597,8 +652,17 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     @Override
-    void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction) {
-        // Intentional no-op
+    void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction, final Runnable callback) {
+        final TransactionIdentifier id = transaction.getIdentifier();
+        LOG.debug("{}: aborting transaction {}", logContext, id);
+        replicatePayload(id, AbortTransactionPayload.create(id), callback);
+    }
+
+
+    @Override
+    void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
+        LOG.debug("{}: purging transaction {}", logContext, id);
+        replicatePayload(id, PurgeTransactionPayload.create(id), callback);
     }
 
     @Override
index 47d07c0..7db3a22 100644 (file)
@@ -15,23 +15,52 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata;
 
 abstract class ShardDataTreeMetadata<T extends ShardDataTreeSnapshotMetadata<T>> {
+    /**
+     * Apply a recovered metadata snapshot.
+     *
+     * @param snapshot Metadata snapshot
+     */
     final void applySnapshot(@Nonnull final ShardDataTreeSnapshotMetadata<?> snapshot) {
         Verify.verify(getSupportedType().isInstance(snapshot), "Snapshot %s misrouted to handler of %s", snapshot,
             getSupportedType());
         doApplySnapshot(getSupportedType().cast(snapshot));
     }
 
+    /**
+     * Reset metadata to empty state.
+     */
     abstract void reset();
 
+    /**
+     * Apply a recovered metadata snapshot. This is not a public entrypoint, just an interface between the base class
+     * and its subclasses.
+     *
+     * @param snapshot Metadata snapshot
+     */
     abstract void doApplySnapshot(@Nonnull T snapshot);
 
+    /**
+     * Return the type of metadata snapshot this object supports.
+     *
+     * @return Metadata type
+     */
     abstract @Nonnull Class<T> getSupportedType();
 
+    /**
+     * Take a snapshot of current metadata state.
+     *
+     * @return Metadata snapshot, or null if the metadata is empty.
+     */
     abstract @Nullable T toSnapshot();
 
     // Lifecycle events
+
+    abstract void onTransactionAborted(TransactionIdentifier txId);
+
     abstract void onTransactionCommitted(TransactionIdentifier txId);
 
+    abstract void onTransactionPurged(TransactionIdentifier txId);
+
     abstract void onHistoryCreated(LocalHistoryIdentifier historyId);
 
     abstract void onHistoryClosed(LocalHistoryIdentifier historyId);
index 2554151..dfd6680 100644 (file)
@@ -55,7 +55,7 @@ final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent
         final DataTreeSnapshot snapshot = getSnapshot();
         LOG.debug("Allocated read-only transaction {} snapshot {}", txId, snapshot);
 
-        return new ReadOnlyShardDataTreeTransaction(txId, snapshot);
+        return new ReadOnlyShardDataTreeTransaction(this, txId, snapshot);
     }
 
     ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
@@ -72,17 +72,24 @@ final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent
     }
 
     @Override
-    protected void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction) {
+    void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction, final Runnable callback) {
         if (transaction instanceof ReadWriteShardDataTreeTransaction) {
             Preconditions.checkState(openTransaction != null,
                     "Attempted to abort transaction %s while none is outstanding", transaction);
-            LOG.debug("Aborted transaction {}", transaction);
+            LOG.debug("Aborted open transaction {}", transaction);
             openTransaction = null;
         }
+
+        dataTree.abortTransaction(transaction, callback);
+    }
+
+    @Override
+    void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
+        dataTree.purgeTransaction(id, callback);
     }
 
     @Override
-    protected ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
+    ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
         Preconditions.checkState(openTransaction != null,
                 "Attempted to finish transaction %s while none is outstanding", transaction);
 
index 20f5015..2b0b74d 100644 (file)
@@ -12,9 +12,12 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification
 
 abstract class ShardDataTreeTransactionParent {
 
-    abstract void abortTransaction(AbstractShardDataTreeTransaction<?> transaction);
+    abstract void abortTransaction(AbstractShardDataTreeTransaction<?> transaction, Runnable callback);
+
+    abstract void purgeTransaction(TransactionIdentifier id, Runnable callback);
 
     abstract ShardDataTreeCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction);
 
     abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier id, DataTreeModification mod);
+
 }
index e3fbb82..b4fee8e 100644 (file)
@@ -37,15 +37,16 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
     private final ShardStats shardStats;
     private final TransactionIdentifier transactionId;
 
-    protected ShardTransaction(ActorRef shardActor, ShardStats shardStats, TransactionIdentifier transactionId) {
+    protected ShardTransaction(final ActorRef shardActor, final ShardStats shardStats,
+            final TransactionIdentifier transactionId) {
         super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
         this.shardActor = shardActor;
         this.shardStats = shardStats;
         this.transactionId = Preconditions.checkNotNull(transactionId);
     }
 
-    public static Props props(TransactionType type, AbstractShardDataTreeTransaction<?> transaction,
-            ActorRef shardActor, DatastoreContext datastoreContext, ShardStats shardStats) {
+    public static Props props(final TransactionType type, final AbstractShardDataTreeTransaction<?> transaction,
+            final ActorRef shardActor, final DatastoreContext datastoreContext, final ShardStats shardStats) {
         return Props.create(new ShardTransactionCreator(type, transaction, shardActor, datastoreContext, shardStats));
     }
 
@@ -60,7 +61,7 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
     }
 
     @Override
-    public void handleReceive(Object message) {
+    public void handleReceive(final Object message) {
         if (CloseTransaction.isSerializedType(message)) {
             closeTransaction(true);
         } else if (message instanceof ReceiveTimeout) {
@@ -75,8 +76,8 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         return true;
     }
 
-    private void closeTransaction(boolean sendReply) {
-        getDOMStoreTransaction().abort();
+    private void closeTransaction(final boolean sendReply) {
+        getDOMStoreTransaction().abort(null);
 
         if (sendReply && returnCloseTransactionReply()) {
             getSender().tell(new CloseTransactionReply(), getSelf());
@@ -85,7 +86,7 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         getSelf().tell(PoisonPill.getInstance(), getSelf());
     }
 
-    private boolean checkClosed(AbstractShardDataTreeTransaction<?> transaction) {
+    private boolean checkClosed(final AbstractShardDataTreeTransaction<?> transaction) {
         final boolean ret = transaction.isClosed();
         if (ret) {
             shardStats.incrementFailedReadTransactionsCount();
@@ -95,7 +96,7 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         return ret;
     }
 
-    protected void readData(AbstractShardDataTreeTransaction<?> transaction, ReadData message) {
+    protected void readData(final AbstractShardDataTreeTransaction<?> transaction, final ReadData message) {
         if (checkClosed(transaction)) {
             return;
         }
@@ -106,7 +107,7 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         sender().tell(readDataReply.toSerializable(), self());
     }
 
-    protected void dataExists(AbstractShardDataTreeTransaction<?> transaction, DataExists message) {
+    protected void dataExists(final AbstractShardDataTreeTransaction<?> transaction, final DataExists message) {
         if (checkClosed(transaction)) {
             return;
         }
@@ -128,8 +129,8 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         final ShardStats shardStats;
         final TransactionType type;
 
-        ShardTransactionCreator(TransactionType type, AbstractShardDataTreeTransaction<?> transaction,
-                ActorRef shardActor, DatastoreContext datastoreContext, ShardStats shardStats) {
+        ShardTransactionCreator(final TransactionType type, final AbstractShardDataTreeTransaction<?> transaction,
+                final ActorRef shardActor, final DatastoreContext datastoreContext, final ShardStats shardStats) {
             this.transaction = Preconditions.checkNotNull(transaction);
             this.shardActor = shardActor;
             this.shardStats = shardStats;
index fe2588d..f08ff4a 100644 (file)
@@ -8,7 +8,12 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Ticker;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.TreeRangeSet;
+import com.google.common.primitives.UnsignedLong;
+import java.util.HashMap;
+import java.util.Map;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
@@ -25,13 +30,27 @@ final class StandaloneFrontendHistory extends AbstractFrontendHistory {
     private final LocalHistoryIdentifier identifier;
     private final ShardDataTree tree;
 
-    StandaloneFrontendHistory(final String persistenceId, final Ticker ticker, final ClientIdentifier clientId,
-            final ShardDataTree tree) {
-        super(persistenceId, ticker);
+    private StandaloneFrontendHistory(final String persistenceId, final ClientIdentifier clientId,
+            final ShardDataTree tree, final Map<UnsignedLong, Boolean> closedTransactions,
+            final RangeSet<UnsignedLong> purgedTransactions) {
+        super(persistenceId, tree, closedTransactions, purgedTransactions);
         this.identifier = new LocalHistoryIdentifier(clientId, 0);
         this.tree = Preconditions.checkNotNull(tree);
     }
 
+    static StandaloneFrontendHistory create(final String persistenceId, final ClientIdentifier clientId,
+            final ShardDataTree tree) {
+        return new StandaloneFrontendHistory(persistenceId, clientId, tree, ImmutableMap.of(),
+            TreeRangeSet.create());
+    }
+
+    static StandaloneFrontendHistory recreate(final String persistenceId, final ClientIdentifier clientId,
+            final ShardDataTree tree, final Map<UnsignedLong, Boolean> closedTransactions,
+            final RangeSet<UnsignedLong> purgedTransactions) {
+        return new StandaloneFrontendHistory(persistenceId, clientId, tree, new HashMap<>(closedTransactions),
+            purgedTransactions);
+    }
+
     @Override
     public LocalHistoryIdentifier getIdentifier() {
         return identifier;
index b553bf9..751f5b2 100644 (file)
@@ -7,11 +7,14 @@
  */
 package org.opendaylight.controller.cluster.datastore.persisted;
 
+import com.google.common.base.Throwables;
 import com.google.common.io.ByteArrayDataOutput;
 import com.google.common.io.ByteStreams;
 import java.io.DataInput;
 import java.io.IOException;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Payload persisted when a transaction is aborted. It contains the transaction identifier.
@@ -45,15 +48,22 @@ public final class AbortTransactionPayload extends AbstractIdentifiablePayload<T
         }
     }
 
+    private static final Logger LOG = LoggerFactory.getLogger(AbortTransactionPayload.class);
     private static final long serialVersionUID = 1L;
 
     AbortTransactionPayload(final TransactionIdentifier transactionId, final byte[] serialized) {
         super(transactionId, serialized);
     }
 
-    public static AbortTransactionPayload create(final TransactionIdentifier transactionId) throws IOException {
+    public static AbortTransactionPayload create(final TransactionIdentifier transactionId) {
         final ByteArrayDataOutput out = ByteStreams.newDataOutput();
-        transactionId.writeTo(out);
+        try {
+            transactionId.writeTo(out);
+        } catch (IOException e) {
+            // This should never happen
+            LOG.error("Failed to serialize {}", transactionId, e);
+            throw Throwables.propagate(e);
+        }
         return new AbortTransactionPayload(transactionId, out.toByteArray());
     }
 
index 2a52ecc..5429fd1 100644 (file)
@@ -8,24 +8,37 @@
 package org.opendaylight.controller.cluster.datastore.persisted;
 
 import com.google.common.base.MoreObjects;
+import com.google.common.base.Verify;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableRangeSet;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.TreeRangeSet;
+import com.google.common.primitives.UnsignedLong;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 import org.opendaylight.yangtools.concepts.WritableObject;
 import org.opendaylight.yangtools.concepts.WritableObjects;
 
 public final class FrontendHistoryMetadata implements WritableObject {
+    private final RangeSet<UnsignedLong> purgedTransactions;
+    private final Map<UnsignedLong, Boolean> closedTransactions;
     private final long historyId;
     private final long cookie;
-    private final long nextTransaction;
     private final boolean closed;
 
-    public FrontendHistoryMetadata(final long historyId, final long cookie, final long nextTransaction,
-            final boolean closed) {
+    public FrontendHistoryMetadata(final long historyId, final long cookie, final boolean closed,
+            final Map<UnsignedLong, Boolean> closedTransactions, final RangeSet<UnsignedLong> purgedTransactions) {
         this.historyId = historyId;
         this.cookie = cookie;
-        this.nextTransaction = nextTransaction;
         this.closed = closed;
+        this.closedTransactions = ImmutableMap.copyOf(closedTransactions);
+        this.purgedTransactions = ImmutableRangeSet.copyOf(purgedTransactions);
     }
 
     public long getHistoryId() {
@@ -36,34 +49,70 @@ public final class FrontendHistoryMetadata implements WritableObject {
         return cookie;
     }
 
-    public long getNextTransaction() {
-        return nextTransaction;
-    }
-
     public boolean isClosed() {
         return closed;
     }
 
+    public Map<UnsignedLong, Boolean> getClosedTransactions() {
+        return closedTransactions;
+    }
+
+    public RangeSet<UnsignedLong> getPurgedTransactions() {
+        return purgedTransactions;
+    }
+
     @Override
     public void writeTo(final DataOutput out) throws IOException {
         WritableObjects.writeLongs(out, historyId, cookie);
-        WritableObjects.writeLong(out, nextTransaction);
         out.writeBoolean(closed);
+
+        final Set<Range<UnsignedLong>> purgedRanges = purgedTransactions.asRanges();
+        WritableObjects.writeLongs(out, closedTransactions.size(), purgedRanges.size());
+        for (Entry<UnsignedLong, Boolean> e : closedTransactions.entrySet()) {
+            WritableObjects.writeLong(out, e.getKey().longValue());
+            out.writeBoolean(e.getValue().booleanValue());
+        }
+        for (Range<UnsignedLong> r : purgedRanges) {
+            WritableObjects.writeLongs(out, r.lowerEndpoint().longValue(), r.upperEndpoint().longValue());
+        }
     }
 
     public static FrontendHistoryMetadata readFrom(final DataInput in) throws IOException {
-        final byte header = WritableObjects.readLongHeader(in);
+        byte header = WritableObjects.readLongHeader(in);
         final long historyId = WritableObjects.readFirstLong(in, header);
         final long cookie = WritableObjects.readSecondLong(in, header);
-        final long nextTransaction = WritableObjects.readLong(in);
         final boolean closed = in.readBoolean();
 
-        return new FrontendHistoryMetadata(historyId, cookie, nextTransaction, closed);
+        header = WritableObjects.readLongHeader(in);
+        long ls = WritableObjects.readFirstLong(in, header);
+        Verify.verify(ls >= 0 && ls <= Integer.MAX_VALUE);
+        final int csize = (int) ls;
+
+        ls = WritableObjects.readSecondLong(in, header);
+        Verify.verify(ls >= 0 && ls <= Integer.MAX_VALUE);
+        final int psize = (int) ls;
+
+        final Map<UnsignedLong, Boolean> closedTransactions = new HashMap<>(csize);
+        for (int i = 0; i < csize; ++i) {
+            final UnsignedLong key = UnsignedLong.fromLongBits(WritableObjects.readLong(in));
+            final Boolean value = Boolean.valueOf(in.readBoolean());
+            closedTransactions.put(key, value);
+        }
+        final RangeSet<UnsignedLong> purgedTransactions = TreeRangeSet.create();
+        for (int i = 0; i < psize; ++i) {
+            final byte h = WritableObjects.readLongHeader(in);
+            final UnsignedLong l = UnsignedLong.fromLongBits(WritableObjects.readFirstLong(in, h));
+            final UnsignedLong u = UnsignedLong.fromLongBits(WritableObjects.readSecondLong(in, h));
+            purgedTransactions.add(Range.closed(l, u));
+        }
+
+        return new FrontendHistoryMetadata(historyId, cookie, closed, closedTransactions, purgedTransactions);
     }
 
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(FrontendHistoryMetadata.class).add("historiId", historyId)
-                .add("cookie", cookie).add("nextTransaction", nextTransaction).add("closed", closed).toString();
+                .add("cookie", cookie).add("closed", closed).add("closedTransactions", closedTransactions)
+                .add("purgedTransactions", purgedTransactions).toString();
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PurgeTransactionPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PurgeTransactionPayload.java
new file mode 100644 (file)
index 0000000..71e794b
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import com.google.common.base.Throwables;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import java.io.DataInput;
+import java.io.IOException;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Payload persisted when a transaction is purged from the frontend. It contains the transaction identifier.
+ *
+ * @author Robert Varga
+ */
+public final class PurgeTransactionPayload extends AbstractIdentifiablePayload<TransactionIdentifier> {
+    private static final class Proxy extends AbstractProxy<TransactionIdentifier> {
+        private static final long serialVersionUID = 1L;
+
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy() {
+            // For Externalizable
+        }
+
+        Proxy(final byte[] serialized) {
+            super(serialized);
+        }
+
+        @Override
+        protected TransactionIdentifier readIdentifier(final DataInput in) throws IOException {
+            return TransactionIdentifier.readFrom(in);
+        }
+
+        @Override
+        protected PurgeTransactionPayload createObject(final TransactionIdentifier identifier,
+                final byte[] serialized) {
+            return new PurgeTransactionPayload(identifier, serialized);
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(PurgeTransactionPayload.class);
+    private static final long serialVersionUID = 1L;
+
+    PurgeTransactionPayload(final TransactionIdentifier transactionId, final byte[] serialized) {
+        super(transactionId, serialized);
+    }
+
+    public static PurgeTransactionPayload create(final TransactionIdentifier transactionId) {
+        final ByteArrayDataOutput out = ByteStreams.newDataOutput();
+        try {
+            transactionId.writeTo(out);
+        } catch (IOException e) {
+            // This should never happen
+            LOG.error("Failed to serialize {}", transactionId, e);
+            throw Throwables.propagate(e);
+        }
+        return new PurgeTransactionPayload(transactionId, out.toByteArray());
+    }
+
+    @Override
+    protected Proxy externalizableProxy(final byte[] serialized) {
+        return new Proxy(serialized);
+    }
+}
index 6ccede2..00087e3 100644 (file)
@@ -70,7 +70,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
                 new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
-        subject.underlyingActor().getDOMStoreTransaction().abort();
+        subject.underlyingActor().getDOMStoreTransaction().abort(null);
 
         future = akka.pattern.Patterns.ask(subject, new ReadData(YangInstanceIdentifier.EMPTY,
                 DataStoreVersions.CURRENT_VERSION), 3000);
@@ -92,7 +92,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
                 new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
-        subject.underlyingActor().getDOMStoreTransaction().abort();
+        subject.underlyingActor().getDOMStoreTransaction().abort(null);
 
         future = akka.pattern.Patterns.ask(subject, new ReadData(YangInstanceIdentifier.EMPTY,
                 DataStoreVersions.CURRENT_VERSION), 3000);
@@ -113,7 +113,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
                 new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
-        subject.underlyingActor().getDOMStoreTransaction().abort();
+        subject.underlyingActor().getDOMStoreTransaction().abort(null);
 
         future = akka.pattern.Patterns.ask(subject,
                 new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);
index aa2a9db..43b2c9e 100644 (file)
@@ -9,14 +9,13 @@ package org.opendaylight.controller.cluster.datastore.persisted;
 
 import static org.junit.Assert.assertEquals;
 
-import java.io.IOException;
 import org.apache.commons.lang3.SerializationUtils;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.AbstractTest;
 
 public class AbortTransactionPayloadTest extends AbstractTest {
     @Test
-    public void testPayloadSerDes() throws IOException {
+    public void testPayloadSerDes() {
         final AbortTransactionPayload template = AbortTransactionPayload.create(nextTransactionId());
         final AbortTransactionPayload cloned = SerializationUtils.clone(template);
         assertEquals(template.getIdentifier(), cloned.getIdentifier());
index e175a09..f873485 100644 (file)
@@ -12,6 +12,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Range;
 import com.google.common.collect.RangeSet;
 import com.google.common.collect.TreeRangeSet;
@@ -90,7 +91,7 @@ public class FrontendShardDataTreeSnapshotMetadataTest {
     }
 
     private static FrontendShardDataTreeSnapshotMetadata createMetadataSnapshot(final int size) {
-        final List<FrontendClientMetadata> clients = new ArrayList<>();
+        final List<FrontendClientMetadata> clients = new ArrayList<>(size);
         for (long i = 0; i < size; i++) {
             clients.add(createFrontedClientMetadata(i));
         }
@@ -107,8 +108,9 @@ public class FrontendShardDataTreeSnapshotMetadataTest {
         final RangeSet<UnsignedLong> purgedHistories = TreeRangeSet.create();
         purgedHistories.add(Range.closed(UnsignedLong.ZERO, UnsignedLong.ONE));
 
-        final Collection<FrontendHistoryMetadata> currentHistories = Collections
-                .singleton(new FrontendHistoryMetadata(num, num, num, true));
+        final Collection<FrontendHistoryMetadata> currentHistories = Collections.singleton(
+            new FrontendHistoryMetadata(num, num, true, ImmutableMap.of(UnsignedLong.ZERO, Boolean.TRUE),
+                purgedHistories));
 
         return new FrontendClientMetadata(clientIdentifier, purgedHistories, currentHistories);
     }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.