2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import java.util.ArrayList;
13 import java.util.List;
14 import java.util.Map.Entry;
15 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
16 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
17 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
18 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
19 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
20 import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
21 import org.opendaylight.yangtools.concepts.ListenerRegistration;
22 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
23 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
27 final class DataChangeListenerSupport extends LeaderLocalDelegateFactory<RegisterChangeListener, ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> {
// Class-wide SLF4J logger.
28 private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerSupport.class);
// Registrations whose real delegate has not been created yet. onMessage adds an entry on its
// "delaying registration" path; onLeadershipChange walks the list, attaches delegates and clears it.
29 private final List<DelayedListenerRegistration> delayedListenerRegistrations = new ArrayList<>();
// Listener actors that createDelegate has been asked to register. onLeadershipChange sends each of
// them an EnableNotification message carrying the new leadership flag.
30 private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
// Package-private constructor taking the Shard this support object is created for.
// NOTE(review): the constructor body is not visible in this excerpt - confirm how the shard is used.
32 DataChangeListenerSupport(final Shard shard) {
// Reacts to a change of this shard's leadership status.
//
// isLeader: the new leadership state; it is forwarded unchanged to every known listener actor.
//
// NOTE(review): this excerpt omits several original lines (the embedded numbering jumps
// 39 -> 43 and 49 -> 54), so any guards and the closing braces around the statements below
// are not visible here - check the full file before changing control flow.
37 void onLeadershipChange(final boolean isLeader) {
// Tell every listener actor recorded so far whether notifications are enabled.
38 for (ActorSelection dataChangeListener : dataChangeListeners) {
39 dataChangeListener.tell(new EnableNotification(isLeader), getSelf());
// Complete the registrations that were postponed by onMessage.
43 for (DelayedListenerRegistration reg: delayedListenerRegistrations) {
// Build the real registration from the originally received RegisterChangeListener message.
45 final Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> res =
46 createDelegate(reg.getRegisterChangeListener());
// Attach the real registration to the delayed placeholder.
47 reg.setDelegate(res.getKey());
// Deliver the event returned alongside the registration, but only when one was returned.
48 if (res.getValue() != null) {
49 reg.getInstance().onDataChanged(res.getValue());
// Every delayed entry has been handled; drop them so they are not processed again.
54 delayedListenerRegistrations.clear();
// Handles a RegisterChangeListener request. It obtains a listener registration, wraps it in a
// DataChangeListenerRegistration actor and replies to the sender with that actor's reference.
//
// message:  the registration request; its path is logged and it is passed to createDelegate
//           or stored in a DelayedListenerRegistration.
// isLeader: leadership flag supplied by the caller; logged below.
//
// NOTE(review): the lines that choose between the two registration paths are missing from this
// excerpt (embedded numbering jumps 65 -> 67, 70 -> 72 and 76 -> 80). Judging by the log message
// on line 72, the delayed path is presumably taken when isLeader is false - confirm in the full file.
59 void onMessage(final RegisterChangeListener message, final boolean isLeader) {
61 LOG.debug("{}: registerDataChangeListener for {}, leader: {}", persistenceId(), message.getPath(), isLeader);
// Both locals are final and are assigned on whichever path is taken below.
63 final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
64 NormalizedNode<?, ?>>> registration;
65 final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> event;
// Immediate path: createDelegate returns the registration together with an event.
67 final Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> res =
68 createDelegate(message);
69 registration = res.getKey();
70 event = res.getValue();
// Delayed path: remember the request in a placeholder registration so that
// onLeadershipChange can attach the real delegate later.
72 LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());
74 DelayedListenerRegistration delayedReg = new DelayedListenerRegistration(message);
75 delayedListenerRegistrations.add(delayedReg);
76 registration = delayedReg;
// Create an actor from the registration's props; its reference is what the sender receives.
80 ActorRef listenerRegistration = createActor(DataChangeListenerRegistration.props(registration));
82 LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
83 persistenceId(), listenerRegistration.path());
85 tellSender(new RegisterChangeListenerReply(listenerRegistration));
// Hand the event to the registered listener instance.
// NOTE(review): any guard around this call (e.g. for a missing event) is not visible in this excerpt.
87 registration.getInstance().onDataChanged(event);
92 Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> createDelegate(
93 final RegisterChangeListener message) {
94 ActorSelection dataChangeListenerPath = selectActor(message.getDataChangeListenerPath());
96 // Notify the listener if notifications should be enabled or not
97 // If this shard is the leader then it will enable notifications, else it will not.
99 dataChangeListenerPath.tell(new EnableNotification(true), getSelf());
101 // Now store a reference to the data change listener so it can be notified
102 // at a later point if notifications should be enabled or disabled
103 dataChangeListeners.add(dataChangeListenerPath);
105 AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
106 new DataChangeListenerProxy(dataChangeListenerPath);
108 LOG.debug("{}: Registering for path {}", persistenceId(), message.getPath());
110 return getShard().getDataStore().registerChangeListener(message.getPath(), listener,