Merge "BUG 2221 : Add metering to ShardTransaction actor"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataChangeListenerRegistrationProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import java.util.concurrent.TimeUnit;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.PoisonPill;
15 import akka.dispatch.OnComplete;
16 import akka.util.Timeout;
17 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
18 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
19 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
20 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
21 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
22 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
23 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
24 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
25 import org.opendaylight.yangtools.concepts.ListenerRegistration;
26 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
27 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import com.google.common.annotations.VisibleForTesting;
31 import scala.concurrent.Future;
32
33 /**
34  * ListenerRegistrationProxy acts as a proxy for a ListenerRegistration that was done on a remote shard
35  * <p>
36  * Registering a DataChangeListener on the Data Store creates a new instance of the ListenerRegistrationProxy
37  * The ListenerRegistrationProxy talks to a remote ListenerRegistration actor.
38  * </p>
39  */
40 @SuppressWarnings("rawtypes")
41 public class DataChangeListenerRegistrationProxy implements ListenerRegistration {
42
43     private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerRegistrationProxy.class);
44
45     public static final Timeout REGISTER_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
46
47     private volatile ActorSelection listenerRegistrationActor;
48     private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
49     private ActorRef dataChangeListenerActor;
50     private final String shardName;
51     private final ActorContext actorContext;
52     private boolean closed = false;
53
54     public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
55                                                               DataChangeListenerRegistrationProxy (
56             String shardName, ActorContext actorContext, L listener) {
57         this.shardName = shardName;
58         this.actorContext = actorContext;
59         this.listener = listener;
60     }
61
62     @VisibleForTesting
63     ActorSelection getListenerRegistrationActor() {
64         return listenerRegistrationActor;
65     }
66
67     @VisibleForTesting
68     ActorRef getDataChangeListenerActor() {
69         return dataChangeListenerActor;
70     }
71
72     @Override
73     public Object getInstance() {
74         return listener;
75     }
76
77     private void setListenerRegistrationActor(ActorSelection listenerRegistrationActor) {
78         if(listenerRegistrationActor == null) {
79             return;
80         }
81
82         boolean sendCloseMessage = false;
83         synchronized(this) {
84             if(closed) {
85                 sendCloseMessage = true;
86             } else {
87                 this.listenerRegistrationActor = listenerRegistrationActor;
88             }
89         }
90
91         if(sendCloseMessage) {
92             listenerRegistrationActor.tell(new
93                 CloseDataChangeListenerRegistration().toSerializable(), null);
94         }
95     }
96
97     public void init(final YangInstanceIdentifier path, final AsyncDataBroker.DataChangeScope scope) {
98
99         dataChangeListenerActor = actorContext.getActorSystem().actorOf(
100                 DataChangeListener.props(listener));
101
102         Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName, REGISTER_TIMEOUT);
103         findFuture.onComplete(new OnComplete<ActorRef>() {
104             @Override
105             public void onComplete(Throwable failure, ActorRef shard) {
106                 if(failure instanceof LocalShardNotFoundException) {
107                     LOG.debug("No local shard found for {} - DataChangeListener {} at path {} " +
108                             "cannot be registered", shardName, listener, path);
109                 } else if(failure != null) {
110                     LOG.error("Failed to find local shard {} - DataChangeListener {} at path {} " +
111                             "cannot be registered: {}", shardName, listener, path, failure);
112                 } else {
113                     doRegistration(shard, path, scope);
114                 }
115             }
116         }, actorContext.getActorSystem().dispatcher());
117     }
118
119     private void doRegistration(ActorRef shard, final YangInstanceIdentifier path,
120             DataChangeScope scope) {
121
122         Future<Object> future = actorContext.executeOperationAsync(shard,
123                 new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
124                 REGISTER_TIMEOUT);
125
126         future.onComplete(new OnComplete<Object>(){
127             @Override
128             public void onComplete(Throwable failure, Object result) {
129                 if(failure != null) {
130                     LOG.error("Failed to register DataChangeListener {} at path {}",
131                             listener, path.toString(), failure);
132                 } else {
133                     RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
134                     setListenerRegistrationActor(actorContext.actorSelection(
135                             reply.getListenerRegistrationPath()));
136                 }
137             }
138         }, actorContext.getActorSystem().dispatcher());
139     }
140
141     @Override
142     public void close() {
143
144         boolean sendCloseMessage;
145         synchronized(this) {
146             sendCloseMessage = !closed && listenerRegistrationActor != null;
147             closed = true;
148         }
149
150         if(sendCloseMessage) {
151             listenerRegistrationActor.tell(new CloseDataChangeListenerRegistration().toSerializable(),
152                     ActorRef.noSender());
153             listenerRegistrationActor = null;
154         }
155
156         if(dataChangeListenerActor != null) {
157             dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
158             dataChangeListenerActor = null;
159         }
160     }
161 }