import akka.actor.ActorRef;
import akka.actor.Status.Failure;
import akka.serialization.Serialization;
+import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry.State;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
+import scala.concurrent.duration.Duration;
/**
* Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
private final ShardDataTree dataTree;
+ private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
+
// We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
// since this should only be accessed on the shard's dispatcher.
private final Queue<CohortEntry> queuedCohortEntries = new LinkedList<>();
private Runnable runOnPendingTransactionsComplete;
- ShardCommitCoordinator(ShardDataTree dataTree,
- long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log, String name) {
+
+ private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
+
+ ShardCommitCoordinator(ShardDataTree dataTree, long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log,
+ String name) {
this.queueCapacity = queueCapacity;
this.log = log;
} else {
cohortCache.remove(cohortEntry.getTransactionID());
- RuntimeException ex = new RuntimeException(
+ final RuntimeException ex = new RuntimeException(
String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
" capacity %d has been reached.",
name, cohortEntry.getTransactionID(), queueCapacity));
* @param ready the ForwardedReadyTransaction message to process
* @param sender the sender of the message
* @param shard the transaction's shard actor
+ * @param schema
*/
- void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard) {
+ void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard,
+ SchemaContext schema) {
log.debug("{}: Readying transaction {}, client version {}", name,
ready.getTransactionID(), ready.getTxnClientVersion());
- ShardDataTreeCohort cohort = ready.getTransaction().ready();
- CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, ready.getTxnClientVersion());
+ final ShardDataTreeCohort cohort = ready.getTransaction().ready();
+ final CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, cohortRegistry, schema, ready.getTxnClientVersion());
cohortCache.put(ready.getTransactionID(), cohortEntry);
if(!queueCohortEntry(cohortEntry, sender, shard)) {
* @param sender the sender of the message
* @param shard the transaction's shard actor
*/
- void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) {
+ void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard, SchemaContext schema) {
CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
if(cohortEntry == null) {
cohortEntry = new CohortEntry(batched.getTransactionID(),
- dataTree.newReadWriteTransaction(batched.getTransactionID(),
- batched.getTransactionChainID()), batched.getVersion());
+ dataTree.newReadWriteTransaction(batched.getTransactionID(), batched.getTransactionChainID()),
+ cohortRegistry, schema, batched.getVersion());
cohortCache.put(batched.getTransactionID(), cohortEntry);
}
/**
* This method handles {@link ReadyLocalTransaction} message. All transaction modifications have
- * been prepared beforehand by the sender and we just need to drive them through into the dataTree.
+ * been prepared beforehand by the sender and we just need to drive them through into the
+ * dataTree.
*
* @param message the ReadyLocalTransaction message to process
* @param sender the sender of the message
* @param shard the transaction's shard actor
*/
- void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
+ void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard,
+ SchemaContext schema) {
final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
message.getTransactionID());
- final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort,
+ final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort, cohortRegistry, schema,
DataStoreVersions.CURRENT_VERSION);
cohortCache.put(message.getTransactionID(), cohortEntry);
cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
private List<CohortEntry> getAndClearPendingCohortEntries() {
List<CohortEntry> cohortEntries = new ArrayList<>();
+
if(currentCohortEntry != null) {
cohortEntries.add(currentCohortEntry);
cohortCache.remove(currentCohortEntry.getTransactionID());
newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
newModifications.add(new BatchedModifications(cohortEntry.getTransactionID(),
cohortEntry.getClientVersion(), ""));
- }
+ }
return newModifications.getLast();
}
private void maybeProcessNextCohortEntry() {
// Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also
// clean out expired entries.
- Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
+ final Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
while(iter.hasNext()) {
- CohortEntry next = iter.next();
+ final CohortEntry next = iter.next();
if(next.isReadyToCommit()) {
if(currentCohortEntry == null) {
if(log.isDebugEnabled()) {
this.cohortDecorator = cohortDecorator;
}
+ void processCohortRegistryCommand(ActorRef sender, CohortRegistryCommand message) {
+ cohortRegistry.process(sender, message);
+ }
+
static class CohortEntry {
enum State {
PENDING,
PRE_COMMITTED,
COMMITTED,
ABORTED
- }
+ }
private final String transactionID;
private ShardDataTreeCohort cohort;
private int totalBatchedModificationsReceived;
private State state = State.PENDING;
private final short clientVersion;
+ private final CompositeDataTreeCohort userCohorts;
- CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction, short clientVersion) {
+ CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction,
+ DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) {
this.transaction = Preconditions.checkNotNull(transaction);
this.transactionID = transactionID;
this.clientVersion = clientVersion;
+ this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
}
- CohortEntry(String transactionID, ShardDataTreeCohort cohort, short clientVersion) {
+ CohortEntry(String transactionID, ShardDataTreeCohort cohort, DataTreeCohortActorRegistry cohortRegistry,
+ SchemaContext schema, short clientVersion) {
this.transactionID = transactionID;
this.cohort = cohort;
this.transaction = null;
this.clientVersion = clientVersion;
+ this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
}
void updateLastAccessTime() {
return cohort.canCommit().get();
}
- void preCommit() throws InterruptedException, ExecutionException {
+
+
+ void preCommit() throws InterruptedException, ExecutionException, TimeoutException {
state = State.PRE_COMMITTED;
cohort.preCommit().get();
+ userCohorts.canCommit(cohort.getCandidate());
+ userCohorts.preCommit();
}
- void commit() throws InterruptedException, ExecutionException {
+ void commit() throws InterruptedException, ExecutionException, TimeoutException {
state = State.COMMITTED;
cohort.commit().get();
+ userCohorts.commit();
}
- void abort() throws InterruptedException, ExecutionException {
+ void abort() throws InterruptedException, ExecutionException, TimeoutException {
state = State.ABORTED;
cohort.abort().get();
+ userCohorts.abort();
}
void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
@Override
public String toString() {
- StringBuilder builder = new StringBuilder();
+ final StringBuilder builder = new StringBuilder();
builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
.append(doImmediateCommit).append("]");
return builder.toString();