private ShardSnapshot restoreFromSnapshot;
-
+ private final ShardTransactionMessageRetrySupport messageRetrySupport;
protected Shard(AbstractBuilder<?, ?> builder) {
super(builder.getId().toString(), builder.getPeerAddresses(),
snapshotCohort = new ShardSnapshotCohort(transactionActorFactory, store, LOG, this.name);
-
-
+ messageRetrySupport = new ShardTransactionMessageRetrySupport(this);
}
private void setTransactionCommitTimeout() {
super.postStop();
+ messageRetrySupport.close();
+
if(txCommitTimeoutCheckSchedule != null) {
txCommitTimeoutCheckSchedule.cancel();
}
sender().tell(store.getDataTree(), self());
} else if(message instanceof ServerRemoved){
context().parent().forward(message, context());
+ } else if(ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
+ messageRetrySupport.onTimerMessage(message);
} else {
super.onReceiveCommand(message);
}
commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
}
- private void noLeaderError(String errMessage) {
- // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
- // it more resilient in case we're in the process of electing a new leader.
- getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(errMessage, persistenceId())), getSelf());
- }
-
protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) {
try {
commitCoordinator.handleBatchedModifications(batched, sender, this);
LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
leader.forward(batched, getContext());
} else {
- noLeaderError("Could not commit transaction " + batched.getTransactionID());
+ messageRetrySupport.addMessageToRetry(batched, getSender(),
+ "Could not commit transaction " + batched.getTransactionID());
}
}
}
message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
leader.forward(message, getContext());
} else {
- noLeaderError("Could not commit transaction " + message.getTransactionID());
+ messageRetrySupport.addMessageToRetry(message, getSender(),
+ "Could not commit transaction " + message.getTransactionID());
}
}
}
readyLocal.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
leader.forward(readyLocal, getContext());
} else {
- noLeaderError("Could not commit transaction " + forwardedReady.getTransactionID());
+ messageRetrySupport.addMessageToRetry(forwardedReady, getSender(),
+ "Could not commit transaction " + forwardedReady.getTransactionID());
}
}
}
@Override
protected void onLeaderChanged(String oldLeader, String newLeader) {
shardMBean.incrementLeadershipChangeCount();
+
+ if(hasLeader()) {
+ messageRetrySupport.retryMessages();
+ }
}
@Override