import akka.japi.Creator;
import akka.japi.Function;
import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
+import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
/**
* The ShardManager has the following jobs,
- * <p>
+ * <ul>
* <li> Create all the local shard replicas that belong on this cluster member
+ * <li> Find the address of the local shard
* <li> Find the primary replica for any given shard
- * <li> Engage in shard replica elections which decide which replica should be the primary
- * </p>
- * <p/>
- * <h3>>Creation of Shard replicas</h3
- * <p>
- * When the ShardManager is constructed it reads the cluster configuration to find out which shard replicas
- * belong on this member. It finds out the name of the current cluster member from the Akka Clustering Service.
- * </p>
- * <p/>
- * <h3> Replica Elections </h3>
- * <p/>
- * <p>
- * The Shard Manager uses multiple cues to initiate election.
- * <li> When a member of the cluster dies
- * <li> When a local shard replica dies
- * <li> When a local shard replica comes alive
- * </p>
+ * <li> Monitor the cluster members and store their addresses
+ * <ul>
*/
public class ShardManager extends AbstractUntypedActor {
// Stores a mapping between a member name and the address of the member
+ // Member names look like "member-1", "member-2" etc and are as specified
+ // in configuration
private final Map<String, Address> memberNameToAddress = new HashMap<>();
- private final Map<String, ActorPath> localShards = new HashMap<>();
-
+ // Stores a mapping between a shard name and it's corresponding information
+ // Shard names look like inventory, topology etc and are as specified in
+ // configuration
+ private final Map<String, ShardInformation> localShards = new HashMap<>();
+ // The type of a ShardManager reflects the type of the datastore itself
+ // A data store could be of type config/operational
private final String type;
private final ClusterWrapper cluster;
if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
findPrimary(
FindPrimary.fromSerializable(message));
-
+ } else if(message instanceof FindLocalShard){
+ findLocalShard((FindLocalShard) message);
} else if (message instanceof UpdateSchemaContext) {
updateSchemaContext(message);
} else if (message instanceof ClusterEvent.MemberUp){
}
+ private void findLocalShard(FindLocalShard message) {
+ ShardInformation shardInformation =
+ localShards.get(message.getShardName());
+
+ if(shardInformation != null){
+ getSender().tell(new LocalShardFound(shardInformation.getActor()), getSelf());
+ return;
+ }
+
+ getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
+ }
+
private void ignoreMessage(Object message){
LOG.debug("Unhandled message : " + message);
}
}
private void memberUp(ClusterEvent.MemberUp message) {
- memberNameToAddress.put(message.member().roles().head(), message.member().address());
+ String memberName = message.member().roles().head();
+
+ memberNameToAddress.put(memberName , message.member().address());
+
+ for(ShardInformation info : localShards.values()){
+ String shardName = info.getShardName();
+ info.updatePeerAddress(getShardActorName(memberName, shardName),
+ getShardActorPath(shardName, memberName));
+ }
}
+ /**
+ * Notifies all the local shards of a change in the schema context
+ *
+ * @param message
+ */
private void updateSchemaContext(Object message) {
- for(ActorPath path : localShards.values()){
- getContext().system().actorSelection(path)
- .forward(message,
- getContext());
+ for(ShardInformation info : localShards.values()){
+ info.getActor().tell(message,getSelf());
}
}
List<String> members =
configuration.getMembersFromShardName(shardName);
- for(String memberName : members) {
- if (memberName.equals(cluster.getCurrentMemberName())) {
- // This is a local shard
- ActorPath shardPath = localShards.get(shardName);
- if (shardPath == null) {
- getSender()
- .tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
- return;
- }
- getSender().tell(new PrimaryFound(shardPath.toString()).toSerializable(),
- getSelf());
+ // First see if the there is a local replica for the shard
+ ShardInformation info = localShards.get(shardName);
+ if(info != null) {
+ ActorPath shardPath = info.getActorPath();
+ if (shardPath != null) {
+ getSender()
+ .tell(
+ new PrimaryFound(shardPath.toString()).toSerializable(),
+ getSelf());
return;
- } else {
- Address address = memberNameToAddress.get(memberName);
- if(address != null){
- String path =
- address.toString() + "/user/shardmanager-" + this.type + "/" + getShardActorName(
- memberName, shardName);
- getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
- return;
- }
+ }
+ }
+ if(cluster.getCurrentMemberName() != null) {
+ members.remove(cluster.getCurrentMemberName());
+ }
+ // There is no way for us to figure out the primary (for now) so assume
+ // that one of the remote nodes is a primary
+ for(String memberName : members) {
+ Address address = memberNameToAddress.get(memberName);
+ if(address != null){
+ String path =
+ getShardActorPath(shardName, memberName);
+ getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
+ return;
}
}
-
getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
}
+ private String getShardActorPath(String shardName, String memberName) {
+ Address address = memberNameToAddress.get(memberName);
+ if(address != null) {
+ return address.toString() + "/user/shardmanager-" + this.type + "/"
+ + getShardActorName(
+ memberName, shardName);
+ }
+ return null;
+ }
+
+ /**
+ * Construct the name of the shard actor given the name of the member on
+ * which the shard resides and the name of the shard
+ *
+ * @param memberName
+ * @param shardName
+ * @return
+ */
private String getShardActorName(String memberName, String shardName){
return memberName + "-shard-" + shardName + "-" + this.type;
}
- // Create the shards that are local to this member
+ /**
+ * Create shards that are local to the member on which the ShardManager
+ * runs
+ *
+ */
private void createLocalShards() {
String memberName = this.cluster.getCurrentMemberName();
List<String> memberShardNames =
for(String shardName : memberShardNames){
String shardActorName = getShardActorName(memberName, shardName);
+ Map<String, String> peerAddresses = getPeerAddresses(shardName);
ActorRef actor = getContext()
- .actorOf(Shard.props(shardActorName), shardActorName);
- ActorPath path = actor.path();
- localShards.put(shardName, path);
+ .actorOf(Shard.props(shardActorName, peerAddresses),
+ shardActorName);
+ localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
}
}
+ /**
+ * Given the name of the shard find the addresses of all it's peers
+ *
+ * @param shardName
+ * @return
+ */
+ private Map<String, String> getPeerAddresses(String shardName){
+
+ Map<String, String> peerAddresses = new HashMap<>();
+
+ List<String> members =
+ this.configuration.getMembersFromShardName(shardName);
+
+ String currentMemberName = this.cluster.getCurrentMemberName();
+
+ for(String memberName : members){
+ if(!currentMemberName.equals(memberName)){
+ String shardActorName = getShardActorName(memberName, shardName);
+ String path =
+ getShardActorPath(shardName, currentMemberName);
+ peerAddresses.put(shardActorName, path);
+ }
+ }
+ return peerAddresses;
+ }
+
@Override
public SupervisorStrategy supervisorStrategy() {
);
}
+
+ private class ShardInformation {
+ private final String shardName;
+ private final ActorRef actor;
+ private final ActorPath actorPath;
+ private final Map<String, String> peerAddresses;
+
+ private ShardInformation(String shardName, ActorRef actor,
+ Map<String, String> peerAddresses) {
+ this.shardName = shardName;
+ this.actor = actor;
+ this.actorPath = actor.path();
+ this.peerAddresses = peerAddresses;
+ }
+
+ public String getShardName() {
+ return shardName;
+ }
+
+ public ActorRef getActor(){
+ return actor;
+ }
+
+ public ActorPath getActorPath() {
+ return actorPath;
+ }
+
+ public Map<String, String> getPeerAddresses() {
+ return peerAddresses;
+ }
+
+ public void updatePeerAddress(String peerId, String peerAddress){
+ LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
+ if(peerAddresses.containsKey(peerId)){
+ peerAddresses.put(peerId, peerAddress);
+
+ LOG.info("Sending PeerAddressResolved for peer {} with address {} to {}", peerId, peerAddress, actor.path());
+
+ actor
+ .tell(new PeerAddressResolved(peerId, peerAddress),
+ getSelf());
+
+ }
+ }
+ }
}