BUG-3128: Extract abstract shard implementations.
[mdsal.git] / dom / mdsal-dom-inmemory-datastore / src / main / java / org / opendaylight / mdsal / dom / store / inmemory / InMemoryDOMDataTreeShard.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.mdsal.dom.store.inmemory;
10
11 import com.google.common.annotations.Beta;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.Preconditions;
14 import com.google.common.collect.ImmutableMap;
15 import com.google.common.collect.Maps;
16 import com.google.common.util.concurrent.ListeningExecutorService;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.HashMap;
21 import java.util.HashSet;
22 import java.util.Iterator;
23 import java.util.Map;
24 import java.util.Map.Entry;
25 import java.util.concurrent.Executor;
26 import javax.annotation.Nonnull;
27 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
28 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
29 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
30 import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
31 import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTable;
32 import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardProducer;
33 import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext;
34 import org.opendaylight.yangtools.concepts.ListenerRegistration;
35 import org.opendaylight.yangtools.util.concurrent.CountingRejectedExecutionHandler;
36 import org.opendaylight.yangtools.util.concurrent.FastThreadPoolExecutor;
37 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
38 import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot;
39 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
40 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
41 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
42 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
43 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
44 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 @Beta
49 public class InMemoryDOMDataTreeShard implements ReadableWriteableDOMDataTreeShard, SchemaContextListener {
50     private static final class SubshardProducerSpecification {
51         private final Collection<DOMDataTreeIdentifier> prefixes = new ArrayList<>(1);
52         private final ChildShardContext shard;
53
54         SubshardProducerSpecification(final ChildShardContext subshard) {
55             this.shard = Preconditions.checkNotNull(subshard);
56         }
57
58         void addPrefix(final DOMDataTreeIdentifier prefix) {
59             prefixes.add(prefix);
60         }
61
62         DOMDataTreeShardProducer createProducer() {
63             return shard.getShard().createProducer(prefixes);
64         }
65
66         DOMDataTreeIdentifier getPrefix() {
67             return shard.getPrefix();
68         }
69     }
70
71     private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataTreeShard.class);
72     private static final int DEFAULT_SUBMIT_QUEUE_SIZE = 1000;
73
74     private final DOMDataTreePrefixTable<ChildShardContext> childShardsTable = DOMDataTreePrefixTable.create();
75     private final Map<DOMDataTreeIdentifier, ChildShardContext> childShards = new HashMap<>();
76     private final Collection<InMemoryDOMDataTreeShardProducer> producers = new HashSet<>();
77     private final InMemoryDOMDataTreeShardChangePublisher shardChangePublisher;
78     private final ListeningExecutorService executor;
79     private final DOMDataTreeIdentifier prefix;
80     private final DataTree dataTree;
81
82     private InMemoryDOMDataTreeShard(final DOMDataTreeIdentifier prefix, final Executor dataTreeChangeExecutor,
83                                      final int maxDataChangeListenerQueueSize, final int submitQueueSize) {
84         this.prefix = Preconditions.checkNotNull(prefix);
85
86         final TreeType treeType = treeTypeFor(prefix.getDatastoreType());
87         this.dataTree = InMemoryDataTreeFactory.getInstance().create(treeType, prefix.getRootIdentifier());
88
89         this.shardChangePublisher = new InMemoryDOMDataTreeShardChangePublisher(dataTreeChangeExecutor,
90                 maxDataChangeListenerQueueSize, dataTree, prefix.getRootIdentifier(), childShards);
91
92         final FastThreadPoolExecutor fte = new FastThreadPoolExecutor(1, submitQueueSize, "Shard[" + prefix + "]");
93         fte.setRejectedExecutionHandler(CountingRejectedExecutionHandler.newCallerWaitsPolicy());
94         this.executor = MoreExecutors.listeningDecorator(fte);
95     }
96
97     public static InMemoryDOMDataTreeShard create(final DOMDataTreeIdentifier id,
98                                                   final Executor dataTreeChangeExecutor,
99                                                   final int maxDataChangeListenerQueueSize) {
100         return new InMemoryDOMDataTreeShard(id.toOptimized(), dataTreeChangeExecutor,
101                 maxDataChangeListenerQueueSize, DEFAULT_SUBMIT_QUEUE_SIZE);
102     }
103
104     public static InMemoryDOMDataTreeShard create(final DOMDataTreeIdentifier id,
105                                                   final Executor dataTreeChangeExecutor,
106                                                   final int maxDataChangeListenerQueueSize,
107                                                   final int submitQueueSize) {
108         return new InMemoryDOMDataTreeShard(id.toOptimized(), dataTreeChangeExecutor,
109                 maxDataChangeListenerQueueSize, submitQueueSize);
110     }
111
112     @Override
113     public void onGlobalContextUpdated(final SchemaContext context) {
114         dataTree.setSchemaContext(context);
115     }
116
117     @Override
118     public void onChildAttached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
119         Preconditions.checkArgument(child != this, "Attempted to attach child %s onto self", this);
120         reparentChildShards(prefix, child);
121
122         final ChildShardContext context = createContextFor(prefix, child);
123         childShards.put(prefix, context);
124         childShardsTable.store(prefix, context);
125         updateProducers();
126     }
127
128     @Override
129     public void onChildDetached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
130         childShards.remove(prefix);
131         childShardsTable.remove(prefix);
132         updateProducers();
133     }
134
135     private void updateProducers() {
136         for (InMemoryDOMDataTreeShardProducer p : producers) {
137             p.setModificationFactory(createModificationFactory(p.getPrefixes()));
138         }
139     }
140
141     @VisibleForTesting
142     InMemoryShardDataModificationFactory createModificationFactory(final Collection<DOMDataTreeIdentifier> prefixes) {
143         final Map<DOMDataTreeIdentifier, SubshardProducerSpecification> affected = new HashMap<>();
144         for (final DOMDataTreeIdentifier producerPrefix : prefixes) {
145             for (final ChildShardContext child : childShards.values()) {
146                 final DOMDataTreeIdentifier bindPath;
147                 if (producerPrefix.contains(child.getPrefix())) {
148                     bindPath = child.getPrefix();
149                 } else if (child.getPrefix().contains(producerPrefix)) {
150                     // Bound path is inside subshard
151                     bindPath = producerPrefix;
152                 } else {
153                     continue;
154                 }
155
156                 SubshardProducerSpecification spec = affected.get(child.getPrefix());
157                 if (spec == null) {
158                     spec = new SubshardProducerSpecification(child);
159                     affected.put(child.getPrefix(), spec);
160                 }
161                 spec.addPrefix(bindPath);
162             }
163         }
164
165         final InmemoryShardDataModificationFactoryBuilder builder =
166                 new InmemoryShardDataModificationFactoryBuilder(prefix);
167         for (final SubshardProducerSpecification spec : affected.values()) {
168             final ForeignShardModificationContext foreignContext =
169                     new ForeignShardModificationContext(spec.getPrefix(), spec.createProducer());
170             builder.addSubshard(foreignContext);
171             builder.addSubshard(spec.getPrefix(), foreignContext);
172         }
173
174         return builder.build();
175     }
176
177     @Override
178     public InMemoryDOMDataTreeShardProducer createProducer(final Collection<DOMDataTreeIdentifier> prefixes) {
179         for (final DOMDataTreeIdentifier prodPrefix : prefixes) {
180             Preconditions.checkArgument(prefix.contains(prodPrefix), "Prefix %s is not contained under shart root",
181                     prodPrefix, prefix);
182         }
183
184         final InMemoryDOMDataTreeShardProducer ret = new InMemoryDOMDataTreeShardProducer(this, prefixes,
185                 createModificationFactory(prefixes));
186         producers.add(ret);
187         return ret;
188     }
189
190     void closeProducer(final InMemoryDOMDataTreeShardProducer producer) {
191         if (!producers.remove(producer)) {
192             LOG.warn("Producer {} not found in shard {}", producer, this);
193         }
194     }
195
196     @Nonnull
197     @Override
198     public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
199             @Nonnull final YangInstanceIdentifier treeId, @Nonnull final L listener) {
200         return shardChangePublisher.registerTreeChangeListener(treeId, listener);
201     }
202
203     private void reparentChildShards(final DOMDataTreeIdentifier newChildPrefix, final DOMDataTreeShard newChild) {
204         final Iterator<Entry<DOMDataTreeIdentifier, ChildShardContext>> actualChildren =
205                 childShards.entrySet().iterator();
206         final Map<DOMDataTreeIdentifier, ChildShardContext> reparented = new HashMap<>();
207         while (actualChildren.hasNext()) {
208             final Entry<DOMDataTreeIdentifier, ChildShardContext> actualChild = actualChildren.next();
209             final DOMDataTreeIdentifier actualPrefix = actualChild.getKey();
210             Preconditions.checkArgument(!newChildPrefix.equals(actualPrefix),
211                     "Child shard with prefix %s already attached", newChildPrefix);
212             if (newChildPrefix.contains(actualPrefix)) {
213                 final ChildShardContext actualContext = actualChild.getValue();
214                 actualChildren.remove();
215                 newChild.onChildAttached(actualPrefix, actualContext.getShard());
216                 reparented.put(actualChild.getKey(), actualContext);
217                 childShardsTable.remove(actualPrefix);
218             }
219         }
220         updateProducersAndListeners(reparented);
221     }
222
223     private void updateProducersAndListeners(final Map<DOMDataTreeIdentifier, ChildShardContext> reparented) {
224         // FIXME: remove reparenting of producers, shards have to be registered from top to bottom
225         if (reparented.isEmpty()) {
226             //nothing was reparented no need to update anything
227             return;
228         }
229         throw new UnsupportedOperationException();
230     }
231
232     private static ChildShardContext createContextFor(final DOMDataTreeIdentifier prefix,
233                                                       final DOMDataTreeShard child) {
234         Preconditions.checkArgument(child instanceof WriteableDOMDataTreeShard,
235                 "Child %s is not a writable shared", child);
236         return new ChildShardContext(prefix, (WriteableDOMDataTreeShard) child);
237     }
238
239     private static TreeType treeTypeFor(final LogicalDatastoreType dsType) {
240         switch (dsType) {
241             case CONFIGURATION:
242                 return TreeType.CONFIGURATION;
243             case OPERATIONAL:
244                 return TreeType.OPERATIONAL;
245             default:
246                 throw new IllegalArgumentException("Unsupported Data Store type:" + dsType);
247         }
248     }
249
250     @VisibleForTesting
251     Map<DOMDataTreeIdentifier, DOMDataTreeShard> getChildShards() {
252         return ImmutableMap.copyOf(Maps.transformValues(childShards, ChildShardContext::getShard));
253     }
254
255     DataTreeSnapshot takeSnapshot() {
256         return dataTree.takeSnapshot();
257     }
258
259     InmemoryDOMDataTreeShardWriteTransaction createTransaction(final String transactionId,
260             final InMemoryDOMDataTreeShardProducer producer, final DataTreeSnapshot snapshot) {
261         Preconditions.checkArgument(snapshot instanceof CursorAwareDataTreeSnapshot);
262
263         return new InmemoryDOMDataTreeShardWriteTransaction(producer,
264                 producer.getModificationFactory().createModification((CursorAwareDataTreeSnapshot) snapshot), dataTree,
265                 shardChangePublisher, executor);
266     }
267 }