raftContext.setLastApplied(replicatedLogEntry.getIndex());
// Apply the state immediately
- applyState(clientActor, identifier, data);
+ self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry), self());
// Send a ApplyJournalEntries message so that we write the fact that we applied
// the state to durable storage
new Procedure<ReplicatedLogEntry>() {
@Override
public void apply(ReplicatedLogEntry evt) throws Exception {
+ context.getLogger().debug("{}: persist complete {}", context.getId(), replicatedLogEntry);
+
int logEntrySize = replicatedLogEntry.size();
long dataSizeForCheck = dataSize();
public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS = DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
- public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 20000;
+ public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000;
public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
public static final boolean DEFAULT_PERSISTENT = true;
public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
- public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT= 100;
+ public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 100;
+ public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS = TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
private static Set<String> globalDatastoreTypes = Sets.newConcurrentHashSet();
private String dataStoreType = UNKNOWN_DATA_STORE_TYPE;
private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
private boolean writeOnlyTransactionOptimizationsEnabled = true;
+ private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
public static Set<String> getGlobalDatastoreTypes() {
return globalDatastoreTypes;
this.dataStoreType = other.dataStoreType;
this.shardBatchedModificationCount = other.shardBatchedModificationCount;
this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled;
+ this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis;
setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
return writeOnlyTransactionOptimizationsEnabled;
}
+ public long getShardCommitQueueExpiryTimeoutInMillis() {
+ return shardCommitQueueExpiryTimeoutInMillis;
+ }
+
public static class Builder {
private final DatastoreContext datastoreContext;
private int maxShardDataChangeExecutorPoolSize =
return this;
}
+ public Builder shardCommitQueueExpiryTimeoutInMillis(long value) {
+ datastoreContext.shardCommitQueueExpiryTimeoutInMillis = value;
+ return this;
+ }
+
+ public Builder shardCommitQueueExpiryTimeoutInSeconds(long value) {
+ datastoreContext.shardCommitQueueExpiryTimeoutInMillis = TimeUnit.MILLISECONDS.convert(
+ value, TimeUnit.SECONDS);
+ return this;
+ }
+
public Builder maxShardDataChangeExecutorPoolSize(int maxShardDataChangeExecutorPoolSize) {
this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
return this;
isTxActorLocal, remoteTransactionVersion, parent.getCompleter());
}
- TransactionContextCleanup.track(this, ret);
+ if(parent.getType() == TransactionType.READ_ONLY) {
+ TransactionContextCleanup.track(this, ret);
+ }
+
return ret;
}
}
}
commitCoordinator = new ShardCommitCoordinator(store,
- TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES),
+ datastoreContext.getShardCommitQueueExpiryTimeoutInMillis(),
datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name);
setTransactionCommitTimeout();
private void setTransactionCommitTimeout() {
transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
- datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
+ datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS) / 2;
}
public static Props props(final ShardIdentifier name,
private void handleTransactionCommitTimeoutCheck() {
CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
if(cohortEntry != null) {
- long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime();
- if(elapsed > transactionCommitTimeout) {
+ if(cohortEntry.isExpired(transactionCommitTimeout)) {
LOG.warn("{}: Current transaction {} has timed out after {} ms - aborting",
persistenceId(), cohortEntry.getTransactionID(), transactionCommitTimeout);
doAbortTransaction(cohortEntry.getTransactionID(), null);
}
}
+
+ commitCoordinator.cleanupExpiredCohortEntries();
}
private static boolean isEmptyCommit(final DataTreeCandidate candidate) {
// or if cohortEntry has no modifications
// we can apply modification to the state immediately
if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) {
- applyModificationToState(getSender(), cohortEntry.getTransactionID(), candidate);
+ applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate);
} else {
- Shard.this.persistData(getSender(), cohortEntry.getTransactionID(),
+ Shard.this.persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
DataTreeCandidatePayload.create(candidate));
}
}
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalCause;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
+import com.google.common.base.Stopwatch;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedList;
+import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
}
- private final Cache<String, CohortEntry> cohortCache;
+ private final Map<String, CohortEntry> cohortCache = new HashMap<>();
private CohortEntry currentCohortEntry;
private final ShardDataTree dataTree;
- private final Queue<CohortEntry> queuedCohortEntries;
+ // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
+ // since this should only be accessed on the shard's dispatcher.
+ private final Queue<CohortEntry> queuedCohortEntries = new LinkedList<>();
private int queueCapacity;
private final String name;
- private final RemovalListener<String, CohortEntry> cacheRemovalListener =
- new RemovalListener<String, CohortEntry>() {
- @Override
- public void onRemoval(RemovalNotification<String, CohortEntry> notification) {
- if(notification.getCause() == RemovalCause.EXPIRED) {
- log.warn("{}: Transaction {} was timed out of the cache", name, notification.getKey());
- }
- }
- };
+ private final long cacheExpiryTimeoutInMillis;
// This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
private CohortDecorator cohortDecorator;
private ReadyTransactionReply readyTransactionReply;
public ShardCommitCoordinator(ShardDataTree dataTree,
- long cacheExpiryTimeoutInSec, int queueCapacity, ActorRef shardActor, Logger log, String name) {
+ long cacheExpiryTimeoutInMillis, int queueCapacity, ActorRef shardActor, Logger log, String name) {
this.queueCapacity = queueCapacity;
this.log = log;
this.name = name;
this.dataTree = Preconditions.checkNotNull(dataTree);
-
- cohortCache = CacheBuilder.newBuilder().expireAfterAccess(cacheExpiryTimeoutInSec, TimeUnit.SECONDS).
- removalListener(cacheRemovalListener).build();
-
- // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
- // since this should only be accessed on the shard's dispatcher.
- queuedCohortEntries = new LinkedList<>();
+ this.cacheExpiryTimeoutInMillis = cacheExpiryTimeoutInMillis;
}
public void setQueueCapacity(int queueCapacity) {
return readyTransactionReply;
}
+ private boolean queueCohortEntry(CohortEntry cohortEntry, ActorRef sender, Shard shard) {
+ if(queuedCohortEntries.size() < queueCapacity) {
+ queuedCohortEntries.offer(cohortEntry);
+ return true;
+ } else {
+ cohortCache.remove(cohortEntry.getTransactionID());
+
+ RuntimeException ex = new RuntimeException(
+ String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
+ " capacity %d has been reached.",
+ name, cohortEntry.getTransactionID(), queueCapacity));
+ log.error(ex.getMessage());
+ sender.tell(new Status.Failure(ex), shard.self());
+ return false;
+ }
+ }
+
/**
* This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
* the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
(MutableCompositeModification) ready.getModification());
cohortCache.put(ready.getTransactionID(), cohortEntry);
+ if(!queueCohortEntry(cohortEntry, sender, shard)) {
+ return;
+ }
+
if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
// Return our actor path as we'll handle the three phase commit except if the Tx client
// version < Helium-1 version which means the Tx was initiated by a base Helium version node.
*
* @throws ExecutionException if an error occurs loading the cache
*/
- boolean handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard)
+ void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard)
throws ExecutionException {
- CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID());
+ CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
if(cohortEntry == null) {
cohortEntry = new CohortEntry(batched.getTransactionID(),
dataTree.newReadWriteTransaction(batched.getTransactionID(),
cohortEntry.applyModifications(batched.getModifications());
if(batched.isReady()) {
+ if(!queueCohortEntry(cohortEntry, sender, shard)) {
+ return;
+ }
+
if(log.isDebugEnabled()) {
log.debug("{}: Readying Tx {}, client version {}", name,
batched.getTransactionID(), batched.getVersion());
} else {
sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self());
}
-
- return batched.isReady();
}
/**
final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort);
cohortCache.put(message.getTransactionID(), cohortEntry);
cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
+
+ if(!queueCohortEntry(cohortEntry, sender, shard)) {
+ return;
+ }
+
log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID());
if (message.isDoCommitOnReady()) {
private void handleCanCommit(CohortEntry cohortEntry) {
String transactionID = cohortEntry.getTransactionID();
- if(log.isDebugEnabled()) {
- log.debug("{}: Processing canCommit for transaction {} for shard {}",
- name, transactionID, cohortEntry.getShard().self().path());
- }
+ cohortEntry.updateLastAccessTime();
if(currentCohortEntry != null) {
- // There's already a Tx commit in progress - attempt to queue this entry to be
- // committed after the current Tx completes.
- log.debug("{}: Transaction {} is already in progress - queueing transaction {}",
- name, currentCohortEntry.getTransactionID(), transactionID);
+ // There's already a Tx commit in progress so we can't process this entry yet - but it's in the
+ // queue and will get processed after all prior entries complete.
- if(queuedCohortEntries.size() < queueCapacity) {
- queuedCohortEntries.offer(cohortEntry);
- } else {
- removeCohortEntry(transactionID);
-
- RuntimeException ex = new RuntimeException(
- String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
- " capacity %d has been reached.",
- name, transactionID, queueCapacity));
- log.error(ex.getMessage());
- cohortEntry.getReplySender().tell(new Status.Failure(ex), cohortEntry.getShard().self());
+ if(log.isDebugEnabled()) {
+ log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
+ name, currentCohortEntry.getTransactionID(), transactionID);
}
- } else {
- // No Tx commit currently in progress - make this the current entry and proceed with
- // canCommit.
- cohortEntry.updateLastAccessTime();
- currentCohortEntry = cohortEntry;
- doCanCommit(cohortEntry);
+ return;
+ }
+
+        // No Tx commit currently in progress - check if this entry is the next one in the queue. If so, make
+ // it the current entry and proceed with canCommit.
+ // Purposely checking reference equality here.
+ if(queuedCohortEntries.peek() == cohortEntry) {
+ currentCohortEntry = queuedCohortEntries.poll();
+ doCanCommit(currentCohortEntry);
+ } else {
+ if(log.isDebugEnabled()) {
+ log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now",
+ name, queuedCohortEntries.peek().getTransactionID(), transactionID);
+ }
}
}
public void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {
// Lookup the cohort entry that was cached previously (or should have been) by
// transactionReady (via the ForwardedReadyTransaction message).
- final CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
+ final CohortEntry cohortEntry = cohortCache.get(transactionID);
if(cohortEntry == null) {
// Either canCommit was invoked before ready(shouldn't happen) or a long time passed
// between canCommit and ready and the entry was expired from the cache.
}
private void doCanCommit(final CohortEntry cohortEntry) {
-
boolean canCommit = false;
try {
// We block on the future here so we don't have to worry about possibly accessing our
// currently uses a same thread executor anyway.
canCommit = cohortEntry.getCohort().canCommit().get();
+ log.debug("{}: canCommit for {}: {}", name, cohortEntry.getTransactionID(), canCommit);
+
if(cohortEntry.isDoImmediateCommit()) {
if(canCommit) {
doCommit(cohortEntry);
CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard().self());
}
} catch (Exception e) {
- log.debug("{}: An exception occurred during canCommit: {}", name, e);
+ log.debug("{}: An exception occurred during canCommit", name, e);
Throwable failure = e;
if(e instanceof ExecutionException) {
return false;
}
+ cohortEntry.setReplySender(sender);
return doCommit(cohortEntry);
}
}
public CohortEntry getAndRemoveCohortEntry(String transactionID) {
- CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
- cohortCache.invalidate(transactionID);
- return cohortEntry;
- }
-
- public void removeCohortEntry(String transactionID) {
- cohortCache.invalidate(transactionID);
+ return cohortCache.remove(transactionID);
}
public boolean isCurrentTransaction(String transactionID) {
*/
public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
if(removeCohortEntry) {
- removeCohortEntry(transactionID);
+ cohortCache.remove(transactionID);
}
if(isCurrentTransaction(transactionID)) {
- // Dequeue the next cohort entry waiting in the queue.
- currentCohortEntry = queuedCohortEntries.poll();
- if(currentCohortEntry != null) {
- currentCohortEntry.updateLastAccessTime();
- doCanCommit(currentCohortEntry);
+ currentCohortEntry = null;
+
+ log.debug("{}: currentTransactionComplete: {}", name, transactionID);
+
+ maybeProcessNextCohortEntry();
+ }
+ }
+
+ private void maybeProcessNextCohortEntry() {
+ // Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also
+ // clean out expired entries.
+ Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
+ while(iter.hasNext()) {
+ CohortEntry next = iter.next();
+ if(next.isReadyToCommit()) {
+ if(currentCohortEntry == null) {
+ if(log.isDebugEnabled()) {
+ log.debug("{}: Next entry to canCommit {}", name, next);
+ }
+
+ iter.remove();
+ currentCohortEntry = next;
+ currentCohortEntry.updateLastAccessTime();
+ doCanCommit(currentCohortEntry);
+ }
+
+ break;
+ } else if(next.isExpired(cacheExpiryTimeoutInMillis)) {
+ log.warn("{}: canCommit for transaction {} was not received within {} ms - entry removed from cache",
+ name, next.getTransactionID(), cacheExpiryTimeoutInMillis);
+
+ iter.remove();
+ cohortCache.remove(next.getTransactionID());
+ } else {
+ break;
}
}
}
+ void cleanupExpiredCohortEntries() {
+ maybeProcessNextCohortEntry();
+ }
+
@VisibleForTesting
void setCohortDecorator(CohortDecorator cohortDecorator) {
this.cohortDecorator = cohortDecorator;
}
-
static class CohortEntry {
private final String transactionID;
private ShardDataTreeCohort cohort;
private final ReadWriteShardDataTreeTransaction transaction;
private ActorRef replySender;
private Shard shard;
- private long lastAccessTime;
private boolean doImmediateCommit;
+ private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
this.transaction = Preconditions.checkNotNull(transaction);
}
void updateLastAccessTime() {
- lastAccessTime = System.currentTimeMillis();
- }
-
- long getLastAccessTime() {
- return lastAccessTime;
+ lastAccessTimer.reset();
+ lastAccessTimer.start();
}
String getTransactionID() {
}
}
+ boolean isReadyToCommit() {
+ return replySender != null;
+ }
+
+ boolean isExpired(long expireTimeInMillis) {
+ return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis;
+ }
+
boolean isDoImmediateCommit() {
return doImmediateCommit;
}
void setShard(Shard shard) {
this.shard = shard;
}
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
+ .append(doImmediateCommit).append("]");
+ return builder.toString();
+ }
}
}
int getShardTransactionCommitQueueCapacity();
+ long getShardCommitQueueExpiryTimeoutInSeconds();
+
long getShardInitializationTimeoutInSeconds();
long getShardLeaderElectionTimeoutInSeconds();
*/
package org.opendaylight.controller.cluster.datastore.jmx.mbeans;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
return context.getShardTransactionCommitTimeoutInSeconds();
}
+ @Override
+ public long getShardCommitQueueExpiryTimeoutInSeconds() {
+ return TimeUnit.SECONDS.convert(context.getShardCommitQueueExpiryTimeoutInMillis(), TimeUnit.MILLISECONDS);
+ }
+
@Override
public int getShardTransactionCommitQueueCapacity() {
return context.getShardTransactionCommitQueueCapacity();
.shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue())
.transactionCreationInitialRateLimit(props.getTransactionCreationInitialRateLimit().getValue())
.shardBatchedModificationCount(props.getShardBatchedModificationCount().getValue().intValue())
+ .shardCommitQueueExpiryTimeoutInSeconds(
+ props.getShardCommitQueueExpiryTimeoutInSeconds().getValue().intValue())
.build();
return DistributedDataStoreFactory.createInstance(getConfigSchemaServiceDependency(),
.shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue())
.transactionCreationInitialRateLimit(props.getTransactionCreationInitialRateLimit().getValue())
.shardBatchedModificationCount(props.getShardBatchedModificationCount().getValue().intValue())
+ .shardCommitQueueExpiryTimeoutInSeconds(
+ props.getShardCommitQueueExpiryTimeoutInSeconds().getValue().intValue())
.build();
return DistributedDataStoreFactory.createInstance(getOperationalSchemaServiceDependency(),
}
leaf shard-transaction-commit-queue-capacity {
- default 20000;
+ default 50000;
type non-zero-uint32-type;
description "The maximum allowed capacity for each shard's transaction commit queue.";
}
+ leaf shard-commit-queue-expiry-timeout-in-seconds {
+ default 120; // 2 minutes
+ type non-zero-uint32-type;
+ description "The maximum amount of time a transaction can remain in a shard's commit queue waiting
+ to begin the CanCommit phase as coordinated by the broker front-end. Normally this should be
+ quick but latencies can occur in between transaction ready and CanCommit or a remote broker
+ could lose connection and CanCommit might never occur. Expiring transactions from the queue
+            allows subsequent pending transactions to be processed.";
+ }
+
leaf shard-initialization-timeout-in-seconds {
default 300; // 5 minutes
type non-zero-uint32-type;
import akka.testkit.JavaTestKit;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import com.typesafe.config.ConfigFactory;
import java.io.IOException;
writeTx = txChain.newWriteOnlyTransaction();
- //writeTx.delete(personPath);
+ writeTx.delete(carPath);
DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
- doCommit(cohort1);
- doCommit(cohort2);
+ ListenableFuture<Boolean> canCommit1 = cohort1.canCommit();
+ ListenableFuture<Boolean> canCommit2 = cohort2.canCommit();
+
+ doCommit(canCommit1, cohort1);
+ doCommit(canCommit2, cohort2);
doCommit(cohort3);
txChain.close();
DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
- assertEquals("isPresent", true, optional.isPresent());
- assertEquals("Data node", car, optional.get());
+ assertEquals("isPresent", false, optional.isPresent());
optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
- //assertEquals("isPresent", false, optional.isPresent());
assertEquals("isPresent", true, optional.isPresent());
+ assertEquals("Data node", person, optional.get());
cleanup(dataStore);
}};
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
}
@Test
- public void testTransactionChain() throws Exception {
- initDatastores("testTransactionChain");
+ public void testTransactionChainWithSingleShard() throws Exception {
+ initDatastores("testTransactionChainWithSingleShard");
DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car2);
}
+ @Test
+ public void testTransactionChainWithMultipleShards() throws Exception{
+ initDatastores("testTransactionChainWithMultipleShards");
+
+ DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
+
+ DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+ assertNotNull("newWriteOnlyTransaction returned null", writeTx);
+
+ writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+ writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
+
+ writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+ writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
+
+ followerTestKit.doCommit(writeTx.ready());
+
+ DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
+
+ MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+ YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
+ readWriteTx.write(carPath, car);
+
+ MapEntryNode person = PeopleModel.newPersonEntry("jack");
+ YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
+ readWriteTx.merge(personPath, person);
+
+ Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
+ assertEquals("isPresent", true, optional.isPresent());
+ assertEquals("Data node", car, optional.get());
+
+ optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
+ assertEquals("isPresent", true, optional.isPresent());
+ assertEquals("Data node", person, optional.get());
+
+ DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
+
+ writeTx = txChain.newWriteOnlyTransaction();
+
+ writeTx.delete(personPath);
+
+ DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
+
+ followerTestKit.doCommit(cohort2);
+ followerTestKit.doCommit(cohort3);
+
+ txChain.close();
+
+ DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
+ verifyCars(readTx, car);
+
+ optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
+ assertEquals("isPresent", false, optional.isPresent());
+ }
+
@Test
public void testReadyLocalTransactionForwardedToLeader() throws Exception {
initDatastores("testReadyLocalTransactionForwardedToLeader");
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
cohort.commit().get(5, TimeUnit.SECONDS);
}
+ void doCommit(final ListenableFuture<Boolean> canCommitFuture, final DOMStoreThreePhaseCommitCohort cohort) throws Exception {
+ Boolean canCommit = canCommitFuture.get(7, TimeUnit.SECONDS);
+ assertEquals("canCommit", true, canCommit);
+ cohort.preCommit().get(5, TimeUnit.SECONDS);
+ cohort.commit().get(5, TimeUnit.SECONDS);
+ }
+
void cleanup(DistributedDataStore dataStore) {
if(dataStore != null) {
dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null);
@Test
public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
- dataStoreContextBuilder.shardTransactionCommitQueueCapacity(1);
+ dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
cohort2, modification2, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
+ // The 3rd Tx should exceed queue capacity and fail.
+
shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
cohort3, modification3, true, false), getRef());
- expectMsgClass(duration, ReadyTransactionReply.class);
+ expectMsgClass(duration, akka.actor.Status.Failure.class);
// canCommit 1st Tx.
}};
}
+ @Test
+ public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
+ dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
+
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testTransactionCommitWithPriorExpiredCohortEntries");
+
+ waitUntilLeader(shard);
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+
+ String transactionID1 = "tx1";
+ MutableCompositeModification modification1 = new MutableCompositeModification();
+ ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+ TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
+
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+ cohort1, modification1, true, false), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ String transactionID2 = "tx2";
+ MutableCompositeModification modification2 = new MutableCompositeModification();
+ ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+ TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
+
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+ cohort2, modification2, true, false), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ String transactionID3 = "tx3";
+ MutableCompositeModification modification3 = new MutableCompositeModification();
+ ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+ TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
+
+ shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+ cohort3, modification3, true, false), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
+ // should expire from the queue and the last one should be processed.
+
+ shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
+ expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
+ @Test
+ public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
+ dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
+
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testTransactionCommitWithSubsequentExpiredCohortEntry");
+
+ waitUntilLeader(shard);
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+
+ String transactionID1 = "tx1";
+ MutableCompositeModification modification1 = new MutableCompositeModification();
+ ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+ TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
+
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+ cohort1, modification1, true, false), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // CanCommit the first one so it's the current in-progress CohortEntry.
+
+ shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+ expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+
+ // Ready the second Tx.
+
+ String transactionID2 = "tx2";
+ MutableCompositeModification modification2 = new MutableCompositeModification();
+ ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+ TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
+
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+ cohort2, modification2, true, false), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Ready the third Tx.
+
+ String transactionID3 = "tx3";
+ DataTreeModification modification3 = dataStore.getDataTree().takeSnapshot().newModification();
+ new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
+ .apply(modification3);
+ ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
+
+ shard.tell(readyMessage, getRef());
+
+ // Commit the first Tx. After completing, the second should expire from the queue and the third
+ // Tx committed.
+
+ shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
+ // Expect commit reply from the third Tx.
+
+ expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
+ NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
+ assertNotNull(TestModel.TEST2_PATH + " not found", node);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
@Test
public void testCanCommitBeforeReadyFailure() throws Throwable {
new ShardTestKit(getSystem()) {{
public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13",
"test");
+ public static final QName TEST2_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13",
+ "test2");
+
public static final QName JUNK_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:junk", "2014-03-13",
"junk");
private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang";
public static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.of(TEST_QNAME);
+ public static final YangInstanceIdentifier TEST2_PATH = YangInstanceIdentifier.of(TEST2_QNAME);
public static final YangInstanceIdentifier JUNK_PATH = YangInstanceIdentifier.of(JUNK_QNAME);
public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
node(OUTER_LIST_QNAME).build();
}
}
}
+
+ container test2 {
+ }
}
\ No newline at end of file