2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.PoisonPill;
14 import akka.dispatch.OnComplete;
15 import com.google.common.annotations.VisibleForTesting;
16 import com.google.common.base.Preconditions;
17 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
18 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
19 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
20 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
21 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
22 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
23 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
24 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
25 import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener;
26 import org.opendaylight.yangtools.concepts.ListenerRegistration;
27 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
28 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import scala.concurrent.Future;
34 * ListenerRegistrationProxy acts as a proxy for a ListenerRegistration that was done on a remote shard.
37 * Registering a DataChangeListener on the Data Store creates a new instance of the ListenerRegistrationProxy
38 * The ListenerRegistrationProxy talks to a remote ListenerRegistration actor.
40 @SuppressWarnings("rawtypes")
41 public class DataChangeListenerRegistrationProxy implements ListenerRegistration {
43 private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerRegistrationProxy.class);
45 private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
46 private final String shardName;
47 private final ActorContext actorContext;
48 private ActorRef dataChangeListenerActor;
49 private volatile ActorSelection listenerRegistrationActor;
50 private boolean closed = false;
52 public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
53 DataChangeListenerRegistrationProxy(final String shardName, final ActorContext actorContext,
55 this.shardName = Preconditions.checkNotNull(shardName);
56 this.actorContext = Preconditions.checkNotNull(actorContext);
57 this.listener = Preconditions.checkNotNull(listener);
61 ActorSelection getListenerRegistrationActor() {
62 return listenerRegistrationActor;
66 ActorRef getDataChangeListenerActor() {
67 return dataChangeListenerActor;
71 public Object getInstance() {
75 private void setListenerRegistrationActor(final ActorSelection listenerRegistrationActor) {
76 if (listenerRegistrationActor == null) {
80 boolean sendCloseMessage = false;
83 sendCloseMessage = true;
85 this.listenerRegistrationActor = listenerRegistrationActor;
89 if (sendCloseMessage) {
90 listenerRegistrationActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(),
95 public void init(final YangInstanceIdentifier path, final AsyncDataBroker.DataChangeScope scope) {
97 dataChangeListenerActor = actorContext.getActorSystem().actorOf(
98 DataChangeListener.props(listener, path).withDispatcher(actorContext.getNotificationDispatcherPath()));
100 Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
101 findFuture.onComplete(new OnComplete<ActorRef>() {
103 public void onComplete(final Throwable failure, final ActorRef shard) {
104 if (failure instanceof LocalShardNotFoundException) {
105 LOG.debug("No local shard found for {} - DataChangeListener {} at path {} "
106 + "cannot be registered", shardName, listener, path);
107 } else if (failure != null) {
108 LOG.error("Failed to find local shard {} - DataChangeListener {} at path {} "
109 + "cannot be registered: {}", shardName, listener, path, failure);
111 doRegistration(shard, path, scope);
114 }, actorContext.getClientDispatcher());
117 private void doRegistration(final ActorRef shard, final YangInstanceIdentifier path,
118 final DataChangeScope scope) {
120 Future<Object> future = actorContext.executeOperationAsync(shard,
121 new RegisterChangeListener(path, dataChangeListenerActor, scope,
122 listener instanceof ClusteredDOMDataChangeListener),
123 actorContext.getDatastoreContext().getShardInitializationTimeout());
125 future.onComplete(new OnComplete<Object>() {
127 public void onComplete(final Throwable failure, final Object result) {
128 if (failure != null) {
129 LOG.error("Failed to register DataChangeListener {} at path {}",
130 listener, path.toString(), failure);
132 RegisterDataTreeNotificationListenerReply reply = (RegisterDataTreeNotificationListenerReply)result;
133 setListenerRegistrationActor(actorContext.actorSelection(
134 reply.getListenerRegistrationPath()));
137 }, actorContext.getClientDispatcher());
141 public void close() {
143 boolean sendCloseMessage;
144 synchronized (this) {
145 sendCloseMessage = !closed && listenerRegistrationActor != null;
149 if (sendCloseMessage) {
150 listenerRegistrationActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(),
151 ActorRef.noSender());
152 listenerRegistrationActor = null;
155 if (dataChangeListenerActor != null) {
156 dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
157 dataChangeListenerActor = null;