From 08528168cbcf67ce2b0a897e5ac22888b1da53dd Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Thu, 10 Dec 2015 19:41:00 -0500 Subject: [PATCH] Handle leader change on ForwardedReadyTransaction in Shard When the ShardWriteTransaction actor sends the ForwardedReadyTransaction to the Shard, if the local shard is no longer the leader it still tries to commit the transaction. Replication fails and it ends up timing out but a side effect is that it persista a new log entry which isn't good. Therefore I added logic similar as was done with ReadyLocalTransaction to forward to the leader. For ReadyLocalTransaction, a special serializer was added to marshal it over the wire as a BatchedModifications message. Rather then creating another serializer, I converted the ForwardedReadyTransaction instance to a ReadyLocalTransaction instance. Change-Id: Ia44e86606846f2af9599ad222b7dcae1bd0cf804 Signed-off-by: Tom Pantelis --- .../controller/cluster/datastore/Shard.java | 29 ++++- .../cluster/datastore/AbstractShardTest.java | 10 +- ...butedDataStoreRemotingIntegrationTest.java | 116 ++++++++++++++++-- .../cluster/datastore/ShardTestKit.java | 2 +- .../ShardTransactionFailureTest.java | 4 +- .../datastore/ShardTransactionTest.java | 4 +- 6 files changed, 146 insertions(+), 19 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 0bb886a12d..f5b9f4c08d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -229,8 +229,7 @@ public class Shard extends RaftActor { } else if (BatchedModifications.class.isInstance(message)) { handleBatchedModifications((BatchedModifications)message); } else if (message instanceof ForwardedReadyTransaction) { - commitCoordinator.handleForwardedReadyTransaction((ForwardedReadyTransaction) message, - getSender(), this); + handleForwardedReadyTransaction((ForwardedReadyTransaction) message); } else if (message instanceof ReadyLocalTransaction) { handleReadyLocalTransaction((ReadyLocalTransaction)message); } else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) { @@ -411,7 +410,7 @@ public class Shard extends RaftActor { commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this); } - private void noLeaderError(String errMessage, Object message) { + private void noLeaderError(String errMessage) { // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make // it more resilient in case we're in the process of electing a new leader. getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(errMessage, persistenceId())), getSelf()); @@ -453,7 +452,7 @@ public class Shard extends RaftActor { LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader); leader.forward(batched, getContext()); } else { - noLeaderError("Could not commit transaction " + batched.getTransactionID(), batched); + noLeaderError("Could not commit transaction " + batched.getTransactionID()); } } } @@ -492,7 +491,27 @@ public class Shard extends RaftActor { message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion()); leader.forward(message, getContext()); } else { - noLeaderError("Could not commit transaction " + message.getTransactionID(), message); + noLeaderError("Could not commit transaction " + message.getTransactionID()); + } + } + } + + private void handleForwardedReadyTransaction(ForwardedReadyTransaction forwardedReady) { + if (isLeader()) { + failIfIsolatedLeader(getSender()); + + commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this); + } else { + ActorSelection leader = getLeader(); + if (leader != null) { + LOG.debug("{}: Forwarding ForwardedReadyTransaction to leader {}", persistenceId(), leader); + + ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(forwardedReady.getTransactionID(), + forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit()); + readyLocal.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion()); + leader.forward(readyLocal, getContext()); + } else { + noLeaderError("Could not commit transaction " + forwardedReady.getTransactionID()); } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java index 7e8d23c6a8..39fe452b25 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java @@ -264,13 +264,17 @@ public abstract class AbstractShardTest extends AbstractActorTest{ } } - protected Object prepareForwardedReadyTransaction(ShardDataTreeCohort cohort, String transactionID, - short version, boolean doCommitOnReady) { + static ShardDataTreeTransactionParent newShardDataTreeTransactionParent(ShardDataTreeCohort cohort) { ShardDataTreeTransactionParent mockParent = mock(ShardDataTreeTransactionParent.class); doReturn(cohort).when(mockParent).finishTransaction(any(ReadWriteShardDataTreeTransaction.class)); doNothing().when(mockParent).abortTransaction(any(AbstractShardDataTreeTransaction.class)); + return mockParent; + } + + protected ForwardedReadyTransaction prepareForwardedReadyTransaction(ShardDataTreeCohort cohort, + String transactionID, short version, boolean doCommitOnReady) { return new ForwardedReadyTransaction(transactionID, version, - new ReadWriteShardDataTreeTransaction(mockParent, transactionID, + new ReadWriteShardDataTreeTransaction(newShardDataTreeTransactionParent(cohort), transactionID, mock(DataTreeModification.class)), true, doCommitOnReady); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index 8100bdc6a2..005e904e4c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -16,10 +16,12 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; import akka.actor.ActorRef; +import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.AddressFromURIString; import akka.cluster.Cluster; +import akka.dispatch.Futures; import akka.pattern.AskTimeoutException; import akka.testkit.JavaTestKit; import com.google.common.base.Optional; @@ -28,6 +30,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; import java.math.BigInteger; +import java.util.Arrays; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.junit.After; @@ -40,7 +43,10 @@ import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; +import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree; import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; @@ -65,6 +71,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; @@ -525,16 +532,18 @@ public class DistributedDataStoreRemotingIntegrationTest { TipProducingDataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL); dataTree.setSchemaContext(SchemaContextHelper.full()); - DataTreeModification modification = dataTree.takeSnapshot().newModification(); + // Send a tx with immediate commit. + + DataTreeModification modification = dataTree.takeSnapshot().newModification(); new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification); new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification); - MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); - new WriteModification(CarsModel.newCarPath("optima"), car).apply(modification); + MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification); + modification.ready(); - String transactionID = "tx-1"; - ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(transactionID , modification, true); + ReadyLocalTransaction readyLocal = new ReadyLocalTransaction("tx-1" , modification, true); carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef()); Object resp = followerTestKit.expectMsgClass(Object.class); @@ -542,10 +551,101 @@ public class DistributedDataStoreRemotingIntegrationTest { throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause()); } - assertTrue("Expected response of type " + CommitTransactionReply.SERIALIZABLE_CLASS, - CommitTransactionReply.SERIALIZABLE_CLASS.equals(resp.getClass())); + assertEquals("Response type", CommitTransactionReply.SERIALIZABLE_CLASS, resp.getClass()); + + verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1); + + // Send another tx without immediate commit. + + modification = dataTree.takeSnapshot().newModification(); + MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000)); + new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification); + modification.ready(); + + readyLocal = new ReadyLocalTransaction("tx-2" , modification, false); + + carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef()); + resp = followerTestKit.expectMsgClass(Object.class); + if(resp instanceof akka.actor.Status.Failure) { + throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause()); + } + + assertEquals("Response type", ReadyTransactionReply.class, resp.getClass()); + + ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection( + ((ReadyTransactionReply)resp).getCohortPath()); + + ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy( + leaderDistributedDataStore.getActorContext(), Arrays.asList(Futures.successful(txActor)), "tx-2"); + cohort.canCommit().get(5, TimeUnit.SECONDS); + cohort.preCommit().get(5, TimeUnit.SECONDS); + cohort.commit().get(5, TimeUnit.SECONDS); + + verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2); + } + + @Test + public void testForwardedReadyTransactionForwardedToLeader() throws Exception { + initDatastores("testForwardedReadyTransactionForwardedToLeader"); + followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars"); + + Optional carsFollowerShard = followerDistributedDataStore.getActorContext().findLocalShard("cars"); + assertEquals("Cars follower shard found", true, carsFollowerShard.isPresent()); + + carsFollowerShard.get().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef()); + DataTree dataTree = followerTestKit.expectMsgClass(DataTree.class); - verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car); + // Send a tx with immediate commit. + + DataTreeModification modification = dataTree.takeSnapshot().newModification(); + new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification); + new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification); + + MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification); + + ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction("tx-1", + DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction( + Mockito.mock(ShardDataTreeTransactionParent.class), "tx-1", modification), true, true); + + carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef()); + Object resp = followerTestKit.expectMsgClass(Object.class); + if(resp instanceof akka.actor.Status.Failure) { + throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause()); + } + + assertEquals("Response type", CommitTransactionReply.SERIALIZABLE_CLASS, resp.getClass()); + + verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1); + + // Send another tx without immediate commit. + + modification = dataTree.takeSnapshot().newModification(); + MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000)); + new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification); + + forwardedReady = new ForwardedReadyTransaction("tx-2", + DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction( + Mockito.mock(ShardDataTreeTransactionParent.class), "tx-2", modification), true, false); + + carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef()); + resp = followerTestKit.expectMsgClass(Object.class); + if(resp instanceof akka.actor.Status.Failure) { + throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause()); + } + + assertEquals("Response type", ReadyTransactionReply.class, resp.getClass()); + + ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection( + ((ReadyTransactionReply)resp).getCohortPath()); + + ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy( + leaderDistributedDataStore.getActorContext(), Arrays.asList(Futures.successful(txActor)), "tx-2"); + cohort.canCommit().get(5, TimeUnit.SECONDS); + cohort.preCommit().get(5, TimeUnit.SECONDS); + cohort.commit().get(5, TimeUnit.SECONDS); + + verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2); } @Test(expected=NoShardLeaderException.class) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java index ae607b9b3d..7f41bb2277 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java @@ -46,7 +46,7 @@ public class ShardTestKit extends JavaTestKit { } - public String waitUntilLeader(ActorRef shard) { + public static String waitUntilLeader(ActorRef shard) { FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS); for(int i = 0; i < 20 * 5; i++) { Future future = Patterns.ask(shard, new FindLeader(), new Timeout(duration)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java index fc1af285b5..3700c57d30 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java @@ -52,8 +52,10 @@ public class ShardTransactionFailureTest extends AbstractActorTest { private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore"); private ActorRef createShard(){ - return getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext). + ActorRef shard = getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext). schemaContext(TestModel.createTestContext()).props()); + ShardTestKit.waitUntilLeader(shard); + return shard; } @Test(expected = ReadFailedException.class) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index 314f497a23..9e97d28c2c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -82,8 +82,10 @@ public class ShardTransactionTest extends AbstractActorTest { private int txCounter = 0; private ActorRef createShard() { - return getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext). + ActorRef shard = getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext). schemaContext(TestModel.createTestContext()).props()); + ShardTestKit.waitUntilLeader(shard); + return shard; } private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction transaction, String name) { -- 2.36.6