BUG-5280: add {Create,Close,Purge}LocalHistoryPayload
[controller.git] opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
index d398afefa7869cb70cc2eec74cf9153a918b162f..613f9adbc9355c4cabb8a776f31069bdfba6660c 100644
@@ -39,13 +39,18 @@ import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 import java.util.function.UnaryOperator;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
 import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
+import org.opendaylight.controller.cluster.datastore.persisted.AbstractIdentifiablePayload;
+import org.opendaylight.controller.cluster.datastore.persisted.CloseLocalHistoryPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
+import org.opendaylight.controller.cluster.datastore.persisted.CreateLocalHistoryPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.PurgeLocalHistoryPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata;
 import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
@@ -102,10 +107,17 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
 
     private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
+
     private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
     private final Queue<CommitEntry> pendingTransactions = new ArrayDeque<>();
     private final Queue<CommitEntry> pendingCommits = new ArrayDeque<>();
     private final Queue<CommitEntry> pendingFinishCommits = new ArrayDeque<>();
+
+    /**
+     * Callbacks that need to be invoked once a payload is replicated.
+     */
+    private final Map<Payload, Runnable> replicationCallbacks = new HashMap<>();
+
     private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher;
     private final ShardDataChangeListenerPublisher dataChangeListenerPublisher;
     private final Collection<ShardDataTreeMetadata<?>> metadata;
@@ -150,8 +162,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     @VisibleForTesting
     public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) {
         this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY,
-                new DefaultShardDataTreeChangeListenerPublisher(),
-                new DefaultShardDataChangeListenerPublisher(), "");
+                new DefaultShardDataTreeChangeListenerPublisher(), new DefaultShardDataChangeListenerPublisher(), "");
     }
 
     final String logContext() {
@@ -312,6 +323,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                     ((CommitTransactionPayload) payload).getCandidate();
             applyRecoveryCandidate(e.getValue());
             allMetadataCommittedTransaction(e.getKey());
+        } else if (payload instanceof CreateLocalHistoryPayload) {
+            allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier());
+        } else if (payload instanceof CloseLocalHistoryPayload) {
+            allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
+        } else if (payload instanceof PurgeLocalHistoryPayload) {
+            allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
         } else if (payload instanceof DataTreeCandidatePayload) {
             applyRecoveryCandidate(((DataTreeCandidatePayload) payload).getCandidate());
         } else {
@@ -367,11 +384,46 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                 Verify.verify(identifier instanceof TransactionIdentifier);
                 payloadReplicationComplete((TransactionIdentifier) identifier);
             }
+        } else if (payload instanceof CloseLocalHistoryPayload) {
+            if (identifier != null) {
+                payloadReplicationComplete((CloseLocalHistoryPayload) payload);
+            } else {
+                allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
+            }
+        } else if (payload instanceof CreateLocalHistoryPayload) {
+            if (identifier != null) {
+                payloadReplicationComplete((CreateLocalHistoryPayload) payload);
+            } else {
+                allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier());
+            }
+        } else if (payload instanceof PurgeLocalHistoryPayload) {
+            if (identifier != null) {
+                payloadReplicationComplete((PurgeLocalHistoryPayload) payload);
+            } else {
+                allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
+            }
         } else {
             LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload);
         }
     }
 
+    private void replicatePayload(final Identifier id, final Payload payload, @Nullable final Runnable callback) {
+        if (callback != null) {
+            replicationCallbacks.put(payload, callback);
+        }
+        shard.persistPayload(id, payload, true);
+    }
+
+    private void payloadReplicationComplete(final AbstractIdentifiablePayload<?> payload) {
+        final Runnable callback = replicationCallbacks.remove(payload);
+        if (callback != null) {
+            LOG.debug("{}: replication of {} completed, invoking {}", logContext, payload.getIdentifier(), callback);
+            callback.run();
+        } else {
+            LOG.debug("{}: replication of {} has no callback", logContext, payload.getIdentifier());
+        }
+    }
+
     private void payloadReplicationComplete(final TransactionIdentifier txId) {
         final CommitEntry current = pendingFinishCommits.peek();
         if (current == null) {
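
Note: the replicationCallbacks map added above pairs each replicated payload with a completion callback. replicatePayload() records the callback before handing the payload to the shard for persistence, and payloadReplicationComplete() removes and runs it once the replicated entry is applied. A minimal self-contained sketch of that pattern follows; the Payload stand-in, the persist stub and the identifiers are hypothetical, only the map-keyed-by-payload bookkeeping mirrors the diff.

import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the replicate-then-callback pattern used above.
// The Object "payload" is a hypothetical stand-in, not the Raft Payload class.
final class ReplicationCallbackSketch {
    private final Map<Object, Runnable> replicationCallbacks = new HashMap<>();

    // Mirrors replicatePayload(): remember the callback, then start persistence.
    void replicate(final Object payload, final Runnable callback) {
        if (callback != null) {
            replicationCallbacks.put(payload, callback);
        }
        // shard.persistPayload(id, payload, true) would be invoked here.
    }

    // Mirrors payloadReplicationComplete(): look up and run the callback, if any.
    void replicationComplete(final Object payload) {
        final Runnable callback = replicationCallbacks.remove(payload);
        if (callback != null) {
            callback.run();
        }
    }

    public static void main(final String[] args) {
        final ReplicationCallbackSketch sketch = new ReplicationCallbackSketch();
        final Object payload = "ClosePayload(history-1)"; // hypothetical payload instance
        sketch.replicate(payload, () -> System.out.println("history-1 close replicated"));
        // Once the persistence/replication layer reports the entry as committed:
        sketch.replicationComplete(payload);
    }
}
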
@@ -394,11 +446,30 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
-    ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) {
-        ShardDataTreeTransactionChain chain = transactionChains.get(localHistoryIdentifier);
+    private void allMetadataCreatedLocalHistory(final LocalHistoryIdentifier historyId) {
+        for (ShardDataTreeMetadata<?> m : metadata) {
+            m.onHistoryCreated(historyId);
+        }
+    }
+
+    private void allMetadataClosedLocalHistory(final LocalHistoryIdentifier historyId) {
+        for (ShardDataTreeMetadata<?> m : metadata) {
+            m.onHistoryClosed(historyId);
+        }
+    }
+
+    private void allMetadataPurgedLocalHistory(final LocalHistoryIdentifier historyId) {
+        for (ShardDataTreeMetadata<?> m : metadata) {
+            m.onHistoryPurged(historyId);
+        }
+    }
+
+    ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId) {
+        ShardDataTreeTransactionChain chain = transactionChains.get(historyId);
         if (chain == null) {
-            chain = new ShardDataTreeTransactionChain(localHistoryIdentifier, this);
-            transactionChains.put(localHistoryIdentifier, chain);
+            chain = new ShardDataTreeTransactionChain(historyId, this);
+            transactionChains.put(historyId, chain);
+            shard.persistPayload(historyId, CreateLocalHistoryPayload.create(historyId), true);
         }
 
         return chain;
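
Note: ensureTransactionChain() now has a side effect on first use: the first time a history identifier is seen, the chain is instantiated and a CreateLocalHistoryPayload is persisted, so recovery and followers learn about the local history. A hedged self-contained sketch of that get-or-create-with-persist pattern, using stand-in types rather than the real Shard and chain classes:

import java.util.HashMap;
import java.util.Map;

// Hedged sketch of ensureTransactionChain(): a chain is created and a
// CreateLocalHistoryPayload persisted only on first sight of a history id.
// Chain and persistCreate() are illustrative stand-ins.
final class EnsureChainSketch {
    static final class Chain { }

    private final Map<String, Chain> chains = new HashMap<>();

    Chain ensureChain(final String historyId) {
        Chain chain = chains.get(historyId);
        if (chain == null) {
            chain = new Chain();
            chains.put(historyId, chain);
            persistCreate(historyId); // persisted exactly once per history
        }
        return chain;
    }

    private void persistCreate(final String historyId) {
        System.out.println("persisting CreateLocalHistoryPayload for " + historyId);
    }

    public static void main(final String[] args) {
        final EnsureChainSketch sketch = new EnsureChainSketch();
        sketch.ensureChain("history-1"); // persists the create payload
        sketch.ensureChain("history-1"); // reuses the existing chain, no new payload
    }
}
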
@@ -446,6 +517,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
+    /**
+     * Immediately close all transaction chains.
+     */
     void closeAllTransactionChains() {
         for (ShardDataTreeTransactionChain chain : transactionChains.values()) {
             chain.close();
@@ -454,13 +528,43 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         transactionChains.clear();
     }
 
-    void closeTransactionChain(final LocalHistoryIdentifier transactionChainId) {
-        final ShardDataTreeTransactionChain chain = transactionChains.remove(transactionChainId);
-        if (chain != null) {
-            chain.close();
-        } else {
-            LOG.debug("{}: Closing non-existent transaction chain {}", logContext, transactionChainId);
+    /**
+     * Close a single transaction chain.
+     *
+     * @param id History identifier
+     * @param callback Callback to invoke upon completion, may be null
+     */
+    void closeTransactionChain(final LocalHistoryIdentifier id, @Nullable final Runnable callback) {
+        final ShardDataTreeTransactionChain chain = transactionChains.get(id);
+        if (chain == null) {
+            LOG.debug("{}: Closing non-existent transaction chain {}", logContext, id);
+            if (callback != null) {
+                callback.run();
+            }
+            return;
         }
+
+        chain.close();
+        replicatePayload(id, CloseLocalHistoryPayload.create(id), callback);
+    }
+
+    /**
+     * Purge a single transaction chain.
+     *
+     * @param id History identifier
+     * @param callback Callback to invoke upon completion, may be null
+     */
+    void purgeTransactionChain(final LocalHistoryIdentifier id, @Nullable final Runnable callback) {
+        final ShardDataTreeTransactionChain chain = transactionChains.remove(id);
+        if (chain == null) {
+            LOG.debug("{}: Purging non-existent transaction chain {}", logContext, id);
+            if (callback != null) {
+                callback.run();
+            }
+            return;
+        }
+
+        replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback);
     }
 
     Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
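
Note: taken together, closeTransactionChain() and purgeTransactionChain() give each local history a two-step retirement: close stops new transactions and replicates CloseLocalHistoryPayload, purge removes the chain and replicates PurgeLocalHistoryPayload, with the optional callbacks firing once replication completes. A minimal sketch of driving that sequence; ChainStore is a hypothetical stand-in for the two methods above, only the callback chaining reflects the diff.

// Hedged sketch of the close-then-purge lifecycle for a transaction chain.
public final class HistoryRetirementSketch {
    interface ChainStore {
        void closeChain(String historyId, Runnable onReplicated);
        void purgeChain(String historyId, Runnable onReplicated);
    }

    static void retire(final ChainStore store, final String historyId) {
        // Step 1: close the chain so no new transactions are accepted.
        store.closeChain(historyId, () ->
            // Step 2: once the close has replicated, purge the chain's bookkeeping.
            store.purgeChain(historyId,
                () -> System.out.println(historyId + " purged")));
    }

    public static void main(final String[] args) {
        // Trivial in-memory ChainStore that "replicates" synchronously.
        final ChainStore store = new ChainStore() {
            @Override
            public void closeChain(final String historyId, final Runnable cb) {
                System.out.println("closing " + historyId);
                cb.run();
            }

            @Override
            public void purgeChain(final String historyId, final Runnable cb) {
                System.out.println("purging " + historyId);
                cb.run();
            }
        };
        retire(store, "history-1");
    }
}
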
@@ -570,7 +674,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                 tip.validate(modification);
                 LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier());
                 cohort.successfulCanCommit();
-                entry.lastAccess = shard.ticker().read();
+                entry.lastAccess = ticker().read();
                 return;
             } catch (ConflictingModificationAppliedException e) {
                 LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.getIdentifier(),
@@ -600,7 +704,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         processNextPendingTransaction();
     }
 
-    private void processNextPending(Queue<CommitEntry> queue, State allowedState, Consumer<CommitEntry> processor) {
+    private void processNextPending(final Queue<CommitEntry> queue, final State allowedState,
+            final Consumer<CommitEntry> processor) {
         while (!queue.isEmpty()) {
             final CommitEntry entry = queue.peek();
             final SimpleShardDataTreeCohort cohort = entry.cohort;
@@ -669,7 +774,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         // Set the tip of the data tree.
         tip = Verify.verifyNotNull(candidate);
 
-        entry.lastAccess = shard.ticker().read();
+        entry.lastAccess = ticker().read();
 
         pendingTransactions.remove();
         pendingCommits.add(entry);
@@ -785,14 +890,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             final DataTreeModification modification) {
         SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId,
                 cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT));
-        pendingTransactions.add(new CommitEntry(cohort, shard.ticker().read()));
+        pendingTransactions.add(new CommitEntry(cohort, ticker().read()));
         return cohort;
     }
 
     @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")
     void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) {
         final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
-        final long now = shard.ticker().read();
+        final long now = ticker().read();
 
         final Queue<CommitEntry> currentQueue = !pendingFinishCommits.isEmpty() ? pendingFinishCommits :
             !pendingCommits.isEmpty() ? pendingCommits : pendingTransactions;
@@ -904,7 +1009,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private void rebaseTransactions(Iterator<CommitEntry> iter, @Nonnull TipProducingDataTreeTip newTip) {
+    private void rebaseTransactions(final Iterator<CommitEntry> iter, @Nonnull final TipProducingDataTreeTip newTip) {
         tip = Preconditions.checkNotNull(newTip);
         while (iter.hasNext()) {
             final SimpleShardDataTreeCohort cohort = iter.next().cohort;