From a51163ead1d60e66eeaf3691adb70b019ce60fb2 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Fri, 11 Dec 2015 03:27:28 -0500 Subject: [PATCH] Transaction message retry when no shard leader present Implemented retry of transaction ready messages, ForwardedReadyTransaction, ReadyLocalTransaction, and BatchedModifications, when there's no current shard leader. A new class, ShardTransactionMessageRetrySupport, maintains a list of messages to retry and handles the retry logic. If there is no leader, the message is added to the list and a timer (2 * election time out) is started. If a leader is elected, on leader changed, the messages are retried. If no leader is elected in time, the messages are removed and NoShardLeaderException is returned. Change-Id: Iade3fd245982d75ee97acf0534e9224551d9e45d Signed-off-by: Tom Pantelis --- .../controller/cluster/datastore/Shard.java | 28 +-- .../datastore/ShardTransactionFactory.java | 7 +- .../ShardTransactionMessageRetrySupport.java | 103 +++++++++++ .../messages/ForwardedReadyTransaction.java | 6 + ...butedDataStoreRemotingIntegrationTest.java | 161 +++++++++++++----- .../cluster/datastore/IntegrationTestKit.java | 40 ++++- .../cluster/datastore/ShardTest.java | 28 +++ .../cluster/datastore/ShardTestKit.java | 14 +- .../module-shards-cars-member-1-and-2.conf | 14 ++ 9 files changed, 340 insertions(+), 61 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionMessageRetrySupport.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-cars-member-1-and-2.conf diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index f7b3461d33..5b8a50e188 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -121,7 +121,7 @@ public class Shard extends RaftActor { private ShardSnapshot restoreFromSnapshot; - + private final ShardTransactionMessageRetrySupport messageRetrySupport; protected Shard(AbstractBuilder builder) { super(builder.getId().toString(), builder.getPeerAddresses(), @@ -163,8 +163,7 @@ public class Shard extends RaftActor { snapshotCohort = new ShardSnapshotCohort(transactionActorFactory, store, LOG, this.name); - - + messageRetrySupport = new ShardTransactionMessageRetrySupport(this); } private void setTransactionCommitTimeout() { @@ -184,6 +183,8 @@ public class Shard extends RaftActor { super.postStop(); + messageRetrySupport.close(); + if(txCommitTimeoutCheckSchedule != null) { txCommitTimeoutCheckSchedule.cancel(); } @@ -265,6 +266,8 @@ public class Shard extends RaftActor { sender().tell(store.getDataTree(), self()); } else if(message instanceof ServerRemoved){ context().parent().forward(message, context()); + } else if(ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) { + messageRetrySupport.onTimerMessage(message); } else { super.onReceiveCommand(message); } @@ -410,12 +413,6 @@ public class Shard extends RaftActor { commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this); } - private void noLeaderError(String errMessage) { - // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make - // it more resilient in case we're in the process of electing a new leader. - getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(errMessage, persistenceId())), getSelf()); - } - protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) { try { commitCoordinator.handleBatchedModifications(batched, sender, this); @@ -452,7 +449,8 @@ public class Shard extends RaftActor { LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader); leader.forward(batched, getContext()); } else { - noLeaderError("Could not commit transaction " + batched.getTransactionID()); + messageRetrySupport.addMessageToRetry(batched, getSender(), + "Could not commit transaction " + batched.getTransactionID()); } } } @@ -491,7 +489,8 @@ public class Shard extends RaftActor { message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion()); leader.forward(message, getContext()); } else { - noLeaderError("Could not commit transaction " + message.getTransactionID()); + messageRetrySupport.addMessageToRetry(message, getSender(), + "Could not commit transaction " + message.getTransactionID()); } } } @@ -511,7 +510,8 @@ public class Shard extends RaftActor { readyLocal.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion()); leader.forward(readyLocal, getContext()); } else { - noLeaderError("Could not commit transaction " + forwardedReady.getTransactionID()); + messageRetrySupport.addMessageToRetry(forwardedReady, getSender(), + "Could not commit transaction " + forwardedReady.getTransactionID()); } } } @@ -711,6 +711,10 @@ public class Shard extends RaftActor { @Override protected void onLeaderChanged(String oldLeader, String newLeader) { shardMBean.incrementLeadershipChangeCount(); + + if(hasLeader()) { + messageRetrySupport.retryMessages(); + } } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFactory.java index f8b8b9c952..887f656205 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFactory.java @@ -7,9 +7,9 @@ */ package org.opendaylight.controller.cluster.datastore; -import com.google.common.base.Preconditions; import akka.actor.ActorRef; import akka.actor.UntypedActorContext; +import com.google.common.base.Preconditions; import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; @@ -43,10 +43,15 @@ class ShardTransactionActorFactory { switch (type) { case READ_ONLY: transaction = dataTree.newReadOnlyTransaction(transactionID.toString(), transactionChainID); + shardMBean.incrementReadOnlyTransactionCount(); break; case READ_WRITE: + transaction = dataTree.newReadWriteTransaction(transactionID.toString(), transactionChainID); + shardMBean.incrementReadWriteTransactionCount(); + break; case WRITE_ONLY: transaction = dataTree.newReadWriteTransaction(transactionID.toString(), transactionChainID); + shardMBean.incrementWriteOnlyTransactionCount(); break; default: throw new IllegalArgumentException("Unsupported transaction type " + type); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionMessageRetrySupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionMessageRetrySupport.java new file mode 100644 index 0000000000..291867cf8f --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionMessageRetrySupport.java @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorRef; +import akka.actor.Cancellable; +import akka.actor.Status.Failure; +import java.io.Closeable; +import java.util.LinkedHashSet; +import java.util.Set; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.duration.FiniteDuration; + +/** + * Supporting class for Shard that maintains state for retrying transaction messages when there is no leader. + * + * @author Thomas Pantelis + */ +class ShardTransactionMessageRetrySupport implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(ShardTransactionMessageRetrySupport.class); + + static final Class TIMER_MESSAGE_CLASS = MessageInfo.class; + + private final Set messagesToRetry = new LinkedHashSet<>(); + private final Shard shard; + + ShardTransactionMessageRetrySupport(Shard shard) { + this.shard = shard; + } + + void addMessageToRetry(Object message, ActorRef replyTo, String failureMessage) { + LOG.debug("{}: Adding message {} to retry", shard.persistenceId(), message); + + MessageInfo messageInfo = new MessageInfo(message, replyTo, failureMessage); + + FiniteDuration period = shard.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2); + messageInfo.timer = shard.getContext().system().scheduler().scheduleOnce(period, shard.getSelf(), + messageInfo, shard.getContext().dispatcher(), ActorRef.noSender()); + + messagesToRetry.add(messageInfo); + } + + void retryMessages() { + if(messagesToRetry.isEmpty()) { + return; + } + + MessageInfo[] copy = messagesToRetry.toArray(new MessageInfo[messagesToRetry.size()]); + messagesToRetry.clear(); + + for(MessageInfo info: copy) { + LOG.debug("{}: Retrying message {}", shard.persistenceId(), info.message); + info.retry(shard); + } + } + + void onTimerMessage(Object message) { + MessageInfo messageInfo = (MessageInfo)message; + + LOG.debug("{}: Timer expired for message {}", shard.persistenceId(), messageInfo.message); + + messagesToRetry.remove(messageInfo); + messageInfo.timedOut(shard); + } + + @Override + public void close() { + for(MessageInfo info: messagesToRetry) { + info.timedOut(shard); + } + + messagesToRetry.clear(); + } + + private static class MessageInfo { + final Object message; + final ActorRef replyTo; + final String failureMessage; + Cancellable timer; + + MessageInfo(Object message, ActorRef replyTo, String failureMessage) { + this.message = message; + this.replyTo = replyTo; + this.failureMessage = failureMessage; + } + + void retry(Shard shard) { + timer.cancel(); + shard.getSelf().tell(message, replyTo); + } + + void timedOut(Shard shard) { + replyTo.tell(new Failure(new NoShardLeaderException(failureMessage, shard.persistenceId())), shard.getSelf()); + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java index e30d2055c9..b995192113 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java @@ -51,4 +51,10 @@ public class ForwardedReadyTransaction { public boolean isDoImmediateCommit() { return doImmediateCommit; } + + @Override + public String toString() { + return "ForwardedReadyTransaction [transactionID=" + transactionID + ", doImmediateCommit=" + doImmediateCommit + + ", txnClientVersion=" + txnClientVersion + "]"; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index 005e904e4c..3c4c9046c8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -40,8 +40,11 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; +import org.opendaylight.controller.cluster.datastore.IntegrationTestKit.ShardStatsVerifier; +import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException; +import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree; @@ -50,6 +53,8 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionRe import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; +import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; @@ -87,23 +92,26 @@ import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFac */ public class DistributedDataStoreRemotingIntegrationTest { - private static final String[] SHARD_NAMES = {"cars", "people"}; + private static final String[] CARS_AND_PEOPLE = {"cars", "people"}; + private static final String[] CARS = {"cars"}; private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"); private static final Address MEMBER_2_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2559"); - private static final String MODULE_SHARDS_CONFIG_2 = "module-shards-member1-and-2.conf"; - private static final String MODULE_SHARDS_CONFIG_3 = "module-shards-member1-and-2-and-3.conf"; + private static final String MODULE_SHARDS_CARS_ONLY_1_2 = "module-shards-cars-member-1-and-2.conf"; + private static final String MODULE_SHARDS_CARS_PEOPLE_1_2 = "module-shards-member1-and-2.conf"; + private static final String MODULE_SHARDS_CARS_PEOPLE_1_2_3 = "module-shards-member1-and-2-and-3.conf"; private ActorSystem leaderSystem; private ActorSystem followerSystem; private ActorSystem follower2System; private final DatastoreContext.Builder leaderDatastoreContextBuilder = - DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1); + DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2); private final DatastoreContext.Builder followerDatastoreContextBuilder = - DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5); + DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5). + customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()); private DistributedDataStore followerDistributedDataStore; private DistributedDataStore leaderDistributedDataStore; @@ -129,19 +137,23 @@ public class DistributedDataStoreRemotingIntegrationTest { JavaTestKit.shutdownActorSystem(follower2System); } - private void initDatastores(String type) { - initDatastores(type, MODULE_SHARDS_CONFIG_2); + private void initDatastoresWithCars(String type) { + initDatastores(type, MODULE_SHARDS_CARS_ONLY_1_2, CARS); } - private void initDatastores(String type, String moduleShardsConfig) { + private void initDatastoresWithCarsAndPeople(String type) { + initDatastores(type, MODULE_SHARDS_CARS_PEOPLE_1_2, CARS_AND_PEOPLE); + } + + private void initDatastores(String type, String moduleShardsConfig, String[] shards) { leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder); - leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, SHARD_NAMES); + leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, shards); followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder); - followerDistributedDataStore = followerTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, SHARD_NAMES); + followerDistributedDataStore = followerTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, shards); - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), SHARD_NAMES); + leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), shards); } private static void verifyCars(DOMStoreReadTransaction readTx, MapEntryNode... entries) throws Exception { @@ -171,7 +183,7 @@ public class DistributedDataStoreRemotingIntegrationTest { @Test public void testWriteTransactionWithSingleShard() throws Exception { String testName = "testWriteTransactionWithSingleShard"; - initDatastores(testName); + initDatastoresWithCars(testName); String followerCarShardName = "member-2-shard-cars-" + testName; InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 2, ApplyJournalEntries.class ); @@ -220,7 +232,7 @@ public class DistributedDataStoreRemotingIntegrationTest { ActorSystem newSystem = ActorSystem.create("reinstated-member2", ConfigFactory.load().getConfig("Member2")); DistributedDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder). - setupDistributedDataStore(testName, "module-shards-member2", true, SHARD_NAMES); + setupDistributedDataStore(testName, "module-shards-member2", true, CARS_AND_PEOPLE); verifyCars(member2Datastore.newReadOnlyTransaction(), car2); @@ -229,7 +241,7 @@ public class DistributedDataStoreRemotingIntegrationTest { @Test public void testReadWriteTransactionWithSingleShard() throws Exception { - initDatastores("testReadWriteTransactionWithSingleShard"); + initDatastoresWithCars("testReadWriteTransactionWithSingleShard"); DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction(); assertNotNull("newReadWriteTransaction returned null", rwTx); @@ -255,7 +267,7 @@ public class DistributedDataStoreRemotingIntegrationTest { @Test public void testWriteTransactionWithMultipleShards() throws Exception { - initDatastores("testWriteTransactionWithMultipleShards"); + initDatastoresWithCarsAndPeople("testWriteTransactionWithMultipleShards"); DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); assertNotNull("newWriteOnlyTransaction returned null", writeTx); @@ -278,7 +290,7 @@ public class DistributedDataStoreRemotingIntegrationTest { @Test public void testReadWriteTransactionWithMultipleShards() throws Exception { - initDatastores("testReadWriteTransactionWithMultipleShards"); + initDatastoresWithCarsAndPeople("testReadWriteTransactionWithMultipleShards"); DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction(); assertNotNull("newReadWriteTransaction returned null", rwTx); @@ -301,7 +313,7 @@ public class DistributedDataStoreRemotingIntegrationTest { @Test public void testTransactionChainWithSingleShard() throws Exception { - initDatastores("testTransactionChainWithSingleShard"); + initDatastoresWithCars("testTransactionChainWithSingleShard"); DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain(); @@ -348,7 +360,7 @@ public class DistributedDataStoreRemotingIntegrationTest { @Test public void testTransactionChainWithMultipleShards() throws Exception{ - initDatastores("testTransactionChainWithMultipleShards"); + initDatastoresWithCarsAndPeople("testTransactionChainWithMultipleShards"); DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain(); @@ -403,7 +415,7 @@ public class DistributedDataStoreRemotingIntegrationTest { @Test public void testChainedTransactionFailureWithSingleShard() throws Exception { - initDatastores("testChainedTransactionFailureWithSingleShard"); + initDatastoresWithCars("testChainedTransactionFailureWithSingleShard"); ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( ImmutableMap.builder().put( @@ -436,7 +448,7 @@ public class DistributedDataStoreRemotingIntegrationTest { @Test public void testChainedTransactionFailureWithMultipleShards() throws Exception { - initDatastores("testChainedTransactionFailureWithMultipleShards"); + initDatastoresWithCarsAndPeople("testChainedTransactionFailureWithMultipleShards"); ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( ImmutableMap.builder().put( @@ -474,7 +486,7 @@ public class DistributedDataStoreRemotingIntegrationTest { @Test public void testSingleShardTransactionsWithLeaderChanges() throws Exception { String testName = "testSingleShardTransactionsWithLeaderChanges"; - initDatastores(testName); + initDatastoresWithCars(testName); String followerCarShardName = "member-2-shard-cars-" + testName; InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 1, ApplyJournalEntries.class ); @@ -492,12 +504,12 @@ public class DistributedDataStoreRemotingIntegrationTest { // Switch the leader to the follower - followerDatastoreContextBuilder.shardElectionTimeoutFactor(1); - sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder); + sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder. + shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null)); JavaTestKit.shutdownActorSystem(leaderSystem, null, true); - followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), SHARD_NAMES); + followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), CARS); leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); Cluster.get(leaderSystem).join(MEMBER_2_ADDRESS); @@ -505,9 +517,9 @@ public class DistributedDataStoreRemotingIntegrationTest { DatastoreContext.Builder newMember1Builder = DatastoreContext.newBuilder(). shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5); IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder); - newMember1TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CONFIG_2, false, SHARD_NAMES); + newMember1TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS); - followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), SHARD_NAMES); + followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), CARS); // Write a car entry to the new leader - should switch to local Tx @@ -524,7 +536,7 @@ public class DistributedDataStoreRemotingIntegrationTest { @Test public void testReadyLocalTransactionForwardedToLeader() throws Exception { - initDatastores("testReadyLocalTransactionForwardedToLeader"); + initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader"); followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars"); Optional carsFollowerShard = followerDistributedDataStore.getActorContext().findLocalShard("cars"); @@ -586,7 +598,7 @@ public class DistributedDataStoreRemotingIntegrationTest { @Test public void testForwardedReadyTransactionForwardedToLeader() throws Exception { - initDatastores("testForwardedReadyTransactionForwardedToLeader"); + initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader"); followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars"); Optional carsFollowerShard = followerDistributedDataStore.getActorContext().findLocalShard("cars"); @@ -648,16 +660,80 @@ public class DistributedDataStoreRemotingIntegrationTest { verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2); } + @Test + public void testTransactionForwardedToLeaderAfterRetry() throws Exception { + initDatastoresWithCars("testTransactionForwardedToLeaderAfterRetry"); + + // Do an initial write to get the primary shard info cached. + + DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + followerTestKit.doCommit(writeTx.ready()); + + // Wait for the commit to be replicated to the follower. + + MemberNode.verifyRaftState(followerDistributedDataStore, "cars", new RaftStateVerifier() { + @Override + public void verify(OnDemandRaftState raftState) { + assertEquals("getLastApplied", 0, raftState.getLastApplied()); + } + }); + + // Create and prepare wo and rw tx's. + + writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); + MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + writeTx.write(CarsModel.newCarPath("optima"), car1); + + DOMStoreReadWriteTransaction readWriteTx = followerDistributedDataStore.newReadWriteTransaction(); + MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000)); + readWriteTx.write(CarsModel.newCarPath("sportage"), car2); + + IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() { + @Override + public void verify(ShardStats stats) { + assertEquals("getReadWriteTransactionCount", 1, stats.getReadWriteTransactionCount()); + } + }); + + // Disable elections on the leader so it switches to follower. + + sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder. + customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()). + shardElectionTimeoutFactor(10)); + + leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorContext(), "cars"); + + // Submit tx's and enable elections on the follower so it becomes the leader, at which point the + // readied tx's should get forwarded from the previous leader. + + DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); + DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready(); + + sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder. + customRaftPolicyImplementation(null).shardElectionTimeoutFactor(1)); + + followerTestKit.doCommit(cohort1); + followerTestKit.doCommit(cohort2); + + verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2); + } + @Test(expected=NoShardLeaderException.class) public void testTransactionWithIsolatedLeader() throws Throwable { - leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(300); + leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(200); String testName = "testTransactionWithIsolatedLeader"; - initDatastores(testName); + initDatastoresWithCars(testName); JavaTestKit.shutdownActorSystem(followerSystem, null, true); - Uninterruptibles.sleepUninterruptibly(leaderDistributedDataStore.getActorContext().getDatastoreContext() - .getShardRaftConfig().getElectionTimeOutInterval().toMillis() * 3, TimeUnit.MILLISECONDS); + MemberNode.verifyRaftState(leaderDistributedDataStore, "cars", new RaftStateVerifier() { + @Override + public void verify(OnDemandRaftState raftState) { + assertEquals("getRaftState", "IsolatedLeader", raftState.getRaftState()); + } + }); DOMStoreWriteTransaction writeTx = leaderDistributedDataStore.newWriteOnlyTransaction(); writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); @@ -671,8 +747,7 @@ public class DistributedDataStoreRemotingIntegrationTest { @Test(expected=AskTimeoutException.class) public void testTransactionWithShardLeaderNotResponding() throws Throwable { - followerDatastoreContextBuilder.shardElectionTimeoutFactor(30); - initDatastores("testTransactionWithShardLeaderNotResponding"); + initDatastoresWithCars("testTransactionWithShardLeaderNotResponding"); // Do an initial read to get the primary shard info cached. @@ -702,7 +777,7 @@ public class DistributedDataStoreRemotingIntegrationTest { @Test(expected=NoShardLeaderException.class) public void testTransactionWithCreateTxFailureDueToNoLeader() throws Throwable { - initDatastores("testTransactionWithCreateTxFailureDueToNoLeader"); + initDatastoresWithCars("testTransactionWithCreateTxFailureDueToNoLeader"); // Do an initial read to get the primary shard info cached. @@ -715,8 +790,8 @@ public class DistributedDataStoreRemotingIntegrationTest { Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); - followerDatastoreContextBuilder.operationTimeoutInMillis(10).shardElectionTimeoutFactor(1); - sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder); + sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder. + operationTimeoutInMillis(10).shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null)); DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction(); @@ -731,14 +806,13 @@ public class DistributedDataStoreRemotingIntegrationTest { @Test public void testTransactionRetryWithInitialAskTimeoutExOnCreateTx() throws Exception { - followerDatastoreContextBuilder.shardElectionTimeoutFactor(30); String testName = "testTransactionRetryWithInitialAskTimeoutExOnCreateTx"; - initDatastores(testName, MODULE_SHARDS_CONFIG_3); + initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS); DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder(). shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5); IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, follower2DatastoreContextBuilder); - follower2TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CONFIG_3, false, SHARD_NAMES); + follower2TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false, CARS); // Do an initial read to get the primary shard info cached. @@ -749,8 +823,8 @@ public class DistributedDataStoreRemotingIntegrationTest { JavaTestKit.shutdownActorSystem(leaderSystem, null, true); - followerDatastoreContextBuilder.operationTimeoutInMillis(500); - sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder); + sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder. + operationTimeoutInMillis(500).shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null)); DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction(); @@ -760,11 +834,12 @@ public class DistributedDataStoreRemotingIntegrationTest { } private static void sendDatastoreContextUpdate(DistributedDataStore dataStore, final Builder builder) { + final Builder newBuilder = DatastoreContext.newBuilderFrom(builder.build()); DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class); Answer answer = new Answer() { @Override public DatastoreContext answer(InvocationOnMock invocation) { - return builder.build(); + return newBuilder.build(); } }; Mockito.doAnswer(answer).when(mockContextFactory).getBaseDatastoreContext(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java index ada0fba10e..c3d216be9c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java @@ -14,6 +14,7 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.PoisonPill; import com.google.common.base.Optional; +import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; import java.util.concurrent.Callable; @@ -22,6 +23,7 @@ import org.mockito.Mockito; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl; +import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; @@ -32,6 +34,9 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; public class IntegrationTestKit extends ShardTestKit { @@ -92,7 +97,7 @@ public class IntegrationTestKit extends ShardTestKit { for(String shardName: shardNames) { ActorRef shard = findLocalShard(actorContext, shardName); - assertNotNull("Shard was not created", shard); + assertNotNull("Shard was not created for " + shardName, shard); waitUntilLeader(shard); } @@ -101,13 +106,13 @@ public class IntegrationTestKit extends ShardTestKit { public void waitUntilNoLeader(ActorContext actorContext, String... shardNames) { for(String shardName: shardNames) { ActorRef shard = findLocalShard(actorContext, shardName); - assertNotNull("No local shard found", shard); + assertNotNull("No local shard found for " + shardName, shard); waitUntilNoLeader(shard); } } - private static ActorRef findLocalShard(ActorContext actorContext, String shardName) { + public static ActorRef findLocalShard(ActorContext actorContext, String shardName) { ActorRef shard = null; for(int i = 0; i < 20 * 5 && shard == null; i++) { Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); @@ -119,6 +124,31 @@ public class IntegrationTestKit extends ShardTestKit { return shard; } + public static void verifyShardStats(DistributedDataStore datastore, String shardName, ShardStatsVerifier verifier) + throws Exception { + ActorContext actorContext = datastore.getActorContext(); + + Future future = actorContext.findLocalShardAsync(shardName); + ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS)); + + AssertionError lastError = null; + Stopwatch sw = Stopwatch.createStarted(); + while(sw.elapsed(TimeUnit.SECONDS) <= 5) { + ShardStats shardStats = (ShardStats)actorContext. + executeOperation(shardActor, Shard.GET_SHARD_MBEAN_MESSAGE); + + try { + verifier.verify(shardStats); + return; + } catch (AssertionError e) { + lastError = e; + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } + } + + throw lastError; + } + void testWriteTransaction(DistributedDataStore dataStore, YangInstanceIdentifier nodePath, NormalizedNode nodeToWrite) throws Exception { @@ -204,4 +234,8 @@ public class IntegrationTestKit extends ShardTestKit { } }, expType); } + + public interface ShardStatsVerifier { + void verify(ShardStats stats); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 25e37edf71..ae751fa65d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -49,6 +49,7 @@ import org.junit.Test; import org.mockito.InOrder; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; @@ -94,6 +95,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; +import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; @@ -1122,6 +1124,32 @@ public class ShardTest extends AbstractShardTest { }}; } + @Test + public void testTransactionMessagesWithNoLeader() { + new ShardTestKit(getSystem()) {{ + dataStoreContextBuilder.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()). + shardHeartbeatIntervalInMillis(50).shardElectionTimeoutFactor(1); + final TestActorRef shard = actorFactory.createTestActor( + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testTransactionMessagesWithNoLeader"); + + waitUntilNoLeader(shard); + + shard.tell(new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, ""), getRef()); + Failure failure = expectMsgClass(Failure.class); + assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass()); + + shard.tell(prepareForwardedReadyTransaction(mock(ShardDataTreeCohort.class), "tx", + DataStoreVersions.CURRENT_VERSION, true), getRef()); + failure = expectMsgClass(Failure.class); + assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass()); + + shard.tell(new ReadyLocalTransaction("tx", mock(DataTreeModification.class), true), getRef()); + failure = expectMsgClass(Failure.class); + assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass()); + }}; + } + @Test public void testReadyWithImmediateCommit() throws Exception{ testReadyWithImmediateCommit(true); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java index 7f41bb2277..8f3231b417 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java @@ -71,6 +71,7 @@ public class ShardTestKit extends JavaTestKit { public void waitUntilNoLeader(ActorRef shard) { FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS); + Object lastResponse = null; for(int i = 0; i < 20 * 5; i++) { Future future = Patterns.ask(shard, new FindLeader(), new Timeout(duration)); try { @@ -78,16 +79,25 @@ public class ShardTestKit extends JavaTestKit { if(resp.getLeaderActor() == null) { return; } + + lastResponse = resp.getLeaderActor(); } catch(TimeoutException e) { + lastResponse = e; } catch(Exception e) { System.err.println("FindLeader threw ex"); e.printStackTrace(); + lastResponse = e; } - Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); } - Assert.fail("Unexpected leader found for shard " + shard.path()); + if(lastResponse instanceof Throwable) { + throw (AssertionError)new AssertionError( + String.format("Unexpected error occurred from FindLeader for shard %s", shard.path())). + initCause((Throwable)lastResponse); + } + + Assert.fail(String.format("Unexpected leader %s found for shard %s", lastResponse, shard.path())); } } \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-cars-member-1-and-2.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-cars-member-1-and-2.conf new file mode 100644 index 0000000000..c2090f4f3a --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-cars-member-1-and-2.conf @@ -0,0 +1,14 @@ +module-shards = [ + { + name = "cars" + shards = [ + { + name="cars" + replicas = [ + "member-1", + "member-2" + ] + } + ] + } +] \ No newline at end of file -- 2.36.6