0821951a1a67b7cc30d7c98dae86994119af5cf6
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / AbstractDataListenerSupport.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import java.util.ArrayList;
13 import java.util.Collection;
14 import java.util.EventListener;
15 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
16 import org.opendaylight.controller.cluster.datastore.messages.ListenerRegistrationMessage;
17 import org.opendaylight.yangtools.concepts.ListenerRegistration;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20
21 abstract class AbstractDataListenerSupport<L extends EventListener, M extends ListenerRegistrationMessage,
22         D extends DelayedListenerRegistration<L, M>, R extends ListenerRegistration<L>>
23                 extends LeaderLocalDelegateFactory<M, R> {
24     private final Logger log = LoggerFactory.getLogger(getClass());
25
26     private final ArrayList<D> delayedListenerRegistrations = new ArrayList<>();
27     private final ArrayList<D> delayedListenerOnAllRegistrations = new ArrayList<>();
28     private final Collection<ActorSelection> actors = new ArrayList<>();
29
30     protected AbstractDataListenerSupport(Shard shard) {
31         super(shard);
32     }
33
34     @Override
35     void onLeadershipChange(boolean isLeader, boolean hasLeader) {
36         log.debug("{}: onLeadershipChange, isLeader: {}, hasLeader : {}", persistenceId(), isLeader, hasLeader);
37
38         final EnableNotification msg = new EnableNotification(isLeader);
39         for (ActorSelection dataChangeListener : actors) {
40             dataChangeListener.tell(msg, getSelf());
41         }
42
43         if (hasLeader) {
44             for (D reg : delayedListenerOnAllRegistrations) {
45                 reg.createDelegate(this);
46             }
47
48             delayedListenerOnAllRegistrations.clear();
49             delayedListenerOnAllRegistrations.trimToSize();
50         }
51
52         if (isLeader) {
53             for (D reg : delayedListenerRegistrations) {
54                 reg.createDelegate(this);
55             }
56
57             delayedListenerRegistrations.clear();
58             delayedListenerRegistrations.trimToSize();
59         }
60     }
61
62     @Override
63     void onMessage(M message, boolean isLeader, boolean hasLeader) {
64         log.debug("{}: {} for {}, leader: {}", persistenceId(), logName(), message.getPath(), isLeader);
65
66         final ListenerRegistration<L> registration;
67         if (hasLeader && message.isRegisterOnAllInstances() || isLeader) {
68             registration = createDelegate(message);
69         } else {
70             log.debug("{}: Shard is not the leader - delaying registration", persistenceId());
71
72             D delayedReg = newDelayedListenerRegistration(message);
73             if (message.isRegisterOnAllInstances()) {
74                 delayedListenerOnAllRegistrations.add(delayedReg);
75             } else {
76                 delayedListenerRegistrations.add(delayedReg);
77             }
78
79             registration = delayedReg;
80         }
81
82         ActorRef registrationActor = newRegistrationActor(registration);
83
84         log.debug("{}: {} sending reply, listenerRegistrationPath = {} ", persistenceId(), logName(),
85                 registrationActor.path());
86
87         tellSender(newRegistrationReplyMessage(registrationActor));
88     }
89
90     protected Logger log() {
91         return log;
92     }
93
94     protected void addListenerActor(ActorSelection actor) {
95         actors.add(actor);
96     }
97
98     protected abstract D newDelayedListenerRegistration(M message);
99
100     protected abstract ActorRef newRegistrationActor(ListenerRegistration<L> registration);
101
102     protected abstract Object newRegistrationReplyMessage(ActorRef registrationActor);
103
104     protected abstract String logName();
105 }