X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=4aa7a7b786b6a0b925c4233a9043db59cb2be355;hb=refs%2Fchanges%2F49%2F85749%2F63;hp=428cf8467c8188170d7985341118940a0186c13e;hpb=1808c7ff4e755fb475253f5b6c3b5ef627a1bdc0;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 428cf8467c..4aa7a7b786 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -65,7 +65,9 @@ import org.opendaylight.controller.cluster.datastore.persisted.PurgeTransactionP import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata; import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; +import org.opendaylight.controller.cluster.datastore.persisted.SkipTransactionsPayload; import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput; +import org.opendaylight.controller.cluster.datastore.utils.ImmutableUnsignedLongSet; import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification; import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; @@ -220,8 +222,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final void updateSchemaContext(final @NonNull EffectiveModelContext newSchemaContext) { dataTree.setEffectiveModelContext(newSchemaContext); - this.schemaContext = newSchemaContext; - this.dataSchemaContext = DataSchemaContextTree.from(newSchemaContext); + schemaContext = newSchemaContext; + dataSchemaContext = DataSchemaContextTree.from(newSchemaContext); } final void resetTransactionBatch() { @@ -385,6 +387,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier()); } else if (payload instanceof PurgeLocalHistoryPayload) { allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier()); + } else if (payload instanceof SkipTransactionsPayload) { + allMetadataSkipTransactions((SkipTransactionsPayload) payload); } else { LOG.debug("{}: ignoring unhandled payload {}", logContext, payload); } @@ -473,6 +477,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { payloadReplicationComplete((PurgeLocalHistoryPayload)payload); } allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier()); + } else if (payload instanceof SkipTransactionsPayload) { + if (identifier != null) { + payloadReplicationComplete((SkipTransactionsPayload)payload); + } + allMetadataSkipTransactions((SkipTransactionsPayload) payload); } else { LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload); } @@ -569,6 +578,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } + private void allMetadataSkipTransactions(final SkipTransactionsPayload payload) { + final var historyId = payload.getIdentifier(); + final var txIds = payload.getTransactionIds(); + for (ShardDataTreeMetadata m : metadata) { + m.onTransactionsSkipped(historyId, txIds); + } + } + /** * Create a transaction chain for specified history. Unlike {@link #ensureTransactionChain(LocalHistoryIdentifier)}, * this method is used for re-establishing state when we are taking over @@ -695,6 +712,21 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback); } + final void skipTransactions(final LocalHistoryIdentifier id, final ImmutableUnsignedLongSet transactionIds, + final Runnable callback) { + final ShardDataTreeTransactionChain chain = transactionChains.get(id); + if (chain == null) { + LOG.debug("{}: Skipping on non-existent transaction chain {}", logContext, id); + if (callback != null) { + callback.run(); + } + return; + } + + replicatePayload(id, SkipTransactionsPayload.create(id, transactionIds, + shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback); + } + final Optional readCurrentData() { return dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty()) .map(state -> DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.empty(), state));