import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import akka.actor.Props;
+import akka.actor.Status;
import akka.actor.Status.Failure;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
+import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal;
import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
+import org.opendaylight.controller.cluster.raft.LeadershipTransferFailedException;
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.yangtools.concepts.Identifier;
LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
ShardDataTreeChangeListenerPublisherActorProxy treeChangeListenerPublisher =
- new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher");
+ new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher", name);
ShardDataChangeListenerPublisherActorProxy dataChangeListenerPublisher =
- new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher");
+ new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher", name);
if (builder.getDataTree() != null) {
store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(),
treeChangeListenerPublisher, dataChangeListenerPublisher, name, frontendMetadata);
} else if (message instanceof PersistAbortTransactionPayload) {
final TransactionIdentifier txId = ((PersistAbortTransactionPayload) message).getTransactionId();
persistPayload(txId, AbortTransactionPayload.create(txId), true);
+ } else if (message instanceof MakeLeaderLocal) {
+ onMakeLeaderLocal();
} else {
super.handleNonRaftCommand(message);
}
}
}
+ private void onMakeLeaderLocal() {
+ LOG.debug("{}: onMakeLeaderLocal received", persistenceId());
+ if (isLeader()) {
+ getSender().tell(new Status.Success(null), getSelf());
+ return;
+ }
+
+ final ActorSelection leader = getLeader();
+
+ if (leader == null) {
+ // Leader is not present. The cluster is most likely trying to
+ // elect a leader and we should let that run its normal course
+
+ // TODO we can wait for the election to complete and retry the
+ // request. We can also let the caller retry by sending a flag
+ // in the response indicating the request is "reTryable".
+ getSender().tell(new Failure(
+ new LeadershipTransferFailedException("We cannot initiate leadership transfer to local node. "
+ + "Currently there is no leader for " + persistenceId())),
+ getSelf());
+ return;
+ }
+
+ leader.tell(new RequestLeadership(getId(), getSender()), getSelf());
+ }
+
// Acquire our frontend tracking handle and verify generation matches
private LeaderFrontendState getFrontend(final ClientIdentifier clientId) throws RequestException {
final LeaderFrontendState existing = knownFrontends.get(clientId.getFrontendId());
final ClientIdentifier clientId = lhReq.getTarget().getClientId();
return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope, now);
} else {
- LOG.debug("{}: rejecting unsupported request {}", persistenceId(), request);
+ LOG.warn("{}: rejecting unsupported request {}", persistenceId(), request);
throw new UnsupportedRequestException(request);
}
}