d6f44bab17fb1d41c9b79488cf0957fc721f4e3b
[mdsal.git] / dom / mdsal-dom-broker / src / main / java / org / opendaylight / mdsal / dom / broker / ShardedDOMDataTreeWriteTransaction.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.dom.broker;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.collect.ImmutableSet;
12 import com.google.common.util.concurrent.CheckedFuture;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import java.util.Collection;
16 import java.util.Deque;
17 import java.util.HashMap;
18 import java.util.HashSet;
19 import java.util.LinkedList;
20 import java.util.Map;
21 import java.util.Map.Entry;
22 import java.util.Set;
23 import java.util.concurrent.Callable;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.atomic.AtomicLong;
26 import javax.annotation.Nonnull;
27 import javax.annotation.concurrent.GuardedBy;
28 import javax.annotation.concurrent.NotThreadSafe;
29 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
30 import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
31 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
32 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
33 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
34 import org.opendaylight.mdsal.dom.store.inmemory.DOMDataTreeShardProducer;
35 import org.opendaylight.mdsal.dom.store.inmemory.DOMDataTreeShardWriteTransaction;
36 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
37 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
38 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 @NotThreadSafe
43 final class ShardedDOMDataTreeWriteTransaction implements DOMDataTreeCursorAwareTransaction {
44     private static final Logger LOG = LoggerFactory.getLogger(ShardedDOMDataTreeWriteTransaction.class);
45     private static final AtomicLong COUNTER = new AtomicLong();
46     private final Map<DOMDataTreeIdentifier, DOMDataTreeShardWriteTransaction> idToTransaction;
47     private final ShardedDOMDataTreeProducer producer;
48     private final String identifier;
49     private final Set<YangInstanceIdentifier> childBoundaries = new HashSet<>();
50     @GuardedBy("this")
51     private boolean closed =  false;
52
53     @GuardedBy("this")
54     private DOMDataTreeWriteCursor openCursor;
55
56     ShardedDOMDataTreeWriteTransaction(final ShardedDOMDataTreeProducer producer,
57                                        final Map<DOMDataTreeIdentifier, DOMDataTreeShardProducer> idToProducer,
58                                        final Map<DOMDataTreeIdentifier, DOMDataTreeProducer> childProducers) {
59         this.producer = Preconditions.checkNotNull(producer);
60         idToTransaction = new HashMap<>();
61         Preconditions.checkNotNull(idToProducer).forEach((id, prod) -> idToTransaction.put(id, prod.createTransaction()));
62         this.identifier = "SHARDED-DOM-" + COUNTER.getAndIncrement();
63         childProducers.forEach((id, prod) -> childBoundaries.add(id.getRootIdentifier()));
64     }
65
66     // FIXME: use atomic operations
67     @GuardedBy("this")
68     private DOMDataTreeShardWriteTransaction lookup(final DOMDataTreeIdentifier prefix) {
69         for (final Entry<DOMDataTreeIdentifier, DOMDataTreeShardWriteTransaction> e : idToTransaction.entrySet()) {
70             if (e.getKey().contains(prefix)) {
71                 Preconditions.checkArgument(!producer.isDelegatedToChild(prefix),
72                         "Path %s is delegated to child producer.",
73                         prefix);
74                 return e.getValue();
75             }
76         }
77         throw new IllegalArgumentException(String.format("Path %s is not accessible from transaction %s", prefix, this));
78     }
79
80     @Override
81     public String getIdentifier() {
82         return identifier;
83     }
84
85     @Override
86     public synchronized boolean cancel() {
87         if (closed) {
88             return false;
89         }
90
91         LOG.debug("Cancelling transaction {}", identifier);
92         if (openCursor != null) {
93             openCursor.close();
94         }
95         for (final DOMDataTreeShardWriteTransaction tx : ImmutableSet.copyOf(idToTransaction.values())) {
96             tx.close();
97         }
98
99         closed = true;
100         producer.cancelTransaction(this);
101         return true;
102     }
103
104     @Override
105     public synchronized DOMDataTreeWriteCursor createCursor(final DOMDataTreeIdentifier prefix) {
106         Preconditions.checkState(!closed, "Transaction is closed already");
107         Preconditions.checkState(openCursor == null, "There is still a cursor open");
108         final DOMDataTreeShardWriteTransaction lookup = lookup(prefix);
109         openCursor = new DelegatingCursor(lookup.createCursor(prefix), prefix);
110         return openCursor;
111     }
112
113     @Override
114     public synchronized CheckedFuture<Void, TransactionCommitFailedException> submit() {
115         Preconditions.checkState(!closed, "Transaction %s is already closed", identifier);
116         Preconditions.checkState(openCursor == null, "Cannot submit transaction while there is a cursor open");
117
118         final Set<DOMDataTreeShardWriteTransaction> txns = ImmutableSet.copyOf(idToTransaction.values());
119         for (final DOMDataTreeShardWriteTransaction tx : txns) {
120             tx.ready();
121         }
122         producer.transactionSubmitted(this);
123         try {
124             return Futures.immediateCheckedFuture(new SubmitCoordinationTask(identifier, txns).call());
125         } catch (final TransactionCommitFailedException e) {
126             return Futures.immediateFailedCheckedFuture(e);
127         }
128     }
129
130     synchronized void cursorClosed() {
131         openCursor = null;
132     }
133
134     private class DelegatingCursor implements DOMDataTreeWriteCursor {
135
136         private final DOMDataTreeWriteCursor delegate;
137         private final Deque<PathArgument> path = new LinkedList<>();
138
139         public DelegatingCursor(final DOMDataTreeWriteCursor delegate, final DOMDataTreeIdentifier rootPosition) {
140             this.delegate = delegate;
141             path.addAll(rootPosition.getRootIdentifier().getPathArguments());
142         }
143
144         @Override
145         public void enter(@Nonnull final PathArgument child) {
146             checkAvailable(child);
147             path.push(child);
148             delegate.enter(child);
149         }
150
151         @Override
152         public void enter(@Nonnull final PathArgument... path) {
153             for (final PathArgument pathArgument : path) {
154                 enter(pathArgument);
155             }
156         }
157
158         @Override
159         public void enter(@Nonnull final Iterable<PathArgument> path) {
160             for (final PathArgument pathArgument : path) {
161                 enter(pathArgument);
162             }
163         }
164
165         @Override
166         public void exit() {
167             path.pop();
168             delegate.exit();
169         }
170
171         @Override
172         public void exit(final int depth) {
173             for (int i = 0; i < depth; i++) {
174                 path.pop();
175             }
176             delegate.exit(depth);
177         }
178
179         @Override
180         public void close() {
181             delegate.close();
182             cursorClosed();
183         }
184
185         @Override
186         public void delete(final PathArgument child) {
187             checkAvailable(child);
188             delegate.delete(child);
189         }
190
191         @Override
192         public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
193             checkAvailable(child);
194             delegate.merge(child, data);
195         }
196
197         @Override
198         public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
199             checkAvailable(child);
200             delegate.write(child, data);
201         }
202
203         void checkAvailable(final PathArgument child) {
204             path.add(child);
205             final YangInstanceIdentifier yid = YangInstanceIdentifier.create(path);
206             childBoundaries.forEach(id -> {
207                 if (id.contains(yid)) {
208                     path.removeLast();
209                     throw new IllegalArgumentException("Path {" + yid + "} is not available to this cursor since it's already claimed by a child producer");
210                 }
211             });
212             path.removeLast();
213         }
214     }
215
216     private static class SubmitCoordinationTask implements Callable<Void> {
217
218         private static final Logger LOG = LoggerFactory.getLogger(SubmitCoordinationTask.class);
219
220         private final String identifier;
221         private final Collection<DOMDataTreeShardWriteTransaction> transactions;
222
223         SubmitCoordinationTask(final String identifier,
224                                     final Collection<DOMDataTreeShardWriteTransaction> transactions) {
225             this.identifier = identifier;
226             this.transactions = transactions;
227         }
228
229         @Override
230         public Void call() throws TransactionCommitFailedException {
231
232             try {
233                 LOG.debug("Producer {}, submit started", identifier);
234                 submitBlocking();
235
236                 return null;
237             } catch (final TransactionCommitFailedException e) {
238                 LOG.warn("Failure while submitting transaction for producer {}", identifier, e);
239                 //FIXME abort here
240                 throw e;
241             }
242         }
243
244         void submitBlocking() throws TransactionCommitFailedException {
245             for (final ListenableFuture<?> commit : submitAll()) {
246                 try {
247                     commit.get();
248                 } catch (InterruptedException | ExecutionException e) {
249                     throw new TransactionCommitFailedException("Submit failed", e);
250                 }
251             }
252         }
253
254         private ListenableFuture<?>[] submitAll() {
255             final ListenableFuture<?>[] ops = new ListenableFuture<?>[transactions.size()];
256             int i = 0;
257             for (final DOMDataTreeShardWriteTransaction tx : transactions) {
258                 ops[i++] = tx.submit();
259             }
260             return ops;
261         }
262     }
263 }