2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.mdsal.dom.store.inmemory;
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
13 import com.google.common.annotations.Beta;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.collect.ImmutableMap;
16 import com.google.common.collect.Maps;
17 import com.google.common.util.concurrent.ListeningExecutorService;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import java.util.Collection;
20 import java.util.HashMap;
21 import java.util.HashSet;
22 import java.util.Iterator;
24 import java.util.Map.Entry;
25 import java.util.concurrent.Executor;
26 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
27 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
28 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
29 import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
30 import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTable;
31 import org.opendaylight.mdsal.dom.spi.shard.ChildShardContext;
32 import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext;
33 import org.opendaylight.mdsal.dom.spi.shard.ReadableWriteableDOMDataTreeShard;
34 import org.opendaylight.mdsal.dom.spi.shard.SubshardProducerSpecification;
35 import org.opendaylight.mdsal.dom.spi.shard.WriteableDOMDataTreeShard;
36 import org.opendaylight.yangtools.concepts.ListenerRegistration;
37 import org.opendaylight.yangtools.util.concurrent.CountingRejectedExecutionHandler;
38 import org.opendaylight.yangtools.util.concurrent.FastThreadPoolExecutor;
39 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
40 import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot;
41 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
42 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
43 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
44 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
45 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
46 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContextListener;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
51 public class InMemoryDOMDataTreeShard implements ReadableWriteableDOMDataTreeShard, EffectiveModelContextListener {
52 private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataTreeShard.class);
53 private static final int DEFAULT_SUBMIT_QUEUE_SIZE = 1000;
55 private final DOMDataTreePrefixTable<ChildShardContext> childShardsTable = DOMDataTreePrefixTable.create();
56 private final Map<DOMDataTreeIdentifier, ChildShardContext> childShards = new HashMap<>();
57 private final Collection<InMemoryDOMDataTreeShardProducer> producers = new HashSet<>();
58 private final InMemoryDOMDataTreeShardChangePublisher shardChangePublisher;
59 private final ListeningExecutorService executor;
60 private final DOMDataTreeIdentifier prefix;
61 private final DataTree dataTree;
63 InMemoryDOMDataTreeShard(final DOMDataTreeIdentifier prefix, final Executor dataTreeChangeExecutor,
64 final int maxDataChangeListenerQueueSize, final int submitQueueSize) {
65 this.prefix = requireNonNull(prefix);
67 final DataTreeConfiguration treeBaseConfig = treeTypeFor(prefix.getDatastoreType());
68 final DataTreeConfiguration treeConfig = new DataTreeConfiguration.Builder(treeBaseConfig.getTreeType())
69 .setMandatoryNodesValidation(treeBaseConfig.isMandatoryNodesValidationEnabled())
70 .setUniqueIndexes(treeBaseConfig.isUniqueIndexEnabled())
71 .setRootPath(prefix.getRootIdentifier())
74 this.dataTree = new InMemoryDataTreeFactory().create(treeConfig);
76 this.shardChangePublisher = new InMemoryDOMDataTreeShardChangePublisher(dataTreeChangeExecutor,
77 maxDataChangeListenerQueueSize, dataTree, prefix.getRootIdentifier(), childShards);
79 final FastThreadPoolExecutor fte = new FastThreadPoolExecutor(1, submitQueueSize, "Shard[" + prefix + "]",
80 InMemoryDOMDataTreeShard.class);
81 fte.setRejectedExecutionHandler(CountingRejectedExecutionHandler.newCallerWaitsPolicy());
82 this.executor = MoreExecutors.listeningDecorator(fte);
85 public static InMemoryDOMDataTreeShard create(final DOMDataTreeIdentifier id,
86 final Executor dataTreeChangeExecutor,
87 final int maxDataChangeListenerQueueSize) {
88 return new InMemoryDOMDataTreeShard(id.toOptimized(), dataTreeChangeExecutor,
89 maxDataChangeListenerQueueSize, DEFAULT_SUBMIT_QUEUE_SIZE);
92 public static InMemoryDOMDataTreeShard create(final DOMDataTreeIdentifier id,
93 final Executor dataTreeChangeExecutor,
94 final int maxDataChangeListenerQueueSize,
95 final int submitQueueSize) {
96 return new InMemoryDOMDataTreeShard(id.toOptimized(), dataTreeChangeExecutor,
97 maxDataChangeListenerQueueSize, submitQueueSize);
101 public void onModelContextUpdated(final EffectiveModelContext newModelContext) {
102 dataTree.setEffectiveModelContext(newModelContext);
106 public void onChildAttached(final DOMDataTreeIdentifier childPrefix, final DOMDataTreeShard child) {
107 checkArgument(child != this, "Attempted to attach child %s onto self", this);
108 reparentChildShards(childPrefix, child);
110 final ChildShardContext context = createContextFor(childPrefix, child);
111 childShards.put(childPrefix, context);
112 childShardsTable.store(childPrefix, context);
117 public void onChildDetached(final DOMDataTreeIdentifier childPrefix, final DOMDataTreeShard child) {
118 childShards.remove(childPrefix);
119 childShardsTable.remove(childPrefix);
120 //FIXME: Producers not being affected could be skipped over.
124 private void updateProducers() {
125 for (InMemoryDOMDataTreeShardProducer p : producers) {
126 p.setModificationFactory(createModificationFactory(p.getPrefixes()));
131 InMemoryShardDataModificationFactory createModificationFactory(final Collection<DOMDataTreeIdentifier> prefixes) {
132 final Map<DOMDataTreeIdentifier, SubshardProducerSpecification> affected = new HashMap<>();
133 for (final DOMDataTreeIdentifier producerPrefix : prefixes) {
134 for (final ChildShardContext child : childShards.values()) {
135 final DOMDataTreeIdentifier bindPath;
136 if (producerPrefix.contains(child.getPrefix())) {
137 bindPath = child.getPrefix();
138 } else if (child.getPrefix().contains(producerPrefix)) {
139 // Bound path is inside subshard
140 bindPath = producerPrefix;
145 SubshardProducerSpecification spec = affected.get(child.getPrefix());
147 spec = new SubshardProducerSpecification(child);
148 affected.put(child.getPrefix(), spec);
150 spec.addPrefix(bindPath);
154 final InmemoryShardDataModificationFactoryBuilder builder =
155 new InmemoryShardDataModificationFactoryBuilder(prefix);
156 for (final SubshardProducerSpecification spec : affected.values()) {
157 final ForeignShardModificationContext foreignContext =
158 new ForeignShardModificationContext(spec.getPrefix(), spec.createProducer());
159 builder.addSubshard(foreignContext);
160 builder.addSubshard(spec.getPrefix(), foreignContext);
163 return builder.build();
167 public InMemoryDOMDataTreeShardProducer createProducer(final Collection<DOMDataTreeIdentifier> prefixes) {
168 for (final DOMDataTreeIdentifier prodPrefix : prefixes) {
169 checkArgument(prefix.contains(prodPrefix), "Prefix %s is not contained under shard root", prodPrefix,
173 final InMemoryDOMDataTreeShardProducer ret = new InMemoryDOMDataTreeShardProducer(this, prefixes,
174 createModificationFactory(prefixes));
179 void closeProducer(final InMemoryDOMDataTreeShardProducer producer) {
180 synchronized (this) {
181 if (!producers.remove(producer)) {
182 LOG.warn("Producer {} not found in shard {}", producer, this);
188 public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
189 final YangInstanceIdentifier treeId, final L listener) {
190 return shardChangePublisher.registerTreeChangeListener(treeId, listener);
193 private void reparentChildShards(final DOMDataTreeIdentifier newChildPrefix, final DOMDataTreeShard newChild) {
194 final Iterator<Entry<DOMDataTreeIdentifier, ChildShardContext>> actualChildren =
195 childShards.entrySet().iterator();
196 final Map<DOMDataTreeIdentifier, ChildShardContext> reparented = new HashMap<>();
197 while (actualChildren.hasNext()) {
198 final Entry<DOMDataTreeIdentifier, ChildShardContext> actualChild = actualChildren.next();
199 final DOMDataTreeIdentifier actualPrefix = actualChild.getKey();
200 checkArgument(!newChildPrefix.equals(actualPrefix), "Child shard with prefix %s already attached",
202 if (newChildPrefix.contains(actualPrefix)) {
203 final ChildShardContext actualContext = actualChild.getValue();
204 actualChildren.remove();
205 newChild.onChildAttached(actualPrefix, actualContext.getShard());
206 reparented.put(actualChild.getKey(), actualContext);
207 childShardsTable.remove(actualPrefix);
210 updateProducersAndListeners(reparented);
213 private void updateProducersAndListeners(final Map<DOMDataTreeIdentifier, ChildShardContext> reparented) {
214 // FIXME: remove reparenting of producers, shards have to be registered from top to bottom
215 if (reparented.isEmpty()) {
216 //nothing was reparented no need to update anything
219 throw new UnsupportedOperationException();
222 private static ChildShardContext createContextFor(final DOMDataTreeIdentifier prefix,
223 final DOMDataTreeShard child) {
224 checkArgument(child instanceof WriteableDOMDataTreeShard, "Child %s is not a writable shared", child);
225 return new ChildShardContext(prefix, (WriteableDOMDataTreeShard) child);
228 private static DataTreeConfiguration treeTypeFor(final LogicalDatastoreType dsType) {
231 return DataTreeConfiguration.DEFAULT_CONFIGURATION;
233 return DataTreeConfiguration.DEFAULT_OPERATIONAL;
235 throw new IllegalArgumentException("Unsupported Data Store type:" + dsType);
240 Map<DOMDataTreeIdentifier, DOMDataTreeShard> getChildShards() {
241 return ImmutableMap.copyOf(Maps.transformValues(childShards, ChildShardContext::getShard));
244 DataTreeSnapshot takeSnapshot() {
245 return dataTree.takeSnapshot();
248 InmemoryDOMDataTreeShardWriteTransaction createTransaction(final String transactionId,
249 final InMemoryDOMDataTreeShardProducer producer, final DataTreeSnapshot snapshot) {
250 checkArgument(snapshot instanceof CursorAwareDataTreeSnapshot);
251 return new InmemoryDOMDataTreeShardWriteTransaction(producer,
252 producer.getModificationFactory().createModification((CursorAwareDataTreeSnapshot) snapshot), dataTree,
253 shardChangePublisher, executor);
257 public Collection<InMemoryDOMDataTreeShardProducer> getProducers() {