Fix checkstyle reported by odlparent-3.0.0
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataChangeListenerRegistrationProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.PoisonPill;
14 import akka.dispatch.OnComplete;
15 import com.google.common.annotations.VisibleForTesting;
16 import com.google.common.base.Preconditions;
17 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
18 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
19 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
20 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
21 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
22 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
23 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
24 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
25 import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener;
26 import org.opendaylight.yangtools.concepts.ListenerRegistration;
27 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
28 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import scala.concurrent.Future;
32
33 /**
34  * ListenerRegistrationProxy acts as a proxy for a ListenerRegistration that was done on a remote shard.
35  *
36  * <p>
37  * Registering a DataChangeListener on the Data Store creates a new instance of the ListenerRegistrationProxy
38  * The ListenerRegistrationProxy talks to a remote ListenerRegistration actor.
39  */
40 @SuppressWarnings("rawtypes")
41 public class DataChangeListenerRegistrationProxy implements ListenerRegistration {
42
43     private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerRegistrationProxy.class);
44
45     private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
46     private final String shardName;
47     private final ActorContext actorContext;
48     private ActorRef dataChangeListenerActor;
49     private volatile ActorSelection listenerRegistrationActor;
50     private boolean closed = false;
51
52     public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
53             DataChangeListenerRegistrationProxy(final String shardName, final ActorContext actorContext,
54                     final L listener) {
55         this.shardName = Preconditions.checkNotNull(shardName);
56         this.actorContext = Preconditions.checkNotNull(actorContext);
57         this.listener = Preconditions.checkNotNull(listener);
58     }
59
60     @VisibleForTesting
61     ActorSelection getListenerRegistrationActor() {
62         return listenerRegistrationActor;
63     }
64
65     @VisibleForTesting
66     ActorRef getDataChangeListenerActor() {
67         return dataChangeListenerActor;
68     }
69
70     @Override
71     public Object getInstance() {
72         return listener;
73     }
74
75     private void setListenerRegistrationActor(final ActorSelection listenerRegistrationActor) {
76         if (listenerRegistrationActor == null) {
77             return;
78         }
79
80         boolean sendCloseMessage = false;
81         synchronized (this) {
82             if (closed) {
83                 sendCloseMessage = true;
84             } else {
85                 this.listenerRegistrationActor = listenerRegistrationActor;
86             }
87         }
88
89         if (sendCloseMessage) {
90             listenerRegistrationActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(),
91                     ActorRef.noSender());
92         }
93     }
94
95     public void init(final YangInstanceIdentifier path, final AsyncDataBroker.DataChangeScope scope) {
96
97         dataChangeListenerActor = actorContext.getActorSystem().actorOf(
98                 DataChangeListener.props(listener, path).withDispatcher(actorContext.getNotificationDispatcherPath()));
99
100         Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
101         findFuture.onComplete(new OnComplete<ActorRef>() {
102             @Override
103             public void onComplete(final Throwable failure, final ActorRef shard) {
104                 if (failure instanceof LocalShardNotFoundException) {
105                     LOG.debug("No local shard found for {} - DataChangeListener {} at path {} "
106                             + "cannot be registered", shardName, listener, path);
107                 } else if (failure != null) {
108                     LOG.error("Failed to find local shard {} - DataChangeListener {} at path {} "
109                             + "cannot be registered: {}", shardName, listener, path, failure);
110                 } else {
111                     doRegistration(shard, path, scope);
112                 }
113             }
114         }, actorContext.getClientDispatcher());
115     }
116
117     private void doRegistration(final ActorRef shard, final YangInstanceIdentifier path,
118             final DataChangeScope scope) {
119
120         Future<Object> future = actorContext.executeOperationAsync(shard,
121                 new RegisterChangeListener(path, dataChangeListenerActor, scope,
122                     listener instanceof ClusteredDOMDataChangeListener),
123                 actorContext.getDatastoreContext().getShardInitializationTimeout());
124
125         future.onComplete(new OnComplete<Object>() {
126             @Override
127             public void onComplete(final Throwable failure, final Object result) {
128                 if (failure != null) {
129                     LOG.error("Failed to register DataChangeListener {} at path {}",
130                             listener, path.toString(), failure);
131                 } else {
132                     RegisterDataTreeNotificationListenerReply reply = (RegisterDataTreeNotificationListenerReply)result;
133                     setListenerRegistrationActor(actorContext.actorSelection(
134                             reply.getListenerRegistrationPath()));
135                 }
136             }
137         }, actorContext.getClientDispatcher());
138     }
139
140     @Override
141     public void close() {
142
143         boolean sendCloseMessage;
144         synchronized (this) {
145             sendCloseMessage = !closed && listenerRegistrationActor != null;
146             closed = true;
147         }
148
149         if (sendCloseMessage) {
150             listenerRegistrationActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(),
151                     ActorRef.noSender());
152             listenerRegistrationActor = null;
153         }
154
155         if (dataChangeListenerActor != null) {
156             dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
157             dataChangeListenerActor = null;
158         }
159     }
160 }