Centralize DataTree operations
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardDataTree.java
index 428cf8467c8188170d7985341118940a0186c13e..c4bbe5b411885f8de4c15a189081c7baedad9c4e 100644 (file)
@@ -65,31 +65,34 @@ import org.opendaylight.controller.cluster.datastore.persisted.PurgeTransactionP
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
+import org.opendaylight.controller.cluster.datastore.persisted.SkipTransactionsPayload;
 import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
+import org.opendaylight.controller.cluster.datastore.utils.ImmutableUnsignedLongSet;
 import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.messages.Payload;
 import org.opendaylight.mdsal.common.api.OptimisticLockFailedException;
 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.Empty;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
-import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
+import org.opendaylight.yangtools.yang.data.tree.api.ConflictingModificationAppliedException;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidateTip;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeConfiguration;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeSnapshot;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeTip;
+import org.opendaylight.yangtools.yang.data.tree.api.DataValidationFailedException;
+import org.opendaylight.yangtools.yang.data.tree.api.ModificationType;
+import org.opendaylight.yangtools.yang.data.tree.api.TreeType;
+import org.opendaylight.yangtools.yang.data.tree.impl.di.InMemoryDataTreeFactory;
+import org.opendaylight.yangtools.yang.data.tree.spi.DataTreeCandidates;
 import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree;
 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -220,8 +223,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     final void updateSchemaContext(final @NonNull EffectiveModelContext newSchemaContext) {
         dataTree.setEffectiveModelContext(newSchemaContext);
-        this.schemaContext = newSchemaContext;
-        this.dataSchemaContext = DataSchemaContextTree.from(newSchemaContext);
+        schemaContext = newSchemaContext;
+        dataSchemaContext = DataSchemaContextTree.from(newSchemaContext);
     }
 
     final void resetTransactionBatch() {
@@ -234,7 +237,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
      * @return A state snapshot
      */
     @NonNull ShardDataTreeSnapshot takeStateSnapshot() {
-        final NormalizedNode rootNode = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty()).get();
+        final NormalizedNode rootNode = takeSnapshot().readNode(YangInstanceIdentifier.empty()).get();
         final Builder<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>> metaBuilder =
                 ImmutableMap.builder();
 
@@ -276,7 +279,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             }
         }
 
-        final DataTreeModification unwrapped = dataTree.takeSnapshot().newModification();
+        final DataTreeModification unwrapped = newModification();
         final DataTreeModification mod = wrapper.apply(unwrapped);
         // delete everything first
         mod.delete(YangInstanceIdentifier.empty());
@@ -332,7 +335,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void applyRecoveryCandidate(final CommitTransactionPayload payload) throws IOException {
         final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> entry = payload.acquireCandidate();
-        final DataTreeModification unwrapped = dataTree.takeSnapshot().newModification();
+        final DataTreeModification unwrapped = newModification();
         final PruningDataTreeModification mod = createPruningModification(unwrapped,
             NormalizedNodeStreamVersion.MAGNESIUM.compareTo(entry.getValue().getVersion()) > 0);
 
@@ -385,6 +388,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
         } else if (payload instanceof PurgeLocalHistoryPayload) {
             allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
+        } else if (payload instanceof SkipTransactionsPayload) {
+            allMetadataSkipTransactions((SkipTransactionsPayload) payload);
         } else {
             LOG.debug("{}: ignoring unhandled payload {}", logContext, payload);
         }
@@ -396,7 +401,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         final TransactionIdentifier identifier = entry.getKey();
         LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
 
-        final DataTreeModification mod = dataTree.takeSnapshot().newModification();
+        final DataTreeModification mod = newModification();
         // TODO: check version here, which will enable us to perform forward-compatibility transformations
         DataTreeCandidates.applyToModification(mod, entry.getValue().getCandidate());
         mod.ready();
@@ -473,6 +478,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                 payloadReplicationComplete((PurgeLocalHistoryPayload)payload);
             }
             allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
+        } else if (payload instanceof SkipTransactionsPayload) {
+            if (identifier != null) {
+                payloadReplicationComplete((SkipTransactionsPayload)payload);
+            }
+            allMetadataSkipTransactions((SkipTransactionsPayload) payload);
         } else {
             LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload);
         }
@@ -569,6 +579,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
+    private void allMetadataSkipTransactions(final SkipTransactionsPayload payload) {
+        final var historyId = payload.getIdentifier();
+        final var txIds = payload.getTransactionIds();
+        for (ShardDataTreeMetadata<?> m : metadata) {
+            m.onTransactionsSkipped(historyId, txIds);
+        }
+    }
+
     /**
      * Create a transaction chain for specified history. Unlike {@link #ensureTransactionChain(LocalHistoryIdentifier)},
      * this method is used for re-establishing state when we are taking over
@@ -600,25 +618,29 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return chain;
     }
 
-    final ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) {
+    final @NonNull ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) {
         shard.getShardMBean().incrementReadOnlyTransactionCount();
 
-        if (txId.getHistoryId().getHistoryId() == 0) {
-            return new ReadOnlyShardDataTreeTransaction(this, txId, dataTree.takeSnapshot());
-        }
+        final var historyId = txId.getHistoryId();
+        return historyId.getHistoryId() == 0 ? newStandaloneReadOnlyTransaction(txId)
+            : ensureTransactionChain(historyId, null).newReadOnlyTransaction(txId);
+    }
 
-        return ensureTransactionChain(txId.getHistoryId(), null).newReadOnlyTransaction(txId);
+    final @NonNull ReadOnlyShardDataTreeTransaction newStandaloneReadOnlyTransaction(final TransactionIdentifier txId) {
+        return new ReadOnlyShardDataTreeTransaction(this, txId, takeSnapshot());
     }
 
-    final ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
+    final @NonNull ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
         shard.getShardMBean().incrementReadWriteTransactionCount();
 
-        if (txId.getHistoryId().getHistoryId() == 0) {
-            return new ReadWriteShardDataTreeTransaction(ShardDataTree.this, txId, dataTree.takeSnapshot()
-                    .newModification());
-        }
+        final var historyId = txId.getHistoryId();
+        return historyId.getHistoryId() == 0 ? newStandaloneReadWriteTransaction(txId)
+            : ensureTransactionChain(historyId, null).newReadWriteTransaction(txId);
+    }
 
-        return ensureTransactionChain(txId.getHistoryId(), null).newReadWriteTransaction(txId);
+    final @NonNull ReadWriteShardDataTreeTransaction newStandaloneReadWriteTransaction(
+            final TransactionIdentifier txId) {
+        return new ReadWriteShardDataTreeTransaction(this, txId, newModification());
     }
 
     @VisibleForTesting
@@ -695,9 +717,24 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                 id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
     }
 
+    final void skipTransactions(final LocalHistoryIdentifier id, final ImmutableUnsignedLongSet transactionIds,
+            final Runnable callback) {
+        final ShardDataTreeTransactionChain chain = transactionChains.get(id);
+        if (chain == null) {
+            LOG.debug("{}: Skipping on non-existent transaction chain {}", logContext, id);
+            if (callback != null) {
+                callback.run();
+            }
+            return;
+        }
+
+        replicatePayload(id, SkipTransactionsPayload.create(id, transactionIds,
+            shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
+    }
+
     final Optional<DataTreeCandidate> readCurrentData() {
-        return dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty())
-                .map(state -> DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.empty(), state));
+        return readNode(YangInstanceIdentifier.empty())
+            .map(state -> DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.empty(), state));
     }
 
     final void registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
@@ -743,7 +780,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     @VisibleForTesting
     public final Optional<NormalizedNode> readNode(final YangInstanceIdentifier path) {
-        return dataTree.takeSnapshot().readNode(path);
+        return takeSnapshot().readNode(path);
     }
 
     final DataTreeSnapshot takeSnapshot() {
@@ -752,7 +789,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     @VisibleForTesting
     final DataTreeModification newModification() {
-        return dataTree.takeSnapshot().newModification();
+        return takeSnapshot().newModification();
     }
 
     final Collection<ShardDataTreeCohort> getAndClearPendingTransactions() {
@@ -1002,9 +1039,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return;
         }
 
-        cohort.userPreCommit(candidate, new FutureCallback<Void>() {
+        cohort.userPreCommit(candidate, new FutureCallback<>() {
             @Override
-            public void onSuccess(final Void noop) {
+            public void onSuccess(final Empty result) {
                 // Set the tip of the data tree.
                 tip = verifyNotNull(candidate);
 
@@ -1151,11 +1188,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     // the newReadWriteTransaction()
     final ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
             final Optional<SortedSet<String>> participatingShardNames) {
-        if (txId.getHistoryId().getHistoryId() == 0) {
+        final var historyId = txId.getHistoryId();
+        if (historyId.getHistoryId() == 0) {
             return createReadyCohort(txId, mod, participatingShardNames);
         }
-
-        return ensureTransactionChain(txId.getHistoryId(), null).createReadyCohort(txId, mod, participatingShardNames);
+        return ensureTransactionChain(historyId, null).createReadyCohort(txId, mod, participatingShardNames);
     }
 
     @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")