import java.util.function.Supplier;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
+import org.opendaylight.controller.cluster.common.actor.Dispatchers;
import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.datastore.utils.CompositeOnComplete;
-import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
private final Map<String, CompositeOnComplete<Boolean>> shardActorsStopping = new HashMap<>();
+ private final Set<Consumer<String>> shardAvailabilityCallbacks = new HashSet<>();
+
private final String persistenceId;
private final AbstractDataStore dataStore;
onGetShardRole((GetShardRole) message);
} else if (message instanceof RunnableMessage) {
((RunnableMessage)message).run();
+ } else if (message instanceof RegisterForShardAvailabilityChanges) {
+ onRegisterForShardAvailabilityChanges((RegisterForShardAvailabilityChanges)message);
} else if (message instanceof DeleteSnapshotsFailure) {
LOG.warn("{}: Failed to delete prior snapshots", persistenceId(),
((DeleteSnapshotsFailure) message).cause());
}
}
+ private void onRegisterForShardAvailabilityChanges(RegisterForShardAvailabilityChanges message) {
+ LOG.debug("{}: onRegisterForShardAvailabilityChanges: {}", persistenceId(), message);
+
+ final Consumer<String> callback = message.getCallback();
+ shardAvailabilityCallbacks.add(callback);
+
+ getSender().tell(new Status.Success((Registration)
+ () -> executeInSelf(() -> shardAvailabilityCallbacks.remove(callback))), self());
+ }
+
private void onGetShardRole(final GetShardRole message) {
LOG.debug("{}: onGetShardRole for shard: {}", persistenceId(), message.getName());
private void onInitConfigListener() {
LOG.debug("{}: Initializing config listener on {}", persistenceId(), cluster.getCurrentMemberName());
- final org.opendaylight.mdsal.common.api.LogicalDatastoreType type =
+ final org.opendaylight.mdsal.common.api.LogicalDatastoreType datastoreType =
org.opendaylight.mdsal.common.api.LogicalDatastoreType
.valueOf(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType().name());
}
configUpdateHandler = new PrefixedShardConfigUpdateHandler(self(), cluster.getCurrentMemberName());
- configUpdateHandler.initListener(dataStore, type);
+ configUpdateHandler.initListener(dataStore, datastoreType);
}
private void onShutDown() {
}
self().tell((RunnableMessage) () -> {
+ // At any rate, invalidate primaryShardInfo cache
+ primaryShardInfoCache.remove(shardName);
+
shardActorsStopping.remove(shardName);
notifyOnCompleteTasks(failure, result);
}, ActorRef.noSender());
configuration.addPrefixShardConfiguration(config);
final Builder builder = newShardDatastoreContextBuilder(shardName);
- builder.logicalStoreType(LogicalDatastoreType.valueOf(config.getPrefix().getDatastoreType().name()))
+ builder.logicalStoreType(config.getPrefix().getDatastoreType())
.storeRoot(config.getPrefix().getRootIdentifier());
DatastoreContext shardDatastoreContext = builder.build();
shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
if (shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
primaryShardInfoCache.remove(shardInformation.getShardName());
+
+ notifyShardAvailabilityCallbacks(shardInformation);
}
checkReady();
}
}
+ private void notifyShardAvailabilityCallbacks(ShardInformation shardInformation) {
+ shardAvailabilityCallbacks.forEach(callback -> callback.accept(shardInformation.getShardName()));
+ }
+
private void onShardNotInitializedTimeout(final ShardNotInitializedTimeout message) {
ShardInformation shardInfo = message.getShardInfo();
final ActorRef sender = getSender();
if (sender == null) {
- return; //why is a non-actor sending this message? Just ignore.
+ // why is a non-actor sending this message? Just ignore.
+ return;
}
String actorName = sender.path().name();
}
LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
- shardInformation.getShardName());
+ shardInformation);
Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
timeout, getSelf(),
}
private void markMemberUnavailable(final MemberName memberName) {
- final String memberStr = memberName.getName();
for (ShardInformation info : localShards.values()) {
String leaderId = info.getLeaderId();
- // XXX: why are we using String#contains() here?
- if (leaderId != null && leaderId.contains(memberStr)) {
+ if (leaderId != null && ShardIdentifier.fromShardIdString(leaderId).getMemberName().equals(memberName)) {
LOG.debug("Marking Leader {} as unavailable.", leaderId);
info.setLeaderAvailable(false);
primaryShardInfoCache.remove(info.getShardName());
+
+ notifyShardAvailabilityCallbacks(info);
}
info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
}
private void markMemberAvailable(final MemberName memberName) {
- final String memberStr = memberName.getName();
for (ShardInformation info : localShards.values()) {
String leaderId = info.getLeaderId();
- // XXX: why are we using String#contains() here?
- if (leaderId != null && leaderId.contains(memberStr)) {
+ if (leaderId != null && ShardIdentifier.fromShardIdString(leaderId).getMemberName().equals(memberName)) {
LOG.debug("Marking Leader {} as available.", leaderId);
info.setLeaderAvailable(true);
}
private void updateSchemaContext(final Object message) {
schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
- LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
+ LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getModules().size());
for (ShardInformation info : localShards.values()) {
info.setSchemaContext(schemaContext);
if (info.getActor() == null) {
LOG.debug("Creating Shard {}", info.getShardId());
info.setActor(newShardActor(info));
+ // Update peer address for every existing peer memeber to avoid missing sending
+ // PeerAddressResolved and PeerUp to this shard while UpdateSchemaContext comes after MemberUp.
+ String shardName = info.getShardName();
+ for (MemberName memberName : peerAddressResolver.getPeerMembers()) {
+ String peerId = getShardIdentifier(memberName, shardName).toString() ;
+ String peerAddress = peerAddressResolver.getShardActorAddress(shardName, memberName);
+ info.updatePeerAddress(peerId, peerAddress, getSelf());
+ info.peerUp(memberName, peerId, getSelf());
+ LOG.debug("{}: updated peer {} on member {} with address {} on shard {} whose actor address is {}",
+ persistenceId(), peerId, memberName, peerAddress, info.getShardId(), info.getActor());
+ }
} else {
info.getActor().tell(message, getSelf());
}
}
}
- restoreFromSnapshot = null; // null out to GC
+ // null out to GC
+ restoreFromSnapshot = null;
for (String shardName : memberShardNames) {
ShardIdentifier shardId = getShardIdentifier(memberName, shardName);