CDS: Retry remote front-end transactions on AskTimeoutException
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index 65be23dfd0dd0de4b037710ab50604d31d6b0c81..ce3cae481e766f4ba4b7f93593e8d12a282ee294 100644 (file)
@@ -62,6 +62,7 @@ import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
+import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
@@ -410,13 +411,10 @@ public class Shard extends RaftActor {
         commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
     }
 
-    private void noLeaderError(Object message) {
+    private void noLeaderError(String errMessage, Object message) {
         // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
         // it more resilient in case we're in the process of electing a new leader.
-        getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
-            "Could not find the leader for shard %s. This typically happens" +
-            " when the system is coming up or recovering and a leader is being elected. Try again" +
-            " later.", persistenceId()))), getSelf());
+        getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(errMessage, persistenceId())), getSelf());
     }
 
     private void handleBatchedModifications(BatchedModifications batched) {
@@ -433,6 +431,8 @@ public class Shard extends RaftActor {
         // window where it could have a stale leader during leadership transitions.
         //
         if(isLeader()) {
+            failIfIsolatedLeader(getSender());
+
             try {
                 commitCoordinator.handleBatchedModifications(batched, getSender(), this);
             } catch (Exception e) {
@@ -449,13 +449,27 @@ public class Shard extends RaftActor {
                 LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
                 leader.forward(batched, getContext());
             } else {
-                noLeaderError(batched);
+                noLeaderError("Could not commit transaction " + batched.getTransactionID(), batched);
             }
         }
     }
 
+    private boolean failIfIsolatedLeader(ActorRef sender) {
+        if(getRaftState() == RaftState.IsolatedLeader) {
+            sender.tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
+                    "Shard %s was the leader but has lost contact with all of its followers. Either all" +
+                    " other follower nodes are down or this node is isolated by a network partition.",
+                    persistenceId()))), getSelf());
+            return true;
+        }
+
+        return false;
+    }
+
     private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
         if (isLeader()) {
+            failIfIsolatedLeader(getSender());
+
             try {
                 commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
             } catch (Exception e) {
@@ -470,7 +484,7 @@ public class Shard extends RaftActor {
                 message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
                 leader.forward(message, getContext());
             } else {
-                noLeaderError(message);
+                noLeaderError("Could not commit transaction " + message.getTransactionID(), message);
             }
         }
     }
@@ -520,10 +534,8 @@ public class Shard extends RaftActor {
         } else if (getLeader() != null) {
             getLeader().forward(message, getContext());
         } else {
-            getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
-                "Could not find leader for shard %s so transaction cannot be created. This typically happens" +
-                " when the system is coming up or recovering and a leader is being elected. Try again" +
-                " later.", persistenceId()))), getSelf());
+            getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(
+                    "Could not create a shard transaction", persistenceId())), getSelf());
         }
     }
 
@@ -541,6 +553,11 @@ public class Shard extends RaftActor {
 
     private void createTransaction(CreateTransaction createTransaction) {
         try {
+            if(TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY &&
+                    failIfIsolatedLeader(getSender())) {
+                return;
+            }
+
             ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(),
                 createTransaction.getTransactionId(), createTransaction.getTransactionChainId(),
                 createTransaction.getVersion());