import java.util.concurrent.Executor;
import org.opendaylight.controller.cluster.databroker.AbstractDOMBroker;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
LOG.warn("Tx: {} Error during phase {}, starting Abort", transaction.getIdentifier(), phase, t);
final Exception e;
- if(t instanceof NoShardLeaderException) {
+ if(t instanceof NoShardLeaderException || t instanceof ShardLeaderNotRespondingException) {
e = new DataStoreUnavailableException(t.getMessage(), t);
} else if (t instanceof Exception) {
e = (Exception)t;
import akka.actor.ActorSelection;
import akka.dispatch.OnComplete;
+import akka.pattern.AskTimeoutException;
+import akka.util.Timeout;
import com.google.common.base.Preconditions;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
final class RemoteTransactionContextSupport {
private static final Logger LOG = LoggerFactory.getLogger(RemoteTransactionContextSupport.class);
- /**
- * Time interval in between transaction create retries.
- */
- private static final FiniteDuration CREATE_TX_TRY_INTERVAL = FiniteDuration.create(1, TimeUnit.SECONDS);
+ private static final long CREATE_TX_TRY_INTERVAL_IN_MS = 1000;
+ private static final long MAX_CREATE_TX_MSG_TIMEOUT_IN_MS = 5000;
private final TransactionProxy parent;
private final String shardName;
* The target primary shard.
*/
private volatile ActorSelection primaryShard;
- private volatile int createTxTries;
+
+ /**
+ * The total timeout for creating a tx on the primary shard.
+ */
+ private volatile long totalCreateTxTimeout;
+
+ private final Timeout createTxMessageTimeout;
private final TransactionContextWrapper transactionContextWrapper;
this.parent = Preconditions.checkNotNull(parent);
this.shardName = shardName;
this.transactionContextWrapper = transactionContextWrapper;
- createTxTries = (int) (parent.getActorContext().getDatastoreContext().
- getShardLeaderElectionTimeout().duration().toMillis() /
- CREATE_TX_TRY_INTERVAL.toMillis());
+
+ // For the total create tx timeout, use 2 times the election timeout. This should be enough time for
+ // a leader re-election to occur if we happen to hit it in transition.
+ totalCreateTxTimeout = parent.getActorContext().getDatastoreContext().getShardRaftConfig()
+ .getElectionTimeOutInterval().toMillis() * 2;
+
+ // We'll use the operationTimeout for the create Tx message timeout so it can be set appropriately
+ // for unit tests but cap it at MAX_CREATE_TX_MSG_TIMEOUT_IN_MS. The operationTimeout could be set
+ // larger than the totalCreateTxTimeout in production which we don't want.
+ long operationTimeout = parent.getActorContext().getOperationTimeout().duration().toMillis();
+ createTxMessageTimeout = new Timeout(Math.min(operationTimeout, MAX_CREATE_TX_MSG_TIMEOUT_IN_MS),
+ TimeUnit.MILLISECONDS);
}
String getShardName() {
return parent.getActorContext();
}
- private OperationLimiter getOperationLimiter() {
- return transactionContextWrapper.getLimiter();
- }
-
private TransactionIdentifier getIdentifier() {
return parent.getIdentifier();
}
Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(),
getTransactionType().ordinal(), getIdentifier().getChainId()).toSerializable();
- Future<Object> createTxFuture = getActorContext().executeOperationAsync(primaryShard, serializedCreateMessage);
+ Future<Object> createTxFuture = getActorContext().executeOperationAsync(primaryShard,
+ serializedCreateMessage, createTxMessageTimeout);
createTxFuture.onComplete(new OnComplete<Object>() {
@Override
}, getActorContext().getClientDispatcher());
}
+ private void tryFindPrimaryShard() {
+ LOG.debug("Tx {} Retrying findPrimaryShardAsync for shard {}", getIdentifier(), shardName);
+
+ this.primaryShard = null;
+ Future<PrimaryShardInfo> findPrimaryFuture = getActorContext().findPrimaryShardAsync(shardName);
+ findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
+ @Override
+ public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
+ onFindPrimaryShardComplete(failure, primaryShardInfo);
+ }
+ }, getActorContext().getClientDispatcher());
+ }
+
+ /**
+  * Handles the result of a primary shard lookup.
+  *
+  * @param failure the lookup error, or {@code null} if the lookup succeeded
+  * @param primaryShardInfo the lookup result; only read when {@code failure} is {@code null}
+  */
+ private void onFindPrimaryShardComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
+     if (failure != null) {
+         LOG.debug("Tx {}: Find primary for shard {} failed", getIdentifier(), shardName, failure);
+
+         // Route the lookup error through the regular create-tx completion handling.
+         onCreateTransactionComplete(failure, null);
+         return;
+     }
+
+     // Lookup succeeded: cache the resolved primary and attempt the CreateTransaction again.
+     this.primaryShard = primaryShardInfo.getPrimaryShardActor();
+     tryCreateTransaction();
+ }
+
private void onCreateTransactionComplete(Throwable failure, Object response) {
- if(failure instanceof NoShardLeaderException) {
- // There's no leader for the shard yet - schedule and try again, unless we're out
- // of retries. Note: createTxTries is volatile as it may be written by different
- // threads however not concurrently, therefore decrementing it non-atomically here
- // is ok.
- if(--createTxTries > 0) {
- LOG.debug("Tx {} Shard {} has no leader yet - scheduling create Tx retry",
- getIdentifier(), shardName);
-
- getActorContext().getActorSystem().scheduler().scheduleOnce(CREATE_TX_TRY_INTERVAL,
+ // An AskTimeoutException will occur if the local shard forwards to an unavailable remote leader or
+ // the cached remote leader actor is no longer available.
+ boolean retryCreateTransaction = this.primaryShard != null &&
+ (failure instanceof NoShardLeaderException || failure instanceof AskTimeoutException);
+ if(retryCreateTransaction) {
+ // Schedule a retry unless we're out of retries. Note: totalCreateTxTimeout is volatile as it may
+ // be written by different threads however not concurrently, therefore decrementing it
+ // non-atomically here is ok.
+ if(totalCreateTxTimeout > 0) {
+ long scheduleInterval = CREATE_TX_TRY_INTERVAL_IN_MS;
+ if(failure instanceof AskTimeoutException) {
+ // Since we use the createTxMessageTimeout for the CreateTransaction request and it timed
+ // out, subtract it from the total timeout. Also since the createTxMessageTimeout period
+ // has already elapsed, we can immediately schedule the retry (10 ms is virtually immediate).
+ totalCreateTxTimeout -= createTxMessageTimeout.duration().toMillis();
+ scheduleInterval = 10;
+ }
+
+ totalCreateTxTimeout -= scheduleInterval;
+
+ LOG.debug("Tx {}: create tx on shard {} failed with exception \"{}\" - scheduling retry in {} ms",
+ getIdentifier(), shardName, failure, scheduleInterval);
+
+ getActorContext().getActorSystem().scheduler().scheduleOnce(
+ FiniteDuration.create(scheduleInterval, TimeUnit.MILLISECONDS),
new Runnable() {
@Override
public void run() {
- tryCreateTransaction();
+ tryFindPrimaryShard();
}
}, getActorContext().getClientDispatcher());
return;
if(failure != null) {
LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
- localTransactionContext = new NoOpTransactionContext(failure, getIdentifier());
+ Throwable resultingEx = failure;
+ if(failure instanceof AskTimeoutException) {
+ resultingEx = new ShardLeaderNotRespondingException(String.format(
+ "Could not create a %s transaction on shard %s. The shard leader isn't responding.",
+ parent.getType(), shardName), failure);
+ } else if(!(failure instanceof NoShardLeaderException)) {
+ resultingEx = new Exception(String.format(
+ "Error creating %s transaction on shard %s", parent.getType(), shardName), failure);
+ }
+
+ localTransactionContext = new NoOpTransactionContext(resultingEx, getIdentifier());
} else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) {
localTransactionContext = createValidTransactionContext(
CreateTransactionReply.fromSerializable(response));
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
+import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
}
- private void noLeaderError(Object message) {
+ private void noLeaderError(String errMessage, Object message) {
// TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
// it more resilient in case we're in the process of electing a new leader.
- getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
- "Could not find the leader for shard %s. This typically happens" +
- " when the system is coming up or recovering and a leader is being elected. Try again" +
- " later.", persistenceId()))), getSelf());
+ getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(errMessage, persistenceId())), getSelf());
}
private void handleBatchedModifications(BatchedModifications batched) {
// window where it could have a stale leader during leadership transitions.
//
if(isLeader()) {
+ failIfIsolatedLeader(getSender());
+
try {
commitCoordinator.handleBatchedModifications(batched, getSender(), this);
} catch (Exception e) {
LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
leader.forward(batched, getContext());
} else {
- noLeaderError(batched);
+ noLeaderError("Could not commit transaction " + batched.getTransactionID(), batched);
}
}
}
+ private boolean failIfIsolatedLeader(ActorRef sender) {
+ if(getRaftState() == RaftState.IsolatedLeader) {
+ sender.tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
+ "Shard %s was the leader but has lost contact with all of its followers. Either all" +
+ " other follower nodes are down or this node is isolated by a network partition.",
+ persistenceId()))), getSelf());
+ return true;
+ }
+
+ return false;
+ }
+
private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
if (isLeader()) {
+ failIfIsolatedLeader(getSender());
+
try {
commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
} catch (Exception e) {
message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
leader.forward(message, getContext());
} else {
- noLeaderError(message);
+ noLeaderError("Could not commit transaction " + message.getTransactionID(), message);
}
}
}
} else if (getLeader() != null) {
getLeader().forward(message, getContext());
} else {
- getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
- "Could not find leader for shard %s so transaction cannot be created. This typically happens" +
- " when the system is coming up or recovering and a leader is being elected. Try again" +
- " later.", persistenceId()))), getSelf());
+ getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(
+ "Could not create a shard transaction", persistenceId())), getSelf());
}
}
private void createTransaction(CreateTransaction createTransaction) {
try {
+ if(TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY &&
+ failIfIsolatedLeader(getSender())) {
+ return;
+ }
+
ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(),
createTransaction.getTransactionId(), createTransaction.getTransactionChainId(),
createTransaction.getVersion());
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
/**
* The ShardManager has the following jobs,
LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName());
+ FiniteDuration timeout = datastoreContext.getShardInitializationTimeout().duration();
+ if(shardInformation.isShardInitialized()) {
+ // If the shard is already initialized then we'll wait enough time for the shard to
+ // elect a leader, ie 2 times the election timeout.
+ timeout = FiniteDuration.create(datastoreContext.getShardRaftConfig()
+ .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
+ }
+
Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
- datastoreContext.getShardInitializationTimeout().duration(), getSelf(),
+ timeout, getSelf(),
new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
getContext().dispatcher(), getSelf());
}
private NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
- return new NoShardLeaderException(String.format(
- "Could not find a leader for shard %s. This typically happens when the system is coming up or " +
- "recovering and a leader is being elected. Try again later.", shardId));
+     // No context message is supplied; the exception composes its text from the shard name alone.
+     final String shardName = shardId.toString();
+     return new NoShardLeaderException(null, shardName);
}
private NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
}
boolean isShardReadyWithLeaderId() {
- return leaderAvailable && isShardReady() && (isLeader() || peerAddresses.get(leaderId) != null);
+     // Not usable without an available leader, before the shard is ready, or while the
+     // shard's reported role is IsolatedLeader.
+     if(!leaderAvailable || !isShardReady() || RaftState.IsolatedLeader.name().equals(role)) {
+         return false;
+     }
+
+     // Either this shard is the leader itself or the leader's peer address is known.
+     return isLeader() || peerAddresses.get(leaderId) != null;
}
boolean isShardInitialized() {
*/
package org.opendaylight.controller.cluster.datastore.exceptions;
+import com.google.common.base.Strings;
+
/**
* Exception indicating a shard has no current leader.
*
public class NoShardLeaderException extends RuntimeException {
private static final long serialVersionUID = 1L;
- public NoShardLeaderException(String message){
+ public NoShardLeaderException(String message) {
super(message);
}
+
+ /**
+  * Creates an exception whose text is the optional context message followed by a standard
+  * "no leader" sentence naming the shard.
+  *
+  * @param message optional context; when {@code null} or empty only the standard sentence is used
+  * @param shardName the name of the shard that has no leader
+  */
+ public NoShardLeaderException(String message, String shardName) {
+     super(String.format("%sShard %s currently has no leader. Try again later.",
+             toSentencePrefix(message), shardName));
+ }
+
+ /**
+  * Returns the context message terminated with a period and a space, or an empty string when
+  * there is no context message.
+  */
+ private static String toSentencePrefix(String message) {
+     if (Strings.isNullOrEmpty(message)) {
+         return "";
+     }
+
+     return message + ". ";
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.exceptions;
+
+/**
+ * Exception indicating a shard leader is not responding to messages.
+ *
+ * <p>
+ * This is an unchecked exception. Its only constructor takes both a message and the underlying
+ * failure, so an instance always records the cause it was created with.
+ *
+ * <p>
+ * NOTE(review): elsewhere in this change it wraps an {@code AskTimeoutException} raised while
+ * creating a transaction on the primary shard - confirm there are no other producers.
+ *
+ * @author Thomas Pantelis
+ */
+public class ShardLeaderNotRespondingException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public ShardLeaderNotRespondingException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
import akka.actor.Props;
import akka.dispatch.Futures;
import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.base.Objects;
protected final String memberName = "mock-member";
- protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().operationTimeoutInSeconds(2);
+ private final int operationTimeoutInSeconds = 2;
+ protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder()
+ .operationTimeoutInSeconds(operationTimeoutInSeconds);
@BeforeClass
public static void setUpClass() throws IOException {
doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
doReturn(memberName).when(mockActorContext).getCurrentMemberName();
doReturn(schemaContext).when(mockActorContext).getSchemaContext();
+ doReturn(new Timeout(operationTimeoutInSeconds, TimeUnit.SECONDS)).when(mockActorContext).getOperationTimeout();
doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext();
doReturn(Futures.successful(createTransactionReply(txActorRef, transactionVersion))).when(mockActorContext).
executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
- eqCreateTransaction(prefix, type));
+ eqCreateTransaction(prefix, type), any(Timeout.class));
}
return txActorRef;
future.checkedGet(5, TimeUnit.SECONDS);
fail("Expected ReadFailedException");
} catch(ReadFailedException e) {
- throw e.getCause();
+ assertNotNull("Expected a cause", e.getCause());
+ if(e.getCause().getCause() != null) {
+ throw e.getCause().getCause();
+ } else {
+ throw e.getCause();
+ }
}
}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
// by setting the election timeout, which is based on the heartbeat interval, really high.
datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000);
- datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
- // Set the leader election timeout low for the test.
+ DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
+
+ Object result = dataStore.getActorContext().executeOperation(dataStore.getActorContext().getShardManager(),
+ new FindLocalShard(shardName, true));
+ assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound);
- datastoreContextBuilder.shardLeaderElectionTimeout(1, TimeUnit.MILLISECONDS);
+ // The ShardManager uses the election timeout for FindPrimary so reset it low so it will timeout quickly.
- DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
+ datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1);
+ dataStore.onDatastoreContextUpdated(datastoreContextBuilder.build());
// Create the write Tx.
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import akka.actor.Address;
import akka.actor.AddressFromURIString;
import akka.cluster.Cluster;
+import akka.pattern.AskTimeoutException;
import akka.testkit.JavaTestKit;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Uninterruptibles;
import com.typesafe.config.ConfigFactory;
import java.math.BigInteger;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
private static final Address MEMBER_2_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2559");
- private static final String MODULE_SHARDS_CONFIG = "module-shards-member1-and-2.conf";
+ private static final String MODULE_SHARDS_CONFIG_2 = "module-shards-member1-and-2.conf";
+ private static final String MODULE_SHARDS_CONFIG_3 = "module-shards-member1-and-2-and-3.conf";
private ActorSystem leaderSystem;
private ActorSystem followerSystem;
+ private ActorSystem follower2System;
private final DatastoreContext.Builder leaderDatastoreContextBuilder =
DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1);
private IntegrationTestKit leaderTestKit;
@Before
- public void setUpClass() {
+ public void setUp() { // JUnit @Before hook: creates the three actor systems used by the tests
leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
+
+ follower2System = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member3")); // third member, from config section "Member3"
+ Cluster.get(follower2System).join(MEMBER_1_ADDRESS); // joins at the same address as the other two systems
}
@After
- public void tearDownClass() {
+ public void tearDown() { // JUnit @After hook: shuts down every actor system created in setUp
JavaTestKit.shutdownActorSystem(leaderSystem);
JavaTestKit.shutdownActorSystem(followerSystem);
+ JavaTestKit.shutdownActorSystem(follower2System); // the third member is created in setUp as well, so it is stopped here too
}
private void initDatastores(String type) {
+ initDatastores(type, MODULE_SHARDS_CONFIG_2); // default: the member1-and-2 module-shards configuration
+ }
+
+ private void initDatastores(String type, String moduleShardsConfig) { // sets up leader and follower datastores with the given module-shards config
leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
- followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
- followerDistributedDataStore = followerTestKit.setupDistributedDataStore(type, MODULE_SHARDS_CONFIG, false, SHARD_NAMES);
+ leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, SHARD_NAMES); // the leader datastore is now set up before the follower's
- leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, MODULE_SHARDS_CONFIG, false, SHARD_NAMES);
+ followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
+ followerDistributedDataStore = followerTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, SHARD_NAMES); // same shard config as the leader
leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), SHARD_NAMES);
}
DatastoreContext.Builder newMember1Builder = DatastoreContext.newBuilder().
shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder);
- DistributedDataStore newMember1Datastore = newMember1TestKit.
- setupDistributedDataStore(testName, MODULE_SHARDS_CONFIG, false, SHARD_NAMES);
+ newMember1TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CONFIG_2, false, SHARD_NAMES);
followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), SHARD_NAMES);
verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car);
}
+
+ /**
+  * Verifies that committing a write transaction on the leader datastore fails with a
+  * {@link NoShardLeaderException} once the follower's actor system has been shut down.
+  *
+  * @throws Throwable the cause of the commit's {@link ExecutionException}, which the
+  *         {@code expected} attribute checks
+  */
+ @Test(expected=NoShardLeaderException.class)
+ public void testTransactionWithIsolatedLeader() throws Throwable {
+     leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(300);
+     String testName = "testTransactionWithIsolatedLeader";
+     initDatastores(testName);
+
+     // Take the follower away so the leader loses contact with it.
+     JavaTestKit.shutdownActorSystem(followerSystem, null, true);
+
+     // Wait three election timeouts - presumably long enough for the leader to detect the isolation.
+     Uninterruptibles.sleepUninterruptibly(leaderDistributedDataStore.getActorContext().getDatastoreContext()
+             .getShardRaftConfig().getElectionTimeOutInterval().toMillis() * 3, TimeUnit.MILLISECONDS);
+
+     DOMStoreWriteTransaction writeTx = leaderDistributedDataStore.newWriteOnlyTransaction();
+     writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+
+     try {
+         // The transaction belongs to the leader datastore and the follower's actor system is
+         // already shut down, so drive the commit through the leader's test kit.
+         leaderTestKit.doCommit(writeTx.ready());
+     } catch (ExecutionException e) {
+         throw e.getCause();
+     }
+ }
+
+ /**
+  * Shuts the leader down after the follower has resolved the primary shard, then expects a
+  * read-write commit on the follower to fail with a {@link ShardLeaderNotRespondingException}
+  * whose nested cause is an {@link AskTimeoutException}.
+  */
+ @Test(expected=AskTimeoutException.class)
+ public void testTransactionWithShardLeaderNotResponding() throws Throwable {
+     followerDatastoreContextBuilder.shardElectionTimeoutFactor(30);
+     final String testName = "testTransactionWithShardLeaderNotResponding";
+     initDatastores(testName);
+
+     // Prime the follower with an initial read so the primary shard info gets cached.
+     followerDistributedDataStore.newReadOnlyTransaction().read(CarsModel.BASE_PATH)
+             .checkedGet(5, TimeUnit.SECONDS);
+
+     // Take the leader away, then shrink the follower's timeouts before creating the new tx.
+     JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+
+     followerDatastoreContextBuilder.operationTimeoutInMillis(50).shardElectionTimeoutFactor(1);
+     followerDistributedDataStore.onDatastoreContextUpdated(followerDatastoreContextBuilder.build());
+
+     final DOMStoreReadWriteTransaction readWriteTx = followerDistributedDataStore.newReadWriteTransaction();
+     readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+
+     try {
+         followerTestKit.doCommit(readWriteTx.ready());
+     } catch (ExecutionException commitFailure) {
+         final Throwable wrapper = commitFailure.getCause();
+         assertTrue("Expected ShardLeaderNotRespondingException cause. Actual: " + wrapper,
+                 wrapper instanceof ShardLeaderNotRespondingException);
+
+         // Rethrow the innermost failure so the 'expected' attribute can verify its type.
+         final Throwable nested = wrapper.getCause();
+         assertNotNull("Expected a nested cause", nested);
+         throw nested;
+     }
+ }
+
+ /**
+  * Shuts the leader down after the follower has resolved the primary shard, then expects a
+  * read-write commit on the follower to fail with a {@link NoShardLeaderException}.
+  */
+ @Test(expected=NoShardLeaderException.class)
+ public void testTransactionWithCreateTxFailureDueToNoLeader() throws Throwable {
+     final String testName = "testTransactionWithCreateTxFailureDueToNoLeader";
+     initDatastores(testName);
+
+     // Prime the follower with an initial read so the primary shard info gets cached.
+     followerDistributedDataStore.newReadOnlyTransaction().read(CarsModel.BASE_PATH)
+             .checkedGet(5, TimeUnit.SECONDS);
+
+     // Take the leader away and pause briefly before reconfiguring the follower.
+     JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+
+     Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+
+     followerDatastoreContextBuilder.operationTimeoutInMillis(10).shardElectionTimeoutFactor(1);
+     followerDistributedDataStore.onDatastoreContextUpdated(followerDatastoreContextBuilder.build());
+
+     final DOMStoreReadWriteTransaction readWriteTx = followerDistributedDataStore.newReadWriteTransaction();
+     readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+
+     try {
+         followerTestKit.doCommit(readWriteTx.ready());
+     } catch (ExecutionException commitFailure) {
+         // Surface the real failure so the 'expected' attribute can verify its type.
+         throw commitFailure.getCause();
+     }
+ }
+
+ /**
+  * Runs with the three-member shard configuration: after the leader is shut down, a read-write
+  * transaction created on the first follower is still expected to commit without error.
+  */
+ @Test
+ public void testTransactionRetryWithInitialAskTimeoutExOnCreateTx() throws Exception {
+     followerDatastoreContextBuilder.shardElectionTimeoutFactor(30);
+     final String testName = "testTransactionRetryWithInitialAskTimeoutExOnCreateTx";
+     initDatastores(testName, MODULE_SHARDS_CONFIG_3);
+
+     // Bring up the datastore on the third member as well.
+     final DatastoreContext.Builder thirdMemberContextBuilder = DatastoreContext.newBuilder().
+             shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
+     final IntegrationTestKit thirdMemberTestKit =
+             new IntegrationTestKit(follower2System, thirdMemberContextBuilder);
+     thirdMemberTestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CONFIG_3, false, SHARD_NAMES);
+
+     // Prime the follower with an initial read so the primary shard info gets cached.
+     followerDistributedDataStore.newReadOnlyTransaction().read(CarsModel.BASE_PATH)
+             .checkedGet(5, TimeUnit.SECONDS);
+
+     // Take the leader away and try to create a new tx.
+     JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+
+     followerDatastoreContextBuilder.operationTimeoutInMillis(500);
+     followerDistributedDataStore.onDatastoreContextUpdated(followerDatastoreContextBuilder.build());
+
+     final DOMStoreReadWriteTransaction readWriteTx = followerDistributedDataStore.newReadWriteTransaction();
+     readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+
+     followerTestKit.doCommit(readWriteTx.ready());
+ }
}
private static TestActorRef<MessageCollectorActor> mockShardActor;
private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
- dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS);
+ dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
+ .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
String name = new ShardIdentifier(shardName, memberName,"config").toString();
}};
}
+ /**
+  * Reports the local shard's role as {@code IsolatedLeader} and expects a waiting
+  * {@code FindPrimary} request to be answered with a {@link NoShardLeaderException}.
+  */
+ @Test
+ public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception {
+     new JavaTestKit(getSystem()) {{
+         final ActorRef shardManagerRef = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+         shardManagerRef.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+         shardManagerRef.tell(new ActorInitialized(), mockShardActor);
+
+         // Announce the isolated-leader role on behalf of the mock shard actor.
+         final String memberShardId = "member-1-shard-default-" + shardMrgIDSuffix;
+         final RoleChangeNotification isolatedLeaderRole = new RoleChangeNotification(memberShardId,
+                 null, RaftState.IsolatedLeader.name());
+         shardManagerRef.tell(isolatedLeaderRole, mockShardActor);
+
+         shardManagerRef.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
+
+         expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
+     }};
+ }
+
@Test
public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
new JavaTestKit(getSystem()) {{
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
import akka.actor.ActorRef;
+import akka.util.Timeout;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
readyReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
verify(mockActorContext, timeout(5000)).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())),
- eqCreateTransaction(tx2MemberName, READ_WRITE));
+ eqCreateTransaction(tx2MemberName, READ_WRITE), any(Timeout.class));
}
@Test(expected=IllegalStateException.class)
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.dispatch.Futures;
+import akka.util.Timeout;
import com.google.common.base.Optional;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.CheckedFuture;
}
doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), any());
+ any(ActorSelection.class), any(), any(Timeout.class));
TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
- eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY));
+ eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY),
+ any(Timeout.class));
TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
eq(getSystem().actorSelection(actorRef.path())),
- eqCreateTransaction(memberName, READ_WRITE));
+ eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqSerializedReadData());
}
String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
- CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
- setTransactionId("txn-1").setTransactionActorPath(actorPath).
- setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
- doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
- executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
- eqCreateTransaction(memberName, READ_WRITE));
+ doReturn(incompleteFuture()).when(mockActorContext).
+ executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+ eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
doReturn(true).when(mockActorContext).isPathLocal(actorPath);
doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
- eqCreateTransaction(memberName, READ_WRITE));
+ eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
doReturn(true).when(mockActorContext).isPathLocal(anyString());
expectBatchedModifications(1);
- doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), any(ReadyTransaction.class));
-
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
transactionProxy.ready();
doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION))).when(mockActorContext).
executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
- eqCreateTransaction(memberName, TransactionType.READ_ONLY));
+ eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class));
doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(txActorRef)), eqSerializedReadData(YangInstanceIdentifier.builder().build()));
}
}
}
+
+# Configuration section for the cluster member with role "member-3".
+# It uses in-memory persistence plugins and Akka remoting on 127.0.0.1:2557.
+Member3 {
+ # Mailbox definition: capacity of 1000 messages and a 100ms push timeout.
+ bounded-mailbox {
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
+ mailbox-capacity = 1000
+ mailbox-push-timeout-time = 100ms
+ }
+
+ # Journal plugin definition, referenced below by akka.persistence.journal.plugin.
+ in-memory-journal {
+ class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
+ }
+
+ # Snapshot store plugin definition, referenced below by akka.persistence.snapshot-store.plugin.
+ in-memory-snapshot-store {
+ class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
+ plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+ }
+
+ akka {
+ # Select the in-memory plugins defined above for persistence.
+ persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+ persistence.journal.plugin = "in-memory-journal"
+
+ loglevel = "DEBUG"
+
+ actor {
+ provider = "akka.cluster.ClusterActorRefProvider"
+
+ # Serializer aliases; the bindings below map message classes to them.
+ serializers {
+ java = "akka.serialization.JavaSerializer"
+ proto = "akka.remote.serialization.ProtobufSerializer"
+ readylocal = "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransactionSerializer"
+ }
+
+ serialization-bindings {
+ "com.google.protobuf.Message" = proto
+ "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal
+ }
+ }
+ # Remoting endpoint for this member.
+ remote {
+ log-remote-lifecycle-events = off
+ netty.tcp {
+ hostname = "127.0.0.1"
+ port = 2557
+ }
+ }
+
+ cluster {
+ auto-down-unreachable-after = 100s
+
+ # Cluster role assigned to this member; matches the replica names in module-shards.
+ roles = [
+ "member-3"
+ ]
+ }
+ }
+}
--- /dev/null
+# Shard layout: two modules ("people" and "cars"), each with a single shard of the
+# same name replicated on member-1, member-2 and member-3.
+module-shards = [
+ {
+ name = "people"
+ shards = [
+ {
+ name="people"
+ replicas = [
+ "member-1",
+ "member-2",
+ "member-3"
+ ]
+ }
+ ]
+ },
+ {
+ name = "cars"
+ shards = [
+ {
+ name="cars"
+ replicas = [
+ "member-1",
+ "member-2",
+ "member-3"
+ ]
+ }
+ ]
+ }
+]