import java.util.function.Supplier;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
+import org.opendaylight.controller.cluster.common.actor.Dispatchers;
import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.datastore.utils.CompositeOnComplete;
-import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
private void onInitConfigListener() {
LOG.debug("{}: Initializing config listener on {}", persistenceId(), cluster.getCurrentMemberName());
- final org.opendaylight.mdsal.common.api.LogicalDatastoreType type =
+ final org.opendaylight.mdsal.common.api.LogicalDatastoreType datastoreType =
org.opendaylight.mdsal.common.api.LogicalDatastoreType
.valueOf(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType().name());
}
configUpdateHandler = new PrefixedShardConfigUpdateHandler(self(), cluster.getCurrentMemberName());
- configUpdateHandler.initListener(dataStore, type);
+ configUpdateHandler.initListener(dataStore, datastoreType);
}
private void onShutDown() {
}
self().tell((RunnableMessage) () -> {
+ // At any rate, invalidate primaryShardInfo cache
+ primaryShardInfoCache.remove(shardName);
+
shardActorsStopping.remove(shardName);
notifyOnCompleteTasks(failure, result);
}, ActorRef.noSender());
final ActorRef sender = getSender();
if (sender == null) {
- return; //why is a non-actor sending this message? Just ignore.
+ // why is a non-actor sending this message? Just ignore.
+ return;
}
String actorName = sender.path().name();
}
private void markMemberUnavailable(final MemberName memberName) {
- final String memberStr = memberName.getName();
for (ShardInformation info : localShards.values()) {
String leaderId = info.getLeaderId();
- // XXX: why are we using String#contains() here?
- if (leaderId != null && leaderId.contains(memberStr)) {
+ if (leaderId != null && ShardIdentifier.fromShardIdString(leaderId).getMemberName().equals(memberName)) {
LOG.debug("Marking Leader {} as unavailable.", leaderId);
info.setLeaderAvailable(false);
}
private void markMemberAvailable(final MemberName memberName) {
- final String memberStr = memberName.getName();
for (ShardInformation info : localShards.values()) {
String leaderId = info.getLeaderId();
- // XXX: why are we using String#contains() here?
- if (leaderId != null && leaderId.contains(memberStr)) {
+ if (leaderId != null && ShardIdentifier.fromShardIdString(leaderId).getMemberName().equals(memberName)) {
LOG.debug("Marking Leader {} as available.", leaderId);
info.setLeaderAvailable(true);
}
private void updateSchemaContext(final Object message) {
schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
- LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
+ LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getModules().size());
for (ShardInformation info : localShards.values()) {
info.setSchemaContext(schemaContext);
if (info.getActor() == null) {
LOG.debug("Creating Shard {}", info.getShardId());
info.setActor(newShardActor(info));
+ // Update peer address for every existing peer memeber to avoid missing sending
+ // PeerAddressResolved and PeerUp to this shard while UpdateSchemaContext comes after MemberUp.
+ String shardName = info.getShardName();
+ for (MemberName memberName : peerAddressResolver.getPeerMembers()) {
+ String peerId = getShardIdentifier(memberName, shardName).toString() ;
+ String peerAddress = peerAddressResolver.getShardActorAddress(shardName, memberName);
+ info.updatePeerAddress(peerId, peerAddress, getSelf());
+ info.peerUp(memberName, peerId, getSelf());
+ LOG.debug("{}: updated peer {} on member {} with address {} on shard {} whose actor address is {}",
+ persistenceId(), peerId, memberName, peerAddress, info.getShardId(), info.getActor());
+ }
} else {
info.getActor().tell(message, getSelf());
}
}
}
- restoreFromSnapshot = null; // null out to GC
+ // null out to GC
+ restoreFromSnapshot = null;
for (String shardName : memberShardNames) {
ShardIdentifier shardId = getShardIdentifier(memberName, shardName);