package org.opendaylight.controller.cluster.datastore;
-import java.util.concurrent.TimeUnit;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.PoisonPill;
import akka.dispatch.OnComplete;
-import akka.util.Timeout;
import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerRegistrationProxy.class);
- public static final Timeout REGISTER_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
-
private volatile ActorSelection listenerRegistrationActor;
private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
private ActorRef dataChangeListenerActor;
dataChangeListenerActor = actorContext.getActorSystem().actorOf(
DataChangeListener.props(listener));
- Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName, REGISTER_TIMEOUT);
+ Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
findFuture.onComplete(new OnComplete<ActorRef>() {
@Override
public void onComplete(Throwable failure, ActorRef shard) {
Future<Object> future = actorContext.executeOperationAsync(shard,
new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
- REGISTER_TIMEOUT);
+ actorContext.getDatastoreContext().getShardInitializationTimeout());
future.onComplete(new OnComplete<Object>(){
@Override
import org.opendaylight.controller.cluster.raft.ConfigParams;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
+import akka.util.Timeout;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import java.util.concurrent.TimeUnit;
private final ConfigParams shardRaftConfig;
private final int shardTransactionCommitTimeoutInSeconds;
private final int shardTransactionCommitQueueCapacity;
+ private final Timeout shardInitializationTimeout;
+ private final Timeout shardLeaderElectionTimeout;
private DatastoreContext(InMemoryDOMDataStoreConfigProperties dataStoreProperties,
ConfigParams shardRaftConfig, String dataStoreMXBeanType, int operationTimeoutInSeconds,
Duration shardTransactionIdleTimeout, int shardTransactionCommitTimeoutInSeconds,
- int shardTransactionCommitQueueCapacity) {
+ int shardTransactionCommitQueueCapacity, Timeout shardInitializationTimeout,
+ Timeout shardLeaderElectionTimeout) {
this.dataStoreProperties = dataStoreProperties;
this.shardRaftConfig = shardRaftConfig;
this.dataStoreMXBeanType = dataStoreMXBeanType;
this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
this.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
this.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
+ this.shardInitializationTimeout = shardInitializationTimeout;
+ this.shardLeaderElectionTimeout = shardLeaderElectionTimeout;
}
public static Builder newBuilder() {
return shardTransactionCommitQueueCapacity;
}
+ public Timeout getShardInitializationTimeout() {
+ return shardInitializationTimeout;
+ }
+
+ public Timeout getShardLeaderElectionTimeout() {
+ return shardLeaderElectionTimeout;
+ }
+
public static class Builder {
private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
private Duration shardTransactionIdleTimeout = Duration.create(10, TimeUnit.MINUTES);
private int shardSnapshotBatchCount = 20000;
private int shardHeartbeatIntervalInMillis = 500;
private int shardTransactionCommitQueueCapacity = 20000;
+ private Timeout shardInitializationTimeout = new Timeout(5, TimeUnit.MINUTES);
+ private Timeout shardLeaderElectionTimeout = new Timeout(30, TimeUnit.SECONDS);
public Builder shardTransactionIdleTimeout(Duration shardTransactionIdleTimeout) {
this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
return this;
}
+ public Builder shardInitializationTimeout(long timeout, TimeUnit unit) {
+ this.shardInitializationTimeout = new Timeout(timeout, unit);
+ return this;
+ }
+
+ public Builder shardLeaderElectionTimeout(long timeout, TimeUnit unit) {
+ this.shardLeaderElectionTimeout = new Timeout(timeout, unit);
+ return this;
+ }
+
public DatastoreContext build() {
DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
return new DatastoreContext(dataStoreProperties, raftConfig, dataStoreMXBeanType,
operationTimeoutInSeconds, shardTransactionIdleTimeout,
- shardTransactionCommitTimeoutInSeconds, shardTransactionCommitQueueCapacity);
+ shardTransactionCommitTimeoutInSeconds, shardTransactionCommitQueueCapacity,
+ shardInitializationTimeout, shardLeaderElectionTimeout);
}
}
}
actorContext = new ActorContext(actorSystem, actorSystem.actorOf(
ShardManager.props(type, cluster, configuration, datastoreContext)
- .withMailbox(ActorContext.MAILBOX), shardManagerId ), cluster, configuration);
-
- actorContext.setOperationTimeout(datastoreContext.getOperationTimeoutInSeconds());
+ .withMailbox(ActorContext.MAILBOX), shardManagerId ),
+ cluster, configuration, datastoreContext);
}
public DistributedDataStore(ActorContext actorContext) {
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
} else if (getLeader() != null) {
getLeader().forward(message, getContext());
} else {
- getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(
+ getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(
"Could not find shard leader so transaction cannot be created. This typically happens" +
- " when system is coming up or recovering and a leader is being elected. Try again" +
+ " when the system is coming up or recovering and a leader is being elected. Try again" +
" later.")), getSelf());
}
}
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorSelection;
+import akka.dispatch.Mapper;
import akka.dispatch.OnComplete;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.FinalizablePhantomReference;
import com.google.common.base.FinalizableReferenceQueue;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
-
-import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import scala.Function1;
import scala.concurrent.Future;
-import scala.runtime.AbstractFunction1;
-
+import scala.concurrent.Promise;
+import scala.concurrent.duration.FiniteDuration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.concurrent.GuardedBy;
/**
* TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
*/
public class TransactionProxy implements DOMStoreReadWriteTransaction {
- private final TransactionChainProxy transactionChainProxy;
-
-
-
- public enum TransactionType {
+ public static enum TransactionType {
READ_ONLY,
WRITE_ONLY,
READ_WRITE
}
- static Function1<Throwable, Throwable> SAME_FAILURE_TRANSFORMER = new AbstractFunction1<
- Throwable, Throwable>() {
+ static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
+ new Mapper<Throwable, Throwable>() {
@Override
public Throwable apply(Throwable failure) {
return failure;
private static final AtomicLong counter = new AtomicLong();
- private static final Logger
- LOG = LoggerFactory.getLogger(TransactionProxy.class);
+ private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
+ /**
+ * Time interval in between transaction create retries.
+ */
+ private static final FiniteDuration CREATE_TX_TRY_INTERVAL =
+ FiniteDuration.create(1, TimeUnit.SECONDS);
/**
* Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
for(ActorSelection actor : remoteTransactionActors) {
LOG.trace("Sending CloseTransaction to {}", actor);
actorContext.sendOperationAsync(actor,
- new CloseTransaction().toSerializable());
+ new CloseTransaction().toSerializable());
}
}
}
private List<ActorSelection> remoteTransactionActors;
private AtomicBoolean remoteTransactionActorsMB;
- private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
+ /**
+ * Stores the create transaction results per shard.
+ */
+ private final Map<String, TransactionFutureCallback> txFutureCallbackMap = new HashMap<>();
private final TransactionType transactionType;
private final ActorContext actorContext;
private final TransactionIdentifier identifier;
+ private final TransactionChainProxy transactionChainProxy;
private final SchemaContext schemaContext;
private boolean inReadyState;
this(actorContext, transactionType, null);
}
- @VisibleForTesting
- List<Future<Object>> getRecordedOperationFutures() {
- List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
- for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
- recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
- }
-
- return recordedOperationFutures;
- }
-
- public TransactionProxy(ActorContext actorContext, TransactionType transactionType, TransactionChainProxy transactionChainProxy) {
+ public TransactionProxy(ActorContext actorContext, TransactionType transactionType,
+ TransactionChainProxy transactionChainProxy) {
this.actorContext = Preconditions.checkNotNull(actorContext,
"actorContext should not be null");
this.transactionType = Preconditions.checkNotNull(transactionType,
new TransactionProxyCleanupPhantomReference(this);
phantomReferenceCache.put(cleanup, cleanup);
}
- if(LOG.isDebugEnabled()) {
- LOG.debug("Created txn {} of type {}", identifier, transactionType);
+
+ LOG.debug("Created txn {} of type {}", identifier, transactionType);
+ }
+
+ @VisibleForTesting
+ List<Future<Object>> getRecordedOperationFutures() {
+ List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
+ for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
+ TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+ if(transactionContext != null) {
+ recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
+ }
}
+
+ return recordedOperationFutures;
}
@Override
Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
"Read operation on write-only transaction is not allowed");
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} read {}", identifier, path);
+ LOG.debug("Tx {} read {}", identifier, path);
+
+ TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
+ TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+
+ CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future;
+ if(transactionContext != null) {
+ future = transactionContext.readData(path);
+ } else {
+ // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
+ // callback to be executed after the Tx is created.
+ final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
+ txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
+ @Override
+ public void invoke(TransactionContext transactionContext) {
+ Futures.addCallback(transactionContext.readData(path),
+ new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
+ @Override
+ public void onSuccess(Optional<NormalizedNode<?, ?>> data) {
+ proxyFuture.set(data);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ proxyFuture.setException(t);
+ }
+ });
+ }
+ });
+
+ future = MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
}
- createTransactionIfMissing(actorContext, path);
- return transactionContext(path).readData(path);
+ return future;
}
@Override
- public CheckedFuture<Boolean, ReadFailedException> exists(YangInstanceIdentifier path) {
+ public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
"Exists operation on write-only transaction is not allowed");
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} exists {}", identifier, path);
+ LOG.debug("Tx {} exists {}", identifier, path);
+
+ TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
+ TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+
+ CheckedFuture<Boolean, ReadFailedException> future;
+ if(transactionContext != null) {
+ future = transactionContext.dataExists(path);
+ } else {
+ // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
+ // callback to be executed after the Tx is created.
+ final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
+ txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
+ @Override
+ public void invoke(TransactionContext transactionContext) {
+ Futures.addCallback(transactionContext.dataExists(path),
+ new FutureCallback<Boolean>() {
+ @Override
+ public void onSuccess(Boolean exists) {
+ proxyFuture.set(exists);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ proxyFuture.setException(t);
+ }
+ });
+ }
+ });
+
+ future = MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
}
- createTransactionIfMissing(actorContext, path);
- return transactionContext(path).dataExists(path);
+ return future;
}
private void checkModificationState() {
}
@Override
- public void write(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
checkModificationState();
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} write {}", identifier, path);
- }
- createTransactionIfMissing(actorContext, path);
+ LOG.debug("Tx {} write {}", identifier, path);
- transactionContext(path).writeData(path, data);
+ TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
+ TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+ if(transactionContext != null) {
+ transactionContext.writeData(path, data);
+ } else {
+ // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
+ // callback to be executed after the Tx is created.
+ txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
+ @Override
+ public void invoke(TransactionContext transactionContext) {
+ transactionContext.writeData(path, data);
+ }
+ });
+ }
}
@Override
- public void merge(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
checkModificationState();
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} merge {}", identifier, path);
- }
- createTransactionIfMissing(actorContext, path);
+ LOG.debug("Tx {} merge {}", identifier, path);
- transactionContext(path).mergeData(path, data);
+ TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
+ TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+ if(transactionContext != null) {
+ transactionContext.mergeData(path, data);
+ } else {
+ // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
+ // callback to be executed after the Tx is created.
+ txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
+ @Override
+ public void invoke(TransactionContext transactionContext) {
+ transactionContext.mergeData(path, data);
+ }
+ });
+ }
}
@Override
- public void delete(YangInstanceIdentifier path) {
+ public void delete(final YangInstanceIdentifier path) {
checkModificationState();
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} delete {}", identifier, path);
- }
- createTransactionIfMissing(actorContext, path);
- transactionContext(path).deleteData(path);
+ LOG.debug("Tx {} delete {}", identifier, path);
+
+ TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
+ TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+ if(transactionContext != null) {
+ transactionContext.deleteData(path);
+ } else {
+ // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
+ // callback to be executed after the Tx is created.
+ txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
+ @Override
+ public void invoke(TransactionContext transactionContext) {
+ transactionContext.deleteData(path);
+ }
+ });
+ }
}
@Override
inReadyState = true;
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
- remoteTransactionPaths.size());
- }
+ LOG.debug("Tx {} Readying {} transactions for commit", identifier,
+ txFutureCallbackMap.size());
+
List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
- for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
+ for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
+
+ LOG.debug("Tx {} Readying transaction for shard {}", identifier,
+ txFutureCallback.getShardName());
+
+ TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+ if(transactionContext != null) {
+ cohortFutures.add(transactionContext.readyTransaction());
+ } else {
+ // The shard Tx hasn't been created yet so create a promise to ready the Tx later
+ // after it's created.
+ final Promise<ActorSelection> cohortPromise = akka.dispatch.Futures.promise();
+ txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
+ @Override
+ public void invoke(TransactionContext transactionContext) {
+ cohortPromise.completeWith(transactionContext.readyTransaction());
+ }
+ });
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} Readying transaction for shard {}", identifier,
- transactionContext.getShardName());
+ cohortFutures.add(cohortPromise.future());
}
- cohortFutures.add(transactionContext.readyTransaction());
}
if(transactionChainProxy != null){
@Override
public void close() {
- for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
- transactionContext.closeTransaction();
+ for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
+ TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+ if(transactionContext != null) {
+ transactionContext.closeTransaction();
+ } else {
+ txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
+ @Override
+ public void invoke(TransactionContext transactionContext) {
+ transactionContext.closeTransaction();
+ }
+ });
+ }
}
- remoteTransactionPaths.clear();
+ txFutureCallbackMap.clear();
if(transactionType == TransactionType.READ_ONLY) {
remoteTransactionActors.clear();
}
}
- private TransactionContext transactionContext(YangInstanceIdentifier path){
+ private String shardNameFromIdentifier(YangInstanceIdentifier path){
+ return ShardStrategyFactory.getStrategy(path).findShard(path);
+ }
+
+ private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) {
String shardName = shardNameFromIdentifier(path);
- return remoteTransactionPaths.get(shardName);
+ TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
+ if(txFutureCallback == null) {
+ Future<ActorSelection> findPrimaryFuture = actorContext.findPrimaryShardAsync(shardName);
+
+ final TransactionFutureCallback newTxFutureCallback =
+ new TransactionFutureCallback(shardName);
+
+ txFutureCallback = newTxFutureCallback;
+ txFutureCallbackMap.put(shardName, txFutureCallback);
+
+ findPrimaryFuture.onComplete(new OnComplete<ActorSelection>() {
+ @Override
+ public void onComplete(Throwable failure, ActorSelection primaryShard) {
+ if(failure != null) {
+ newTxFutureCallback.onComplete(failure, null);
+ } else {
+ newTxFutureCallback.setPrimaryShard(primaryShard);
+ }
+ }
+ }, actorContext.getActorSystem().dispatcher());
+ }
+
+ return txFutureCallback;
}
- private String shardNameFromIdentifier(YangInstanceIdentifier path){
- return ShardStrategyFactory.getStrategy(path).findShard(path);
+ public String getTransactionChainId() {
+ if(transactionChainProxy == null){
+ return "";
+ }
+ return transactionChainProxy.getTransactionChainId();
}
- private void createTransactionIfMissing(ActorContext actorContext,
- YangInstanceIdentifier path) {
+ /**
+ * Interface for a transaction operation to be invoked later.
+ */
+ private static interface TransactionOperation {
+ void invoke(TransactionContext transactionContext);
+ }
- if(transactionChainProxy != null){
- transactionChainProxy.waitTillCurrentTransactionReady();
+ /**
+ * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
+ * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
+ * retry task after a short delay.
+ * <p>
+ * The end result from a completed CreateTransaction message is a TransactionContext that is
+ * used to perform transaction operations. Transaction operations that occur before the
+ * CreateTransaction completes are cache and executed once the CreateTransaction completes,
+ * successfully or not.
+ */
+ private class TransactionFutureCallback extends OnComplete<Object> {
+
+ /**
+ * The list of transaction operations to execute once the CreateTransaction completes.
+ */
+ @GuardedBy("txOperationsOnComplete")
+ private final List<TransactionOperation> txOperationsOnComplete = Lists.newArrayList();
+
+ /**
+ * The TransactionContext resulting from the CreateTransaction reply.
+ */
+ private volatile TransactionContext transactionContext;
+
+ /**
+ * The target primary shard.
+ */
+ private volatile ActorSelection primaryShard;
+
+ private volatile int createTxTries = (int) (actorContext.getDatastoreContext().
+ getShardLeaderElectionTimeout().duration().toMillis() /
+ CREATE_TX_TRY_INTERVAL.toMillis());
+
+ private final String shardName;
+
+ TransactionFutureCallback(String shardName) {
+ this.shardName = shardName;
}
- String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
+ String getShardName() {
+ return shardName;
+ }
- TransactionContext transactionContext =
- remoteTransactionPaths.get(shardName);
+ TransactionContext getTransactionContext() {
+ return transactionContext;
+ }
- if (transactionContext != null) {
- // A transaction already exists with that shard
- return;
+
+ /**
+ * Sets the target primary shard and initiates a CreateTransaction try.
+ */
+ void setPrimaryShard(ActorSelection primaryShard) {
+ LOG.debug("Tx {} Primary shard found - trying create transaction", identifier);
+
+ this.primaryShard = primaryShard;
+ tryCreateTransaction();
}
- try {
- Optional<ActorSelection> primaryShard = actorContext.findPrimaryShard(shardName);
- if (!primaryShard.isPresent()) {
- throw new PrimaryNotFoundException("Primary could not be found for shard " + shardName);
+ /**
+ * Adds a TransactionOperation to be executed after the CreateTransaction completes.
+ */
+ void addTxOperationOnComplete(TransactionOperation operation) {
+ synchronized(txOperationsOnComplete) {
+ if(transactionContext == null) {
+ LOG.debug("Tx {} Adding operation on complete {}", identifier);
+
+ txOperationsOnComplete.add(operation);
+ } else {
+ operation.invoke(transactionContext);
+ }
}
+ }
- Object response = actorContext.executeOperation(primaryShard.get(),
- new CreateTransaction(identifier.toString(), this.transactionType.ordinal(),
- getTransactionChainId()).toSerializable());
- if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
- CreateTransactionReply reply =
- CreateTransactionReply.fromSerializable(response);
+ /**
+ * Performs a CreateTransaction try async.
+ */
+ private void tryCreateTransaction() {
+ Future<Object> createTxFuture = actorContext.executeOperationAsync(primaryShard,
+ new CreateTransaction(identifier.toString(),
+ TransactionProxy.this.transactionType.ordinal(),
+ getTransactionChainId()).toSerializable());
- String transactionPath = reply.getTransactionPath();
+ createTxFuture.onComplete(this, actorContext.getActorSystem().dispatcher());
+ }
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath);
+ @Override
+ public void onComplete(Throwable failure, Object response) {
+ if(failure instanceof NoShardLeaderException) {
+ // There's no leader for the shard yet - schedule and try again, unless we're out
+ // of retries. Note: createTxTries is volatile as it may be written by different
+ // threads however not concurrently, therefore decrementing it non-atomically here
+ // is ok.
+ if(--createTxTries > 0) {
+ LOG.debug("Tx {} Shard {} has no leader yet - scheduling create Tx retry",
+ identifier, shardName);
+
+ actorContext.getActorSystem().scheduler().scheduleOnce(CREATE_TX_TRY_INTERVAL,
+ new Runnable() {
+ @Override
+ public void run() {
+ tryCreateTransaction();
+ }
+ }, actorContext.getActorSystem().dispatcher());
+ return;
}
- ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
+ }
- if (transactionType == TransactionType.READ_ONLY) {
- // Add the actor to the remoteTransactionActors list for access by the
- // cleanup PhantonReference.
- remoteTransactionActors.add(transactionActor);
+ // Create the TransactionContext from the response or failure and execute delayed
+ // TransactionOperations. This entire section is done atomically (ie synchronized) with
+ // respect to #addTxOperationOnComplete to handle timing issues and ensure no
+ // TransactionOperation is missed and that they are processed in the order they occurred.
+ synchronized(txOperationsOnComplete) {
+ if(failure != null) {
+ LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier,
+ failure.getMessage());
+
+ transactionContext = new NoOpTransactionContext(failure, identifier);
+ } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
+ createValidTransactionContext(CreateTransactionReply.fromSerializable(response));
+ } else {
+ IllegalArgumentException exception = new IllegalArgumentException(String.format(
+ "Invalid reply type %s for CreateTransaction", response.getClass()));
+
+ transactionContext = new NoOpTransactionContext(exception, identifier);
+ }
- // Write to the memory barrier volatile to publish the above update to the
- // remoteTransactionActors list for thread visibility.
- remoteTransactionActorsMB.set(true);
+ for(TransactionOperation oper: txOperationsOnComplete) {
+ oper.invoke(transactionContext);
}
- // TxActor is always created where the leader of the shard is.
- // Check if TxActor is created in the same node
- boolean isTxActorLocal = actorContext.isLocalPath(transactionPath);
+ txOperationsOnComplete.clear();
+ }
+ }
- transactionContext = new TransactionContextImpl(shardName, transactionPath,
- transactionActor, identifier, actorContext, schemaContext, isTxActorLocal);
+ private void createValidTransactionContext(CreateTransactionReply reply) {
+ String transactionPath = reply.getTransactionPath();
- remoteTransactionPaths.put(shardName, transactionContext);
- } else {
- throw new IllegalArgumentException(String.format(
- "Invalid reply type {} for CreateTransaction", response.getClass()));
- }
- } catch (Exception e) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
+ LOG.debug("Tx {} Received transaction actor path {}", identifier, transactionPath);
+
+ ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
+
+ if (transactionType == TransactionType.READ_ONLY) {
+ // Add the actor to the remoteTransactionActors list for access by the
+ // cleanup PhantonReference.
+ remoteTransactionActors.add(transactionActor);
+
+ // Write to the memory barrier volatile to publish the above update to the
+ // remoteTransactionActors list for thread visibility.
+ remoteTransactionActorsMB.set(true);
}
- remoteTransactionPaths
- .put(shardName, new NoOpTransactionContext(shardName, e, identifier));
- }
- }
- public String getTransactionChainId() {
- if(transactionChainProxy == null){
- return "";
+ // TxActor is always created where the leader of the shard is.
+ // Check if TxActor is created in the same node
+ boolean isTxActorLocal = actorContext.isLocalPath(transactionPath);
+
+ transactionContext = new TransactionContextImpl(transactionActor, identifier,
+ actorContext, schemaContext, isTxActorLocal);
}
- return transactionChainProxy.getTransactionChainId();
}
-
private interface TransactionContext {
- String getShardName();
-
void closeTransaction();
Future<ActorSelection> readyTransaction();
private static abstract class AbstractTransactionContext implements TransactionContext {
protected final TransactionIdentifier identifier;
- protected final String shardName;
protected final List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
- AbstractTransactionContext(String shardName, TransactionIdentifier identifier) {
- this.shardName = shardName;
+ AbstractTransactionContext(TransactionIdentifier identifier) {
this.identifier = identifier;
}
- @Override
- public String getShardName() {
- return shardName;
- }
-
@Override
public List<Future<Object>> getRecordedOperationFutures() {
return recordedOperationFutures;
private final ActorContext actorContext;
private final SchemaContext schemaContext;
- private final String actorPath;
private final ActorSelection actor;
private final boolean isTxActorLocal;
- private TransactionContextImpl(String shardName, String actorPath,
- ActorSelection actor, TransactionIdentifier identifier, ActorContext actorContext,
- SchemaContext schemaContext, boolean isTxActorLocal) {
- super(shardName, identifier);
- this.actorPath = actorPath;
+ private TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier,
+ ActorContext actorContext, SchemaContext schemaContext,
+ boolean isTxActorLocal) {
+ super(identifier);
this.actor = actor;
this.actorContext = actorContext;
this.schemaContext = schemaContext;
@Override
public void closeTransaction() {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} closeTransaction called", identifier);
- }
+ LOG.debug("Tx {} closeTransaction called", identifier);
+
actorContext.sendOperationAsync(getActor(), new CloseTransaction().toSerializable());
}
@Override
public Future<ActorSelection> readyTransaction() {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
+ LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
identifier, recordedOperationFutures.size());
- }
+
// Send the ReadyTransaction message to the Tx actor.
ReadyTransaction readyTransaction = new ReadyTransaction();
// Transform the combined Future into a Future that returns the cohort actor path from
// the ReadyTransactionReply. That's the end result of the ready operation.
- return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, ActorSelection>() {
+ return combinedFutures.transform(new Mapper<Iterable<Object>, ActorSelection>() {
@Override
- public ActorSelection apply(Iterable<Object> notUsed) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
+ public ActorSelection checkedApply(Iterable<Object> notUsed) {
+ LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
identifier);
- }
+
// At this point all the Futures succeeded and we need to extract the cohort
// actor path from the ReadyTransactionReply. For the recorded operations, they
// don't return any data so we're only interested that they completed
@Override
public void deleteData(YangInstanceIdentifier path) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} deleteData called path = {}", identifier, path);
- }
+ LOG.debug("Tx {} deleteData called path = {}", identifier, path);
DeleteData deleteData = new DeleteData(path);
recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
@Override
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} mergeData called path = {}", identifier, path);
- }
+ LOG.debug("Tx {} mergeData called path = {}", identifier, path);
MergeData mergeData = new MergeData(path, data, schemaContext);
recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
@Override
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} writeData called path = {}", identifier, path);
- }
+ LOG.debug("Tx {} writeData called path = {}", identifier, path);
WriteData writeData = new WriteData(path, data, schemaContext);
recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
final YangInstanceIdentifier path) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} readData called path = {}", identifier, path);
- }
+ LOG.debug("Tx {} readData called path = {}", identifier, path);
+
final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
// If there were any previous recorded put/merge/delete operation reply Futures then we
if(recordedOperationFutures.isEmpty()) {
finishReadData(path, returnFuture);
} else {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} readData: verifying {} previous recorded operations",
+ LOG.debug("Tx {} readData: verifying {} previous recorded operations",
identifier, recordedOperationFutures.size());
- }
+
// Note: we make a copy of recordedOperationFutures to be on the safe side in case
// Futures#sequence accesses the passed List on a different thread, as
// recordedOperationFutures is not synchronized.
public void onComplete(Throwable failure, Iterable<Object> notUsed)
throws Throwable {
if(failure != null) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} readData: a recorded operation failed: {}",
+ LOG.debug("Tx {} readData: a recorded operation failed: {}",
identifier, failure);
- }
returnFuture.setException(new ReadFailedException(
"The read could not be performed because a previous put, merge,"
+ "or delete operation failed", failure));
private void finishReadData(final YangInstanceIdentifier path,
final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
- }
+ LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
+
OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
public void onComplete(Throwable failure, Object readResponse) throws Throwable {
if(failure != null) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} read operation failed: {}", identifier, failure);
- }
+ LOG.debug("Tx {} read operation failed: {}", identifier, failure);
returnFuture.setException(new ReadFailedException(
"Error reading data for path " + path, failure));
} else {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} read operation succeeded", identifier, failure);
- }
+ LOG.debug("Tx {} read operation succeeded", identifier, failure);
if (readResponse instanceof ReadDataReply) {
ReadDataReply reply = (ReadDataReply) readResponse;
public CheckedFuture<Boolean, ReadFailedException> dataExists(
final YangInstanceIdentifier path) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} dataExists called path = {}", identifier, path);
- }
+ LOG.debug("Tx {} dataExists called path = {}", identifier, path);
+
final SettableFuture<Boolean> returnFuture = SettableFuture.create();
// If there were any previous recorded put/merge/delete operation reply Futures then we
if(recordedOperationFutures.isEmpty()) {
finishDataExists(path, returnFuture);
} else {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
+ LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
identifier, recordedOperationFutures.size());
- }
+
// Note: we make a copy of recordedOperationFutures to be on the safe side in case
// Futures#sequence accesses the passed List on a different thread, as
// recordedOperationFutures is not synchronized.
public void onComplete(Throwable failure, Iterable<Object> notUsed)
throws Throwable {
if(failure != null) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
+ LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
identifier, failure);
- }
returnFuture.setException(new ReadFailedException(
"The data exists could not be performed because a previous "
+ "put, merge, or delete operation failed", failure));
private void finishDataExists(final YangInstanceIdentifier path,
final SettableFuture<Boolean> returnFuture) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
- }
+ LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
+
OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
public void onComplete(Throwable failure, Object response) throws Throwable {
if(failure != null) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
- }
+ LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
returnFuture.setException(new ReadFailedException(
"Error checking data exists for path " + path, failure));
} else {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
- }
+ LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
if (response instanceof DataExistsReply) {
returnFuture.set(Boolean.valueOf(((DataExistsReply) response).exists()));
private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
- private final Exception failure;
+ private final Throwable failure;
- public NoOpTransactionContext(String shardName, Exception failure,
- TransactionIdentifier identifier){
- super(shardName, identifier);
+ public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier){
+ super(identifier);
this.failure = failure;
}
@Override
public void closeTransaction() {
- if(LOG.isDebugEnabled()) {
- LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
- }
+ LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
}
@Override
public Future<ActorSelection> readyTransaction() {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} readyTransaction called", identifier);
- }
+ LOG.debug("Tx {} readyTransaction called", identifier);
return akka.dispatch.Futures.failed(failure);
}
@Override
public void deleteData(YangInstanceIdentifier path) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} deleteData called path = {}", identifier, path);
- }
+ LOG.debug("Tx {} deleteData called path = {}", identifier, path);
}
@Override
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} mergeData called path = {}", identifier, path);
- }
+ LOG.debug("Tx {} mergeData called path = {}", identifier, path);
}
@Override
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} writeData called path = {}", identifier, path);
- }
+ LOG.debug("Tx {} writeData called path = {}", identifier, path);
}
@Override
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
- YangInstanceIdentifier path) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} readData called path = {}", identifier, path);
- }
+ YangInstanceIdentifier path) {
+ LOG.debug("Tx {} readData called path = {}", identifier, path);
return Futures.immediateFailedCheckedFuture(new ReadFailedException(
"Error reading data for path " + path, failure));
}
@Override
public CheckedFuture<Boolean, ReadFailedException> dataExists(
- YangInstanceIdentifier path) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} dataExists called path = {}", identifier, path);
- }
+ YangInstanceIdentifier path) {
+ LOG.debug("Tx {} dataExists called path = {}", identifier, path);
return Futures.immediateFailedCheckedFuture(new ReadFailedException(
"Error checking exists for path " + path, failure));
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.exceptions;
+
+/**
+ * Exception indicating a shard has no current leader.
+ *
+ * @author Thomas Pantelis
+ */
+public class NoShardLeaderException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public NoShardLeaderException(String message){
+ super(message);
+ }
+}
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.dispatch.Mapper;
+import akka.pattern.AskTimeoutException;
import akka.util.Timeout;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
private static final Logger
LOG = LoggerFactory.getLogger(ActorContext.class);
- private static final FiniteDuration DEFAULT_OPER_DURATION = Duration.create(5, TimeUnit.SECONDS);
-
public static final String MAILBOX = "bounded-mailbox";
+ private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
+ new Mapper<Throwable, Throwable>() {
+ @Override
+ public Throwable apply(Throwable failure) {
+ Throwable actualFailure = failure;
+ if(failure instanceof AskTimeoutException) {
+ // A timeout exception most likely means the shard isn't initialized.
+ actualFailure = new NotInitializedException(
+ "Timed out trying to find the primary shard. Most likely cause is the " +
+ "shard is not initialized yet.");
+ }
+
+ return actualFailure;
+ }
+ };
+
private final ActorSystem actorSystem;
private final ActorRef shardManager;
private final ClusterWrapper clusterWrapper;
private final Configuration configuration;
+ private final DatastoreContext datastoreContext;
private volatile SchemaContext schemaContext;
- private FiniteDuration operationDuration = DEFAULT_OPER_DURATION;
- private Timeout operationTimeout = new Timeout(operationDuration);
+ private final FiniteDuration operationDuration;
+ private final Timeout operationTimeout;
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
- ClusterWrapper clusterWrapper,
- Configuration configuration) {
+ ClusterWrapper clusterWrapper, Configuration configuration) {
+ this(actorSystem, shardManager, clusterWrapper, configuration,
+ DatastoreContext.newBuilder().build());
+ }
+
+ public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
+ ClusterWrapper clusterWrapper, Configuration configuration,
+ DatastoreContext datastoreContext) {
this.actorSystem = actorSystem;
this.shardManager = shardManager;
this.clusterWrapper = clusterWrapper;
this.configuration = configuration;
+ this.datastoreContext = datastoreContext;
+
+ operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(),
+ TimeUnit.SECONDS);
+ operationTimeout = new Timeout(operationDuration);
+ }
+
+ public DatastoreContext getDatastoreContext() {
+ return datastoreContext;
}
public ActorSystem getActorSystem() {
}
}
- public void setOperationTimeout(int timeoutInSeconds) {
- operationDuration = Duration.create(timeoutInSeconds, TimeUnit.SECONDS);
- operationTimeout = new Timeout(operationDuration);
- }
-
public SchemaContext getSchemaContext() {
return schemaContext;
}
return Optional.of(actorSystem.actorSelection(path));
}
+ public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
+ Future<Object> future = executeOperationAsync(shardManager,
+ new FindPrimary(shardName, true).toSerializable(),
+ datastoreContext.getShardInitializationTimeout());
+
+ return future.transform(new Mapper<Object, ActorSelection>() {
+ @Override
+ public ActorSelection checkedApply(Object response) throws Exception {
+ if(response.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
+ PrimaryFound found = PrimaryFound.fromSerializable(response);
+
+ LOG.debug("Primary found {}", found.getPrimaryPath());
+ return actorSystem.actorSelection(found.getPrimaryPath());
+ } else if(response instanceof ActorNotInitialized) {
+ throw new NotInitializedException(
+ String.format("Found primary shard %s but it's not initialized yet. " +
+ "Please try again later", shardName));
+ } else if(response instanceof PrimaryNotFound) {
+ throw new PrimaryNotFoundException(
+ String.format("No primary shard found for %S.", shardName));
+ }
+
+ throw new UnknownMessageException(String.format(
+ "FindPrimary returned unkown response: %s", response));
+ }
+ }, FIND_PRIMARY_FAILURE_TRANSFORMER, getActorSystem().dispatcher());
+ }
+
/**
* Finds a local shard given its shard name and return it's ActorRef
*
*
* @param shardName the name of the local shard that needs to be found
*/
- public Future<ActorRef> findLocalShardAsync( final String shardName, Timeout timeout) {
+ public Future<ActorRef> findLocalShardAsync( final String shardName) {
Future<Object> future = executeOperationAsync(shardManager,
- new FindLocalShard(shardName, true), timeout);
+ new FindLocalShard(shardName, true), datastoreContext.getShardInitializationTimeout());
return future.map(new Mapper<Object, ActorRef>() {
@Override
*
* @param actor the ActorSelection
* @param message the message to send
+ * @param timeout the operation timeout
* @return a Future containing the eventual result
*/
- public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
+ public Future<Object> executeOperationAsync(ActorSelection actor, Object message,
+ Timeout timeout) {
Preconditions.checkArgument(actor != null, "actor must not be null");
Preconditions.checkArgument(message != null, "message must not be null");
LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
- return ask(actor, message, operationTimeout);
+ return ask(actor, message, timeout);
+ }
+
+ /**
+ * Execute an operation on a remote actor asynchronously.
+ *
+ * @param actor the ActorSelection
+ * @param message the message to send
+ * @return a Future containing the eventual result
+ */
+ public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
+ return executeOperationAsync(actor, message, operationTimeout);
}
/**
getValue().intValue())
.shardSnapshotBatchCount(props.getShardSnapshotBatchCount().getValue().intValue())
.shardHeartbeatIntervalInMillis(props.getShardHearbeatIntervalInMillis().getValue())
+ .shardInitializationTimeout(props.getShardInitializationTimeoutInSeconds().getValue(),
+ TimeUnit.SECONDS)
+ .shardLeaderElectionTimeout(props.getShardLeaderElectionTimeoutInSeconds().getValue(),
+ TimeUnit.SECONDS)
.shardTransactionCommitTimeoutInSeconds(
props.getShardTransactionCommitTimeoutInSeconds().getValue().intValue())
.shardTransactionCommitQueueCapacity(
getValue().intValue())
.shardSnapshotBatchCount(props.getShardSnapshotBatchCount().getValue().intValue())
.shardHeartbeatIntervalInMillis(props.getShardHearbeatIntervalInMillis().getValue())
+ .shardInitializationTimeout(props.getShardInitializationTimeoutInSeconds().getValue(),
+ TimeUnit.SECONDS)
+ .shardLeaderElectionTimeout(props.getShardLeaderElectionTimeoutInSeconds().getValue(),
+ TimeUnit.SECONDS)
.shardTransactionCommitTimeoutInSeconds(
props.getShardTransactionCommitTimeoutInSeconds().getValue().intValue())
.shardTransactionCommitQueueCapacity(
description "The maximum allowed capacity for each shard's transaction commit queue.";
}
+ leaf shard-initialization-timeout-in-seconds {
+ default 300; // 5 minutes
+ type non-zero-uint32-type;
+ description "The maximum amount of time to wait for a shard to initialize from persistence
+ on startup before failing an operation (eg transaction create and change
+ listener registration).";
+ }
+
+ leaf shard-leader-election-timeout-in-seconds {
+ default 30;
+ type non-zero-uint32-type;
+ description "The maximum amount of time to wait for a shard to elect a leader before failing
+ an operation (eg transaction create).";
+ }
+
leaf enable-metric-capture {
default false;
type boolean;
doReturn(mockActorSystem).when(actorContext).getActorSystem();
doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
- doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName),
- any(Timeout.class));
+ doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName));
doReturn(Futures.failed(new RuntimeException("mock"))).
when(actorContext).executeOperationAsync(any(ActorRef.class),
any(Object.class), any(Timeout.class));
final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
shardName, actorContext, mockListener);
+ doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext();
doReturn(getSystem()).when(actorContext).getActorSystem();
doReturn(getSystem().actorSelection(getRef().path())).
when(actorContext).actorSelection(getRef().path());
doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
- doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName),
- any(Timeout.class));
+ doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName));
Answer<Future<Object>> answer = new Answer<Future<Object>>() {
@Override
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
+import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
+ private final DatastoreContext.Builder datastoreContextBuilder =
+ DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100);
+
@Test
public void testWriteTransactionWithSingleShard() throws Exception{
System.setProperty("shard.persistent", "true");
cohort.preCommit().get(5, TimeUnit.SECONDS);
cohort.commit().get(5, TimeUnit.SECONDS);
- // 5. Verify the data in the store
+ // Verify the data in the store
DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
DistributedDataStore dataStore =
setupDistributedDataStore("testReadWriteTransaction", "test-1");
- // 1. Create a read-write Tx
+ // 1. Create a read-write Tx
DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
assertNotNull("newReadWriteTransaction returned null", readWriteTx);
}};
}
+ @Test
+ public void testTransactionWritesWithShardNotInitiallyReady() throws Exception{
+ new IntegrationTestKit(getSystem()) {{
+ String testName = "testTransactionWritesWithShardNotInitiallyReady";
+ String shardName = "test-1";
+
+ // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
+ // initialized until we create and submit the write the Tx.
+ String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
+ CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
+ InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
+
+ DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
+
+ // Create the write Tx
+
+ final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
+ assertNotNull("newReadWriteTransaction returned null", writeTx);
+
+ // Do some modification operations and ready the Tx on a separate thread.
+
+ final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder(
+ TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME,
+ TestModel.ID_QNAME, 1).build();
+
+ final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
+ final AtomicReference<Exception> caughtEx = new AtomicReference<>();
+ final CountDownLatch txReady = new CountDownLatch(1);
+ Thread txThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ writeTx.write(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(
+ TestModel.OUTER_LIST_QNAME).build());
+
+ writeTx.write(listEntryPath, ImmutableNodes.mapEntry(
+ TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
+
+ writeTx.delete(listEntryPath);
+
+ txCohort.set(writeTx.ready());
+ } catch(Exception e) {
+ caughtEx.set(e);
+ return;
+ } finally {
+ txReady.countDown();
+ }
+ }
+ };
+
+ txThread.start();
+
+ // Wait for the Tx operations to complete.
+
+ boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
+ if(caughtEx.get() != null) {
+ throw caughtEx.get();
+ }
+
+ assertEquals("Tx ready", true, done);
+
+ // At this point the Tx operations should be waiting for the shard to initialize so
+ // trigger the latch to let the shard recovery to continue.
+
+ blockRecoveryLatch.countDown();
+
+ // Wait for the Tx commit to complete.
+
+ assertEquals("canCommit", true, txCohort.get().canCommit().get(5, TimeUnit.SECONDS));
+ txCohort.get().preCommit().get(5, TimeUnit.SECONDS);
+ txCohort.get().commit().get(5, TimeUnit.SECONDS);
+
+ // Verify the data in the store
+
+ DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
+
+ Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).
+ get(5, TimeUnit.SECONDS);
+ assertEquals("isPresent", true, optional.isPresent());
+
+ optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
+ assertEquals("isPresent", true, optional.isPresent());
+
+ optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
+ assertEquals("isPresent", false, optional.isPresent());
+
+ cleanup(dataStore);
+ }};
+ }
+
+ @Test
+ public void testTransactionReadsWithShardNotInitiallyReady() throws Exception{
+ new IntegrationTestKit(getSystem()) {{
+ String testName = "testTransactionReadsWithShardNotInitiallyReady";
+ String shardName = "test-1";
+
+ // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
+ // initialized until we create the Tx.
+ String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
+ CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
+ InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
+
+ DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
+
+ // Create the read-write Tx
+
+ final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
+ assertNotNull("newReadWriteTransaction returned null", readWriteTx);
+
+ // Do some reads on the Tx on a separate thread.
+
+ final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
+ new AtomicReference<>();
+ final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
+ txReadFuture = new AtomicReference<>();
+ final AtomicReference<Exception> caughtEx = new AtomicReference<>();
+ final CountDownLatch txReadsDone = new CountDownLatch(1);
+ Thread txThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ readWriteTx.write(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
+
+ txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
+ } catch(Exception e) {
+ caughtEx.set(e);
+ return;
+ } finally {
+ txReadsDone.countDown();
+ }
+ }
+ };
+
+ txThread.start();
+
+ // Wait for the Tx operations to complete.
+
+ boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
+ if(caughtEx.get() != null) {
+ throw caughtEx.get();
+ }
+
+ assertEquals("Tx reads done", true, done);
+
+ // At this point the Tx operations should be waiting for the shard to initialize so
+ // trigger the latch to let the shard recovery to continue.
+
+ blockRecoveryLatch.countDown();
+
+ // Wait for the reads to complete and verify.
+
+ assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
+ assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
+
+ readWriteTx.close();
+
+ cleanup(dataStore);
+ }};
+ }
+
+ @Test(expected=NotInitializedException.class)
+ public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{
+ new IntegrationTestKit(getSystem()) {{
+ String testName = "testTransactionCommitFailureWithShardNotInitialized";
+ String shardName = "test-1";
+
+ // Set the shard initialization timeout low for the test.
+
+ datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
+
+ // Setup the InMemoryJournal to block shard recovery indefinitely.
+
+ String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
+ CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
+ InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
+
+ DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
+
+ // Create the write Tx
+
+ final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
+ assertNotNull("newReadWriteTransaction returned null", writeTx);
+
+ // Do some modifications and ready the Tx on a separate thread.
+
+ final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
+ final AtomicReference<Exception> caughtEx = new AtomicReference<>();
+ final CountDownLatch txReady = new CountDownLatch(1);
+ Thread txThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ writeTx.write(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ txCohort.set(writeTx.ready());
+ } catch(Exception e) {
+ caughtEx.set(e);
+ return;
+ } finally {
+ txReady.countDown();
+ }
+ }
+ };
+
+ txThread.start();
+
+ // Wait for the Tx operations to complete.
+
+ boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
+ if(caughtEx.get() != null) {
+ throw caughtEx.get();
+ }
+
+ assertEquals("Tx ready", true, done);
+
+ // Wait for the commit to complete. Since the shard never initialized, the Tx should
+ // have timed out and throw an appropriate exception cause.
+
+ try {
+ txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
+ } catch(ExecutionException e) {
+ throw e.getCause();
+ } finally {
+ blockRecoveryLatch.countDown();
+ cleanup(dataStore);
+ }
+ }};
+ }
+
+ @Test(expected=NotInitializedException.class)
+ public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{
+ new IntegrationTestKit(getSystem()) {{
+ String testName = "testTransactionReadFailureWithShardNotInitialized";
+ String shardName = "test-1";
+
+ // Set the shard initialization timeout low for the test.
+
+ datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
+
+ // Setup the InMemoryJournal to block shard recovery indefinitely.
+
+ String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
+ CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
+ InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
+
+ DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
+
+ // Create the read-write Tx
+
+ final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
+ assertNotNull("newReadWriteTransaction returned null", readWriteTx);
+
+ // Do a read on the Tx on a separate thread.
+
+ final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
+ txReadFuture = new AtomicReference<>();
+ final AtomicReference<Exception> caughtEx = new AtomicReference<>();
+ final CountDownLatch txReadDone = new CountDownLatch(1);
+ Thread txThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ readWriteTx.write(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
+
+ readWriteTx.close();
+ } catch(Exception e) {
+ caughtEx.set(e);
+ return;
+ } finally {
+ txReadDone.countDown();
+ }
+ }
+ };
+
+ txThread.start();
+
+ // Wait for the Tx operations to complete.
+
+ boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
+ if(caughtEx.get() != null) {
+ throw caughtEx.get();
+ }
+
+ assertEquals("Tx read done", true, done);
+
+ // Wait for the read to complete. Since the shard never initialized, the Tx should
+ // have timed out and throw an appropriate exception cause.
+
+ try {
+ txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
+ } catch(ReadFailedException e) {
+ throw e.getCause();
+ } finally {
+ blockRecoveryLatch.countDown();
+ cleanup(dataStore);
+ }
+ }};
+ }
+
+ @Test(expected=NoShardLeaderException.class)
+ public void testTransactionCommitFailureWithNoShardLeader() throws Throwable{
+ new IntegrationTestKit(getSystem()) {{
+ String testName = "testTransactionCommitFailureWithNoShardLeader";
+ String shardName = "test-1";
+
+ // We don't want the shard to become the leader so prevent shard election from completing
+ // by setting the election timeout, which is based on the heartbeat interval, really high.
+
+ datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000);
+
+ // Set the leader election timeout low for the test.
+
+ datastoreContextBuilder.shardLeaderElectionTimeout(1, TimeUnit.MILLISECONDS);
+
+ DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
+
+ // Create the write Tx.
+
+ final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
+ assertNotNull("newReadWriteTransaction returned null", writeTx);
+
+ // Do some modifications and ready the Tx on a separate thread.
+
+ final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
+ final AtomicReference<Exception> caughtEx = new AtomicReference<>();
+ final CountDownLatch txReady = new CountDownLatch(1);
+ Thread txThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ writeTx.write(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ txCohort.set(writeTx.ready());
+ } catch(Exception e) {
+ caughtEx.set(e);
+ return;
+ } finally {
+ txReady.countDown();
+ }
+ }
+ };
+
+ txThread.start();
+
+ // Wait for the Tx operations to complete.
+
+ boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
+ if(caughtEx.get() != null) {
+ throw caughtEx.get();
+ }
+
+ assertEquals("Tx ready", true, done);
+
+ // Wait for the commit to complete. Since no shard leader was elected in time, the Tx
+ // should have timed out and throw an appropriate exception cause.
+
+ try {
+ txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
+ } catch(ExecutionException e) {
+ throw e.getCause();
+ } finally {
+ cleanup(dataStore);
+ }
+ }};
+ }
+
@Test
public void testTransactionAbort() throws Exception{
System.setProperty("shard.persistent", "true");
}
DistributedDataStore setupDistributedDataStore(String typeName, String... shardNames) {
+ return setupDistributedDataStore(typeName, true, shardNames);
+ }
+
+ DistributedDataStore setupDistributedDataStore(String typeName, boolean waitUntilLeader,
+ String... shardNames) {
MockClusterWrapper cluster = new MockClusterWrapper();
Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
ShardStrategyFactory.setConfiguration(config);
- DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
+ DatastoreContext datastoreContext = datastoreContextBuilder.build();
DistributedDataStore dataStore = new DistributedDataStore(getSystem(), typeName, cluster,
config, datastoreContext);
SchemaContext schemaContext = SchemaContextHelper.full();
dataStore.onGlobalContextUpdated(schemaContext);
- for(String shardName: shardNames) {
- ActorRef shard = null;
- for(int i = 0; i < 20 * 5 && shard == null; i++) {
- Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
- Optional<ActorRef> shardReply = dataStore.getActorContext().findLocalShard(shardName);
- if(shardReply.isPresent()) {
- shard = shardReply.get();
+ if(waitUntilLeader) {
+ for(String shardName: shardNames) {
+ ActorRef shard = null;
+ for(int i = 0; i < 20 * 5 && shard == null; i++) {
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ Optional<ActorRef> shardReply = dataStore.getActorContext().findLocalShard(shardName);
+ if(shardReply.isPresent()) {
+ shard = shardReply.get();
+ }
}
- }
- assertNotNull("Shard was not created", shard);
+ assertNotNull("Shard was not created", shard);
- System.out.println("!!!!!!shard: "+shard.path().toString());
- waitUntilLeader(shard);
+ waitUntilLeader(shard);
+ }
}
return dataStore;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.dispatch.Futures;
+import akka.testkit.JavaTestKit;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.CheckedFuture;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
+import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.isA;
import static org.mockito.Mockito.times;
import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
@SuppressWarnings("resource")
-public class TransactionProxyTest extends AbstractActorTest {
+public class TransactionProxyTest {
@SuppressWarnings("serial")
static class TestException extends RuntimeException {
CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
}
+ private static ActorSystem system;
+
private final Configuration configuration = new MockConfiguration();
@Mock
String memberName = "mock-member";
+ @BeforeClass
+ public static void setUpClass() throws IOException {
+
+ Config config = ConfigFactory.parseMap(ImmutableMap.<String, Object>builder().
+ put("akka.actor.default-dispatcher.type",
+ "akka.testkit.CallingThreadDispatcherConfigurator").build()).
+ withFallback(ConfigFactory.load());
+ system = ActorSystem.create("test", config);
+ }
+
+ @AfterClass
+ public static void tearDownClass() throws IOException {
+ JavaTestKit.shutdownActorSystem(system);
+ system = null;
+ }
+
@Before
public void setUp(){
MockitoAnnotations.initMocks(this);
schemaContext = TestModel.createTestContext();
+ DatastoreContext dataStoreContext = DatastoreContext.newBuilder().build();
+
doReturn(getSystem()).when(mockActorContext).getActorSystem();
doReturn(memberName).when(mockActorContext).getCurrentMemberName();
doReturn(schemaContext).when(mockActorContext).getSchemaContext();
doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
+ doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
+ doReturn(dataStoreContext).when(mockActorContext).getDatastoreContext();
ShardStrategyFactory.setConfiguration(configuration);
}
+ private ActorSystem getSystem() {
+ return system;
+ }
+
private CreateTransaction eqCreateTransaction(final String memberName,
final TransactionType type) {
ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
doReturn(actorSystem.actorSelection(actorRef.path())).
when(mockActorContext).actorSelection(actorRef.path().toString());
- doReturn(Optional.of(actorSystem.actorSelection(actorRef.path()))).
- when(mockActorContext).findPrimaryShard(eq(DefaultShardStrategy.DEFAULT_SHARD));
+ doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))).
+ when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
- doReturn(createTransactionReply(actorRef)).when(mockActorContext).
- executeOperation(eq(actorSystem.actorSelection(actorRef.path())),
+ doReturn(Futures.successful(createTransactionReply(actorRef))).when(mockActorContext).
+ executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
eqCreateTransaction(memberName, type));
doReturn(false).when(mockActorContext).isLocalPath(actorRef.path().toString());
future.checkedGet(5, TimeUnit.SECONDS);
fail("Expected ReadFailedException");
} catch(ReadFailedException e) {
+ e.printStackTrace();
throw e.getCause();
}
}
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
doReturn(Futures.successful(new Object())).when(mockActorContext).
- executeOperationAsync(any(ActorSelection.class), any());
+ executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeOperationAsync(any(ActorSelection.class), any());
+ executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
if (exToThrow instanceof PrimaryNotFoundException) {
- doReturn(Optional.absent()).when(mockActorContext).findPrimaryShard(anyString());
+ doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
} else {
- doReturn(Optional.of(getSystem().actorSelection(actorRef.path()))).
- when(mockActorContext).findPrimaryShard(anyString());
+ doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
+ when(mockActorContext).findPrimaryShardAsync(anyString());
}
- doThrow(exToThrow).when(mockActorContext).executeOperation(any(ActorSelection.class), any());
+
+ doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), any());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
transactionProxy.read(TestModel.TEST_PATH);
}
+ @Test(expected=IllegalArgumentException.class)
+ public void testInvalidCreateTransactionReply() throws Throwable {
+ ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
+
+ doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
+ actorSelection(actorRef.path().toString());
+
+ doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
+ when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+
+ doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
+ eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY));
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
+
+ propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
+ }
+
@Test
public void testExists() throws Exception {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
doReturn(Futures.successful(new Object())).when(mockActorContext).
- executeOperationAsync(any(ActorSelection.class), any());
+ executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeOperationAsync(any(ActorSelection.class), any());
+ executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- WRITE_ONLY);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+ verifyCohortFutures(proxy, TestException.class);
+
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
MergeDataReply.SERIALIZABLE_CLASS, TestException.class);
-
- verifyCohortFutures(proxy, TestException.class);
}
@SuppressWarnings("unchecked")
@Test
public void testReadyWithInitialCreateTransactionFailure() throws Exception {
- doReturn(Optional.absent()).when(mockActorContext).findPrimaryShard(anyString());
-// doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation(
-// anyString(), any());
+ doReturn(Futures.failed(new PrimaryNotFoundException("mock"))).when(
+ mockActorContext).findPrimaryShardAsync(anyString());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY);
doReturn(actorSystem.actorSelection(shardActorRef.path())).
when(mockActorContext).actorSelection(shardActorRef.path().toString());
- doReturn(Optional.of(actorSystem.actorSelection(shardActorRef.path()))).
- when(mockActorContext).findPrimaryShard(eq(DefaultShardStrategy.DEFAULT_SHARD));
+ doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
+ when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
.setTransactionActorPath(actorPath)
.build();
- doReturn(createTransactionReply).when(mockActorContext).
- executeOperation(eq(actorSystem.actorSelection(shardActorRef.path())),
+ doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
+ executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
eqCreateTransaction(memberName, READ_ONLY));
doReturn(true).when(mockActorContext).isLocalPath(actorPath);
doReturn(actorSystem.actorSelection(shardActorRef.path())).
when(mockActorContext).actorSelection(shardActorRef.path().toString());
- doReturn(Optional.of(actorSystem.actorSelection(shardActorRef.path()))).
- when(mockActorContext).findPrimaryShard(eq(DefaultShardStrategy.DEFAULT_SHARD));
+ doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
+ when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
.setTransactionActorPath(actorPath)
.build();
- doReturn(createTransactionReply).when(mockActorContext).
- executeOperation(eq(actorSystem.actorSelection(shardActorRef.path())),
+ doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
+ executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
eqCreateTransaction(memberName, WRITE_ONLY));
doReturn(true).when(mockActorContext).isLocalPath(actorPath);
private static final Map<String, CountDownLatch> deleteMessagesCompleteLatches = new ConcurrentHashMap<>();
+ private static final Map<String, CountDownLatch> blockReadMessagesLatches = new ConcurrentHashMap<>();
+
public static void addEntry(String persistenceId, long sequenceNr, Object data) {
Map<Long, Object> journal = journals.get(persistenceId);
if(journal == null) {
deleteMessagesCompleteLatches.put(persistenceId, new CountDownLatch(1));
}
+ public static void addBlockReadMessagesLatch(String persistenceId, CountDownLatch latch) {
+ blockReadMessagesLatches.put(persistenceId, latch);
+ }
+
@Override
public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr,
long toSequenceNr, long max, final Procedure<PersistentRepr> replayCallback) {
return Futures.future(new Callable<Void>() {
@Override
public Void call() throws Exception {
+ CountDownLatch blockLatch = blockReadMessagesLatches.remove(persistenceId);
+ if(blockLatch != null) {
+ Uninterruptibles.awaitUninterruptibly(blockLatch);
+ }
+
Map<Long, Object> journal = journals.get(persistenceId);
if(journal == null) {
return null;