LOG.debug("{} - Received success from remote nodes, creating producer:{}",
distributedConfigDatastore.getActorContext().getClusterWrapper().getCurrentMemberName(), subtrees);
return new ProxyProducer(producer, subtrees, shardedDataTreeActor,
- distributedConfigDatastore.getActorContext());
+ distributedConfigDatastore.getActorContext(), shards);
} else if (response instanceof Exception) {
closeProducer(producer);
throw Throwables.propagate((Exception) response);
@GuardedBy("shardAccessMap")
private final Map<DOMDataTreeIdentifier, CDSShardAccessImpl> shardAccessMap = new HashMap<>();
+ // We don't have to guard access to shardTable in ProxyProducer.
+ // ShardTable's entries relevant to this ProxyProducer shouldn't
+ // change during producer's lifetime.
+ private final DOMDataTreePrefixTable<DOMDataTreeShardRegistration<DOMDataTreeShard>> shardTable;
+
/**
 * Creates a proxy producer wrapping a local delegate producer.
 *
 * @param delegate the backing local producer all transactions are delegated to
 * @param subtrees the set of subtrees this producer was opened for
 * @param shardDataTreeActor actor coordinating shard layout across the cluster
 * @param actorContext datastore actor context used to create shard access handles
 * @param shardLayout snapshot of the shard prefix table; entries relevant to this
 *                    producer are expected not to change during its lifetime, so
 *                    access to it is intentionally unguarded
 * @throws NullPointerException if any argument is null
 */
ProxyProducer(final DOMDataTreeProducer delegate,
              final Collection<DOMDataTreeIdentifier> subtrees,
              final ActorRef shardDataTreeActor,
              final ActorContext actorContext,
              final DOMDataTreePrefixTable<DOMDataTreeShardRegistration<DOMDataTreeShard>> shardLayout) {
    this.delegate = Preconditions.checkNotNull(delegate);
    this.subtrees = Preconditions.checkNotNull(subtrees);
    this.shardDataTreeActor = Preconditions.checkNotNull(shardDataTreeActor);
    this.actorContext = Preconditions.checkNotNull(actorContext);
    this.shardTable = Preconditions.checkNotNull(shardLayout);
}
@Nonnull
@SuppressWarnings("checkstyle:IllegalCatch")
public void close() throws DOMDataTreeProducerException {
delegate.close();
+
synchronized (shardAccessMap) {
shardAccessMap.values().forEach(CDSShardAccessImpl::close);
}
@Nonnull
@Override
public CDSShardAccess getShardAccess(@Nonnull final DOMDataTreeIdentifier subtree) {
+ Preconditions.checkArgument(
+ subtrees.stream().anyMatch(dataTreeIdentifier -> dataTreeIdentifier.contains(subtree)),
+ "Subtree {} is not controlled by this producer {}", subtree, this);
+
+ final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup =
+ shardTable.lookup(subtree);
+ Preconditions.checkState(lookup != null, "Subtree {} is not contained in any registered shard.");
+
+ final DOMDataTreeIdentifier lookupId = lookup.getValue().getPrefix();
+
synchronized (shardAccessMap) {
- Preconditions.checkArgument(subtrees.contains(subtree),
- "Subtree {} is not controlled by this producer {}", subtree, this);
- if (shardAccessMap.get(subtree) != null) {
- return shardAccessMap.get(subtree);
+ if (shardAccessMap.get(lookupId) != null) {
+ return shardAccessMap.get(lookupId);
}
// TODO Maybe we can have static factory method and return the same instance
// for same subtrees. But maybe it is not needed since there can be only one
// producer attached to some subtree at a time. And also how we can close ShardAccess
// then
- final CDSShardAccessImpl shardAccess = new CDSShardAccessImpl(subtree, actorContext);
- shardAccessMap.put(subtree, shardAccess);
+ final CDSShardAccessImpl shardAccess = new CDSShardAccessImpl(lookupId, actorContext);
+ shardAccessMap.put(lookupId, shardAccess);
return shardAccess;
}
}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyCollection;
import static org.mockito.Matchers.anyMap;
import static org.mockito.Mockito.doNothing;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.dom.api.CDSDataTreeProducer;
+import org.opendaylight.controller.cluster.dom.api.CDSShardAccess;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
}
}
+ @Test
+ public void testCDSDataTreeProducer() throws Exception {
+ initEmptyDatastores();
+
+ final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
+ TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME)),
+ DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
+
+ leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
+
+ assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
+
+
+ final DOMDataTreeIdentifier configRoot =
+ new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY);
+ final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(configRoot));
+
+ assertTrue(producer instanceof CDSDataTreeProducer);
+
+ final CDSDataTreeProducer cdsProducer = (CDSDataTreeProducer) producer;
+ CDSShardAccess shardAccess = cdsProducer.getShardAccess(TEST_ID);
+ assertEquals(shardAccess.getShardIdentifier(), TEST_ID);
+
+ shardAccess = cdsProducer.getShardAccess(INNER_LIST_ID);
+ assertEquals(TEST_ID, shardAccess.getShardIdentifier());
+
+ shardAccess = cdsProducer.getShardAccess(configRoot);
+ assertEquals(configRoot, shardAccess.getShardIdentifier());
+
+ waitOnAsyncTask(reg1.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
+ }
+
private static Collection<MapEntryNode> createOuterEntries(final int amount, final String valuePrefix) {
final Collection<MapEntryNode> ret = new ArrayList<>();
for (int i = 0; i < amount; i++) {