private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerSupport.class);
private final List<DelayedListenerRegistration> delayedListenerRegistrations = new ArrayList<>();
private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
+ private final List<DelayedListenerRegistration> delayedRegisterOnAllListeners = new ArrayList<>();
DataChangeListenerSupport(final Shard shard) {
super(shard);
}
@Override
- void onLeadershipChange(final boolean isLeader) {
+ void onLeadershipChange(final boolean isLeader, boolean hasLeader) {
+ LOG.debug("onLeadershipChange, isLeader: {}, hasLeader : {}", isLeader, hasLeader);
+
for (ActorSelection dataChangeListener : dataChangeListeners) {
dataChangeListener.tell(new EnableNotification(isLeader), getSelf());
}
+ if(hasLeader) {
+ for (DelayedListenerRegistration reg : delayedRegisterOnAllListeners) {
+ registerDelayedListeners(reg);
+ }
+ delayedRegisterOnAllListeners.clear();
+ }
+
if (isLeader) {
for (DelayedListenerRegistration reg: delayedListenerRegistrations) {
- if(!reg.isClosed()) {
- final Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> res =
- createDelegate(reg.getRegisterChangeListener());
- reg.setDelegate(res.getKey());
- if (res.getValue() != null) {
- reg.getInstance().onDataChanged(res.getValue());
- }
- }
+ registerDelayedListeners(reg);
}
delayedListenerRegistrations.clear();
}
}
+ private void registerDelayedListeners(DelayedListenerRegistration reg) {
+ if(!reg.isClosed()) {
+ final Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> res =
+ createDelegate(reg.getRegisterChangeListener());
+ reg.setDelegate(res.getKey());
+ if (res.getValue() != null) {
+ reg.getInstance().onDataChanged(res.getValue());
+ }
+ }
+ }
+
@Override
- void onMessage(final RegisterChangeListener message, final boolean isLeader) {
+ void onMessage(final RegisterChangeListener message, final boolean isLeader, boolean hasLeader) {
- LOG.debug("{}: registerDataChangeListener for {}, leader: {}", persistenceId(), message.getPath(), isLeader);
+ LOG.debug("{}: registerDataChangeListener for {}, isLeader: {}, hasLeader : {}",
+ persistenceId(), message.getPath(), isLeader, hasLeader);
final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
NormalizedNode<?, ?>>> registration;
final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> event;
- if (isLeader) {
+ if ((hasLeader && message.isRegisterOnAllInstances()) || isLeader) {
final Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> res =
createDelegate(message);
registration = res.getKey();
LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());
DelayedListenerRegistration delayedReg = new DelayedListenerRegistration(message);
- delayedListenerRegistrations.add(delayedReg);
+ if(message.isRegisterOnAllInstances()) {
+ delayedRegisterOnAllListeners.add(delayedReg);
+ } else {
+ delayedListenerRegistrations.add(delayedReg);
+ }
registration = delayedReg;
event = null;
}
// Now store a reference to the data change listener so it can be notified
// at a later point if notifications should be enabled or disabled
- dataChangeListeners.add(dataChangeListenerPath);
+ if(!message.isRegisterOnAllInstances()) {
+ dataChangeListeners.add(dataChangeListenerPath);
+ }
AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
new DataChangeListenerProxy(dataChangeListenerPath);