BUG-2138: Allow creation of prefixed ShardDataTrees
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardDataTree.java
index bc9549a64071649daf33ee06e66a2a78dcd0511e..89c381a4ca60ef772f3bef5bc9e73d099828787d 100644 (file)
@@ -64,9 +64,11 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTreeTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -108,6 +110,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     private final Shard shard;
     private Runnable runOnPendingTransactionsComplete;
 
+    /**
+     * Optimistic {@link DataTreeCandidate} preparation. Since our DataTree implementation is a
+     * {@link TipProducingDataTree}, each {@link DataTreeCandidate} is also a {@link DataTreeTip}, e.g. another
+     * candidate can be prepared on top of it. They still need to be committed in sequence. Here we track the current
+     * tip of the data tree, which is the last DataTreeCandidate we have in flight, or the DataTree itself.
+     */
+    private TipProducingDataTreeTip tip;
+
     private SchemaContext schemaContext;
 
     public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
@@ -122,18 +132,21 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         this.dataChangeListenerPublisher = Preconditions.checkNotNull(dataChangeListenerPublisher);
         this.logContext = Preconditions.checkNotNull(logContext);
         this.metadata = ImmutableList.copyOf(metadata);
+        tip = dataTree;
     }
 
     public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType,
+            final YangInstanceIdentifier root,
             final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
             final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext) {
-        this(shard, schemaContext, InMemoryDataTreeFactory.getInstance().create(treeType),
+        this(shard, schemaContext, InMemoryDataTreeFactory.getInstance().create(treeType, root),
                 treeChangeListenerPublisher, dataChangeListenerPublisher, logContext);
     }
 
     @VisibleForTesting
     public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) {
-        this(shard, schemaContext, treeType, new DefaultShardDataTreeChangeListenerPublisher(),
+        this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY,
+                new DefaultShardDataTreeChangeListenerPublisher(),
                 new DefaultShardDataChangeListenerPublisher(), "");
     }
 
@@ -505,6 +518,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     @VisibleForTesting
     @Deprecated
     public DataTreeCandidate commit(final DataTreeModification modification) throws DataValidationFailedException {
+        // Direct modification commit is a utility, which cannot be used while we have transactions in-flight
+        Preconditions.checkState(tip == dataTree, "Cannot modify data tree while transacgitons are pending");
+
         modification.ready();
         dataTree.validate(modification);
         DataTreeCandidate candidate = dataTree.prepare(modification);
@@ -519,6 +535,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
 
         pendingTransactions.clear();
+        tip = dataTree;
         return ret;
     }
 
@@ -536,7 +553,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier());
             Exception cause;
             try {
-                dataTree.validate(modification);
+                tip.validate(modification);
                 LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier());
                 cohort.successfulCanCommit();
                 entry.lastAccess = shard.ticker().read();
@@ -591,7 +608,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         Verify.verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current);
         final DataTreeCandidateTip candidate;
         try {
-            candidate = dataTree.prepare(cohort.getDataTreeModification());
+            candidate = tip.prepare(cohort.getDataTreeModification());
         } catch (Exception e) {
             failPreCommit(e);
             return;
@@ -604,6 +621,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return;
         }
 
+        // Set the tip of the data tree.
+        tip = Verify.verifyNotNull(candidate);
+
         entry.lastAccess = shard.ticker().read();
         cohort.successfulPreCommit(candidate);
     }
@@ -629,6 +649,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return;
         }
 
+        // All pending candidates have been committed, reset the tip to the data tree
+        if (tip == candidate) {
+            tip = dataTree;
+        }
+
         shard.getShardMBean().incrementCommittedTransactionCount();
         shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
 
@@ -745,39 +770,77 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
-    void startAbort(final SimpleShardDataTreeCohort cohort) {
+    boolean startAbort(final SimpleShardDataTreeCohort cohort) {
         final Iterator<CommitEntry> it = pendingTransactions.iterator();
         if (!it.hasNext()) {
             LOG.debug("{}: no open transaction while attempting to abort {}", logContext, cohort.getIdentifier());
-            return;
+            return true;
         }
 
         // First entry is special, as it may already be committing
         final CommitEntry first = it.next();
         if (cohort.equals(first.cohort)) {
             if (cohort.getState() != State.COMMIT_PENDING) {
-                LOG.debug("{}: aborted head of queue {} in state {}", logContext, cohort.getIdentifier(),
+                LOG.debug("{}: aborting head of queue {} in state {}", logContext, cohort.getIdentifier(),
                     cohort.getIdentifier());
 
-                pendingTransactions.remove();
+                it.remove();
+                rebasePreCommittedTransactions(it, dataTree);
                 processNextTransaction();
-            } else {
-                LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier());
+                return true;
             }
 
-            return;
+            LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier());
+            return false;
         }
 
+        TipProducingDataTreeTip newTip = dataTree;
         while (it.hasNext()) {
             final CommitEntry e = it.next();
             if (cohort.equals(e.cohort)) {
                 LOG.debug("{}: aborting queued transaction {}", logContext, cohort.getIdentifier());
                 it.remove();
-                return;
+                rebasePreCommittedTransactions(it, newTip);
+                return true;
+            } else {
+                newTip = cohort.getCandidate();
             }
         }
 
         LOG.debug("{}: aborted transaction {} not found in the queue", logContext, cohort.getIdentifier());
+        return true;
+    }
+
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private void rebasePreCommittedTransactions(Iterator<CommitEntry> iter, TipProducingDataTreeTip newTip) {
+        tip = newTip;
+        while (iter.hasNext()) {
+            final SimpleShardDataTreeCohort cohort = iter.next().cohort;
+            if (cohort.getState() == State.CAN_COMMIT_COMPLETE) {
+                LOG.debug("{}: Revalidating queued transaction {}", logContext, cohort.getIdentifier());
+
+                try {
+                    tip.validate(cohort.getDataTreeModification());
+                } catch (DataValidationFailedException | RuntimeException e) {
+                    LOG.debug("{}: Failed to revalidate queued transaction {}", logContext, cohort.getIdentifier(), e);
+                    cohort.reportFailure(e);
+                }
+            } else if (cohort.getState() == State.PRE_COMMIT_COMPLETE) {
+                LOG.debug("{}: Repreparing queued transaction {}", logContext, cohort.getIdentifier());
+
+                try {
+                    tip.validate(cohort.getDataTreeModification());
+                    DataTreeCandidateTip candidate = tip.prepare(cohort.getDataTreeModification());
+                    cohort.userPreCommit(candidate);
+
+                    cohort.setNewCandidate(candidate);
+                    tip = candidate;
+                } catch (ExecutionException | TimeoutException | RuntimeException | DataValidationFailedException e) {
+                    LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.getIdentifier(), e);
+                    cohort.reportFailure(e);
+                }
+            }
+        }
     }
 
     void setRunOnPendingTransactionsComplete(final Runnable operation) {