package org.opendaylight.controller.cluster.datastore;
-import java.util.concurrent.TimeUnit;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.PoisonPill;
import akka.dispatch.OnComplete;
-import akka.util.Timeout;
+import com.google.common.annotations.VisibleForTesting;
import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
import scala.concurrent.Future;
/**
private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerRegistrationProxy.class);
- public static final Timeout REGISTER_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
-
private volatile ActorSelection listenerRegistrationActor;
private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
private ActorRef dataChangeListenerActor;
public void init(final YangInstanceIdentifier path, final AsyncDataBroker.DataChangeScope scope) {
dataChangeListenerActor = actorContext.getActorSystem().actorOf(
- DataChangeListener.props(listener));
+ DataChangeListener.props(listener).withDispatcher(actorContext.getNotificationDispatcherPath()));
- Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName, REGISTER_TIMEOUT);
+ Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
findFuture.onComplete(new OnComplete<ActorRef>() {
@Override
public void onComplete(Throwable failure, ActorRef shard) {
doRegistration(shard, path, scope);
}
}
- }, actorContext.getActorSystem().dispatcher());
+ }, actorContext.getClientDispatcher());
}
private void doRegistration(ActorRef shard, final YangInstanceIdentifier path,
DataChangeScope scope) {
Future<Object> future = actorContext.executeOperationAsync(shard,
- new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
- REGISTER_TIMEOUT);
+ new RegisterChangeListener(path, dataChangeListenerActor, scope,
+ listener instanceof ClusteredDOMDataChangeListener),
+ actorContext.getDatastoreContext().getShardInitializationTimeout());
future.onComplete(new OnComplete<Object>(){
@Override
reply.getListenerRegistrationPath()));
}
}
- }, actorContext.getActorSystem().dispatcher());
+ }, actorContext.getClientDispatcher());
}
@Override