import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
+import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
}
- private void noLeaderError(Object message) {
+ private void noLeaderError(String errMessage, Object message) {
// TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
// it more resilient in case we're in the process of electing a new leader.
- getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
- "Could not find the leader for shard %s. This typically happens" +
- " when the system is coming up or recovering and a leader is being elected. Try again" +
- " later.", persistenceId()))), getSelf());
+ getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(errMessage, persistenceId())), getSelf());
}
private void handleBatchedModifications(BatchedModifications batched) {
// window where it could have a stale leader during leadership transitions.
//
if(isLeader()) {
+ failIfIsolatedLeader(getSender());
+
try {
commitCoordinator.handleBatchedModifications(batched, getSender(), this);
} catch (Exception e) {
LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
leader.forward(batched, getContext());
} else {
- noLeaderError(batched);
+ noLeaderError("Could not commit transaction " + batched.getTransactionID(), batched);
}
}
}
+ private boolean failIfIsolatedLeader(ActorRef sender) {
+ if(getRaftState() == RaftState.IsolatedLeader) {
+ sender.tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
+ "Shard %s was the leader but has lost contact with all of its followers. Either all" +
+ " other follower nodes are down or this node is isolated by a network partition.",
+ persistenceId()))), getSelf());
+ return true;
+ }
+
+ return false;
+ }
+
private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
if (isLeader()) {
+ failIfIsolatedLeader(getSender());
+
try {
commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
} catch (Exception e) {
message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
leader.forward(message, getContext());
} else {
- noLeaderError(message);
+ noLeaderError("Could not commit transaction " + message.getTransactionID(), message);
}
}
}
} else if (getLeader() != null) {
getLeader().forward(message, getContext());
} else {
- getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
- "Could not find leader for shard %s so transaction cannot be created. This typically happens" +
- " when the system is coming up or recovering and a leader is being elected. Try again" +
- " later.", persistenceId()))), getSelf());
+ getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(
+ "Could not create a shard transaction", persistenceId())), getSelf());
}
}
private void createTransaction(CreateTransaction createTransaction) {
try {
+ if(TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY &&
+ failIfIsolatedLeader(getSender())) {
+ return;
+ }
+
ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(),
createTransaction.getTransactionId(), createTransaction.getTransactionChainId(),
createTransaction.getVersion());