09586b270b733830e3e6f0d08f1a3caf29833799
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataTreeChangeListenerSupport.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import java.util.ArrayList;
13 import java.util.Collection;
14 import java.util.concurrent.ConcurrentHashMap;
15 import org.opendaylight.controller.cluster.datastore.actors.DataTreeNotificationListenerRegistrationActor;
16 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
17 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
18 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
19 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22
23 final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory<RegisterDataTreeChangeListener> {
24     private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerSupport.class);
25
26     private final Collection<DelayedDataTreeChangeListenerRegistration<DOMDataTreeChangeListener>>
27             delayedDataTreeChangeListenerRegistrations = ConcurrentHashMap.newKeySet();
28     private final Collection<DelayedDataTreeChangeListenerRegistration<DOMDataTreeChangeListener>>
29             delayedListenerOnAllRegistrations = ConcurrentHashMap.newKeySet();
30     private final Collection<ActorSelection> leaderOnlyListenerActors = ConcurrentHashMap.newKeySet();
31     private final Collection<ActorSelection> allListenerActors = ConcurrentHashMap.newKeySet();
32
33     DataTreeChangeListenerSupport(final Shard shard) {
34         super(shard);
35     }
36
37     void doRegistration(final RegisterDataTreeChangeListener message, final ActorRef registrationActor) {
38         final ActorSelection listenerActor = processListenerRegistrationMessage(message);
39
40         final DOMDataTreeChangeListener listener = new ForwardingDataTreeChangeListener(listenerActor, getSelf());
41
42         LOG.debug("{}: Registering listenerActor {} for path {}", persistenceId(), listenerActor, message.getPath());
43
44         final ShardDataTree shardDataTree = getShard().getDataStore();
45         shardDataTree.registerTreeChangeListener(message.getPath(),
46                 listener, shardDataTree.readCurrentData(), registration -> registrationActor.tell(
47                         new DataTreeNotificationListenerRegistrationActor.SetRegistration(registration, () ->
48                             removeListenerActor(listenerActor)), ActorRef.noSender()));
49     }
50
51     Collection<ActorSelection> getListenerActors() {
52         return new ArrayList<>(allListenerActors);
53     }
54
55     @Override
56     void onLeadershipChange(final boolean isLeader, final boolean hasLeader) {
57         LOG.debug("{}: onLeadershipChange, isLeader: {}, hasLeader : {}", persistenceId(), isLeader, hasLeader);
58
59         final EnableNotification msg = new EnableNotification(isLeader, persistenceId());
60         for (ActorSelection dataChangeListener : leaderOnlyListenerActors) {
61             dataChangeListener.tell(msg, getSelf());
62         }
63
64         if (hasLeader) {
65             for (DelayedDataTreeChangeListenerRegistration<DOMDataTreeChangeListener> reg :
66                     delayedListenerOnAllRegistrations) {
67                 reg.doRegistration(this);
68             }
69
70             delayedListenerOnAllRegistrations.clear();
71         }
72
73         if (isLeader) {
74             for (DelayedDataTreeChangeListenerRegistration<DOMDataTreeChangeListener> reg :
75                     delayedDataTreeChangeListenerRegistrations) {
76                 reg.doRegistration(this);
77             }
78
79             delayedDataTreeChangeListenerRegistrations.clear();
80         }
81     }
82
83     @Override
84     void onMessage(final RegisterDataTreeChangeListener message, final boolean isLeader, final boolean hasLeader) {
85         LOG.debug("{}: onMessage {}, isLeader: {}, hasLeader: {}", persistenceId(), message, isLeader, hasLeader);
86
87         final ActorRef registrationActor = createActor(DataTreeNotificationListenerRegistrationActor.props());
88
89         if (hasLeader && message.isRegisterOnAllInstances() || isLeader) {
90             doRegistration(message, registrationActor);
91         } else {
92             LOG.debug("{}: Shard does not have a leader - delaying registration", persistenceId());
93
94             final DelayedDataTreeChangeListenerRegistration<DOMDataTreeChangeListener> delayedReg =
95                     new DelayedDataTreeChangeListenerRegistration<>(message, registrationActor);
96             final Collection<DelayedDataTreeChangeListenerRegistration<DOMDataTreeChangeListener>> delayedRegList;
97             if (message.isRegisterOnAllInstances()) {
98                 delayedRegList = delayedListenerOnAllRegistrations;
99             } else {
100                 delayedRegList = delayedDataTreeChangeListenerRegistrations;
101             }
102
103             delayedRegList.add(delayedReg);
104             registrationActor.tell(new DataTreeNotificationListenerRegistrationActor.SetRegistration(
105                     delayedReg, () -> delayedRegList.remove(delayedReg)), ActorRef.noSender());
106         }
107
108         LOG.debug("{}: sending RegisterDataTreeNotificationListenerReply, listenerRegistrationPath = {} ",
109                 persistenceId(), registrationActor.path());
110
111         tellSender(new RegisterDataTreeNotificationListenerReply(registrationActor));
112     }
113
114     private ActorSelection processListenerRegistrationMessage(final RegisterDataTreeChangeListener message) {
115         final ActorSelection listenerActor = selectActor(message.getListenerActorPath());
116
117         // We have a leader so enable the listener.
118         listenerActor.tell(new EnableNotification(true, persistenceId()), getSelf());
119
120         if (!message.isRegisterOnAllInstances()) {
121             // This is a leader-only registration so store a reference to the listener actor so it can be notified
122             // at a later point if notifications should be enabled or disabled.
123             leaderOnlyListenerActors.add(listenerActor);
124         }
125
126         allListenerActors.add(listenerActor);
127
128         return listenerActor;
129     }
130
131     private void removeListenerActor(final ActorSelection listenerActor) {
132         allListenerActors.remove(listenerActor);
133         leaderOnlyListenerActors.remove(listenerActor);
134     }
135 }