X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=8b826015c4eaf7df45572dd747b5da90ec3dce6e;hb=refs%2Fchanges%2F33%2F78433%2F4;hp=5f1e67e988e60ccca5f434f2e04358cd484ba8b9;hpb=731e7284cf0895fdb1b89427f91762e80e67c2ff;p=controller.git

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
index 5f1e67e988..8b826015c4 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
@@ -83,7 +83,7 @@ import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFac
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Internal shard state, similar to a DOMStore, but optimized for use in the actor system,
@@ -110,7 +110,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
-    private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
+    private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(FiniteDuration.create(5, TimeUnit.SECONDS));
     private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
 
     /**
@@ -315,7 +315,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private void applyRecoveryCandidate(final DataTreeCandidate candidate) throws DataValidationFailedException {
+    private void applyRecoveryCandidate(final DataTreeCandidate candidate) {
         final PruningDataTreeModification mod = wrapWithPruning(dataTree.takeSnapshot().newModification());
         DataTreeCandidates.applyToModification(mod, candidate);
         mod.ready();
@@ -344,7 +344,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
      * @throws IOException when the snapshot fails to deserialize
      * @throws DataValidationFailedException when the snapshot fails to apply
      */
-    void applyRecoveryPayload(@Nonnull final Payload payload) throws IOException, DataValidationFailedException {
+    void applyRecoveryPayload(@Nonnull final Payload payload) throws IOException {
         if (payload instanceof CommitTransactionPayload) {
             final Entry<TransactionIdentifier, DataTreeCandidate> e =
                     ((CommitTransactionPayload) payload).getCandidate();
@@ -365,7 +365,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
-    private void applyReplicatedCandidate(final Identifier identifier, final DataTreeCandidate foreign)
+    private void applyReplicatedCandidate(final TransactionIdentifier identifier, final DataTreeCandidate foreign)
             throws DataValidationFailedException {
         LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
 
@@ -378,6 +378,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
         final DataTreeCandidate candidate = dataTree.prepare(mod);
         dataTree.commit(candidate);
+        allMetadataCommittedTransaction(identifier);
         notifyListeners(candidate);
     }
 
@@ -404,18 +405,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
          * pre-Boron state -- which limits the number of options here.
          */
        if (payload instanceof CommitTransactionPayload) {
-            final TransactionIdentifier txId;
             if (identifier == null) {
                 final Entry<TransactionIdentifier, DataTreeCandidate> e =
                         ((CommitTransactionPayload) payload).getCandidate();
-                txId = e.getKey();
-                applyReplicatedCandidate(txId, e.getValue());
+                applyReplicatedCandidate(e.getKey(), e.getValue());
             } else {
                 Verify.verify(identifier instanceof TransactionIdentifier);
-                txId = (TransactionIdentifier) identifier;
-                payloadReplicationComplete(txId);
+                payloadReplicationComplete((TransactionIdentifier) identifier);
             }
-            allMetadataCommittedTransaction(txId);
         } else if (payload instanceof AbortTransactionPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((AbortTransactionPayload) payload);
@@ -467,12 +464,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         final CommitEntry current = pendingFinishCommits.peek();
         if (current == null) {
             LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
+            allMetadataCommittedTransaction(txId);
             return;
         }
 
         if (!current.cohort.getIdentifier().equals(txId)) {
             LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext,
                 current.cohort.getIdentifier(), txId);
+            allMetadataCommittedTransaction(txId);
             return;
         }
 
@@ -538,7 +537,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         if (chain == null) {
             chain = new ShardDataTreeTransactionChain(historyId, this);
             transactionChains.put(historyId, chain);
-            replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), callback);
+            replicatePayload(historyId, CreateLocalHistoryPayload.create(
+                historyId, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
         } else if (callback != null) {
             callback.run();
         }
@@ -598,7 +598,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
 
         chain.close();
-        replicatePayload(id, CloseLocalHistoryPayload.create(id), callback);
+        replicatePayload(id, CloseLocalHistoryPayload.create(
+            id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
     }
 
     /**
@@ -617,7 +618,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return;
         }
 
-        replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback);
+        replicatePayload(id, PurgeLocalHistoryPayload.create(
+            id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
     }
 
     Optional<DataTreeCandidate> readCurrentData() {
@@ -641,7 +643,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction, final Runnable callback) {
         final TransactionIdentifier id = transaction.getIdentifier();
         LOG.debug("{}: aborting transaction {}", logContext, id);
-        replicatePayload(id, AbortTransactionPayload.create(id), callback);
+        replicatePayload(id, AbortTransactionPayload.create(
+            id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
     }
 
     @Override
@@ -661,7 +664,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
         LOG.debug("{}: purging transaction {}", logContext, id);
-        replicatePayload(id, PurgeTransactionPayload.create(id), callback);
+        replicatePayload(id, PurgeTransactionPayload.create(
+            id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
     }
 
     public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
@@ -738,8 +742,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
             // For debugging purposes, allow dumping of the modification. Coupled with the above
             // precondition log, it should allow us to understand what went on.
-            LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", cohort.getIdentifier(), modification,
-                dataTree);
+            LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", logContext, cohort.getIdentifier(),
+                modification, dataTree);
             cause = new TransactionCommitFailedException("Data did not pass validation for path " + e.getPath(), e);
         } catch (Exception e) {
             LOG.warn("{}: Unexpected failure in validation phase", logContext, e);
@@ -873,7 +877,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         processNextPendingTransaction();
     }
 
-    private void insertEntry(Deque<CommitEntry> queue, CommitEntry entry, int atIndex) {
+    private static void insertEntry(final Deque<CommitEntry> queue, final CommitEntry entry, final int atIndex) {
         if (atIndex == 0) {
             queue.addFirst(entry);
             return;
@@ -892,7 +896,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     private Collection<String> extractPrecedingShardNames(
-            java.util.Optional<SortedSet<String>> participatingShardNames) {
+            final java.util.Optional<SortedSet<String>> participatingShardNames) {
         return participatingShardNames.map((Function<SortedSet<String>, Collection<String>>)
             set -> set.headSet(shard.getShardName())).orElse(Collections.emptyList());
     }
@@ -972,6 +976,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return;
         }
 
+        allMetadataCommittedTransaction(txId);
         shard.getShardMBean().incrementCommittedTransactionCount();
         shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
 
@@ -999,7 +1004,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         final TransactionIdentifier txId = cohort.getIdentifier();
         final Payload payload;
         try {
-            payload = CommitTransactionPayload.create(txId, candidate);
+            payload = CommitTransactionPayload.create(txId, candidate,
+                shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity());
         } catch (IOException e) {
             LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e);
             pendingCommits.poll().cohort.failedCommit(e);
@@ -1057,7 +1063,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
             final java.util.Optional<SortedSet<String>> participatingShardNames) {
         SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId,
-                cohortRegistry.createCohort(schemaContext, txId, runnable -> shard.executeInSelf(runnable),
+                cohortRegistry.createCohort(schemaContext, txId, shard::executeInSelf,
                     COMMIT_STEP_TIMEOUT), participatingShardNames);
         pendingTransactions.add(new CommitEntry(cohort, readTime()));
         return cohort;
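
Note: most hunks above follow a single pattern: the payload factory calls (CreateLocalHistoryPayload.create, CloseLocalHistoryPayload.create, PurgeLocalHistoryPayload.create, AbortTransactionPayload.create, PurgeTransactionPayload.create, CommitTransactionPayload.create) now receive shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity(), so the buffer used to serialize each payload starts at a configured capacity instead of a library default. The sketch below is only an illustration of that idea, not the OpenDaylight implementation: ExamplePayload and its create() signature are invented for this note, and the only real API it relies on is the JDK's pre-sizeable ByteArrayOutputStream.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical stand-in for the payload classes touched in this diff. It shows the
// pattern of passing an initial serialized-buffer capacity into the factory method.
final class ExamplePayload {
    private final byte[] serialized;

    private ExamplePayload(final byte[] serialized) {
        this.serialized = serialized;
    }

    // Mirrors the diff's pattern: the caller supplies the initial capacity (e.g. the value of
    // DatastoreContext#getInitialPayloadSerializedBufferCapacity()) so the output buffer does
    // not start at the tiny JDK default and repeatedly grow while the payload is serialized.
    static ExamplePayload create(final String identifier, final int initialSerializedBufferCapacity)
            throws IOException {
        final ByteArrayOutputStream bytes = new ByteArrayOutputStream(initialSerializedBufferCapacity);
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(identifier);
        }
        return new ExamplePayload(bytes.toByteArray());
    }

    int size() {
        return serialized.length;
    }

    public static void main(final String[] args) throws IOException {
        // 512 bytes stands in for the configured initial capacity.
        final ExamplePayload payload = ExamplePayload.create("member-1-txn-0", 512);
        System.out.println("serialized payload size: " + payload.size() + " bytes");
    }
}

Pre-sizing presumably matters most on the commit path, where CommitTransactionPayload serializes an entire DataTreeCandidate; starting from a configured capacity avoids repeated array growth for large transactions while keeping small control payloads cheap.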