From: Tom Pantelis Date: Wed, 13 May 2015 11:44:01 +0000 (-0400) Subject: Bug 3206: CDS - issues with direct commit X-Git-Tag: release/beryllium~570 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=103ceecd0195cca6c87fbd62a687d8addf128784 Bug 3206: CDS - issues with direct commit Modified RaftActor#persistData to send the ApplyState message instead of calling applyState directly in case the Shard tries to persist another entry during the persist callback. Modified Shard to use the correct sneder to send the CommitTransactionReply message. The third fix is to ensure direct commits in a chain aren't committed before previous coordinated commits. In the ShardCommitCoordinator, I changed it to enqueue all CohortEntries when they are readied so they are processed in that order. I also included the unit test scenario that caused the issues to occur. Change-Id: I65ffcbbac37d6be28c4e1c2dc17d3b0ca21847dc Signed-off-by: Tom Pantelis (cherry picked from commit cf088cc87d5e9c0dfd3fb8e47e0d6d7c5ddc19fd) --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 5dc1b9dcdf..f8bbf638a0 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -342,7 +342,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { raftContext.setLastApplied(replicatedLogEntry.getIndex()); // Apply the state immediately - applyState(clientActor, identifier, data); + self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry), self()); // Send a ApplyJournalEntries message so that we write the fact that we applied // the state to durable storage diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java index c32839c490..8c32eab61d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java @@ -81,6 +81,8 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { new Procedure() { @Override public void apply(ReplicatedLogEntry evt) throws Exception { + context.getLogger().debug("{}: persist complete {}", context.getId(), replicatedLogEntry); + int logEntrySize = replicatedLogEntry.size(); long dataSizeForCheck = dataSize(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java index 8ae79ceb2d..2f323aafd3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java @@ -36,7 +36,7 @@ public class DatastoreContext { public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000; public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500; public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS = DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10; - public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 20000; + public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000; public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES); public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS); public static final boolean DEFAULT_PERSISTENT = true; @@ -45,7 +45,8 @@ public class DatastoreContext { public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2; public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100; public static final String UNKNOWN_DATA_STORE_TYPE = "unknown"; - public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT= 100; + public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 100; + public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS = TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES); private static Set globalDatastoreTypes = Sets.newConcurrentHashSet(); @@ -64,6 +65,7 @@ public class DatastoreContext { private String dataStoreType = UNKNOWN_DATA_STORE_TYPE; private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT; private boolean writeOnlyTransactionOptimizationsEnabled = true; + private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS; public static Set getGlobalDatastoreTypes() { return globalDatastoreTypes; @@ -93,6 +95,7 @@ public class DatastoreContext { this.dataStoreType = other.dataStoreType; this.shardBatchedModificationCount = other.shardBatchedModificationCount; this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled; + this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis; setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize()); setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount()); @@ -201,6 +204,10 @@ public class DatastoreContext { return writeOnlyTransactionOptimizationsEnabled; } + public long getShardCommitQueueExpiryTimeoutInMillis() { + return shardCommitQueueExpiryTimeoutInMillis; + } + public static class Builder { private final DatastoreContext datastoreContext; private int maxShardDataChangeExecutorPoolSize = @@ -346,6 +353,17 @@ public class DatastoreContext { return this; } + public Builder shardCommitQueueExpiryTimeoutInMillis(long value) { + datastoreContext.shardCommitQueueExpiryTimeoutInMillis = value; + return this; + } + + public Builder shardCommitQueueExpiryTimeoutInSeconds(long value) { + datastoreContext.shardCommitQueueExpiryTimeoutInMillis = TimeUnit.MILLISECONDS.convert( + value, TimeUnit.SECONDS); + return this; + } + public Builder maxShardDataChangeExecutorPoolSize(int maxShardDataChangeExecutorPoolSize) { this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize; return this; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java index 66562975cc..71d9dabf37 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java @@ -198,7 +198,10 @@ final class RemoteTransactionContextSupport { isTxActorLocal, remoteTransactionVersion, parent.getCompleter()); } - TransactionContextCleanup.track(this, ret); + if(parent.getType() == TransactionType.READ_ONLY) { + TransactionContextCleanup.track(this, ret); + } + return ret; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index c2cc1ac05b..6da9fe947a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -140,7 +140,7 @@ public class Shard extends RaftActor { } commitCoordinator = new ShardCommitCoordinator(store, - TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES), + datastoreContext.getShardCommitQueueExpiryTimeoutInMillis(), datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name); setTransactionCommitTimeout(); @@ -160,7 +160,7 @@ public class Shard extends RaftActor { private void setTransactionCommitTimeout() { transactionCommitTimeout = TimeUnit.MILLISECONDS.convert( - datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS); + datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS) / 2; } public static Props props(final ShardIdentifier name, @@ -302,14 +302,15 @@ public class Shard extends RaftActor { private void handleTransactionCommitTimeoutCheck() { CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry(); if(cohortEntry != null) { - long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime(); - if(elapsed > transactionCommitTimeout) { + if(cohortEntry.isExpired(transactionCommitTimeout)) { LOG.warn("{}: Current transaction {} has timed out after {} ms - aborting", persistenceId(), cohortEntry.getTransactionID(), transactionCommitTimeout); doAbortTransaction(cohortEntry.getTransactionID(), null); } } + + commitCoordinator.cleanupExpiredCohortEntries(); } private static boolean isEmptyCommit(final DataTreeCandidate candidate) { @@ -323,9 +324,9 @@ public class Shard extends RaftActor { // or if cohortEntry has no modifications // we can apply modification to the state immediately if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) { - applyModificationToState(getSender(), cohortEntry.getTransactionID(), candidate); + applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate); } else { - Shard.this.persistData(getSender(), cohortEntry.getTransactionID(), + Shard.this.persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), DataTreeCandidatePayload.create(candidate)); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index 97816a55cc..f3e1e33e34 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -12,12 +12,11 @@ import akka.actor.Status; import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalCause; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; +import com.google.common.base.Stopwatch; +import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedList; +import java.util.Map; import java.util.Queue; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -45,13 +44,15 @@ public class ShardCommitCoordinator { ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual); } - private final Cache cohortCache; + private final Map cohortCache = new HashMap<>(); private CohortEntry currentCohortEntry; private final ShardDataTree dataTree; - private final Queue queuedCohortEntries; + // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls + // since this should only be accessed on the shard's dispatcher. + private final Queue queuedCohortEntries = new LinkedList<>(); private int queueCapacity; @@ -59,15 +60,7 @@ public class ShardCommitCoordinator { private final String name; - private final RemovalListener cacheRemovalListener = - new RemovalListener() { - @Override - public void onRemoval(RemovalNotification notification) { - if(notification.getCause() == RemovalCause.EXPIRED) { - log.warn("{}: Transaction {} was timed out of the cache", name, notification.getKey()); - } - } - }; + private final long cacheExpiryTimeoutInMillis; // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts. private CohortDecorator cohortDecorator; @@ -75,19 +68,13 @@ public class ShardCommitCoordinator { private ReadyTransactionReply readyTransactionReply; public ShardCommitCoordinator(ShardDataTree dataTree, - long cacheExpiryTimeoutInSec, int queueCapacity, ActorRef shardActor, Logger log, String name) { + long cacheExpiryTimeoutInMillis, int queueCapacity, ActorRef shardActor, Logger log, String name) { this.queueCapacity = queueCapacity; this.log = log; this.name = name; this.dataTree = Preconditions.checkNotNull(dataTree); - - cohortCache = CacheBuilder.newBuilder().expireAfterAccess(cacheExpiryTimeoutInSec, TimeUnit.SECONDS). - removalListener(cacheRemovalListener).build(); - - // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls - // since this should only be accessed on the shard's dispatcher. - queuedCohortEntries = new LinkedList<>(); + this.cacheExpiryTimeoutInMillis = cacheExpiryTimeoutInMillis; } public void setQueueCapacity(int queueCapacity) { @@ -102,6 +89,23 @@ public class ShardCommitCoordinator { return readyTransactionReply; } + private boolean queueCohortEntry(CohortEntry cohortEntry, ActorRef sender, Shard shard) { + if(queuedCohortEntries.size() < queueCapacity) { + queuedCohortEntries.offer(cohortEntry); + return true; + } else { + cohortCache.remove(cohortEntry.getTransactionID()); + + RuntimeException ex = new RuntimeException( + String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+ + " capacity %d has been reached.", + name, cohortEntry.getTransactionID(), queueCapacity)); + log.error(ex.getMessage()); + sender.tell(new Status.Failure(ex), shard.self()); + return false; + } + } + /** * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit. @@ -114,6 +118,10 @@ public class ShardCommitCoordinator { (MutableCompositeModification) ready.getModification()); cohortCache.put(ready.getTransactionID(), cohortEntry); + if(!queueCohortEntry(cohortEntry, sender, shard)) { + return; + } + if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) { // Return our actor path as we'll handle the three phase commit except if the Tx client // version < Helium-1 version which means the Tx was initiated by a base Helium version node. @@ -156,9 +164,9 @@ public class ShardCommitCoordinator { * * @throws ExecutionException if an error occurs loading the cache */ - boolean handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) + void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) throws ExecutionException { - CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID()); + CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID()); if(cohortEntry == null) { cohortEntry = new CohortEntry(batched.getTransactionID(), dataTree.newReadWriteTransaction(batched.getTransactionID(), @@ -174,6 +182,10 @@ public class ShardCommitCoordinator { cohortEntry.applyModifications(batched.getModifications()); if(batched.isReady()) { + if(!queueCohortEntry(cohortEntry, sender, shard)) { + return; + } + if(log.isDebugEnabled()) { log.debug("{}: Readying Tx {}, client version {}", name, batched.getTransactionID(), batched.getVersion()); @@ -191,8 +203,6 @@ public class ShardCommitCoordinator { } else { sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self()); } - - return batched.isReady(); } /** @@ -208,6 +218,11 @@ public class ShardCommitCoordinator { final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort); cohortCache.put(message.getTransactionID(), cohortEntry); cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady()); + + if(!queueCohortEntry(cohortEntry, sender, shard)) { + return; + } + log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID()); if (message.isDoCommitOnReady()) { @@ -222,36 +237,31 @@ public class ShardCommitCoordinator { private void handleCanCommit(CohortEntry cohortEntry) { String transactionID = cohortEntry.getTransactionID(); - if(log.isDebugEnabled()) { - log.debug("{}: Processing canCommit for transaction {} for shard {}", - name, transactionID, cohortEntry.getShard().self().path()); - } + cohortEntry.updateLastAccessTime(); if(currentCohortEntry != null) { - // There's already a Tx commit in progress - attempt to queue this entry to be - // committed after the current Tx completes. - log.debug("{}: Transaction {} is already in progress - queueing transaction {}", - name, currentCohortEntry.getTransactionID(), transactionID); + // There's already a Tx commit in progress so we can't process this entry yet - but it's in the + // queue and will get processed after all prior entries complete. - if(queuedCohortEntries.size() < queueCapacity) { - queuedCohortEntries.offer(cohortEntry); - } else { - removeCohortEntry(transactionID); - - RuntimeException ex = new RuntimeException( - String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+ - " capacity %d has been reached.", - name, transactionID, queueCapacity)); - log.error(ex.getMessage()); - cohortEntry.getReplySender().tell(new Status.Failure(ex), cohortEntry.getShard().self()); + if(log.isDebugEnabled()) { + log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now", + name, currentCohortEntry.getTransactionID(), transactionID); } - } else { - // No Tx commit currently in progress - make this the current entry and proceed with - // canCommit. - cohortEntry.updateLastAccessTime(); - currentCohortEntry = cohortEntry; - doCanCommit(cohortEntry); + return; + } + + // No Tx commit currently in progress - check if this entry is the next one in the queue, If so make + // it the current entry and proceed with canCommit. + // Purposely checking reference equality here. + if(queuedCohortEntries.peek() == cohortEntry) { + currentCohortEntry = queuedCohortEntries.poll(); + doCanCommit(currentCohortEntry); + } else { + if(log.isDebugEnabled()) { + log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now", + name, queuedCohortEntries.peek().getTransactionID(), transactionID); + } } } @@ -265,7 +275,7 @@ public class ShardCommitCoordinator { public void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) { // Lookup the cohort entry that was cached previously (or should have been) by // transactionReady (via the ForwardedReadyTransaction message). - final CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID); + final CohortEntry cohortEntry = cohortCache.get(transactionID); if(cohortEntry == null) { // Either canCommit was invoked before ready(shouldn't happen) or a long time passed // between canCommit and ready and the entry was expired from the cache. @@ -283,7 +293,6 @@ public class ShardCommitCoordinator { } private void doCanCommit(final CohortEntry cohortEntry) { - boolean canCommit = false; try { // We block on the future here so we don't have to worry about possibly accessing our @@ -291,6 +300,8 @@ public class ShardCommitCoordinator { // currently uses a same thread executor anyway. canCommit = cohortEntry.getCohort().canCommit().get(); + log.debug("{}: canCommit for {}: {}", name, cohortEntry.getTransactionID(), canCommit); + if(cohortEntry.isDoImmediateCommit()) { if(canCommit) { doCommit(cohortEntry); @@ -304,7 +315,7 @@ public class ShardCommitCoordinator { CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard().self()); } } catch (Exception e) { - log.debug("{}: An exception occurred during canCommit: {}", name, e); + log.debug("{}: An exception occurred during canCommit", name, e); Throwable failure = e; if(e instanceof ExecutionException) { @@ -367,6 +378,7 @@ public class ShardCommitCoordinator { return false; } + cohortEntry.setReplySender(sender); return doCommit(cohortEntry); } @@ -391,13 +403,7 @@ public class ShardCommitCoordinator { } public CohortEntry getAndRemoveCohortEntry(String transactionID) { - CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID); - cohortCache.invalidate(transactionID); - return cohortEntry; - } - - public void removeCohortEntry(String transactionID) { - cohortCache.invalidate(transactionID); + return cohortCache.remove(transactionID); } public boolean isCurrentTransaction(String transactionID) { @@ -416,33 +422,66 @@ public class ShardCommitCoordinator { */ public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) { if(removeCohortEntry) { - removeCohortEntry(transactionID); + cohortCache.remove(transactionID); } if(isCurrentTransaction(transactionID)) { - // Dequeue the next cohort entry waiting in the queue. - currentCohortEntry = queuedCohortEntries.poll(); - if(currentCohortEntry != null) { - currentCohortEntry.updateLastAccessTime(); - doCanCommit(currentCohortEntry); + currentCohortEntry = null; + + log.debug("{}: currentTransactionComplete: {}", name, transactionID); + + maybeProcessNextCohortEntry(); + } + } + + private void maybeProcessNextCohortEntry() { + // Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also + // clean out expired entries. + Iterator iter = queuedCohortEntries.iterator(); + while(iter.hasNext()) { + CohortEntry next = iter.next(); + if(next.isReadyToCommit()) { + if(currentCohortEntry == null) { + if(log.isDebugEnabled()) { + log.debug("{}: Next entry to canCommit {}", name, next); + } + + iter.remove(); + currentCohortEntry = next; + currentCohortEntry.updateLastAccessTime(); + doCanCommit(currentCohortEntry); + } + + break; + } else if(next.isExpired(cacheExpiryTimeoutInMillis)) { + log.warn("{}: canCommit for transaction {} was not received within {} ms - entry removed from cache", + name, next.getTransactionID(), cacheExpiryTimeoutInMillis); + + iter.remove(); + cohortCache.remove(next.getTransactionID()); + } else { + break; } } } + void cleanupExpiredCohortEntries() { + maybeProcessNextCohortEntry(); + } + @VisibleForTesting void setCohortDecorator(CohortDecorator cohortDecorator) { this.cohortDecorator = cohortDecorator; } - static class CohortEntry { private final String transactionID; private ShardDataTreeCohort cohort; private final ReadWriteShardDataTreeTransaction transaction; private ActorRef replySender; private Shard shard; - private long lastAccessTime; private boolean doImmediateCommit; + private final Stopwatch lastAccessTimer = Stopwatch.createStarted(); CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) { this.transaction = Preconditions.checkNotNull(transaction); @@ -463,11 +502,8 @@ public class ShardCommitCoordinator { } void updateLastAccessTime() { - lastAccessTime = System.currentTimeMillis(); - } - - long getLastAccessTime() { - return lastAccessTime; + lastAccessTimer.reset(); + lastAccessTimer.start(); } String getTransactionID() { @@ -497,6 +533,14 @@ public class ShardCommitCoordinator { } } + boolean isReadyToCommit() { + return replySender != null; + } + + boolean isExpired(long expireTimeInMillis) { + return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis; + } + boolean isDoImmediateCommit() { return doImmediateCommit; } @@ -520,5 +564,13 @@ public class ShardCommitCoordinator { void setShard(Shard shard) { this.shard = shard; } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=") + .append(doImmediateCommit).append("]"); + return builder.toString(); + } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/DatastoreConfigurationMXBean.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/DatastoreConfigurationMXBean.java index 6b81792a57..981be366d0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/DatastoreConfigurationMXBean.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/DatastoreConfigurationMXBean.java @@ -34,6 +34,8 @@ public interface DatastoreConfigurationMXBean { int getShardTransactionCommitQueueCapacity(); + long getShardCommitQueueExpiryTimeoutInSeconds(); + long getShardInitializationTimeoutInSeconds(); long getShardLeaderElectionTimeoutInSeconds(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/DatastoreConfigurationMXBeanImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/DatastoreConfigurationMXBeanImpl.java index 79ff2a4e54..261b93de56 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/DatastoreConfigurationMXBeanImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/DatastoreConfigurationMXBeanImpl.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.cluster.datastore.jmx.mbeans; +import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean; @@ -73,6 +74,11 @@ public class DatastoreConfigurationMXBeanImpl extends AbstractMXBean implements return context.getShardTransactionCommitTimeoutInSeconds(); } + @Override + public long getShardCommitQueueExpiryTimeoutInSeconds() { + return TimeUnit.SECONDS.convert(context.getShardCommitQueueExpiryTimeoutInMillis(), TimeUnit.MILLISECONDS); + } + @Override public int getShardTransactionCommitQueueCapacity() { return context.getShardTransactionCommitQueueCapacity(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java index 252788203f..539797e659 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java @@ -64,6 +64,8 @@ public class DistributedConfigDataStoreProviderModule extends .shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue()) .transactionCreationInitialRateLimit(props.getTransactionCreationInitialRateLimit().getValue()) .shardBatchedModificationCount(props.getShardBatchedModificationCount().getValue().intValue()) + .shardCommitQueueExpiryTimeoutInSeconds( + props.getShardCommitQueueExpiryTimeoutInSeconds().getValue().intValue()) .build(); return DistributedDataStoreFactory.createInstance(getConfigSchemaServiceDependency(), diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java index 08845654a5..45ed184de3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java @@ -65,6 +65,8 @@ public class DistributedOperationalDataStoreProviderModule extends .shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue()) .transactionCreationInitialRateLimit(props.getTransactionCreationInitialRateLimit().getValue()) .shardBatchedModificationCount(props.getShardBatchedModificationCount().getValue().intValue()) + .shardCommitQueueExpiryTimeoutInSeconds( + props.getShardCommitQueueExpiryTimeoutInSeconds().getValue().intValue()) .build(); return DistributedDataStoreFactory.createInstance(getOperationalSchemaServiceDependency(), diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang index dc83af9a75..3c753d02d5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang @@ -136,11 +136,21 @@ module distributed-datastore-provider { } leaf shard-transaction-commit-queue-capacity { - default 20000; + default 50000; type non-zero-uint32-type; description "The maximum allowed capacity for each shard's transaction commit queue."; } + leaf shard-commit-queue-expiry-timeout-in-seconds { + default 120; // 2 minutes + type non-zero-uint32-type; + description "The maximum amount of time a transaction can remain in a shard's commit queue waiting + to begin the CanCommit phase as coordinated by the broker front-end. Normally this should be + quick but latencies can occur in between transaction ready and CanCommit or a remote broker + could lose connection and CanCommit might never occur. Expiring transactions from the queue + allows subsequent pending transaction to be processed."; + } + leaf shard-initialization-timeout-in-seconds { default 300; // 5 minutes type non-zero-uint32-type; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index f3d93b896d..76ae3c7156 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -9,6 +9,7 @@ import akka.cluster.Cluster; import akka.testkit.JavaTestKit; import com.google.common.base.Optional; import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; import java.io.IOException; @@ -788,12 +789,15 @@ public class DistributedDataStoreIntegrationTest { writeTx = txChain.newWriteOnlyTransaction(); - //writeTx.delete(personPath); + writeTx.delete(carPath); DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready(); - doCommit(cohort1); - doCommit(cohort2); + ListenableFuture canCommit1 = cohort1.canCommit(); + ListenableFuture canCommit2 = cohort2.canCommit(); + + doCommit(canCommit1, cohort1); + doCommit(canCommit2, cohort2); doCommit(cohort3); txChain.close(); @@ -801,12 +805,11 @@ public class DistributedDataStoreIntegrationTest { DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", car, optional.get()); + assertEquals("isPresent", false, optional.isPresent()); optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); - //assertEquals("isPresent", false, optional.isPresent()); assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", person, optional.get()); cleanup(dataStore); }}; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index 7cef2fd743..be8ede9d6f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -33,6 +33,7 @@ import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -253,8 +254,8 @@ public class DistributedDataStoreRemotingIntegrationTest { } @Test - public void testTransactionChain() throws Exception { - initDatastores("testTransactionChain"); + public void testTransactionChainWithSingleShard() throws Exception { + initDatastores("testTransactionChainWithSingleShard"); DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain(); @@ -299,6 +300,61 @@ public class DistributedDataStoreRemotingIntegrationTest { verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car2); } + @Test + public void testTransactionChainWithMultipleShards() throws Exception{ + initDatastores("testTransactionChainWithMultipleShards"); + + DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain(); + + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); + + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); + + followerTestKit.doCommit(writeTx.ready()); + + DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction(); + + MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); + readWriteTx.write(carPath, car); + + MapEntryNode person = PeopleModel.newPersonEntry("jack"); + YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); + readWriteTx.merge(personPath, person); + + Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", car, optional.get()); + + optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", person, optional.get()); + + DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready(); + + writeTx = txChain.newWriteOnlyTransaction(); + + writeTx.delete(personPath); + + DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready(); + + followerTestKit.doCommit(cohort2); + followerTestKit.doCommit(cohort3); + + txChain.close(); + + DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction(); + verifyCars(readTx, car); + + optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", false, optional.isPresent()); + } + @Test public void testReadyLocalTransactionForwardedToLeader() throws Exception { initDatastores("testReadyLocalTransactionForwardedToLeader"); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java index 109e77c523..94f20856ff 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java @@ -14,6 +14,7 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.PoisonPill; import com.google.common.base.Optional; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; @@ -122,6 +123,13 @@ class IntegrationTestKit extends ShardTestKit { cohort.commit().get(5, TimeUnit.SECONDS); } + void doCommit(final ListenableFuture canCommitFuture, final DOMStoreThreePhaseCommitCohort cohort) throws Exception { + Boolean canCommit = canCommitFuture.get(7, TimeUnit.SECONDS); + assertEquals("canCommit", true, canCommit); + cohort.preCommit().get(5, TimeUnit.SECONDS); + cohort.commit().get(5, TimeUnit.SECONDS); + } + void cleanup(DistributedDataStore dataStore) { if(dataStore != null) { dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index ee543164de..49cfcbc9c2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -1826,7 +1826,7 @@ public class ShardTest extends AbstractShardTest { @Test public void testTransactionCommitQueueCapacityExceeded() throws Throwable { - dataStoreContextBuilder.shardTransactionCommitQueueCapacity(1); + dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2); new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), @@ -1866,9 +1866,11 @@ public class ShardTest extends AbstractShardTest { cohort2, modification2, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); + // The 3rd Tx should exceed queue capacity and fail. + shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION, cohort3, modification3, true, false), getRef()); - expectMsgClass(duration, ReadyTransactionReply.class); + expectMsgClass(duration, akka.actor.Status.Failure.class); // canCommit 1st Tx. @@ -1888,6 +1890,125 @@ public class ShardTest extends AbstractShardTest { }}; } + @Test + public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable { + dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1); + + new ShardTestKit(getSystem()) {{ + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testTransactionCommitWithPriorExpiredCohortEntries"); + + waitUntilLeader(shard); + + final FiniteDuration duration = duration("5 seconds"); + + ShardDataTree dataStore = shard.underlyingActor().getDataStore(); + + String transactionID1 = "tx1"; + MutableCompositeModification modification1 = new MutableCompositeModification(); + ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); + + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort1, modification1, true, false), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); + + String transactionID2 = "tx2"; + MutableCompositeModification modification2 = new MutableCompositeModification(); + ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, + TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2); + + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort2, modification2, true, false), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); + + String transactionID3 = "tx3"; + MutableCompositeModification modification3 = new MutableCompositeModification(); + ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, + TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3); + + shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION, + cohort3, modification3, true, false), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); + + // All Tx's are readied. We'll send canCommit for the last one but not the others. The others + // should expire from the queue and the last one should be processed. + + shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef()); + expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + + @Test + public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable { + dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1); + + new ShardTestKit(getSystem()) {{ + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testTransactionCommitWithSubsequentExpiredCohortEntry"); + + waitUntilLeader(shard); + + final FiniteDuration duration = duration("5 seconds"); + + ShardDataTree dataStore = shard.underlyingActor().getDataStore(); + + String transactionID1 = "tx1"; + MutableCompositeModification modification1 = new MutableCompositeModification(); + ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); + + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort1, modification1, true, false), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); + + // CanCommit the first one so it's the current in-progress CohortEntry. + + shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef()); + expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS); + + // Ready the second Tx. + + String transactionID2 = "tx2"; + MutableCompositeModification modification2 = new MutableCompositeModification(); + ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, + TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2); + + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort2, modification2, true, false), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); + + // Ready the third Tx. + + String transactionID3 = "tx3"; + DataTreeModification modification3 = dataStore.getDataTree().takeSnapshot().newModification(); + new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME)) + .apply(modification3); + ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true); + + shard.tell(readyMessage, getRef()); + + // Commit the first Tx. After completing, the second should expire from the queue and the third + // Tx committed. + + shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef()); + expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); + + // Expect commit reply from the third Tx. + + expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); + + NormalizedNode node = readStore(shard, TestModel.TEST2_PATH); + assertNotNull(TestModel.TEST2_PATH + " not found", node); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + @Test public void testCanCommitBeforeReadyFailure() throws Throwable { new ShardTestKit(getSystem()) {{ diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java index 60420dcf23..07e9b29942 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java @@ -22,6 +22,9 @@ public class TestModel { public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13", "test"); + public static final QName TEST2_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13", + "test2"); + public static final QName JUNK_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:junk", "2014-03-13", "junk"); @@ -36,6 +39,7 @@ public class TestModel { private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang"; public static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.of(TEST_QNAME); + public static final YangInstanceIdentifier TEST2_PATH = YangInstanceIdentifier.of(TEST2_QNAME); public static final YangInstanceIdentifier JUNK_PATH = YangInstanceIdentifier.of(JUNK_QNAME); public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH). node(OUTER_LIST_QNAME).build(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/odl-datastore-test.yang b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/odl-datastore-test.yang index f6d0202fd5..e36f38ea96 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/odl-datastore-test.yang +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/odl-datastore-test.yang @@ -39,4 +39,7 @@ module odl-datastore-test { } } } + + container test2 { + } } \ No newline at end of file