be27ddac52d8dd90d6f69c629990bdd5722d35da
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / broker / impl / legacy / sharded / adapter / ShardedDOMDataBrokerDelegatingReadWriteTransaction.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.md.sal.dom.broker.impl.legacy.sharded.adapter;
10
11 import static com.google.common.base.Preconditions.checkNotNull;
12 import static com.google.common.base.Preconditions.checkState;
13
14 import com.google.common.base.Function;
15 import com.google.common.base.Optional;
16 import com.google.common.collect.ImmutableMap;
17 import com.google.common.collect.Lists;
18 import com.google.common.collect.Maps;
19 import com.google.common.util.concurrent.CheckedFuture;
20 import com.google.common.util.concurrent.FutureCallback;
21 import com.google.common.util.concurrent.Futures;
22 import com.google.common.util.concurrent.ListenableFuture;
23 import com.google.common.util.concurrent.MoreExecutors;
24 import com.google.common.util.concurrent.SettableFuture;
25 import java.util.Map;
26 import java.util.Queue;
27 import javax.annotation.Nullable;
28 import javax.annotation.concurrent.NotThreadSafe;
29 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
30 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
31 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
33 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
34 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
35 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
36 import org.opendaylight.yangtools.yang.common.RpcResult;
37 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
38 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
39 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
40 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
41 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
42 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
43 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
44 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
45 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
46
47 /**
48  * Read/write transaction that delegates write and initial read to {@link org.opendaylight.mdsal.dom.broker.ShardedDOMWriteTransactionAdapter}
49  * and {@link org.opendaylight.mdsal.dom.broker.ShardedDOMReadTransactionAdapter}
50  * respectively. These two in turn rely on shard aware implementation of {@link org.opendaylight.mdsal.dom.api.DOMDataTreeService}.
51  * <p>
52  * Since reading data distributed on different subshards is not guaranteed to
53  * return all relevant data, best effort is to try to operate only on single
54  * subtree in conceptual data tree. We define this subtree by first write
55  * operation performed on transaction. All next read and write operations
56  * should be performed just in this initial subtree.
57  */
58 // FIXME explicitly enforce just one subtree requirement
59 @NotThreadSafe
60 class ShardedDOMDataBrokerDelegatingReadWriteTransaction implements DOMDataReadWriteTransaction {
61     private static final ListenableFuture<RpcResult<TransactionStatus>> SUCCESS_FUTURE =
62             Futures.immediateFuture(RpcResultBuilder.success(TransactionStatus.COMMITED).build());
63
64     private final DOMDataReadOnlyTransaction readTxDelegate;
65     private final DOMDataWriteTransaction writeTxDelegate;
66     private final Object txIdentifier;
67     private final ImmutableMap<LogicalDatastoreType, Queue<Modification>> modificationHistoryMap;
68     private final ImmutableMap<LogicalDatastoreType, DataTreeSnapshot> snapshotMap;
69     private final Map<LogicalDatastoreType, ListenableFuture<Optional<NormalizedNode<?, ?>>>> initialReadMap;
70     private YangInstanceIdentifier root = null;
71
72     public ShardedDOMDataBrokerDelegatingReadWriteTransaction(final Object readWriteTxId, final SchemaContext ctx,
73                                                               final DOMDataReadOnlyTransaction readTxDelegate,
74                                                               final DOMDataWriteTransaction writeTxDelegate) {
75         this.readTxDelegate = checkNotNull(readTxDelegate);
76         this.writeTxDelegate = checkNotNull(writeTxDelegate);
77         this.txIdentifier = checkNotNull(readWriteTxId);
78         this.initialReadMap = Maps.newEnumMap(LogicalDatastoreType.class);
79
80         final InMemoryDataTreeFactory treeFactory = InMemoryDataTreeFactory.getInstance();
81         final ImmutableMap.Builder<LogicalDatastoreType, DataTreeSnapshot> snapshotMapBuilder = ImmutableMap.builder();
82         final ImmutableMap.Builder<LogicalDatastoreType, Queue<Modification>> modificationHistoryMapBuilder =
83                 ImmutableMap.builder();
84         for (final LogicalDatastoreType store : LogicalDatastoreType.values()) {
85             final DataTree tree = treeFactory.create(treeTypeForStore(store));
86             tree.setSchemaContext(ctx);
87             snapshotMapBuilder.put(store, tree.takeSnapshot());
88
89             modificationHistoryMapBuilder.put(store, Lists.newLinkedList());
90         }
91
92         modificationHistoryMap = modificationHistoryMapBuilder.build();
93         snapshotMap = snapshotMapBuilder.build();
94     }
95
96     @Override
97     public boolean cancel() {
98         readTxDelegate.close();
99         return writeTxDelegate.cancel();
100     }
101
102     @Override
103     public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
104         if (root == null) {
105             initialRead(path);
106         }
107
108         modificationHistoryMap.get(store).add(new Modification(Modification.Operation.DELETE, path, null));
109         writeTxDelegate.delete(store, path);
110     }
111
112     @Override
113     public CheckedFuture<Void, TransactionCommitFailedException> submit() {
114         return writeTxDelegate.submit();
115     }
116
117     @Override
118     public ListenableFuture<RpcResult<TransactionStatus>> commit() {
119         return Futures.transformAsync(submit(), input -> SUCCESS_FUTURE, MoreExecutors.directExecutor());
120     }
121
122     @Override
123     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
124                                                                                    final YangInstanceIdentifier path) {
125         checkState(root != null, "A modify operation (put, merge or delete) must be performed prior to a read operation");
126         final SettableFuture<Optional<NormalizedNode<?, ?>>> readResult = SettableFuture.create();
127         final Queue<Modification> currentHistory = Lists.newLinkedList(modificationHistoryMap.get(store));
128         Futures.addCallback(initialReadMap.get(store), new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
129             @Override
130             public void onSuccess(@Nullable final Optional<NormalizedNode<?, ?>> result) {
131                 final DataTreeModification mod = snapshotMap.get(store).newModification();
132                 if (result.isPresent()) {
133                     mod.write(path, result.get());
134                 }
135                 applyModificationHistoryToSnapshot(mod, currentHistory);
136                 readResult.set(mod.readNode(path));
137             }
138
139             @Override
140             public void onFailure(final Throwable t) {
141                 readResult.setException(t);
142             }
143         }, MoreExecutors.directExecutor());
144
145         return Futures.makeChecked(readResult, ReadFailedException.MAPPER);
146     }
147
148     @Override
149     public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
150                                                               final YangInstanceIdentifier path) {
151         checkState(root != null, "A modify operation (put, merge or delete) must be performed prior to an exists operation");
152         return Futures.makeChecked(Futures.transform(read(store, path),
153                 (Function<Optional<NormalizedNode<?, ?>>, Boolean>) Optional::isPresent),
154                 ReadFailedException.MAPPER);
155     }
156
157     @Override
158     public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path,
159                     final NormalizedNode<?, ?> data) {
160         if (root == null) {
161             initialRead(path);
162         }
163
164         modificationHistoryMap.get(store).add(new Modification(Modification.Operation.WRITE, path, data));
165         writeTxDelegate.put(store, path, data);
166     }
167
168     @Override
169     public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path,
170                       final NormalizedNode<?, ?> data) {
171         if (root == null) {
172             initialRead(path);
173         }
174
175         modificationHistoryMap.get(store).add(new Modification(Modification.Operation.MERGE, path, data));
176         writeTxDelegate.merge(store, path, data);
177     }
178
179     @Override
180     public Object getIdentifier() {
181         return txIdentifier;
182     }
183
184     private void initialRead(final YangInstanceIdentifier path) {
185         root = path;
186
187         final InMemoryDataTreeFactory treeFactory = InMemoryDataTreeFactory.getInstance();
188         for (final LogicalDatastoreType store : LogicalDatastoreType.values()) {
189             initialReadMap.put(store, readTxDelegate.read(store, path));
190         }
191     }
192
193     private TreeType treeTypeForStore(final LogicalDatastoreType store) {
194         return store == LogicalDatastoreType.CONFIGURATION ? TreeType.CONFIGURATION : TreeType.OPERATIONAL;
195     }
196
197     private void applyModificationHistoryToSnapshot(final DataTreeModification dataTreeModification,
198                                                     final Queue<Modification> modificationHistory) {
199         while (!modificationHistory.isEmpty()) {
200             final Modification modification = modificationHistory.poll();
201             switch (modification.getOperation()) {
202                 case WRITE:
203                     dataTreeModification.write(modification.getPath(), modification.getData());
204                     break;
205                 case MERGE:
206                     dataTreeModification.merge(modification.getPath(), modification.getData());
207                     break;
208                 case DELETE:
209                     dataTreeModification.delete(modification.getPath());
210                     break;
211                 default:
212                     // NOOP
213             }
214         }
215     }
216
217     static class Modification {
218
219         enum Operation {
220             WRITE, MERGE, DELETE
221         }
222
223         private final NormalizedNode<?, ?> data;
224         private final YangInstanceIdentifier path;
225         private final Operation operation;
226
227         Modification(final Operation operation, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
228             this.data = data;
229             this.path = checkNotNull(path);
230             this.operation = checkNotNull(operation);
231         }
232
233         Operation getOperation() {
234             return operation;
235         }
236
237         YangInstanceIdentifier getPath() {
238             return path;
239         }
240
241         NormalizedNode<?, ?> getData() {
242             return data;
243         }
244     }
245 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.