import java.util.ArrayList;
import java.util.Collection;
import java.util.EventListener;
+import java.util.concurrent.ConcurrentHashMap;
+import org.opendaylight.controller.cluster.datastore.actors.DataTreeNotificationListenerRegistrationActor;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.cluster.datastore.messages.ListenerRegistrationMessage;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
abstract class AbstractDataListenerSupport<L extends EventListener, M extends ListenerRegistrationMessage,
- D extends DelayedListenerRegistration<L, M>, R extends ListenerRegistration<L>>
- extends LeaderLocalDelegateFactory<M, R> {
+ D extends DelayedListenerRegistration<L, M>> extends LeaderLocalDelegateFactory<M> {
private final Logger log = LoggerFactory.getLogger(getClass());
- private final ArrayList<D> delayedListenerRegistrations = new ArrayList<>();
- private final ArrayList<D> delayedListenerOnAllRegistrations = new ArrayList<>();
- private final Collection<ActorSelection> actors = new ArrayList<>();
+ private final Collection<D> delayedListenerRegistrations = ConcurrentHashMap.newKeySet();
+ private final Collection<D> delayedListenerOnAllRegistrations = ConcurrentHashMap.newKeySet();
+ private final Collection<ActorSelection> leaderOnlyListenerActors = ConcurrentHashMap.newKeySet();
+ private final Collection<ActorSelection> allListenerActors = ConcurrentHashMap.newKeySet();
protected AbstractDataListenerSupport(Shard shard) {
super(shard);
}
+ Collection<ActorSelection> getListenerActors() {
+ return new ArrayList<>(allListenerActors);
+ }
+
@Override
void onLeadershipChange(boolean isLeader, boolean hasLeader) {
log.debug("{}: onLeadershipChange, isLeader: {}, hasLeader : {}", persistenceId(), isLeader, hasLeader);
final EnableNotification msg = new EnableNotification(isLeader);
- for (ActorSelection dataChangeListener : actors) {
+ for (ActorSelection dataChangeListener : leaderOnlyListenerActors) {
dataChangeListener.tell(msg, getSelf());
}
}
delayedListenerOnAllRegistrations.clear();
- delayedListenerOnAllRegistrations.trimToSize();
}
if (isLeader) {
}
delayedListenerRegistrations.clear();
- delayedListenerRegistrations.trimToSize();
}
}
void onMessage(M message, boolean isLeader, boolean hasLeader) {
log.debug("{}: {} for {}, leader: {}", persistenceId(), logName(), message.getPath(), isLeader);
- final ListenerRegistration<L> registration;
+ ActorRef registrationActor = createActor(DataTreeNotificationListenerRegistrationActor.props());
+
if (hasLeader && message.isRegisterOnAllInstances() || isLeader) {
- registration = createDelegate(message);
+ doRegistration(message, registrationActor);
} else {
log.debug("{}: Shard is not the leader - delaying registration", persistenceId());
- D delayedReg = newDelayedListenerRegistration(message);
+ D delayedReg = newDelayedListenerRegistration(message, registrationActor);
+ Collection<D> delayedRegList;
if (message.isRegisterOnAllInstances()) {
- delayedListenerOnAllRegistrations.add(delayedReg);
+ delayedRegList = delayedListenerOnAllRegistrations;
} else {
- delayedListenerRegistrations.add(delayedReg);
+ delayedRegList = delayedListenerRegistrations;
}
- registration = delayedReg;
+ delayedRegList.add(delayedReg);
+ registrationActor.tell(new DataTreeNotificationListenerRegistrationActor.SetRegistration(
+ delayedReg, () -> delayedRegList.remove(delayedReg)), ActorRef.noSender());
}
- ActorRef registrationActor = newRegistrationActor(registration);
-
log.debug("{}: {} sending reply, listenerRegistrationPath = {} ", persistenceId(), logName(),
registrationActor.path());
tellSender(newRegistrationReplyMessage(registrationActor));
}
+ protected ActorSelection processListenerRegistrationMessage(M message) {
+ final ActorSelection listenerActor = selectActor(message.getListenerActorPath());
+
+ // We have a leader so enable the listener.
+ listenerActor.tell(new EnableNotification(true), getSelf());
+
+ if (!message.isRegisterOnAllInstances()) {
+ // This is a leader-only registration so store a reference to the listener actor so it can be notified
+ // at a later point if notifications should be enabled or disabled.
+ leaderOnlyListenerActors.add(listenerActor);
+ }
+
+ allListenerActors.add(listenerActor);
+
+ return listenerActor;
+ }
+
protected Logger log() {
return log;
}
- protected void addListenerActor(ActorSelection actor) {
- actors.add(actor);
+ protected void removeListenerActor(ActorSelection listenerActor) {
+ allListenerActors.remove(listenerActor);
+ leaderOnlyListenerActors.remove(listenerActor);
}
- protected abstract D newDelayedListenerRegistration(M message);
+ abstract void doRegistration(M message, ActorRef registrationActor);
- protected abstract ActorRef newRegistrationActor(ListenerRegistration<L> registration);
+ protected abstract D newDelayedListenerRegistration(M message, ActorRef registrationActor);
protected abstract Object newRegistrationReplyMessage(ActorRef registrationActor);
import akka.actor.ActorContext;
import akka.actor.ActorRef;
+import akka.actor.Props;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
private final ActorContext actorContext;
private final String actorName;
+ private final String logContext;
private ActorRef notifierActor;
- protected AbstractShardDataTreeNotificationPublisherActorProxy(ActorContext actorContext, String actorName) {
+ protected AbstractShardDataTreeNotificationPublisherActorProxy(ActorContext actorContext, String actorName,
+ String logContext) {
this.actorContext = actorContext;
this.actorName = actorName;
+ this.logContext = logContext;
}
- protected AbstractShardDataTreeNotificationPublisherActorProxy(
- AbstractShardDataTreeNotificationPublisherActorProxy other) {
- this.actorContext = null;
- this.actorName = null;
- this.notifierActor = other.getNotifierActor();
+ protected abstract Props props();
+
+ protected final String actorName() {
+ return actorName;
}
- protected abstract ShardDataTreeNotificationPublisher getDelegatePublisher();
+ protected final String logContext() {
+ return logContext;
+ }
@Override
public void publishChanges(DataTreeCandidate candidate, String logContext) {
- getNotifierActor().tell(new ShardDataTreeNotificationPublisherActor.PublishNotifications(
- getDelegatePublisher(), candidate, logContext), ActorRef.noSender());
+ notifierActor().tell(new ShardDataTreeNotificationPublisherActor.PublishNotifications(candidate),
+ ActorRef.noSender());
}
- private ActorRef getNotifierActor() {
+ protected final ActorRef notifierActor() {
if (notifierActor == null) {
LOG.debug("Creating actor {}", actorName);
String dispatcher = new Dispatchers(actorContext.system().dispatchers()).getDispatcherPath(
Dispatchers.DispatcherType.Notification);
- notifierActor = actorContext.actorOf(ShardDataTreeNotificationPublisherActor.props(actorName)
- .withDispatcher(dispatcher).withMailbox(
- org.opendaylight.controller.cluster.datastore.utils.ActorContext.BOUNDED_MAILBOX),
- actorName);
+ notifierActor = actorContext.actorOf(props().withDispatcher(dispatcher).withMailbox(
+ org.opendaylight.controller.cluster.datastore.utils.ActorContext.BOUNDED_MAILBOX), actorName);
}
return notifierActor;
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.japi.Creator;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistrationReply;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-public class DataChangeListenerRegistrationActor extends AbstractUntypedActor {
-
- private final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
- registration;
-
- public DataChangeListenerRegistrationActor(
- ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> registration) {
- this.registration = registration;
- }
-
- @Override
- public void handleReceive(Object message) {
- if (message instanceof CloseDataChangeListenerRegistration) {
- closeListenerRegistration();
- } else {
- unknownMessage(message);
- }
- }
-
- public static Props props(final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
- NormalizedNode<?, ?>>> registration) {
- return Props.create(new DataChangeListenerRegistrationCreator(registration));
- }
-
- private void closeListenerRegistration() {
- registration.close();
-
- if (isValidSender(getSender())) {
- getSender().tell(CloseDataChangeListenerRegistrationReply.INSTANCE, getSelf());
- }
-
- getSelf().tell(PoisonPill.getInstance(), getSelf());
- }
-
- private static class DataChangeListenerRegistrationCreator
- implements Creator<DataChangeListenerRegistrationActor> {
- private static final long serialVersionUID = 1L;
-
- @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "This field is not Serializable but we don't "
- + "create remote instances of this actor and thus don't need it to be Serializable.")
- final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
- NormalizedNode<?, ?>>> registration;
-
- DataChangeListenerRegistrationCreator(
- ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
- NormalizedNode<?, ?>>> registration) {
- this.registration = registration;
- }
-
- @Override
- public DataChangeListenerRegistrationActor create() throws Exception {
- return new DataChangeListenerRegistrationActor(registration);
- }
- }
-}
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
}
if (sendCloseMessage) {
- listenerRegistrationActor.tell(CloseDataChangeListenerRegistration.INSTANCE, null);
+ listenerRegistrationActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(),
+ ActorRef.noSender());
}
}
}
if (sendCloseMessage) {
- listenerRegistrationActor.tell(CloseDataChangeListenerRegistration.INSTANCE, ActorRef.noSender());
+ listenerRegistrationActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(),
+ ActorRef.noSender());
listenerRegistrationActor = null;
}
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
-import com.google.common.base.Optional;
-import com.google.common.collect.Sets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map.Entry;
-import java.util.Set;
-import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
+import org.opendaylight.controller.cluster.datastore.actors.DataTreeNotificationListenerRegistrationActor;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
final class DataChangeListenerSupport extends AbstractDataListenerSupport<
AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>, RegisterChangeListener,
- DelayedDataChangeListenerRegistration, DataChangeListenerRegistration<
- AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>> {
-
- private final Set<ActorSelection> listenerActors = Sets.newConcurrentHashSet();
+ DelayedDataChangeListenerRegistration> {
DataChangeListenerSupport(final Shard shard) {
super(shard);
}
- Collection<ActorSelection> getListenerActors() {
- return new ArrayList<>(listenerActors);
- }
-
@Override
- DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
- createDelegate(final RegisterChangeListener message) {
- final ActorSelection dataChangeListenerPath = selectActor(message.getDataChangeListenerPath());
-
- // Notify the listener if notifications should be enabled or not
- // If this shard is the leader then it will enable notifications else
- // it will not
- dataChangeListenerPath.tell(new EnableNotification(true), getSelf());
-
- // Now store a reference to the data change listener so it can be notified
- // at a later point if notifications should be enabled or disabled
- if (!message.isRegisterOnAllInstances()) {
- addListenerActor(dataChangeListenerPath);
- }
+ void doRegistration(final RegisterChangeListener message, final ActorRef registrationActor) {
+ final ActorSelection listenerActor = processListenerRegistrationMessage(message);
AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
- new DataChangeListenerProxy(dataChangeListenerPath);
+ new DataChangeListenerProxy(listenerActor);
log().debug("{}: Registering for path {}", persistenceId(), message.getPath());
- Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
- Optional<DataTreeCandidate>> regEntry = getShard().getDataStore().registerChangeListener(
- message.getPath(), listener, message.getScope());
-
- getShard().getDataStore().notifyOfInitialData(regEntry.getKey(), regEntry.getValue());
-
- listenerActors.add(dataChangeListenerPath);
- final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
- delegate = regEntry.getKey();
- return new DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
- NormalizedNode<?,?>>>() {
- @Override
- public void close() {
- listenerActors.remove(dataChangeListenerPath);
- delegate.close();
- }
-
- @Override
- public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
- return delegate.getInstance();
- }
-
- @Override
- public YangInstanceIdentifier getPath() {
- return delegate.getPath();
- }
-
- @Override
- public DataChangeScope getScope() {
- return delegate.getScope();
- }
- };
- }
-
- @Override
- protected DelayedDataChangeListenerRegistration newDelayedListenerRegistration(RegisterChangeListener message) {
- return new DelayedDataChangeListenerRegistration(message);
+ final ShardDataTree shardDataTree = getShard().getDataStore();
+ shardDataTree.registerDataChangeListener(message.getPath(), listener, message.getScope(),
+ shardDataTree.readCurrentData(), registration -> registrationActor.tell(
+ new DataTreeNotificationListenerRegistrationActor.SetRegistration(registration, () ->
+ removeListenerActor(listenerActor)), ActorRef.noSender()));
}
@Override
- protected ActorRef newRegistrationActor(
- ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> registration) {
- return createActor(DataChangeListenerRegistrationActor.props(registration));
+ protected DelayedDataChangeListenerRegistration newDelayedListenerRegistration(RegisterChangeListener message,
+ ActorRef registrationActor) {
+ return new DelayedDataChangeListenerRegistration(message, registrationActor);
}
@Override
import com.google.common.base.Preconditions;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
@Override
protected synchronized void removeRegistration() {
if (listenerRegistrationActor != null) {
- listenerRegistrationActor.tell(CloseDataTreeChangeListenerRegistration.getInstance(), ActorRef.noSender());
+ listenerRegistrationActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(),
+ ActorRef.noSender());
listenerRegistrationActor = null;
}
}
// This registration has already been closed, notify the actor
- actor.tell(CloseDataTreeChangeListenerRegistration.getInstance(), null);
+ actor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), null);
}
private void doRegistration(final ActorRef shard) {
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.japi.Creator;
-import com.google.common.base.Preconditions;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistrationReply;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-
-/**
- * Actor co-located with a shard. It exists only to terminate the registration when
- * asked to do so via {@link CloseDataTreeChangeListenerRegistration}.
- */
-public final class DataTreeChangeListenerRegistrationActor extends AbstractUntypedActor {
- private final ListenerRegistration<DOMDataTreeChangeListener> registration;
-
- public DataTreeChangeListenerRegistrationActor(final ListenerRegistration<DOMDataTreeChangeListener> registration) {
- this.registration = Preconditions.checkNotNull(registration);
- }
-
- @Override
- protected void handleReceive(Object message) throws Exception {
- if (message instanceof CloseDataTreeChangeListenerRegistration) {
- registration.close();
- if (isValidSender(getSender())) {
- getSender().tell(CloseDataTreeChangeListenerRegistrationReply.getInstance(), getSelf());
- }
-
- getSelf().tell(PoisonPill.getInstance(), getSelf());
- } else {
- unknownMessage(message);
- }
- }
-
- public static Props props(final ListenerRegistration<DOMDataTreeChangeListener> registration) {
- return Props.create(new DataTreeChangeListenerRegistrationCreator(registration));
- }
-
- private static final class DataTreeChangeListenerRegistrationCreator
- implements Creator<DataTreeChangeListenerRegistrationActor> {
- private static final long serialVersionUID = 1L;
-
- @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "This field is not Serializable but we don't "
- + "create remote instances of this actor and thus don't need it to be Serializable.")
- final ListenerRegistration<DOMDataTreeChangeListener> registration;
-
- DataTreeChangeListenerRegistrationCreator(ListenerRegistration<DOMDataTreeChangeListener> registration) {
- this.registration = Preconditions.checkNotNull(registration);
- }
-
- @Override
- public DataTreeChangeListenerRegistrationActor create() {
- return new DataTreeChangeListenerRegistrationActor(registration);
- }
- }
-}
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
-import com.google.common.base.Optional;
-import com.google.common.collect.Sets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map.Entry;
-import java.util.Set;
-import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
+import org.opendaylight.controller.cluster.datastore.actors.DataTreeNotificationListenerRegistrationActor;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
final class DataTreeChangeListenerSupport extends AbstractDataListenerSupport<DOMDataTreeChangeListener,
- RegisterDataTreeChangeListener, DelayedDataTreeListenerRegistration,
- ListenerRegistration<DOMDataTreeChangeListener>> {
-
- private final Set<ActorSelection> listenerActors = Sets.newConcurrentHashSet();
+ RegisterDataTreeChangeListener, DelayedDataTreeListenerRegistration> {
DataTreeChangeListenerSupport(final Shard shard) {
super(shard);
}
- Collection<ActorSelection> getListenerActors() {
- return new ArrayList<>(listenerActors);
- }
-
@Override
- ListenerRegistration<DOMDataTreeChangeListener> createDelegate(
- final RegisterDataTreeChangeListener message) {
- final ActorSelection dataChangeListenerPath = selectActor(message.getDataTreeChangeListenerPath());
+ void doRegistration(final RegisterDataTreeChangeListener message, final ActorRef registrationActor) {
+ final ActorSelection listenerActor = processListenerRegistrationMessage(message);
- // Notify the listener if notifications should be enabled or not
- // If this shard is the leader then it will enable notifications else
- // it will not
- dataChangeListenerPath.tell(new EnableNotification(true), getSelf());
-
- // Now store a reference to the data change listener so it can be notified
- // at a later point if notifications should be enabled or disabled
- if (!message.isRegisterOnAllInstances()) {
- addListenerActor(dataChangeListenerPath);
- }
-
- DOMDataTreeChangeListener listener = new ForwardingDataTreeChangeListener(dataChangeListenerPath);
+ DOMDataTreeChangeListener listener = new ForwardingDataTreeChangeListener(listenerActor);
log().debug("{}: Registering for path {}", persistenceId(), message.getPath());
- Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> regEntry =
- getShard().getDataStore().registerTreeChangeListener(message.getPath(), listener);
-
- getShard().getDataStore().notifyOfInitialData(message.getPath(),
- regEntry.getKey().getInstance(), regEntry.getValue());
-
- listenerActors.add(dataChangeListenerPath);
- final ListenerRegistration<DOMDataTreeChangeListener> delegate = regEntry.getKey();
- return new ListenerRegistration<DOMDataTreeChangeListener>() {
- @Override
- public DOMDataTreeChangeListener getInstance() {
- return delegate.getInstance();
- }
-
- @Override
- public void close() {
- listenerActors.remove(dataChangeListenerPath);
- delegate.close();
- }
- };
+ final ShardDataTree shardDataTree = getShard().getDataStore();
+ shardDataTree.registerTreeChangeListener(message.getPath(),
+ listener, shardDataTree.readCurrentData(), registration -> registrationActor.tell(
+ new DataTreeNotificationListenerRegistrationActor.SetRegistration(registration, () ->
+ removeListenerActor(listenerActor)), ActorRef.noSender()));
}
@Override
protected DelayedDataTreeListenerRegistration newDelayedListenerRegistration(
- RegisterDataTreeChangeListener message) {
- return new DelayedDataTreeListenerRegistration(message);
- }
-
- @Override
- protected ActorRef newRegistrationActor(ListenerRegistration<DOMDataTreeChangeListener> registration) {
- return createActor(DataTreeChangeListenerRegistrationActor.props(registration));
+ RegisterDataTreeChangeListener message, ActorRef registrationActor) {
+ return new DelayedDataTreeListenerRegistration(message, registrationActor);
}
@Override
*/
package org.opendaylight.controller.cluster.datastore;
+import com.google.common.base.Optional;
+import java.util.function.Consumer;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
import org.opendaylight.controller.md.sal.dom.store.impl.ResolveDataChangeEventsTask;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.util.concurrent.NotificationManager;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
}
@Override
- public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
- DataChangeListenerRegistration<L> registerDataChangeListener(YangInstanceIdentifier path, L listener,
- DataChangeScope scope) {
- return dataChangeListenerTree.registerDataChangeListener(path, listener, scope);
+ public void registerDataChangeListener(YangInstanceIdentifier path,
+ AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, DataChangeScope scope,
+ Optional<DataTreeCandidate> initialState,
+ Consumer<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>>
+ onRegistration) {
+ final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+ registration = dataChangeListenerTree.registerDataChangeListener(path, listener, scope);
+
+ onRegistration.accept(registration);
+
+ if (initialState.isPresent()) {
+ notifySingleListener(path, listener, scope, initialState.get());
+ }
}
- @Override
- public ShardDataChangeListenerPublisher newInstance() {
- return new DefaultShardDataChangeListenerPublisher();
+ static void notifySingleListener(final YangInstanceIdentifier path,
+ final AsyncDataChangeListener<YangInstanceIdentifier,NormalizedNode<?, ?>> listener,
+ final DataChangeScope scope, final DataTreeCandidate initialState) {
+ DefaultShardDataChangeListenerPublisher publisher = new DefaultShardDataChangeListenerPublisher();
+ publisher.registerDataChangeListener(path, listener, scope, Optional.absent(), noop -> { });
+ publisher.publishChanges(initialState, "");
}
}
*/
package org.opendaylight.controller.cluster.datastore;
+import com.google.common.base.Optional;
import java.util.Collection;
+import java.util.function.Consumer;
import javax.annotation.concurrent.NotThreadSafe;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.mdsal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
import org.opendaylight.mdsal.dom.spi.store.AbstractDOMStoreTreeChangePublisher;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
processCandidateTree(candidate);
}
- @Override
- public ShardDataTreeChangeListenerPublisher newInstance() {
- return new DefaultShardDataTreeChangeListenerPublisher();
- }
-
@Override
protected void notifyListener(AbstractDOMDataTreeChangeListenerRegistration<?> registration,
Collection<DataTreeCandidate> changes) {
}
@Override
- public <L extends org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener> ListenerRegistration<L>
- registerTreeChangeListener(final YangInstanceIdentifier treeId, final L listener) {
- final AbstractDOMDataTreeChangeListenerRegistration<DOMDataTreeChangeListener> registration =
- super.registerTreeChangeListener(treeId, (org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener)
- changes -> listener.onDataTreeChanged(changes));
+ public void registerTreeChangeListener(YangInstanceIdentifier treeId, DOMDataTreeChangeListener listener,
+ Optional<DataTreeCandidate> initialState,
+ Consumer<ListenerRegistration<DOMDataTreeChangeListener>> onRegistration) {
+ AbstractDOMDataTreeChangeListenerRegistration<org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener>
+ registration = super.registerTreeChangeListener(treeId,
+ (org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener)changes ->
+ listener.onDataTreeChanged(changes));
+
+ onRegistration.accept(
+ new org.opendaylight.controller.md.sal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration<
+ DOMDataTreeChangeListener>(listener) {
+ @Override
+ protected void removeRegistration() {
+ registration.close();
+ }
+ });
+
+ if (initialState.isPresent()) {
+ notifySingleListener(treeId, listener, initialState.get());
+ }
+ }
- return new org.opendaylight.controller.md.sal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration<L>(
- listener) {
- @Override
- protected void removeRegistration() {
- registration.close();
- }
- };
+ static void notifySingleListener(YangInstanceIdentifier treeId, DOMDataTreeChangeListener listener,
+ DataTreeCandidate state) {
+ DefaultShardDataTreeChangeListenerPublisher publisher = new DefaultShardDataTreeChangeListenerPublisher();
+ publisher.registerTreeChangeListener(treeId, listener, Optional.absent(), noop -> { });
+ publisher.publishChanges(state, "");
}
}
*/
package org.opendaylight.controller.cluster.datastore;
+import akka.actor.ActorRef;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
final class DelayedDataChangeListenerRegistration extends DelayedListenerRegistration<
AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>, RegisterChangeListener> {
- DelayedDataChangeListenerRegistration(final RegisterChangeListener registerChangeListener) {
- super(registerChangeListener);
+ DelayedDataChangeListenerRegistration(final RegisterChangeListener registerChangeListener,
+ final ActorRef registrationActor) {
+ super(registerChangeListener, registrationActor);
}
-}
\ No newline at end of file
+}
*/
package org.opendaylight.controller.cluster.datastore;
+import akka.actor.ActorRef;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
final class DelayedDataTreeListenerRegistration
extends DelayedListenerRegistration<DOMDataTreeChangeListener, RegisterDataTreeChangeListener> {
- DelayedDataTreeListenerRegistration(final RegisterDataTreeChangeListener registerTreeChangeListener) {
- super(registerTreeChangeListener);
+ DelayedDataTreeListenerRegistration(final RegisterDataTreeChangeListener registerTreeChangeListener,
+ final ActorRef registrationActor) {
+ super(registerTreeChangeListener, registrationActor);
}
}
*/
package org.opendaylight.controller.cluster.datastore;
+import akka.actor.ActorRef;
import java.util.EventListener;
import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.datastore.messages.ListenerRegistrationMessage;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
-abstract class DelayedListenerRegistration<L extends EventListener, M> implements ListenerRegistration<L> {
+abstract class DelayedListenerRegistration<L extends EventListener, M extends ListenerRegistrationMessage>
+ implements ListenerRegistration<L> {
private final M registrationMessage;
- private volatile ListenerRegistration<L> delegate;
+ private final ActorRef registrationActor;
@GuardedBy("this")
private boolean closed;
- protected DelayedListenerRegistration(M registrationMessage) {
+ protected DelayedListenerRegistration(M registrationMessage, ActorRef registrationActor) {
this.registrationMessage = registrationMessage;
+ this.registrationActor = registrationActor;
}
M getRegistrationMessage() {
return registrationMessage;
}
- ListenerRegistration<L> getDelegate() {
- return delegate;
- }
-
- synchronized <R extends ListenerRegistration<L>> void createDelegate(
- final LeaderLocalDelegateFactory<M, R> factory) {
+ synchronized void createDelegate(final AbstractDataListenerSupport<L, M, ?> support) {
if (!closed) {
- this.delegate = factory.createDelegate(registrationMessage);
+ support.doRegistration(registrationMessage, registrationActor);
}
}
@Override
public synchronized void close() {
- if (!closed) {
- closed = true;
- if (delegate != null) {
- delegate.close();
- }
- }
+ closed = true;
}
}
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-/**
- * Base class for factories instantiating delegates.
- *
- * @param <M> message type
- * @param <D> delegate type
- */
-abstract class DelegateFactory<M, D> {
- abstract D createDelegate(M message);
-}
* @param <D> delegate type
* @param <M> message type
*/
-abstract class LeaderLocalDelegateFactory<M, D> extends DelegateFactory<M, D> {
+abstract class LeaderLocalDelegateFactory<M> {
private final Shard shard;
protected LeaderLocalDelegateFactory(final Shard shard) {
LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
ShardDataTreeChangeListenerPublisherActorProxy treeChangeListenerPublisher =
- new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher");
+ new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher", name);
ShardDataChangeListenerPublisherActorProxy dataChangeListenerPublisher =
- new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher");
+ new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher", name);
if (builder.getDataTree() != null) {
store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(),
treeChangeListenerPublisher, dataChangeListenerPublisher, name, frontendMetadata);
*/
package org.opendaylight.controller.cluster.datastore;
+import com.google.common.base.Optional;
+import java.util.function.Consumer;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
/**
* Interface for a class that generates and publishes notifications for DataChangeListeners.
* @author Thomas Pantelis
*/
interface ShardDataChangeListenerPublisher extends ShardDataTreeNotificationPublisher {
- ShardDataChangeListenerPublisher newInstance();
-
- <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> DataChangeListenerRegistration<L>
- registerDataChangeListener(YangInstanceIdentifier path, L listener, DataChangeScope scope);
+ void registerDataChangeListener(YangInstanceIdentifier path,
+ AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, DataChangeScope scope,
+ Optional<DataTreeCandidate> initialState,
+ Consumer<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>>
+ onRegistration);
}
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorContext;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.google.common.base.Optional;
+import java.util.function.Consumer;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
/**
* Implementation of ShardDataChangeListenerPublisher that offloads the generation and publication
class ShardDataChangeListenerPublisherActorProxy extends AbstractShardDataTreeNotificationPublisherActorProxy
implements ShardDataChangeListenerPublisher {
- private final ShardDataChangeListenerPublisher delegatePublisher = new DefaultShardDataChangeListenerPublisher();
-
- ShardDataChangeListenerPublisherActorProxy(ActorContext actorContext, String actorName) {
- super(actorContext, actorName);
- }
-
- private ShardDataChangeListenerPublisherActorProxy(ShardDataChangeListenerPublisherActorProxy other) {
- super(other);
- }
-
- @Override
- public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
- DataChangeListenerRegistration<L> registerDataChangeListener(YangInstanceIdentifier path, L listener,
- DataChangeScope scope) {
- return delegatePublisher.registerDataChangeListener(path, listener, scope);
+ ShardDataChangeListenerPublisherActorProxy(ActorContext actorContext, String actorName, String logContext) {
+ super(actorContext, actorName, logContext);
}
@Override
- public ShardDataChangeListenerPublisher newInstance() {
- return new ShardDataChangeListenerPublisherActorProxy(this);
+ public void registerDataChangeListener(YangInstanceIdentifier path,
+ AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, DataChangeScope scope,
+ Optional<DataTreeCandidate> initialState,
+ Consumer<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>>
+ onRegistration) {
+ notifierActor().tell(new ShardDataChangePublisherActor.RegisterListener(path, listener, scope, initialState,
+ onRegistration), ActorRef.noSender());
}
@Override
- protected ShardDataTreeNotificationPublisher getDelegatePublisher() {
- return delegatePublisher;
+ protected Props props() {
+ return ShardDataChangePublisherActor.props(actorName(), logContext());
}
}
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.Props;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import java.util.function.Consumer;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+
+/**
+ * Actor used to generate and publish DataChange notifications.
+ *
+ * @author Thomas Pantelis
+ */
+public class ShardDataChangePublisherActor
+ extends ShardDataTreeNotificationPublisherActor<ShardDataChangeListenerPublisher> {
+
+ private ShardDataChangePublisherActor(final String name, final String logContext) {
+ super(new DefaultShardDataChangeListenerPublisher(), name, logContext);
+ }
+
+ @Override
+ protected void handleReceive(Object message) {
+ if (message instanceof RegisterListener) {
+ RegisterListener reg = (RegisterListener)message;
+ if (reg.initialState.isPresent()) {
+ DefaultShardDataChangeListenerPublisher.notifySingleListener(reg.path, reg.listener, reg.scope,
+ reg.initialState.get());
+ }
+
+ publisher().registerDataChangeListener(reg.path, reg.listener, reg.scope, Optional.absent(),
+ reg.onRegistration);
+ } else {
+ super.handleReceive(message);
+ }
+ }
+
+ static Props props(final String name, final String logContext) {
+ return Props.create(ShardDataChangePublisherActor.class, name, logContext);
+ }
+
+ static class RegisterListener {
+ private final YangInstanceIdentifier path;
+ private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
+ private final DataChangeScope scope;
+ private final Optional<DataTreeCandidate> initialState;
+ private final Consumer<ListenerRegistration<
+ AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>> onRegistration;
+
+ RegisterListener(final YangInstanceIdentifier path,
+ final AsyncDataChangeListener<YangInstanceIdentifier,NormalizedNode<?, ?>> listener,
+ final DataChangeScope scope, final Optional<DataTreeCandidate> initialState,
+ final Consumer<ListenerRegistration<
+ AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>> onRegistration) {
+ this.path = Preconditions.checkNotNull(path);
+ this.listener = Preconditions.checkNotNull(listener);
+ this.scope = Preconditions.checkNotNull(scope);
+ this.initialState = Preconditions.checkNotNull(initialState);
+ this.onRegistration = Preconditions.checkNotNull(onRegistration);
+ }
+ }
+}
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.File;
import java.io.IOException;
-import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
dataChangeListenerPublisher.publishChanges(candidate, logContext);
}
- void notifyOfInitialData(final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
- NormalizedNode<?, ?>>> listenerReg, final Optional<DataTreeCandidate> currentState) {
- if (currentState.isPresent()) {
- ShardDataChangeListenerPublisher localPublisher = dataChangeListenerPublisher.newInstance();
- localPublisher.registerDataChangeListener(listenerReg.getPath(), listenerReg.getInstance(),
- listenerReg.getScope());
- localPublisher.publishChanges(currentState.get(), logContext);
- }
- }
-
- void notifyOfInitialData(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
- final Optional<DataTreeCandidate> currentState) {
- if (currentState.isPresent()) {
- ShardDataTreeChangeListenerPublisher localPublisher = treeChangeListenerPublisher.newInstance();
- localPublisher.registerTreeChangeListener(path, listener);
- localPublisher.publishChanges(currentState.get(), logContext);
- }
- }
-
/**
* Immediately purge all state relevant to leader. This includes all transaction chains and any scheduled
* replication callbacks.
replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback);
}
- Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
- Optional<DataTreeCandidate>> registerChangeListener(final YangInstanceIdentifier path,
- final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
- final DataChangeScope scope) {
- DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> reg =
- dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope);
-
- return new SimpleEntry<>(reg, readCurrentData());
+ void registerDataChangeListener(YangInstanceIdentifier path,
+ AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, DataChangeScope scope,
+ Optional<DataTreeCandidate> initialState,
+ Consumer<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>>
+ onRegistration) {
+ dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope, initialState, onRegistration);
}
- private Optional<DataTreeCandidate> readCurrentData() {
+ Optional<DataTreeCandidate> readCurrentData() {
final Optional<NormalizedNode<?, ?>> currentState =
dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY);
return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode(
YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.<DataTreeCandidate>absent();
}
- public Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>>
- registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) {
- final ListenerRegistration<DOMDataTreeChangeListener> reg =
- treeChangeListenerPublisher.registerTreeChangeListener(path, listener);
-
- return new SimpleEntry<>(reg, readCurrentData());
+ public void registerTreeChangeListener(YangInstanceIdentifier path, DOMDataTreeChangeListener listener,
+ Optional<DataTreeCandidate> initialState,
+ Consumer<ListenerRegistration<DOMDataTreeChangeListener>> onRegistration) {
+ treeChangeListenerPublisher.registerTreeChangeListener(path, listener, initialState, onRegistration);
}
int getQueueSize() {
*/
package org.opendaylight.controller.cluster.datastore;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher;
+import com.google.common.base.Optional;
+import java.util.function.Consumer;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
/**
* Interface for a class that generates and publishes notifications for DataTreeChangeListeners.
*
* @author Thomas Pantelis
*/
-interface ShardDataTreeChangeListenerPublisher extends ShardDataTreeNotificationPublisher, DOMStoreTreeChangePublisher {
- ShardDataTreeChangeListenerPublisher newInstance();
+interface ShardDataTreeChangeListenerPublisher extends ShardDataTreeNotificationPublisher {
+ void registerTreeChangeListener(YangInstanceIdentifier treeId, DOMDataTreeChangeListener listener,
+ Optional<DataTreeCandidate> initialState,
+ Consumer<ListenerRegistration<DOMDataTreeChangeListener>> onRegistration);
}
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorContext;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.google.common.base.Optional;
+import java.util.function.Consumer;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
/**
* Implementation of ShardDataTreeChangeListenerPublisher that offloads the generation and publication
class ShardDataTreeChangeListenerPublisherActorProxy extends AbstractShardDataTreeNotificationPublisherActorProxy
implements ShardDataTreeChangeListenerPublisher {
- private final ShardDataTreeChangeListenerPublisher delegatePublisher =
- new DefaultShardDataTreeChangeListenerPublisher();
-
- ShardDataTreeChangeListenerPublisherActorProxy(ActorContext actorContext, String actorName) {
- super(actorContext, actorName);
- }
-
- private ShardDataTreeChangeListenerPublisherActorProxy(ShardDataTreeChangeListenerPublisherActorProxy other) {
- super(other);
- }
-
- @Override
- public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
- YangInstanceIdentifier treeId, L listener) {
- return delegatePublisher.registerTreeChangeListener(treeId, listener);
+ ShardDataTreeChangeListenerPublisherActorProxy(ActorContext actorContext, String actorName, String logContext) {
+ super(actorContext, actorName, logContext);
}
@Override
- public ShardDataTreeChangeListenerPublisher newInstance() {
- return new ShardDataTreeChangeListenerPublisherActorProxy(this);
+ public void registerTreeChangeListener(YangInstanceIdentifier treeId,
+ DOMDataTreeChangeListener listener, Optional<DataTreeCandidate> currentState,
+ Consumer<ListenerRegistration<DOMDataTreeChangeListener>> onRegistration) {
+ notifierActor().tell(new ShardDataTreeChangePublisherActor.RegisterListener(treeId, listener, currentState,
+ onRegistration), ActorRef.noSender());
}
@Override
- protected ShardDataTreeNotificationPublisher getDelegatePublisher() {
- return delegatePublisher;
+ protected Props props() {
+ return ShardDataTreeChangePublisherActor.props(actorName(), logContext());
}
}
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.Props;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import java.util.function.Consumer;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+
+/**
+ * Actor used to generate and publish DataTreeChange notifications.
+ *
+ * @author Thomas Pantelis
+ */
+public class ShardDataTreeChangePublisherActor
+ extends ShardDataTreeNotificationPublisherActor<ShardDataTreeChangeListenerPublisher> {
+
+ private ShardDataTreeChangePublisherActor(final String name, final String logContext) {
+ super(new DefaultShardDataTreeChangeListenerPublisher(), name, logContext);
+ }
+
+ @Override
+ protected void handleReceive(Object message) {
+ if (message instanceof RegisterListener) {
+ RegisterListener reg = (RegisterListener)message;
+ if (reg.initialState.isPresent()) {
+ DefaultShardDataTreeChangeListenerPublisher.notifySingleListener(reg.path, reg.listener,
+ reg.initialState.get());
+ }
+
+ publisher().registerTreeChangeListener(reg.path, reg.listener, Optional.absent(), reg.onRegistration);
+ } else {
+ super.handleReceive(message);
+ }
+ }
+
+ static Props props(final String name, final String logContext) {
+ return Props.create(ShardDataTreeChangePublisherActor.class, name, logContext);
+ }
+
+ static class RegisterListener {
+ private final YangInstanceIdentifier path;
+ private final DOMDataTreeChangeListener listener;
+ private final Optional<DataTreeCandidate> initialState;
+ private final Consumer<ListenerRegistration<DOMDataTreeChangeListener>> onRegistration;
+
+ RegisterListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
+ final Optional<DataTreeCandidate> initialState,
+ final Consumer<ListenerRegistration<DOMDataTreeChangeListener>> onRegistration) {
+ this.path = Preconditions.checkNotNull(path);
+ this.listener = Preconditions.checkNotNull(listener);
+ this.initialState = Preconditions.checkNotNull(initialState);
+ this.onRegistration = Preconditions.checkNotNull(onRegistration);
+ }
+ }
+}
*/
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.Props;
import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
*
* @author Thomas Pantelis
*/
-public class ShardDataTreeNotificationPublisherActor extends AbstractUntypedActor {
+public class ShardDataTreeNotificationPublisherActor<T extends ShardDataTreeNotificationPublisher>
+ extends AbstractUntypedActor {
+ private final T publisher;
private final Stopwatch timer = Stopwatch.createUnstarted();
private final String name;
+ private final String logContext;
- private ShardDataTreeNotificationPublisherActor(String name) {
+ protected ShardDataTreeNotificationPublisherActor(final T publisher, final String name, final String logContext) {
+ this.publisher = publisher;
this.name = name;
+ this.logContext = logContext;
+ }
+
+ protected T publisher() {
+ return publisher;
+ }
+
+ protected String logContext() {
+ return logContext;
}
@Override
protected void handleReceive(Object message) {
if (message instanceof PublishNotifications) {
- PublishNotifications publisher = (PublishNotifications)message;
+ PublishNotifications toPublish = (PublishNotifications)message;
timer.start();
try {
- publisher.publish();
+ publisher.publishChanges(toPublish.candidate, logContext);
} finally {
long elapsedTime = timer.elapsed(TimeUnit.MILLISECONDS);
if (elapsedTime >= ShardDataTreeNotificationPublisher.PUBLISH_DELAY_THRESHOLD_IN_MS) {
LOG.warn("{}: Generation of change events for {} took longer than expected. Elapsed time: {}",
- publisher.logContext, name, timer);
+ logContext, name, timer);
} else {
- LOG.debug("{}: Elapsed time for generation of change events for {}: {}", publisher.logContext,
- name, timer);
+ LOG.debug("{}: Elapsed time for generation of change events for {}: {}", logContext, name, timer);
}
timer.reset();
}
}
- static Props props(String notificationType) {
- return Props.create(ShardDataTreeNotificationPublisherActor.class, notificationType);
- }
-
static class PublishNotifications {
- private final ShardDataTreeNotificationPublisher publisher;
private final DataTreeCandidate candidate;
- private final String logContext;
- PublishNotifications(ShardDataTreeNotificationPublisher publisher, DataTreeCandidate candidate,
- String logContext) {
- this.publisher = publisher;
+ PublishNotifications(DataTreeCandidate candidate) {
this.candidate = candidate;
- this.logContext = logContext;
- }
-
- private void publish() {
- publisher.publishChanges(candidate, logContext);
}
}
}
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors;
+
+import akka.actor.ActorRef;
+import akka.actor.Cancellable;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistrationReply;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import scala.concurrent.duration.Duration;
+
+/**
+ * Actor co-located with a shard. It exists only to terminate the registration when
+ * asked to do so via {@link CloseDataTreeNotificationListenerRegistration}.
+ */
+public final class DataTreeNotificationListenerRegistrationActor extends AbstractUntypedActor {
+ @VisibleForTesting
+ static long killDelay = TimeUnit.MILLISECONDS.convert(5, TimeUnit.SECONDS);
+
+ private ListenerRegistration<?> registration;
+ private Runnable onClose;
+ private boolean closed;
+ private Cancellable killSchedule;
+
+ @Override
+ protected void handleReceive(Object message) throws Exception {
+ if (message instanceof CloseDataTreeNotificationListenerRegistration) {
+ closeListenerRegistration();
+ if (isValidSender(getSender())) {
+ getSender().tell(CloseDataTreeNotificationListenerRegistrationReply.getInstance(), getSelf());
+ }
+ } else if (message instanceof SetRegistration) {
+ registration = ((SetRegistration)message).registration;
+ onClose = ((SetRegistration)message).onClose;
+ if (closed) {
+ closeListenerRegistration();
+ }
+ } else {
+ unknownMessage(message);
+ }
+ }
+
+ private void closeListenerRegistration() {
+ closed = true;
+ if (registration != null) {
+ registration.close();
+ onClose.run();
+ registration = null;
+
+ if (killSchedule == null) {
+ killSchedule = getContext().system().scheduler().scheduleOnce(Duration.create(killDelay,
+ TimeUnit.MILLISECONDS), getSelf(), PoisonPill.getInstance(), getContext().dispatcher(),
+ ActorRef.noSender());
+ }
+ }
+ }
+
+ public static Props props() {
+ return Props.create(DataTreeNotificationListenerRegistrationActor.class);
+ }
+
+ public static class SetRegistration {
+ private final ListenerRegistration<?> registration;
+ private final Runnable onClose;
+
+ public SetRegistration(final ListenerRegistration<?> registration, final Runnable onClose) {
+ this.registration = Preconditions.checkNotNull(registration);
+ this.onClose = Preconditions.checkNotNull(onClose);
+ }
+ }
+}
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_QNAME;
+import com.google.common.base.Optional;
import org.opendaylight.controller.cluster.datastore.ShardDataTree;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.EntityType;
.node(ENTITY_OWNER_QNAME).build();
void init(ShardDataTree shardDataTree) {
- shardDataTree.registerTreeChangeListener(EOS_PATH, this);
+ shardDataTree.registerTreeChangeListener(EOS_PATH, this, Optional.absent(), noop -> { });
}
protected static String extractOwner(LeafNode<?> ownerLeaf) {
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_QNAME;
import akka.actor.ActorRef;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collection;
void init(ShardDataTree shardDataTree) {
shardDataTree.registerTreeChangeListener(YangInstanceIdentifier.builder(ENTITY_OWNERS_PATH)
.node(EntityType.QNAME).node(EntityType.QNAME).node(ENTITY_QNAME).node(ENTITY_QNAME)
- .node(Candidate.QNAME).node(Candidate.QNAME).build(), this);
+ .node(Candidate.QNAME).node(Candidate.QNAME).build(), this, Optional.absent(), noop -> { });
}
@Override
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore.messages;
-
-public class CloseDataChangeListenerRegistration {
- public static final CloseDataChangeListenerRegistration INSTANCE = new CloseDataChangeListenerRegistration();
-
- private CloseDataChangeListenerRegistration() {
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore.messages;
-
-
-public class CloseDataChangeListenerRegistrationReply {
- public static final CloseDataChangeListenerRegistrationReply INSTANCE =
- new CloseDataChangeListenerRegistrationReply();
-
- private CloseDataChangeListenerRegistrationReply() {
- }
-}
import java.io.ObjectStreamException;
import java.io.Serializable;
-public final class CloseDataTreeChangeListenerRegistration implements Serializable {
+public final class CloseDataTreeNotificationListenerRegistration implements Serializable {
private static final long serialVersionUID = 1L;
- private static final CloseDataTreeChangeListenerRegistration INSTANCE =
- new CloseDataTreeChangeListenerRegistration();
+ private static final CloseDataTreeNotificationListenerRegistration INSTANCE =
+ new CloseDataTreeNotificationListenerRegistration();
- private CloseDataTreeChangeListenerRegistration() {
+ private CloseDataTreeNotificationListenerRegistration() {
}
- public static CloseDataTreeChangeListenerRegistration getInstance() {
+ public static CloseDataTreeNotificationListenerRegistration getInstance() {
return INSTANCE;
}
import java.io.ObjectStreamException;
import java.io.Serializable;
-public final class CloseDataTreeChangeListenerRegistrationReply implements Serializable {
+public final class CloseDataTreeNotificationListenerRegistrationReply implements Serializable {
private static final long serialVersionUID = 1L;
- private static final CloseDataTreeChangeListenerRegistrationReply INSTANCE =
- new CloseDataTreeChangeListenerRegistrationReply();
+ private static final CloseDataTreeNotificationListenerRegistrationReply INSTANCE =
+ new CloseDataTreeNotificationListenerRegistrationReply();
- private CloseDataTreeChangeListenerRegistrationReply() {
+ private CloseDataTreeNotificationListenerRegistrationReply() {
// Use getInstance() instead
}
- public static CloseDataTreeChangeListenerRegistrationReply getInstance() {
+ public static CloseDataTreeNotificationListenerRegistrationReply getInstance() {
return INSTANCE;
}
*/
package org.opendaylight.controller.cluster.datastore.messages;
+import akka.actor.ActorPath;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
public interface ListenerRegistrationMessage {
YangInstanceIdentifier getPath();
boolean isRegisterOnAllInstances();
+
+ ActorPath getListenerActorPath();
}
public class RegisterChangeListener implements ListenerRegistrationMessage {
private final YangInstanceIdentifier path;
- private final ActorRef dataChangeListener;
+ private final ActorRef dataChangeListenerActor;
private final AsyncDataBroker.DataChangeScope scope;
private final boolean registerOnAllInstances;
- public RegisterChangeListener(YangInstanceIdentifier path,
- ActorRef dataChangeListener,
- AsyncDataBroker.DataChangeScope scope, boolean registerOnAllInstances) {
+ public RegisterChangeListener(YangInstanceIdentifier path, ActorRef dataChangeListenerActor,
+ AsyncDataBroker.DataChangeScope scope, boolean registerOnAllInstances) {
this.path = path;
- this.dataChangeListener = dataChangeListener;
+ this.dataChangeListenerActor = dataChangeListenerActor;
this.scope = scope;
this.registerOnAllInstances = registerOnAllInstances;
}
return scope;
}
- public ActorPath getDataChangeListenerPath() {
- return dataChangeListener.path();
+ @Override
+ public ActorPath getListenerActorPath() {
+ return dataChangeListenerActor.path();
}
@Override
*/
package org.opendaylight.controller.cluster.datastore.messages;
+import akka.actor.ActorPath;
import akka.actor.ActorRef;
import com.google.common.base.Preconditions;
import java.io.Externalizable;
return path;
}
- public ActorRef getDataTreeChangeListenerPath() {
- return dataTreeChangeListenerPath;
+ @Override
+ public ActorPath getListenerActorPath() {
+ return dataTreeChangeListenerPath.path();
}
@Override
import org.mockito.stubbing.Answer;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
proxy.close();
// The listener registration actor should get a Close message
- expectMsgClass(timeout, CloseDataChangeListenerRegistration.class);
+ expectMsgClass(timeout, CloseDataTreeNotificationListenerRegistration.class);
// The DataChangeListener actor should be terminated
expectMsgClass(timeout, Terminated.class);
proxy.close();
// The listener registration actor should get a Close message
- expectMsgClass(timeout, CloseDataChangeListenerRegistration.class);
+ expectMsgClass(timeout, CloseDataTreeNotificationListenerRegistration.class);
// The DataChangeListener actor should be terminated
expectMsgClass(timeout, Terminated.class);
proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME),
AsyncDataBroker.DataChangeScope.ONE);
- expectMsgClass(duration("5 seconds"), CloseDataChangeListenerRegistration.class);
+ expectMsgClass(duration("5 seconds"), CloseDataTreeNotificationListenerRegistration.class);
Assert.assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor());
proxy.close();
+++ /dev/null
-/*
- * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.junit.Assert.assertEquals;
-
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
-import com.google.common.util.concurrent.MoreExecutors;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistrationReply;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-public class DataChangeListenerRegistrationTest extends AbstractActorTest {
- private static final InMemoryDOMDataStore STORE = new InMemoryDOMDataStore("OPER",
- MoreExecutors.newDirectExecutorService());
-
- static {
- STORE.onGlobalContextUpdated(TestModel.createTestContext());
- }
-
- @Test
- public void testOnReceiveCloseListenerRegistration() throws Exception {
- new JavaTestKit(getSystem()) {
- {
- final Props props = DataChangeListenerRegistrationActor.props(STORE.registerChangeListener(
- TestModel.TEST_PATH, noOpDataChangeListener(), AsyncDataBroker.DataChangeScope.BASE));
- final ActorRef subject = getSystem().actorOf(props, "testCloseListenerRegistration");
-
- new Within(duration("1 seconds")) {
- @Override
- protected void run() {
-
- subject.tell(CloseDataChangeListenerRegistration.INSTANCE, getRef());
-
- final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
- // do not put code outside this method, will run
- // afterwards
- @Override
- protected String match(final Object in) {
- if (in.getClass().equals(CloseDataChangeListenerRegistrationReply.class)) {
- return "match";
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- assertEquals("match", out);
-
- expectNoMsg();
- }
-
- };
- }
- };
- }
-
- private static AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
- return change -> {
- };
- }
-}
import org.mockito.stubbing.Answer;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
proxy.close();
// The listener registration actor should get a Close message
- expectMsgClass(timeout, CloseDataTreeChangeListenerRegistration.class);
+ expectMsgClass(timeout, CloseDataTreeNotificationListenerRegistration.class);
// The DataChangeListener actor should be terminated
expectMsgClass(timeout, Terminated.class);
proxy.init(shardName);
- expectMsgClass(duration("5 seconds"), CloseDataTreeChangeListenerRegistration.class);
+ expectMsgClass(duration("5 seconds"), CloseDataTreeNotificationListenerRegistration.class);
Assert.assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor());
}
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
-import com.google.common.util.concurrent.MoreExecutors;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistrationReply;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-
-public class DataTreeChangeListenerRegistrationActorTest extends AbstractActorTest {
- private static final InMemoryDOMDataStore STORE = new InMemoryDOMDataStore("OPER",
- MoreExecutors.newDirectExecutorService());
-
- static {
- STORE.onGlobalContextUpdated(TestModel.createTestContext());
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testOnReceiveCloseListenerRegistration() throws Exception {
- new JavaTestKit(getSystem()) {
- {
- final ListenerRegistration mockListenerReg = Mockito.mock(ListenerRegistration.class);
- final Props props = DataTreeChangeListenerRegistrationActor.props(mockListenerReg);
- final ActorRef subject = getSystem().actorOf(props, "testCloseListenerRegistration");
-
- subject.tell(CloseDataTreeChangeListenerRegistration.getInstance(), getRef());
-
- expectMsgClass(duration("1 second"), CloseDataTreeChangeListenerRegistrationReply.class);
-
- Mockito.verify(mockListenerReg).close();
- }
- };
- }
-}
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistrationReply;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistrationReply;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
listener.reset(1);
JavaTestKit kit = new JavaTestKit(getSystem());
- entry.getValue().tell(CloseDataTreeChangeListenerRegistration.getInstance(), kit.getRef());
- kit.expectMsgClass(JavaTestKit.duration("5 seconds"), CloseDataTreeChangeListenerRegistrationReply.class);
+ entry.getValue().tell(CloseDataTreeNotificationListenerRegistration.getInstance(), kit.getRef());
+ kit.expectMsgClass(JavaTestKit.duration("5 seconds"), CloseDataTreeNotificationListenerRegistrationReply.class);
writeToStore(shard.getDataStore(), TEST_PATH, ImmutableNodes.containerNode(TEST_QNAME));
listener.verifyNoNotifiedData(TEST_PATH);
import org.opendaylight.controller.cluster.datastore.persisted.PayloadVersion;
import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
+import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
@Before
public void setUp() throws IOException {
+ InMemorySnapshotStore.clear();
+ InMemoryJournal.clear();
system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
Address member1Address = AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
Cluster.get(system).join(member1Address);
assertNotNull("registerChangeListener returned null", listenerReg);
+ IntegrationTestKit.verifyShardState(dataStore, "test-1",
+ state -> assertEquals("getDataChangeListenerActors", 1,
+ state.getDataChangeListenerActors().size()));
+
// Wait for the initial notification
listener.waitForChangeEvents(TestModel.TEST_PATH);
listener.reset(2);
listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
listenerReg.close();
+ IntegrationTestKit.verifyShardState(dataStore, "test-1",
+ state -> assertEquals("getDataChangeListenerActors", 0,
+ state.getDataChangeListenerActors().size()));
+
+ testWriteTransaction(dataStore,
+ YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+ .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
+
+ listener.expectNoMoreChanges("Received unexpected change after close");
+ }
+ }
+ };
+ }
+
+ @Test
+ public void testDataTreeChangeListenerRegistration() throws Exception {
+ new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
+ {
+ try (final AbstractDataStore dataStore = setupAbstractDataStore(
+ testParameter, "testDataTreeChangeListenerRegistration", "test-1")) {
+
+ testWriteTransaction(dataStore, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
+
+ ListenerRegistration<MockDataTreeChangeListener> listenerReg = dataStore
+ .registerTreeChangeListener(TestModel.TEST_PATH, listener);
+
+ assertNotNull("registerTreeChangeListener returned null", listenerReg);
+
+ IntegrationTestKit.verifyShardState(dataStore, "test-1",
+ state -> assertEquals("getTreeChangeListenerActors", 1,
+ state.getTreeChangeListenerActors().size()));
+
+ // Wait for the initial notification
+ listener.waitForChangeEvents(TestModel.TEST_PATH);
+ listener.reset(2);
+
+ // Write 2 updates.
+ testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
+
+ YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+ .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
+ testWriteTransaction(dataStore, listPath,
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
+
+ // Wait for the 2 updates.
+ listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
+ listenerReg.close();
+
+ IntegrationTestKit.verifyShardState(dataStore, "test-1",
+ state -> assertEquals("getTreeChangeListenerActors", 0,
+ state.getTreeChangeListenerActors().size()));
+
testWriteTransaction(dataStore,
YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
.nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
immediatePayloadReplication(shardDataTree, mockShard);
DOMDataTreeChangeListener listener = mock(DOMDataTreeChangeListener.class);
- shardDataTree.registerTreeChangeListener(CarsModel.CAR_LIST_PATH.node(CarsModel.CAR_QNAME), listener);
+ shardDataTree.registerTreeChangeListener(CarsModel.CAR_LIST_PATH.node(CarsModel.CAR_QNAME), listener,
+ Optional.absent(), noop -> { });
addCar(shardDataTree, "optima");
import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistrationReply;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
}
@Test
- public void testClusteredDataChangeListenerDelayedRegistration() throws Exception {
+ public void testClusteredDataChangeListenerWithDelayedRegistration() throws Exception {
new ShardTestKit(getSystem()) {
{
- final String testName = "testClusteredDataChangeListenerDelayedRegistration";
+ final String testName = "testClusteredDataChangeListenerWithDelayedRegistration";
dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
}
@Test
- public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception {
+ public void testClusteredDataTreeChangeListenerWithDelayedRegistration() throws Exception {
new ShardTestKit(getSystem()) {
{
- final String testName = "testClusteredDataTreeChangeListenerDelayedRegistration";
+ final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistration";
dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
};
}
+ @Test
+ public void testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed() throws Exception {
+ new ShardTestKit(getSystem()) {
+ {
+ final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed";
+ dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
+ .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+
+ final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(0);
+ final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
+ TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
+
+ setupInMemorySnapshotStore();
+
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(testName + "-shard"));
+
+ waitUntilNoLeader(shard);
+
+ shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
+ final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterDataTreeChangeListenerReply.class);
+ assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
+
+ final ActorSelection regActor = getSystem().actorSelection(reply.getListenerRegistrationPath());
+ regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), getRef());
+ expectMsgClass(CloseDataTreeNotificationListenerRegistrationReply.class);
+
+ shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
+ .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
+
+ listener.expectNoMoreChanges("Received unexpected change after close");
+ }
+ };
+ }
+
@Test
public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
new ShardTestKit(getSystem()) {
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors;
+
+import static org.mockito.Mockito.timeout;
+
+import akka.actor.ActorRef;
+import akka.testkit.JavaTestKit;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistrationReply;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+
+public class DataTreeNotificationListenerRegistrationActorTest extends AbstractActorTest {
+ @Mock
+ private ListenerRegistration<?> mockListenerReg;
+
+ @Mock
+ private Runnable mockOnClose;
+
+ @Before
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+ DataTreeNotificationListenerRegistrationActor.killDelay = 100;
+ }
+
+ @Test
+ public void testOnReceiveCloseListenerRegistrationAfterSetRegistration() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+ final ActorRef subject = getSystem().actorOf(DataTreeNotificationListenerRegistrationActor.props(),
+ "testOnReceiveCloseListenerRegistrationAfterSetRegistration");
+ watch(subject);
+
+ subject.tell(new DataTreeNotificationListenerRegistrationActor.SetRegistration(mockListenerReg,
+ mockOnClose), ActorRef.noSender());
+ subject.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), getRef());
+
+ expectMsgClass(duration("5 second"), CloseDataTreeNotificationListenerRegistrationReply.class);
+
+ Mockito.verify(mockListenerReg, timeout(5000)).close();
+ Mockito.verify(mockOnClose, timeout(5000)).run();
+
+ expectTerminated(duration("5 second"), subject);
+ }
+ };
+ }
+
+ @Test
+ public void testOnReceiveCloseListenerRegistrationBeforeSetRegistration() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+ final ActorRef subject = getSystem().actorOf(DataTreeNotificationListenerRegistrationActor.props(),
+ "testOnReceiveSetRegistrationAfterPriorClose");
+ watch(subject);
+
+ subject.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), getRef());
+ expectMsgClass(duration("5 second"), CloseDataTreeNotificationListenerRegistrationReply.class);
+
+ subject.tell(new DataTreeNotificationListenerRegistrationActor.SetRegistration(mockListenerReg,
+ mockOnClose), ActorRef.noSender());
+
+ Mockito.verify(mockListenerReg, timeout(5000)).close();
+ Mockito.verify(mockOnClose, timeout(5000)).run();
+
+ expectTerminated(duration("5 second"), subject);
+ }
+ };
+ }
+
+ @Test
+ public void testOnReceiveSetRegistrationAfterPriorClose() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+ DataTreeNotificationListenerRegistrationActor.killDelay = 1000;
+ final ListenerRegistration<?> mockListenerReg2 = Mockito.mock(ListenerRegistration.class);
+ final Runnable mockOnClose2 = Mockito.mock(Runnable.class);
+
+ final ActorRef subject = getSystem().actorOf(DataTreeNotificationListenerRegistrationActor.props(),
+ "testOnReceiveSetRegistrationAfterPriorClose");
+ watch(subject);
+
+ subject.tell(new DataTreeNotificationListenerRegistrationActor.SetRegistration(mockListenerReg,
+ mockOnClose), ActorRef.noSender());
+ subject.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), ActorRef.noSender());
+ subject.tell(new DataTreeNotificationListenerRegistrationActor.SetRegistration(mockListenerReg2,
+ mockOnClose2), ActorRef.noSender());
+
+ Mockito.verify(mockListenerReg, timeout(5000)).close();
+ Mockito.verify(mockOnClose, timeout(5000)).run();
+ Mockito.verify(mockListenerReg2, timeout(5000)).close();
+ Mockito.verify(mockOnClose2, timeout(5000)).run();
+
+ expectTerminated(duration("5 second"), subject);
+ }
+ };
+ }
+}
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
+import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Arrays;
import javax.annotation.Nonnull;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodeContainer;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
public class MockDataTreeChangeListener implements DOMDataTreeChangeListener {
}
}
- public void waitForChangeEvents() {
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public void waitForChangeEvents(YangInstanceIdentifier... expPaths) {
boolean done = Uninterruptibles.awaitUninterruptibly(changeLatch, 5, TimeUnit.SECONDS);
if (!done) {
fail(String.format("Missing change notifications. Expected: %d. Actual: %d",
expChangeEventCount, expChangeEventCount - changeLatch.getCount()));
}
+
+ for (int i = 0; i < expPaths.length; i++) {
+ final DataTreeCandidate candidate = changeList.get(i);
+ final Optional<NormalizedNode<?, ?>> maybeDataAfter = candidate.getRootNode().getDataAfter();
+ if (!maybeDataAfter.isPresent()) {
+ fail(String.format("Change %d does not contain data after. Actual: %s", i + 1,
+ candidate.getRootNode()));
+ }
+
+ final NormalizedNode<?, ?> dataAfter = maybeDataAfter.get();
+ final Optional<YangInstanceIdentifier> relativePath = expPaths[i].relativeTo(candidate.getRootPath());
+ if (!relativePath.isPresent()) {
+ assertEquals(String.format("Change %d does not contain %s. Actual: %s", i + 1, expPaths[i],
+ dataAfter), expPaths[i].getLastPathArgument(), dataAfter.getIdentifier());
+ } else {
+ NormalizedNode<?, ?> nextChild = dataAfter;
+ for (PathArgument pathArg: relativePath.get().getPathArguments()) {
+ boolean found = false;
+ if (nextChild instanceof NormalizedNodeContainer) {
+ Optional<NormalizedNode<?, ?>> maybeChild = ((NormalizedNodeContainer)nextChild)
+ .getChild(pathArg);
+ if (maybeChild.isPresent()) {
+ found = true;
+ nextChild = maybeChild.get();
+ }
+ }
+
+ if (!found) {
+ fail(String.format("Change %d does not contain %s. Actual: %s", i + 1, expPaths[i], dataAfter));
+ }
+ }
+ }
+ }
}
public void verifyNotifiedData(YangInstanceIdentifier... paths) {