From 61791b72e0137609d15c18efc64d227b6a4006ec Mon Sep 17 00:00:00 2001 From: Tomas Cere Date: Mon, 28 Nov 2016 14:30:00 +0100 Subject: [PATCH] BUG-2138: Listener support in shard frontend Change-Id: Icc997649ab56dc66a95d53f11c514fa3f2cc457f Signed-off-by: Tomas Cere --- .../cluster/datastore/AbstractDataStore.java | 22 +++++ .../sharding/DistributedShardFrontend.java | 38 ++++++- .../DistributedShardedDOMDataTree.java | 10 +- ...ributedShardedDOMDataTreeRemotingTest.java | 16 ++- .../DistributedShardedDOMDataTreeTest.java | 98 +++++++++++++++++-- 5 files changed, 170 insertions(+), 14 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java index 87706763c5..01f2ceb544 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java @@ -287,4 +287,26 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface public CountDownLatch getWaitTillReadyCountDownLatch() { return waitTillReadyCountDownLatch; } + + @SuppressWarnings("unchecked") + public ListenerRegistration registerProxyListener( + final YangInstanceIdentifier shardLookup, + final YangInstanceIdentifier insideShard, + final org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener delegate) { + + Preconditions.checkNotNull(shardLookup, "shardLookup should not be null"); + Preconditions.checkNotNull(insideShard, "insideShard should not be null"); + Preconditions.checkNotNull(delegate, "delegate should not be null"); + + final String shardName = actorContext.getShardStrategyFactory().getStrategy(shardLookup).findShard(shardLookup); + LOG.debug("Registering tree listener: {} for tree: {} shard: {}, path inside shard: {}", + delegate,shardLookup, shardName, insideShard); + + final DataTreeChangeListenerProxy listenerRegistrationProxy = + new DataTreeChangeListenerProxy<>(actorContext, delegate::onDataTreeChanged, insideShard); + listenerRegistrationProxy.init(shardName); + + return (ListenerRegistration) listenerRegistrationProxy; + } + } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontend.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontend.java index 8edfddedf4..7664897479 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontend.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontend.java @@ -29,6 +29,7 @@ import org.opendaylight.mdsal.dom.spi.shard.SubshardProducerSpecification; import org.opendaylight.mdsal.dom.spi.shard.WriteableDOMDataTreeShard; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -134,9 +135,44 @@ class DistributedShardFrontend implements ReadableWriteableDOMDataTreeShard { @Nonnull @Override + @SuppressWarnings("unchecked") public ListenerRegistration registerTreeChangeListener( final YangInstanceIdentifier treeId, final L listener) { - throw new UnsupportedOperationException("Listener registration not supported"); + + final List toStrip = new ArrayList<>(shardRoot.getRootIdentifier().getPathArguments()); + final List stripFrom = new ArrayList<>(treeId.getPathArguments()); + + while (!toStrip.isEmpty()) { + stripFrom.remove(0); + toStrip.remove(0); + } + + return (ListenerRegistration) new ProxyRegistration(distributedDataStore + .registerProxyListener(treeId, YangInstanceIdentifier.create(stripFrom), listener), listener); + } + + private static class ProxyRegistration implements ListenerRegistration { + + private ListenerRegistration proxy; + private DOMDataTreeChangeListener listener; + + private ProxyRegistration( + final ListenerRegistration< + org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener> proxy, + final DOMDataTreeChangeListener listener) { + this.proxy = proxy; + this.listener = listener; + } + + @Override + public DOMDataTreeChangeListener getInstance() { + return listener; + } + + @Override + public void close() { + proxy.close(); + } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java index 5575438037..0bb6aac36f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java @@ -31,6 +31,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; +import org.opendaylight.controller.cluster.ActorSystemProvider; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor; @@ -94,10 +95,10 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat private final EnumMap defaultShardRegistrations = new EnumMap<>(LogicalDatastoreType.class); - public DistributedShardedDOMDataTree(final ActorSystem actorSystem, + public DistributedShardedDOMDataTree(final ActorSystemProvider actorSystemProvider, final DistributedDataStore distributedOperDatastore, final DistributedDataStore distributedConfigDatastore) { - this.actorSystem = Preconditions.checkNotNull(actorSystem); + this.actorSystem = Preconditions.checkNotNull(actorSystemProvider).getActorSystem(); this.distributedOperDatastore = Preconditions.checkNotNull(distributedOperDatastore); this.distributedConfigDatastore = Preconditions.checkNotNull(distributedConfigDatastore); shardedDOMDataTree = new ShardedDOMDataTree(); @@ -135,8 +136,7 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat final T listener, final Collection subtrees, final boolean allowRxMerges, final Collection producers) throws DOMDataTreeLoopException { - - throw new UnsupportedOperationException("Not implemented"); + return shardedDOMDataTree.registerListener(listener, subtrees, allowRxMerges, producers); } @Nonnull @@ -289,7 +289,7 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat private DistributedShardRegistration initDefaultShard(final LogicalDatastoreType logicalDatastoreType) throws DOMDataTreeProducerException, DOMDataTreeShardingConflictException { final Collection members = JavaConverters.asJavaCollectionConverter( - Cluster.get(actorSystem).state().members()).asJavaCollection(); + Cluster.get(actorSystem).state().members()).asJavaCollection(); final Collection names = Collections2.transform(members, m -> MemberName.forName(m.roles().iterator().next())); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java index eb109f1e0a..f054100134 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java @@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.sharding; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.doReturn; import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.findLocalShard; import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.waitUntilShardIsDown; @@ -30,6 +31,8 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.mockito.Mockito; +import org.opendaylight.controller.cluster.ActorSystemProvider; import org.opendaylight.controller.cluster.datastore.AbstractTest; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; @@ -86,6 +89,9 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { private DistributedShardedDOMDataTree leaderShardFactory; private DistributedShardedDOMDataTree followerShardFactory; + private ActorSystemProvider leaderSystemProvider; + private ActorSystemProvider followerSystemProvider; + @Before public void setUp() { @@ -95,6 +101,12 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2")); Cluster.get(followerSystem).join(MEMBER_1_ADDRESS); + leaderSystemProvider = Mockito.mock(ActorSystemProvider.class); + doReturn(leaderSystem).when(leaderSystemProvider).getActorSystem(); + + followerSystemProvider = Mockito.mock(ActorSystemProvider.class); + doReturn(followerSystem).when(followerSystemProvider).getActorSystem(); + } @After @@ -123,11 +135,11 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { followerDistributedDataStore = followerTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full()); - leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystem, + leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider, leaderDistributedDataStore, leaderDistributedDataStore); - followerShardFactory = new DistributedShardedDOMDataTree(followerSystem, + followerShardFactory = new DistributedShardedDOMDataTree(followerSystemProvider, followerDistributedDataStore, followerDistributedDataStore); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java index 88a12fe7d7..cd21a4c19b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java @@ -8,7 +8,16 @@ package org.opendaylight.controller.cluster.sharding; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.anyCollection; +import static org.mockito.Matchers.anyMap; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.findLocalShard; import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.waitUntilShardIsDown; @@ -18,17 +27,25 @@ import akka.actor.Address; import akka.actor.AddressFromURIString; import akka.cluster.Cluster; import akka.testkit.JavaTestKit; +import com.google.common.base.Optional; import com.google.common.collect.Lists; import com.google.common.util.concurrent.CheckedFuture; import com.typesafe.config.ConfigFactory; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.List; +import java.util.Map; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.cluster.ActorSystemProvider; import org.opendaylight.controller.cluster.datastore.AbstractTest; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; @@ -46,15 +63,20 @@ import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.api.DOMDataTreeListener; import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer; import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates; +import org.opendaylight.yangtools.yang.data.api.schema.LeafNode; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.MapNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableMapNodeBuilder; import org.slf4j.Logger; @@ -71,6 +93,11 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest { private static final DOMDataTreeIdentifier TEST_ID = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH); + private static final DOMDataTreeIdentifier INNER_LIST_ID = + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, + YangInstanceIdentifier.create(getOuterListIdFor(0).getPathArguments()) + .node(TestModel.INNER_LIST_QNAME)); + private ActorSystem leaderSystem; private final Builder leaderDatastoreContextBuilder = @@ -85,12 +112,22 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest { private DistributedShardedDOMDataTree leaderShardFactory; + @Captor + private ArgumentCaptor> captorForChanges; + @Captor + private ArgumentCaptor>> captorForSubtrees; + + private ActorSystemProvider leaderSystemProvider; + @Before public void setUp() { + MockitoAnnotations.initMocks(this); leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS); + leaderSystemProvider = Mockito.mock(ActorSystemProvider.class); + doReturn(leaderSystem).when(leaderSystemProvider).getActorSystem(); } @After @@ -108,11 +145,13 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest { leaderDistributedDataStore = leaderTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full()); - leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystem, + + leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider, leaderDistributedDataStore, leaderDistributedDataStore); } + @Test public void testWritesIntoDefaultShard() throws Exception { initEmptyDatastore("config"); @@ -167,14 +206,35 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest { Assert.assertNotNull(cursor); final YangInstanceIdentifier nameId = YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(TestModel.NAME_QNAME).build(); + final LeafNode valueToCheck = ImmutableLeafNodeBuilder.create().withNodeIdentifier( + new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build(); cursor.write(nameId.getLastPathArgument(), - ImmutableLeafNodeBuilder.create().withNodeIdentifier( - new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build()); + valueToCheck); cursor.close(); LOG.warn("Got to pre submit"); tx.submit().checkedGet(); + + final DOMDataTreeListener mockedDataTreeListener = mock(DOMDataTreeListener.class); + doNothing().when(mockedDataTreeListener).onDataTreeChanged(anyCollection(), anyMap()); + + leaderShardFactory.registerListener(mockedDataTreeListener, Collections.singletonList(TEST_ID), + true, Collections.emptyList()); + + verify(mockedDataTreeListener, timeout(1000).times(1)).onDataTreeChanged(captorForChanges.capture(), + captorForSubtrees.capture()); + final List> capturedValue = captorForChanges.getAllValues(); + + final Optional> dataAfter = + capturedValue.get(0).iterator().next().getRootNode().getDataAfter(); + + final NormalizedNode expected = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME)).withChild(valueToCheck).build(); + assertEquals(expected, dataAfter.get()); + + verifyNoMoreInteractions(mockedDataTreeListener); + } @Test @@ -192,8 +252,7 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest { leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), ClusterUtils.getCleanShardName(TestModel.TEST_PATH)); - final YangInstanceIdentifier oid1 = TestModel.OUTER_LIST_PATH.node(new NodeIdentifierWithPredicates( - TestModel.OUTER_LIST_QNAME, QName.create(TestModel.OUTER_LIST_QNAME, "id"), 0)); + final YangInstanceIdentifier oid1 = getOuterListIdFor(0); final DOMDataTreeIdentifier outerListPath = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, oid1); final DistributedShardRegistration outerListShardReg = leaderShardFactory.createDistributedShard(outerListPath, @@ -235,6 +294,28 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest { futures.get(futures.size() - 1).checkedGet(); + final DOMDataTreeListener mockedDataTreeListener = mock(DOMDataTreeListener.class); + doNothing().when(mockedDataTreeListener).onDataTreeChanged(anyCollection(), anyMap()); + + leaderShardFactory.registerListener(mockedDataTreeListener, Collections.singletonList(INNER_LIST_ID), + true, Collections.emptyList()); + + verify(mockedDataTreeListener, timeout(1000).times(1)).onDataTreeChanged(captorForChanges.capture(), + captorForSubtrees.capture()); + verifyNoMoreInteractions(mockedDataTreeListener); + final List> capturedValue = captorForChanges.getAllValues(); + + final NormalizedNode expected = + ImmutableMapNodeBuilder + .create() + .withNodeIdentifier(new NodeIdentifier(TestModel.INNER_LIST_QNAME)) + // only the values from the last run should be present + .withValue(createInnerListMapEntries(1000, "run-999")) + .build(); + + assertEquals("List values dont match the expected values from the last run", + expected, capturedValue.get(0).iterator().next().getRootNode().getDataAfter().get()); + } private static Collection createInnerListMapEntries(final int amount, final String valuePrefix) { @@ -300,4 +381,9 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest { } } -} \ No newline at end of file + + private static YangInstanceIdentifier getOuterListIdFor(final int id) { + return TestModel.OUTER_LIST_PATH.node(new NodeIdentifierWithPredicates( + TestModel.OUTER_LIST_QNAME, QName.create(TestModel.OUTER_LIST_QNAME, "id"), id)); + } +} -- 2.36.6