}
private void markMemberUnavailable(final MemberName memberName) {
- final String memberStr = memberName.getName();
for (ShardInformation info : localShards.values()) {
String leaderId = info.getLeaderId();
- // XXX: why are we using String#contains() here?
- if (leaderId != null && leaderId.contains(memberStr)) {
+ if (leaderId != null && ShardIdentifier.fromShardIdString(leaderId).getMemberName().equals(memberName)) {
LOG.debug("Marking Leader {} as unavailable.", leaderId);
info.setLeaderAvailable(false);
}
private void markMemberAvailable(final MemberName memberName) {
- final String memberStr = memberName.getName();
for (ShardInformation info : localShards.values()) {
String leaderId = info.getLeaderId();
- // XXX: why are we using String#contains() here?
- if (leaderId != null && leaderId.contains(memberStr)) {
+ if (leaderId != null && ShardIdentifier.fromShardIdString(leaderId).getMemberName().equals(memberName)) {
LOG.debug("Marking Leader {} as available.", leaderId);
info.setLeaderAvailable(true);
}
private void updateSchemaContext(final Object message) {
schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
- LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
+ LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getModules().size());
for (ShardInformation info : localShards.values()) {
info.setSchemaContext(schemaContext);
if (info.getActor() == null) {
LOG.debug("Creating Shard {}", info.getShardId());
info.setActor(newShardActor(info));
+ // Update peer address for every existing peer memeber to avoid missing sending
+ // PeerAddressResolved and PeerUp to this shard while UpdateSchemaContext comes after MemberUp.
+ String shardName = info.getShardName();
+ for (MemberName memberName : peerAddressResolver.getPeerMembers()) {
+ String peerId = getShardIdentifier(memberName, shardName).toString() ;
+ String peerAddress = peerAddressResolver.getShardActorAddress(shardName, memberName);
+ info.updatePeerAddress(peerId, peerAddress, getSelf());
+ info.peerUp(memberName, peerId, getSelf());
+ LOG.debug("{}: updated peer {} on member {} with address {} on shard {} whose actor address is {}",
+ persistenceId(), peerId, memberName, peerAddress, info.getShardId(), info.getActor());
+ }
} else {
info.getActor().tell(message, getSelf());
}