From a222d4e5d6928065adf09a16c112c73221b8403f Mon Sep 17 00:00:00 2001 From: Tomas Cere Date: Fri, 29 Apr 2016 11:24:09 +0200 Subject: [PATCH] BUG 5697 : Consumer support in shards Change-Id: Ida05e5553421273c8d3bd5dcd74eaf0b1d3f6fb4 Signed-off-by: Tomas Cere --- ...rdedDOMDataTreeProducerMultiShardTest.java | 191 +++++++++++++---- .../AbstractDOMShardTreeChangePublisher.java | 201 ++++++++++++++++++ .../dom/store/inmemory/ChildShardContext.java | 30 +++ .../inmemory/InMemoryDOMDataTreeShard.java | 49 +++-- ...MemoryDOMDataTreeShardChangePublisher.java | 75 +++++++ ...OMDataTreeShardThreePhaseCommitCohort.java | 17 +- ...emoryDOMDataTreeShardWriteTransaction.java | 9 +- .../ReadableWriteableDOMDataTreeShard.java | 19 ++ .../ShardCanCommitCoordinationTask.java | 2 +- .../inmemory/ShardCommitCoordinationTask.java | 2 +- .../dom/spi/DOMDataTreePrefixTableEntry.java | 7 +- .../AbstractDOMStoreTreeChangePublisher.java | 2 +- 12 files changed, 522 insertions(+), 82 deletions(-) create mode 100644 dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/AbstractDOMShardTreeChangePublisher.java create mode 100644 dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ChildShardContext.java create mode 100644 dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardChangePublisher.java create mode 100644 dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ReadableWriteableDOMDataTreeShard.java diff --git a/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/test/ShardedDOMDataTreeProducerMultiShardTest.java b/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/test/ShardedDOMDataTreeProducerMultiShardTest.java index f8d4b249d1..c108d1446d 100644 --- a/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/test/ShardedDOMDataTreeProducerMultiShardTest.java +++ b/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/test/ShardedDOMDataTreeProducerMultiShardTest.java @@ -1,18 +1,34 @@ package org.opendaylight.mdsal.dom.broker.test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyCollection; +import static org.mockito.Matchers.anyMap; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import com.google.common.util.concurrent.Futures; import java.util.Collection; import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InOrder; import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.api.DOMDataTreeListener; import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree; import org.opendaylight.mdsal.dom.broker.test.util.TestModel; @@ -27,89 +43,132 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgum import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.LeafNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.opendaylight.yangtools.yang.parser.spi.meta.ReactorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ShardedDOMDataTreeProducerMultiShardTest { - private SchemaContext schemaContext; + private static final Logger LOG = LoggerFactory.getLogger(ShardedDOMDataTreeProducerMultiShardTest.class); - private static final DOMDataTreeIdentifier ROOT_ID = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, - YangInstanceIdentifier.EMPTY); - private static final DOMDataTreeIdentifier TEST_ID = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, - TestModel.TEST_PATH); + private static SchemaContext schemaContext = null; - private static final DOMDataTreeIdentifier TEST2_ID = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, - TestModel.TEST2_PATH); + static { + try { + schemaContext = TestModel.createTestContext(); + } catch (ReactorException e) { + LOG.error("Unable to create schema context for TestModel", e); + } + } - private static final DOMDataTreeIdentifier INNER_CONTAINER_ID = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, TestModel.INNER_CONTAINER_PATH); - private static final DOMDataTreeIdentifier ANOTHER_SHARD_ID = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, TestModel.ANOTHER_SHARD_PATH); + private static final DOMDataTreeIdentifier ROOT_ID = + new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.EMPTY); + private static final DOMDataTreeIdentifier TEST_ID = + new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, TestModel.TEST_PATH); - private InMemoryDOMDataTreeShard rootShard; + private static final DOMDataTreeIdentifier TEST2_ID = + new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, TestModel.TEST2_PATH); - private InMemoryDOMDataTreeShard anotherInnerShard; + private static final DOMDataTreeIdentifier INNER_CONTAINER_ID = + new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, TestModel.INNER_CONTAINER_PATH); + private static final DOMDataTreeIdentifier ANOTHER_SHARD_ID = + new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, TestModel.ANOTHER_SHARD_PATH); + private InMemoryDOMDataTreeShard rootShard; + private InMemoryDOMDataTreeShard anotherInnerShard; private ShardedDOMDataTree dataTreeService; private ListenerRegistration rootShardReg; private ListenerRegistration innerShardReg; + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + + @Captor + private ArgumentCaptor> captorForChanges; + @Captor + private ArgumentCaptor>> captorForSubtrees; + + private final ContainerNode crossShardContainer = createCrossShardContainer(); + @Before public void setUp() throws Exception { - schemaContext = TestModel.createTestContext(); + MockitoAnnotations.initMocks(this); - rootShard = InMemoryDOMDataTreeShard.create(ROOT_ID); + rootShard = InMemoryDOMDataTreeShard.create(ROOT_ID, executor, 5000); rootShard.onGlobalContextUpdated(schemaContext); - ShardedDOMDataTree dataTree = new ShardedDOMDataTree(); + final ShardedDOMDataTree dataTree = new ShardedDOMDataTree(); rootShardReg = dataTree.registerDataTreeShard(ROOT_ID, rootShard); dataTreeService = dataTree; } + @Test + public void testSingleShardListener() throws Exception { + final DOMDataTreeListener mockedDataTreeListener = Mockito.mock(DOMDataTreeListener.class); + doNothing().when(mockedDataTreeListener).onDataTreeChanged(anyCollection(), anyMap()); + + dataTreeService.registerListener(mockedDataTreeListener, Collections.singletonList(INNER_CONTAINER_ID), true, Collections.emptyList()); + + final DOMDataTreeShardProducer producer = rootShard.createProducer(Collections.singletonList(TEST_ID)); + final DOMDataTreeShardWriteTransaction transaction = producer.createTransaction(); + writeCrossShardContainer(transaction); + + verify(mockedDataTreeListener, timeout(1000)).onDataTreeChanged(captorForChanges.capture(), captorForSubtrees.capture()); + final Collection capturedValue = captorForChanges.getValue(); + assertTrue(capturedValue.size() == 1); + + final ContainerNode dataAfter = (ContainerNode) capturedValue.iterator().next().getRootNode().getDataAfter().get(); + assertEquals(crossShardContainer.getChild(TestModel.INNER_CONTAINER_PATH.getLastPathArgument()).get(), dataAfter); + + final Map> capturedSubtrees = captorForSubtrees.getValue(); + assertTrue(capturedSubtrees.size() == 1); + assertTrue(capturedSubtrees.containsKey(INNER_CONTAINER_ID)); + assertEquals(crossShardContainer.getChild(TestModel.INNER_CONTAINER_PATH.getLastPathArgument()).get(), capturedSubtrees.get(INNER_CONTAINER_ID)); + + verifyNoMoreInteractions(mockedDataTreeListener); + } + @Test public void testMultipleShards() throws Exception { - //FIXME after listeners are implemented add them here and test those + final DOMDataTreeListener mockedDataTreeListener = Mockito.mock(DOMDataTreeListener.class); + doNothing().when(mockedDataTreeListener).onDataTreeChanged(anyCollection(), anyMap()); - final InMemoryDOMDataTreeShard innerShard = InMemoryDOMDataTreeShard.create(INNER_CONTAINER_ID); + final InMemoryDOMDataTreeShard innerShard = InMemoryDOMDataTreeShard.create(INNER_CONTAINER_ID, executor, 5000); innerShard.onGlobalContextUpdated(schemaContext); innerShardReg = dataTreeService.registerDataTreeShard(INNER_CONTAINER_ID, innerShard); + dataTreeService.registerListener(mockedDataTreeListener, Collections.singletonList(TEST_ID), true, Collections.emptyList()); + final DOMDataTreeShardProducer producer = rootShard.createProducer(Collections.singletonList(TEST_ID)); final DOMDataTreeShardWriteTransaction transaction = producer.createTransaction(); - final DOMDataTreeWriteCursor cursor = transaction.createCursor(ROOT_ID); - cursor.enter(TestModel.TEST_PATH.getLastPathArgument()); + writeCrossShardContainer(transaction); - final LeafNode shardedValue1 = - ImmutableLeafNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(TestModel.SHARDED_VALUE_1)).withValue("sharded value 1").build(); - final LeafNode shardedValue2 = - ImmutableLeafNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(TestModel.SHARDED_VALUE_2)).withValue("sharded value 2").build(); + final ContainerNode testContainerVerificationNode = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME)) + .build(); - final DataContainerNodeAttrBuilder containerNodeBuilder = ImmutableContainerNodeBuilder.create(); - final ContainerNode containerNode = - containerNodeBuilder - .withNodeIdentifier(new NodeIdentifier(TestModel.INNER_CONTAINER)) - .withChild(shardedValue1) - .withChild(shardedValue2) - .build(); + //verify listeners have been notified + verify(mockedDataTreeListener, timeout(1000).times(2)).onDataTreeChanged(captorForChanges.capture(), captorForSubtrees.capture()); + final List> capturedChanges = captorForChanges.getAllValues(); + final List>> capturedSubtrees = captorForSubtrees.getAllValues(); + final DataTreeCandidate firstNotificationCandidate = capturedChanges.get(0).iterator().next(); - cursor.write(TestModel.INNER_CONTAINER_PATH.getLastPathArgument(), containerNode); - cursor.enter(TestModel.INNER_CONTAINER_PATH.getLastPathArgument()); + assertTrue(capturedSubtrees.get(0).size() == 1); + assertEquals(testContainerVerificationNode, firstNotificationCandidate.getRootNode().getDataAfter().get()); + assertEquals(testContainerVerificationNode, capturedSubtrees.get(0).get(TEST_ID)); - final ContainerNode lowerShardContainer = ImmutableContainerNodeBuilder.create() - .withNodeIdentifier(new NodeIdentifier(TestModel.ANOTHER_SHARD_CONTAINER)) - .withChild(ImmutableLeafNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(TestModel.ANOTHER_SHARD_VALUE)).build()) - .build(); - - cursor.write(TestModel.ANOTHER_SHARD_PATH.getLastPathArgument(), lowerShardContainer); - cursor.close(); - transaction.ready(); - transaction.submit().get(); + final DataTreeCandidate secondNotificationCandidate = capturedChanges.get(1).iterator().next(); + assertTrue(capturedSubtrees.get(1).size() == 1); + assertEquals(crossShardContainer, secondNotificationCandidate.getRootNode().getDataAfter().get()); + assertEquals(crossShardContainer, capturedSubtrees.get(1).get(TEST_ID)); - //verify listeners have been notified + verifyNoMoreInteractions(mockedDataTreeListener); } @Test @@ -164,10 +223,52 @@ public class ShardedDOMDataTreeProducerMultiShardTest { transaction.ready(); transaction.submit().get(); - verify(mockedTx).ready(); - verify(mockedTx).validate(); - verify(mockedTx).prepare(); - verify(mockedTx).commit(); + final InOrder inOrder = inOrder(mockedTx); + inOrder.verify(mockedTx).ready(); + inOrder.verify(mockedTx).validate(); + inOrder.verify(mockedTx).prepare(); + inOrder.verify(mockedTx).commit(); } + + private ContainerNode createCrossShardContainer() { + final LeafNode shardedValue1 = + ImmutableLeafNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(TestModel.SHARDED_VALUE_1)).withValue("sharded value 1").build(); + final LeafNode shardedValue2 = + ImmutableLeafNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(TestModel.SHARDED_VALUE_2)).withValue("sharded value 2").build(); + + + final ContainerNode lowerShardContainer = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new NodeIdentifier(TestModel.ANOTHER_SHARD_CONTAINER)) + .withChild(ImmutableLeafNodeBuilder.create() + .withNodeIdentifier(new NodeIdentifier(TestModel.ANOTHER_SHARD_VALUE)) + .withValue("testing-value") + .build()) + .build(); + + final ContainerNode containerNode = + ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new NodeIdentifier(TestModel.INNER_CONTAINER)) + .withChild(shardedValue1) + .withChild(shardedValue2) + .withChild(lowerShardContainer) + .build(); + + final ContainerNode testContainer = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME)) + .withChild(containerNode) + .build(); + + return testContainer; + } + + private void writeCrossShardContainer(final DOMDataTreeShardWriteTransaction transaction) throws Exception{ + final DOMDataTreeWriteCursor cursor = transaction.createCursor(ROOT_ID); + + cursor.write(TestModel.TEST_PATH.getLastPathArgument(), crossShardContainer); + + cursor.close(); + transaction.ready(); + transaction.submit().get(); + } } diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/AbstractDOMShardTreeChangePublisher.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/AbstractDOMShardTreeChangePublisher.java new file mode 100644 index 0000000000..fc4ad229d9 --- /dev/null +++ b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/AbstractDOMShardTreeChangePublisher.java @@ -0,0 +1,201 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.mdsal.dom.store.inmemory; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import javax.annotation.Nonnull; +import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.api.DOMDataTreeListener; +import org.opendaylight.mdsal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration; +import org.opendaylight.mdsal.dom.spi.RegistrationTreeNode; +import org.opendaylight.mdsal.dom.spi.store.AbstractDOMStoreTreeChangePublisher; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +abstract class AbstractDOMShardTreeChangePublisher extends AbstractDOMStoreTreeChangePublisher implements DOMStoreTreeChangePublisher { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractDOMShardTreeChangePublisher.class); + + private YangInstanceIdentifier shardPath; + private final Map childShards; + private final DataTree dataTree; + + protected AbstractDOMShardTreeChangePublisher(final DataTree dataTree, + final YangInstanceIdentifier shardPath, + final Map childShards) { + this.dataTree = Preconditions.checkNotNull(dataTree); + this.shardPath = Preconditions.checkNotNull(shardPath); + this.childShards = Preconditions.checkNotNull(childShards); + } + + @Override + public AbstractDOMDataTreeChangeListenerRegistration registerTreeChangeListener(final YangInstanceIdentifier path, final L listener) { + takeLock(); + try { + return setupListenerContext(path, listener); + } finally { + releaseLock(); + } + } + + private AbstractDOMDataTreeChangeListenerRegistration setupListenerContext(final YangInstanceIdentifier listenerPath, final L listener) { + // we need to register the listener registration path based on the shards root + // we have to strip the shard path from the listener path and then register + YangInstanceIdentifier strippedIdentifier = listenerPath; + if (!shardPath.isEmpty()) { + strippedIdentifier = YangInstanceIdentifier.create(stripShardPath(listenerPath)); + } + + final DOMDataTreeListenerWithSubshards subshardListener = new DOMDataTreeListenerWithSubshards(dataTree, strippedIdentifier, listener); + final AbstractDOMDataTreeChangeListenerRegistration reg = setupContextWithoutSubshards(strippedIdentifier, subshardListener); + + for (final ChildShardContext maybeAffected : childShards.values()) { + if (listenerPath.contains(maybeAffected.getPrefix().getRootIdentifier())) { + // consumer has a subshard somewhere on lower level + // register to the notification manager with snapshot and forward child notifications to parent + LOG.debug("Adding new subshard{{}} to listener at {}", maybeAffected.getPrefix(), listenerPath); + subshardListener.addSubshard(maybeAffected); + } else if (maybeAffected.getPrefix().getRootIdentifier().contains(listenerPath)) { + // bind path is inside subshard + // TODO can this happen? seems like in ShardedDOMDataTree we are already registering to the lowest shard possible + throw new UnsupportedOperationException("Listener should be registered directly into a subshard"); + } + } + return reg; + } + + private AbstractDOMDataTreeChangeListenerRegistration setupContextWithoutSubshards(final YangInstanceIdentifier listenerPath, final DOMDataTreeListenerWithSubshards listener) { + LOG.debug("Registering root listener at {}", listenerPath); + final RegistrationTreeNode> node = findNodeFor(listenerPath.getPathArguments()); + final AbstractDOMDataTreeChangeListenerRegistration registration = new AbstractDOMDataTreeChangeListenerRegistration((L) listener) { + @Override + protected void removeRegistration() { + listener.close(); + AbstractDOMShardTreeChangePublisher.this.removeRegistration(node, this); + registrationRemoved(this); + } + }; + addRegistration(node, registration); + return registration; + } + + private Iterable stripShardPath(final YangInstanceIdentifier listenerPath) { + if (shardPath.isEmpty()) { + return listenerPath.getPathArguments(); + } + + final List listenerPathArgs = new ArrayList<>(listenerPath.getPathArguments()); + final Iterator shardIter = shardPath.getPathArguments().iterator(); + final Iterator listenerIter = listenerPathArgs.iterator(); + + while (shardIter.hasNext()) { + if (shardIter.next().equals(listenerIter.next())) { + listenerIter.remove(); + } else { + break; + } + } + + return listenerPathArgs; + } + + private static final class DOMDataTreeListenerWithSubshards implements DOMDataTreeChangeListener { + + // TODO should we synchronize the access to the dataTree snapshots? + private final DataTree dataTree; + private final YangInstanceIdentifier listenerPath; + private final DOMDataTreeChangeListener delegate; + + private final Map> registrations = + new HashMap<>(); + + DOMDataTreeListenerWithSubshards(final DataTree dataTree, + final YangInstanceIdentifier listenerPath, + final DOMDataTreeChangeListener delegate) { + this.dataTree = Preconditions.checkNotNull(dataTree); + this.listenerPath = Preconditions.checkNotNull(listenerPath); + this.delegate = Preconditions.checkNotNull(delegate); + } + + @Override + public void onDataTreeChanged(@Nonnull final Collection changes) { + LOG.debug("Received data changed {}", changes.iterator().next()); + final DataTreeCandidate newCandidate = applyChanges(changes); + delegate.onDataTreeChanged(Collections.singleton(newCandidate)); + } + + void onDataTreeChanged(final YangInstanceIdentifier rootPath, final Collection changes) { + onDataTreeChanged(changes.stream() + .map(candidate -> DataTreeCandidates.newDataTreeCandidate(rootPath, candidate.getRootNode())) + .collect(Collectors.toList())); + } + + void addSubshard(final ChildShardContext context) { + Preconditions.checkState(context.getShard() instanceof DOMStoreTreeChangePublisher, + "All subshards that are a part of ListenerContext need to be listenable"); + + final DOMStoreTreeChangePublisher listenableShard = (DOMStoreTreeChangePublisher) context.getShard(); + // since this is going into subshard we want to listen for ALL changes in the subshard + registrations.put(context.getPrefix().getRootIdentifier(), + listenableShard.registerTreeChangeListener(context.getPrefix().getRootIdentifier(), + changes -> onDataTreeChanged(context.getPrefix().getRootIdentifier(), changes))); + } + + void close() { + for (ListenerRegistration registration : registrations.values()) { + registration.close(); + } + registrations.clear(); + } + + private DataTreeCandidate applyChanges(final Collection changes) { + final DataTreeModification modification = dataTree.takeSnapshot().newModification(); + for (final DataTreeCandidate change : changes) { + DataTreeCandidates.applyToModification(modification, change); + } + + modification.ready(); + try { + dataTree.validate(modification); + } catch (DataValidationFailedException e) { + LOG.error("Validation failed for built modification", e); + throw new RuntimeException("Notification validation failed", e); + } + + // strip nodes we dont need since this listener doesn't have to be registered at the root of the DataTree + DataTreeCandidateNode modifiedChild = dataTree.prepare(modification).getRootNode(); + for (final PathArgument pathArgument : listenerPath.getPathArguments()) { + // there should be no null pointers since we wouldn't get a notification change + // if there was no node modified at the listener's location + modifiedChild = modifiedChild.getModifiedChild(pathArgument); + } + + return DataTreeCandidates.newDataTreeCandidate(listenerPath, modifiedChild); + } + } +} \ No newline at end of file diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ChildShardContext.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ChildShardContext.java new file mode 100644 index 0000000000..3923681125 --- /dev/null +++ b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ChildShardContext.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.mdsal.dom.store.inmemory; + +import com.google.common.base.Preconditions; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; + +final class ChildShardContext { + private final WriteableDOMDataTreeShard shard; + private final DOMDataTreeIdentifier prefix; + + ChildShardContext(final DOMDataTreeIdentifier prefix, final WriteableDOMDataTreeShard shard) { + this.prefix = Preconditions.checkNotNull(prefix); + this.shard = Preconditions.checkNotNull(shard); + } + + WriteableDOMDataTreeShard getShard() { + return shard; + } + + DOMDataTreeIdentifier getPrefix() { + return prefix; + } +} diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShard.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShard.java index 5ccc8e4f3b..4a48a6471a 100644 --- a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShard.java +++ b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShard.java @@ -17,10 +17,15 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.ExecutorService; +import javax.annotation.Nonnull; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.dom.api.DOMDataTreeShard; import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTable; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; @@ -29,7 +34,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.opendaylight.yangtools.yang.model.api.SchemaContextListener; @Beta -public class InMemoryDOMDataTreeShard implements WriteableDOMDataTreeShard, SchemaContextListener { +public class InMemoryDOMDataTreeShard implements ReadableWriteableDOMDataTreeShard, SchemaContextListener { private static final class SubshardProducerSpecification { private final Collection prefixes = new ArrayList<>(1); @@ -52,39 +57,27 @@ public class InMemoryDOMDataTreeShard implements WriteableDOMDataTreeShard, Sche } } - private static final class ChildShardContext { - private final WriteableDOMDataTreeShard shard; - private final DOMDataTreeIdentifier prefix; - - public ChildShardContext(final DOMDataTreeIdentifier prefix, final WriteableDOMDataTreeShard shard) { - this.prefix = Preconditions.checkNotNull(prefix); - this.shard = Preconditions.checkNotNull(shard); - } - - public WriteableDOMDataTreeShard getShard() { - return shard; - } - - public DOMDataTreeIdentifier getPrefix() { - return prefix; - } - } - private final DOMDataTreePrefixTable childShardsTable = DOMDataTreePrefixTable.create(); + private final Map childShards = new HashMap<>(); private final DOMDataTreeIdentifier prefix; private final DataTree dataTree; - private InMemoryDOMDataTreeShard(final DOMDataTreeIdentifier prefix) { + private InMemoryDOMDataTreeShardChangePublisher shardChangePublisher; + + private InMemoryDOMDataTreeShard(final DOMDataTreeIdentifier prefix, final ExecutorService dataTreeChangeExecutor, + final int maxDataChangeListenerQueueSize) { this.prefix = Preconditions.checkNotNull(prefix); final TreeType treeType = treeTypeFor(prefix.getDatastoreType()); - this.dataTree = prefix.getRootIdentifier().isEmpty() ? InMemoryDataTreeFactory.getInstance().create(treeType) - : InMemoryDataTreeFactory.getInstance().create(treeType, prefix.getRootIdentifier()); + this.dataTree = InMemoryDataTreeFactory.getInstance().create(treeType, prefix.getRootIdentifier()); + + this.shardChangePublisher = new InMemoryDOMDataTreeShardChangePublisher(dataTreeChangeExecutor, maxDataChangeListenerQueueSize, dataTree, prefix.getRootIdentifier(), childShards); } - public static InMemoryDOMDataTreeShard create(final DOMDataTreeIdentifier id) { - return new InMemoryDOMDataTreeShard(id); + public static InMemoryDOMDataTreeShard create(final DOMDataTreeIdentifier id, final ExecutorService dataTreeChangeExecutor, + final int maxDataChangeListenerQueueSize) { + return new InMemoryDOMDataTreeShard(id, dataTreeChangeExecutor, maxDataChangeListenerQueueSize); } @Override @@ -113,6 +106,12 @@ public class InMemoryDOMDataTreeShard implements WriteableDOMDataTreeShard, Sche return new InMemoryDOMDataTreeShardProducer(this, prefixes); } + @Nonnull + @Override + public ListenerRegistration registerTreeChangeListener(@Nonnull YangInstanceIdentifier treeId, @Nonnull L listener) { + return shardChangePublisher.registerTreeChangeListener(treeId, listener); + } + private void addChildShard(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) { ChildShardContext context = createContextFor(prefix, child); childShards.put(prefix, context); @@ -212,6 +211,6 @@ public class InMemoryDOMDataTreeShard implements WriteableDOMDataTreeShard, Sche builder.addSubshard(spec.getPrefix(), foreignContext); } - return new InmemoryDOMDataTreeShardWriteTransaction(builder.build(), dataTree); + return new InmemoryDOMDataTreeShardWriteTransaction(builder.build(), dataTree, shardChangePublisher); } } diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardChangePublisher.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardChangePublisher.java new file mode 100644 index 0000000000..90c05f649c --- /dev/null +++ b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardChangePublisher.java @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.mdsal.dom.store.inmemory; + +import com.google.common.collect.ImmutableList; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import javax.annotation.Nonnull; +import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration; +import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager; +import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.Invoker; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class InMemoryDOMDataTreeShardChangePublisher extends AbstractDOMShardTreeChangePublisher { + + private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataTreeShardChangePublisher.class); + + private static final Invoker, DataTreeCandidate> MANAGER_INVOKER = + (listener, notification) -> { + final DOMDataTreeChangeListener inst = listener.getInstance(); + if (inst != null) { + inst.onDataTreeChanged(ImmutableList.of(notification)); + } + }; + + private final QueuedNotificationManager, DataTreeCandidate> notificationManager; + + InMemoryDOMDataTreeShardChangePublisher(final ExecutorService executorService, + final int maxQueueSize, + final DataTree dataTree, + final YangInstanceIdentifier rootPath, + final Map childShards) { + super(dataTree, rootPath, childShards); + notificationManager = new QueuedNotificationManager<>(executorService, MANAGER_INVOKER, maxQueueSize, "DataTreeChangeListenerQueueMgr"); + } + + @Override + protected void notifyListeners(@Nonnull Collection> registrations, @Nonnull YangInstanceIdentifier path, @Nonnull DataTreeCandidateNode node) { + final DataTreeCandidate candidate = DataTreeCandidates.newDataTreeCandidate(path, node); + + for (AbstractDOMDataTreeChangeListenerRegistration reg : registrations) { + LOG.debug("Enqueueing candidate {} to registration {}", candidate, registrations); + notificationManager.submitNotification(reg, candidate); + } + } + + @Override + protected void registrationRemoved(@Nonnull AbstractDOMDataTreeChangeListenerRegistration registration) { + LOG.debug("Closing registration {}", registration); + + } + + public AbstractDOMDataTreeChangeListenerRegistration registerTreeChangeListener(YangInstanceIdentifier path, L listener) { + return super.registerTreeChangeListener(path, listener); + } + + synchronized void publishChange(@Nonnull final DataTreeCandidate candidate) { + processCandidateTree(candidate); + } +} diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardThreePhaseCommitCohort.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardThreePhaseCommitCohort.java index 8c69c59a65..5f20089c5c 100644 --- a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardThreePhaseCommitCohort.java +++ b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardThreePhaseCommitCohort.java @@ -11,6 +11,7 @@ package org.opendaylight.mdsal.dom.store.inmemory; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import java.util.Collections; import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; @@ -29,19 +30,21 @@ class InMemoryDOMDataTreeShardThreePhaseCommitCohort implements DOMStoreThreePha private final DataTree dataTree; private final DataTreeModification modification; private DataTreeCandidate candidate; + private final InMemoryDOMDataTreeShardChangePublisher changePublisher; InMemoryDOMDataTreeShardThreePhaseCommitCohort(final DataTree dataTree, - final DataTreeModification modification) { - Preconditions.checkNotNull(dataTree); - this.dataTree = dataTree; - this.modification = modification; + final DataTreeModification modification, + final InMemoryDOMDataTreeShardChangePublisher changePublisher) { + this.dataTree = Preconditions.checkNotNull(dataTree); + this.modification = Preconditions.checkNotNull(modification); + this.changePublisher = Preconditions.checkNotNull(changePublisher); } @Override public ListenableFuture canCommit() { try { dataTree.validate(modification); - LOG.debug("DataTreeModification {} validated"); + LOG.debug("DataTreeModification {} validated", modification); return CAN_COMMIT_FUTURE; } catch (DataValidationFailedException e) { @@ -59,6 +62,7 @@ class InMemoryDOMDataTreeShardThreePhaseCommitCohort implements DOMStoreThreePha public ListenableFuture preCommit() { try { candidate = dataTree.prepare(modification); + LOG.debug("DataTreeModification {} prepared", modification); return SUCCESSFUL_FUTURE; } catch (Exception e) { LOG.warn("Unexpected failure in preparation phase", e); @@ -75,7 +79,10 @@ class InMemoryDOMDataTreeShardThreePhaseCommitCohort implements DOMStoreThreePha @Override public ListenableFuture commit() { Preconditions.checkState(candidate != null, "Attempted to commit an aborted transaction"); + LOG.debug("Commiting candidate {}", candidate); dataTree.commit(candidate); + // publish this change for listeners + changePublisher.publishChange(candidate); return SUCCESSFUL_FUTURE; } } diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InmemoryDOMDataTreeShardWriteTransaction.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InmemoryDOMDataTreeShardWriteTransaction.java index 4634d4f704..09e07b5277 100644 --- a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InmemoryDOMDataTreeShardWriteTransaction.java +++ b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InmemoryDOMDataTreeShardWriteTransaction.java @@ -78,20 +78,23 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT } } - private InMemoryDOMDataTreeShardThreePhaseCommitCohort commitCohort; private final ShardDataModification modification; private DOMDataTreeWriteCursor cursor; private DataTree rootShardDataTree; private DataTreeModification rootModification = null; private ArrayList cohorts = new ArrayList<>(); + private InMemoryDOMDataTreeShardChangePublisher changePublisher; // FIXME inject into shard? private ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); - InmemoryDOMDataTreeShardWriteTransaction(final ShardDataModification root, final DataTree rootShardDataTree) { + InmemoryDOMDataTreeShardWriteTransaction(final ShardDataModification root, + final DataTree rootShardDataTree, + final InMemoryDOMDataTreeShardChangePublisher changePublisher) { this.modification = Preconditions.checkNotNull(root); this.rootShardDataTree = Preconditions.checkNotNull(rootShardDataTree); + this.changePublisher = Preconditions.checkNotNull(changePublisher); } private DOMDataTreeWriteCursor getCursor() { @@ -149,7 +152,7 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT LOG.debug("Readying open transaction on shard {}", modification.getPrefix()); rootModification = modification.seal(); - cohorts.add(new InMemoryDOMDataTreeShardThreePhaseCommitCohort(rootShardDataTree, rootModification)); + cohorts.add(new InMemoryDOMDataTreeShardThreePhaseCommitCohort(rootShardDataTree, rootModification, changePublisher)); for (Entry entry : modification.getChildShards().entrySet()) { cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue())); } diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ReadableWriteableDOMDataTreeShard.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ReadableWriteableDOMDataTreeShard.java new file mode 100644 index 0000000000..7a5796f1c5 --- /dev/null +++ b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ReadableWriteableDOMDataTreeShard.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.mdsal.dom.store.inmemory; + +import com.google.common.annotations.Beta; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher; + +/** + * Marker interface for readable/writeable DOMDataTreeShard + */ +@Beta +public interface ReadableWriteableDOMDataTreeShard extends DOMStoreTreeChangePublisher, WriteableDOMDataTreeShard { +} diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardCanCommitCoordinationTask.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardCanCommitCoordinationTask.java index 9c2f5a02f7..766650a330 100644 --- a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardCanCommitCoordinationTask.java +++ b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardCanCommitCoordinationTask.java @@ -51,7 +51,7 @@ class ShardCanCommitCoordinationTask implements Callable { try { final Boolean result = (Boolean)canCommit.get(); if (result == null || !result) { - throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available."); + throw new TransactionCommitFailedException("CanCommit failed, no detailed cause available."); } } catch (InterruptedException | ExecutionException e) { throw new TransactionCommitFailedException("CanCommit failed", e); diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardCommitCoordinationTask.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardCommitCoordinationTask.java index 5dc6cad03d..7631821219 100644 --- a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardCommitCoordinationTask.java +++ b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardCommitCoordinationTask.java @@ -40,7 +40,7 @@ class ShardCommitCoordinationTask implements Callable { return null; } catch (TransactionCommitFailedException e) { - LOG.warn("Shard: {} Submit Error during phase {}, starting Abort", rootShardPrefix, e); + LOG.warn("Shard: {} Submit Error during phase Commit, starting Abort", rootShardPrefix, e); //FIXME abort here throw e; } diff --git a/dom/mdsal-dom-spi/src/main/java/org/opendaylight/mdsal/dom/spi/DOMDataTreePrefixTableEntry.java b/dom/mdsal-dom-spi/src/main/java/org/opendaylight/mdsal/dom/spi/DOMDataTreePrefixTableEntry.java index 52aaf4a788..7243b0869d 100644 --- a/dom/mdsal-dom-spi/src/main/java/org/opendaylight/mdsal/dom/spi/DOMDataTreePrefixTableEntry.java +++ b/dom/mdsal-dom-spi/src/main/java/org/opendaylight/mdsal/dom/spi/DOMDataTreePrefixTableEntry.java @@ -48,6 +48,7 @@ public final class DOMDataTreePrefixTableEntry implements Identifiable lookup(final YangInstanceIdentifier id) { final Iterator it = id.getPathArguments().iterator(); DOMDataTreePrefixTableEntry entry = this; + DOMDataTreePrefixTableEntry lastPresentEntry = entry; while (it.hasNext()) { final PathArgument a = it.next(); @@ -58,9 +59,13 @@ public final class DOMDataTreePrefixTableEntry implements Identifiable AbstractDOMDataTreeChangeListenerRegistration registerTreeChangeListener(final YangInstanceIdentifier treeId, final L listener) { + public AbstractDOMDataTreeChangeListenerRegistration registerTreeChangeListener(final YangInstanceIdentifier treeId, final L listener) { // Take the write lock takeLock(); try { -- 2.36.6