BUG-5280: add {Create,Close,Purge}LocalHistoryPayload 26/39426/73
authorRobert Varga <rovarga@cisco.com>
Wed, 25 May 2016 15:44:53 +0000 (17:44 +0200)
committerTom Pantelis <tpanteli@brocade.com>
Mon, 27 Feb 2017 12:45:33 +0000 (12:45 +0000)
This patch introduces three new payloads which deal with replicating
local histories to followers. These are persisted whenever a transaction
chain (e.g. local history) is created or closed cleanly on the shard
leader. Followers can use these to track transaction chains and pick up
processing in case of a leader failover.

Change-Id: I3fe5ac153c88f23f9b871bd23cb04a8e2410af91
Signed-off-by: Robert Varga <rovarga@cisco.com>
12 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendClientMetadataBuilder.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendMetadata.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMetadata.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CloseLocalHistoryPayload.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CreateLocalHistoryPayload.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PurgeLocalHistoryPayload.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index 15e4304a4693942532c54abfea818f8c4af90e56..cd1235b46072ce68a269fb393375795c9bed0d0e 100644 (file)
@@ -54,6 +54,11 @@ final class FrontendClientMetadataBuilder implements Builder<FrontendClientMetad
         return identifier;
     }
 
         return identifier;
     }
 
+    void onHistoryCreated(final LocalHistoryIdentifier historyId) {
+        // TODO Auto-generated method stub
+
+    }
+
     void onHistoryClosed(final LocalHistoryIdentifier historyId) {
         ensureHistory(historyId).onHistoryClosed();
     }
     void onHistoryClosed(final LocalHistoryIdentifier historyId) {
         ensureHistory(historyId).onHistoryClosed();
     }
index 6a04674b6ca2919c32b63a28e1b699a037d73cc1..1788f994c872503e73ebe64e64eb33175e2493bd 100644 (file)
@@ -74,6 +74,11 @@ final class FrontendMetadata extends ShardDataTreeMetadata<FrontendShardDataTree
         return client;
     }
 
         return client;
     }
 
+    @Override
+    void onHistoryCreated(final LocalHistoryIdentifier historyId) {
+        ensureClient(historyId.getClientId()).onHistoryCreated(historyId);
+    }
+
     @Override
     void onHistoryClosed(final LocalHistoryIdentifier historyId) {
         ensureClient(historyId.getClientId()).onHistoryClosed(historyId);
     @Override
     void onHistoryClosed(final LocalHistoryIdentifier historyId) {
         ensureClient(historyId.getClientId()).onHistoryClosed(historyId);
index 297759b5c86ffd798b0612ce32e96fe6f7e6af25..deca605a5b97ec18d041d050d5fe0c467feb436e 100644 (file)
@@ -101,9 +101,9 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
             if (request instanceof CreateLocalHistoryRequest) {
                 return handleCreateHistory((CreateLocalHistoryRequest) request);
             } else if (request instanceof DestroyLocalHistoryRequest) {
             if (request instanceof CreateLocalHistoryRequest) {
                 return handleCreateHistory((CreateLocalHistoryRequest) request);
             } else if (request instanceof DestroyLocalHistoryRequest) {
-                return handleDestroyHistory((DestroyLocalHistoryRequest) request, now);
+                return handleDestroyHistory((DestroyLocalHistoryRequest) request, envelope, now);
             } else if (request instanceof PurgeLocalHistoryRequest) {
             } else if (request instanceof PurgeLocalHistoryRequest) {
-                return handlePurgeHistory((PurgeLocalHistoryRequest)request, now);
+                return handlePurgeHistory((PurgeLocalHistoryRequest)request, envelope, now);
             } else {
                 throw new UnsupportedRequestException(request);
             }
             } else {
                 throw new UnsupportedRequestException(request);
             }
@@ -133,12 +133,13 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
             lastSeenHistory = id.getHistoryId();
         }
 
             lastSeenHistory = id.getHistoryId();
         }
 
-        localHistories.put(id, new LocalFrontendHistory(persistenceId, tree.ticker(), tree.ensureTransactionChain(id)));
+        localHistories.put(id, new LocalFrontendHistory(persistenceId, tree, tree.ensureTransactionChain(id)));
         LOG.debug("{}: created history {}", persistenceId, id);
         return new LocalHistorySuccess(id, request.getSequence());
     }
 
         LOG.debug("{}: created history {}", persistenceId, id);
         return new LocalHistorySuccess(id, request.getSequence());
     }
 
-    private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request, final long now)
+    private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request,
+            final RequestEnvelope envelope, final long now)
             throws RequestException {
         final LocalHistoryIdentifier id = request.getTarget();
         final LocalFrontendHistory existing = localHistories.get(id);
             throws RequestException {
         final LocalHistoryIdentifier id = request.getTarget();
         final LocalFrontendHistory existing = localHistories.get(id);
@@ -148,29 +149,23 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
             return new LocalHistorySuccess(id, request.getSequence());
         }
 
             return new LocalHistorySuccess(id, request.getSequence());
         }
 
-        return existing.destroy(request.getSequence(), now);
+        existing.destroy(request.getSequence(), envelope, now);
+        return null;
     }
 
     }
 
-    private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request, final long now)
-            throws RequestException {
+    private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request,
+            final RequestEnvelope envelope, final long now) throws RequestException {
         final LocalHistoryIdentifier id = request.getTarget();
         final LocalFrontendHistory existing = localHistories.remove(id);
         final LocalHistoryIdentifier id = request.getTarget();
         final LocalFrontendHistory existing = localHistories.remove(id);
-        if (existing != null) {
-            purgedHistories.add(Range.singleton(UnsignedLong.fromLongBits(id.getHistoryId())));
-
-            if (!existing.isDestroyed()) {
-                LOG.warn("{}: purging undestroyed history {}", persistenceId, id);
-                existing.destroy(request.getSequence(), now);
-            }
-
-            // FIXME: record a PURGE tombstone in the journal
-
-            LOG.debug("{}: purged history {}", persistenceId, id);
-        } else {
+        if (existing == null) {
             LOG.debug("{}: history {} has already been purged", persistenceId, id);
             LOG.debug("{}: history {} has already been purged", persistenceId, id);
+            return new LocalHistorySuccess(id, request.getSequence());
         }
 
         }
 
-        return new LocalHistorySuccess(id, request.getSequence());
+        LOG.debug("{}: purging history {}", persistenceId, id);
+        purgedHistories.add(Range.singleton(UnsignedLong.fromLongBits(id.getHistoryId())));
+        existing.purge(request.getSequence(), envelope, now);
+        return null;
     }
 
     @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
     }
 
     @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
index cd0cc30a09738e49915939cd967a0dc04df472c4..8e32c76ba794679dc3cdd84004946f1b57087ade 100644 (file)
@@ -8,10 +8,10 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Preconditions;
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Ticker;
 import org.opendaylight.controller.cluster.access.commands.DeadTransactionException;
 import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.commands.DeadTransactionException;
 import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
@@ -25,20 +25,17 @@ import org.slf4j.LoggerFactory;
  * @author Robert Varga
  */
 final class LocalFrontendHistory extends AbstractFrontendHistory {
  * @author Robert Varga
  */
 final class LocalFrontendHistory extends AbstractFrontendHistory {
-    private enum State {
-        OPEN,
-        CLOSED,
-    }
-
     private static final Logger LOG = LoggerFactory.getLogger(LocalFrontendHistory.class);
 
     private final ShardDataTreeTransactionChain chain;
     private static final Logger LOG = LoggerFactory.getLogger(LocalFrontendHistory.class);
 
     private final ShardDataTreeTransactionChain chain;
+    private final ShardDataTree tree;
 
     private Long lastSeenTransaction;
 
     private Long lastSeenTransaction;
-    private State state = State.OPEN;
 
 
-    LocalFrontendHistory(final String persistenceId, final Ticker ticker, final ShardDataTreeTransactionChain chain) {
-        super(persistenceId, ticker);
+    LocalFrontendHistory(final String persistenceId, final ShardDataTree tree,
+            final ShardDataTreeTransactionChain chain) {
+        super(persistenceId, tree.ticker());
+        this.tree = Preconditions.checkNotNull(tree);
         this.chain = Preconditions.checkNotNull(chain);
     }
 
         this.chain = Preconditions.checkNotNull(chain);
     }
 
@@ -74,20 +71,19 @@ final class LocalFrontendHistory extends AbstractFrontendHistory {
         return chain.createReadyCohort(id, mod);
     }
 
         return chain.createReadyCohort(id, mod);
     }
 
-    LocalHistorySuccess destroy(final long sequence, final long now) throws RequestException {
-        if (state != State.CLOSED) {
-            LOG.debug("{}: closing history {}", persistenceId(), getIdentifier());
-
-            // FIXME: add any finalization as needed
-            state = State.CLOSED;
-        }
-
-        // FIXME: record a DESTROY tombstone in the journal
-        return new LocalHistorySuccess(getIdentifier(), sequence);
+    void destroy(final long sequence, final RequestEnvelope envelope, final long now)
+            throws RequestException {
+        LOG.debug("{}: closing history {}", persistenceId(), getIdentifier());
+        tree.closeTransactionChain(getIdentifier(), () -> {
+            envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now);
+        });
     }
 
     }
 
-    boolean isDestroyed() {
-        return state == State.CLOSED;
+    void purge(final long sequence, final RequestEnvelope envelope, final long now) {
+        LOG.debug("{}: purging history {}", persistenceId(), getIdentifier());
+        tree.purgeTransactionChain(getIdentifier(), () -> {
+            envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now);
+        });
     }
 
     private void checkDeadTransaction(final TransactionIdentifier id) throws RequestException {
     }
 
     private void checkDeadTransaction(final TransactionIdentifier id) throws RequestException {
index 26295fe852ef6dea3578c3f6697d29d8879d2866..cb072b5c6a031a9ee5e5c3ba1753939cb66509ab 100644 (file)
@@ -37,6 +37,7 @@ import org.opendaylight.controller.cluster.access.commands.NotLeaderException;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
@@ -453,13 +454,13 @@ public class Shard extends RaftActor {
     }
 
     // applyState() will be invoked once consensus is reached on the payload
     }
 
     // applyState() will be invoked once consensus is reached on the payload
-    void persistPayload(final TransactionIdentifier transactionId, final Payload payload, boolean batchHint) {
+    void persistPayload(final Identifier id, final Payload payload, final boolean batchHint) {
         boolean canSkipPayload = !hasFollowers() && !persistence().isRecoveryApplicable();
         if (canSkipPayload) {
         boolean canSkipPayload = !hasFollowers() && !persistence().isRecoveryApplicable();
         if (canSkipPayload) {
-            applyState(self(), transactionId, payload);
+            applyState(self(), id, payload);
         } else {
             // We are faking the sender
         } else {
             // We are faking the sender
-            persistData(self(), transactionId, payload, batchHint);
+            persistData(self(), id, payload, batchHint);
         }
     }
 
         }
     }
 
@@ -614,7 +615,7 @@ public class Shard extends RaftActor {
         doAbortTransaction(abort.getTransactionId(), getSender());
     }
 
         doAbortTransaction(abort.getTransactionId(), getSender());
     }
 
-    void doAbortTransaction(final TransactionIdentifier transactionID, final ActorRef sender) {
+    void doAbortTransaction(final Identifier transactionID, final ActorRef sender) {
         commitCoordinator.handleAbort(transactionID, sender, this);
     }
 
         commitCoordinator.handleAbort(transactionID, sender, this);
     }
 
@@ -630,7 +631,8 @@ public class Shard extends RaftActor {
     }
 
     private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
     }
 
     private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
-        store.closeTransactionChain(closeTransactionChain.getIdentifier());
+        final LocalHistoryIdentifier id = closeTransactionChain.getIdentifier();
+        store.closeTransactionChain(id, () -> store.purgeTransactionChain(id, null));
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
index d398afefa7869cb70cc2eec74cf9153a918b162f..613f9adbc9355c4cabb8a776f31069bdfba6660c 100644 (file)
@@ -39,13 +39,18 @@ import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 import java.util.function.UnaryOperator;
 import javax.annotation.Nonnull;
 import java.util.function.Consumer;
 import java.util.function.UnaryOperator;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
 import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
 import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
+import org.opendaylight.controller.cluster.datastore.persisted.AbstractIdentifiablePayload;
+import org.opendaylight.controller.cluster.datastore.persisted.CloseLocalHistoryPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
+import org.opendaylight.controller.cluster.datastore.persisted.CreateLocalHistoryPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.PurgeLocalHistoryPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata;
 import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata;
 import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
@@ -102,10 +107,17 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
 
     private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
     private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
 
     private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
+
     private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
     private final Queue<CommitEntry> pendingTransactions = new ArrayDeque<>();
     private final Queue<CommitEntry> pendingCommits = new ArrayDeque<>();
     private final Queue<CommitEntry> pendingFinishCommits = new ArrayDeque<>();
     private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
     private final Queue<CommitEntry> pendingTransactions = new ArrayDeque<>();
     private final Queue<CommitEntry> pendingCommits = new ArrayDeque<>();
     private final Queue<CommitEntry> pendingFinishCommits = new ArrayDeque<>();
+
+    /**
+     * Callbacks that need to be invoked once a payload is replicated.
+     */
+    private final Map<Payload, Runnable> replicationCallbacks = new HashMap<>();
+
     private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher;
     private final ShardDataChangeListenerPublisher dataChangeListenerPublisher;
     private final Collection<ShardDataTreeMetadata<?>> metadata;
     private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher;
     private final ShardDataChangeListenerPublisher dataChangeListenerPublisher;
     private final Collection<ShardDataTreeMetadata<?>> metadata;
@@ -150,8 +162,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     @VisibleForTesting
     public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) {
         this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY,
     @VisibleForTesting
     public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) {
         this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY,
-                new DefaultShardDataTreeChangeListenerPublisher(),
-                new DefaultShardDataChangeListenerPublisher(), "");
+                new DefaultShardDataTreeChangeListenerPublisher(), new DefaultShardDataChangeListenerPublisher(), "");
     }
 
     final String logContext() {
     }
 
     final String logContext() {
@@ -312,6 +323,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                     ((CommitTransactionPayload) payload).getCandidate();
             applyRecoveryCandidate(e.getValue());
             allMetadataCommittedTransaction(e.getKey());
                     ((CommitTransactionPayload) payload).getCandidate();
             applyRecoveryCandidate(e.getValue());
             allMetadataCommittedTransaction(e.getKey());
+        } else if (payload instanceof CreateLocalHistoryPayload) {
+            allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier());
+        } else if (payload instanceof CloseLocalHistoryPayload) {
+            allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
+        } else if (payload instanceof PurgeLocalHistoryPayload) {
+            allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
         } else if (payload instanceof DataTreeCandidatePayload) {
             applyRecoveryCandidate(((DataTreeCandidatePayload) payload).getCandidate());
         } else {
         } else if (payload instanceof DataTreeCandidatePayload) {
             applyRecoveryCandidate(((DataTreeCandidatePayload) payload).getCandidate());
         } else {
@@ -367,11 +384,46 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                 Verify.verify(identifier instanceof TransactionIdentifier);
                 payloadReplicationComplete((TransactionIdentifier) identifier);
             }
                 Verify.verify(identifier instanceof TransactionIdentifier);
                 payloadReplicationComplete((TransactionIdentifier) identifier);
             }
+        } else if (payload instanceof CloseLocalHistoryPayload) {
+            if (identifier != null) {
+                payloadReplicationComplete((CloseLocalHistoryPayload) payload);
+            } else {
+                allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
+            }
+        } else if (payload instanceof CreateLocalHistoryPayload) {
+            if (identifier != null) {
+                payloadReplicationComplete((CreateLocalHistoryPayload)payload);
+            } else {
+                allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier());
+            }
+        } else if (payload instanceof PurgeLocalHistoryPayload) {
+            if (identifier != null) {
+                payloadReplicationComplete((PurgeLocalHistoryPayload)payload);
+            } else {
+                allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
+            }
         } else {
             LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload);
         }
     }
 
         } else {
             LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload);
         }
     }
 
+    private void replicatePayload(final Identifier id, final Payload payload, @Nullable final Runnable callback) {
+        if (callback != null) {
+            replicationCallbacks.put(payload, callback);
+        }
+        shard.persistPayload(id, payload, true);
+    }
+
+    private void payloadReplicationComplete(final AbstractIdentifiablePayload<?> payload) {
+        final Runnable callback = replicationCallbacks.remove(payload);
+        if (callback != null) {
+            LOG.debug("{}: replication of {} completed, invoking {}", logContext, payload.getIdentifier(), callback);
+            callback.run();
+        } else {
+            LOG.debug("{}: replication of {} has no callback", logContext, payload.getIdentifier());
+        }
+    }
+
     private void payloadReplicationComplete(final TransactionIdentifier txId) {
         final CommitEntry current = pendingFinishCommits.peek();
         if (current == null) {
     private void payloadReplicationComplete(final TransactionIdentifier txId) {
         final CommitEntry current = pendingFinishCommits.peek();
         if (current == null) {
@@ -394,11 +446,30 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
         }
     }
 
-    ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) {
-        ShardDataTreeTransactionChain chain = transactionChains.get(localHistoryIdentifier);
+    private void allMetadataCreatedLocalHistory(final LocalHistoryIdentifier historyId) {
+        for (ShardDataTreeMetadata<?> m : metadata) {
+            m.onHistoryCreated(historyId);
+        }
+    }
+
+    private void allMetadataClosedLocalHistory(final LocalHistoryIdentifier historyId) {
+        for (ShardDataTreeMetadata<?> m : metadata) {
+            m.onHistoryClosed(historyId);
+        }
+    }
+
+    private void allMetadataPurgedLocalHistory(final LocalHistoryIdentifier historyId) {
+        for (ShardDataTreeMetadata<?> m : metadata) {
+            m.onHistoryPurged(historyId);
+        }
+    }
+
+    ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId) {
+        ShardDataTreeTransactionChain chain = transactionChains.get(historyId);
         if (chain == null) {
         if (chain == null) {
-            chain = new ShardDataTreeTransactionChain(localHistoryIdentifier, this);
-            transactionChains.put(localHistoryIdentifier, chain);
+            chain = new ShardDataTreeTransactionChain(historyId, this);
+            transactionChains.put(historyId, chain);
+            shard.persistPayload(historyId, CreateLocalHistoryPayload.create(historyId), true);
         }
 
         return chain;
         }
 
         return chain;
@@ -446,6 +517,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
         }
     }
 
+    /**
+     * Immediately close all transaction chains.
+     */
     void closeAllTransactionChains() {
         for (ShardDataTreeTransactionChain chain : transactionChains.values()) {
             chain.close();
     void closeAllTransactionChains() {
         for (ShardDataTreeTransactionChain chain : transactionChains.values()) {
             chain.close();
@@ -454,13 +528,43 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         transactionChains.clear();
     }
 
         transactionChains.clear();
     }
 
-    void closeTransactionChain(final LocalHistoryIdentifier transactionChainId) {
-        final ShardDataTreeTransactionChain chain = transactionChains.remove(transactionChainId);
-        if (chain != null) {
-            chain.close();
-        } else {
-            LOG.debug("{}: Closing non-existent transaction chain {}", logContext, transactionChainId);
+    /**
+     * Close a single transaction chain.
+     *
+     * @param id History identifier
+     * @param callback Callback to invoke upon completion, may be null
+     */
+    void closeTransactionChain(final LocalHistoryIdentifier id, @Nullable final Runnable callback) {
+        final ShardDataTreeTransactionChain chain = transactionChains.get(id);
+        if (chain == null) {
+            LOG.debug("{}: Closing non-existent transaction chain {}", logContext, id);
+            if (callback != null) {
+                callback.run();
+            }
+            return;
         }
         }
+
+        chain.close();
+        replicatePayload(id, CloseLocalHistoryPayload.create(id), callback);
+    }
+
+    /**
+     * Purge a single transaction chain.
+     *
+     * @param id History identifier
+     * @param callback Callback to invoke upon completion, may be null
+     */
+    void purgeTransactionChain(final LocalHistoryIdentifier id, @Nullable final Runnable callback) {
+        final ShardDataTreeTransactionChain chain = transactionChains.remove(id);
+        if (chain == null) {
+            LOG.debug("{}: Purging non-existent transaction chain {}", logContext, id);
+            if (callback != null) {
+                callback.run();
+            }
+            return;
+        }
+
+        replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback);
     }
 
     Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
     }
 
     Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
@@ -570,7 +674,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                 tip.validate(modification);
                 LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier());
                 cohort.successfulCanCommit();
                 tip.validate(modification);
                 LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier());
                 cohort.successfulCanCommit();
-                entry.lastAccess = shard.ticker().read();
+                entry.lastAccess = ticker().read();
                 return;
             } catch (ConflictingModificationAppliedException e) {
                 LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.getIdentifier(),
                 return;
             } catch (ConflictingModificationAppliedException e) {
                 LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.getIdentifier(),
@@ -600,7 +704,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         processNextPendingTransaction();
     }
 
         processNextPendingTransaction();
     }
 
-    private void processNextPending(Queue<CommitEntry> queue, State allowedState, Consumer<CommitEntry> processor) {
+    private void processNextPending(final Queue<CommitEntry> queue, final State allowedState,
+            final Consumer<CommitEntry> processor) {
         while (!queue.isEmpty()) {
             final CommitEntry entry = queue.peek();
             final SimpleShardDataTreeCohort cohort = entry.cohort;
         while (!queue.isEmpty()) {
             final CommitEntry entry = queue.peek();
             final SimpleShardDataTreeCohort cohort = entry.cohort;
@@ -669,7 +774,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         // Set the tip of the data tree.
         tip = Verify.verifyNotNull(candidate);
 
         // Set the tip of the data tree.
         tip = Verify.verifyNotNull(candidate);
 
-        entry.lastAccess = shard.ticker().read();
+        entry.lastAccess = ticker().read();
 
         pendingTransactions.remove();
         pendingCommits.add(entry);
 
         pendingTransactions.remove();
         pendingCommits.add(entry);
@@ -785,14 +890,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             final DataTreeModification modification) {
         SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId,
                 cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT));
             final DataTreeModification modification) {
         SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId,
                 cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT));
-        pendingTransactions.add(new CommitEntry(cohort, shard.ticker().read()));
+        pendingTransactions.add(new CommitEntry(cohort, ticker().read()));
         return cohort;
     }
 
     @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")
     void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) {
         final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
         return cohort;
     }
 
     @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")
     void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) {
         final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
-        final long now = shard.ticker().read();
+        final long now = ticker().read();
 
         final Queue<CommitEntry> currentQueue = !pendingFinishCommits.isEmpty() ? pendingFinishCommits :
             !pendingCommits.isEmpty() ? pendingCommits : pendingTransactions;
 
         final Queue<CommitEntry> currentQueue = !pendingFinishCommits.isEmpty() ? pendingFinishCommits :
             !pendingCommits.isEmpty() ? pendingCommits : pendingTransactions;
@@ -904,7 +1009,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private void rebaseTransactions(Iterator<CommitEntry> iter, @Nonnull TipProducingDataTreeTip newTip) {
+    private void rebaseTransactions(final Iterator<CommitEntry> iter, @Nonnull final TipProducingDataTreeTip newTip) {
         tip = Preconditions.checkNotNull(newTip);
         while (iter.hasNext()) {
             final SimpleShardDataTreeCohort cohort = iter.next().cohort;
         tip = Preconditions.checkNotNull(newTip);
         while (iter.hasNext()) {
             final SimpleShardDataTreeCohort cohort = iter.next().cohort;
index c0c3d6cbb0a584b1523473d2baeb01a12f96e499..47d07c0892f4e73f5c03dd6a8bb617862aaedd8f 100644 (file)
@@ -32,7 +32,10 @@ abstract class ShardDataTreeMetadata<T extends ShardDataTreeSnapshotMetadata<T>>
     // Lifecycle events
     abstract void onTransactionCommitted(TransactionIdentifier txId);
 
     // Lifecycle events
     abstract void onTransactionCommitted(TransactionIdentifier txId);
 
+    abstract void onHistoryCreated(LocalHistoryIdentifier historyId);
+
     abstract void onHistoryClosed(LocalHistoryIdentifier historyId);
 
     abstract void onHistoryPurged(LocalHistoryIdentifier historyId);
     abstract void onHistoryClosed(LocalHistoryIdentifier historyId);
 
     abstract void onHistoryPurged(LocalHistoryIdentifier historyId);
+
 }
 }
index 312e11290c88e713827fe2b051d96b667eb633da..2554151dd73c6e4a0442169474530f21d0bcd586 100644 (file)
@@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory;
 @NotThreadSafe
 final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent
         implements Identifiable<LocalHistoryIdentifier> {
 @NotThreadSafe
 final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent
         implements Identifiable<LocalHistoryIdentifier> {
+
     private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeTransactionChain.class);
     private final LocalHistoryIdentifier chainId;
     private final ShardDataTree dataTree;
     private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeTransactionChain.class);
     private final LocalHistoryIdentifier chainId;
     private final ShardDataTree dataTree;
@@ -67,6 +68,7 @@ final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent
 
     void close() {
         closed = true;
 
     void close() {
         closed = true;
+        LOG.debug("Closing chain {}", chainId);
     }
 
     @Override
     }
 
     @Override
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CloseLocalHistoryPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CloseLocalHistoryPayload.java
new file mode 100644 (file)
index 0000000..5dc8e5f
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import com.google.common.base.Throwables;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import java.io.DataInput;
+import java.io.IOException;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Payload persisted when a local history is closed cleanly. It contains a {@link LocalHistoryIdentifier}.
+ *
+ * @author Robert Varga
+ */
+public final class CloseLocalHistoryPayload extends AbstractIdentifiablePayload<LocalHistoryIdentifier> {
+    private static final class Proxy extends AbstractProxy<LocalHistoryIdentifier> {
+        private static final long serialVersionUID = 1L;
+
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy() {
+            // For Externalizable
+        }
+
+        Proxy(final byte[] serialized) {
+            super(serialized);
+        }
+
+        @Override
+        protected LocalHistoryIdentifier readIdentifier(final DataInput in) throws IOException {
+            return LocalHistoryIdentifier.readFrom(in);
+        }
+
+        @Override
+        protected CloseLocalHistoryPayload createObject(final LocalHistoryIdentifier identifier,
+                final byte[] serialized) {
+            return new CloseLocalHistoryPayload(identifier, serialized);
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(CloseLocalHistoryPayload.class);
+    private static final long serialVersionUID = 1L;
+
+    CloseLocalHistoryPayload(final LocalHistoryIdentifier historyId, final byte[] serialized) {
+        super(historyId, serialized);
+    }
+
+    public static CloseLocalHistoryPayload create(final LocalHistoryIdentifier historyId) {
+        final ByteArrayDataOutput out = ByteStreams.newDataOutput();
+        try {
+            historyId.writeTo(out);
+        } catch (IOException e) {
+            // This should never happen
+            LOG.error("Failed to serialize {}", historyId, e);
+            throw Throwables.propagate(e);
+        }
+        return new CloseLocalHistoryPayload(historyId, out.toByteArray());
+    }
+
+    @Override
+    protected Proxy externalizableProxy(final byte[] serialized) {
+        return new Proxy(serialized);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CreateLocalHistoryPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CreateLocalHistoryPayload.java
new file mode 100644 (file)
index 0000000..4b824bb
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import com.google.common.base.Throwables;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import java.io.DataInput;
+import java.io.IOException;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Payload persisted when a local history is created. It contains a {@link LocalHistoryIdentifier}.
+ *
+ * @author Robert Varga
+ */
+public final class CreateLocalHistoryPayload extends AbstractIdentifiablePayload<LocalHistoryIdentifier> {
+    private static final class Proxy extends AbstractProxy<LocalHistoryIdentifier> {
+        private static final long serialVersionUID = 1L;
+
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy() {
+            // For Externalizable
+        }
+
+        Proxy(final byte[] serialized) {
+            super(serialized);
+        }
+
+        @Override
+        protected LocalHistoryIdentifier readIdentifier(final DataInput in) throws IOException {
+            return LocalHistoryIdentifier.readFrom(in);
+        }
+
+        @Override
+        protected CreateLocalHistoryPayload createObject(final LocalHistoryIdentifier identifier,
+                final byte[] serialized) {
+            return new CreateLocalHistoryPayload(identifier, serialized);
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(CreateLocalHistoryPayload.class);
+    private static final long serialVersionUID = 1L;
+
+    CreateLocalHistoryPayload(final LocalHistoryIdentifier historyId, final byte[] serialized) {
+        super(historyId, serialized);
+    }
+
+    public static CreateLocalHistoryPayload create(final LocalHistoryIdentifier historyId) {
+        final ByteArrayDataOutput out = ByteStreams.newDataOutput();
+        try {
+            historyId.writeTo(out);
+        } catch (IOException e) {
+            // This should never happen
+            LOG.error("Failed to serialize {}", historyId, e);
+            throw Throwables.propagate(e);
+        }
+        return new CreateLocalHistoryPayload(historyId, out.toByteArray());
+    }
+
+    @Override
+    protected Proxy externalizableProxy(final byte[] serialized) {
+        return new Proxy(serialized);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PurgeLocalHistoryPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PurgeLocalHistoryPayload.java
new file mode 100644 (file)
index 0000000..91ad74d
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import com.google.common.base.Throwables;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import java.io.DataInput;
+import java.io.IOException;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Payload persisted when a local history is completely purged, i.e. the frontend has removed it from its tracking.
+ * It contains a {@link LocalHistoryIdentifier}.
+ *
+ * @author Robert Varga
+ */
+public final class PurgeLocalHistoryPayload extends AbstractIdentifiablePayload<LocalHistoryIdentifier> {
+    private static final class Proxy extends AbstractProxy<LocalHistoryIdentifier> {
+        private static final long serialVersionUID = 1L;
+
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy() {
+            // For Externalizable
+        }
+
+        Proxy(final byte[] serialized) {
+            super(serialized);
+        }
+
+        @Override
+        protected LocalHistoryIdentifier readIdentifier(final DataInput in) throws IOException {
+            return LocalHistoryIdentifier.readFrom(in);
+        }
+
+        @Override
+        protected PurgeLocalHistoryPayload createObject(final LocalHistoryIdentifier identifier,
+                final byte[] serialized) {
+            return new PurgeLocalHistoryPayload(identifier, serialized);
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(PurgeLocalHistoryPayload.class);
+    private static final long serialVersionUID = 1L;
+
+    PurgeLocalHistoryPayload(final LocalHistoryIdentifier historyId, final byte[] serialized) {
+        super(historyId, serialized);
+    }
+
+    public static PurgeLocalHistoryPayload create(final LocalHistoryIdentifier historyId) {
+        final ByteArrayDataOutput out = ByteStreams.newDataOutput();
+        try {
+            historyId.writeTo(out);
+        } catch (IOException e) {
+            // This should never happen
+            LOG.error("Failed to serialize {}", historyId, e);
+            throw Throwables.propagate(e);
+        }
+        return new PurgeLocalHistoryPayload(historyId, out.toByteArray());
+    }
+
+    @Override
+    protected Proxy externalizableProxy(final byte[] serialized) {
+        return new Proxy(serialized);
+    }
+}
index 45cfd29d255af37b91df19553261570079c65081..6cfde54af73e9e147867bf4c4c9f41be6c2dd6c8 100644 (file)
@@ -105,6 +105,7 @@ import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelpe
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
@@ -1445,15 +1446,15 @@ public class ShardTest extends AbstractShardTest {
             {
                 final Creator<Shard> creator = () -> new Shard(newShardBuilder()) {
                     @Override
             {
                 final Creator<Shard> creator = () -> new Shard(newShardBuilder()) {
                     @Override
-                    void persistPayload(final TransactionIdentifier transactionId, final Payload payload,
-                            boolean batchHint) {
+                    void persistPayload(final Identifier id, final Payload payload,
+                            final boolean batchHint) {
                         // Simulate an AbortTransaction message occurring during
                         // replication, after
                         // persisting and before finishing the commit to the
                         // in-memory store.
 
                         // Simulate an AbortTransaction message occurring during
                         // replication, after
                         // persisting and before finishing the commit to the
                         // in-memory store.
 
-                        doAbortTransaction(transactionId, null);
-                        super.persistPayload(transactionId, payload, batchHint);
+                        doAbortTransaction(id, null);
+                        super.persistPayload(id, payload, batchHint);
                     }
                 };
 
                     }
                 };