import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import javax.annotation.Nonnull;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTable;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.util.concurrent.CountingRejectedExecutionHandler;
+import org.opendaylight.yangtools.util.concurrent.FastThreadPoolExecutor;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
@Beta
public class InMemoryDOMDataTreeShard implements ReadableWriteableDOMDataTreeShard, SchemaContextListener {
+ private static final int DEFAULT_SUBMIT_QUEUE_SIZE = 1000;
+
private static final class SubshardProducerSpecification {
private final Collection<DOMDataTreeIdentifier> prefixes = new ArrayList<>(1);
private final ChildShardContext shard;
private final ListeningExecutorService executor;
private InMemoryDOMDataTreeShard(final DOMDataTreeIdentifier prefix, final ExecutorService dataTreeChangeExecutor,
- final int maxDataChangeListenerQueueSize) {
+ final int maxDataChangeListenerQueueSize, final int submitQueueSize) {
this.prefix = Preconditions.checkNotNull(prefix);
final TreeType treeType = treeTypeFor(prefix.getDatastoreType());
this.shardChangePublisher = new InMemoryDOMDataTreeShardChangePublisher(dataTreeChangeExecutor,
maxDataChangeListenerQueueSize, dataTree, prefix.getRootIdentifier(), childShards);
- this.executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+
+ final FastThreadPoolExecutor fte = new FastThreadPoolExecutor(1, submitQueueSize, "Shard[" + prefix + "]");
+ fte.setRejectedExecutionHandler(CountingRejectedExecutionHandler.newCallerWaitsPolicy());
+ this.executor = MoreExecutors.listeningDecorator(fte);
+ }
+
+ public static InMemoryDOMDataTreeShard create(final DOMDataTreeIdentifier id,
+ final ExecutorService dataTreeChangeExecutor,
+ final int maxDataChangeListenerQueueSize) {
+ return new InMemoryDOMDataTreeShard(id, dataTreeChangeExecutor,
+ maxDataChangeListenerQueueSize, DEFAULT_SUBMIT_QUEUE_SIZE);
}
public static InMemoryDOMDataTreeShard create(final DOMDataTreeIdentifier id,
- final ExecutorService dataTreeChangeExecutor, final int maxDataChangeListenerQueueSize) {
- return new InMemoryDOMDataTreeShard(id, dataTreeChangeExecutor, maxDataChangeListenerQueueSize);
+ final ExecutorService dataTreeChangeExecutor,
+ final int maxDataChangeListenerQueueSize,
+ final int submitQueueSize) {
+ return new InMemoryDOMDataTreeShard(id, dataTreeChangeExecutor,
+ maxDataChangeListenerQueueSize, submitQueueSize);
}
@Override