// the primary/leader shard. However with timing and caching on the front-end, there's a small
// window where it could have a stale leader during leadership transitions.
//
- if(isLeader()) {
- failIfIsolatedLeader(getSender());
-
+ boolean isIsolatedLeader = isIsolatedLeader();
+ if (isLeader() && !isIsolatedLeader) {
handleBatchedModificationsLocal(batched, getSender());
} else {
ActorSelection leader = getLeader();
- if(leader != null) {
+ if (isIsolatedLeader || leader == null) {
+ messageRetrySupport.addMessageToRetry(batched, getSender(),
+ "Could not commit transaction " + batched.getTransactionID());
+ } else {
// TODO: what if this is not the first batch and leadership changed in between batched messages?
// We could check if the commitCoordinator already has a cached entry and forward all the previous
// batched modifications.
LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
leader.forward(batched, getContext());
- } else {
- messageRetrySupport.addMessageToRetry(batched, getSender(),
- "Could not commit transaction " + batched.getTransactionID());
}
}
}
}
private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
- if (isLeader()) {
- failIfIsolatedLeader(getSender());
+ LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), message.getTransactionID());
+ boolean isIsolatedLeader = isIsolatedLeader();
+ if (isLeader() && !isIsolatedLeader) {
try {
commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
} catch (Exception e) {
}
} else {
ActorSelection leader = getLeader();
- if (leader != null) {
+ if (isIsolatedLeader || leader == null) {
+ messageRetrySupport.addMessageToRetry(message, getSender(),
+ "Could not commit transaction " + message.getTransactionID());
+ } else {
LOG.debug("{}: Forwarding ReadyLocalTransaction to leader {}", persistenceId(), leader);
message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
leader.forward(message, getContext());
- } else {
- messageRetrySupport.addMessageToRetry(message, getSender(),
- "Could not commit transaction " + message.getTransactionID());
}
}
}
private void handleForwardedReadyTransaction(ForwardedReadyTransaction forwardedReady) {
- if (isLeader()) {
- failIfIsolatedLeader(getSender());
+ LOG.debug("{}: handleForwardedReadyTransaction for {}", persistenceId(), forwardedReady.getTransactionID());
+ boolean isIsolatedLeader = isIsolatedLeader();
+ if (isLeader() && !isIsolatedLeader) {
commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
} else {
ActorSelection leader = getLeader();
- if (leader != null) {
+ if (isIsolatedLeader || leader == null) {
+ messageRetrySupport.addMessageToRetry(forwardedReady, getSender(),
+ "Could not commit transaction " + forwardedReady.getTransactionID());
+ } else {
LOG.debug("{}: Forwarding ForwardedReadyTransaction to leader {}", persistenceId(), leader);
ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(forwardedReady.getTransactionID(),
forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit());
readyLocal.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
leader.forward(readyLocal, getContext());
- } else {
- messageRetrySupport.addMessageToRetry(forwardedReady, getSender(),
- "Could not commit transaction " + forwardedReady.getTransactionID());
}
}
}
store.closeAllTransactionChains();
}
+
+ if(hasLeader && !isIsolatedLeader()) {
+ messageRetrySupport.retryMessages();
+ }
}
@Override
protected void onLeaderChanged(String oldLeader, String newLeader) {
shardMBean.incrementLeadershipChangeCount();
- if(hasLeader()) {
+ if(hasLeader() && !isIsolatedLeader()) {
messageRetrySupport.retryMessages();
}
}
verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
}
- @Test(expected=NoShardLeaderException.class)
+ @Test
public void testTransactionWithIsolatedLeader() throws Throwable {
leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(200);
String testName = "testTransactionWithIsolatedLeader";
initDatastoresWithCars(testName);
- JavaTestKit.shutdownActorSystem(followerSystem, null, true);
+ DOMStoreWriteTransaction failWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
+ failWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+
+ DOMStoreWriteTransaction successWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
+ successWriteTx.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+
+ followerTestKit.watch(followerDistributedDataStore.getActorContext().getShardManager());
+ followerDistributedDataStore.close();
+ followerTestKit.expectTerminated(followerDistributedDataStore.getActorContext().getShardManager());
MemberNode.verifyRaftState(leaderDistributedDataStore, "cars", new RaftStateVerifier() {
@Override
}
});
- DOMStoreWriteTransaction writeTx = leaderDistributedDataStore.newWriteOnlyTransaction();
- writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
-
try {
- followerTestKit.doCommit(writeTx.ready());
+ leaderTestKit.doCommit(failWriteTx.ready());
+ fail("Expected NoShardLeaderException");
} catch (ExecutionException e) {
- throw e.getCause();
+ assertEquals("getCause", NoShardLeaderException.class, e.getCause().getClass());
}
+
+ sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder.
+ shardElectionTimeoutFactor(100));
+
+ DOMStoreThreePhaseCommitCohort writeTxCohort = successWriteTx.ready();
+
+ followerDistributedDataStore = followerTestKit.setupDistributedDataStore(testName,
+ MODULE_SHARDS_CARS_ONLY_1_2, false, CARS);
+
+ leaderTestKit.doCommit(writeTxCohort);
}
@Test(expected=AskTimeoutException.class)