CDS: Implement front-end support for local transactions
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataChangeListenerSupport.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import java.util.ArrayList;
13 import java.util.List;
14 import java.util.Map.Entry;
15 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
16 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
17 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
18 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
19 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
20 import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
21 import org.opendaylight.yangtools.concepts.ListenerRegistration;
22 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
23 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 final class DataChangeListenerSupport extends LeaderLocalDelegateFactory<RegisterChangeListener, ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> {
28     private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerSupport.class);
29     private final List<DelayedListenerRegistration> delayedListenerRegistrations = new ArrayList<>();
30     private final List<ActorSelection> dataChangeListeners =  new ArrayList<>();
31
32     DataChangeListenerSupport(final Shard shard) {
33         super(shard);
34     }
35
36     @Override
37     void onLeadershipChange(final boolean isLeader) {
38         for (ActorSelection dataChangeListener : dataChangeListeners) {
39             dataChangeListener.tell(new EnableNotification(isLeader), getSelf());
40         }
41
42         if (isLeader) {
43             for (DelayedListenerRegistration reg: delayedListenerRegistrations) {
44                 if(!reg.isClosed()) {
45                     final Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> res =
46                             createDelegate(reg.getRegisterChangeListener());
47                     reg.setDelegate(res.getKey());
48                     if (res.getValue() != null) {
49                         reg.getInstance().onDataChanged(res.getValue());
50                     }
51                 }
52             }
53
54             delayedListenerRegistrations.clear();
55         }
56     }
57
58     @Override
59     void onMessage(final RegisterChangeListener message, final boolean isLeader) {
60
61         LOG.debug("{}: registerDataChangeListener for {}, leader: {}", persistenceId(), message.getPath(), isLeader);
62
63         final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
64                                                      NormalizedNode<?, ?>>> registration;
65         final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> event;
66         if (isLeader) {
67             final Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> res =
68                     createDelegate(message);
69             registration = res.getKey();
70             event = res.getValue();
71         } else {
72             LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());
73
74             DelayedListenerRegistration delayedReg = new DelayedListenerRegistration(message);
75             delayedListenerRegistrations.add(delayedReg);
76             registration = delayedReg;
77             event = null;
78         }
79
80         ActorRef listenerRegistration = createActor(DataChangeListenerRegistration.props(registration));
81
82         LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
83                 persistenceId(), listenerRegistration.path());
84
85         tellSender(new RegisterChangeListenerReply(listenerRegistration));
86         if (event != null) {
87             registration.getInstance().onDataChanged(event);
88         }
89     }
90
91     @Override
92     Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> createDelegate(
93             final RegisterChangeListener message) {
94         ActorSelection dataChangeListenerPath = selectActor(message.getDataChangeListenerPath());
95
96         // Notify the listener if notifications should be enabled or not
97         // If this shard is the leader then it will enable notifications else
98         // it will not
99         dataChangeListenerPath.tell(new EnableNotification(true), getSelf());
100
101         // Now store a reference to the data change listener so it can be notified
102         // at a later point if notifications should be enabled or disabled
103         dataChangeListeners.add(dataChangeListenerPath);
104
105         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
106                 new DataChangeListenerProxy(dataChangeListenerPath);
107
108         LOG.debug("{}: Registering for path {}", persistenceId(), message.getPath());
109
110         return getShard().getDataStore().registerChangeListener(message.getPath(), listener,
111                 message.getScope());
112     }
113 }