Fix followerDistributedDataStore tear down
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataTreeChangeListenerActor.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorRef;
13 import akka.actor.Props;
14 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
15 import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged;
16 import org.opendaylight.controller.cluster.datastore.messages.DataTreeChangedReply;
17 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
18 import org.opendaylight.controller.cluster.datastore.messages.GetInfo;
19 import org.opendaylight.controller.cluster.datastore.messages.OnInitialData;
20 import org.opendaylight.controller.cluster.mgmt.api.DataTreeListenerInfo;
21 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
22 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
23
24 /**
25  * Proxy actor which acts as a facade to the user-provided listener. Responsible for decapsulating
26  * DataTreeChanged messages and dispatching their context to the user.
27  */
28 class DataTreeChangeListenerActor extends AbstractUntypedActor {
29     private final DOMDataTreeChangeListener listener;
30     private final YangInstanceIdentifier registeredPath;
31
32     private boolean notificationsEnabled = false;
33     private long notificationCount;
34     private String logContext = "";
35
36     DataTreeChangeListenerActor(final DOMDataTreeChangeListener listener,
37             final YangInstanceIdentifier registeredPath) {
38         this.listener = requireNonNull(listener);
39         this.registeredPath = requireNonNull(registeredPath);
40     }
41
42     @Override
43     protected final void handleReceive(final Object message) {
44         if (message instanceof DataTreeChanged) {
45             dataTreeChanged((DataTreeChanged) message);
46         } else if (message instanceof OnInitialData) {
47             onInitialData((OnInitialData) message);
48         } else if (message instanceof EnableNotification) {
49             enableNotification((EnableNotification) message);
50         } else if (message instanceof GetInfo) {
51             getSender().tell(new DataTreeListenerInfo(listener.toString(), registeredPath.toString(),
52                     notificationsEnabled, notificationCount), getSelf());
53         } else {
54             unknownMessage(message);
55         }
56     }
57
58     @SuppressWarnings("checkstyle:IllegalCatch")
59     void onInitialData(final OnInitialData message) {
60         LOG.debug("{}: Notifying onInitialData to listener {}", logContext, listener);
61
62         try {
63             listener.onInitialData();
64         } catch (Exception e) {
65             LOG.error("{}: Error notifying listener {}", logContext, listener, e);
66         }
67     }
68
69     @SuppressWarnings("checkstyle:IllegalCatch")
70     void dataTreeChanged(final DataTreeChanged message) {
71         // Do nothing if notifications are not enabled
72         if (!notificationsEnabled) {
73             LOG.debug("{}: Notifications not enabled for listener {} - dropping change notification",
74                     logContext, listener);
75             return;
76         }
77
78         final var changes = message.getChanges();
79         LOG.debug("{}: Sending {} change notification(s) to listener {}", logContext, changes.size(), listener);
80         if (LOG.isTraceEnabled() && !changes.isEmpty()) {
81             LOG.trace("{}: detailed change follow", logContext);
82             for (int i = 0, size = changes.size(); i < size; ++i) {
83                 LOG.trace("{}: change {}: {}", logContext, i, changes.get(i));
84             }
85         }
86
87         notificationCount++;
88
89         try {
90             listener.onDataTreeChanged(changes);
91         } catch (Exception e) {
92             LOG.error("{}: Error notifying listener {}", logContext, listener, e);
93         }
94
95         // TODO: do we really need this?
96         // It seems the sender is never null but it doesn't hurt to check. If the caller passes in
97         // a null sender (ActorRef.noSender()), akka translates that to the deadLetters actor.
98         final ActorRef sender = getSender();
99         if (sender != null && !sender.equals(getContext().system().deadLetters())) {
100             sender.tell(DataTreeChangedReply.getInstance(), getSelf());
101         }
102     }
103
104     private void enableNotification(final EnableNotification message) {
105         logContext = message.getLogContext();
106         notificationsEnabled = message.isEnabled();
107         LOG.debug("{}: {} notifications for listener {}", logContext, notificationsEnabled ? "Enabled" : "Disabled",
108                 listener);
109     }
110
111     static Props props(final DOMDataTreeChangeListener listener, final YangInstanceIdentifier registeredPath) {
112         return Props.create(DataTreeChangeListenerActor.class, listener, registeredPath);
113     }
114 }