- void onLeadershipChange(final boolean isLeader, boolean hasLeader) {
- LOG.debug("onLeadershipChange, isLeader: {}, hasLeader : {}", isLeader, hasLeader);
-
- for (ActorSelection dataChangeListener : dataChangeListeners) {
- dataChangeListener.tell(new EnableNotification(isLeader), getSelf());
- }
-
- if(hasLeader) {
- for (DelayedListenerRegistration reg : delayedRegisterOnAllListeners) {
- registerDelayedListeners(reg);
- }
- delayedRegisterOnAllListeners.clear();
- }
-
- if (isLeader) {
- for (DelayedListenerRegistration reg: delayedListenerRegistrations) {
- registerDelayedListeners(reg);
- }
-
- delayedListenerRegistrations.clear();
- }
- }
-
- private void registerDelayedListeners(DelayedListenerRegistration reg) {
- if(!reg.isClosed()) {
- final Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> res =
- createDelegate(reg.getRegisterChangeListener());
- reg.setDelegate(res.getKey());
- if (res.getValue() != null) {
- reg.getInstance().onDataChanged(res.getValue());
- }
- }
- }
-
- @Override
- void onMessage(final RegisterChangeListener message, final boolean isLeader, boolean hasLeader) {
-
- LOG.debug("{}: registerDataChangeListener for {}, isLeader: {}, hasLeader : {}",
- persistenceId(), message.getPath(), isLeader, hasLeader);
-
- final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
- NormalizedNode<?, ?>>> registration;
- final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> event;
- if ((hasLeader && message.isRegisterOnAllInstances()) || isLeader) {
- final Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> res =
- createDelegate(message);
- registration = res.getKey();
- event = res.getValue();
- } else {
- LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());
-
- DelayedListenerRegistration delayedReg = new DelayedListenerRegistration(message);
- if(message.isRegisterOnAllInstances()) {
- delayedRegisterOnAllListeners.add(delayedReg);
- } else {
- delayedListenerRegistrations.add(delayedReg);
- }
- registration = delayedReg;
- event = null;
- }
-
- ActorRef listenerRegistration = createActor(DataChangeListenerRegistration.props(registration));
-
- LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
- persistenceId(), listenerRegistration.path());
-
- tellSender(new RegisterChangeListenerReply(listenerRegistration));
- if (event != null) {
- registration.getInstance().onDataChanged(event);
- }
- }
-
- @Override
- Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> createDelegate(
- final RegisterChangeListener message) {