import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTable;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
+import org.opendaylight.yangtools.util.concurrent.CountingRejectedExecutionHandler;
+import org.opendaylight.yangtools.util.concurrent.FastThreadPoolExecutor;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@Beta
public class InMemoryDOMDataTreeShard implements ReadableWriteableDOMDataTreeShard, SchemaContextListener {
+ private static final int DEFAULT_SUBMIT_QUEUE_SIZE = 1000;
+
private static final class SubshardProducerSpecification {
private final Collection<DOMDataTreeIdentifier> prefixes = new ArrayList<>(1);
private final ChildShardContext shard;
+
SubshardProducerSpecification(final ChildShardContext subshard) {
this.shard = Preconditions.checkNotNull(subshard);
}
private final InMemoryDOMDataTreeShardChangePublisher shardChangePublisher;
private final ListeningExecutorService executor;
- private SchemaContext schemaContext;
-
private InMemoryDOMDataTreeShard(final DOMDataTreeIdentifier prefix, final ExecutorService dataTreeChangeExecutor,
- final int maxDataChangeListenerQueueSize, final int maxCommitQueueSize) {
+ final int maxDataChangeListenerQueueSize, final int submitQueueSize) {
this.prefix = Preconditions.checkNotNull(prefix);
final TreeType treeType = treeTypeFor(prefix.getDatastoreType());
this.dataTree = InMemoryDataTreeFactory.getInstance().create(treeType, prefix.getRootIdentifier());
- this.shardChangePublisher = new InMemoryDOMDataTreeShardChangePublisher(dataTreeChangeExecutor, maxDataChangeListenerQueueSize, dataTree, prefix.getRootIdentifier(), childShards);
- this.executor = MoreExecutors.listeningDecorator(SpecialExecutors.newBoundedSingleThreadExecutor(maxCommitQueueSize, "Shard-executor[" + prefix + "]"));
+ this.shardChangePublisher = new InMemoryDOMDataTreeShardChangePublisher(dataTreeChangeExecutor,
+ maxDataChangeListenerQueueSize, dataTree, prefix.getRootIdentifier(), childShards);
+
+ final FastThreadPoolExecutor fte = new FastThreadPoolExecutor(1, submitQueueSize, "Shard[" + prefix + "]");
+ fte.setRejectedExecutionHandler(CountingRejectedExecutionHandler.newCallerWaitsPolicy());
+ this.executor = MoreExecutors.listeningDecorator(fte);
+ }
+
+ public static InMemoryDOMDataTreeShard create(final DOMDataTreeIdentifier id,
+ final ExecutorService dataTreeChangeExecutor,
+ final int maxDataChangeListenerQueueSize) {
+ return new InMemoryDOMDataTreeShard(id, dataTreeChangeExecutor,
+ maxDataChangeListenerQueueSize, DEFAULT_SUBMIT_QUEUE_SIZE);
}
- public static InMemoryDOMDataTreeShard create(final DOMDataTreeIdentifier id, final ExecutorService dataTreeChangeExecutor,
- final int maxDataChangeListenerQueueSize, final int maxCommitQueueSize) {
- return new InMemoryDOMDataTreeShard(id, dataTreeChangeExecutor, maxDataChangeListenerQueueSize, maxCommitQueueSize);
+ public static InMemoryDOMDataTreeShard create(final DOMDataTreeIdentifier id,
+ final ExecutorService dataTreeChangeExecutor,
+ final int maxDataChangeListenerQueueSize,
+ final int submitQueueSize) {
+ return new InMemoryDOMDataTreeShard(id, dataTreeChangeExecutor,
+ maxDataChangeListenerQueueSize, submitQueueSize);
}
@Override
public void onGlobalContextUpdated(final SchemaContext context) {
dataTree.setSchemaContext(context);
- schemaContext = context;
}
@Override
@Nonnull
@Override
- public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(@Nonnull final YangInstanceIdentifier treeId, @Nonnull final L listener) {
+ public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
+ @Nonnull final YangInstanceIdentifier treeId, @Nonnull final L listener) {
return shardChangePublisher.registerTreeChangeListener(treeId, listener);
}
}
private void reparentChildShards(final DOMDataTreeIdentifier newChildPrefix, final DOMDataTreeShard newChild) {
- final Iterator<Entry<DOMDataTreeIdentifier, ChildShardContext>> actualChildren = childShards.entrySet().iterator();
+ final Iterator<Entry<DOMDataTreeIdentifier, ChildShardContext>> actualChildren =
+ childShards.entrySet().iterator();
final Map<DOMDataTreeIdentifier, ChildShardContext> reparented = new HashMap<>();
while (actualChildren.hasNext()) {
final Entry<DOMDataTreeIdentifier, ChildShardContext> actualChild = actualChildren.next();
throw new UnsupportedOperationException();
}
- private static ChildShardContext createContextFor(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
+ private static ChildShardContext createContextFor(final DOMDataTreeIdentifier prefix,
+ final DOMDataTreeShard child) {
Preconditions.checkArgument(child instanceof WriteableDOMDataTreeShard,
"Child %s is not a writable shared", child);
return new ChildShardContext(prefix, (WriteableDOMDataTreeShard) child);
return ret;
}
- InmemoryDOMDataTreeShardWriteTransaction createTransaction(final InmemoryDOMDataTreeShardWriteTransaction previousTx) {
- // FIXME: implement this
- throw new UnsupportedOperationException();
+ DataTreeSnapshot takeSnapshot() {
+ return dataTree.takeSnapshot();
}
- InmemoryDOMDataTreeShardWriteTransaction createTransaction(final Collection<DOMDataTreeIdentifier> prefixes) {
+ InmemoryDOMDataTreeShardWriteTransaction createTransaction(final String transactionId,
+ final InMemoryDOMDataTreeShardProducer producer,
+ final Collection<DOMDataTreeIdentifier> prefixes,
+ final DataTreeSnapshot snapshot) {
+
+ return createTxForSnapshot(producer, prefixes, (CursorAwareDataTreeSnapshot) snapshot);
+ }
+
+ private InmemoryDOMDataTreeShardWriteTransaction createTxForSnapshot(
+ final InMemoryDOMDataTreeShardProducer producer,
+ final Collection<DOMDataTreeIdentifier> prefixes,
+ final CursorAwareDataTreeSnapshot snapshot) {
final Map<DOMDataTreeIdentifier, SubshardProducerSpecification> affectedSubshards = new HashMap<>();
for (final DOMDataTreeIdentifier producerPrefix : prefixes) {
}
}
- final ShardRootModificationContext rootContext = new ShardRootModificationContext(prefix,
- (CursorAwareDataTreeSnapshot) dataTree.takeSnapshot());
+ final ShardRootModificationContext rootContext = new ShardRootModificationContext(prefix, snapshot);
final ShardDataModificationBuilder builder = new ShardDataModificationBuilder(rootContext);
for (final SubshardProducerSpecification spec : affectedSubshards.values()) {
final ForeignShardModificationContext foreignContext =
builder.addSubshard(spec.getPrefix(), foreignContext);
}
- return new InmemoryDOMDataTreeShardWriteTransaction(builder.build(), dataTree, shardChangePublisher, executor);
+ return new InmemoryDOMDataTreeShardWriteTransaction(producer, builder.build(),
+ dataTree, shardChangePublisher, executor);
}
+
}