import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
private final Map<String, CompositeOnComplete<Boolean>> shardActorsStopping = new HashMap<>();
+ private final Set<Consumer<String>> shardAvailabilityCallbacks = new HashSet<>();
+
private final String persistenceId;
private final AbstractDataStore dataStore;
onGetShardRole((GetShardRole) message);
} else if (message instanceof RunnableMessage) {
((RunnableMessage)message).run();
+ } else if (message instanceof RegisterForShardAvailabilityChanges) {
+ onRegisterForShardAvailabilityChanges((RegisterForShardAvailabilityChanges)message);
} else if (message instanceof DeleteSnapshotsFailure) {
LOG.warn("{}: Failed to delete prior snapshots", persistenceId(),
((DeleteSnapshotsFailure) message).cause());
}
}
+ private void onRegisterForShardAvailabilityChanges(RegisterForShardAvailabilityChanges message) {
+ LOG.debug("{}: onRegisterForShardAvailabilityChanges: {}", persistenceId(), message);
+
+ final Consumer<String> callback = message.getCallback();
+ shardAvailabilityCallbacks.add(callback);
+
+ getSender().tell(new Status.Success((Registration)
+ () -> executeInSelf(() -> shardAvailabilityCallbacks.remove(callback))), self());
+ }
+
private void onGetShardRole(final GetShardRole message) {
LOG.debug("{}: onGetShardRole for shard: {}", persistenceId(), message.getName());
shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
if (shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
primaryShardInfoCache.remove(shardInformation.getShardName());
+
+ notifyShardAvailabilityCallbacks(shardInformation);
}
checkReady();
}
}
+ private void notifyShardAvailabilityCallbacks(ShardInformation shardInformation) {
+ shardAvailabilityCallbacks.forEach(callback -> callback.accept(shardInformation.getShardName()));
+ }
+
private void onShardNotInitializedTimeout(final ShardNotInitializedTimeout message) {
ShardInformation shardInfo = message.getShardInfo();
}
LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
- shardInformation.getShardName());
+ shardInformation);
Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
timeout, getSelf(),
info.setLeaderAvailable(false);
primaryShardInfoCache.remove(info.getShardName());
+
+ notifyShardAvailabilityCallbacks(info);
}
info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());