getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(errMessage, persistenceId())), getSelf());
}
+ protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) {
+ try {
+ commitCoordinator.handleBatchedModifications(batched, sender, this);
+ } catch (Exception e) {
+ LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
+ batched.getTransactionID(), e);
+ sender.tell(new akka.actor.Status.Failure(e), getSelf());
+ }
+ }
+
private void handleBatchedModifications(BatchedModifications batched) {
// This message is sent to prepare the modifications transaction directly on the Shard as an
// optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last
if(isLeader()) {
failIfIsolatedLeader(getSender());
- try {
- commitCoordinator.handleBatchedModifications(batched, getSender(), this);
- } catch (Exception e) {
- LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
- batched.getTransactionID(), e);
- getSender().tell(new akka.actor.Status.Failure(e), getSelf());
- }
+ handleBatchedModificationsLocal(batched, getSender());
} else {
ActorSelection leader = getLeader();
if(leader != null) {
}
private boolean failIfIsolatedLeader(ActorRef sender) {
- if(getRaftState() == RaftState.IsolatedLeader) {
+ if(isIsolatedLeader()) {
sender.tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
"Shard %s was the leader but has lost contact with all of its followers. Either all" +
" other follower nodes are down or this node is isolated by a network partition.",
return false;
}
+ protected boolean isIsolatedLeader() {
+ return getRaftState() == RaftState.IsolatedLeader;
+ }
+
private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
if (isLeader()) {
failIfIsolatedLeader(getSender());
return commitCoordinator;
}
+ protected DatastoreContext getDatastoreContext() {
+ return datastoreContext;
+ }
+
protected abstract static class AbstractShardCreator implements Creator<Shard> {
private static final long serialVersionUID = 1L;