BUG-2138: Allow creation of prefixed ShardDataTrees
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardDataTree.java
index bf3b200825bc0c07c2283e7ceaf5d0c25363561a..89c381a4ca60ef772f3bef5bc9e73d099828787d 100644 (file)
@@ -13,11 +13,13 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
 import com.google.common.base.Verify;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMap.Builder;
 import com.google.common.primitives.UnsignedLong;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.File;
 import java.io.IOException;
 import java.util.AbstractMap.SimpleEntry;
@@ -62,9 +64,11 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTreeTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -77,6 +81,7 @@ import scala.concurrent.duration.Duration;
  * e.g. it does not expose public interfaces and assumes it is only ever called from a
  * single thread.
  *
+ * <p>
  * This class is not part of the API contract and is subject to change at any time.
  */
 @NotThreadSafe
@@ -105,6 +110,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     private final Shard shard;
     private Runnable runOnPendingTransactionsComplete;
 
+    /**
+     * Optimistic {@link DataTreeCandidate} preparation. Since our DataTree implementation is a
+     * {@link TipProducingDataTree}, each {@link DataTreeCandidate} is also a {@link DataTreeTip}, e.g. another
+     * candidate can be prepared on top of it. They still need to be committed in sequence. Here we track the current
+     * tip of the data tree, which is the last DataTreeCandidate we have in flight, or the DataTree itself.
+     */
+    private TipProducingDataTreeTip tip;
+
     private SchemaContext schemaContext;
 
     public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
@@ -119,25 +132,32 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         this.dataChangeListenerPublisher = Preconditions.checkNotNull(dataChangeListenerPublisher);
         this.logContext = Preconditions.checkNotNull(logContext);
         this.metadata = ImmutableList.copyOf(metadata);
+        tip = dataTree;
     }
 
     public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType,
+            final YangInstanceIdentifier root,
             final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
             final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext) {
-        this(shard, schemaContext, InMemoryDataTreeFactory.getInstance().create(treeType),
+        this(shard, schemaContext, InMemoryDataTreeFactory.getInstance().create(treeType, root),
                 treeChangeListenerPublisher, dataChangeListenerPublisher, logContext);
     }
 
     @VisibleForTesting
     public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) {
-        this(shard, schemaContext, treeType, new DefaultShardDataTreeChangeListenerPublisher(),
+        this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY,
+                new DefaultShardDataTreeChangeListenerPublisher(),
                 new DefaultShardDataChangeListenerPublisher(), "");
     }
 
-    String logContext() {
+    final String logContext() {
         return logContext;
     }
 
+    final Ticker ticker() {
+        return shard.ticker();
+    }
+
     public TipProducingDataTree getDataTree() {
         return dataTree;
     }
@@ -146,9 +166,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return schemaContext;
     }
 
-    void updateSchemaContext(final SchemaContext schemaContext) {
-        dataTree.setSchemaContext(schemaContext);
-        this.schemaContext = Preconditions.checkNotNull(schemaContext);
+    void updateSchemaContext(final SchemaContext newSchemaContext) {
+        dataTree.setSchemaContext(newSchemaContext);
+        this.schemaContext = Preconditions.checkNotNull(newSchemaContext);
     }
 
     /**
@@ -171,7 +191,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return new MetadataShardDataTreeSnapshot(rootNode, metaBuilder.build());
     }
 
-    private void applySnapshot(final @Nonnull ShardDataTreeSnapshot snapshot,
+    private void applySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot,
             final UnaryOperator<DataTreeModification> wrapper) throws DataValidationFailedException {
         final Stopwatch elapsed = Stopwatch.createStarted();
 
@@ -215,6 +235,17 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         LOG.debug("{}: state snapshot applied in %s", logContext, elapsed);
     }
 
+    /**
+     * Apply a snapshot coming from the leader. This method assumes the leader and follower SchemaContexts match and
+     * does not perform any pruning.
+     *
+     * @param snapshot Snapshot that needs to be applied
+     * @throws DataValidationFailedException when the snapshot fails to apply
+     */
+    void applySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
+        applySnapshot(snapshot, UnaryOperator.identity());
+    }
+
     private PruningDataTreeModification wrapWithPruning(final DataTreeModification delegate) {
         return new PruningDataTreeModification(delegate, dataTree, schemaContext);
     }
@@ -237,18 +268,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         applySnapshot(snapshot, this::wrapWithPruning);
     }
 
-
-    /**
-     * Apply a snapshot coming from the leader. This method assumes the leader and follower SchemaContexts match and
-     * does not perform any pruning.
-     *
-     * @param snapshot Snapshot that needs to be applied
-     * @throws DataValidationFailedException when the snapshot fails to apply
-     */
-    void applySnapshot(final @Nonnull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
-        applySnapshot(snapshot, UnaryOperator.identity());
-    }
-
+    @SuppressWarnings("checkstyle:IllegalCatch")
     private void applyRecoveryCandidate(final DataTreeCandidate candidate) throws DataValidationFailedException {
         final PruningDataTreeModification mod = wrapWithPruning(dataTree.takeSnapshot().newModification());
         DataTreeCandidates.applyToModification(mod, candidate);
@@ -280,7 +300,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
      */
     void applyRecoveryPayload(final @Nonnull Payload payload) throws IOException, DataValidationFailedException {
         if (payload instanceof CommitTransactionPayload) {
-            final Entry<TransactionIdentifier, DataTreeCandidate> e = ((CommitTransactionPayload) payload).getCandidate();
+            final Entry<TransactionIdentifier, DataTreeCandidate> e =
+                    ((CommitTransactionPayload) payload).getCandidate();
             applyRecoveryCandidate(e.getValue());
             allMetadataCommittedTransaction(e.getKey());
         } else if (payload instanceof DataTreeCandidatePayload) {
@@ -330,7 +351,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
          */
         if (payload instanceof CommitTransactionPayload) {
             if (identifier == null) {
-                final Entry<TransactionIdentifier, DataTreeCandidate> e = ((CommitTransactionPayload) payload).getCandidate();
+                final Entry<TransactionIdentifier, DataTreeCandidate> e =
+                        ((CommitTransactionPayload) payload).getCandidate();
                 applyReplicatedCandidate(e.getKey(), e.getValue());
                 allMetadataCommittedTransaction(e.getKey());
             } else {
@@ -364,7 +386,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
-    private ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) {
+    ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) {
         ShardDataTreeTransactionChain chain = transactionChains.get(localHistoryIdentifier);
         if (chain == null) {
             chain = new ShardDataTreeTransactionChain(localHistoryIdentifier, this);
@@ -437,22 +459,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             Optional<DataTreeCandidate>> registerChangeListener(final YangInstanceIdentifier path,
                     final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
                     final DataChangeScope scope) {
-        final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> reg =
+        DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> reg =
                 dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope);
 
         return new SimpleEntry<>(reg, readCurrentData());
     }
 
     private Optional<DataTreeCandidate> readCurrentData() {
-        final Optional<NormalizedNode<?, ?>> currentState = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY);
+        final Optional<NormalizedNode<?, ?>> currentState =
+                dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY);
         return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode(
             YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.<DataTreeCandidate>absent();
     }
 
-    public Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> registerTreeChangeListener(
-            final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) {
-        final ListenerRegistration<DOMDataTreeChangeListener> reg = treeChangeListenerPublisher.registerTreeChangeListener(
-                path, listener);
+    public Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>>
+            registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) {
+        final ListenerRegistration<DOMDataTreeChangeListener> reg =
+                treeChangeListenerPublisher.registerTreeChangeListener(path, listener);
 
         return new SimpleEntry<>(reg, readCurrentData());
     }
@@ -471,7 +494,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         final DataTreeModification snapshot = transaction.getSnapshot();
         snapshot.ready();
 
-        return createReadyCohort(transaction.getId(), snapshot);
+        return createReadyCohort(transaction.getIdentifier(), snapshot);
     }
 
     public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
@@ -488,11 +511,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     /**
+     * Commits a modification.
+     *
      * @deprecated This method violates DataTree containment and will be removed.
      */
     @VisibleForTesting
     @Deprecated
     public DataTreeCandidate commit(final DataTreeModification modification) throws DataValidationFailedException {
+        // Direct modification commit is a utility, which cannot be used while we have transactions in-flight
+        Preconditions.checkState(tip == dataTree, "Cannot modify data tree while transacgitons are pending");
+
         modification.ready();
         dataTree.validate(modification);
         DataTreeCandidate candidate = dataTree.prepare(modification);
@@ -502,28 +530,30 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     public Collection<ShardDataTreeCohort> getAndClearPendingTransactions() {
         Collection<ShardDataTreeCohort> ret = new ArrayList<>(pendingTransactions.size());
-        for(CommitEntry entry: pendingTransactions) {
+        for (CommitEntry entry: pendingTransactions) {
             ret.add(entry.cohort);
         }
 
         pendingTransactions.clear();
+        tip = dataTree;
         return ret;
     }
 
+    @SuppressWarnings("checkstyle:IllegalCatch")
     private void processNextTransaction() {
         while (!pendingTransactions.isEmpty()) {
             final CommitEntry entry = pendingTransactions.peek();
             final SimpleShardDataTreeCohort cohort = entry.cohort;
             final DataTreeModification modification = cohort.getDataTreeModification();
 
-            if(cohort.getState() != State.CAN_COMMIT_PENDING) {
+            if (cohort.getState() != State.CAN_COMMIT_PENDING) {
                 break;
             }
 
             LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier());
             Exception cause;
             try {
-                dataTree.validate(modification);
+                tip.validate(modification);
                 LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier());
                 cohort.successfulCanCommit();
                 entry.lastAccess = shard.ticker().read();
@@ -538,7 +568,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
                 // For debugging purposes, allow dumping of the modification. Coupled with the above
                 // precondition log, it should allow us to understand what went on.
-                LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", cohort.getIdentifier(), modification, dataTree);
+                LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", cohort.getIdentifier(), modification,
+                        dataTree);
                 cause = new TransactionCommitFailedException("Data did not pass validation.", e);
             } catch (Exception e) {
                 LOG.warn("{}: Unexpected failure in validation phase", logContext, e);
@@ -568,6 +599,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         processNextTransaction();
     }
 
+    @SuppressWarnings("checkstyle:IllegalCatch")
     void startPreCommit(final SimpleShardDataTreeCohort cohort) {
         final CommitEntry entry = pendingTransactions.peek();
         Preconditions.checkState(entry != null, "Attempted to pre-commit of %s when no transactions pending", cohort);
@@ -576,7 +608,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         Verify.verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current);
         final DataTreeCandidateTip candidate;
         try {
-            candidate = dataTree.prepare(cohort.getDataTreeModification());
+            candidate = tip.prepare(cohort.getDataTreeModification());
         } catch (Exception e) {
             failPreCommit(e);
             return;
@@ -589,6 +621,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return;
         }
 
+        // Set the tip of the data tree.
+        tip = Verify.verifyNotNull(candidate);
+
         entry.lastAccess = shard.ticker().read();
         cohort.successfulPreCommit(candidate);
     }
@@ -599,6 +634,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         processNextTransaction();
     }
 
+    @SuppressWarnings("checkstyle:IllegalCatch")
     private void finishCommit(final SimpleShardDataTreeCohort cohort) {
         final TransactionIdentifier txId = cohort.getIdentifier();
         final DataTreeCandidate candidate = cohort.getCandidate();
@@ -613,6 +649,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return;
         }
 
+        // All pending candidates have been committed, reset the tip to the data tree
+        if (tip == candidate) {
+            tip = dataTree;
+        }
+
         shard.getShardMBean().incrementCommittedTransactionCount();
         shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
 
@@ -658,6 +699,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         cohortRegistry.process(sender, message);
     }
 
+    @Override
     ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId,
             final DataTreeModification modification) {
         SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId,
@@ -666,6 +708,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return cohort;
     }
 
+    @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")
     void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) {
         final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
         final long now = shard.ticker().read();
@@ -676,13 +719,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             boolean processNext = true;
             switch (currentTx.cohort.getState()) {
                 case CAN_COMMIT_PENDING:
-                    pendingTransactions.poll().cohort.failedCanCommit(new TimeoutException());
+                    pendingTransactions.remove().cohort.failedCanCommit(new TimeoutException());
                     break;
                 case CAN_COMMIT_COMPLETE:
-                    pendingTransactions.poll().cohort.reportFailure(new TimeoutException());
+                    // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause
+                    // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code
+                    // in PRE_COMMIT_COMPLETE is changed.
+                    pendingTransactions.remove().cohort.reportFailure(new TimeoutException());
                     break;
                 case PRE_COMMIT_PENDING:
-                    pendingTransactions.poll().cohort.failedPreCommit(new TimeoutException());
+                    pendingTransactions.remove().cohort.failedPreCommit(new TimeoutException());
                     break;
                 case PRE_COMMIT_COMPLETE:
                     // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we
@@ -702,7 +748,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                     //        In order to make the pre-commit timer working across failovers, though, we need
                     //        a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately
                     //        restart the timer.
-                    pendingTransactions.poll().cohort.reportFailure(new TimeoutException());
+                    pendingTransactions.remove().cohort.reportFailure(new TimeoutException());
                     break;
                 case COMMIT_PENDING:
                     LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext,
@@ -715,7 +761,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                 case FAILED:
                 case READY:
                 default:
-                    pendingTransactions.poll();
+                    pendingTransactions.remove();
             }
 
             if (processNext) {
@@ -724,38 +770,77 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
-    void startAbort(final SimpleShardDataTreeCohort cohort) {
+    boolean startAbort(final SimpleShardDataTreeCohort cohort) {
         final Iterator<CommitEntry> it = pendingTransactions.iterator();
         if (!it.hasNext()) {
             LOG.debug("{}: no open transaction while attempting to abort {}", logContext, cohort.getIdentifier());
-            return;
+            return true;
         }
 
         // First entry is special, as it may already be committing
         final CommitEntry first = it.next();
         if (cohort.equals(first.cohort)) {
             if (cohort.getState() != State.COMMIT_PENDING) {
-                LOG.debug("{}: aborted head of queue {} in state {}", logContext, cohort.getIdentifier(),
+                LOG.debug("{}: aborting head of queue {} in state {}", logContext, cohort.getIdentifier(),
                     cohort.getIdentifier());
-                pendingTransactions.poll();
+
+                it.remove();
+                rebasePreCommittedTransactions(it, dataTree);
                 processNextTransaction();
-            } else {
-                LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier());
+                return true;
             }
 
-            return;
+            LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier());
+            return false;
         }
 
+        TipProducingDataTreeTip newTip = dataTree;
         while (it.hasNext()) {
             final CommitEntry e = it.next();
             if (cohort.equals(e.cohort)) {
                 LOG.debug("{}: aborting queued transaction {}", logContext, cohort.getIdentifier());
                 it.remove();
-                return;
+                rebasePreCommittedTransactions(it, newTip);
+                return true;
+            } else {
+                newTip = cohort.getCandidate();
             }
         }
 
         LOG.debug("{}: aborted transaction {} not found in the queue", logContext, cohort.getIdentifier());
+        return true;
+    }
+
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private void rebasePreCommittedTransactions(Iterator<CommitEntry> iter, TipProducingDataTreeTip newTip) {
+        tip = newTip;
+        while (iter.hasNext()) {
+            final SimpleShardDataTreeCohort cohort = iter.next().cohort;
+            if (cohort.getState() == State.CAN_COMMIT_COMPLETE) {
+                LOG.debug("{}: Revalidating queued transaction {}", logContext, cohort.getIdentifier());
+
+                try {
+                    tip.validate(cohort.getDataTreeModification());
+                } catch (DataValidationFailedException | RuntimeException e) {
+                    LOG.debug("{}: Failed to revalidate queued transaction {}", logContext, cohort.getIdentifier(), e);
+                    cohort.reportFailure(e);
+                }
+            } else if (cohort.getState() == State.PRE_COMMIT_COMPLETE) {
+                LOG.debug("{}: Repreparing queued transaction {}", logContext, cohort.getIdentifier());
+
+                try {
+                    tip.validate(cohort.getDataTreeModification());
+                    DataTreeCandidateTip candidate = tip.prepare(cohort.getDataTreeModification());
+                    cohort.userPreCommit(candidate);
+
+                    cohort.setNewCandidate(candidate);
+                    tip = candidate;
+                } catch (ExecutionException | TimeoutException | RuntimeException | DataValidationFailedException e) {
+                    LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.getIdentifier(), e);
+                    cohort.reportFailure(e);
+                }
+            }
+        }
     }
 
     void setRunOnPendingTransactionsComplete(final Runnable operation) {
@@ -764,12 +849,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     private void maybeRunOperationOnPendingTransactionsComplete() {
-      if (runOnPendingTransactionsComplete != null && pendingTransactions.isEmpty()) {
-          LOG.debug("{}: Pending transactions complete - running operation {}", logContext,
-                  runOnPendingTransactionsComplete);
-
-          runOnPendingTransactionsComplete.run();
-          runOnPendingTransactionsComplete = null;
-      }
-  }
+        if (runOnPendingTransactionsComplete != null && pendingTransactions.isEmpty()) {
+            LOG.debug("{}: Pending transactions complete - running operation {}", logContext,
+                    runOnPendingTransactionsComplete);
+
+            runOnPendingTransactionsComplete.run();
+            runOnPendingTransactionsComplete = null;
+        }
+    }
 }