import akka.actor.ActorPath;
import akka.actor.ActorRef;
+import akka.actor.Address;
import akka.actor.Cancellable;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
}
ShardInformation info = new ShardInformation(moduleShardConfig.getShardName(), shardId, peerAddresses,
- shardDatastoreContext, createShard.getShardPropsCreator());
+ shardDatastoreContext, createShard.getShardPropsCreator(), peerAddressResolver);
localShards.put(info.getShardName(), info);
mBean.addLocalShard(shardId.toString());
LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
message.member().address());
- peerAddressResolver.addPeerAddress(memberName, message.member().address());
+ addPeerAddress(memberName, message.member().address());
+
+ checkReady();
+ }
+
+ private void addPeerAddress(String memberName, Address address) {
+ peerAddressResolver.addPeerAddress(memberName, address);
for(ShardInformation info : localShards.values()){
String shardName = info.getShardName();
info.peerUp(memberName, peerId, getSelf());
}
-
- checkReady();
}
private void memberReachable(ClusterEvent.ReachableMember message) {
String memberName = message.member().roles().head();
LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
+ addPeerAddress(memberName, message.member().address());
+
markMemberAvailable(memberName);
}
Map<String, String> peerAddresses = getPeerAddresses(shardName);
localShardActorNames.add(shardId.toString());
localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, datastoreContext,
- shardPropsCreator));
+ shardPropsCreator, peerAddressResolver));
}
mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type,
private final String shardName;
private ActorRef actor;
private ActorPath actorPath;
- private final Map<String, String> peerAddresses;
+ private final Map<String, String> initialPeerAddresses;
private Optional<DataTree> localShardDataTree;
private boolean leaderAvailable = false;
private final DatastoreContext datastoreContext;
private final ShardPropsCreator shardPropsCreator;
+ private final ShardPeerAddressResolver addressResolver;
private ShardInformation(String shardName, ShardIdentifier shardId,
- Map<String, String> peerAddresses, DatastoreContext datastoreContext,
- ShardPropsCreator shardPropsCreator) {
+ Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
+ ShardPropsCreator shardPropsCreator, ShardPeerAddressResolver addressResolver) {
this.shardName = shardName;
this.shardId = shardId;
- this.peerAddresses = peerAddresses;
+ this.initialPeerAddresses = initialPeerAddresses;
this.datastoreContext = datastoreContext;
this.shardPropsCreator = shardPropsCreator;
+ this.addressResolver = addressResolver;
}
Props newProps(SchemaContext schemaContext) {
- return shardPropsCreator.newProps(shardId, peerAddresses, datastoreContext, schemaContext);
+ return shardPropsCreator.newProps(shardId, initialPeerAddresses, datastoreContext, schemaContext);
}
String getShardName() {
return localShardDataTree;
}
- Map<String, String> getPeerAddresses() {
- return peerAddresses;
- }
-
void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
- LOG.info("updatePeerAddress for peer {} with address {}", peerId,
- peerAddress);
- if(peerAddresses.containsKey(peerId)){
- peerAddresses.put(peerId, peerAddress);
-
- if(actor != null) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
- peerId, peerAddress, actor.path());
- }
+ LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
- actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
+ if(actor != null) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
+ peerId, peerAddress, actor.path());
}
- notifyOnShardInitializedCallbacks();
+ actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
}
+
+ notifyOnShardInitializedCallbacks();
}
void peerDown(String memberName, String peerId, ActorRef sender) {
- if(peerAddresses.containsKey(peerId) && actor != null) {
+ if(actor != null) {
actor.tell(new PeerDown(memberName, peerId), sender);
}
}
void peerUp(String memberName, String peerId, ActorRef sender) {
- if(peerAddresses.containsKey(peerId) && actor != null) {
+ if(actor != null) {
actor.tell(new PeerUp(memberName, peerId), sender);
}
}
boolean isShardReadyWithLeaderId() {
return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
- (isLeader() || peerAddresses.get(leaderId) != null);
+ (isLeader() || addressResolver.resolve(leaderId) != null);
}
boolean isShardInitialized() {
if(isLeader()) {
return Serialization.serializedActorPath(getActor());
} else {
- return peerAddresses.get(leaderId);
+ return addressResolver.resolve(leaderId);
}
}