9eba41aac73efb900bd33b3fe52e43c33e655eff
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataChangeListenerRegistrationProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.PoisonPill;
14 import akka.dispatch.OnComplete;
15 import com.google.common.annotations.VisibleForTesting;
16 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
17 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
18 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
19 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
20 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
21 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
22 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
23 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
24 import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener;
25 import org.opendaylight.yangtools.concepts.ListenerRegistration;
26 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
27 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import scala.concurrent.Future;
31
32 /**
33  * ListenerRegistrationProxy acts as a proxy for a ListenerRegistration that was done on a remote shard
34  * <p>
35  * Registering a DataChangeListener on the Data Store creates a new instance of the ListenerRegistrationProxy
36  * The ListenerRegistrationProxy talks to a remote ListenerRegistration actor.
37  * </p>
38  */
39 @SuppressWarnings("rawtypes")
40 public class DataChangeListenerRegistrationProxy implements ListenerRegistration {
41
42     private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerRegistrationProxy.class);
43
44     private volatile ActorSelection listenerRegistrationActor;
45     private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
46     private ActorRef dataChangeListenerActor;
47     private final String shardName;
48     private final ActorContext actorContext;
49     private boolean closed = false;
50
51     public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
52             DataChangeListenerRegistrationProxy(String shardName, ActorContext actorContext, L listener) {
53         this.shardName = shardName;
54         this.actorContext = actorContext;
55         this.listener = listener;
56     }
57
58     @VisibleForTesting
59     ActorSelection getListenerRegistrationActor() {
60         return listenerRegistrationActor;
61     }
62
63     @VisibleForTesting
64     ActorRef getDataChangeListenerActor() {
65         return dataChangeListenerActor;
66     }
67
68     @Override
69     public Object getInstance() {
70         return listener;
71     }
72
73     private void setListenerRegistrationActor(ActorSelection listenerRegistrationActor) {
74         if (listenerRegistrationActor == null) {
75             return;
76         }
77
78         boolean sendCloseMessage = false;
79         synchronized (this) {
80             if (closed) {
81                 sendCloseMessage = true;
82             } else {
83                 this.listenerRegistrationActor = listenerRegistrationActor;
84             }
85         }
86
87         if (sendCloseMessage) {
88             listenerRegistrationActor.tell(CloseDataChangeListenerRegistration.INSTANCE, null);
89         }
90     }
91
92     public void init(final YangInstanceIdentifier path, final AsyncDataBroker.DataChangeScope scope) {
93
94         dataChangeListenerActor = actorContext.getActorSystem().actorOf(
95                 DataChangeListener.props(listener).withDispatcher(actorContext.getNotificationDispatcherPath()));
96
97         Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
98         findFuture.onComplete(new OnComplete<ActorRef>() {
99             @Override
100             public void onComplete(Throwable failure, ActorRef shard) {
101                 if (failure instanceof LocalShardNotFoundException) {
102                     LOG.debug("No local shard found for {} - DataChangeListener {} at path {} "
103                             + "cannot be registered", shardName, listener, path);
104                 } else if (failure != null) {
105                     LOG.error("Failed to find local shard {} - DataChangeListener {} at path {} "
106                             + "cannot be registered: {}", shardName, listener, path, failure);
107                 } else {
108                     doRegistration(shard, path, scope);
109                 }
110             }
111         }, actorContext.getClientDispatcher());
112     }
113
114     private void doRegistration(ActorRef shard, final YangInstanceIdentifier path,
115             DataChangeScope scope) {
116
117         Future<Object> future = actorContext.executeOperationAsync(shard,
118                 new RegisterChangeListener(path, dataChangeListenerActor, scope,
119                     listener instanceof ClusteredDOMDataChangeListener),
120                 actorContext.getDatastoreContext().getShardInitializationTimeout());
121
122         future.onComplete(new OnComplete<Object>() {
123             @Override
124             public void onComplete(Throwable failure, Object result) {
125                 if (failure != null) {
126                     LOG.error("Failed to register DataChangeListener {} at path {}",
127                             listener, path.toString(), failure);
128                 } else {
129                     RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
130                     setListenerRegistrationActor(actorContext.actorSelection(
131                             reply.getListenerRegistrationPath()));
132                 }
133             }
134         }, actorContext.getClientDispatcher());
135     }
136
137     @Override
138     public void close() {
139
140         boolean sendCloseMessage;
141         synchronized (this) {
142             sendCloseMessage = !closed && listenerRegistrationActor != null;
143             closed = true;
144         }
145
146         if (sendCloseMessage) {
147             listenerRegistrationActor.tell(CloseDataChangeListenerRegistration.INSTANCE, ActorRef.noSender());
148             listenerRegistrationActor = null;
149         }
150
151         if (dataChangeListenerActor != null) {
152             dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
153             dataChangeListenerActor = null;
154         }
155     }
156 }