import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata;
import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
+import org.opendaylight.controller.cluster.datastore.persisted.SkipTransactionsPayload;
import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
+import org.opendaylight.controller.cluster.datastore.utils.ImmutableUnsignedLongSet;
import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.messages.Payload;
import org.opendaylight.mdsal.common.api.OptimisticLockFailedException;
import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.Empty;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
-import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
+import org.opendaylight.yangtools.yang.data.tree.api.ConflictingModificationAppliedException;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidateTip;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeConfiguration;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeSnapshot;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeTip;
+import org.opendaylight.yangtools.yang.data.tree.api.DataValidationFailedException;
+import org.opendaylight.yangtools.yang.data.tree.api.ModificationType;
+import org.opendaylight.yangtools.yang.data.tree.api.TreeType;
+import org.opendaylight.yangtools.yang.data.tree.impl.di.InMemoryDataTreeFactory;
+import org.opendaylight.yangtools.yang.data.tree.spi.DataTreeCandidates;
import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree;
import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
final void updateSchemaContext(final @NonNull EffectiveModelContext newSchemaContext) {
dataTree.setEffectiveModelContext(newSchemaContext);
- this.schemaContext = newSchemaContext;
- this.dataSchemaContext = DataSchemaContextTree.from(newSchemaContext);
+ schemaContext = newSchemaContext;
+ dataSchemaContext = DataSchemaContextTree.from(newSchemaContext);
}
final void resetTransactionBatch() {
* @return A state snapshot
*/
@NonNull ShardDataTreeSnapshot takeStateSnapshot() {
- final NormalizedNode rootNode = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty()).get();
+ final NormalizedNode rootNode = takeSnapshot().readNode(YangInstanceIdentifier.empty()).get();
final Builder<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>> metaBuilder =
ImmutableMap.builder();
}
}
- final DataTreeModification unwrapped = dataTree.takeSnapshot().newModification();
+ final DataTreeModification unwrapped = newModification();
final DataTreeModification mod = wrapper.apply(unwrapped);
// delete everything first
mod.delete(YangInstanceIdentifier.empty());
@SuppressWarnings("checkstyle:IllegalCatch")
private void applyRecoveryCandidate(final CommitTransactionPayload payload) throws IOException {
final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> entry = payload.acquireCandidate();
- final DataTreeModification unwrapped = dataTree.takeSnapshot().newModification();
+ final DataTreeModification unwrapped = newModification();
final PruningDataTreeModification mod = createPruningModification(unwrapped,
NormalizedNodeStreamVersion.MAGNESIUM.compareTo(entry.getValue().getVersion()) > 0);
allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
} else if (payload instanceof PurgeLocalHistoryPayload) {
allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
+ } else if (payload instanceof SkipTransactionsPayload) {
+ allMetadataSkipTransactions((SkipTransactionsPayload) payload);
} else {
LOG.debug("{}: ignoring unhandled payload {}", logContext, payload);
}
final TransactionIdentifier identifier = entry.getKey();
LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
- final DataTreeModification mod = dataTree.takeSnapshot().newModification();
+ final DataTreeModification mod = newModification();
// TODO: check version here, which will enable us to perform forward-compatibility transformations
DataTreeCandidates.applyToModification(mod, entry.getValue().getCandidate());
mod.ready();
payloadReplicationComplete((PurgeLocalHistoryPayload)payload);
}
allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
+ } else if (payload instanceof SkipTransactionsPayload) {
+ if (identifier != null) {
+ payloadReplicationComplete((SkipTransactionsPayload)payload);
+ }
+ allMetadataSkipTransactions((SkipTransactionsPayload) payload);
} else {
LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload);
}
}
}
+ private void allMetadataSkipTransactions(final SkipTransactionsPayload payload) {
+ final var historyId = payload.getIdentifier();
+ final var txIds = payload.getTransactionIds();
+ for (ShardDataTreeMetadata<?> m : metadata) {
+ m.onTransactionsSkipped(historyId, txIds);
+ }
+ }
+
/**
* Create a transaction chain for specified history. Unlike {@link #ensureTransactionChain(LocalHistoryIdentifier)},
* this method is used for re-establishing state when we are taking over
return chain;
}
- final ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) {
+ final @NonNull ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) {
shard.getShardMBean().incrementReadOnlyTransactionCount();
- if (txId.getHistoryId().getHistoryId() == 0) {
- return new ReadOnlyShardDataTreeTransaction(this, txId, dataTree.takeSnapshot());
- }
+ final var historyId = txId.getHistoryId();
+ return historyId.getHistoryId() == 0 ? newStandaloneReadOnlyTransaction(txId)
+ : ensureTransactionChain(historyId, null).newReadOnlyTransaction(txId);
+ }
- return ensureTransactionChain(txId.getHistoryId(), null).newReadOnlyTransaction(txId);
+ final @NonNull ReadOnlyShardDataTreeTransaction newStandaloneReadOnlyTransaction(final TransactionIdentifier txId) {
+ return new ReadOnlyShardDataTreeTransaction(this, txId, takeSnapshot());
}
- final ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
+ final @NonNull ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
shard.getShardMBean().incrementReadWriteTransactionCount();
- if (txId.getHistoryId().getHistoryId() == 0) {
- return new ReadWriteShardDataTreeTransaction(ShardDataTree.this, txId, dataTree.takeSnapshot()
- .newModification());
- }
+ final var historyId = txId.getHistoryId();
+ return historyId.getHistoryId() == 0 ? newStandaloneReadWriteTransaction(txId)
+ : ensureTransactionChain(historyId, null).newReadWriteTransaction(txId);
+ }
- return ensureTransactionChain(txId.getHistoryId(), null).newReadWriteTransaction(txId);
+ final @NonNull ReadWriteShardDataTreeTransaction newStandaloneReadWriteTransaction(
+ final TransactionIdentifier txId) {
+ return new ReadWriteShardDataTreeTransaction(this, txId, newModification());
}
@VisibleForTesting
id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
}
+ final void skipTransactions(final LocalHistoryIdentifier id, final ImmutableUnsignedLongSet transactionIds,
+ final Runnable callback) {
+ final ShardDataTreeTransactionChain chain = transactionChains.get(id);
+ if (chain == null) {
+ LOG.debug("{}: Skipping on non-existent transaction chain {}", logContext, id);
+ if (callback != null) {
+ callback.run();
+ }
+ return;
+ }
+
+ replicatePayload(id, SkipTransactionsPayload.create(id, transactionIds,
+ shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
+ }
+
final Optional<DataTreeCandidate> readCurrentData() {
- return dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty())
- .map(state -> DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.empty(), state));
+ return readNode(YangInstanceIdentifier.empty())
+ .map(state -> DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.empty(), state));
}
final void registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
@VisibleForTesting
public final Optional<NormalizedNode> readNode(final YangInstanceIdentifier path) {
- return dataTree.takeSnapshot().readNode(path);
+ return takeSnapshot().readNode(path);
}
final DataTreeSnapshot takeSnapshot() {
@VisibleForTesting
final DataTreeModification newModification() {
- return dataTree.takeSnapshot().newModification();
+ return takeSnapshot().newModification();
}
final Collection<ShardDataTreeCohort> getAndClearPendingTransactions() {
return;
}
- cohort.userPreCommit(candidate, new FutureCallback<Void>() {
+ cohort.userPreCommit(candidate, new FutureCallback<>() {
@Override
- public void onSuccess(final Void noop) {
+ public void onSuccess(final Empty result) {
// Set the tip of the data tree.
tip = verifyNotNull(candidate);
// the newReadWriteTransaction()
final ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
final Optional<SortedSet<String>> participatingShardNames) {
- if (txId.getHistoryId().getHistoryId() == 0) {
+ final var historyId = txId.getHistoryId();
+ if (historyId.getHistoryId() == 0) {
return createReadyCohort(txId, mod, participatingShardNames);
}
-
- return ensureTransactionChain(txId.getHistoryId(), null).createReadyCohort(txId, mod, participatingShardNames);
+ return ensureTransactionChain(historyId, null).createReadyCohort(txId, mod, participatingShardNames);
}
@SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")