import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
+import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
/**
* Internal shard state, similar to a DOMStore, but optimized for use in the actor system,
}
}
- private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
+ private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(FiniteDuration.create(5, TimeUnit.SECONDS));
private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
/**
private DataTreeTip tip;
private SchemaContext schemaContext;
+ private DataSchemaContextTree dataSchemaContext;
private int currentTransactionBatch;
void updateSchemaContext(final SchemaContext newSchemaContext) {
dataTree.setSchemaContext(newSchemaContext);
this.schemaContext = Preconditions.checkNotNull(newSchemaContext);
+ // Cache a DataSchemaContextTree built from the new context; wrapWithPruning()
+ // hands it to PruningDataTreeModification, presumably so it is not re-derived
+ // from the raw SchemaContext on every wrapped modification.
+ this.dataSchemaContext = DataSchemaContextTree.from(newSchemaContext);
}
void resetTransactionBatch() {
}
+ // NOTE(review): pruning now receives the cached DataSchemaContextTree (set in
+ // updateSchemaContext()) instead of the raw SchemaContext.
private PruningDataTreeModification wrapWithPruning(final DataTreeModification delegate) {
- return new PruningDataTreeModification(delegate, dataTree, schemaContext);
+ return new PruningDataTreeModification(delegate, dataTree, dataSchemaContext);
}
private static DataTreeModification unwrap(final DataTreeModification modification) {
}
}
- private void applyReplicatedCandidate(final Identifier identifier, final DataTreeCandidate foreign)
+ private void applyReplicatedCandidate(final TransactionIdentifier identifier, final DataTreeCandidate foreign)
throws DataValidationFailedException {
LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
final DataTreeCandidate candidate = dataTree.prepare(mod);
dataTree.commit(candidate);
+ allMetadataCommittedTransaction(identifier);
notifyListeners(candidate);
}
* pre-Boron state -- which limits the number of options here.
*/
if (payload instanceof CommitTransactionPayload) {
- final TransactionIdentifier txId;
if (identifier == null) {
final Entry<TransactionIdentifier, DataTreeCandidate> e =
((CommitTransactionPayload) payload).getCandidate();
- txId = e.getKey();
- applyReplicatedCandidate(txId, e.getValue());
+ applyReplicatedCandidate(e.getKey(), e.getValue());
} else {
Verify.verify(identifier instanceof TransactionIdentifier);
- txId = (TransactionIdentifier) identifier;
- payloadReplicationComplete(txId);
+ payloadReplicationComplete((TransactionIdentifier) identifier);
}
- allMetadataCommittedTransaction(txId);
} else if (payload instanceof AbortTransactionPayload) {
if (identifier != null) {
payloadReplicationComplete((AbortTransactionPayload) payload);
final CommitEntry current = pendingFinishCommits.peek();
if (current == null) {
LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
+ allMetadataCommittedTransaction(txId);
return;
}
if (!current.cohort.getIdentifier().equals(txId)) {
LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext,
current.cohort.getIdentifier(), txId);
+ allMetadataCommittedTransaction(txId);
return;
}
if (chain == null) {
chain = new ShardDataTreeTransactionChain(historyId, this);
transactionChains.put(historyId, chain);
- replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), callback);
+ replicatePayload(historyId, CreateLocalHistoryPayload.create(
+ historyId, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
} else if (callback != null) {
callback.run();
}
* @param callback Callback to invoke upon completion, may be null
*/
void closeTransactionChain(final LocalHistoryIdentifier id, @Nullable final Runnable callback) {
+ // Replicate a CloseLocalHistoryPayload only when the chain actually existed
+ // and was closed; for an unknown chain commonCloseTransactionChain() has
+ // already run the callback and returns false, so no record is replicated.
+ if (commonCloseTransactionChain(id, callback)) {
+ replicatePayload(id, CloseLocalHistoryPayload.create(id,
+ shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
+ }
+ }
+
+ /**
+ * Close a single transaction chain which is received through ask-based protocol. It does not keep a commit record
+ * (no payload is replicated) and takes no completion callback.
+ *
+ * @param id History identifier
+ */
+ void closeTransactionChain(final LocalHistoryIdentifier id) {
+ commonCloseTransactionChain(id, null);
+ }
+
+ /**
+ * Shared implementation behind both closeTransactionChain() variants: looks the
+ * chain up and closes it if present.
+ *
+ * @param id History identifier
+ * @param callback invoked only when the chain does not exist, may be null
+ * @return true if a chain was found and closed (caller may replicate a close
+ * record), false if no such chain exists
+ */
+ private boolean commonCloseTransactionChain(final LocalHistoryIdentifier id, @Nullable final Runnable callback) {
final ShardDataTreeTransactionChain chain = transactionChains.get(id);
if (chain == null) {
LOG.debug("{}: Closing non-existent transaction chain {}", logContext, id);
+ // Nothing to close, but still honour the completion callback (best-effort).
if (callback != null) {
callback.run();
}
- return;
+ return false;
}
chain.close();
- replicatePayload(id, CloseLocalHistoryPayload.create(id), callback);
+ return true;
}
/**
return;
}
- replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback);
+ replicatePayload(id, PurgeLocalHistoryPayload.create(
+ id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
}
Optional<DataTreeCandidate> readCurrentData() {
void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction, final Runnable callback) {
final TransactionIdentifier id = transaction.getIdentifier();
LOG.debug("{}: aborting transaction {}", logContext, id);
- replicatePayload(id, AbortTransactionPayload.create(id), callback);
+ // Pass the configured initial buffer capacity so payload serialization starts
+ // with a right-sized buffer — presumably avoiding repeated growth; confirm
+ // against AbortTransactionPayload.create().
+ replicatePayload(id, AbortTransactionPayload.create(
+ id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
}
@Override
ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction,
final java.util.Optional<SortedSet<String>> participatingShardNames) {
final DataTreeModification snapshot = transaction.getSnapshot();
+ final TransactionIdentifier id = transaction.getIdentifier();
+ // Debug logging brackets snapshot.ready() — presumably so a slow ready()
+ // shows up as a gap between these two trace lines.
+ LOG.debug("{}: readying transaction {}", logContext, id);
snapshot.ready();
+ LOG.debug("{}: transaction {} ready", logContext, id);
return createReadyCohort(transaction.getIdentifier(), snapshot, participatingShardNames);
}
void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
LOG.debug("{}: purging transaction {}", logContext, id);
- replicatePayload(id, PurgeTransactionPayload.create(id), callback);
+ // Same buffer pre-sizing as the other payload factories in this change; keeps
+ // PurgeTransactionPayload.create() consistent with Abort/CloseLocalHistory.
+ replicatePayload(id, PurgeTransactionPayload.create(
+ id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
}
public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
processNextPendingTransaction();
}
- private void insertEntry(final Deque<CommitEntry> queue, final CommitEntry entry, final int atIndex) {
+ private static void insertEntry(final Deque<CommitEntry> queue, final CommitEntry entry, final int atIndex) {
if (atIndex == 0) {
queue.addFirst(entry);
return;
final SimpleShardDataTreeCohort current = entry.cohort;
Verify.verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current);
- LOG.debug("{}: Preparing transaction {}", logContext, current.getIdentifier());
+ final TransactionIdentifier currentId = current.getIdentifier();
+ LOG.debug("{}: Preparing transaction {}", logContext, currentId);
final DataTreeCandidateTip candidate;
try {
candidate = tip.prepare(cohort.getDataTreeModification());
+ LOG.debug("{}: Transaction {} candidate ready", logContext, currentId);
} catch (RuntimeException e) {
failPreCommit(e);
return;
pendingTransactions.remove();
pendingCommits.add(entry);
- LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier());
+ LOG.debug("{}: Transaction {} prepared", logContext, currentId);
cohort.successfulPreCommit(candidate);
return;
}
+ allMetadataCommittedTransaction(txId);
shard.getShardMBean().incrementCommittedTransactionCount();
shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
final TransactionIdentifier txId = cohort.getIdentifier();
final Payload payload;
try {
- payload = CommitTransactionPayload.create(txId, candidate);
+ payload = CommitTransactionPayload.create(txId, candidate,
+ shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity());
} catch (IOException e) {
LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e);
pendingCommits.poll().cohort.failedCommit(e);