*/
package org.opendaylight.mdsal.dom.broker;
-import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-
import com.google.common.base.Preconditions;
import java.util.Collection;
import java.util.EnumMap;
import java.util.Map.Entry;
import java.util.TreeMap;
import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
+import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
parentReg = lookupShard(prefix).getRegistration();
/*
- * FIXME: adjust all producers. This is tricky, as we need different locking strategy,
- * simply because we risk AB/BA deadlock with a producer being split off from
- * a producer.
- *
+ * FIXME: adjust all producers and listeners. This is tricky, as we need different
+ * locking strategy, simply because we risk AB/BA deadlock with a producer being split
+ * off from a producer.
*/
}
@Override
public synchronized <T extends DOMDataTreeListener> ListenerRegistration<T> registerListener(final T listener, final Collection<DOMDataTreeIdentifier> subtrees, final boolean allowRxMerges, final Collection<DOMDataTreeProducer> producers) {
- // FIXME Implement this.
- throw new UnsupportedOperationException("Not implemented yet.");
+ Preconditions.checkNotNull(listener, "listener");
+ Preconditions.checkArgument(!subtrees.isEmpty(), "Subtrees must not be empty.");
+ final ShardedDOMDataTreeListenerContext<T> listenerContext =
+ ShardedDOMDataTreeListenerContext.create(listener, subtrees, allowRxMerges);
+ try {
+ // FIXME: Add attachment of producers
+ for (DOMDataTreeIdentifier subtree : subtrees) {
+ DOMDataTreeShard shard = lookupShard(subtree).getRegistration().getInstance();
+ // FIXME: What should we do if listener is wildcard? And shards are on per
+ // node basis?
+ Preconditions.checkArgument(shard instanceof DOMStoreTreeChangePublisher,
+ "Subtree %s does not point to listenable subtree.", subtree);
+
+ listenerContext.register(subtree, (DOMStoreTreeChangePublisher) shard);
+ }
+ } catch (Exception e) {
+ listenerContext.close();
+ throw e;
+ }
+ return new AbstractListenerRegistration<T>(listener) {
+ @Override
+ protected void removeRegistration() {
+ ShardedDOMDataTree.this.removeListener(listenerContext);
+ }
+ };
+ }
+
+ void removeListener(ShardedDOMDataTreeListenerContext<?> listener) {
+ // FIXME: detach producers
+ listener.close();
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.dom.broker;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Map;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.util.MapAdaptor;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+
+class ShardedDOMDataTreeListenerContext<T extends DOMDataTreeListener> implements AutoCloseable {
+
+ private final DOMDataTreeListener listener;
+ private final EnumMap<LogicalDatastoreType, StoreListener> storeListeners = new EnumMap<>(
+ LogicalDatastoreType.class);
+ private final Collection<ListenerRegistration<?>> registrations = new ArrayList<>();
+
+ // FIXME: Probably should be encapsulated into state object
+ @GuardedBy("this")
+ private Collection<DataTreeCandidate> unreported = new ArrayList<>();
+ @GuardedBy("this")
+ private Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> currentData = Collections.emptyMap();
+
+ private ShardedDOMDataTreeListenerContext(T listener, Collection<DOMDataTreeIdentifier> subtrees,
+ boolean allowRxMerges) {
+ for (LogicalDatastoreType type : LogicalDatastoreType.values()) {
+ storeListeners.put(type, new StoreListener(type));
+ }
+ this.listener = Preconditions.checkNotNull(listener, "listener");
+ }
+
+ static <T extends DOMDataTreeListener> ShardedDOMDataTreeListenerContext<T> create(final T listener,
+ final Collection<DOMDataTreeIdentifier> subtrees, final boolean allowRxMerges) {
+ return new ShardedDOMDataTreeListenerContext<>(listener, subtrees, allowRxMerges);
+ }
+
+ synchronized void notifyListener() {
+ Collection<DataTreeCandidate> changesToNotify = unreported;
+ unreported = new ArrayList<>();
+ listener.onDataTreeChanged(changesToNotify, currentData);
+
+ }
+
+ void register(DOMDataTreeIdentifier subtree, DOMStoreTreeChangePublisher shard) {
+ ListenerRegistration<?> storeReg =
+ shard.registerTreeChangeListener(subtree.getRootIdentifier(),
+ storeListeners.get(subtree.getDatastoreType()));
+ registrations.add(storeReg);
+ }
+
+ private final class StoreListener implements DOMDataTreeChangeListener {
+
+ private final LogicalDatastoreType type;
+
+ public StoreListener(LogicalDatastoreType type) {
+ this.type = type;
+ }
+
+ @Override
+ public void onDataTreeChanged(Collection<DataTreeCandidate> changes) {
+ receivedDataTreeChanges(type, changes);
+ scheduleNotification();
+ }
+
+ }
+
+ // FIXME: Should be able to run parallel to notifyListener and should honor
+ // allowRxMerges
+ synchronized void receivedDataTreeChanges(LogicalDatastoreType type, Collection<DataTreeCandidate> changes) {
+ Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> updatedData =
+ MapAdaptor.getDefaultInstance().takeSnapshot(currentData);
+ for (DataTreeCandidate change : changes) {
+ // FIXME: Make sure only one is reported / merged
+ unreported.add(change);
+ DOMDataTreeIdentifier treeId = new DOMDataTreeIdentifier(type, change.getRootPath());
+ // FIXME: Probably we should apply data tree candidate to previously observed state
+ Optional<NormalizedNode<?, ?>> dataAfter = change.getRootNode().getDataAfter();
+ if (dataAfter.isPresent()) {
+ updatedData.put(treeId, dataAfter.get());
+ } else {
+ updatedData.remove(treeId);
+ }
+ }
+ currentData = MapAdaptor.getDefaultInstance().optimize(updatedData);
+ }
+
+ void scheduleNotification() {
+ // FIXME: This callout should schedule delivery task
+ notifyListener();
+ }
+
+ @Override
+ public void close() {
+ for (ListenerRegistration<?> reg : registrations) {
+ reg.close();
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.dom.broker.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
+import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree;
+import org.opendaylight.mdsal.dom.broker.test.util.TestModel;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+
+public class ShardedDOMDataTreeListenerTest {
+
+
+ private static final DOMDataTreeIdentifier ROOT_ID = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL,
+ YangInstanceIdentifier.EMPTY);
+ private static final DOMDataTreeIdentifier TEST_ID = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL,
+ TestModel.TEST_PATH);
+
+ private static final Collection<DOMDataTreeIdentifier> SUBTREES_ROOT = Collections.singleton(ROOT_ID);
+ private static final Collection<DOMDataTreeIdentifier> SUBTREES_TEST = Collections.singleton(TEST_ID);
+ private static final ContainerNode TEST_CONTAINER = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ private interface ListenableShard extends DOMDataTreeShard, DOMStoreTreeChangePublisher {
+
+
+ }
+
+ @Mock(name = "rootShard")
+ private ListenableShard rootShard;
+
+ @Mock(name = "childShard")
+ private ListenableShard childShard;
+
+ @Mock(name = "listener")
+ private DOMDataTreeListener listener;
+
+ @Mock
+ private ListenerRegistration<?> storeListenerReg;
+
+ private DOMDataTreeService treeService;
+ private ListenerRegistration<ListenableShard> shardReg;
+
+ @Before
+ public void setUp() throws DOMDataTreeShardingConflictException {
+ MockitoAnnotations.initMocks(this);
+ final ShardedDOMDataTree impl = new ShardedDOMDataTree();
+ treeService = impl;
+ shardReg = impl.registerDataTreeShard(ROOT_ID, rootShard);
+ doReturn("rootShard").when(rootShard).toString();
+ doReturn("childShard").when(childShard).toString();
+
+ doReturn(storeListenerReg).when(rootShard).registerTreeChangeListener(any(YangInstanceIdentifier.class),
+ any(DOMDataTreeChangeListener.class));
+ doNothing().when(storeListenerReg).close();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void registerListenerWithEmptySubtrees() throws DOMDataTreeLoopException {
+ treeService.registerListener(listener, Collections.<DOMDataTreeIdentifier>emptyList(), true,
+ Collections.<DOMDataTreeProducer>emptyList());
+ }
+
+ @Test
+ public void registerRootListener() throws DOMDataTreeLoopException {
+ treeService.registerListener(listener, SUBTREES_ROOT, true, Collections.<DOMDataTreeProducer>emptyList());
+ verify(rootShard, times(1)).registerTreeChangeListener(eq(ROOT_ID.getRootIdentifier()),
+ any(DOMDataTreeChangeListener.class));
+ }
+
+ @Test
+ public void registerTreeListener() throws DOMDataTreeLoopException {
+ treeService.registerListener(listener, SUBTREES_TEST, true, Collections.<DOMDataTreeProducer>emptyList());
+ verify(rootShard, times(1)).registerTreeChangeListener(eq(TEST_ID.getRootIdentifier()),
+ any(DOMDataTreeChangeListener.class));
+ }
+
+ @Test
+ public void registerAndCloseListener() throws DOMDataTreeLoopException {
+ ListenerRegistration<DOMDataTreeListener> reg =
+ treeService.registerListener(listener, SUBTREES_TEST, true,
+ Collections.<DOMDataTreeProducer>emptyList());
+ reg.close();
+ verify(storeListenerReg, times(1)).close();
+ }
+
+ @Test
+ public void receiveChangeEvent() throws DOMDataTreeLoopException {
+ ArgumentCaptor<DOMDataTreeChangeListener> storeListener =
+ ArgumentCaptor.forClass(DOMDataTreeChangeListener.class);
+
+ treeService.registerListener(listener, SUBTREES_TEST, true, Collections.<DOMDataTreeProducer>emptyList());
+ verify(rootShard, times(1)).registerTreeChangeListener(eq(TEST_ID.getRootIdentifier()),
+ storeListener.capture());
+
+ DataTreeCandidate sentStoreCandidate =
+ DataTreeCandidates.fromNormalizedNode(TEST_ID.getRootIdentifier(), TEST_CONTAINER);
+ Collection<DataTreeCandidate> changes = Collections.singleton(sentStoreCandidate);
+
+ doNothing().when(listener).onDataTreeChanged(Mockito.<Collection<DataTreeCandidate>>any(), Mockito.anyMap());
+ storeListener.getValue().onDataTreeChanged(changes);
+
+ ArgumentCaptor<Collection<DataTreeCandidate>> candidateCapture = captorFor(Collection.class);
+ ArgumentCaptor<Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>>> mapCapture = captorFor(Map.class);
+ verify(listener, times(1)).onDataTreeChanged(candidateCapture.capture(), mapCapture.capture());
+
+ Collection<DataTreeCandidate> receivedCandidate = candidateCapture.getValue();
+ Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> receivedMap = mapCapture.getValue();
+
+ assertNotNull("receivedCandidate", receivedCandidate);
+ assertNotNull("receivedMap", receivedMap);
+ assertFalse("candidate collection must not be empty", receivedCandidate.isEmpty());
+ assertEquals(1, receivedCandidate.size());
+ DataTreeCandidate firstItem = receivedCandidate.iterator().next();
+ assertEquals(TEST_ID.getRootIdentifier(), firstItem.getRootPath());
+ assertEquals(TEST_CONTAINER, receivedMap.get(TEST_ID));
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T, F extends T> ArgumentCaptor<F> captorFor(Class<T> rawClass) {
+ return (ArgumentCaptor) ArgumentCaptor.forClass(rawClass);
+ }
+
+}