import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata;
import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
+import org.opendaylight.controller.cluster.datastore.persisted.SkipTransactionsPayload;
import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
+import org.opendaylight.controller.cluster.datastore.utils.ImmutableUnsignedLongSet;
import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
void updateSchemaContext(final @NonNull EffectiveModelContext newSchemaContext) {
dataTree.setEffectiveModelContext(newSchemaContext);
- this.schemaContext = newSchemaContext;
- this.dataSchemaContext = DataSchemaContextTree.from(newSchemaContext);
+ schemaContext = newSchemaContext;
+ dataSchemaContext = DataSchemaContextTree.from(newSchemaContext);
}
void resetTransactionBatch() {
allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
} else if (payload instanceof PurgeLocalHistoryPayload) {
allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
+ } else if (payload instanceof SkipTransactionsPayload) {
+ allMetadataSkipTransactions((SkipTransactionsPayload) payload);
} else {
LOG.debug("{}: ignoring unhandled payload {}", logContext, payload);
}
payloadReplicationComplete((PurgeLocalHistoryPayload)payload);
}
allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
+ } else if (payload instanceof SkipTransactionsPayload) {
+ if (identifier != null) {
+ payloadReplicationComplete((SkipTransactionsPayload)payload);
+ }
+ allMetadataSkipTransactions((SkipTransactionsPayload) payload);
} else {
LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload);
}
}
}
+ private void allMetadataSkipTransactions(final SkipTransactionsPayload payload) {
+ final var historyId = payload.getIdentifier();
+ final var txIds = payload.getTransactionIds();
+ for (ShardDataTreeMetadata<?> m : metadata) {
+ m.onTransactionsSkipped(historyId, txIds);
+ }
+ }
+
/**
* Create a transaction chain for specified history. Unlike {@link #ensureTransactionChain(LocalHistoryIdentifier)},
* this method is used for re-establishing state when we are taking over
id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
}
- Optional<DataTreeCandidate> readCurrentData() {
+ final void skipTransactions(final LocalHistoryIdentifier id, final ImmutableUnsignedLongSet transactionIds,
+ final Runnable callback) {
+ final ShardDataTreeTransactionChain chain = transactionChains.get(id);
+ if (chain == null) {
+ LOG.debug("{}: Skipping on non-existent transaction chain {}", logContext, id);
+ if (callback != null) {
+ callback.run();
+ }
+ return;
+ }
+
+ replicatePayload(id, SkipTransactionsPayload.create(id, transactionIds,
+ shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
+ }
+
+ final Optional<DataTreeCandidate> readCurrentData() {
return dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty())
.map(state -> DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.empty(), state));
}