From c7636ed2c99ab891dbecb719db53c1a4ce1b54b8 Mon Sep 17 00:00:00 2001 From: tpantelis Date: Sun, 12 Oct 2014 10:44:35 -0400 Subject: [PATCH] Bug 2055: Handle Tx create in TransactionProxy resiliently Modified TransactionProxy to utilize the FindPrimaryShard functionality to wait until the shard is initialized. Also added retries to the CreateTransaction operation to wait a period of time for the shard to elect a leader. This makes it more resilient to avoid transient timing/sequencing failures, particularly on startup. In addition, the FindPrimaryShard and CreateTransaction operations are now done async (non-blocking). The futures are combined to yield the CreateTransactionReply which is passed to a new class TransactionFutureCallback on completion. The TransactionFutureCallback class creates and stores the TransactionContext which subsequent read/write/delete/ready transaction operations can access. If a transaction operation occurs before CreateTransaction completes, the operation is cached in TransactionFutureCallback and executed when the CreateTransaction future completes. Change-Id: Id9cdc9641038922d6209c44d924bd168658a71fb Signed-off-by: tpantelis --- .../DataChangeListenerRegistrationProxy.java | 8 +- .../cluster/datastore/DatastoreContext.java | 31 +- .../datastore/DistributedDataStore.java | 5 +- .../controller/cluster/datastore/Shard.java | 5 +- .../cluster/datastore/TransactionProxy.java | 635 +++++++++++------- .../exceptions/NoShardLeaderException.java | 21 + .../cluster/datastore/utils/ActorContext.java | 100 ++- ...tributedConfigDataStoreProviderModule.java | 4 + ...tedOperationalDataStoreProviderModule.java | 4 + .../yang/distributed-datastore-provider.yang | 15 + ...taChangeListenerRegistrationProxyTest.java | 7 +- .../DistributedDataStoreIntegrationTest.java | 422 +++++++++++- .../datastore/TransactionProxyTest.java | 109 ++- .../datastore/utils/InMemoryJournal.java | 11 + 14 files changed, 1076 insertions(+), 301 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/NoShardLeaderException.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java index b2ae060c3d..06f3afc57c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java @@ -8,12 +8,10 @@ package org.opendaylight.controller.cluster.datastore; -import java.util.concurrent.TimeUnit; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.PoisonPill; import akka.dispatch.OnComplete; -import akka.util.Timeout; import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException; import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; @@ -42,8 +40,6 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerRegistrationProxy.class); - public static final Timeout REGISTER_TIMEOUT = new Timeout(5, TimeUnit.MINUTES); - private volatile ActorSelection listenerRegistrationActor; private final AsyncDataChangeListener> listener; private ActorRef dataChangeListenerActor; @@ -99,7 +95,7 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration dataChangeListenerActor = actorContext.getActorSystem().actorOf( DataChangeListener.props(listener)); - Future findFuture = actorContext.findLocalShardAsync(shardName, REGISTER_TIMEOUT); + Future findFuture = actorContext.findLocalShardAsync(shardName); findFuture.onComplete(new OnComplete() { @Override public void onComplete(Throwable failure, ActorRef shard) { @@ -121,7 +117,7 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration Future future = actorContext.executeOperationAsync(shard, new RegisterChangeListener(path, dataChangeListenerActor.path(), scope), - REGISTER_TIMEOUT); + actorContext.getDatastoreContext().getShardInitializationTimeout()); future.onComplete(new OnComplete(){ @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java index 722e23046e..03d331b558 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java @@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.datastore; import org.opendaylight.controller.cluster.raft.ConfigParams; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties; +import akka.util.Timeout; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; import java.util.concurrent.TimeUnit; @@ -29,11 +30,14 @@ public class DatastoreContext { private final ConfigParams shardRaftConfig; private final int shardTransactionCommitTimeoutInSeconds; private final int shardTransactionCommitQueueCapacity; + private final Timeout shardInitializationTimeout; + private final Timeout shardLeaderElectionTimeout; private DatastoreContext(InMemoryDOMDataStoreConfigProperties dataStoreProperties, ConfigParams shardRaftConfig, String dataStoreMXBeanType, int operationTimeoutInSeconds, Duration shardTransactionIdleTimeout, int shardTransactionCommitTimeoutInSeconds, - int shardTransactionCommitQueueCapacity) { + int shardTransactionCommitQueueCapacity, Timeout shardInitializationTimeout, + Timeout shardLeaderElectionTimeout) { this.dataStoreProperties = dataStoreProperties; this.shardRaftConfig = shardRaftConfig; this.dataStoreMXBeanType = dataStoreMXBeanType; @@ -41,6 +45,8 @@ public class DatastoreContext { this.shardTransactionIdleTimeout = shardTransactionIdleTimeout; this.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds; this.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity; + this.shardInitializationTimeout = shardInitializationTimeout; + this.shardLeaderElectionTimeout = shardLeaderElectionTimeout; } public static Builder newBuilder() { @@ -75,6 +81,14 @@ public class DatastoreContext { return shardTransactionCommitQueueCapacity; } + public Timeout getShardInitializationTimeout() { + return shardInitializationTimeout; + } + + public Timeout getShardLeaderElectionTimeout() { + return shardLeaderElectionTimeout; + } + public static class Builder { private InMemoryDOMDataStoreConfigProperties dataStoreProperties; private Duration shardTransactionIdleTimeout = Duration.create(10, TimeUnit.MINUTES); @@ -85,6 +99,8 @@ public class DatastoreContext { private int shardSnapshotBatchCount = 20000; private int shardHeartbeatIntervalInMillis = 500; private int shardTransactionCommitQueueCapacity = 20000; + private Timeout shardInitializationTimeout = new Timeout(5, TimeUnit.MINUTES); + private Timeout shardLeaderElectionTimeout = new Timeout(30, TimeUnit.SECONDS); public Builder shardTransactionIdleTimeout(Duration shardTransactionIdleTimeout) { this.shardTransactionIdleTimeout = shardTransactionIdleTimeout; @@ -131,6 +147,16 @@ public class DatastoreContext { return this; } + public Builder shardInitializationTimeout(long timeout, TimeUnit unit) { + this.shardInitializationTimeout = new Timeout(timeout, unit); + return this; + } + + public Builder shardLeaderElectionTimeout(long timeout, TimeUnit unit) { + this.shardLeaderElectionTimeout = new Timeout(timeout, unit); + return this; + } + public DatastoreContext build() { DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl(); raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis, @@ -140,7 +166,8 @@ public class DatastoreContext { return new DatastoreContext(dataStoreProperties, raftConfig, dataStoreMXBeanType, operationTimeoutInSeconds, shardTransactionIdleTimeout, - shardTransactionCommitTimeoutInSeconds, shardTransactionCommitQueueCapacity); + shardTransactionCommitTimeoutInSeconds, shardTransactionCommitQueueCapacity, + shardInitializationTimeout, shardLeaderElectionTimeout); } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index 2c73807dca..930c5f7257 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -53,9 +53,8 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au actorContext = new ActorContext(actorSystem, actorSystem.actorOf( ShardManager.props(type, cluster, configuration, datastoreContext) - .withMailbox(ActorContext.MAILBOX), shardManagerId ), cluster, configuration); - - actorContext.setOperationTimeout(datastoreContext.getOperationTimeoutInSeconds()); + .withMailbox(ActorContext.MAILBOX), shardManagerId ), + cluster, configuration, datastoreContext); } public DistributedDataStore(ActorContext actorContext) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 789d51a19f..770cdec39c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -30,6 +30,7 @@ import com.google.protobuf.InvalidProtocolBufferException; import org.opendaylight.controller.cluster.common.actor.CommonConfig; import org.opendaylight.controller.cluster.common.actor.MeteringBehavior; import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory; @@ -442,9 +443,9 @@ public class Shard extends RaftActor { } else if (getLeader() != null) { getLeader().forward(message, getContext()); } else { - getSender().tell(new akka.actor.Status.Failure(new IllegalStateException( + getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException( "Could not find shard leader so transaction cannot be created. This typically happens" + - " when system is coming up or recovering and a leader is being elected. Try again" + + " when the system is coming up or recovering and a leader is being elected. Try again" + " later.")), getSelf()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 715f48c349..239207a60a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -9,8 +9,8 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; +import akka.dispatch.Mapper; import akka.dispatch.OnComplete; - import com.google.common.annotations.VisibleForTesting; import com.google.common.base.FinalizablePhantomReference; import com.google.common.base.FinalizableReferenceQueue; @@ -18,10 +18,10 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; - -import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; @@ -46,17 +46,17 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import scala.Function1; import scala.concurrent.Future; -import scala.runtime.AbstractFunction1; - +import scala.concurrent.Promise; +import scala.concurrent.duration.FiniteDuration; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.concurrent.GuardedBy; /** * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard @@ -72,18 +72,14 @@ import java.util.concurrent.atomic.AtomicLong; */ public class TransactionProxy implements DOMStoreReadWriteTransaction { - private final TransactionChainProxy transactionChainProxy; - - - - public enum TransactionType { + public static enum TransactionType { READ_ONLY, WRITE_ONLY, READ_WRITE } - static Function1 SAME_FAILURE_TRANSFORMER = new AbstractFunction1< - Throwable, Throwable>() { + static final Mapper SAME_FAILURE_TRANSFORMER = + new Mapper() { @Override public Throwable apply(Throwable failure) { return failure; @@ -92,9 +88,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private static final AtomicLong counter = new AtomicLong(); - private static final Logger - LOG = LoggerFactory.getLogger(TransactionProxy.class); + private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class); + /** + * Time interval in between transaction create retries. + */ + private static final FiniteDuration CREATE_TX_TRY_INTERVAL = + FiniteDuration.create(1, TimeUnit.SECONDS); /** * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The @@ -157,7 +157,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { for(ActorSelection actor : remoteTransactionActors) { LOG.trace("Sending CloseTransaction to {}", actor); actorContext.sendOperationAsync(actor, - new CloseTransaction().toSerializable()); + new CloseTransaction().toSerializable()); } } } @@ -173,11 +173,15 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private List remoteTransactionActors; private AtomicBoolean remoteTransactionActorsMB; - private final Map remoteTransactionPaths = new HashMap<>(); + /** + * Stores the create transaction results per shard. + */ + private final Map txFutureCallbackMap = new HashMap<>(); private final TransactionType transactionType; private final ActorContext actorContext; private final TransactionIdentifier identifier; + private final TransactionChainProxy transactionChainProxy; private final SchemaContext schemaContext; private boolean inReadyState; @@ -185,17 +189,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { this(actorContext, transactionType, null); } - @VisibleForTesting - List> getRecordedOperationFutures() { - List> recordedOperationFutures = Lists.newArrayList(); - for(TransactionContext transactionContext : remoteTransactionPaths.values()) { - recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures()); - } - - return recordedOperationFutures; - } - - public TransactionProxy(ActorContext actorContext, TransactionType transactionType, TransactionChainProxy transactionChainProxy) { + public TransactionProxy(ActorContext actorContext, TransactionType transactionType, + TransactionChainProxy transactionChainProxy) { this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null"); this.transactionType = Preconditions.checkNotNull(transactionType, @@ -224,9 +219,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { new TransactionProxyCleanupPhantomReference(this); phantomReferenceCache.put(cleanup, cleanup); } - if(LOG.isDebugEnabled()) { - LOG.debug("Created txn {} of type {}", identifier, transactionType); + + LOG.debug("Created txn {} of type {}", identifier, transactionType); + } + + @VisibleForTesting + List> getRecordedOperationFutures() { + List> recordedOperationFutures = Lists.newArrayList(); + for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { + TransactionContext transactionContext = txFutureCallback.getTransactionContext(); + if(transactionContext != null) { + recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures()); + } } + + return recordedOperationFutures; } @Override @@ -236,26 +243,82 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY, "Read operation on write-only transaction is not allowed"); - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} read {}", identifier, path); + LOG.debug("Tx {} read {}", identifier, path); + + TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); + TransactionContext transactionContext = txFutureCallback.getTransactionContext(); + + CheckedFuture>, ReadFailedException> future; + if(transactionContext != null) { + future = transactionContext.readData(path); + } else { + // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future + // callback to be executed after the Tx is created. + final SettableFuture>> proxyFuture = SettableFuture.create(); + txFutureCallback.addTxOperationOnComplete(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + Futures.addCallback(transactionContext.readData(path), + new FutureCallback>>() { + @Override + public void onSuccess(Optional> data) { + proxyFuture.set(data); + } + + @Override + public void onFailure(Throwable t) { + proxyFuture.setException(t); + } + }); + } + }); + + future = MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); } - createTransactionIfMissing(actorContext, path); - return transactionContext(path).readData(path); + return future; } @Override - public CheckedFuture exists(YangInstanceIdentifier path) { + public CheckedFuture exists(final YangInstanceIdentifier path) { Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY, "Exists operation on write-only transaction is not allowed"); - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} exists {}", identifier, path); + LOG.debug("Tx {} exists {}", identifier, path); + + TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); + TransactionContext transactionContext = txFutureCallback.getTransactionContext(); + + CheckedFuture future; + if(transactionContext != null) { + future = transactionContext.dataExists(path); + } else { + // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future + // callback to be executed after the Tx is created. + final SettableFuture proxyFuture = SettableFuture.create(); + txFutureCallback.addTxOperationOnComplete(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + Futures.addCallback(transactionContext.dataExists(path), + new FutureCallback() { + @Override + public void onSuccess(Boolean exists) { + proxyFuture.set(exists); + } + + @Override + public void onFailure(Throwable t) { + proxyFuture.setException(t); + } + }); + } + }); + + future = MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); } - createTransactionIfMissing(actorContext, path); - return transactionContext(path).dataExists(path); + return future; } private void checkModificationState() { @@ -266,41 +329,72 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } @Override - public void write(YangInstanceIdentifier path, NormalizedNode data) { + public void write(final YangInstanceIdentifier path, final NormalizedNode data) { checkModificationState(); - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} write {}", identifier, path); - } - createTransactionIfMissing(actorContext, path); + LOG.debug("Tx {} write {}", identifier, path); - transactionContext(path).writeData(path, data); + TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); + TransactionContext transactionContext = txFutureCallback.getTransactionContext(); + if(transactionContext != null) { + transactionContext.writeData(path, data); + } else { + // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future + // callback to be executed after the Tx is created. + txFutureCallback.addTxOperationOnComplete(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + transactionContext.writeData(path, data); + } + }); + } } @Override - public void merge(YangInstanceIdentifier path, NormalizedNode data) { + public void merge(final YangInstanceIdentifier path, final NormalizedNode data) { checkModificationState(); - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} merge {}", identifier, path); - } - createTransactionIfMissing(actorContext, path); + LOG.debug("Tx {} merge {}", identifier, path); - transactionContext(path).mergeData(path, data); + TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); + TransactionContext transactionContext = txFutureCallback.getTransactionContext(); + if(transactionContext != null) { + transactionContext.mergeData(path, data); + } else { + // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future + // callback to be executed after the Tx is created. + txFutureCallback.addTxOperationOnComplete(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + transactionContext.mergeData(path, data); + } + }); + } } @Override - public void delete(YangInstanceIdentifier path) { + public void delete(final YangInstanceIdentifier path) { checkModificationState(); - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} delete {}", identifier, path); - } - createTransactionIfMissing(actorContext, path); - transactionContext(path).deleteData(path); + LOG.debug("Tx {} delete {}", identifier, path); + + TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); + TransactionContext transactionContext = txFutureCallback.getTransactionContext(); + if(transactionContext != null) { + transactionContext.deleteData(path); + } else { + // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future + // callback to be executed after the Tx is created. + txFutureCallback.addTxOperationOnComplete(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + transactionContext.deleteData(path); + } + }); + } } @Override @@ -310,19 +404,32 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { inReadyState = true; - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier, - remoteTransactionPaths.size()); - } + LOG.debug("Tx {} Readying {} transactions for commit", identifier, + txFutureCallbackMap.size()); + List> cohortFutures = Lists.newArrayList(); - for(TransactionContext transactionContext : remoteTransactionPaths.values()) { + for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { + + LOG.debug("Tx {} Readying transaction for shard {}", identifier, + txFutureCallback.getShardName()); + + TransactionContext transactionContext = txFutureCallback.getTransactionContext(); + if(transactionContext != null) { + cohortFutures.add(transactionContext.readyTransaction()); + } else { + // The shard Tx hasn't been created yet so create a promise to ready the Tx later + // after it's created. + final Promise cohortPromise = akka.dispatch.Futures.promise(); + txFutureCallback.addTxOperationOnComplete(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + cohortPromise.completeWith(transactionContext.readyTransaction()); + } + }); - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} Readying transaction for shard {}", identifier, - transactionContext.getShardName()); + cohortFutures.add(cohortPromise.future()); } - cohortFutures.add(transactionContext.readyTransaction()); } if(transactionChainProxy != null){ @@ -340,11 +447,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public void close() { - for(TransactionContext transactionContext : remoteTransactionPaths.values()) { - transactionContext.closeTransaction(); + for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { + TransactionContext transactionContext = txFutureCallback.getTransactionContext(); + if(transactionContext != null) { + transactionContext.closeTransaction(); + } else { + txFutureCallback.addTxOperationOnComplete(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + transactionContext.closeTransaction(); + } + }); + } } - remoteTransactionPaths.clear(); + txFutureCallbackMap.clear(); if(transactionType == TransactionType.READ_ONLY) { remoteTransactionActors.clear(); @@ -352,94 +469,211 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } } - private TransactionContext transactionContext(YangInstanceIdentifier path){ + private String shardNameFromIdentifier(YangInstanceIdentifier path){ + return ShardStrategyFactory.getStrategy(path).findShard(path); + } + + private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) { String shardName = shardNameFromIdentifier(path); - return remoteTransactionPaths.get(shardName); + TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName); + if(txFutureCallback == null) { + Future findPrimaryFuture = actorContext.findPrimaryShardAsync(shardName); + + final TransactionFutureCallback newTxFutureCallback = + new TransactionFutureCallback(shardName); + + txFutureCallback = newTxFutureCallback; + txFutureCallbackMap.put(shardName, txFutureCallback); + + findPrimaryFuture.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, ActorSelection primaryShard) { + if(failure != null) { + newTxFutureCallback.onComplete(failure, null); + } else { + newTxFutureCallback.setPrimaryShard(primaryShard); + } + } + }, actorContext.getActorSystem().dispatcher()); + } + + return txFutureCallback; } - private String shardNameFromIdentifier(YangInstanceIdentifier path){ - return ShardStrategyFactory.getStrategy(path).findShard(path); + public String getTransactionChainId() { + if(transactionChainProxy == null){ + return ""; + } + return transactionChainProxy.getTransactionChainId(); } - private void createTransactionIfMissing(ActorContext actorContext, - YangInstanceIdentifier path) { + /** + * Interface for a transaction operation to be invoked later. + */ + private static interface TransactionOperation { + void invoke(TransactionContext transactionContext); + } - if(transactionChainProxy != null){ - transactionChainProxy.waitTillCurrentTransactionReady(); + /** + * Implements a Future OnComplete callback for a CreateTransaction message. This class handles + * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a + * retry task after a short delay. + *

+ * The end result from a completed CreateTransaction message is a TransactionContext that is + * used to perform transaction operations. Transaction operations that occur before the + * CreateTransaction completes are cache and executed once the CreateTransaction completes, + * successfully or not. + */ + private class TransactionFutureCallback extends OnComplete { + + /** + * The list of transaction operations to execute once the CreateTransaction completes. + */ + @GuardedBy("txOperationsOnComplete") + private final List txOperationsOnComplete = Lists.newArrayList(); + + /** + * The TransactionContext resulting from the CreateTransaction reply. + */ + private volatile TransactionContext transactionContext; + + /** + * The target primary shard. + */ + private volatile ActorSelection primaryShard; + + private volatile int createTxTries = (int) (actorContext.getDatastoreContext(). + getShardLeaderElectionTimeout().duration().toMillis() / + CREATE_TX_TRY_INTERVAL.toMillis()); + + private final String shardName; + + TransactionFutureCallback(String shardName) { + this.shardName = shardName; } - String shardName = ShardStrategyFactory.getStrategy(path).findShard(path); + String getShardName() { + return shardName; + } - TransactionContext transactionContext = - remoteTransactionPaths.get(shardName); + TransactionContext getTransactionContext() { + return transactionContext; + } - if (transactionContext != null) { - // A transaction already exists with that shard - return; + + /** + * Sets the target primary shard and initiates a CreateTransaction try. + */ + void setPrimaryShard(ActorSelection primaryShard) { + LOG.debug("Tx {} Primary shard found - trying create transaction", identifier); + + this.primaryShard = primaryShard; + tryCreateTransaction(); } - try { - Optional primaryShard = actorContext.findPrimaryShard(shardName); - if (!primaryShard.isPresent()) { - throw new PrimaryNotFoundException("Primary could not be found for shard " + shardName); + /** + * Adds a TransactionOperation to be executed after the CreateTransaction completes. + */ + void addTxOperationOnComplete(TransactionOperation operation) { + synchronized(txOperationsOnComplete) { + if(transactionContext == null) { + LOG.debug("Tx {} Adding operation on complete {}", identifier); + + txOperationsOnComplete.add(operation); + } else { + operation.invoke(transactionContext); + } } + } - Object response = actorContext.executeOperation(primaryShard.get(), - new CreateTransaction(identifier.toString(), this.transactionType.ordinal(), - getTransactionChainId()).toSerializable()); - if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) { - CreateTransactionReply reply = - CreateTransactionReply.fromSerializable(response); + /** + * Performs a CreateTransaction try async. + */ + private void tryCreateTransaction() { + Future createTxFuture = actorContext.executeOperationAsync(primaryShard, + new CreateTransaction(identifier.toString(), + TransactionProxy.this.transactionType.ordinal(), + getTransactionChainId()).toSerializable()); - String transactionPath = reply.getTransactionPath(); + createTxFuture.onComplete(this, actorContext.getActorSystem().dispatcher()); + } - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath); + @Override + public void onComplete(Throwable failure, Object response) { + if(failure instanceof NoShardLeaderException) { + // There's no leader for the shard yet - schedule and try again, unless we're out + // of retries. Note: createTxTries is volatile as it may be written by different + // threads however not concurrently, therefore decrementing it non-atomically here + // is ok. + if(--createTxTries > 0) { + LOG.debug("Tx {} Shard {} has no leader yet - scheduling create Tx retry", + identifier, shardName); + + actorContext.getActorSystem().scheduler().scheduleOnce(CREATE_TX_TRY_INTERVAL, + new Runnable() { + @Override + public void run() { + tryCreateTransaction(); + } + }, actorContext.getActorSystem().dispatcher()); + return; } - ActorSelection transactionActor = actorContext.actorSelection(transactionPath); + } - if (transactionType == TransactionType.READ_ONLY) { - // Add the actor to the remoteTransactionActors list for access by the - // cleanup PhantonReference. - remoteTransactionActors.add(transactionActor); + // Create the TransactionContext from the response or failure and execute delayed + // TransactionOperations. This entire section is done atomically (ie synchronized) with + // respect to #addTxOperationOnComplete to handle timing issues and ensure no + // TransactionOperation is missed and that they are processed in the order they occurred. + synchronized(txOperationsOnComplete) { + if(failure != null) { + LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier, + failure.getMessage()); + + transactionContext = new NoOpTransactionContext(failure, identifier); + } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) { + createValidTransactionContext(CreateTransactionReply.fromSerializable(response)); + } else { + IllegalArgumentException exception = new IllegalArgumentException(String.format( + "Invalid reply type %s for CreateTransaction", response.getClass())); + + transactionContext = new NoOpTransactionContext(exception, identifier); + } - // Write to the memory barrier volatile to publish the above update to the - // remoteTransactionActors list for thread visibility. - remoteTransactionActorsMB.set(true); + for(TransactionOperation oper: txOperationsOnComplete) { + oper.invoke(transactionContext); } - // TxActor is always created where the leader of the shard is. - // Check if TxActor is created in the same node - boolean isTxActorLocal = actorContext.isLocalPath(transactionPath); + txOperationsOnComplete.clear(); + } + } - transactionContext = new TransactionContextImpl(shardName, transactionPath, - transactionActor, identifier, actorContext, schemaContext, isTxActorLocal); + private void createValidTransactionContext(CreateTransactionReply reply) { + String transactionPath = reply.getTransactionPath(); - remoteTransactionPaths.put(shardName, transactionContext); - } else { - throw new IllegalArgumentException(String.format( - "Invalid reply type {} for CreateTransaction", response.getClass())); - } - } catch (Exception e) { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage()); + LOG.debug("Tx {} Received transaction actor path {}", identifier, transactionPath); + + ActorSelection transactionActor = actorContext.actorSelection(transactionPath); + + if (transactionType == TransactionType.READ_ONLY) { + // Add the actor to the remoteTransactionActors list for access by the + // cleanup PhantonReference. + remoteTransactionActors.add(transactionActor); + + // Write to the memory barrier volatile to publish the above update to the + // remoteTransactionActors list for thread visibility. + remoteTransactionActorsMB.set(true); } - remoteTransactionPaths - .put(shardName, new NoOpTransactionContext(shardName, e, identifier)); - } - } - public String getTransactionChainId() { - if(transactionChainProxy == null){ - return ""; + // TxActor is always created where the leader of the shard is. + // Check if TxActor is created in the same node + boolean isTxActorLocal = actorContext.isLocalPath(transactionPath); + + transactionContext = new TransactionContextImpl(transactionActor, identifier, + actorContext, schemaContext, isTxActorLocal); } - return transactionChainProxy.getTransactionChainId(); } - private interface TransactionContext { - String getShardName(); - void closeTransaction(); Future readyTransaction(); @@ -461,19 +695,12 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private static abstract class AbstractTransactionContext implements TransactionContext { protected final TransactionIdentifier identifier; - protected final String shardName; protected final List> recordedOperationFutures = Lists.newArrayList(); - AbstractTransactionContext(String shardName, TransactionIdentifier identifier) { - this.shardName = shardName; + AbstractTransactionContext(TransactionIdentifier identifier) { this.identifier = identifier; } - @Override - public String getShardName() { - return shardName; - } - @Override public List> getRecordedOperationFutures() { return recordedOperationFutures; @@ -485,15 +712,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private final ActorContext actorContext; private final SchemaContext schemaContext; - private final String actorPath; private final ActorSelection actor; private final boolean isTxActorLocal; - private TransactionContextImpl(String shardName, String actorPath, - ActorSelection actor, TransactionIdentifier identifier, ActorContext actorContext, - SchemaContext schemaContext, boolean isTxActorLocal) { - super(shardName, identifier); - this.actorPath = actorPath; + private TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier, + ActorContext actorContext, SchemaContext schemaContext, + boolean isTxActorLocal) { + super(identifier); this.actor = actor; this.actorContext = actorContext; this.schemaContext = schemaContext; @@ -506,18 +731,16 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public void closeTransaction() { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} closeTransaction called", identifier); - } + LOG.debug("Tx {} closeTransaction called", identifier); + actorContext.sendOperationAsync(getActor(), new CloseTransaction().toSerializable()); } @Override public Future readyTransaction() { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending", + LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending", identifier, recordedOperationFutures.size()); - } + // Send the ReadyTransaction message to the Tx actor. ReadyTransaction readyTransaction = new ReadyTransaction(); @@ -540,13 +763,12 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { // Transform the combined Future into a Future that returns the cohort actor path from // the ReadyTransactionReply. That's the end result of the ready operation. - return combinedFutures.transform(new AbstractFunction1, ActorSelection>() { + return combinedFutures.transform(new Mapper, ActorSelection>() { @Override - public ActorSelection apply(Iterable notUsed) { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded", + public ActorSelection checkedApply(Iterable notUsed) { + LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded", identifier); - } + // At this point all the Futures succeeded and we need to extract the cohort // actor path from the ReadyTransactionReply. For the recorded operations, they // don't return any data so we're only interested that they completed @@ -574,9 +796,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public void deleteData(YangInstanceIdentifier path) { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} deleteData called path = {}", identifier, path); - } + LOG.debug("Tx {} deleteData called path = {}", identifier, path); DeleteData deleteData = new DeleteData(path); recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(), @@ -585,9 +805,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} mergeData called path = {}", identifier, path); - } + LOG.debug("Tx {} mergeData called path = {}", identifier, path); MergeData mergeData = new MergeData(path, data, schemaContext); recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(), @@ -596,9 +814,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public void writeData(YangInstanceIdentifier path, NormalizedNode data) { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} writeData called path = {}", identifier, path); - } + LOG.debug("Tx {} writeData called path = {}", identifier, path); WriteData writeData = new WriteData(path, data, schemaContext); recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(), @@ -609,9 +825,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public CheckedFuture>, ReadFailedException> readData( final YangInstanceIdentifier path) { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} readData called path = {}", identifier, path); - } + LOG.debug("Tx {} readData called path = {}", identifier, path); + final SettableFuture>> returnFuture = SettableFuture.create(); // If there were any previous recorded put/merge/delete operation reply Futures then we @@ -621,10 +836,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { if(recordedOperationFutures.isEmpty()) { finishReadData(path, returnFuture); } else { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} readData: verifying {} previous recorded operations", + LOG.debug("Tx {} readData: verifying {} previous recorded operations", identifier, recordedOperationFutures.size()); - } + // Note: we make a copy of recordedOperationFutures to be on the safe side in case // Futures#sequence accesses the passed List on a different thread, as // recordedOperationFutures is not synchronized. @@ -638,10 +852,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public void onComplete(Throwable failure, Iterable notUsed) throws Throwable { if(failure != null) { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} readData: a recorded operation failed: {}", + LOG.debug("Tx {} readData: a recorded operation failed: {}", identifier, failure); - } returnFuture.setException(new ReadFailedException( "The read could not be performed because a previous put, merge," + "or delete operation failed", failure)); @@ -660,23 +872,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private void finishReadData(final YangInstanceIdentifier path, final SettableFuture>> returnFuture) { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} finishReadData called path = {}", identifier, path); - } + LOG.debug("Tx {} finishReadData called path = {}", identifier, path); + OnComplete onComplete = new OnComplete() { @Override public void onComplete(Throwable failure, Object readResponse) throws Throwable { if(failure != null) { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} read operation failed: {}", identifier, failure); - } + LOG.debug("Tx {} read operation failed: {}", identifier, failure); returnFuture.setException(new ReadFailedException( "Error reading data for path " + path, failure)); } else { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} read operation succeeded", identifier, failure); - } + LOG.debug("Tx {} read operation succeeded", identifier, failure); if (readResponse instanceof ReadDataReply) { ReadDataReply reply = (ReadDataReply) readResponse; @@ -705,9 +912,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public CheckedFuture dataExists( final YangInstanceIdentifier path) { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} dataExists called path = {}", identifier, path); - } + LOG.debug("Tx {} dataExists called path = {}", identifier, path); + final SettableFuture returnFuture = SettableFuture.create(); // If there were any previous recorded put/merge/delete operation reply Futures then we @@ -718,10 +924,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { if(recordedOperationFutures.isEmpty()) { finishDataExists(path, returnFuture); } else { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} dataExists: verifying {} previous recorded operations", + LOG.debug("Tx {} dataExists: verifying {} previous recorded operations", identifier, recordedOperationFutures.size()); - } + // Note: we make a copy of recordedOperationFutures to be on the safe side in case // Futures#sequence accesses the passed List on a different thread, as // recordedOperationFutures is not synchronized. @@ -734,10 +939,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public void onComplete(Throwable failure, Iterable notUsed) throws Throwable { if(failure != null) { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} dataExists: a recorded operation failed: {}", + LOG.debug("Tx {} dataExists: a recorded operation failed: {}", identifier, failure); - } returnFuture.setException(new ReadFailedException( "The data exists could not be performed because a previous " + "put, merge, or delete operation failed", failure)); @@ -756,22 +959,17 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private void finishDataExists(final YangInstanceIdentifier path, final SettableFuture returnFuture) { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} finishDataExists called path = {}", identifier, path); - } + LOG.debug("Tx {} finishDataExists called path = {}", identifier, path); + OnComplete onComplete = new OnComplete() { @Override public void onComplete(Throwable failure, Object response) throws Throwable { if(failure != null) { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure); - } + LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure); returnFuture.setException(new ReadFailedException( "Error checking data exists for path " + path, failure)); } else { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} dataExists operation succeeded", identifier, failure); - } + LOG.debug("Tx {} dataExists operation succeeded", identifier, failure); if (response instanceof DataExistsReply) { returnFuture.set(Boolean.valueOf(((DataExistsReply) response).exists())); @@ -799,66 +997,51 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class); - private final Exception failure; + private final Throwable failure; - public NoOpTransactionContext(String shardName, Exception failure, - TransactionIdentifier identifier){ - super(shardName, identifier); + public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier){ + super(identifier); this.failure = failure; } @Override public void closeTransaction() { - if(LOG.isDebugEnabled()) { - LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier); - } + LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier); } @Override public Future readyTransaction() { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} readyTransaction called", identifier); - } + LOG.debug("Tx {} readyTransaction called", identifier); return akka.dispatch.Futures.failed(failure); } @Override public void deleteData(YangInstanceIdentifier path) { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} deleteData called path = {}", identifier, path); - } + LOG.debug("Tx {} deleteData called path = {}", identifier, path); } @Override public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} mergeData called path = {}", identifier, path); - } + LOG.debug("Tx {} mergeData called path = {}", identifier, path); } @Override public void writeData(YangInstanceIdentifier path, NormalizedNode data) { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} writeData called path = {}", identifier, path); - } + LOG.debug("Tx {} writeData called path = {}", identifier, path); } @Override public CheckedFuture>, ReadFailedException> readData( - YangInstanceIdentifier path) { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} readData called path = {}", identifier, path); - } + YangInstanceIdentifier path) { + LOG.debug("Tx {} readData called path = {}", identifier, path); return Futures.immediateFailedCheckedFuture(new ReadFailedException( "Error reading data for path " + path, failure)); } @Override public CheckedFuture dataExists( - YangInstanceIdentifier path) { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} dataExists called path = {}", identifier, path); - } + YangInstanceIdentifier path) { + LOG.debug("Tx {} dataExists called path = {}", identifier, path); return Futures.immediateFailedCheckedFuture(new ReadFailedException( "Error checking exists for path " + path, failure)); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/NoShardLeaderException.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/NoShardLeaderException.java new file mode 100644 index 0000000000..b205b0fe78 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/NoShardLeaderException.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.exceptions; + +/** + * Exception indicating a shard has no current leader. + * + * @author Thomas Pantelis + */ +public class NoShardLeaderException extends RuntimeException { + private static final long serialVersionUID = 1L; + + public NoShardLeaderException(String message){ + super(message); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index 0a1e80b0cb..e409168c85 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -14,13 +14,16 @@ import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.PoisonPill; import akka.dispatch.Mapper; +import akka.pattern.AskTimeoutException; import akka.util.Timeout; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; +import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; +import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException; import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; @@ -29,6 +32,7 @@ import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; @@ -50,25 +54,55 @@ public class ActorContext { private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class); - private static final FiniteDuration DEFAULT_OPER_DURATION = Duration.create(5, TimeUnit.SECONDS); - public static final String MAILBOX = "bounded-mailbox"; + private static final Mapper FIND_PRIMARY_FAILURE_TRANSFORMER = + new Mapper() { + @Override + public Throwable apply(Throwable failure) { + Throwable actualFailure = failure; + if(failure instanceof AskTimeoutException) { + // A timeout exception most likely means the shard isn't initialized. + actualFailure = new NotInitializedException( + "Timed out trying to find the primary shard. Most likely cause is the " + + "shard is not initialized yet."); + } + + return actualFailure; + } + }; + private final ActorSystem actorSystem; private final ActorRef shardManager; private final ClusterWrapper clusterWrapper; private final Configuration configuration; + private final DatastoreContext datastoreContext; private volatile SchemaContext schemaContext; - private FiniteDuration operationDuration = DEFAULT_OPER_DURATION; - private Timeout operationTimeout = new Timeout(operationDuration); + private final FiniteDuration operationDuration; + private final Timeout operationTimeout; public ActorContext(ActorSystem actorSystem, ActorRef shardManager, - ClusterWrapper clusterWrapper, - Configuration configuration) { + ClusterWrapper clusterWrapper, Configuration configuration) { + this(actorSystem, shardManager, clusterWrapper, configuration, + DatastoreContext.newBuilder().build()); + } + + public ActorContext(ActorSystem actorSystem, ActorRef shardManager, + ClusterWrapper clusterWrapper, Configuration configuration, + DatastoreContext datastoreContext) { this.actorSystem = actorSystem; this.shardManager = shardManager; this.clusterWrapper = clusterWrapper; this.configuration = configuration; + this.datastoreContext = datastoreContext; + + operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), + TimeUnit.SECONDS); + operationTimeout = new Timeout(operationDuration); + } + + public DatastoreContext getDatastoreContext() { + return datastoreContext; } public ActorSystem getActorSystem() { @@ -95,11 +129,6 @@ public class ActorContext { } } - public void setOperationTimeout(int timeoutInSeconds) { - operationDuration = Duration.create(timeoutInSeconds, TimeUnit.SECONDS); - operationTimeout = new Timeout(operationDuration); - } - public SchemaContext getSchemaContext() { return schemaContext; } @@ -118,6 +147,34 @@ public class ActorContext { return Optional.of(actorSystem.actorSelection(path)); } + public Future findPrimaryShardAsync(final String shardName) { + Future future = executeOperationAsync(shardManager, + new FindPrimary(shardName, true).toSerializable(), + datastoreContext.getShardInitializationTimeout()); + + return future.transform(new Mapper() { + @Override + public ActorSelection checkedApply(Object response) throws Exception { + if(response.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) { + PrimaryFound found = PrimaryFound.fromSerializable(response); + + LOG.debug("Primary found {}", found.getPrimaryPath()); + return actorSystem.actorSelection(found.getPrimaryPath()); + } else if(response instanceof ActorNotInitialized) { + throw new NotInitializedException( + String.format("Found primary shard %s but it's not initialized yet. " + + "Please try again later", shardName)); + } else if(response instanceof PrimaryNotFound) { + throw new PrimaryNotFoundException( + String.format("No primary shard found for %S.", shardName)); + } + + throw new UnknownMessageException(String.format( + "FindPrimary returned unkown response: %s", response)); + } + }, FIND_PRIMARY_FAILURE_TRANSFORMER, getActorSystem().dispatcher()); + } + /** * Finds a local shard given its shard name and return it's ActorRef * @@ -143,9 +200,9 @@ public class ActorContext { * * @param shardName the name of the local shard that needs to be found */ - public Future findLocalShardAsync( final String shardName, Timeout timeout) { + public Future findLocalShardAsync( final String shardName) { Future future = executeOperationAsync(shardManager, - new FindLocalShard(shardName, true), timeout); + new FindLocalShard(shardName, true), datastoreContext.getShardInitializationTimeout()); return future.map(new Mapper() { @Override @@ -238,15 +295,28 @@ public class ActorContext { * * @param actor the ActorSelection * @param message the message to send + * @param timeout the operation timeout * @return a Future containing the eventual result */ - public Future executeOperationAsync(ActorSelection actor, Object message) { + public Future executeOperationAsync(ActorSelection actor, Object message, + Timeout timeout) { Preconditions.checkArgument(actor != null, "actor must not be null"); Preconditions.checkArgument(message != null, "message must not be null"); LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString()); - return ask(actor, message, operationTimeout); + return ask(actor, message, timeout); + } + + /** + * Execute an operation on a remote actor asynchronously. + * + * @param actor the ActorSelection + * @param message the message to send + * @return a Future containing the eventual result + */ + public Future executeOperationAsync(ActorSelection actor, Object message) { + return executeOperationAsync(actor, message, operationTimeout); } /** diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java index de33f55b96..a675b40718 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java @@ -54,6 +54,10 @@ public class DistributedConfigDataStoreProviderModule extends getValue().intValue()) .shardSnapshotBatchCount(props.getShardSnapshotBatchCount().getValue().intValue()) .shardHeartbeatIntervalInMillis(props.getShardHearbeatIntervalInMillis().getValue()) + .shardInitializationTimeout(props.getShardInitializationTimeoutInSeconds().getValue(), + TimeUnit.SECONDS) + .shardLeaderElectionTimeout(props.getShardLeaderElectionTimeoutInSeconds().getValue(), + TimeUnit.SECONDS) .shardTransactionCommitTimeoutInSeconds( props.getShardTransactionCommitTimeoutInSeconds().getValue().intValue()) .shardTransactionCommitQueueCapacity( diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java index ee1859d9ca..21cb7998a5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java @@ -54,6 +54,10 @@ public class DistributedOperationalDataStoreProviderModule extends getValue().intValue()) .shardSnapshotBatchCount(props.getShardSnapshotBatchCount().getValue().intValue()) .shardHeartbeatIntervalInMillis(props.getShardHearbeatIntervalInMillis().getValue()) + .shardInitializationTimeout(props.getShardInitializationTimeoutInSeconds().getValue(), + TimeUnit.SECONDS) + .shardLeaderElectionTimeout(props.getShardLeaderElectionTimeoutInSeconds().getValue(), + TimeUnit.SECONDS) .shardTransactionCommitTimeoutInSeconds( props.getShardTransactionCommitTimeoutInSeconds().getValue().intValue()) .shardTransactionCommitQueueCapacity( diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang index 167d530d18..ef9da94887 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang @@ -121,6 +121,21 @@ module distributed-datastore-provider { description "The maximum allowed capacity for each shard's transaction commit queue."; } + leaf shard-initialization-timeout-in-seconds { + default 300; // 5 minutes + type non-zero-uint32-type; + description "The maximum amount of time to wait for a shard to initialize from persistence + on startup before failing an operation (eg transaction create and change + listener registration)."; + } + + leaf shard-leader-election-timeout-in-seconds { + default 30; + type non-zero-uint32-type; + description "The maximum amount of time to wait for a shard to elect a leader before failing + an operation (eg transaction create)."; + } + leaf enable-metric-capture { default false; type boolean; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java index c27993f97b..9ac30095a5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java @@ -204,8 +204,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest { doReturn(mockActorSystem).when(actorContext).getActorSystem(); doReturn(duration("5 seconds")).when(actorContext).getOperationDuration(); - doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName), - any(Timeout.class)); + doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName)); doReturn(Futures.failed(new RuntimeException("mock"))). when(actorContext).executeOperationAsync(any(ActorRef.class), any(Object.class), any(Timeout.class)); @@ -228,12 +227,12 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest { final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy( shardName, actorContext, mockListener); + doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext(); doReturn(getSystem()).when(actorContext).getActorSystem(); doReturn(getSystem().actorSelection(getRef().path())). when(actorContext).actorSelection(getRef().path()); doReturn(duration("5 seconds")).when(actorContext).getOperationDuration(); - doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName), - any(Timeout.class)); + doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName)); Answer> answer = new Answer>() { @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index 1cc7ae8ad0..5a45a9961a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -4,11 +4,15 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.PoisonPill; import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Uninterruptibles; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; +import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; +import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal; import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; @@ -16,6 +20,7 @@ import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; @@ -26,10 +31,16 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; public class DistributedDataStoreIntegrationTest extends AbstractActorTest { + private final DatastoreContext.Builder datastoreContextBuilder = + DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100); + @Test public void testWriteTransactionWithSingleShard() throws Exception{ System.setProperty("shard.persistent", "true"); @@ -72,7 +83,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { cohort.preCommit().get(5, TimeUnit.SECONDS); cohort.commit().get(5, TimeUnit.SECONDS); - // 5. Verify the data in the store + // Verify the data in the store DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); @@ -95,7 +106,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { DistributedDataStore dataStore = setupDistributedDataStore("testReadWriteTransaction", "test-1"); - // 1. Create a read-write Tx + // 1. Create a read-write Tx DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); assertNotNull("newReadWriteTransaction returned null", readWriteTx); @@ -138,6 +149,383 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { }}; } + @Test + public void testTransactionWritesWithShardNotInitiallyReady() throws Exception{ + new IntegrationTestKit(getSystem()) {{ + String testName = "testTransactionWritesWithShardNotInitiallyReady"; + String shardName = "test-1"; + + // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't + // initialized until we create and submit the write the Tx. + String persistentID = String.format("member-1-shard-%s-%s", shardName, testName); + CountDownLatch blockRecoveryLatch = new CountDownLatch(1); + InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); + + DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName); + + // Create the write Tx + + final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + assertNotNull("newReadWriteTransaction returned null", writeTx); + + // Do some modification operations and ready the Tx on a separate thread. + + final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder( + TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, + TestModel.ID_QNAME, 1).build(); + + final AtomicReference txCohort = new AtomicReference<>(); + final AtomicReference caughtEx = new AtomicReference<>(); + final CountDownLatch txReady = new CountDownLatch(1); + Thread txThread = new Thread() { + @Override + public void run() { + try { + writeTx.write(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder( + TestModel.OUTER_LIST_QNAME).build()); + + writeTx.write(listEntryPath, ImmutableNodes.mapEntry( + TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)); + + writeTx.delete(listEntryPath); + + txCohort.set(writeTx.ready()); + } catch(Exception e) { + caughtEx.set(e); + return; + } finally { + txReady.countDown(); + } + } + }; + + txThread.start(); + + // Wait for the Tx operations to complete. + + boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS); + if(caughtEx.get() != null) { + throw caughtEx.get(); + } + + assertEquals("Tx ready", true, done); + + // At this point the Tx operations should be waiting for the shard to initialize so + // trigger the latch to let the shard recovery to continue. + + blockRecoveryLatch.countDown(); + + // Wait for the Tx commit to complete. + + assertEquals("canCommit", true, txCohort.get().canCommit().get(5, TimeUnit.SECONDS)); + txCohort.get().preCommit().get(5, TimeUnit.SECONDS); + txCohort.get().commit().get(5, TimeUnit.SECONDS); + + // Verify the data in the store + + DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + + Optional> optional = readTx.read(TestModel.TEST_PATH). + get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + + optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + + optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", false, optional.isPresent()); + + cleanup(dataStore); + }}; + } + + @Test + public void testTransactionReadsWithShardNotInitiallyReady() throws Exception{ + new IntegrationTestKit(getSystem()) {{ + String testName = "testTransactionReadsWithShardNotInitiallyReady"; + String shardName = "test-1"; + + // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't + // initialized until we create the Tx. + String persistentID = String.format("member-1-shard-%s-%s", shardName, testName); + CountDownLatch blockRecoveryLatch = new CountDownLatch(1); + InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); + + DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName); + + // Create the read-write Tx + + final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); + assertNotNull("newReadWriteTransaction returned null", readWriteTx); + + // Do some reads on the Tx on a separate thread. + + final AtomicReference> txExistsFuture = + new AtomicReference<>(); + final AtomicReference>, ReadFailedException>> + txReadFuture = new AtomicReference<>(); + final AtomicReference caughtEx = new AtomicReference<>(); + final CountDownLatch txReadsDone = new CountDownLatch(1); + Thread txThread = new Thread() { + @Override + public void run() { + try { + readWriteTx.write(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH)); + + txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH)); + } catch(Exception e) { + caughtEx.set(e); + return; + } finally { + txReadsDone.countDown(); + } + } + }; + + txThread.start(); + + // Wait for the Tx operations to complete. + + boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS); + if(caughtEx.get() != null) { + throw caughtEx.get(); + } + + assertEquals("Tx reads done", true, done); + + // At this point the Tx operations should be waiting for the shard to initialize so + // trigger the latch to let the shard recovery to continue. + + blockRecoveryLatch.countDown(); + + // Wait for the reads to complete and verify. + + assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS)); + assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent()); + + readWriteTx.close(); + + cleanup(dataStore); + }}; + } + + @Test(expected=NotInitializedException.class) + public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{ + new IntegrationTestKit(getSystem()) {{ + String testName = "testTransactionCommitFailureWithShardNotInitialized"; + String shardName = "test-1"; + + // Set the shard initialization timeout low for the test. + + datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS); + + // Setup the InMemoryJournal to block shard recovery indefinitely. + + String persistentID = String.format("member-1-shard-%s-%s", shardName, testName); + CountDownLatch blockRecoveryLatch = new CountDownLatch(1); + InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); + + DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName); + + // Create the write Tx + + final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + assertNotNull("newReadWriteTransaction returned null", writeTx); + + // Do some modifications and ready the Tx on a separate thread. + + final AtomicReference txCohort = new AtomicReference<>(); + final AtomicReference caughtEx = new AtomicReference<>(); + final CountDownLatch txReady = new CountDownLatch(1); + Thread txThread = new Thread() { + @Override + public void run() { + try { + writeTx.write(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + txCohort.set(writeTx.ready()); + } catch(Exception e) { + caughtEx.set(e); + return; + } finally { + txReady.countDown(); + } + } + }; + + txThread.start(); + + // Wait for the Tx operations to complete. + + boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS); + if(caughtEx.get() != null) { + throw caughtEx.get(); + } + + assertEquals("Tx ready", true, done); + + // Wait for the commit to complete. Since the shard never initialized, the Tx should + // have timed out and throw an appropriate exception cause. + + try { + txCohort.get().canCommit().get(5, TimeUnit.SECONDS); + } catch(ExecutionException e) { + throw e.getCause(); + } finally { + blockRecoveryLatch.countDown(); + cleanup(dataStore); + } + }}; + } + + @Test(expected=NotInitializedException.class) + public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{ + new IntegrationTestKit(getSystem()) {{ + String testName = "testTransactionReadFailureWithShardNotInitialized"; + String shardName = "test-1"; + + // Set the shard initialization timeout low for the test. + + datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS); + + // Setup the InMemoryJournal to block shard recovery indefinitely. + + String persistentID = String.format("member-1-shard-%s-%s", shardName, testName); + CountDownLatch blockRecoveryLatch = new CountDownLatch(1); + InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); + + DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName); + + // Create the read-write Tx + + final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); + assertNotNull("newReadWriteTransaction returned null", readWriteTx); + + // Do a read on the Tx on a separate thread. + + final AtomicReference>, ReadFailedException>> + txReadFuture = new AtomicReference<>(); + final AtomicReference caughtEx = new AtomicReference<>(); + final CountDownLatch txReadDone = new CountDownLatch(1); + Thread txThread = new Thread() { + @Override + public void run() { + try { + readWriteTx.write(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH)); + + readWriteTx.close(); + } catch(Exception e) { + caughtEx.set(e); + return; + } finally { + txReadDone.countDown(); + } + } + }; + + txThread.start(); + + // Wait for the Tx operations to complete. + + boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS); + if(caughtEx.get() != null) { + throw caughtEx.get(); + } + + assertEquals("Tx read done", true, done); + + // Wait for the read to complete. Since the shard never initialized, the Tx should + // have timed out and throw an appropriate exception cause. + + try { + txReadFuture.get().checkedGet(5, TimeUnit.SECONDS); + } catch(ReadFailedException e) { + throw e.getCause(); + } finally { + blockRecoveryLatch.countDown(); + cleanup(dataStore); + } + }}; + } + + @Test(expected=NoShardLeaderException.class) + public void testTransactionCommitFailureWithNoShardLeader() throws Throwable{ + new IntegrationTestKit(getSystem()) {{ + String testName = "testTransactionCommitFailureWithNoShardLeader"; + String shardName = "test-1"; + + // We don't want the shard to become the leader so prevent shard election from completing + // by setting the election timeout, which is based on the heartbeat interval, really high. + + datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000); + + // Set the leader election timeout low for the test. + + datastoreContextBuilder.shardLeaderElectionTimeout(1, TimeUnit.MILLISECONDS); + + DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName); + + // Create the write Tx. + + final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + assertNotNull("newReadWriteTransaction returned null", writeTx); + + // Do some modifications and ready the Tx on a separate thread. + + final AtomicReference txCohort = new AtomicReference<>(); + final AtomicReference caughtEx = new AtomicReference<>(); + final CountDownLatch txReady = new CountDownLatch(1); + Thread txThread = new Thread() { + @Override + public void run() { + try { + writeTx.write(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + txCohort.set(writeTx.ready()); + } catch(Exception e) { + caughtEx.set(e); + return; + } finally { + txReady.countDown(); + } + } + }; + + txThread.start(); + + // Wait for the Tx operations to complete. + + boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS); + if(caughtEx.get() != null) { + throw caughtEx.get(); + } + + assertEquals("Tx ready", true, done); + + // Wait for the commit to complete. Since no shard leader was elected in time, the Tx + // should have timed out and throw an appropriate exception cause. + + try { + txCohort.get().canCommit().get(5, TimeUnit.SECONDS); + } catch(ExecutionException e) { + throw e.getCause(); + } finally { + cleanup(dataStore); + } + }}; + } + @Test public void testTransactionAbort() throws Exception{ System.setProperty("shard.persistent", "true"); @@ -253,31 +641,37 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { } DistributedDataStore setupDistributedDataStore(String typeName, String... shardNames) { + return setupDistributedDataStore(typeName, true, shardNames); + } + + DistributedDataStore setupDistributedDataStore(String typeName, boolean waitUntilLeader, + String... shardNames) { MockClusterWrapper cluster = new MockClusterWrapper(); Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf"); ShardStrategyFactory.setConfiguration(config); - DatastoreContext datastoreContext = DatastoreContext.newBuilder().build(); + DatastoreContext datastoreContext = datastoreContextBuilder.build(); DistributedDataStore dataStore = new DistributedDataStore(getSystem(), typeName, cluster, config, datastoreContext); SchemaContext schemaContext = SchemaContextHelper.full(); dataStore.onGlobalContextUpdated(schemaContext); - for(String shardName: shardNames) { - ActorRef shard = null; - for(int i = 0; i < 20 * 5 && shard == null; i++) { - Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); - Optional shardReply = dataStore.getActorContext().findLocalShard(shardName); - if(shardReply.isPresent()) { - shard = shardReply.get(); + if(waitUntilLeader) { + for(String shardName: shardNames) { + ActorRef shard = null; + for(int i = 0; i < 20 * 5 && shard == null; i++) { + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + Optional shardReply = dataStore.getActorContext().findLocalShard(shardName); + if(shardReply.isPresent()) { + shard = shardReply.get(); + } } - } - assertNotNull("Shard was not created", shard); + assertNotNull("Shard was not created", shard); - System.out.println("!!!!!!shard: "+shard.path().toString()); - waitUntilLeader(shard); + waitUntilLeader(shard); + } } return dataStore; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index f2b849122a..9e0bba48c1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -5,9 +5,15 @@ import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Props; import akka.dispatch.Futures; +import akka.testkit.JavaTestKit; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.CheckedFuture; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.mockito.ArgumentMatcher; import org.mockito.Mock; @@ -44,6 +50,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; +import java.io.IOException; import java.util.List; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; @@ -54,7 +61,6 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.argThat; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.isA; import static org.mockito.Mockito.times; @@ -64,7 +70,7 @@ import static org.opendaylight.controller.cluster.datastore.TransactionProxy.Tra import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY; @SuppressWarnings("resource") -public class TransactionProxyTest extends AbstractActorTest { +public class TransactionProxyTest { @SuppressWarnings("serial") static class TestException extends RuntimeException { @@ -74,6 +80,8 @@ public class TransactionProxyTest extends AbstractActorTest { CheckedFuture invoke(TransactionProxy proxy) throws Exception; } + private static ActorSystem system; + private final Configuration configuration = new MockConfiguration(); @Mock @@ -86,20 +94,44 @@ public class TransactionProxyTest extends AbstractActorTest { String memberName = "mock-member"; + @BeforeClass + public static void setUpClass() throws IOException { + + Config config = ConfigFactory.parseMap(ImmutableMap.builder(). + put("akka.actor.default-dispatcher.type", + "akka.testkit.CallingThreadDispatcherConfigurator").build()). + withFallback(ConfigFactory.load()); + system = ActorSystem.create("test", config); + } + + @AfterClass + public static void tearDownClass() throws IOException { + JavaTestKit.shutdownActorSystem(system); + system = null; + } + @Before public void setUp(){ MockitoAnnotations.initMocks(this); schemaContext = TestModel.createTestContext(); + DatastoreContext dataStoreContext = DatastoreContext.newBuilder().build(); + doReturn(getSystem()).when(mockActorContext).getActorSystem(); doReturn(memberName).when(mockActorContext).getCurrentMemberName(); doReturn(schemaContext).when(mockActorContext).getSchemaContext(); doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper(); + doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper(); + doReturn(dataStoreContext).when(mockActorContext).getDatastoreContext(); ShardStrategyFactory.setConfiguration(configuration); } + private ActorSystem getSystem() { + return system; + } + private CreateTransaction eqCreateTransaction(final String memberName, final TransactionType type) { ArgumentMatcher matcher = new ArgumentMatcher() { @@ -317,11 +349,11 @@ public class TransactionProxyTest extends AbstractActorTest { doReturn(actorSystem.actorSelection(actorRef.path())). when(mockActorContext).actorSelection(actorRef.path().toString()); - doReturn(Optional.of(actorSystem.actorSelection(actorRef.path()))). - when(mockActorContext).findPrimaryShard(eq(DefaultShardStrategy.DEFAULT_SHARD)); + doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))). + when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); - doReturn(createTransactionReply(actorRef)).when(mockActorContext). - executeOperation(eq(actorSystem.actorSelection(actorRef.path())), + doReturn(Futures.successful(createTransactionReply(actorRef))).when(mockActorContext). + executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())), eqCreateTransaction(memberName, type)); doReturn(false).when(mockActorContext).isLocalPath(actorRef.path().toString()); @@ -336,6 +368,7 @@ public class TransactionProxyTest extends AbstractActorTest { future.checkedGet(5, TimeUnit.SECONDS); fail("Expected ReadFailedException"); } catch(ReadFailedException e) { + e.printStackTrace(); throw e.getCause(); } } @@ -372,7 +405,7 @@ public class TransactionProxyTest extends AbstractActorTest { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); doReturn(Futures.successful(new Object())).when(mockActorContext). - executeOperationAsync(any(ActorSelection.class), any()); + executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); @@ -385,7 +418,7 @@ public class TransactionProxyTest extends AbstractActorTest { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); doReturn(Futures.failed(new TestException())).when(mockActorContext). - executeOperationAsync(any(ActorSelection.class), any()); + executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); @@ -398,12 +431,14 @@ public class TransactionProxyTest extends AbstractActorTest { ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class)); if (exToThrow instanceof PrimaryNotFoundException) { - doReturn(Optional.absent()).when(mockActorContext).findPrimaryShard(anyString()); + doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString()); } else { - doReturn(Optional.of(getSystem().actorSelection(actorRef.path()))). - when(mockActorContext).findPrimaryShard(anyString()); + doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))). + when(mockActorContext).findPrimaryShardAsync(anyString()); } - doThrow(exToThrow).when(mockActorContext).executeOperation(any(ActorSelection.class), any()); + + doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), any()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); @@ -499,6 +534,24 @@ public class TransactionProxyTest extends AbstractActorTest { transactionProxy.read(TestModel.TEST_PATH); } + @Test(expected=IllegalArgumentException.class) + public void testInvalidCreateTransactionReply() throws Throwable { + ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class)); + + doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext). + actorSelection(actorRef.path().toString()); + + doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))). + when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); + + doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync( + eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY)); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); + + propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH)); + } + @Test public void testExists() throws Exception { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); @@ -536,7 +589,7 @@ public class TransactionProxyTest extends AbstractActorTest { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); doReturn(Futures.successful(new Object())).when(mockActorContext). - executeOperationAsync(any(ActorSelection.class), any()); + executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); @@ -549,7 +602,7 @@ public class TransactionProxyTest extends AbstractActorTest { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); doReturn(Futures.failed(new TestException())).when(mockActorContext). - executeOperationAsync(any(ActorSelection.class), any()); + executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); @@ -693,8 +746,7 @@ public class TransactionProxyTest extends AbstractActorTest { doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite)); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - WRITE_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); @@ -815,10 +867,10 @@ public class TransactionProxyTest extends AbstractActorTest { ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + verifyCohortFutures(proxy, TestException.class); + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), MergeDataReply.SERIALIZABLE_CLASS, TestException.class); - - verifyCohortFutures(proxy, TestException.class); } @SuppressWarnings("unchecked") @@ -855,9 +907,8 @@ public class TransactionProxyTest extends AbstractActorTest { @Test public void testReadyWithInitialCreateTransactionFailure() throws Exception { - doReturn(Optional.absent()).when(mockActorContext).findPrimaryShard(anyString()); -// doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation( -// anyString(), any()); + doReturn(Futures.failed(new PrimaryNotFoundException("mock"))).when( + mockActorContext).findPrimaryShardAsync(anyString()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -958,8 +1009,8 @@ public class TransactionProxyTest extends AbstractActorTest { doReturn(actorSystem.actorSelection(shardActorRef.path())). when(mockActorContext).actorSelection(shardActorRef.path().toString()); - doReturn(Optional.of(actorSystem.actorSelection(shardActorRef.path()))). - when(mockActorContext).findPrimaryShard(eq(DefaultShardStrategy.DEFAULT_SHARD)); + doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))). + when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor"; CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder() @@ -967,8 +1018,8 @@ public class TransactionProxyTest extends AbstractActorTest { .setTransactionActorPath(actorPath) .build(); - doReturn(createTransactionReply).when(mockActorContext). - executeOperation(eq(actorSystem.actorSelection(shardActorRef.path())), + doReturn(Futures.successful(createTransactionReply)).when(mockActorContext). + executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_ONLY)); doReturn(true).when(mockActorContext).isLocalPath(actorPath); @@ -1013,8 +1064,8 @@ public class TransactionProxyTest extends AbstractActorTest { doReturn(actorSystem.actorSelection(shardActorRef.path())). when(mockActorContext).actorSelection(shardActorRef.path().toString()); - doReturn(Optional.of(actorSystem.actorSelection(shardActorRef.path()))). - when(mockActorContext).findPrimaryShard(eq(DefaultShardStrategy.DEFAULT_SHARD)); + doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))). + when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor"; CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder() @@ -1022,8 +1073,8 @@ public class TransactionProxyTest extends AbstractActorTest { .setTransactionActorPath(actorPath) .build(); - doReturn(createTransactionReply).when(mockActorContext). - executeOperation(eq(actorSystem.actorSelection(shardActorRef.path())), + doReturn(Futures.successful(createTransactionReply)).when(mockActorContext). + executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, WRITE_ONLY)); doReturn(true).when(mockActorContext).isLocalPath(actorPath); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemoryJournal.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemoryJournal.java index 3486753082..f340d1c305 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemoryJournal.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemoryJournal.java @@ -32,6 +32,8 @@ public class InMemoryJournal extends AsyncWriteJournal { private static final Map deleteMessagesCompleteLatches = new ConcurrentHashMap<>(); + private static final Map blockReadMessagesLatches = new ConcurrentHashMap<>(); + public static void addEntry(String persistenceId, long sequenceNr, Object data) { Map journal = journals.get(persistenceId); if(journal == null) { @@ -62,12 +64,21 @@ public class InMemoryJournal extends AsyncWriteJournal { deleteMessagesCompleteLatches.put(persistenceId, new CountDownLatch(1)); } + public static void addBlockReadMessagesLatch(String persistenceId, CountDownLatch latch) { + blockReadMessagesLatches.put(persistenceId, latch); + } + @Override public Future doAsyncReplayMessages(final String persistenceId, long fromSequenceNr, long toSequenceNr, long max, final Procedure replayCallback) { return Futures.future(new Callable() { @Override public Void call() throws Exception { + CountDownLatch blockLatch = blockReadMessagesLatches.remove(persistenceId); + if(blockLatch != null) { + Uninterruptibles.awaitUninterruptibly(blockLatch); + } + Map journal = journals.get(persistenceId); if(journal == null) { return null; -- 2.36.6