import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.TreeMap;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
}
@Override
- public synchronized <T extends DOMDataTreeListener> ListenerRegistration<T> registerListener(final T listener, final Collection<DOMDataTreeIdentifier> subtrees, final boolean allowRxMerges, final Collection<DOMDataTreeProducer> producers) {
+ public synchronized <T extends DOMDataTreeListener> ListenerRegistration<T> registerListener(final T listener,
+ final Collection<DOMDataTreeIdentifier> subtrees, final boolean allowRxMerges,
+ final Collection<DOMDataTreeProducer> producers) throws DOMDataTreeLoopException {
Preconditions.checkNotNull(listener, "listener");
Preconditions.checkArgument(!subtrees.isEmpty(), "Subtrees must not be empty.");
final ShardedDOMDataTreeListenerContext<T> listenerContext =
ShardedDOMDataTreeListenerContext.create(listener, subtrees, allowRxMerges);
try {
// FIXME: Add attachment of producers
+ for (DOMDataTreeProducer producer : producers) {
+ Preconditions.checkArgument(producer instanceof ShardedDOMDataTreeProducer);
+ ShardedDOMDataTreeProducer castedProducer = ((ShardedDOMDataTreeProducer) producer);
+ simpleLoopCheck(subtrees, castedProducer.getSubtrees());
+ // FIXME: We should also unbound listeners
+ castedProducer.boundToListener(listenerContext);
+ }
+
for (DOMDataTreeIdentifier subtree : subtrees) {
DOMDataTreeShard shard = lookupShard(subtree).getRegistration().getInstance();
// FIXME: What should we do if listener is wildcard? And shards are on per
};
}
+ private static void simpleLoopCheck(Collection<DOMDataTreeIdentifier> listen, Set<DOMDataTreeIdentifier> writes)
+ throws DOMDataTreeLoopException {
+ for(DOMDataTreeIdentifier listenPath : listen) {
+ for (DOMDataTreeIdentifier writePath : writes) {
+ if (listenPath.contains(writePath)) {
+ throw new DOMDataTreeLoopException(String.format(
+ "Listener must not listen on parent (%s), and also writes child (%s)", listenPath,
+ writePath));
+ } else if (writePath.contains(listenPath)) {
+ throw new DOMDataTreeLoopException(
+ String.format("Listener must not write parent (%s), and also listen on child (%s)",
+ writePath, listenPath));
+ }
+ }
+ }
+ }
+
void removeListener(ShardedDOMDataTreeListenerContext<?> listener) {
// FIXME: detach producers
listener.close();
Collection<DataTreeCandidate> changesToNotify = unreported;
unreported = new ArrayList<>();
listener.onDataTreeChanged(changesToNotify, currentData);
-
}
void register(DOMDataTreeIdentifier subtree, DOMStoreTreeChangePublisher shard) {
reg.close();
}
}
+
+ DOMDataTreeListener getListener() {
+ return listener;
+ }
}
@GuardedBy("this")
private boolean closed;
+ @GuardedBy("this")
+ private ShardedDOMDataTreeListenerContext<?> attachedListener;
+
ShardedDOMDataTreeProducer(final ShardedDOMDataTree dataTree, final Map<DOMDataTreeIdentifier, DOMDataTreeShard> shardMap, final Set<DOMDataTreeShard> shards) {
this.dataTree = Preconditions.checkNotNull(dataTree);
Preconditions.checkState(openTx.equals(transaction));
openTx = null;
}
+
+ synchronized void boundToListener(ShardedDOMDataTreeListenerContext<?> listener) {
+ // FIXME: Add option to dettach
+ Preconditions.checkState(this.attachedListener == null,
+ "Producer %s is already attached to other listener.",
+ listener.getListener());
+ this.attachedListener = listener;
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the terms of the Eclipse
+ * Public License v1.0 which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.dom.broker.test;
+
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+class MockingUtilities {
+
+ private MockingUtilities() {
+ throw new UnsupportedOperationException("Utility class");
+ }
+
+ public static <T> T mock(Class<T> type, String toString) {
+ T mock = Mockito.mock(type);
+ Mockito.doReturn(toString).when(mock).toString();
+ return mock;
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public static <T, F extends T> ArgumentCaptor<F> captorFor(Class<T> rawClass) {
+ return (ArgumentCaptor) ArgumentCaptor.forClass(rawClass);
+ }
+}
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.opendaylight.mdsal.dom.broker.test.MockingUtilities.captorFor;
import java.util.Collection;
import java.util.Collections;
public void receiveChangeEvent() throws DOMDataTreeLoopException {
ArgumentCaptor<DOMDataTreeChangeListener> storeListener =
ArgumentCaptor.forClass(DOMDataTreeChangeListener.class);
-
treeService.registerListener(listener, SUBTREES_TEST, true, Collections.<DOMDataTreeProducer>emptyList());
verify(rootShard, times(1)).registerTreeChangeListener(eq(TEST_ID.getRootIdentifier()),
storeListener.capture());
assertEquals(TEST_CONTAINER, receivedMap.get(TEST_ID));
}
- @SuppressWarnings("unchecked")
- private static <T, F extends T> ArgumentCaptor<F> captorFor(Class<T> rawClass) {
- return (ArgumentCaptor) ArgumentCaptor.forClass(rawClass);
- }
-
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.dom.broker.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.opendaylight.mdsal.dom.broker.test.MockingUtilities.captorFor;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
+import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree;
+import org.opendaylight.mdsal.dom.broker.test.util.TestModel;
+import org.opendaylight.mdsal.dom.spi.store.DOMStore;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+
+public class ShardedDOMDataTreeListenerWithProducerTest {
+
+
+ private static final DOMDataTreeIdentifier ROOT_ID = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL,
+ YangInstanceIdentifier.EMPTY);
+ private static final DOMDataTreeIdentifier TEST_ID = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL,
+ TestModel.TEST_PATH);
+
+ private static final DOMDataTreeIdentifier TEST2_ID = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL,
+ TestModel.TEST2_PATH);
+
+
+ private static final Collection<DOMDataTreeIdentifier> SUBTREES_ROOT = Collections.singleton(ROOT_ID);
+ private static final Collection<DOMDataTreeIdentifier> SUBTREES_TEST = Collections.singleton(TEST_ID);
+ private static final Collection<DOMDataTreeIdentifier> SUBTREES_TEST2 = Collections.singleton(TEST2_ID);
+ private static final ContainerNode TEST_CONTAINER = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ private interface ListenableShard extends DOMDataTreeShard, DOMStoreTreeChangePublisher, DOMStore {
+
+
+ }
+
+ @Mock(name = "rootShard")
+ private ListenableShard rootShard;
+
+ @Mock(name = "childShard")
+ private ListenableShard childShard;
+
+ @Mock
+ private ListenerRegistration<?> storeListenerReg;
+
+ @Mock(name = "storeWriteTx")
+ private DOMStoreWriteTransaction writeTxMock;
+
+ @Mock(name = "storeTxChain")
+ private DOMStoreTransactionChain txChainMock;
+
+ private DOMDataTreeService treeService;
+
+ private ListenerRegistration<ListenableShard> shardReg;
+
+ @Before
+ public void setUp() throws DOMDataTreeShardingConflictException {
+ MockitoAnnotations.initMocks(this);
+ final ShardedDOMDataTree impl = new ShardedDOMDataTree();
+ treeService = impl;
+ shardReg = impl.registerDataTreeShard(ROOT_ID, rootShard);
+ doReturn("rootShard").when(rootShard).toString();
+ doReturn("childShard").when(childShard).toString();
+
+ doReturn(txChainMock).when(rootShard).createTransactionChain();
+ doReturn(writeTxMock).when(txChainMock).newWriteOnlyTransaction();
+ doReturn(TestCommitCohort.ALLWAYS_SUCCESS).when(writeTxMock).ready();
+
+ doReturn(storeListenerReg).when(rootShard).registerTreeChangeListener(any(YangInstanceIdentifier.class),
+ any(DOMDataTreeChangeListener.class));
+ doNothing().when(storeListenerReg).close();
+ }
+
+ @Test
+ public void registerListenerWithOneProducer() throws DOMDataTreeLoopException {
+ DOMDataTreeListener listener = Mockito.mock(DOMDataTreeListener.class);
+ DOMDataTreeProducer producer = treeService.createProducer(SUBTREES_TEST2);
+ treeService.registerListener(listener, SUBTREES_TEST, true, Collections.singleton(producer));
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void registerListenerWithAlreadyBoundProducer() throws DOMDataTreeLoopException {
+ DOMDataTreeListener listener1 = MockingUtilities.mock(DOMDataTreeListener.class, "listener1");
+ DOMDataTreeProducer producer = treeService.createProducer(SUBTREES_TEST2);
+ treeService.registerListener(listener1, SUBTREES_TEST, true, Collections.singleton(producer));
+
+ DOMDataTreeListener listener2 = MockingUtilities.mock(DOMDataTreeListener.class, "listener2");
+ treeService.registerListener(listener2, SUBTREES_TEST, true, Collections.singleton(producer));
+ }
+
+ @Test(expected = DOMDataTreeLoopException.class)
+ public void loopSameSubtree() throws DOMDataTreeLoopException {
+ DOMDataTreeListener listener = Mockito.mock(DOMDataTreeListener.class);
+ DOMDataTreeProducer producer = treeService.createProducer(SUBTREES_TEST);
+ treeService.registerListener(listener, SUBTREES_TEST, true, Collections.singleton(producer));
+ }
+
+ @Test(expected = DOMDataTreeLoopException.class)
+ public void loopListenParentWritesChild() throws DOMDataTreeLoopException {
+ DOMDataTreeListener listener = Mockito.mock(DOMDataTreeListener.class);
+ DOMDataTreeProducer producer = treeService.createProducer(SUBTREES_TEST);
+ treeService.registerListener(listener, SUBTREES_ROOT, true, Collections.singleton(producer));
+ }
+
+ @Test(expected = DOMDataTreeLoopException.class)
+ public void loopListenChildWritesParent() throws DOMDataTreeLoopException {
+ DOMDataTreeListener listener = Mockito.mock(DOMDataTreeListener.class);
+ DOMDataTreeProducer producer = treeService.createProducer(SUBTREES_ROOT);
+ treeService.registerListener(listener, SUBTREES_ROOT, true, Collections.singleton(producer));
+ }
+
+ @Test
+ public void receiveChangeEvent() throws DOMDataTreeLoopException {
+ DOMDataTreeListener listener = Mockito.mock(DOMDataTreeListener.class);
+ ArgumentCaptor<DOMDataTreeChangeListener> storeListener =
+ ArgumentCaptor.forClass(DOMDataTreeChangeListener.class);
+ treeService.registerListener(listener, SUBTREES_TEST, true, Collections.<DOMDataTreeProducer>emptyList());
+ verify(rootShard, times(1)).registerTreeChangeListener(eq(TEST_ID.getRootIdentifier()),
+ storeListener.capture());
+
+ DataTreeCandidate sentStoreCandidate =
+ DataTreeCandidates.fromNormalizedNode(TEST_ID.getRootIdentifier(), TEST_CONTAINER);
+ Collection<DataTreeCandidate> changes = Collections.singleton(sentStoreCandidate);
+
+ doNothing().when(listener).onDataTreeChanged(Mockito.<Collection<DataTreeCandidate>>any(), Mockito.anyMap());
+ storeListener.getValue().onDataTreeChanged(changes);
+
+ ArgumentCaptor<Collection<DataTreeCandidate>> candidateCapture = captorFor(Collection.class);
+ ArgumentCaptor<Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>>> mapCapture = captorFor(Map.class);
+ verify(listener, times(1)).onDataTreeChanged(candidateCapture.capture(), mapCapture.capture());
+
+ Collection<DataTreeCandidate> receivedCandidate = candidateCapture.getValue();
+ Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> receivedMap = mapCapture.getValue();
+
+ assertNotNull("receivedCandidate", receivedCandidate);
+ assertNotNull("receivedMap", receivedMap);
+ assertFalse("candidate collection must not be empty", receivedCandidate.isEmpty());
+ assertEquals(1, receivedCandidate.size());
+ DataTreeCandidate firstItem = receivedCandidate.iterator().next();
+ assertEquals(TEST_ID.getRootIdentifier(), firstItem.getRootPath());
+ assertEquals(TEST_CONTAINER, receivedMap.get(TEST_ID));
+ }
+
+}
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import java.util.Collection;
import java.util.Collections;
import org.junit.Before;
import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree;
import org.opendaylight.mdsal.dom.broker.test.util.TestModel;
import org.opendaylight.mdsal.dom.spi.store.DOMStore;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
private static final Collection<DOMDataTreeIdentifier> SUBTREES_ROOT = Collections.singleton(ROOT_ID);
private static final Collection<DOMDataTreeIdentifier> SUBTREES_TEST = Collections.singleton(TEST_ID);
- private static final DOMStoreThreePhaseCommitCohort ALLWAYS_SUCCESS = new DOMStoreThreePhaseCommitCohort() {
-
- @Override
- public ListenableFuture<Void> preCommit() {
- return Futures.immediateFuture(null);
- }
-
- @Override
- public ListenableFuture<Void> commit() {
- return Futures.immediateFuture(null);
- }
-
- @Override
- public ListenableFuture<Boolean> canCommit() {
- return Futures.immediateFuture(Boolean.TRUE);
- }
-
- @Override
- public ListenableFuture<Void> abort() {
- return Futures.immediateFuture(null);
- }
- };
-
interface MockTestShard extends DOMDataTreeShard, DOMStore {
}
-
@Mock(name = "rootShard")
private MockTestShard rootShard;
@Mock(name = "storeTxChain")
private DOMStoreTransactionChain txChainMock;
-
-
private DOMDataTreeService treeService;
private ListenerRegistration<MockTestShard> shardReg;
private DOMDataTreeProducer producer;
-
-
-
@Before
public void setUp() throws DOMDataTreeShardingConflictException {
MockitoAnnotations.initMocks(this);
doReturn("rootShard").when(rootShard).toString();
doReturn(txChainMock).when(rootShard).createTransactionChain();
doReturn(writeTxMock).when(txChainMock).newWriteOnlyTransaction();
- doReturn(ALLWAYS_SUCCESS).when(writeTxMock).ready();
+ doReturn(TestCommitCohort.ALLWAYS_SUCCESS).when(writeTxMock).ready();
producer = treeService.createProducer(SUBTREES_ROOT);
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.dom.broker.test;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+
+public enum TestCommitCohort implements DOMStoreThreePhaseCommitCohort {
+
+
+ ALLWAYS_SUCCESS(true, true, true, true), CAN_COMMIT_FAILED(false, false, false, true), PRE_COMMIT_FAILED(true,
+ false, false, true), COMMIT_FAILED(true, true, false, true);
+ ;
+
+
+
+ private TestCommitCohort(boolean canCommit, boolean preCommit, boolean commit, boolean abort) {
+ this.canCommit = Futures.immediateFuture(canCommit);
+ this.preCommit = immediate(canCommit, new IllegalStateException());
+ this.commit = immediate(commit, new IllegalStateException());
+ this.abort = immediate(abort, new IllegalStateException());
+ }
+
+
+ private final ListenableFuture<Boolean> canCommit;
+ private final ListenableFuture<Void> preCommit;
+ private final ListenableFuture<Void> commit;
+ private final ListenableFuture<Void> abort;
+
+ @Override
+ public ListenableFuture<Boolean> canCommit() {
+ return canCommit;
+ }
+
+ @Override
+ public ListenableFuture<Void> preCommit() {
+ return preCommit;
+ }
+
+ @Override
+ public ListenableFuture<Void> abort() {
+ return abort;
+ }
+
+ @Override
+ public ListenableFuture<Void> commit() {
+ return commit;
+ }
+
+ private static ListenableFuture<Void> immediate(boolean isSuccess, Exception e) {
+ return isSuccess ? Futures.<Void>immediateFuture(null) : Futures.<Void>immediateFailedFuture(e);
+ }
+
+
+
+}