2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import java.util.concurrent.TimeUnit;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.PoisonPill;
15 import akka.dispatch.OnComplete;
16 import akka.util.Timeout;
17 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
18 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
19 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
20 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
21 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
22 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
23 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
24 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
25 import org.opendaylight.yangtools.concepts.ListenerRegistration;
26 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
27 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import com.google.common.annotations.VisibleForTesting;
31 import scala.concurrent.Future;
34 * ListenerRegistrationProxy acts as a proxy for a ListenerRegistration that was done on a remote shard
36 * Registering a DataChangeListener on the Data Store creates a new instance of the ListenerRegistrationProxy
37 * The ListenerRegistrationProxy talks to a remote ListenerRegistration actor.
40 @SuppressWarnings("rawtypes")
41 public class DataChangeListenerRegistrationProxy implements ListenerRegistration {
43 private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerRegistrationProxy.class);
45 public static final Timeout REGISTER_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
47 private volatile ActorSelection listenerRegistrationActor;
48 private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
49 private ActorRef dataChangeListenerActor;
50 private final String shardName;
51 private final ActorContext actorContext;
52 private boolean closed = false;
54 public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
55 DataChangeListenerRegistrationProxy (
56 String shardName, ActorContext actorContext, L listener) {
57 this.shardName = shardName;
58 this.actorContext = actorContext;
59 this.listener = listener;
63 ActorSelection getListenerRegistrationActor() {
64 return listenerRegistrationActor;
68 ActorRef getDataChangeListenerActor() {
69 return dataChangeListenerActor;
73 public Object getInstance() {
77 private void setListenerRegistrationActor(ActorSelection listenerRegistrationActor) {
78 if(listenerRegistrationActor == null) {
82 boolean sendCloseMessage = false;
85 sendCloseMessage = true;
87 this.listenerRegistrationActor = listenerRegistrationActor;
91 if(sendCloseMessage) {
92 listenerRegistrationActor.tell(new
93 CloseDataChangeListenerRegistration().toSerializable(), null);
97 public void init(final YangInstanceIdentifier path, final AsyncDataBroker.DataChangeScope scope) {
99 dataChangeListenerActor = actorContext.getActorSystem().actorOf(
100 DataChangeListener.props(listener));
102 Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName, REGISTER_TIMEOUT);
103 findFuture.onComplete(new OnComplete<ActorRef>() {
105 public void onComplete(Throwable failure, ActorRef shard) {
106 if(failure instanceof LocalShardNotFoundException) {
107 LOG.debug("No local shard found for {} - DataChangeListener {} at path {} " +
108 "cannot be registered", shardName, listener, path);
109 } else if(failure != null) {
110 LOG.error("Failed to find local shard {} - DataChangeListener {} at path {} " +
111 "cannot be registered: {}", shardName, listener, path, failure);
113 doRegistration(shard, path, scope);
116 }, actorContext.getActorSystem().dispatcher());
119 private void doRegistration(ActorRef shard, final YangInstanceIdentifier path,
120 DataChangeScope scope) {
122 Future<Object> future = actorContext.executeOperationAsync(shard,
123 new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
126 future.onComplete(new OnComplete<Object>(){
128 public void onComplete(Throwable failure, Object result) {
129 if(failure != null) {
130 LOG.error("Failed to register DataChangeListener {} at path {}",
131 listener, path.toString(), failure);
133 RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
134 setListenerRegistrationActor(actorContext.actorSelection(
135 reply.getListenerRegistrationPath()));
138 }, actorContext.getActorSystem().dispatcher());
142 public void close() {
144 boolean sendCloseMessage;
146 sendCloseMessage = !closed && listenerRegistrationActor != null;
150 if(sendCloseMessage) {
151 listenerRegistrationActor.tell(new CloseDataChangeListenerRegistration().toSerializable(),
152 ActorRef.noSender());
153 listenerRegistrationActor = null;
156 if(dataChangeListenerActor != null) {
157 dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
158 dataChangeListenerActor = null;