Handle leader change on ForwardedReadyTransaction in Shard
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index 0bb886a12defb5a70df4f39b34866f4a2cc3890f..f5b9f4c08d6413d845344df44a49edcf9f1ffca9 100644 (file)
@@ -229,8 +229,7 @@ public class Shard extends RaftActor {
             } else if (BatchedModifications.class.isInstance(message)) {
                 handleBatchedModifications((BatchedModifications)message);
             } else if (message instanceof ForwardedReadyTransaction) {
-                commitCoordinator.handleForwardedReadyTransaction((ForwardedReadyTransaction) message,
-                        getSender(), this);
+                handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
             } else if (message instanceof ReadyLocalTransaction) {
                 handleReadyLocalTransaction((ReadyLocalTransaction)message);
             } else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
@@ -411,7 +410,7 @@ public class Shard extends RaftActor {
         commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
     }
 
-    private void noLeaderError(String errMessage, Object message) {
+    private void noLeaderError(String errMessage) {
         // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
         // it more resilient in case we're in the process of electing a new leader.
         getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(errMessage, persistenceId())), getSelf());
@@ -453,7 +452,7 @@ public class Shard extends RaftActor {
                 LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
                 leader.forward(batched, getContext());
             } else {
-                noLeaderError("Could not commit transaction " + batched.getTransactionID(), batched);
+                noLeaderError("Could not commit transaction " + batched.getTransactionID());
             }
         }
     }
@@ -492,7 +491,27 @@ public class Shard extends RaftActor {
                 message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
                 leader.forward(message, getContext());
             } else {
-                noLeaderError("Could not commit transaction " + message.getTransactionID(), message);
+                noLeaderError("Could not commit transaction " + message.getTransactionID());
+            }
+        }
+    }
+
+    private void handleForwardedReadyTransaction(ForwardedReadyTransaction forwardedReady) {
+        if (isLeader()) {
+            failIfIsolatedLeader(getSender());
+
+            commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
+        } else {
+            ActorSelection leader = getLeader();
+            if (leader != null) {
+                LOG.debug("{}: Forwarding ForwardedReadyTransaction to leader {}", persistenceId(), leader);
+
+                ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(forwardedReady.getTransactionID(),
+                        forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit());
+                readyLocal.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
+                leader.forward(readyLocal, getContext());
+            } else {
+                noLeaderError("Could not commit transaction " + forwardedReady.getTransactionID());
             }
         }
     }