// the primary/leader shard. However with timing and caching on the front-end, there's a small
// window where it could have a stale leader during leadership transitions.
//
- if(isLeader()) {
- failIfIsolatedLeader(getSender());
-
+ boolean isIsolatedLeader = isIsolatedLeader();
+ if (isLeader() && !isIsolatedLeader) {
handleBatchedModificationsLocal(batched, getSender());
} else {
ActorSelection leader = getLeader();
- if(leader != null) {
+ if (isIsolatedLeader || leader == null) {
+ messageRetrySupport.addMessageToRetry(batched, getSender(),
+ "Could not commit transaction " + batched.getTransactionID());
+ } else {
// TODO: what if this is not the first batch and leadership changed in between batched messages?
// We could check if the commitCoordinator already has a cached entry and forward all the previous
// batched modifications.
LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
leader.forward(batched, getContext());
- } else {
- messageRetrySupport.addMessageToRetry(batched, getSender(),
- "Could not commit transaction " + batched.getTransactionID());
}
}
}
}
private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
- if (isLeader()) {
- failIfIsolatedLeader(getSender());
+ LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), message.getTransactionID());
+ boolean isIsolatedLeader = isIsolatedLeader();
+ if (isLeader() && !isIsolatedLeader) {
try {
commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
} catch (Exception e) {
}
} else {
ActorSelection leader = getLeader();
- if (leader != null) {
+ if (isIsolatedLeader || leader == null) {
+ messageRetrySupport.addMessageToRetry(message, getSender(),
+ "Could not commit transaction " + message.getTransactionID());
+ } else {
LOG.debug("{}: Forwarding ReadyLocalTransaction to leader {}", persistenceId(), leader);
message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
leader.forward(message, getContext());
- } else {
- messageRetrySupport.addMessageToRetry(message, getSender(),
- "Could not commit transaction " + message.getTransactionID());
}
}
}
private void handleForwardedReadyTransaction(ForwardedReadyTransaction forwardedReady) {
- if (isLeader()) {
- failIfIsolatedLeader(getSender());
+ LOG.debug("{}: handleForwardedReadyTransaction for {}", persistenceId(), forwardedReady.getTransactionID());
+ boolean isIsolatedLeader = isIsolatedLeader();
+ if (isLeader() && !isIsolatedLeader) {
commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
} else {
ActorSelection leader = getLeader();
- if (leader != null) {
+ if (isIsolatedLeader || leader == null) {
+ messageRetrySupport.addMessageToRetry(forwardedReady, getSender(),
+ "Could not commit transaction " + forwardedReady.getTransactionID());
+ } else {
LOG.debug("{}: Forwarding ForwardedReadyTransaction to leader {}", persistenceId(), leader);
ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(forwardedReady.getTransactionID(),
forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit());
readyLocal.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
leader.forward(readyLocal, getContext());
- } else {
- messageRetrySupport.addMessageToRetry(forwardedReady, getSender(),
- "Could not commit transaction " + forwardedReady.getTransactionID());
}
}
}
store.closeAllTransactionChains();
}
+
+ if(hasLeader && !isIsolatedLeader()) {
+ messageRetrySupport.retryMessages();
+ }
}
@Override
protected void onLeaderChanged(String oldLeader, String newLeader) {
shardMBean.incrementLeadershipChangeCount();
- if(hasLeader()) {
+ if(hasLeader() && !isIsolatedLeader()) {
messageRetrySupport.retryMessages();
}
}