Add supervisor to EOS singleton actor
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardDataTree.java
index 428cf8467c8188170d7985341118940a0186c13e..c1b83923a662d0753cef52f67ff85e7919098b48 100644 (file)
@@ -65,7 +65,9 @@ import org.opendaylight.controller.cluster.datastore.persisted.PurgeTransactionP
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
+import org.opendaylight.controller.cluster.datastore.persisted.SkipTransactionsPayload;
 import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
+import org.opendaylight.controller.cluster.datastore.utils.ImmutableUnsignedLongSet;
 import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
@@ -74,22 +76,23 @@ import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.Empty;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
-import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
+import org.opendaylight.yangtools.yang.data.tree.api.ConflictingModificationAppliedException;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidateTip;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeConfiguration;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeSnapshot;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeTip;
+import org.opendaylight.yangtools.yang.data.tree.api.DataValidationFailedException;
+import org.opendaylight.yangtools.yang.data.tree.api.ModificationType;
+import org.opendaylight.yangtools.yang.data.tree.api.TreeType;
+import org.opendaylight.yangtools.yang.data.tree.impl.di.InMemoryDataTreeFactory;
+import org.opendaylight.yangtools.yang.data.tree.spi.DataTreeCandidates;
 import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree;
 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -220,8 +223,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     final void updateSchemaContext(final @NonNull EffectiveModelContext newSchemaContext) {
         dataTree.setEffectiveModelContext(newSchemaContext);
-        this.schemaContext = newSchemaContext;
-        this.dataSchemaContext = DataSchemaContextTree.from(newSchemaContext);
+        schemaContext = newSchemaContext;
+        dataSchemaContext = DataSchemaContextTree.from(newSchemaContext);
     }
 
     final void resetTransactionBatch() {
@@ -385,6 +388,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
         } else if (payload instanceof PurgeLocalHistoryPayload) {
             allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
+        } else if (payload instanceof SkipTransactionsPayload) {
+            allMetadataSkipTransactions((SkipTransactionsPayload) payload);
         } else {
             LOG.debug("{}: ignoring unhandled payload {}", logContext, payload);
         }
@@ -473,6 +478,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                 payloadReplicationComplete((PurgeLocalHistoryPayload)payload);
             }
             allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
+        } else if (payload instanceof SkipTransactionsPayload) {
+            if (identifier != null) {
+                payloadReplicationComplete((SkipTransactionsPayload)payload);
+            }
+            allMetadataSkipTransactions((SkipTransactionsPayload) payload);
         } else {
             LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload);
         }
@@ -569,6 +579,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
+    private void allMetadataSkipTransactions(final SkipTransactionsPayload payload) {
+        final var historyId = payload.getIdentifier();
+        final var txIds = payload.getTransactionIds();
+        for (ShardDataTreeMetadata<?> m : metadata) {
+            m.onTransactionsSkipped(historyId, txIds);
+        }
+    }
+
     /**
      * Create a transaction chain for specified history. Unlike {@link #ensureTransactionChain(LocalHistoryIdentifier)},
      * this method is used for re-establishing state when we are taking over
@@ -695,6 +713,21 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                 id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
     }
 
+    final void skipTransactions(final LocalHistoryIdentifier id, final ImmutableUnsignedLongSet transactionIds,
+            final Runnable callback) {
+        final ShardDataTreeTransactionChain chain = transactionChains.get(id);
+        if (chain == null) {
+            LOG.debug("{}: Skipping on non-existent transaction chain {}", logContext, id);
+            if (callback != null) {
+                callback.run();
+            }
+            return;
+        }
+
+        replicatePayload(id, SkipTransactionsPayload.create(id, transactionIds,
+            shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
+    }
+
     final Optional<DataTreeCandidate> readCurrentData() {
         return dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty())
                 .map(state -> DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.empty(), state));
@@ -1002,9 +1035,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return;
         }
 
-        cohort.userPreCommit(candidate, new FutureCallback<Void>() {
+        cohort.userPreCommit(candidate, new FutureCallback<>() {
             @Override
-            public void onSuccess(final Void noop) {
+            public void onSuccess(final Empty result) {
                 // Set the tip of the data tree.
                 tip = verifyNotNull(candidate);