import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
/**
* Internal shard state, similar to a DOMStore, but optimized for use in the actor system,
}
}
- private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
+ private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(FiniteDuration.create(5, TimeUnit.SECONDS));
private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
/**
}
}
- private void applyReplicatedCandidate(final Identifier identifier, final DataTreeCandidate foreign)
+ private void applyReplicatedCandidate(final TransactionIdentifier identifier, final DataTreeCandidate foreign)
throws DataValidationFailedException {
LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
final DataTreeCandidate candidate = dataTree.prepare(mod);
dataTree.commit(candidate);
+ allMetadataCommittedTransaction(identifier);
notifyListeners(candidate);
}
* pre-Boron state -- which limits the number of options here.
*/
if (payload instanceof CommitTransactionPayload) {
- final TransactionIdentifier txId;
if (identifier == null) {
final Entry<TransactionIdentifier, DataTreeCandidate> e =
((CommitTransactionPayload) payload).getCandidate();
- txId = e.getKey();
- applyReplicatedCandidate(txId, e.getValue());
+ applyReplicatedCandidate(e.getKey(), e.getValue());
} else {
Verify.verify(identifier instanceof TransactionIdentifier);
- txId = (TransactionIdentifier) identifier;
- payloadReplicationComplete(txId);
+ payloadReplicationComplete((TransactionIdentifier) identifier);
}
- allMetadataCommittedTransaction(txId);
} else if (payload instanceof AbortTransactionPayload) {
if (identifier != null) {
payloadReplicationComplete((AbortTransactionPayload) payload);
final CommitEntry current = pendingFinishCommits.peek();
if (current == null) {
LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
+ allMetadataCommittedTransaction(txId);
return;
}
if (!current.cohort.getIdentifier().equals(txId)) {
LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext,
current.cohort.getIdentifier(), txId);
+ allMetadataCommittedTransaction(txId);
return;
}
if (chain == null) {
chain = new ShardDataTreeTransactionChain(historyId, this);
transactionChains.put(historyId, chain);
- replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), callback);
+ replicatePayload(historyId, CreateLocalHistoryPayload.create(
+ historyId, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
} else if (callback != null) {
callback.run();
}
}
chain.close();
- replicatePayload(id, CloseLocalHistoryPayload.create(id), callback);
+ replicatePayload(id, CloseLocalHistoryPayload.create(
+ id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
}
/**
return;
}
- replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback);
+ replicatePayload(id, PurgeLocalHistoryPayload.create(
+ id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
}
Optional<DataTreeCandidate> readCurrentData() {
void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction, final Runnable callback) {
final TransactionIdentifier id = transaction.getIdentifier();
LOG.debug("{}: aborting transaction {}", logContext, id);
- replicatePayload(id, AbortTransactionPayload.create(id), callback);
+ replicatePayload(id, AbortTransactionPayload.create(
+ id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
}
@Override
void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
LOG.debug("{}: purging transaction {}", logContext, id);
- replicatePayload(id, PurgeTransactionPayload.create(id), callback);
+ replicatePayload(id, PurgeTransactionPayload.create(
+ id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
}
public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
processNextPendingTransaction();
}
- private void insertEntry(final Deque<CommitEntry> queue, final CommitEntry entry, final int atIndex) {
+ private static void insertEntry(final Deque<CommitEntry> queue, final CommitEntry entry, final int atIndex) {
if (atIndex == 0) {
queue.addFirst(entry);
return;
return;
}
+ allMetadataCommittedTransaction(txId);
shard.getShardMBean().incrementCommittedTransactionCount();
shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
final TransactionIdentifier txId = cohort.getIdentifier();
final Payload payload;
try {
- payload = CommitTransactionPayload.create(txId, candidate);
+ payload = CommitTransactionPayload.create(txId, candidate,
+ shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity());
} catch (IOException e) {
LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e);
pendingCommits.poll().cohort.failedCommit(e);