Fix InMemory shard transaction chaining.
[mdsal.git] / dom / mdsal-dom-inmemory-datastore / src / main / java / org / opendaylight / mdsal / dom / store / inmemory / InMemoryDOMDataTreeShard.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.mdsal.dom.store.inmemory;
10
11 import com.google.common.annotations.Beta;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.ListeningExecutorService;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import java.util.ArrayList;
17 import java.util.Collection;
18 import java.util.HashMap;
19 import java.util.Iterator;
20 import java.util.Map;
21 import java.util.Map.Entry;
22 import java.util.concurrent.ExecutorService;
23 import javax.annotation.Nonnull;
24 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
25 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
26 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
27 import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
28 import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTable;
29 import org.opendaylight.yangtools.concepts.ListenerRegistration;
30 import org.opendaylight.yangtools.util.concurrent.CountingRejectedExecutionHandler;
31 import org.opendaylight.yangtools.util.concurrent.FastThreadPoolExecutor;
32 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
33 import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot;
34 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
35 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
36 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
37 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
38 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
39 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
40
41 @Beta
42 public class InMemoryDOMDataTreeShard implements ReadableWriteableDOMDataTreeShard, SchemaContextListener {
43
44     private static final int DEFAULT_SUBMIT_QUEUE_SIZE = 1000;
45
46     private static final class SubshardProducerSpecification {
47         private final Collection<DOMDataTreeIdentifier> prefixes = new ArrayList<>(1);
48         private final ChildShardContext shard;
49
50         SubshardProducerSpecification(final ChildShardContext subshard) {
51             this.shard = Preconditions.checkNotNull(subshard);
52         }
53
54         void addPrefix(final DOMDataTreeIdentifier prefix) {
55             prefixes.add(prefix);
56         }
57
58         DOMDataTreeShardProducer createProducer() {
59             return shard.getShard().createProducer(prefixes);
60         }
61
62         public DOMDataTreeIdentifier getPrefix() {
63             return shard.getPrefix();
64         }
65     }
66
67     private final DOMDataTreePrefixTable<ChildShardContext> childShardsTable = DOMDataTreePrefixTable.create();
68     private final Map<DOMDataTreeIdentifier, ChildShardContext> childShards = new HashMap<>();
69     private final DOMDataTreeIdentifier prefix;
70     private final DataTree dataTree;
71     private final InMemoryDOMDataTreeShardChangePublisher shardChangePublisher;
72     private final ListeningExecutorService executor;
73
74     private InMemoryDOMDataTreeShard(final DOMDataTreeIdentifier prefix, final ExecutorService dataTreeChangeExecutor,
75                                      final int maxDataChangeListenerQueueSize, final int submitQueueSize) {
76         this.prefix = Preconditions.checkNotNull(prefix);
77
78         final TreeType treeType = treeTypeFor(prefix.getDatastoreType());
79         this.dataTree = InMemoryDataTreeFactory.getInstance().create(treeType, prefix.getRootIdentifier());
80
81         this.shardChangePublisher = new InMemoryDOMDataTreeShardChangePublisher(dataTreeChangeExecutor,
82                 maxDataChangeListenerQueueSize, dataTree, prefix.getRootIdentifier(), childShards);
83
84         final FastThreadPoolExecutor fte = new FastThreadPoolExecutor(1, submitQueueSize, "Shard[" + prefix + "]");
85         fte.setRejectedExecutionHandler(CountingRejectedExecutionHandler.newCallerWaitsPolicy());
86         this.executor = MoreExecutors.listeningDecorator(fte);
87     }
88
89     public static InMemoryDOMDataTreeShard create(final DOMDataTreeIdentifier id,
90                                                   final ExecutorService dataTreeChangeExecutor,
91                                                   final int maxDataChangeListenerQueueSize) {
92         return new InMemoryDOMDataTreeShard(id, dataTreeChangeExecutor,
93                 maxDataChangeListenerQueueSize, DEFAULT_SUBMIT_QUEUE_SIZE);
94     }
95
96     public static InMemoryDOMDataTreeShard create(final DOMDataTreeIdentifier id,
97                                                   final ExecutorService dataTreeChangeExecutor,
98                                                   final int maxDataChangeListenerQueueSize,
99                                                   final int submitQueueSize) {
100         return new InMemoryDOMDataTreeShard(id, dataTreeChangeExecutor,
101                 maxDataChangeListenerQueueSize, submitQueueSize);
102     }
103
104     @Override
105     public void onGlobalContextUpdated(final SchemaContext context) {
106         dataTree.setSchemaContext(context);
107     }
108
109     @Override
110     public void onChildAttached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
111         Preconditions.checkArgument(child != this, "Attempted to attach child %s onto self", this);
112         reparentChildShards(prefix, child);
113         addChildShard(prefix, child);
114     }
115
116     @Override
117     public void onChildDetached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
118         childShards.remove(prefix);
119         childShardsTable.remove(prefix);
120     }
121
122     @Override
123     public InMemoryDOMDataTreeShardProducer createProducer(final Collection<DOMDataTreeIdentifier> prefixes) {
124         for (final DOMDataTreeIdentifier prodPrefix : prefixes) {
125             Preconditions.checkArgument(prefix.contains(prodPrefix), "Prefix %s is not contained under shart root",
126                 prodPrefix, prefix);
127         }
128         return new InMemoryDOMDataTreeShardProducer(this, prefixes);
129     }
130
131     @Nonnull
132     @Override
133     public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
134             @Nonnull final YangInstanceIdentifier treeId, @Nonnull final L listener) {
135         return shardChangePublisher.registerTreeChangeListener(treeId, listener);
136     }
137
138     private void addChildShard(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
139         final ChildShardContext context = createContextFor(prefix, child);
140         childShards.put(prefix, context);
141         childShardsTable.store(prefix, context);
142     }
143
144     private void reparentChildShards(final DOMDataTreeIdentifier newChildPrefix, final DOMDataTreeShard newChild) {
145         final Iterator<Entry<DOMDataTreeIdentifier, ChildShardContext>> actualChildren =
146                 childShards.entrySet().iterator();
147         final Map<DOMDataTreeIdentifier, ChildShardContext> reparented = new HashMap<>();
148         while (actualChildren.hasNext()) {
149             final Entry<DOMDataTreeIdentifier, ChildShardContext> actualChild = actualChildren.next();
150             final DOMDataTreeIdentifier actualPrefix = actualChild.getKey();
151             Preconditions.checkArgument(!newChildPrefix.equals(actualPrefix),
152                     "Child shard with prefix %s already attached", newChildPrefix);
153             if (newChildPrefix.contains(actualPrefix)) {
154                 final ChildShardContext actualContext = actualChild.getValue();
155                 actualChildren.remove();
156                 newChild.onChildAttached(actualPrefix, actualContext.getShard());
157                 reparented.put(actualChild.getKey(), actualContext);
158                 childShardsTable.remove(actualPrefix);
159             }
160         }
161         updateProducersAndListeners(reparented);
162     }
163
164     private void updateProducersAndListeners(final Map<DOMDataTreeIdentifier, ChildShardContext> reparented) {
165         // FIXME: remove reparenting of producers, shards have to be registered from top to bottom
166         if (reparented.isEmpty()) {
167             //nothing was reparented no need to update anything
168             return;
169         }
170         throw new UnsupportedOperationException();
171     }
172
173     private static ChildShardContext createContextFor(final DOMDataTreeIdentifier prefix,
174             final DOMDataTreeShard child) {
175         Preconditions.checkArgument(child instanceof WriteableDOMDataTreeShard,
176             "Child %s is not a writable shared", child);
177         return new ChildShardContext(prefix, (WriteableDOMDataTreeShard) child);
178     }
179
180     private static TreeType treeTypeFor(final LogicalDatastoreType dsType) {
181         switch (dsType) {
182             case CONFIGURATION:
183                 return TreeType.CONFIGURATION;
184             case OPERATIONAL:
185                 return TreeType.OPERATIONAL;
186             default:
187                 throw new IllegalArgumentException("Unsupported Data Store type:" + dsType);
188         }
189     }
190
191     @VisibleForTesting
192     Map<DOMDataTreeIdentifier, DOMDataTreeShard> getChildShards() {
193         final Map<DOMDataTreeIdentifier, DOMDataTreeShard> ret = new HashMap<>();
194         for (final Entry<DOMDataTreeIdentifier, ChildShardContext> entry : childShards.entrySet()) {
195             ret.put(entry.getKey(), entry.getValue().getShard());
196         }
197         return ret;
198     }
199
200     DataTreeSnapshot takeSnapshot() {
201         return dataTree.takeSnapshot();
202     }
203
204     InmemoryDOMDataTreeShardWriteTransaction createTransaction(final String transactionId,
205                                                                final InMemoryDOMDataTreeShardProducer producer,
206                                                                final Collection<DOMDataTreeIdentifier> prefixes,
207                                                                final DataTreeSnapshot snapshot) {
208
209         return createTxForSnapshot(producer, prefixes, (CursorAwareDataTreeSnapshot) snapshot);
210     }
211
212     private InmemoryDOMDataTreeShardWriteTransaction createTxForSnapshot(
213             final InMemoryDOMDataTreeShardProducer producer,
214             final Collection<DOMDataTreeIdentifier> prefixes,
215             final CursorAwareDataTreeSnapshot snapshot) {
216
217         final Map<DOMDataTreeIdentifier, SubshardProducerSpecification> affectedSubshards = new HashMap<>();
218         for (final DOMDataTreeIdentifier producerPrefix : prefixes) {
219             for (final ChildShardContext maybeAffected : childShards.values()) {
220                 final DOMDataTreeIdentifier bindPath;
221                 if (producerPrefix.contains(maybeAffected.getPrefix())) {
222                     bindPath = maybeAffected.getPrefix();
223                 } else if (maybeAffected.getPrefix().contains(producerPrefix)) {
224                     // Bound path is inside subshard
225                     bindPath = producerPrefix;
226                 } else {
227                     continue;
228                 }
229
230                 SubshardProducerSpecification spec = affectedSubshards.get(maybeAffected.getPrefix());
231                 if (spec == null) {
232                     spec = new SubshardProducerSpecification(maybeAffected);
233                     affectedSubshards.put(maybeAffected.getPrefix(), spec);
234                 }
235                 spec.addPrefix(bindPath);
236             }
237         }
238
239         final ShardRootModificationContext rootContext = new ShardRootModificationContext(prefix, snapshot);
240         final ShardDataModificationBuilder builder = new ShardDataModificationBuilder(rootContext);
241         for (final SubshardProducerSpecification spec : affectedSubshards.values()) {
242             final ForeignShardModificationContext foreignContext =
243                     new ForeignShardModificationContext(spec.getPrefix(), spec.createProducer());
244             builder.addSubshard(foreignContext);
245             builder.addSubshard(spec.getPrefix(), foreignContext);
246         }
247
248         return new InmemoryDOMDataTreeShardWriteTransaction(producer, builder.build(),
249                 dataTree, shardChangePublisher, executor);
250     }
251
252 }