From 5c5c980e564d2b5f6cd26821ffd26997f59af260 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Wed, 1 Jul 2015 16:28:43 -0400 Subject: [PATCH] CDS: Retry remote front-end transactions on AskTimeoutException With the front-end PrimaryShardInfo cache, if the cached primary/leader shard is remote and unavailable, the RemoteTransactionContextSupport will fail with an AskTimeoutException when it tries to send the CreateTransaction message. Since it can take at least 1 election timeout period to re-elect a new leader, I changed RemoteTransactionContextSupport to also retry on AskTimeoutException (it already retries on NoShardLeaderException). However, instead of re-sending the CreateTransaction message, as it did before, it now re-sends the FindPrimary message to get a new primary shard actor. I also modified how RemoteTransactionContextSupport retries. It will now retry for a total period of 2 times the shard election timeout which should be ample time for a re-election to occur. If no leader is found then the transaction will fail. I also added a ShardLeaderNotRespondingException which the RemoteTransactionContextSupport will throw if it ends up with an AskTimeoutException after the tx creation timeout period. This shouldn't occur normally as, with the retries, it should get a NoShardLeaderException even if the initial error was AskTimeoutException. But it's possible to end up with an AskTimeoutException, e.g. if the system is overloaded and the election timeout is delayed. During testing, I noticed that if you take down the 2 followers and try a transaction, it fails with an AskTimeoutException instead of NoShardLeaderException as one would expect. This is because the leader changes to an isolated leader. So I changed the ShardManager to return NoShardLeaderException if the state is IsolatedLeader. 
Change-Id: I3efd3f841cf41b7738aedb694fa18b44851b3074 Signed-off-by: Tom Pantelis (cherry picked from commit 845609758d1739ee07d5ca92f5448e18a8933861) --- .../datastore/ConcurrentDOMDataBroker.java | 3 +- .../RemoteTransactionContextSupport.java | 113 +++++++++++--- .../controller/cluster/datastore/Shard.java | 39 +++-- .../cluster/datastore/ShardManager.java | 19 ++- .../exceptions/NoShardLeaderException.java | 9 +- .../ShardLeaderNotRespondingException.java | 21 +++ .../AbstractTransactionProxyTest.java | 15 +- .../DistributedDataStoreIntegrationTest.java | 15 +- ...butedDataStoreRemotingIntegrationTest.java | 142 +++++++++++++++++- .../cluster/datastore/ShardManagerTest.java | 19 ++- .../datastore/TransactionChainProxyTest.java | 4 +- .../datastore/TransactionProxyTest.java | 24 ++- .../src/test/resources/application.conf | 54 +++++++ .../module-shards-member1-and-2-and-3.conf | 28 ++++ 14 files changed, 431 insertions(+), 74 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/ShardLeaderNotRespondingException.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-member1-and-2-and-3.conf diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java index c16a849335..582f25859b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.concurrent.Executor; import 
org.opendaylight.controller.cluster.databroker.AbstractDOMBroker; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; +import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException; import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; @@ -197,7 +198,7 @@ public class ConcurrentDOMDataBroker extends AbstractDOMBroker { LOG.warn("Tx: {} Error during phase {}, starting Abort", transaction.getIdentifier(), phase, t); final Exception e; - if(t instanceof NoShardLeaderException) { + if(t instanceof NoShardLeaderException || t instanceof ShardLeaderNotRespondingException) { e = new DataStoreUnavailableException(t.getMessage(), t); } else if (t instanceof Exception) { e = (Exception)t; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java index 59205692d1..88c797a990 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java @@ -10,13 +10,17 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; import akka.dispatch.OnComplete; +import akka.pattern.AskTimeoutException; +import akka.util.Timeout; import com.google.common.base.Preconditions; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl; import 
org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; +import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,10 +40,8 @@ import scala.concurrent.duration.FiniteDuration; final class RemoteTransactionContextSupport { private static final Logger LOG = LoggerFactory.getLogger(RemoteTransactionContextSupport.class); - /** - * Time interval in between transaction create retries. - */ - private static final FiniteDuration CREATE_TX_TRY_INTERVAL = FiniteDuration.create(1, TimeUnit.SECONDS); + private static final long CREATE_TX_TRY_INTERVAL_IN_MS = 1000; + private static final long MAX_CREATE_TX_MSG_TIMEOUT_IN_MS = 5000; private final TransactionProxy parent; private final String shardName; @@ -48,7 +50,13 @@ final class RemoteTransactionContextSupport { * The target primary shard. */ private volatile ActorSelection primaryShard; - private volatile int createTxTries; + + /** + * The total timeout for creating a tx on the primary shard. + */ + private volatile long totalCreateTxTimeout; + + private final Timeout createTxMessageTimeout; private final TransactionContextWrapper transactionContextWrapper; @@ -57,9 +65,18 @@ final class RemoteTransactionContextSupport { this.parent = Preconditions.checkNotNull(parent); this.shardName = shardName; this.transactionContextWrapper = transactionContextWrapper; - createTxTries = (int) (parent.getActorContext().getDatastoreContext(). 
- getShardLeaderElectionTimeout().duration().toMillis() / - CREATE_TX_TRY_INTERVAL.toMillis()); + + // For the total create tx timeout, use 2 times the election timeout. This should be enough time for + // a leader re-election to occur if we happen to hit it in transition. + totalCreateTxTimeout = parent.getActorContext().getDatastoreContext().getShardRaftConfig() + .getElectionTimeOutInterval().toMillis() * 2; + + // We'll use the operationTimeout for the the create Tx message timeout so it can be set appropriately + // for unit tests but cap it at MAX_CREATE_TX_MSG_TIMEOUT_IN_MS. The operationTimeout could be set + // larger than the totalCreateTxTimeout in production which we don't want. + long operationTimeout = parent.getActorContext().getOperationTimeout().duration().toMillis(); + createTxMessageTimeout = new Timeout(Math.min(operationTimeout, MAX_CREATE_TX_MSG_TIMEOUT_IN_MS), + TimeUnit.MILLISECONDS); } String getShardName() { @@ -74,10 +91,6 @@ final class RemoteTransactionContextSupport { return parent.getActorContext(); } - private OperationLimiter getOperationLimiter() { - return transactionContextWrapper.getLimiter(); - } - private TransactionIdentifier getIdentifier() { return parent.getIdentifier(); } @@ -113,7 +126,8 @@ final class RemoteTransactionContextSupport { Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(), getTransactionType().ordinal(), getIdentifier().getChainId()).toSerializable(); - Future createTxFuture = getActorContext().executeOperationAsync(primaryShard, serializedCreateMessage); + Future createTxFuture = getActorContext().executeOperationAsync(primaryShard, + serializedCreateMessage, createTxMessageTimeout); createTxFuture.onComplete(new OnComplete() { @Override @@ -123,21 +137,60 @@ final class RemoteTransactionContextSupport { }, getActorContext().getClientDispatcher()); } + private void tryFindPrimaryShard() { + LOG.debug("Tx {} Retrying findPrimaryShardAsync for shard {}", getIdentifier(), 
shardName); + + this.primaryShard = null; + Future findPrimaryFuture = getActorContext().findPrimaryShardAsync(shardName); + findPrimaryFuture.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) { + onFindPrimaryShardComplete(failure, primaryShardInfo); + } + }, getActorContext().getClientDispatcher()); + } + + private void onFindPrimaryShardComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) { + if (failure == null) { + this.primaryShard = primaryShardInfo.getPrimaryShardActor(); + tryCreateTransaction(); + } else { + LOG.debug("Tx {}: Find primary for shard {} failed", getIdentifier(), shardName, failure); + + onCreateTransactionComplete(failure, null); + } + } + private void onCreateTransactionComplete(Throwable failure, Object response) { - if(failure instanceof NoShardLeaderException) { - // There's no leader for the shard yet - schedule and try again, unless we're out - // of retries. Note: createTxTries is volatile as it may be written by different - // threads however not concurrently, therefore decrementing it non-atomically here - // is ok. - if(--createTxTries > 0) { - LOG.debug("Tx {} Shard {} has no leader yet - scheduling create Tx retry", - getIdentifier(), shardName); - - getActorContext().getActorSystem().scheduler().scheduleOnce(CREATE_TX_TRY_INTERVAL, + // An AskTimeoutException will occur if the local shard forwards to an unavailable remote leader or + // the cached remote leader actor is no longer available. + boolean retryCreateTransaction = this.primaryShard != null && + (failure instanceof NoShardLeaderException || failure instanceof AskTimeoutException); + if(retryCreateTransaction) { + // Schedule a retry unless we're out of retries. Note: totalCreateTxTimeout is volatile as it may + // be written by different threads however not concurrently, therefore decrementing it + // non-atomically here is ok. 
+ if(totalCreateTxTimeout > 0) { + long scheduleInterval = CREATE_TX_TRY_INTERVAL_IN_MS; + if(failure instanceof AskTimeoutException) { + // Since we use the createTxMessageTimeout for the CreateTransaction request and it timed + // out, subtract it from the total timeout. Also since the createTxMessageTimeout period + // has already elapsed, we can immediately schedule the retry (10 ms is virtually immediate). + totalCreateTxTimeout -= createTxMessageTimeout.duration().toMillis(); + scheduleInterval = 10; + } + + totalCreateTxTimeout -= scheduleInterval; + + LOG.debug("Tx {}: create tx on shard {} failed with exception \"{}\" - scheduling retry in {} ms", + getIdentifier(), shardName, failure, scheduleInterval); + + getActorContext().getActorSystem().scheduler().scheduleOnce( + FiniteDuration.create(scheduleInterval, TimeUnit.MILLISECONDS), new Runnable() { @Override public void run() { - tryCreateTransaction(); + tryFindPrimaryShard(); } }, getActorContext().getClientDispatcher()); return; @@ -160,7 +213,17 @@ final class RemoteTransactionContextSupport { if(failure != null) { LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure); - localTransactionContext = new NoOpTransactionContext(failure, getIdentifier()); + Throwable resultingEx = failure; + if(failure instanceof AskTimeoutException) { + resultingEx = new ShardLeaderNotRespondingException(String.format( + "Could not create a %s transaction on shard %s. 
The shard leader isn't responding.", + parent.getType(), shardName), failure); + } else if(!(failure instanceof NoShardLeaderException)) { + resultingEx = new Exception(String.format( + "Error creating %s transaction on shard %s", parent.getType(), shardName), failure); + } + + localTransactionContext = new NoOpTransactionContext(resultingEx, getIdentifier()); } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) { localTransactionContext = createValidTransactionContext( CreateTransactionReply.fromSerializable(response)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 65be23dfd0..ce3cae481e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -62,6 +62,7 @@ import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort; import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; +import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; @@ -410,13 +411,10 @@ public class Shard extends RaftActor { commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this); } - private void noLeaderError(Object message) { + private void noLeaderError(String errMessage, Object message) { // TODO: 
rather than throwing an immediate exception, we could schedule a timer to try again to make // it more resilient in case we're in the process of electing a new leader. - getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format( - "Could not find the leader for shard %s. This typically happens" + - " when the system is coming up or recovering and a leader is being elected. Try again" + - " later.", persistenceId()))), getSelf()); + getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(errMessage, persistenceId())), getSelf()); } private void handleBatchedModifications(BatchedModifications batched) { @@ -433,6 +431,8 @@ public class Shard extends RaftActor { // window where it could have a stale leader during leadership transitions. // if(isLeader()) { + failIfIsolatedLeader(getSender()); + try { commitCoordinator.handleBatchedModifications(batched, getSender(), this); } catch (Exception e) { @@ -449,13 +449,27 @@ public class Shard extends RaftActor { LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader); leader.forward(batched, getContext()); } else { - noLeaderError(batched); + noLeaderError("Could not commit transaction " + batched.getTransactionID(), batched); } } } + private boolean failIfIsolatedLeader(ActorRef sender) { + if(getRaftState() == RaftState.IsolatedLeader) { + sender.tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format( + "Shard %s was the leader but has lost contact with all of its followers. 
Either all" + + " other follower nodes are down or this node is isolated by a network partition.", + persistenceId()))), getSelf()); + return true; + } + + return false; + } + private void handleReadyLocalTransaction(final ReadyLocalTransaction message) { if (isLeader()) { + failIfIsolatedLeader(getSender()); + try { commitCoordinator.handleReadyLocalTransaction(message, getSender(), this); } catch (Exception e) { @@ -470,7 +484,7 @@ public class Shard extends RaftActor { message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion()); leader.forward(message, getContext()); } else { - noLeaderError(message); + noLeaderError("Could not commit transaction " + message.getTransactionID(), message); } } } @@ -520,10 +534,8 @@ public class Shard extends RaftActor { } else if (getLeader() != null) { getLeader().forward(message, getContext()); } else { - getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format( - "Could not find leader for shard %s so transaction cannot be created. This typically happens" + - " when the system is coming up or recovering and a leader is being elected. 
Try again" + - " later.", persistenceId()))), getSelf()); + getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException( + "Could not create a shard transaction", persistenceId())), getSelf()); } } @@ -541,6 +553,11 @@ public class Shard extends RaftActor { private void createTransaction(CreateTransaction createTransaction) { try { + if(TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY && + failIfIsolatedLeader(getSender())) { + return; + } + ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(), createTransaction.getTransactionId(), createTransaction.getTransactionChainId(), createTransaction.getVersion()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 4f3c521f44..a46bb3dae8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; @@ -65,6 +66,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; /** * The ShardManager has the following jobs, @@ -365,8 
+367,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName()); + FiniteDuration timeout = datastoreContext.getShardInitializationTimeout().duration(); + if(shardInformation.isShardInitialized()) { + // If the shard is already initialized then we'll wait enough time for the shard to + // elect a leader, ie 2 times the election timeout. + timeout = FiniteDuration.create(datastoreContext.getShardRaftConfig() + .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS); + } + Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce( - datastoreContext.getShardInitializationTimeout().duration(), getSelf(), + timeout, getSelf(), new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender), getContext().dispatcher(), getSelf()); @@ -389,9 +399,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) { - return new NoShardLeaderException(String.format( - "Could not find a leader for shard %s. This typically happens when the system is coming up or " + - "recovering and a leader is being elected. 
Try again later.", shardId)); + return new NoShardLeaderException(null, shardId.toString()); } private NotInitializedException createNotInitializedException(ShardIdentifier shardId) { @@ -733,7 +741,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } boolean isShardReadyWithLeaderId() { - return leaderAvailable && isShardReady() && (isLeader() || peerAddresses.get(leaderId) != null); + return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) && + (isLeader() || peerAddresses.get(leaderId) != null); } boolean isShardInitialized() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/NoShardLeaderException.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/NoShardLeaderException.java index b205b0fe78..8365177d1b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/NoShardLeaderException.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/NoShardLeaderException.java @@ -7,6 +7,8 @@ */ package org.opendaylight.controller.cluster.datastore.exceptions; +import com.google.common.base.Strings; + /** * Exception indicating a shard has no current leader. * @@ -15,7 +17,12 @@ package org.opendaylight.controller.cluster.datastore.exceptions; public class NoShardLeaderException extends RuntimeException { private static final long serialVersionUID = 1L; - public NoShardLeaderException(String message){ + public NoShardLeaderException(String message) { super(message); } + + public NoShardLeaderException(String message, String shardName) { + super(String.format("%sShard %s currently has no leader. Try again later.", + (Strings.isNullOrEmpty(message) ? "" : message + ". 
"), shardName)); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/ShardLeaderNotRespondingException.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/ShardLeaderNotRespondingException.java new file mode 100644 index 0000000000..5dfac1e69a --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/ShardLeaderNotRespondingException.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.exceptions; + +/** + * Exception indicating a shard leader is not responding to messages. 
+ * + * @author Thomas Pantelis + */ +public class ShardLeaderNotRespondingException extends RuntimeException { + private static final long serialVersionUID = 1L; + + public ShardLeaderNotRespondingException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java index fcc18ed702..37ca9533b6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java @@ -23,6 +23,7 @@ import akka.actor.ActorSystem; import akka.actor.Props; import akka.dispatch.Futures; import akka.testkit.JavaTestKit; +import akka.util.Timeout; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; import com.google.common.base.Objects; @@ -131,7 +132,9 @@ public abstract class AbstractTransactionProxyTest { protected final String memberName = "mock-member"; - protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().operationTimeoutInSeconds(2); + private final int operationTimeoutInSeconds = 2; + protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder() + .operationTimeoutInSeconds(operationTimeoutInSeconds); @BeforeClass public static void setUpClass() throws IOException { @@ -159,6 +162,7 @@ public abstract class AbstractTransactionProxyTest { doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher(); doReturn(memberName).when(mockActorContext).getCurrentMemberName(); doReturn(schemaContext).when(mockActorContext).getSchemaContext(); + 
doReturn(new Timeout(operationTimeoutInSeconds, TimeUnit.SECONDS)).when(mockActorContext).getOperationTimeout(); doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper(); doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper(); doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext(); @@ -380,7 +384,7 @@ public abstract class AbstractTransactionProxyTest { doReturn(Futures.successful(createTransactionReply(txActorRef, transactionVersion))).when(mockActorContext). executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), - eqCreateTransaction(prefix, type)); + eqCreateTransaction(prefix, type), any(Timeout.class)); } return txActorRef; @@ -404,7 +408,12 @@ public abstract class AbstractTransactionProxyTest { future.checkedGet(5, TimeUnit.SECONDS); fail("Expected ReadFailedException"); } catch(ReadFailedException e) { - throw e.getCause(); + assertNotNull("Expected a cause", e.getCause()); + if(e.getCause().getCause() != null) { + throw e.getCause().getCause(); + } else { + throw e.getCause(); + } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index 2319c5be38..9dd882f2de 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -2,6 +2,7 @@ package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import 
static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; @@ -34,6 +35,8 @@ import org.junit.Test; import org.mockito.Mockito; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; +import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; +import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; @@ -614,13 +617,17 @@ public class DistributedDataStoreIntegrationTest { // by setting the election timeout, which is based on the heartbeat interval, really high. datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000); - datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS); - // Set the leader election timeout low for the test. + DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName); + + Object result = dataStore.getActorContext().executeOperation(dataStore.getActorContext().getShardManager(), + new FindLocalShard(shardName, true)); + assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound); - datastoreContextBuilder.shardLeaderElectionTimeout(1, TimeUnit.MILLISECONDS); + // The ShardManager uses the election timeout for FindPrimary so reset it low so it will timeout quickly. - DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName); + datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1); + dataStore.onDatastoreContextUpdated(datastoreContextBuilder.build()); // Create the write Tx. 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index 672d8f3b99..90600f9529 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; @@ -19,17 +20,22 @@ import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.AddressFromURIString; import akka.cluster.Cluster; +import akka.pattern.AskTimeoutException; import akka.testkit.JavaTestKit; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; import java.math.BigInteger; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; +import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import 
org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; @@ -75,10 +81,12 @@ public class DistributedDataStoreRemotingIntegrationTest { private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"); private static final Address MEMBER_2_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2559"); - private static final String MODULE_SHARDS_CONFIG = "module-shards-member1-and-2.conf"; + private static final String MODULE_SHARDS_CONFIG_2 = "module-shards-member1-and-2.conf"; + private static final String MODULE_SHARDS_CONFIG_3 = "module-shards-member1-and-2-and-3.conf"; private ActorSystem leaderSystem; private ActorSystem followerSystem; + private ActorSystem follower2System; private final DatastoreContext.Builder leaderDatastoreContextBuilder = DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1); @@ -92,27 +100,35 @@ public class DistributedDataStoreRemotingIntegrationTest { private IntegrationTestKit leaderTestKit; @Before - public void setUpClass() { + public void setUp() { leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS); followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2")); Cluster.get(followerSystem).join(MEMBER_1_ADDRESS); + + follower2System = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member3")); + Cluster.get(follower2System).join(MEMBER_1_ADDRESS); } @After - public void tearDownClass() { + public void tearDown() { JavaTestKit.shutdownActorSystem(leaderSystem); JavaTestKit.shutdownActorSystem(followerSystem); + JavaTestKit.shutdownActorSystem(follower2System); } private void initDatastores(String type) { + initDatastores(type, MODULE_SHARDS_CONFIG_2); + } + + private void 
initDatastores(String type, String moduleShardsConfig) { leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder); - followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder); - followerDistributedDataStore = followerTestKit.setupDistributedDataStore(type, MODULE_SHARDS_CONFIG, false, SHARD_NAMES); + leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, SHARD_NAMES); - leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, MODULE_SHARDS_CONFIG, false, SHARD_NAMES); + followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder); + followerDistributedDataStore = followerTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, SHARD_NAMES); leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), SHARD_NAMES); } @@ -478,8 +494,7 @@ public class DistributedDataStoreRemotingIntegrationTest { DatastoreContext.Builder newMember1Builder = DatastoreContext.newBuilder(). shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5); IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder); - DistributedDataStore newMember1Datastore = newMember1TestKit. 
- setupDistributedDataStore(testName, MODULE_SHARDS_CONFIG, false, SHARD_NAMES); + newMember1TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CONFIG_2, false, SHARD_NAMES); followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), SHARD_NAMES); @@ -521,4 +536,115 @@ public class DistributedDataStoreRemotingIntegrationTest { verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car); } + + @Test(expected=NoShardLeaderException.class) + public void testTransactionWithIsolatedLeader() throws Throwable { + leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(300); + String testName = "testTransactionWithIsolatedLeader"; + initDatastores(testName); + + JavaTestKit.shutdownActorSystem(followerSystem, null, true); + + Uninterruptibles.sleepUninterruptibly(leaderDistributedDataStore.getActorContext().getDatastoreContext() + .getShardRaftConfig().getElectionTimeOutInterval().toMillis() * 3, TimeUnit.MILLISECONDS); + + DOMStoreWriteTransaction writeTx = leaderDistributedDataStore.newWriteOnlyTransaction(); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + + try { + followerTestKit.doCommit(writeTx.ready()); + } catch (ExecutionException e) { + throw e.getCause(); + } + } + + @Test(expected=AskTimeoutException.class) + public void testTransactionWithShardLeaderNotResponding() throws Throwable { + followerDatastoreContextBuilder.shardElectionTimeoutFactor(30); + initDatastores("testTransactionWithShardLeaderNotResponding"); + + // Do an initial read to get the primary shard info cached. + + DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction(); + readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS); + + // Shutdown the leader and try to create a new tx. 
+ + JavaTestKit.shutdownActorSystem(leaderSystem, null, true); + + followerDatastoreContextBuilder.operationTimeoutInMillis(50).shardElectionTimeoutFactor(1); + followerDistributedDataStore.onDatastoreContextUpdated(followerDatastoreContextBuilder.build()); + + DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction(); + + rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + + try { + followerTestKit.doCommit(rwTx.ready()); + } catch (ExecutionException e) { + assertTrue("Expected ShardLeaderNotRespondingException cause. Actual: " + e.getCause(), + e.getCause() instanceof ShardLeaderNotRespondingException); + assertNotNull("Expected a nested cause", e.getCause().getCause()); + throw e.getCause().getCause(); + } + } + + @Test(expected=NoShardLeaderException.class) + public void testTransactionWithCreateTxFailureDueToNoLeader() throws Throwable { + initDatastores("testTransactionWithCreateTxFailureDueToNoLeader"); + + // Do an initial read to get the primary shard info cached. + + DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction(); + readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS); + + // Shutdown the leader and try to create a new tx. 
+ + JavaTestKit.shutdownActorSystem(leaderSystem, null, true); + + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + + followerDatastoreContextBuilder.operationTimeoutInMillis(10).shardElectionTimeoutFactor(1); + followerDistributedDataStore.onDatastoreContextUpdated(followerDatastoreContextBuilder.build()); + + DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction(); + + rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + + try { + followerTestKit.doCommit(rwTx.ready()); + } catch (ExecutionException e) { + throw e.getCause(); + } + } + + @Test + public void testTransactionRetryWithInitialAskTimeoutExOnCreateTx() throws Exception { + followerDatastoreContextBuilder.shardElectionTimeoutFactor(30); + String testName = "testTransactionRetryWithInitialAskTimeoutExOnCreateTx"; + initDatastores(testName, MODULE_SHARDS_CONFIG_3); + + DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder(). + shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5); + IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, follower2DatastoreContextBuilder); + follower2TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CONFIG_3, false, SHARD_NAMES); + + // Do an initial read to get the primary shard info cached. + + DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction(); + readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS); + + // Shutdown the leader and try to create a new tx. 
+ + JavaTestKit.shutdownActorSystem(leaderSystem, null, true); + + followerDatastoreContextBuilder.operationTimeoutInMillis(500); + followerDistributedDataStore.onDatastoreContextUpdated(followerDatastoreContextBuilder.build()); + + DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction(); + + rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + + followerTestKit.doCommit(rwTx.ready()); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index b5cceea851..4362047a32 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -80,7 +80,8 @@ public class ShardManagerTest extends AbstractActorTest { private static TestActorRef mockShardActor; private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder(). 
- dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS); + dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS) + .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6); private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) { String name = new ShardIdentifier(shardName, memberName,"config").toString(); @@ -342,6 +343,22 @@ public class ShardManagerTest extends AbstractActorTest { }}; } + @Test + public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, + null, RaftState.IsolatedLeader.name()), mockShardActor); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef()); + + expectMsgClass(duration("2 seconds"), NoShardLeaderException.class); + }}; + } + @Test public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception { new JavaTestKit(getSystem()) {{ diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java index 4ea81f44e3..03e4a16e67 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java @@ -12,6 +12,7 @@ package 
org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; @@ -23,6 +24,7 @@ import static org.mockito.Mockito.verify; import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE; import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY; import akka.actor.ActorRef; +import akka.util.Timeout; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -245,7 +247,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest { readyReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get()); verify(mockActorContext, timeout(5000)).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())), - eqCreateTransaction(tx2MemberName, READ_WRITE)); + eqCreateTransaction(tx2MemberName, READ_WRITE), any(Timeout.class)); } @Test(expected=IllegalStateException.class) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index b4cbbd5d3b..4a9f4246e5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -19,6 +19,7 @@ import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Props; import akka.dispatch.Futures; +import akka.util.Timeout; import com.google.common.base.Optional; import 
com.google.common.collect.Sets; import com.google.common.util.concurrent.CheckedFuture; @@ -138,7 +139,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { } doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), any()); + any(ActorSelection.class), any(), any(Timeout.class)); TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY); @@ -216,7 +217,8 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync( - eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY)); + eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY), + any(Timeout.class)); TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY); @@ -334,7 +336,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { Promise createTxPromise = akka.dispatch.Futures.promise(); doReturn(createTxPromise).when(mockActorContext).executeOperationAsync( eq(getSystem().actorSelection(actorRef.path())), - eqCreateTransaction(memberName, READ_WRITE)); + eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class)); doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqSerializedReadData()); @@ -838,13 +840,10 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { } String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor"; - CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder(). - setTransactionId("txn-1").setTransactionActorPath(actorPath). 
- setMessageVersion(DataStoreVersions.CURRENT_VERSION).build(); - doReturn(Futures.successful(createTransactionReply)).when(mockActorContext). - executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), - eqCreateTransaction(memberName, READ_WRITE)); + doReturn(incompleteFuture()).when(mockActorContext). + executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), + eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class)); doReturn(true).when(mockActorContext).isPathLocal(actorPath); @@ -891,7 +890,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(Futures.successful(createTransactionReply)).when(mockActorContext). executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), - eqCreateTransaction(memberName, READ_WRITE)); + eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class)); doReturn(true).when(mockActorContext).isPathLocal(anyString()); @@ -1311,9 +1310,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { expectBatchedModifications(1); - doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), any(ReadyTransaction.class)); - transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); transactionProxy.ready(); @@ -1574,7 +1570,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION))).when(mockActorContext). 
executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), - eqCreateTransaction(memberName, TransactionType.READ_ONLY)); + eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class)); doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync( eq(actorSelection(txActorRef)), eqSerializedReadData(YangInstanceIdentifier.builder().build())); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf index 8f5550fe00..5357e6900a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf @@ -140,3 +140,57 @@ Member2 { } } } + +Member3 { + bounded-mailbox { + mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox" + mailbox-capacity = 1000 + mailbox-push-timeout-time = 100ms + } + + in-memory-journal { + class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal" + } + + in-memory-snapshot-store { + class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore" + plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" + } + + akka { + persistence.snapshot-store.plugin = "in-memory-snapshot-store" + persistence.journal.plugin = "in-memory-journal" + + loglevel = "DEBUG" + + actor { + provider = "akka.cluster.ClusterActorRefProvider" + + serializers { + java = "akka.serialization.JavaSerializer" + proto = "akka.remote.serialization.ProtobufSerializer" + readylocal = "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransactionSerializer" + } + + serialization-bindings { + "com.google.protobuf.Message" = proto + "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal + } + } + remote { + log-remote-lifecycle-events = off + netty.tcp 
{ + hostname = "127.0.0.1" + port = 2557 + } + } + + cluster { + auto-down-unreachable-after = 100s + + roles = [ + "member-3" + ] + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-member1-and-2-and-3.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-member1-and-2-and-3.conf new file mode 100644 index 0000000000..481e27d820 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-member1-and-2-and-3.conf @@ -0,0 +1,28 @@ +module-shards = [ + { + name = "people" + shards = [ + { + name="people" + replicas = [ + "member-1", + "member-2", + "member-3" + ] + } + ] + }, + { + name = "cars" + shards = [ + { + name="cars" + replicas = [ + "member-1", + "member-2", + "member-3" + ] + } + ] + } +] -- 2.36.6