import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-public class DataChangeListenerRegistration extends AbstractUntypedActor {
+public class DataChangeListenerRegistrationActor extends AbstractUntypedActor {
private final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
registration;
- public DataChangeListenerRegistration(
+ public DataChangeListenerRegistrationActor(
ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> registration) {
this.registration = registration;
}
}
private static class DataChangeListenerRegistrationCreator
- implements Creator<DataChangeListenerRegistration> {
+ implements Creator<DataChangeListenerRegistrationActor> {
private static final long serialVersionUID = 1L;
final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
NormalizedNode<?, ?>>> registration;
}
@Override
- public DataChangeListenerRegistration create() throws Exception {
- return new DataChangeListenerRegistration(registration);
+ public DataChangeListenerRegistrationActor create() throws Exception {
+ return new DataChangeListenerRegistrationActor(registration);
}
}
}
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
+import com.google.common.base.Optional;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
+import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-final class DataChangeListenerSupport extends LeaderLocalDelegateFactory<RegisterChangeListener, ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> {
+final class DataChangeListenerSupport extends LeaderLocalDelegateFactory<RegisterChangeListener,
+ DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
+ Optional<DataTreeCandidate>> {
private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerSupport.class);
private final List<DelayedListenerRegistration> delayedListenerRegistrations = new ArrayList<>();
private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
private void registerDelayedListeners(DelayedListenerRegistration reg) {
if(!reg.isClosed()) {
- final Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> res =
- createDelegate(reg.getRegisterChangeListener());
+ final Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
+ Optional<DataTreeCandidate>> res = createDelegate(reg.getRegisterChangeListener());
reg.setDelegate(res.getKey());
- if (res.getValue() != null) {
- reg.getInstance().onDataChanged(res.getValue());
- }
+ getShard().getDataStore().notifyOfInitialData(res.getKey(), res.getValue());
}
}
final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
NormalizedNode<?, ?>>> registration;
- final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> event;
if ((hasLeader && message.isRegisterOnAllInstances()) || isLeader) {
- final Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> res =
- createDelegate(message);
+ final Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
+ Optional<DataTreeCandidate>> res = createDelegate(message);
registration = res.getKey();
- event = res.getValue();
+
+ getShard().getDataStore().notifyOfInitialData(res.getKey(), res.getValue());
} else {
LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());
delayedListenerRegistrations.add(delayedReg);
}
registration = delayedReg;
- event = null;
}
- ActorRef listenerRegistration = createActor(DataChangeListenerRegistration.props(registration));
+ ActorRef listenerRegistration = createActor(DataChangeListenerRegistrationActor.props(registration));
LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
persistenceId(), listenerRegistration.path());
tellSender(new RegisterChangeListenerReply(listenerRegistration));
- if (event != null) {
- registration.getInstance().onDataChanged(event);
- }
}
@Override
- Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> createDelegate(
- final RegisterChangeListener message) {
+ Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
+ Optional<DataTreeCandidate>> createDelegate(final RegisterChangeListener message) {
ActorSelection dataChangeListenerPath = selectActor(message.getDataChangeListenerPath());
// Notify the listener if notifications should be enabled or not
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
+import com.google.common.base.Optional;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.Map.Entry;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory<RegisterDataTreeChangeListener, ListenerRegistration<DOMDataTreeChangeListener>, DataTreeCandidate> {
+final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory<RegisterDataTreeChangeListener,
+ ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> {
private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerSupport.class);
private final ArrayList<DelayedDataTreeListenerRegistration> delayedRegistrations = new ArrayList<>();
private final Collection<ActorSelection> actors = new ArrayList<>();
@Override
void onLeadershipChange(final boolean isLeader, boolean hasLeader) {
+ final EnableNotification msg = new EnableNotification(isLeader);
+ for (ActorSelection dataChangeListener : actors) {
+ dataChangeListener.tell(msg, getSelf());
+ }
+
if (isLeader) {
for (DelayedDataTreeListenerRegistration reg : delayedRegistrations) {
reg.createDelegate(this);
delayedRegistrations.clear();
delayedRegistrations.trimToSize();
}
-
- final EnableNotification msg = new EnableNotification(isLeader);
- for (ActorSelection dataChangeListener : actors) {
- dataChangeListener.tell(msg, getSelf());
- }
}
@Override
LOG.debug("{}: registerTreeChangeListener for {}, leader: {}", persistenceId(), registerTreeChangeListener.getPath(), isLeader);
final ListenerRegistration<DOMDataTreeChangeListener> registration;
- final DataTreeCandidate event;
if (!isLeader) {
LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());
new DelayedDataTreeListenerRegistration(registerTreeChangeListener);
delayedRegistrations.add(delayedReg);
registration = delayedReg;
- event = null;
} else {
- final Entry<ListenerRegistration<DOMDataTreeChangeListener>, DataTreeCandidate> res = createDelegate(registerTreeChangeListener);
+ final Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> res =
+ createDelegate(registerTreeChangeListener);
registration = res.getKey();
- event = res.getValue();
+ getShard().getDataStore().notifyOfInitialData(registerTreeChangeListener.getPath(),
+ registration.getInstance(), res.getValue());
}
ActorRef listenerRegistration = createActor(DataTreeChangeListenerRegistrationActor.props(registration));
persistenceId(), listenerRegistration.path());
tellSender(new RegisterDataTreeChangeListenerReply(listenerRegistration));
- if (event != null) {
- registration.getInstance().onDataTreeChanged(Collections.singletonList(event));
- }
}
@Override
- Entry<ListenerRegistration<DOMDataTreeChangeListener>, DataTreeCandidate> createDelegate(final RegisterDataTreeChangeListener message) {
+ Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> createDelegate(final RegisterDataTreeChangeListener message) {
ActorSelection dataChangeListenerPath = selectActor(message.getDataTreeChangeListenerPath());
// Notify the listener if notifications should be enabled or not
*/
package org.opendaylight.controller.cluster.datastore;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import java.util.Collections;
import java.util.Map.Entry;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
this.registerTreeChangeListener = Preconditions.checkNotNull(registerTreeChangeListener);
}
- synchronized void createDelegate(final DelegateFactory<RegisterDataTreeChangeListener, ListenerRegistration<DOMDataTreeChangeListener>, DataTreeCandidate> factory) {
+ synchronized void createDelegate(final LeaderLocalDelegateFactory<RegisterDataTreeChangeListener, ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> factory) {
if (!closed) {
- final Entry<ListenerRegistration<DOMDataTreeChangeListener>, DataTreeCandidate> res = factory.createDelegate(registerTreeChangeListener);
+ final Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> res =
+ factory.createDelegate(registerTreeChangeListener);
this.delegate = res.getKey();
- if (res.getValue() != null) {
- delegate.getInstance().onDataTreeChanged(Collections.singletonList(res.getValue()));
- }
+ factory.getShard().getDataStore().notifyOfInitialData(registerTreeChangeListener.getPath(),
+ this.delegate.getInstance(), res.getValue());
}
}
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
+import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
import org.opendaylight.controller.md.sal.dom.store.impl.ResolveDataChangeEventsTask;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
@NotThreadSafe
public class ShardDataTree extends ShardDataTreeTransactionParent {
private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
+ private static final YangInstanceIdentifier ROOT_PATH = YangInstanceIdentifier.builder().build();
private static final ShardDataTreeNotificationManager MANAGER = new ShardDataTreeNotificationManager();
private final Map<String, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
private final ShardDataTreeChangePublisher treeChangePublisher = new ShardDataTreeChangePublisher();
ResolveDataChangeEventsTask.create(candidate, listenerTree).resolve(MANAGER);
}
+ void notifyOfInitialData(DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+ NormalizedNode<?, ?>>> listenerReg, Optional<DataTreeCandidate> currentState) {
+
+ if(currentState.isPresent()) {
+ ListenerTree localListenerTree = ListenerTree.create();
+ localListenerTree.registerDataChangeListener(listenerReg.getPath(), listenerReg.getInstance(),
+ listenerReg.getScope());
+
+ ResolveDataChangeEventsTask.create(currentState.get(), localListenerTree).resolve(MANAGER);
+ }
+ }
+
+ void notifyOfInitialData(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
+ final Optional<DataTreeCandidate> currentState) {
+ if(currentState.isPresent()) {
+ ShardDataTreeChangePublisher localTreeChangePublisher = new ShardDataTreeChangePublisher();
+ localTreeChangePublisher.registerTreeChangeListener(path, listener);
+ localTreeChangePublisher.publishChanges(currentState.get());
+ }
+ }
+
void closeAllTransactionChains() {
for (ShardDataTreeTransactionChain chain : transactionChains.values()) {
chain.close();
}
}
- Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> registerChangeListener(
- final YangInstanceIdentifier path,
- final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, final DataChangeScope scope) {
- final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> reg =
+ Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
+ Optional<DataTreeCandidate>> registerChangeListener(final YangInstanceIdentifier path,
+ final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
+ final DataChangeScope scope) {
+ final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> reg =
listenerTree.registerDataChangeListener(path, listener, scope);
- final Optional<NormalizedNode<?, ?>> currentState = dataTree.takeSnapshot().readNode(path);
- final DOMImmutableDataChangeEvent event;
- if (currentState.isPresent()) {
- final NormalizedNode<?, ?> data = currentState.get();
- event = DOMImmutableDataChangeEvent.builder(DataChangeScope.BASE).setAfter(data).addCreated(path, data).build();
- } else {
- event = null;
- }
+ return new SimpleEntry<>(reg, readCurrentData());
+ }
- return new SimpleEntry<>(reg, event);
+ private Optional<DataTreeCandidate> readCurrentData() {
+ final Optional<NormalizedNode<?, ?>> currentState = dataTree.takeSnapshot().readNode(ROOT_PATH);
+ return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode(
+ ROOT_PATH, currentState.get())) : Optional.<DataTreeCandidate>absent();
}
- public Entry<ListenerRegistration<DOMDataTreeChangeListener>, DataTreeCandidate> registerTreeChangeListener(final YangInstanceIdentifier path,
- final DOMDataTreeChangeListener listener) {
- final ListenerRegistration<DOMDataTreeChangeListener> reg = treeChangePublisher.registerTreeChangeListener(path, listener);
+ public Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> registerTreeChangeListener(
+ final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) {
+ final ListenerRegistration<DOMDataTreeChangeListener> reg = treeChangePublisher.registerTreeChangeListener(
+ path, listener);
- final Optional<NormalizedNode<?, ?>> currentState = dataTree.takeSnapshot().readNode(path);
- final DataTreeCandidate event;
- if (currentState.isPresent()) {
- event = DataTreeCandidates.fromNormalizedNode(path, currentState.get());
- } else {
- event = null;
- }
- return new SimpleEntry<>(reg, event);
+ return new SimpleEntry<>(reg, readCurrentData());
}
void applyForeignCandidate(final String identifier, final DataTreeCandidate foreign) throws DataValidationFailedException {
cohort.commit();
}
+ public static void mergeToStore(final ShardDataTree store, final YangInstanceIdentifier id,
+ final NormalizedNode<?,?> node) throws InterruptedException, ExecutionException {
+ final ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction("writeToStore", null);
+
+ transaction.getSnapshot().merge(id, node);
+ final ShardDataTreeCohort cohort = transaction.ready();
+ cohort.canCommit().get();
+ cohort.preCommit().get();
+ cohort.commit();
+ }
+
public static void writeToStore(final DataTree store, final YangInstanceIdentifier id,
final NormalizedNode<?,?> node) throws DataValidationFailedException {
final DataTreeModification transaction = store.takeSnapshot().newModification();
@Test
public void testOnReceiveCloseListenerRegistration() throws Exception {
new JavaTestKit(getSystem()) {{
- final Props props = DataChangeListenerRegistration.props(store
+ final Props props = DataChangeListenerRegistrationActor.props(store
.registerChangeListener(TestModel.TEST_PATH, noOpDataChangeListener(),
AsyncDataBroker.DataChangeScope.BASE));
final ActorRef subject = getSystem().actorOf(props, "testCloseListenerRegistration");
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.INNER_LIST_QNAME;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.OUTER_CONTAINER_PATH;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.OUTER_CONTAINER_QNAME;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.OUTER_LIST_PATH;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.OUTER_LIST_QNAME;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_PATH;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_QNAME;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.innerEntryPath;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.innerNode;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerEntryKey;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerEntryPath;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerNode;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerNodeEntry;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.testNodeWithOuter;
+import akka.actor.ActorRef;
+import akka.testkit.TestActorRef;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
+import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+
+/**
+ * Unit tests for DataChangeListenerSupport.
+ *
+ * @author Thomas Pantelis
+ */
+public class DataChangeListenerSupportTest extends AbstractShardTest {
+ private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
+
+ private Shard shard;
+ private DataChangeListenerSupport support;
+
+ @Before
+ public void setup() {
+ shard = createShard();
+ support = new DataChangeListenerSupport(shard);
+ }
+
+ @Override
+ @After
+ public void tearDown() {
+ super.tearDown();
+ actorFactory.close();
+ }
+
+ @Test
+ public void testChangeListenerWithNoInitialData() throws Exception {
+ MockDataChangeListener listener = registerChangeListener(TEST_PATH, DataChangeScope.ONE, 0, true);
+
+ listener.expectNoMoreChanges("Unexpected initial change event");
+ }
+
+ @Test
+ public void testInitialChangeListenerEventWithContainerPath() throws Exception {
+ writeToStore(shard.getDataStore(), TEST_PATH, ImmutableNodes.containerNode(TEST_QNAME));
+
+ MockDataChangeListener listener = registerChangeListener(TEST_PATH, DataChangeScope.ONE, 1, true);
+
+ listener.waitForChangeEvents(TEST_PATH);
+ }
+
+ @Test
+ public void testInitialChangeListenerEventWithListPath() throws Exception {
+ mergeToStore(shard.getDataStore(), TEST_PATH, testNodeWithOuter(1, 2));
+
+ MockDataChangeListener listener = registerChangeListener(OUTER_LIST_PATH, DataChangeScope.ONE, 1, true);
+
+ listener.waitForChangeEvents();
+ assertEquals("Outer entry 1 present", true, NormalizedNodes.findNode(
+ listener.getCreatedData(0, OUTER_LIST_PATH), outerEntryKey(1)).isPresent());
+ assertEquals("Outer entry 2 present", true, NormalizedNodes.findNode(
+ listener.getCreatedData(0, OUTER_LIST_PATH), outerEntryKey(2)).isPresent());
+ }
+
+ @Test
+ public void testInitialChangeListenerEventWithWildcardedListPath() throws Exception {
+ mergeToStore(shard.getDataStore(), TEST_PATH, testNodeWithOuter(1, 2));
+ writeToStore(shard.getDataStore(), OUTER_CONTAINER_PATH, ImmutableNodes.containerNode(OUTER_CONTAINER_QNAME));
+
+ MockDataChangeListener listener = registerChangeListener(OUTER_LIST_PATH.node(OUTER_LIST_QNAME),
+ DataChangeScope.ONE, 1, true);
+
+ listener.waitForChangeEvents();
+ listener.verifyCreatedData(0, outerEntryPath(1));
+ listener.verifyCreatedData(0, outerEntryPath(2));
+ listener.verifyNoCreatedData(0, OUTER_CONTAINER_PATH);
+ }
+
+ @Test
+ public void testInitialChangeListenerEventWithNestedWildcardedListsPath() throws Exception {
+ mergeToStore(shard.getDataStore(), TEST_PATH, testNodeWithOuter(outerNode(
+ outerNodeEntry(1, innerNode("one", "two")), outerNodeEntry(2, innerNode("three", "four")))));
+
+ MockDataChangeListener listener = registerChangeListener(
+ OUTER_LIST_PATH.node(OUTER_LIST_QNAME).node(INNER_LIST_QNAME).node(INNER_LIST_QNAME),
+ DataChangeScope.ONE, 1, true);
+
+ listener.waitForChangeEvents();
+ listener.verifyCreatedData(0, innerEntryPath(1, "one"));
+ listener.verifyCreatedData(0, innerEntryPath(1, "two"));
+ listener.verifyCreatedData(0, innerEntryPath(2, "three"));
+ listener.verifyCreatedData(0, innerEntryPath(2, "four"));
+
+ // Register for a specific outer list entry
+
+ MockDataChangeListener listener2 = registerChangeListener(
+ OUTER_LIST_PATH.node(outerEntryKey(1)).node(INNER_LIST_QNAME).node(INNER_LIST_QNAME),
+ DataChangeScope.ONE, 1, true);
+
+ listener2.waitForChangeEvents();
+ listener2.verifyCreatedData(0, innerEntryPath(1, "one"));
+ listener2.verifyCreatedData(0, innerEntryPath(1, "two"));
+ listener2.verifyNoCreatedData(0, innerEntryPath(2, "three"));
+ listener2.verifyNoCreatedData(0, innerEntryPath(2, "four"));
+ }
+
+ @Test
+ public void testInitialChangeListenerEventWhenNotInitiallyLeader() throws Exception {
+ mergeToStore(shard.getDataStore(), TEST_PATH, testNodeWithOuter(outerNode(
+ outerNodeEntry(1, innerNode("one", "two")), outerNodeEntry(2, innerNode("three", "four")))));
+
+ MockDataChangeListener listener = registerChangeListener(
+ OUTER_LIST_PATH.node(OUTER_LIST_QNAME).node(INNER_LIST_QNAME).node(INNER_LIST_QNAME),
+ DataChangeScope.ONE, 0, false);
+
+ listener.expectNoMoreChanges("Unexpected initial change event");
+ listener.reset(1);
+
+ support.onLeadershipChange(true, true);
+
+ listener.waitForChangeEvents();
+ listener.verifyCreatedData(0, innerEntryPath(1, "one"));
+ listener.verifyCreatedData(0, innerEntryPath(1, "two"));
+ listener.verifyCreatedData(0, innerEntryPath(2, "three"));
+ listener.verifyCreatedData(0, innerEntryPath(2, "four"));
+ }
+
+ private MockDataChangeListener registerChangeListener(YangInstanceIdentifier path, DataChangeScope scope,
+ int expectedEvents, boolean isLeader) {
+ MockDataChangeListener listener = new MockDataChangeListener(expectedEvents);
+ ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener));
+
+ support.onMessage(new RegisterChangeListener(path, dclActor, scope, false), isLeader, true);
+ return listener;
+ }
+
+ private Shard createShard() {
+ TestActorRef<Shard> actor = actorFactory.createTestActor(newShardProps());
+ return actor.underlyingActor();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.INNER_LIST_QNAME;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.OUTER_LIST_PATH;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.OUTER_LIST_QNAME;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_PATH;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_QNAME;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.innerEntryPath;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.innerNode;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerEntryKey;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerEntryPath;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerNode;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerNodeEntry;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.testNodeWithOuter;
+import akka.actor.ActorRef;
+import akka.testkit.TestActorRef;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
+import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+
+/**
+ * Unit tests for DataTreeChangeListenerSupport.
+ *
+ * @author Thomas Pantelis
+ */
+public class DataTreeChangeListenerSupportTest extends AbstractShardTest {
+ private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
+
+ private Shard shard;
+ private DataTreeChangeListenerSupport support;
+
+ @Before
+ public void setup() {
+ shard = createShard();
+ support = new DataTreeChangeListenerSupport(shard);
+ }
+
+ @Override
+ @After
+ public void tearDown() {
+ super.tearDown();
+ actorFactory.close();
+ }
+
+ @Test
+ public void testChangeListenerWithNoInitialData() throws Exception {
+ MockDataTreeChangeListener listener = registerChangeListener(TEST_PATH, 0, true);
+
+ listener.expectNoMoreChanges("Unexpected initial change event");
+ }
+
+ @Test
+ public void testInitialChangeListenerEventWithContainerPath() throws Exception {
+ writeToStore(shard.getDataStore(), TEST_PATH, ImmutableNodes.containerNode(TEST_QNAME));
+
+ MockDataTreeChangeListener listener = registerChangeListener(TEST_PATH, 1, true);
+
+ listener.waitForChangeEvents();
+ listener.verifyNotifiedData(TEST_PATH);
+ }
+
+ @Test
+ public void testInitialChangeListenerEventWithListPath() throws Exception {
+ mergeToStore(shard.getDataStore(), TEST_PATH, testNodeWithOuter(1, 2));
+
+ MockDataTreeChangeListener listener = registerChangeListener(OUTER_LIST_PATH, 1, true);
+
+ listener.waitForChangeEvents();
+ listener.verifyNotifiedData(OUTER_LIST_PATH);
+ }
+
+ @Test
+ public void testInitialChangeListenerEventWithWildcardedListPath() throws Exception {
+ mergeToStore(shard.getDataStore(), TEST_PATH, testNodeWithOuter(1, 2));
+
+ MockDataTreeChangeListener listener = registerChangeListener(OUTER_LIST_PATH.node(OUTER_LIST_QNAME), 2, true);
+
+ listener.waitForChangeEvents();
+ listener.verifyNotifiedData(outerEntryPath(1), outerEntryPath(2));
+ }
+
+ @Test
+ public void testInitialChangeListenerEventWithNestedWildcardedListsPath() throws Exception {
+ mergeToStore(shard.getDataStore(), TEST_PATH, testNodeWithOuter(outerNode(
+ outerNodeEntry(1, innerNode("one", "two")), outerNodeEntry(2, innerNode("three", "four")))));
+
+ MockDataTreeChangeListener listener = registerChangeListener(
+ OUTER_LIST_PATH.node(OUTER_LIST_QNAME).node(INNER_LIST_QNAME).node(INNER_LIST_QNAME), 4, true);
+
+ listener.waitForChangeEvents();
+ listener.verifyNotifiedData(innerEntryPath(1, "one"), innerEntryPath(1, "two"), innerEntryPath(2, "three"),
+ innerEntryPath(2, "four"));
+
+ // Register for a specific outer list entry
+
+ MockDataTreeChangeListener listener2 = registerChangeListener(
+ OUTER_LIST_PATH.node(outerEntryKey(1)).node(INNER_LIST_QNAME).node(INNER_LIST_QNAME), 2, true);
+
+ listener2.waitForChangeEvents();
+ listener2.verifyNotifiedData(innerEntryPath(1, "one"), innerEntryPath(1, "two"));
+ listener2.verifyNoNotifiedData(innerEntryPath(2, "three"), innerEntryPath(2, "four"));
+ }
+
+ private MockDataTreeChangeListener registerChangeListener(YangInstanceIdentifier path,
+ int expectedEvents, boolean isLeader) {
+ MockDataTreeChangeListener listener = new MockDataTreeChangeListener(expectedEvents);
+ ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener));
+ support.onMessage(new RegisterDataTreeChangeListener(path, dclActor), isLeader, true);
+ return listener;
+ }
+
+ private Shard createShard() {
+ TestActorRef<Shard> actor = actorFactory.createTestActor(newShardProps());
+ return actor.underlyingActor();
+ }
+}
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
}
for(int i = 0; i < expPaths.length; i++) {
- assertTrue(String.format("Change %d does not contain %s", (i+1), expPaths[i]),
- changeList.get(i).getCreatedData().containsKey(expPaths[i]));
+ Map<YangInstanceIdentifier, NormalizedNode<?, ?>> createdData = changeList.get(i).getCreatedData();
+ assertTrue(String.format("Change %d does not contain %s. Actual: %s", (i+1), expPaths[i], createdData),
+ createdData.containsKey(expPaths[i]));
}
}
+ public NormalizedNode<?, ?> getCreatedData(int i, YangInstanceIdentifier path) {
+ return changeList.get(i).getCreatedData().get(path);
+ }
+
+ public void verifyCreatedData(int i, YangInstanceIdentifier path) {
+ Map<YangInstanceIdentifier, NormalizedNode<?, ?>> createdData = changeList.get(i).getCreatedData();
+ assertTrue(path + " not present in " + createdData.keySet(), createdData.get(path) != null);
+ }
+
public void expectNoMoreChanges(String assertMsg) {
- Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
assertEquals(assertMsg, expChangeEventCount, changeList.size());
}
+
+ public void verifyNoCreatedData(int i, YangInstanceIdentifier path) {
+ Map<YangInstanceIdentifier, NormalizedNode<?, ?>> createdData = changeList.get(i).getCreatedData();
+ assertTrue("Unexpected " + path + " present in createdData", createdData.get(path) == null);
+ }
}
*/
package org.opendaylight.controller.cluster.datastore.utils;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-
-import javax.annotation.Nonnull;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
public class MockDataTreeChangeListener implements DOMDataTreeChangeListener {
}
}
+ public void verifyNotifiedData(YangInstanceIdentifier... paths) {
+ Set<YangInstanceIdentifier> pathSet = new HashSet<>(Arrays.asList(paths));
+ for(Collection<DataTreeCandidate> list: changeList) {
+ for(DataTreeCandidate c: list) {
+ pathSet.remove(c.getRootPath());
+ }
+ }
+
+ if(!pathSet.isEmpty()) {
+ fail(pathSet + " not present in " + changeList);
+ }
+ }
+
public void expectNoMoreChanges(String assertMsg) {
- Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
assertEquals(assertMsg, expChangeEventCount, changeList.size());
}
+
+ public void verifyNoNotifiedData(YangInstanceIdentifier... paths) {
+ Set<YangInstanceIdentifier> pathSet = new HashSet<>(Arrays.asList(paths));
+ for(Collection<DataTreeCandidate> list: changeList) {
+ for(DataTreeCandidate c: list) {
+ assertFalse("Unexpected " + c.getRootPath() + " present in DataTreeCandidate",
+ pathSet.contains(c.getRootPath()));
+ }
+ }
+ }
}
import java.util.Collections;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.parser.api.YangSyntaxErrorException;
import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
public static final QName OUTER_LIST_QNAME = QName.create(TEST_QNAME, "outer-list");
+ public static final QName OUTER_CONTAINER_QNAME = QName.create(TEST_QNAME, "outer-container");
public static final QName INNER_LIST_QNAME = QName.create(TEST_QNAME, "inner-list");
public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice");
public static final QName ID_QNAME = QName.create(TEST_QNAME, "id");
node(OUTER_LIST_QNAME).build();
public static final YangInstanceIdentifier INNER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
node(OUTER_LIST_QNAME).node(INNER_LIST_QNAME).build();
+ public static final YangInstanceIdentifier OUTER_CONTAINER_PATH = TEST_PATH.node(OUTER_CONTAINER_QNAME);
public static final QName TWO_QNAME = QName.create(TEST_QNAME,"two");
public static final QName THREE_QNAME = QName.create(TEST_QNAME,"three");
throw new ExceptionInInitializerError(e);
}
}
+
+ public static DataContainerChild<?, ?> outerNode(int... ids) {
+ CollectionNodeBuilder<MapEntryNode, MapNode> outer = ImmutableNodes.mapNodeBuilder(OUTER_LIST_QNAME);
+ for(int id: ids) {
+ outer.addChild(ImmutableNodes.mapEntry(OUTER_LIST_QNAME, ID_QNAME, id));
+ }
+
+ return outer.build();
+ }
+
+ public static DataContainerChild<?, ?> outerNode(MapEntryNode... entries) {
+ CollectionNodeBuilder<MapEntryNode, MapNode> outer = ImmutableNodes.mapNodeBuilder(OUTER_LIST_QNAME);
+ for(MapEntryNode e: entries) {
+ outer.addChild(e);
+ }
+
+ return outer.build();
+ }
+
+ public static DataContainerChild<?, ?> innerNode(String... names) {
+ CollectionNodeBuilder<MapEntryNode, MapNode> outer = ImmutableNodes.mapNodeBuilder(INNER_LIST_QNAME);
+ for(String name: names) {
+ outer.addChild(ImmutableNodes.mapEntry(INNER_LIST_QNAME, NAME_QNAME, name));
+ }
+
+ return outer.build();
+ }
+
+ public static MapEntryNode outerNodeEntry(int id, DataContainerChild<?, ?> inner) {
+ return ImmutableNodes.mapEntryBuilder(OUTER_LIST_QNAME, ID_QNAME, id).addChild(inner).build();
+ }
+
+ public static NormalizedNode<?, ?> testNodeWithOuter(int... ids) {
+ return testNodeWithOuter(outerNode(ids));
+ }
+
+ public static NormalizedNode<?, ?> testNodeWithOuter(DataContainerChild<?, ?> outer) {
+ return ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME)).withChild(outer).build();
+ }
+
+ public static NodeIdentifierWithPredicates outerEntryKey(int id) {
+ return new NodeIdentifierWithPredicates(OUTER_LIST_QNAME, ID_QNAME, id);
+ }
+
+ public static YangInstanceIdentifier outerEntryPath(int id) {
+ return OUTER_LIST_PATH.node(outerEntryKey(id));
+ }
+
+ public static NodeIdentifierWithPredicates innerEntryKey(String name) {
+ return new NodeIdentifierWithPredicates(INNER_LIST_QNAME, NAME_QNAME, name);
+ }
+
+ public static YangInstanceIdentifier innerEntryPath(int id, String name) {
+ return OUTER_LIST_PATH.node(outerEntryKey(id)).node(INNER_LIST_QNAME).node(innerEntryKey(name));
+ }
}
}
}
}
+
+ container outer-container {
+ }
}
container test2 {