X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;fp=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=d908c7fc620c7ec58fc7c23bfa6d98ab0d1c3896;hb=abeaf223cadd818e2054b516e39c20305ea144b8;hp=4aad6a53df9bb754e7dcdeed670f770eca52ef28;hpb=87979c22dfbeb25924600a7d3cc6e875edb82c5e;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 4aad6a53df..d908c7fc62 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -66,7 +66,9 @@ import org.opendaylight.controller.cluster.datastore.persisted.PurgeTransactionP import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata; import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; +import org.opendaylight.controller.cluster.datastore.persisted.SkipTransactionsPayload; import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput; +import org.opendaylight.controller.cluster.datastore.utils.ImmutableUnsignedLongSet; import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification; import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; @@ -218,8 +220,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { void updateSchemaContext(final @NonNull EffectiveModelContext newSchemaContext) { dataTree.setEffectiveModelContext(newSchemaContext); - this.schemaContext = newSchemaContext; - this.dataSchemaContext = DataSchemaContextTree.from(newSchemaContext); + schemaContext = newSchemaContext; + dataSchemaContext = DataSchemaContextTree.from(newSchemaContext); } void resetTransactionBatch() { @@ -383,6 +385,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier()); } else if (payload instanceof PurgeLocalHistoryPayload) { allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier()); + } else if (payload instanceof SkipTransactionsPayload) { + allMetadataSkipTransactions((SkipTransactionsPayload) payload); } else { LOG.debug("{}: ignoring unhandled payload {}", logContext, payload); } @@ -471,6 +475,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { payloadReplicationComplete((PurgeLocalHistoryPayload)payload); } allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier()); + } else if (payload instanceof SkipTransactionsPayload) { + if (identifier != null) { + payloadReplicationComplete((SkipTransactionsPayload)payload); + } + allMetadataSkipTransactions((SkipTransactionsPayload) payload); } else { LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload); } @@ -567,6 +576,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } + private void allMetadataSkipTransactions(final SkipTransactionsPayload payload) { + final var historyId = payload.getIdentifier(); + final var txIds = payload.getTransactionIds(); + for (ShardDataTreeMetadata m : metadata) { + m.onTransactionsSkipped(historyId, txIds); + } + } + /** * Create a transaction chain for specified history. Unlike {@link #ensureTransactionChain(LocalHistoryIdentifier)}, * this method is used for re-establishing state when we are taking over @@ -693,7 +710,22 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback); } - Optional readCurrentData() { + final void skipTransactions(final LocalHistoryIdentifier id, final ImmutableUnsignedLongSet transactionIds, + final Runnable callback) { + final ShardDataTreeTransactionChain chain = transactionChains.get(id); + if (chain == null) { + LOG.debug("{}: Skipping on non-existent transaction chain {}", logContext, id); + if (callback != null) { + callback.run(); + } + return; + } + + replicatePayload(id, SkipTransactionsPayload.create(id, transactionIds, + shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback); + } + + final Optional readCurrentData() { return dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty()) .map(state -> DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.empty(), state)); }