--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
+import org.opendaylight.mdsal.dom.spi.AbstractRegistrationTree;
+import org.opendaylight.mdsal.dom.spi.RegistrationTreeNode;
+import org.opendaylight.mdsal.dom.spi.shard.ChildShardContext;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
+import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
+import org.opendaylight.yangtools.yang.data.impl.schema.tree.SchemaValidationFailedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DistributedShardChangePublisher
+ extends AbstractRegistrationTree<AbstractDOMDataTreeChangeListenerRegistration<?>>
+ implements DOMStoreTreeChangePublisher {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DistributedShardChangePublisher.class);
+
+ private final DistributedDataStore distributedDataStore;
+ private final YangInstanceIdentifier shardPath;
+
+ // This will be useful for signaling back pressure
+ private final DataStoreClient client;
+
+ private final Map<DOMDataTreeIdentifier, ChildShardContext> childShards;
+
+ @GuardedBy("this")
+ private final DataTree dataTree;
+
+ public DistributedShardChangePublisher(final DataStoreClient client,
+ final DistributedDataStore distributedDataStore,
+ final DOMDataTreeIdentifier prefix,
+ final Map<DOMDataTreeIdentifier, ChildShardContext> childShards) {
+ this.client = client;
+ this.distributedDataStore = distributedDataStore;
+ // TODO keeping the whole dataTree thats contained in subshards doesn't seem like a good idea
+ // maybe the whole listener logic would be better in the backend shards where we have direct access to the
+ // dataTree and wont have to cache it redundantly.
+ this.dataTree = InMemoryDataTreeFactory.getInstance().create(
+ TreeType.valueOf(prefix.getDatastoreType().name()), prefix.getRootIdentifier());
+
+ dataTree.setSchemaContext(distributedDataStore.getActorContext().getSchemaContext());
+
+ this.shardPath = prefix.getRootIdentifier();
+ this.childShards = childShards;
+ }
+
+ protected void registrationRemoved(final AbstractDOMDataTreeChangeListenerRegistration<?> registration) {
+ LOG.debug("Closing registration {}", registration);
+ }
+
+ @Override
+ public <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
+ registerTreeChangeListener(final YangInstanceIdentifier path, final L listener) {
+ takeLock();
+ try {
+ return setupListenerContext(path, listener);
+ } finally {
+ releaseLock();
+ }
+ }
+
+ private <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
+ setupListenerContext(final YangInstanceIdentifier listenerPath, final L listener) {
+ // we need to register the listener registration path based on the shards root
+ // we have to strip the shard path from the listener path and then register
+ YangInstanceIdentifier strippedIdentifier = listenerPath;
+ if (!shardPath.isEmpty()) {
+ strippedIdentifier = YangInstanceIdentifier.create(stripShardPath(shardPath, listenerPath));
+ }
+
+ final DOMDataTreeListenerWithSubshards subshardListener =
+ new DOMDataTreeListenerWithSubshards(strippedIdentifier, listener);
+ final AbstractDOMDataTreeChangeListenerRegistration<L> reg =
+ setupContextWithoutSubshards(listenerPath, strippedIdentifier, subshardListener);
+
+ for (final ChildShardContext maybeAffected : childShards.values()) {
+ if (listenerPath.contains(maybeAffected.getPrefix().getRootIdentifier())) {
+ // consumer has initialDataChangeEvent subshard somewhere on lower level
+ // register to the notification manager with snapshot and forward child notifications to parent
+ LOG.debug("Adding new subshard{{}} to listener at {}", maybeAffected.getPrefix(), listenerPath);
+ subshardListener.addSubshard(maybeAffected);
+ } else if (maybeAffected.getPrefix().getRootIdentifier().contains(listenerPath)) {
+ // bind path is inside subshard
+ // TODO can this happen? seems like in ShardedDOMDataTree we are
+ // already registering to the lowest shard possible
+ throw new UnsupportedOperationException("Listener should be registered directly "
+ + "into initialDataChangeEvent subshard");
+ }
+ }
+
+ return reg;
+ }
+
+ private <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
+ setupContextWithoutSubshards(final YangInstanceIdentifier shardLookup,
+ final YangInstanceIdentifier listenerPath,
+ final DOMDataTreeListenerWithSubshards listener) {
+
+ LOG.debug("Registering root listener full path: {}, path inside shard: {}", shardLookup, listenerPath);
+
+ // register in the shard tree
+ final RegistrationTreeNode<AbstractDOMDataTreeChangeListenerRegistration<?>> node =
+ findNodeFor(listenerPath.getPathArguments());
+
+ // register listener in CDS
+ final ProxyRegistration proxyReg = new ProxyRegistration(distributedDataStore
+ .registerProxyListener(shardLookup, listenerPath, listener), listener);
+
+ @SuppressWarnings("unchecked")
+ final AbstractDOMDataTreeChangeListenerRegistration<L> registration =
+ new AbstractDOMDataTreeChangeListenerRegistration<L>((L) listener) {
+ @Override
+ protected void removeRegistration() {
+ listener.close();
+ DistributedShardChangePublisher.this.removeRegistration(node, this);
+ registrationRemoved(this);
+ proxyReg.close();
+ }
+ };
+ addRegistration(node, registration);
+
+ return registration;
+ }
+
+ private static Iterable<PathArgument> stripShardPath(final YangInstanceIdentifier shardPath,
+ final YangInstanceIdentifier listenerPath) {
+ if (shardPath.isEmpty()) {
+ return listenerPath.getPathArguments();
+ }
+
+ final List<PathArgument> listenerPathArgs = new ArrayList<>(listenerPath.getPathArguments());
+ final Iterator<PathArgument> shardIter = shardPath.getPathArguments().iterator();
+ final Iterator<PathArgument> listenerIter = listenerPathArgs.iterator();
+
+ while (shardIter.hasNext()) {
+ if (shardIter.next().equals(listenerIter.next())) {
+ listenerIter.remove();
+ } else {
+ break;
+ }
+ }
+
+ return listenerPathArgs;
+ }
+
+ private static class ProxyRegistration implements ListenerRegistration<DOMDataTreeChangeListener> {
+
+ private final ListenerRegistration<org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener> proxy;
+ private final DOMDataTreeChangeListener listener;
+
+ private ProxyRegistration(
+ final ListenerRegistration<
+ org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener> proxy,
+ final DOMDataTreeChangeListener listener) {
+ this.proxy = proxy;
+ this.listener = listener;
+ }
+
+ @Override
+ public DOMDataTreeChangeListener getInstance() {
+ return listener;
+ }
+
+ @Override
+ public void close() {
+ proxy.close();
+ }
+ }
+
+ synchronized DataTreeCandidate applyChanges(final YangInstanceIdentifier listenerPath,
+ final Collection<DataTreeCandidate> changes) {
+ final DataTreeModification modification = dataTree.takeSnapshot().newModification();
+ for (final DataTreeCandidate change : changes) {
+ try {
+ DataTreeCandidates.applyToModification(modification, change);
+ } catch (SchemaValidationFailedException e) {
+ LOG.error("Validation failed {}", e);
+ }
+ }
+
+ modification.ready();
+
+ final DataTreeCandidate candidate;
+
+ try {
+ dataTree.validate(modification);
+ } catch (final DataValidationFailedException e) {
+ LOG.error("Validation failed for built modification, modification {}, current data tree: {}",
+ modification, dataTree, e);
+ throw new RuntimeException("Notification validation failed", e);
+ }
+
+ // strip nodes we dont need since this listener doesn't have to be registered at the root of the DataTree
+ candidate = dataTree.prepare(modification);
+ dataTree.commit(candidate);
+
+
+ DataTreeCandidateNode modifiedChild = candidate.getRootNode();
+
+ for (final PathArgument pathArgument : listenerPath.getPathArguments()) {
+ modifiedChild = modifiedChild.getModifiedChild(pathArgument);
+ }
+
+ if (modifiedChild == null) {
+ modifiedChild = new EmptyDataTreeCandidateNode(dataTree.getRootPath().getLastPathArgument());
+ }
+
+ return DataTreeCandidates.newDataTreeCandidate(dataTree.getRootPath(), modifiedChild);
+ }
+
+
+ private final class DOMDataTreeListenerWithSubshards implements DOMDataTreeChangeListener {
+
+ private final YangInstanceIdentifier listenerPath;
+ private final DOMDataTreeChangeListener delegate;
+
+ private final Map<YangInstanceIdentifier, ListenerRegistration<DOMDataTreeChangeListener>> registrations =
+ new ConcurrentHashMap<>();
+
+ DOMDataTreeListenerWithSubshards(final YangInstanceIdentifier listenerPath,
+ final DOMDataTreeChangeListener delegate) {
+ this.listenerPath = Preconditions.checkNotNull(listenerPath);
+ this.delegate = Preconditions.checkNotNull(delegate);
+ }
+
+ @Override
+ public void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes) {
+ LOG.debug("Received data changed {}", changes);
+ applyChanges(listenerPath, changes);
+ delegate.onDataTreeChanged(changes);
+ }
+
+ synchronized void onDataTreeChanged(final YangInstanceIdentifier pathFromRoot,
+ final Collection<DataTreeCandidate> changes) {
+ final YangInstanceIdentifier changeId =
+ YangInstanceIdentifier.create(stripShardPath(dataTree.getRootPath(), pathFromRoot));
+
+ final List<DataTreeCandidate> newCandidates = changes.stream()
+ .map(candidate -> DataTreeCandidates.newDataTreeCandidate(changeId, candidate.getRootNode()))
+ .collect(Collectors.toList());
+ delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates)));
+ }
+
+ void addSubshard(final ChildShardContext context) {
+ Preconditions.checkState(context.getShard() instanceof DOMStoreTreeChangePublisher,
+ "All subshards that are initialDataChangeEvent part of ListenerContext need to be listenable");
+
+ final DOMStoreTreeChangePublisher listenableShard = (DOMStoreTreeChangePublisher) context.getShard();
+ // since this is going into subshard we want to listen for ALL changes in the subshard
+ registrations.put(context.getPrefix().getRootIdentifier(),
+ listenableShard.registerTreeChangeListener(
+ context.getPrefix().getRootIdentifier(), changes -> onDataTreeChanged(
+ context.getPrefix().getRootIdentifier(), changes)));
+ }
+
+ void close() {
+ for (final ListenerRegistration<DOMDataTreeChangeListener> registration : registrations.values()) {
+ registration.close();
+ }
+ registrations.clear();
+ }
+ }
+
+ private static final class EmptyDataTreeCandidateNode implements DataTreeCandidateNode {
+
+ private final PathArgument identifier;
+
+ EmptyDataTreeCandidateNode(final PathArgument identifier) {
+ this.identifier = Preconditions.checkNotNull(identifier, "Identifier should not be null");
+ }
+
+ @Nonnull
+ @Override
+ public PathArgument getIdentifier() {
+ return identifier;
+ }
+
+ @Nonnull
+ @Override
+ public Collection<DataTreeCandidateNode> getChildNodes() {
+ return Collections.emptySet();
+ }
+
+ @Nullable
+ @Override
+ public DataTreeCandidateNode getModifiedChild(final PathArgument identifier) {
+ return null;
+ }
+
+ @Nonnull
+ @Override
+ public ModificationType getModificationType() {
+ return ModificationType.UNMODIFIED;
+ }
+
+ @Nonnull
+ @Override
+ public Optional<NormalizedNode<?, ?>> getDataAfter() {
+ return Optional.absent();
+ }
+
+ @Nonnull
+ @Override
+ public Optional<NormalizedNode<?, ?>> getDataBefore() {
+ return Optional.absent();
+ }
+ }
+}
import org.opendaylight.mdsal.dom.spi.shard.WriteableDOMDataTreeShard;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final Map<DOMDataTreeIdentifier, ChildShardContext> childShards = new HashMap<>();
@GuardedBy("this")
private final List<ShardProxyProducer> producers = new ArrayList<>();
- private final DistributedDataStore distributedDataStore;
+
+ private final DistributedShardChangePublisher publisher;
DistributedShardFrontend(final DistributedDataStore distributedDataStore,
final DataStoreClient client,
final DOMDataTreeIdentifier shardRoot) {
- this.distributedDataStore = Preconditions.checkNotNull(distributedDataStore);
this.client = Preconditions.checkNotNull(client);
this.shardRoot = Preconditions.checkNotNull(shardRoot);
+
+ publisher = new DistributedShardChangePublisher(client, Preconditions.checkNotNull(distributedDataStore),
+ shardRoot, childShards);
}
@Override
@SuppressWarnings("unchecked")
public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
final YangInstanceIdentifier treeId, final L listener) {
-
- final List<PathArgument> toStrip = new ArrayList<>(shardRoot.getRootIdentifier().getPathArguments());
- final List<PathArgument> stripFrom = new ArrayList<>(treeId.getPathArguments());
-
- while (!toStrip.isEmpty()) {
- stripFrom.remove(0);
- toStrip.remove(0);
- }
-
- return (ListenerRegistration<L>) new ProxyRegistration(distributedDataStore
- .registerProxyListener(treeId, YangInstanceIdentifier.create(stripFrom), listener), listener);
- }
-
- private static class ProxyRegistration implements ListenerRegistration<DOMDataTreeChangeListener> {
-
- private ListenerRegistration<org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener> proxy;
- private DOMDataTreeChangeListener listener;
-
- private ProxyRegistration(
- final ListenerRegistration<
- org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener> proxy,
- final DOMDataTreeChangeListener listener) {
- this.proxy = proxy;
- this.listener = listener;
- }
-
- @Override
- public DOMDataTreeChangeListener getInstance() {
- return listener;
- }
-
- @Override
- public void close() {
- proxy.close();
- }
+ return publisher.registerTreeChangeListener(treeId, listener);
}
-
}
private void onProducerCreated(final ProducerCreated message) {
LOG.debug("Received ProducerCreated: {}", message);
- // fastpath if no replication is needed, since there is only one node
- if (resolver.getShardingServicePeerActorAddresses().size() == 1) {
+ // fastpath if we have no peers
+ if (resolver.getShardingServicePeerActorAddresses().isEmpty()) {
getSender().tell(new Status.Success(null), noSender());
}
import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
public void testClientTransaction() throws Exception {
final DistributedDataStore distributedDataStore = mock(DistributedDataStore.class);
+ final ActorContext context = mock(ActorContext.class);
+ doReturn(context).when(distributedDataStore).getActorContext();
+ doReturn(SchemaContextHelper.full()).when(context).getSchemaContext();
+
final DistributedShardFrontend rootShard = new DistributedShardFrontend(distributedDataStore, client, ROOT);
try (final DOMDataTreeProducer producer = shardedDOMDataTree.createProducer(Collections.singletonList(ROOT))) {
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
-import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
-import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
-import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
-import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
}
@Test
- @Ignore("Needs different shard creation handling due to replicas")
public void testProducerRegistrations() throws Exception {
initEmptyDatastores("config");
followerProducer.close();
// try to create a shard on an already registered prefix on follower
try {
- followerShardFactory.createDistributedShard(TEST_ID,
- Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+ waitOnAsyncTask(followerShardFactory.createDistributedShard(
+ TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
+ DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
fail("This prefix already should have a shard registration that was forwarded from the other node");
} catch (final DOMDataTreeShardingConflictException e) {
- assertTrue(e.getMessage().contains("is already occupied by shard"));
+ assertTrue(e.getMessage().contains("is already occupied by another shard"));
}
}
@Test
- @Ignore("Needs different shard creation handling due to replicas")
public void testWriteIntoMultipleShards() throws Exception {
initEmptyDatastores("config");
leaderTestKit.waitForMembersUp("member-2");
- LOG.warn("registering first shard");
+ LOG.debug("registering first shard");
final DistributedShardRegistration shardRegistration =
waitOnAsyncTask(leaderShardFactory.createDistributedShard(
TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
+
leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
findLocalShard(followerDistributedDataStore.getActorContext(),
ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
- LOG.warn("Got after waiting for nonleader");
- final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
-
- new JavaTestKit(leaderSystem) {
- {
- leaderShardManager.tell(
- new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
- expectMsgClass(duration("5 seconds"), LocalShardFound.class);
-
- final ActorRef followerShardManager = followerDistributedDataStore.getActorContext().getShardManager();
-
- followerShardManager.tell(new FindLocalShard(
- ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), followerTestKit.getRef());
- followerTestKit.expectMsgClass(duration("5 seconds"), LocalShardFound.class);
- LOG.warn("Found follower shard");
-
- leaderDistributedDataStore.getActorContext().getShardManager().tell(
- new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
- expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
- }
- };
-
+ LOG.debug("Got after waiting for nonleader");
final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true);
cursor.close();
LOG.warn("Got to pre submit");
- tx.submit();
+ tx.submit().checkedGet();
}
@Test
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletionStage;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.ActorSystemProvider;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.AbstractTest;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
YangInstanceIdentifier.create(getOuterListIdFor(0).getPathArguments())
.node(TestModel.INNER_LIST_QNAME));
+ private static final Set<MemberName> SINGLE_MEMBER = Collections.singleton(AbstractTest.MEMBER_NAME);
private ActorSystem leaderSystem;
}
- private static Collection<MapEntryNode> createInnerListMapEntries(final int amount, final String valuePrefix) {
- final Collection<MapEntryNode> ret = new ArrayList<>();
- for (int i = 0; i < amount; i++) {
- ret.add(ImmutableNodes.mapEntryBuilder()
- .withNodeIdentifier(new NodeIdentifierWithPredicates(TestModel.INNER_LIST_QNAME,
- QName.create(TestModel.INNER_LIST_QNAME, "name"), Integer.toString(i)))
- .withChild(ImmutableNodes
- .leafNode(QName.create(TestModel.INNER_LIST_QNAME, "value"), valuePrefix + "-" + i))
- .build());
+ // top level shard at TEST element, with subshards on each outer-list map entry
+ @Test
+ public void testMultipleShardLevels() throws Exception {
+ initEmptyDatastore("config");
+
+ final DistributedShardRegistration testShardId = waitOnAsyncTask(
+ leaderShardFactory.createDistributedShard(TEST_ID, SINGLE_MEMBER),
+ DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
+
+ final ArrayList<DistributedShardRegistration> registrations = new ArrayList<>();
+ final int listSize = 5;
+ for (int i = 0; i < listSize; i++) {
+ final YangInstanceIdentifier entryYID = getOuterListIdFor(i);
+ final CompletionStage<DistributedShardRegistration> future = leaderShardFactory.createDistributedShard(
+ new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, entryYID), SINGLE_MEMBER);
+
+ registrations.add(waitOnAsyncTask(future, DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION));
}
- return ret;
+ final DOMDataTreeIdentifier rootId =
+ new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY);
+ final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singletonList(
+ rootId));
+
+ DOMDataTreeCursorAwareTransaction transaction = producer.createTransaction(false);
+
+ DOMDataTreeWriteCursor cursor = transaction.createCursor(rootId);
+ assertNotNull(cursor);
+
+ final MapNode outerList =
+ ImmutableMapNodeBuilder.create()
+ .withNodeIdentifier(new NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
+
+ final ContainerNode testNode =
+ ImmutableContainerNodeBuilder.create()
+ .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME))
+ .withChild(outerList)
+ .build();
+
+ cursor.write(testNode.getIdentifier(), testNode);
+
+ cursor.close();
+ transaction.submit().checkedGet();
+
+ final DOMDataTreeListener mockedDataTreeListener = mock(DOMDataTreeListener.class);
+ doNothing().when(mockedDataTreeListener).onDataTreeChanged(anyCollection(), anyMap());
+
+ final MapNode wholeList = ImmutableMapNodeBuilder.create(outerList)
+ .withValue(createOuterEntries(listSize, "testing-values")).build();
+
+ transaction = producer.createTransaction(false);
+ cursor = transaction.createCursor(TEST_ID);
+ assertNotNull(cursor);
+
+ cursor.write(wholeList.getIdentifier(), wholeList);
+ cursor.close();
+
+ transaction.submit().checkedGet();
+
+ leaderShardFactory.registerListener(mockedDataTreeListener, Collections.singletonList(TEST_ID),
+ true, Collections.emptyList());
+
+ // need 6 invocations, first initial thats from the parent shard, and then each individual subshard
+ verify(mockedDataTreeListener, timeout(10000).times(6)).onDataTreeChanged(captorForChanges.capture(),
+ captorForSubtrees.capture());
+ verifyNoMoreInteractions(mockedDataTreeListener);
+ final List<Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>>> allSubtrees = captorForSubtrees.getAllValues();
+
+ final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> lastSubtree = allSubtrees.get(allSubtrees.size() - 1);
+
+ final NormalizedNode<?, ?> actual = lastSubtree.get(TEST_ID);
+ assertNotNull(actual);
+
+ final NormalizedNode<?, ?> expected =
+ ImmutableContainerNodeBuilder.create()
+ .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME))
+ .withChild(ImmutableMapNodeBuilder.create(outerList)
+ .withValue(createOuterEntries(listSize, "testing-values")).build())
+ .build();
+
+ assertEquals(expected, actual);
}
@Test
}
}
+ private static Collection<MapEntryNode> createOuterEntries(final int amount, final String valuePrefix) {
+ final Collection<MapEntryNode> ret = new ArrayList<>();
+ for (int i = 0; i < amount; i++) {
+ ret.add(ImmutableNodes.mapEntryBuilder()
+ .withNodeIdentifier(new NodeIdentifierWithPredicates(TestModel.OUTER_LIST_QNAME,
+ QName.create(TestModel.OUTER_LIST_QNAME, "id"), i))
+ .withChild(ImmutableNodes
+ .leafNode(QName.create(TestModel.OUTER_LIST_QNAME, "id"), i))
+ .withChild(createWholeInnerList(amount, "outer id: " + i + " " + valuePrefix))
+ .build());
+ }
+
+ return ret;
+ }
+
+ private static MapNode createWholeInnerList(final int amount, final String valuePrefix) {
+ return ImmutableMapNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(TestModel.INNER_LIST_QNAME))
+ .withValue(createInnerListMapEntries(amount, valuePrefix)).build();
+ }
+
+ private static Collection<MapEntryNode> createInnerListMapEntries(final int amount, final String valuePrefix) {
+ final Collection<MapEntryNode> ret = new ArrayList<>();
+ for (int i = 0; i < amount; i++) {
+ ret.add(ImmutableNodes.mapEntryBuilder()
+ .withNodeIdentifier(new NodeIdentifierWithPredicates(TestModel.INNER_LIST_QNAME,
+ QName.create(TestModel.INNER_LIST_QNAME, "name"), Integer.toString(i)))
+ .withChild(ImmutableNodes
+ .leafNode(QName.create(TestModel.INNER_LIST_QNAME, "value"), valuePrefix + "-" + i))
+ .build());
+ }
+
+ return ret;
+ }
+
private static YangInstanceIdentifier getOuterListIdFor(final int id) {
return TestModel.OUTER_LIST_PATH.node(new NodeIdentifierWithPredicates(
TestModel.OUTER_LIST_QNAME, QName.create(TestModel.OUTER_LIST_QNAME, "id"), id));