return commitCoordinator.getQueueSize();
}
+ public int getCohortCacheSize() {
+ return commitCoordinator.getCohortCacheSize();
+ }
+
@Override
protected Optional<ActorRef> getRoleChangeNotifier() {
return roleChangeNotifier;
// the primary/leader shard. However with timing and caching on the front-end, there's a small
// window where it could have a stale leader during leadership transitions.
//
- boolean isIsolatedLeader = isIsolatedLeader();
- if (isLeader() && !isIsolatedLeader) {
+ boolean isLeaderActive = isLeaderActive();
+ if (isLeader() && isLeaderActive) {
handleBatchedModificationsLocal(batched, getSender());
} else {
ActorSelection leader = getLeader();
- if (isIsolatedLeader || leader == null) {
+ if (!isLeaderActive || leader == null) {
messageRetrySupport.addMessageToRetry(batched, getSender(),
"Could not commit transaction " + batched.getTransactionID());
} else {
private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), message.getTransactionID());
- boolean isIsolatedLeader = isIsolatedLeader();
- if (isLeader() && !isIsolatedLeader) {
+ boolean isLeaderActive = isLeaderActive();
+ if (isLeader() && isLeaderActive) {
try {
commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
} catch (Exception e) {
}
} else {
ActorSelection leader = getLeader();
- if (isIsolatedLeader || leader == null) {
+ if (!isLeaderActive || leader == null) {
messageRetrySupport.addMessageToRetry(message, getSender(),
"Could not commit transaction " + message.getTransactionID());
} else {
private void handleForwardedReadyTransaction(ForwardedReadyTransaction forwardedReady) {
LOG.debug("{}: handleForwardedReadyTransaction for {}", persistenceId(), forwardedReady.getTransactionID());
- boolean isIsolatedLeader = isIsolatedLeader();
- if (isLeader() && !isIsolatedLeader) {
+ boolean isLeaderActive = isLeaderActive();
+ if (isLeader() && isLeaderActive) {
commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
} else {
ActorSelection leader = getLeader();
- if (isIsolatedLeader || leader == null) {
+ if (!isLeaderActive || leader == null) {
messageRetrySupport.addMessageToRetry(forwardedReady, getSender(),
"Could not commit transaction " + forwardedReady.getTransactionID());
} else {
}
}
+ @Override
+ protected void pauseLeader(Runnable operation) {
+ LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation);
+ commitCoordinator.setRunOnPendingTransactionsComplete(operation);
+ }
+
@Override
public String persistenceId() {
return this.name;