From: Tom Pantelis Date: Fri, 11 Dec 2015 23:02:07 +0000 (-0500) Subject: Transaction retry when leader is isolated X-Git-Tag: release/beryllium~66 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=671ee25af46a1f852548de03cd0d3b8f74d7b7ba Transaction retry when leader is isolated Implemented retry of transaction ready messages, ForwardedReadyTransaction, ReadyLocalTransaction, and BatchedModifications, when the leader is isolated. Change-Id: I4524dd99f867173f8666de4aed637124ff1efa50 Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 5b8a50e188..7867c91158 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -436,21 +436,20 @@ public class Shard extends RaftActor { // the primary/leader shard. However with timing and caching on the front-end, there's a small // window where it could have a stale leader during leadership transitions. // - if(isLeader()) { - failIfIsolatedLeader(getSender()); - + boolean isIsolatedLeader = isIsolatedLeader(); + if (isLeader() && !isIsolatedLeader) { handleBatchedModificationsLocal(batched, getSender()); } else { ActorSelection leader = getLeader(); - if(leader != null) { + if (isIsolatedLeader || leader == null) { + messageRetrySupport.addMessageToRetry(batched, getSender(), + "Could not commit transaction " + batched.getTransactionID()); + } else { // TODO: what if this is not the first batch and leadership changed in between batched messages? // We could check if the commitCoordinator already has a cached entry and forward all the previous // batched modifications. LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader); leader.forward(batched, getContext()); - } else { - messageRetrySupport.addMessageToRetry(batched, getSender(), - "Could not commit transaction " + batched.getTransactionID()); } } } @@ -472,9 +471,10 @@ public class Shard extends RaftActor { } private void handleReadyLocalTransaction(final ReadyLocalTransaction message) { - if (isLeader()) { - failIfIsolatedLeader(getSender()); + LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), message.getTransactionID()); + boolean isIsolatedLeader = isIsolatedLeader(); + if (isLeader() && !isIsolatedLeader) { try { commitCoordinator.handleReadyLocalTransaction(message, getSender(), this); } catch (Exception e) { @@ -484,34 +484,35 @@ public class Shard extends RaftActor { } } else { ActorSelection leader = getLeader(); - if (leader != null) { + if (isIsolatedLeader || leader == null) { + messageRetrySupport.addMessageToRetry(message, getSender(), + "Could not commit transaction " + message.getTransactionID()); + } else { LOG.debug("{}: Forwarding ReadyLocalTransaction to leader {}", persistenceId(), leader); message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion()); leader.forward(message, getContext()); - } else { - messageRetrySupport.addMessageToRetry(message, getSender(), - "Could not commit transaction " + message.getTransactionID()); } } } private void handleForwardedReadyTransaction(ForwardedReadyTransaction forwardedReady) { - if (isLeader()) { - failIfIsolatedLeader(getSender()); + LOG.debug("{}: handleForwardedReadyTransaction for {}", persistenceId(), forwardedReady.getTransactionID()); + boolean isIsolatedLeader = isIsolatedLeader(); + if (isLeader() && !isIsolatedLeader) { commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this); } else { ActorSelection leader = getLeader(); - if (leader != null) { + if (isIsolatedLeader || leader == null) { + messageRetrySupport.addMessageToRetry(forwardedReady, getSender(), + "Could not commit transaction " + forwardedReady.getTransactionID()); + } else { LOG.debug("{}: Forwarding ForwardedReadyTransaction to leader {}", persistenceId(), leader); ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(forwardedReady.getTransactionID(), forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit()); readyLocal.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion()); leader.forward(readyLocal, getContext()); - } else { - messageRetrySupport.addMessageToRetry(forwardedReady, getSender(), - "Could not commit transaction " + forwardedReady.getTransactionID()); } } } @@ -706,13 +707,17 @@ public class Shard extends RaftActor { store.closeAllTransactionChains(); } + + if(hasLeader && !isIsolatedLeader()) { + messageRetrySupport.retryMessages(); + } } @Override protected void onLeaderChanged(String oldLeader, String newLeader) { shardMBean.incrementLeadershipChangeCount(); - if(hasLeader()) { + if(hasLeader() && !isIsolatedLeader()) { messageRetrySupport.retryMessages(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index 3c4c9046c8..f7a5630336 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -720,13 +720,21 @@ public class DistributedDataStoreRemotingIntegrationTest { verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2); } - @Test(expected=NoShardLeaderException.class) + @Test public void testTransactionWithIsolatedLeader() throws Throwable { leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(200); String testName = "testTransactionWithIsolatedLeader"; initDatastoresWithCars(testName); - JavaTestKit.shutdownActorSystem(followerSystem, null, true); + DOMStoreWriteTransaction failWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction(); + failWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + + DOMStoreWriteTransaction successWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction(); + successWriteTx.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + + followerTestKit.watch(followerDistributedDataStore.getActorContext().getShardManager()); + followerDistributedDataStore.close(); + followerTestKit.expectTerminated(followerDistributedDataStore.getActorContext().getShardManager()); MemberNode.verifyRaftState(leaderDistributedDataStore, "cars", new RaftStateVerifier() { @Override @@ -735,14 +743,22 @@ public class DistributedDataStoreRemotingIntegrationTest { } }); - DOMStoreWriteTransaction writeTx = leaderDistributedDataStore.newWriteOnlyTransaction(); - writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - try { - followerTestKit.doCommit(writeTx.ready()); + leaderTestKit.doCommit(failWriteTx.ready()); + fail("Expected NoShardLeaderException"); } catch (ExecutionException e) { - throw e.getCause(); + assertEquals("getCause", NoShardLeaderException.class, e.getCause().getClass()); } + + sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder. + shardElectionTimeoutFactor(100)); + + DOMStoreThreePhaseCommitCohort writeTxCohort = successWriteTx.ready(); + + followerDistributedDataStore = followerTestKit.setupDistributedDataStore(testName, + MODULE_SHARDS_CARS_ONLY_1_2, false, CARS); + + leaderTestKit.doCommit(writeTxCohort); } @Test(expected=AskTimeoutException.class)