Move transaction-invariants into producer
[mdsal.git] / dom / mdsal-dom-inmemory-datastore / src / main / java / org / opendaylight / mdsal / dom / store / inmemory / InMemoryDOMDataTreeShard.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.mdsal.dom.store.inmemory;
10
11 import com.google.common.annotations.Beta;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.Preconditions;
14 import com.google.common.collect.ImmutableMap;
15 import com.google.common.collect.Maps;
16 import com.google.common.util.concurrent.ListeningExecutorService;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.HashMap;
21 import java.util.HashSet;
22 import java.util.Iterator;
23 import java.util.Map;
24 import java.util.Map.Entry;
25 import java.util.concurrent.Executor;
26 import javax.annotation.Nonnull;
27 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
28 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
29 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
30 import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
31 import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTable;
32 import org.opendaylight.yangtools.concepts.ListenerRegistration;
33 import org.opendaylight.yangtools.util.concurrent.CountingRejectedExecutionHandler;
34 import org.opendaylight.yangtools.util.concurrent.FastThreadPoolExecutor;
35 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
36 import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot;
37 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
38 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
39 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
40 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
41 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
42 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 @Beta
47 public class InMemoryDOMDataTreeShard implements ReadableWriteableDOMDataTreeShard, SchemaContextListener {
48     private static final class SubshardProducerSpecification {
49         private final Collection<DOMDataTreeIdentifier> prefixes = new ArrayList<>(1);
50         private final ChildShardContext shard;
51
52         SubshardProducerSpecification(final ChildShardContext subshard) {
53             this.shard = Preconditions.checkNotNull(subshard);
54         }
55
56         void addPrefix(final DOMDataTreeIdentifier prefix) {
57             prefixes.add(prefix);
58         }
59
60         DOMDataTreeShardProducer createProducer() {
61             return shard.getShard().createProducer(prefixes);
62         }
63
64         DOMDataTreeIdentifier getPrefix() {
65             return shard.getPrefix();
66         }
67     }
68
69     private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataTreeShard.class);
70     private static final int DEFAULT_SUBMIT_QUEUE_SIZE = 1000;
71
72     private final DOMDataTreePrefixTable<ChildShardContext> childShardsTable = DOMDataTreePrefixTable.create();
73     private final Map<DOMDataTreeIdentifier, ChildShardContext> childShards = new HashMap<>();
74     private final Collection<InMemoryDOMDataTreeShardProducer> producers = new HashSet<>();
75     private final InMemoryDOMDataTreeShardChangePublisher shardChangePublisher;
76     private final ListeningExecutorService executor;
77     private final DOMDataTreeIdentifier prefix;
78     private final DataTree dataTree;
79
80     private InMemoryDOMDataTreeShard(final DOMDataTreeIdentifier prefix, final Executor dataTreeChangeExecutor,
81                                      final int maxDataChangeListenerQueueSize, final int submitQueueSize) {
82         this.prefix = Preconditions.checkNotNull(prefix);
83
84         final TreeType treeType = treeTypeFor(prefix.getDatastoreType());
85         this.dataTree = InMemoryDataTreeFactory.getInstance().create(treeType, prefix.getRootIdentifier());
86
87         this.shardChangePublisher = new InMemoryDOMDataTreeShardChangePublisher(dataTreeChangeExecutor,
88                 maxDataChangeListenerQueueSize, dataTree, prefix.getRootIdentifier(), childShards);
89
90         final FastThreadPoolExecutor fte = new FastThreadPoolExecutor(1, submitQueueSize, "Shard[" + prefix + "]");
91         fte.setRejectedExecutionHandler(CountingRejectedExecutionHandler.newCallerWaitsPolicy());
92         this.executor = MoreExecutors.listeningDecorator(fte);
93     }
94
95     public static InMemoryDOMDataTreeShard create(final DOMDataTreeIdentifier id,
96                                                   final Executor dataTreeChangeExecutor,
97                                                   final int maxDataChangeListenerQueueSize) {
98         return new InMemoryDOMDataTreeShard(id.toOptimized(), dataTreeChangeExecutor,
99                 maxDataChangeListenerQueueSize, DEFAULT_SUBMIT_QUEUE_SIZE);
100     }
101
102     public static InMemoryDOMDataTreeShard create(final DOMDataTreeIdentifier id,
103                                                   final Executor dataTreeChangeExecutor,
104                                                   final int maxDataChangeListenerQueueSize,
105                                                   final int submitQueueSize) {
106         return new InMemoryDOMDataTreeShard(id.toOptimized(), dataTreeChangeExecutor,
107                 maxDataChangeListenerQueueSize, submitQueueSize);
108     }
109
110     @Override
111     public void onGlobalContextUpdated(final SchemaContext context) {
112         dataTree.setSchemaContext(context);
113     }
114
115     @Override
116     public void onChildAttached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
117         Preconditions.checkArgument(child != this, "Attempted to attach child %s onto self", this);
118         reparentChildShards(prefix, child);
119
120         final ChildShardContext context = createContextFor(prefix, child);
121         childShards.put(prefix, context);
122         childShardsTable.store(prefix, context);
123         updateProducers();
124     }
125
126     @Override
127     public void onChildDetached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
128         childShards.remove(prefix);
129         childShardsTable.remove(prefix);
130         updateProducers();
131     }
132
133     private void updateProducers() {
134         for (InMemoryDOMDataTreeShardProducer p : producers) {
135             p.setModificationFactory(createModificationFactory(p.getPrefixes()));
136         }
137     }
138
139     @VisibleForTesting
140     ShardDataModificationFactory createModificationFactory(final Collection<DOMDataTreeIdentifier> prefixes) {
141         final Map<DOMDataTreeIdentifier, SubshardProducerSpecification> affected = new HashMap<>();
142         for (final DOMDataTreeIdentifier producerPrefix : prefixes) {
143             for (final ChildShardContext child : childShards.values()) {
144                 final DOMDataTreeIdentifier bindPath;
145                 if (producerPrefix.contains(child.getPrefix())) {
146                     bindPath = child.getPrefix();
147                 } else if (child.getPrefix().contains(producerPrefix)) {
148                     // Bound path is inside subshard
149                     bindPath = producerPrefix;
150                 } else {
151                     continue;
152                 }
153
154                 SubshardProducerSpecification spec = affected.get(child.getPrefix());
155                 if (spec == null) {
156                     spec = new SubshardProducerSpecification(child);
157                     affected.put(child.getPrefix(), spec);
158                 }
159                 spec.addPrefix(bindPath);
160             }
161         }
162
163         final ShardDataModificationFactoryBuilder builder = new ShardDataModificationFactoryBuilder(prefix);
164         for (final SubshardProducerSpecification spec : affected.values()) {
165             final ForeignShardModificationContext foreignContext =
166                     new ForeignShardModificationContext(spec.getPrefix(), spec.createProducer());
167             builder.addSubshard(foreignContext);
168             builder.addSubshard(spec.getPrefix(), foreignContext);
169         }
170
171         return builder.build();
172     }
173
174     @Override
175     public InMemoryDOMDataTreeShardProducer createProducer(final Collection<DOMDataTreeIdentifier> prefixes) {
176         for (final DOMDataTreeIdentifier prodPrefix : prefixes) {
177             Preconditions.checkArgument(prefix.contains(prodPrefix), "Prefix %s is not contained under shart root",
178                 prodPrefix, prefix);
179         }
180
181         final InMemoryDOMDataTreeShardProducer ret = new InMemoryDOMDataTreeShardProducer(this, prefixes,
182             createModificationFactory(prefixes));
183         producers.add(ret);
184         return ret;
185     }
186
187     void closeProducer(final InMemoryDOMDataTreeShardProducer producer) {
188         if (!producers.remove(producer)) {
189             LOG.warn("Producer {} not found in shard {}", producer, this);
190         }
191     }
192
193     @Nonnull
194     @Override
195     public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
196             @Nonnull final YangInstanceIdentifier treeId, @Nonnull final L listener) {
197         return shardChangePublisher.registerTreeChangeListener(treeId, listener);
198     }
199
200     private void reparentChildShards(final DOMDataTreeIdentifier newChildPrefix, final DOMDataTreeShard newChild) {
201         final Iterator<Entry<DOMDataTreeIdentifier, ChildShardContext>> actualChildren =
202                 childShards.entrySet().iterator();
203         final Map<DOMDataTreeIdentifier, ChildShardContext> reparented = new HashMap<>();
204         while (actualChildren.hasNext()) {
205             final Entry<DOMDataTreeIdentifier, ChildShardContext> actualChild = actualChildren.next();
206             final DOMDataTreeIdentifier actualPrefix = actualChild.getKey();
207             Preconditions.checkArgument(!newChildPrefix.equals(actualPrefix),
208                     "Child shard with prefix %s already attached", newChildPrefix);
209             if (newChildPrefix.contains(actualPrefix)) {
210                 final ChildShardContext actualContext = actualChild.getValue();
211                 actualChildren.remove();
212                 newChild.onChildAttached(actualPrefix, actualContext.getShard());
213                 reparented.put(actualChild.getKey(), actualContext);
214                 childShardsTable.remove(actualPrefix);
215             }
216         }
217         updateProducersAndListeners(reparented);
218     }
219
220     private void updateProducersAndListeners(final Map<DOMDataTreeIdentifier, ChildShardContext> reparented) {
221         // FIXME: remove reparenting of producers, shards have to be registered from top to bottom
222         if (reparented.isEmpty()) {
223             //nothing was reparented no need to update anything
224             return;
225         }
226         throw new UnsupportedOperationException();
227     }
228
229     private static ChildShardContext createContextFor(final DOMDataTreeIdentifier prefix,
230             final DOMDataTreeShard child) {
231         Preconditions.checkArgument(child instanceof WriteableDOMDataTreeShard,
232             "Child %s is not a writable shared", child);
233         return new ChildShardContext(prefix, (WriteableDOMDataTreeShard) child);
234     }
235
236     private static TreeType treeTypeFor(final LogicalDatastoreType dsType) {
237         switch (dsType) {
238             case CONFIGURATION:
239                 return TreeType.CONFIGURATION;
240             case OPERATIONAL:
241                 return TreeType.OPERATIONAL;
242             default:
243                 throw new IllegalArgumentException("Unsupported Data Store type:" + dsType);
244         }
245     }
246
247     @VisibleForTesting
248     Map<DOMDataTreeIdentifier, DOMDataTreeShard> getChildShards() {
249         return ImmutableMap.copyOf(Maps.transformValues(childShards, ChildShardContext::getShard));
250     }
251
252     DataTreeSnapshot takeSnapshot() {
253         return dataTree.takeSnapshot();
254     }
255
256     InmemoryDOMDataTreeShardWriteTransaction createTransaction(final String transactionId,
257             final InMemoryDOMDataTreeShardProducer producer, final DataTreeSnapshot snapshot) {
258         Preconditions.checkArgument(snapshot instanceof CursorAwareDataTreeSnapshot);
259
260         return new InmemoryDOMDataTreeShardWriteTransaction(producer,
261             producer.getModificationFactory().createModification((CursorAwareDataTreeSnapshot)snapshot), dataTree,
262             shardChangePublisher, executor);
263     }
264 }