Bug 7407 - Add request leadership functionality to shards
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index 19a2151d2fc925284a39d65960e80819d425e154..39ee810eb0f3b29a0544e516b56d9e086dc5fb2d 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
 import akka.actor.Props;
+import akka.actor.Status;
 import akka.actor.Status.Failure;
 import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
@@ -65,6 +66,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
+import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal;
 import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload;
@@ -80,6 +82,7 @@ import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
+import org.opendaylight.controller.cluster.raft.LeadershipTransferFailedException;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
@@ -87,6 +90,7 @@ import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.concepts.Identifier;
@@ -339,12 +343,40 @@ public class Shard extends RaftActor {
             } else if (message instanceof PersistAbortTransactionPayload) {
                 final TransactionIdentifier txId = ((PersistAbortTransactionPayload) message).getTransactionId();
                 persistPayload(txId, AbortTransactionPayload.create(txId), true);
+            } else if (message instanceof MakeLeaderLocal) {
+                onMakeLeaderLocal();
             } else {
                 super.handleNonRaftCommand(message);
             }
         }
     }
 
+    private void onMakeLeaderLocal() {
+        LOG.debug("{}: onMakeLeaderLocal received", persistenceId());
+        if (isLeader()) {
+            getSender().tell(new Status.Success(null), getSelf());
+            return;
+        }
+
+        final ActorSelection leader = getLeader();
+
+        if (leader == null) {
+            // Leader is not present. The cluster is most likely trying to
+            // elect a leader and we should let that run its normal course
+
+            // TODO we can wait for the election to complete and retry the
+            // request. We can also let the caller retry by sending a flag
+            // in the response indicating the request is "reTryable".
+            getSender().tell(new Failure(
+                    new LeadershipTransferFailedException("We cannot initiate leadership transfer to local node. "
+                            + "Currently there is no leader for " + persistenceId())),
+                    getSelf());
+            return;
+        }
+
+        leader.tell(new RequestLeadership(getId(), getSender()), getSelf());
+    }
+
     // Acquire our frontend tracking handle and verify generation matches
     private LeaderFrontendState getFrontend(final ClientIdentifier clientId) throws RequestException {
         final LeaderFrontendState existing = knownFrontends.get(clientId.getFrontendId());