Merge "Resolve Bug:522"
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / broker / impl / DOMDataBrokerImpl.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.dom.broker.impl;
9
10 import static com.google.common.base.Preconditions.checkNotNull;
11 import static com.google.common.base.Preconditions.checkState;
12
13 import java.util.Collections;
14 import java.util.List;
15 import java.util.Map.Entry;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.atomic.AtomicLong;
19
20 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
21 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
22 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
23 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
24 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
25 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
26 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
27 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
28 import org.opendaylight.controller.sal.common.util.Rpcs;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
31 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
32 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
33 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
34 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
35 import org.opendaylight.yangtools.concepts.ListenerRegistration;
36 import org.opendaylight.yangtools.yang.common.RpcError;
37 import org.opendaylight.yangtools.yang.common.RpcResult;
38 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
39 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 import com.google.common.base.Function;
44 import com.google.common.base.Optional;
45 import com.google.common.collect.ImmutableList;
46 import com.google.common.collect.ImmutableList.Builder;
47 import com.google.common.collect.ImmutableMap;
48 import com.google.common.util.concurrent.Futures;
49 import com.google.common.util.concurrent.ListenableFuture;
50 import com.google.common.util.concurrent.ListeningExecutorService;
51
52 public class DOMDataBrokerImpl implements DOMDataBroker, AutoCloseable {
53
54     private static final Logger LOG = LoggerFactory.getLogger(DOMDataBrokerImpl.class);
55     private static final Logger COORDINATOR_LOG = LoggerFactory.getLogger(CommitCoordination.class);
56     private final ImmutableMap<LogicalDatastoreType, DOMStore> datastores;
57     private final ListeningExecutorService executor;
58     private final AtomicLong txNum = new AtomicLong();
59
60     public DOMDataBrokerImpl(final ImmutableMap<LogicalDatastoreType, DOMStore> datastores,
61             final ListeningExecutorService executor) {
62         super();
63         this.datastores = datastores;
64         this.executor = executor;
65     }
66
67     private static final Function<Iterable<Boolean>, Boolean> AND_FUNCTION = new Function<Iterable<Boolean>, Boolean>() {
68
69         @Override
70         public Boolean apply(final Iterable<Boolean> input) {
71
72             for (Boolean value : input) {
73                 if (value == false) {
74                     return Boolean.FALSE;
75                 }
76             }
77             return Boolean.TRUE;
78         }
79     };
80
81     @Override
82     public DOMDataReadTransaction newReadOnlyTransaction() {
83         ImmutableMap.Builder<LogicalDatastoreType, DOMStoreReadTransaction> builder = ImmutableMap.builder();
84         for (Entry<LogicalDatastoreType, DOMStore> store : datastores.entrySet()) {
85             builder.put(store.getKey(), store.getValue().newReadOnlyTransaction());
86         }
87         return new ReadOnlyTransactionImpl(newTransactionIdentifier(), builder.build());
88     }
89
90     private Object newTransactionIdentifier() {
91         return "DOM-" + txNum.getAndIncrement();
92     }
93
94     @Override
95     public DOMDataReadWriteTransaction newReadWriteTransaction() {
96         ImmutableMap.Builder<LogicalDatastoreType, DOMStoreReadWriteTransaction> builder = ImmutableMap.builder();
97         for (Entry<LogicalDatastoreType, DOMStore> store : datastores.entrySet()) {
98             builder.put(store.getKey(), store.getValue().newReadWriteTransaction());
99         }
100         return new ReadWriteTransactionImpl(newTransactionIdentifier(), builder.build(), this);
101     }
102
103     @Override
104     public DOMDataWriteTransaction newWriteOnlyTransaction() {
105         ImmutableMap.Builder<LogicalDatastoreType, DOMStoreWriteTransaction> builder = ImmutableMap.builder();
106         for (Entry<LogicalDatastoreType, DOMStore> store : datastores.entrySet()) {
107             builder.put(store.getKey(), store.getValue().newWriteOnlyTransaction());
108         }
109         return new WriteTransactionImpl<DOMStoreWriteTransaction>(newTransactionIdentifier(), builder.build(), this);
110     }
111
112     @Override
113     public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(final LogicalDatastoreType store,
114             final InstanceIdentifier path, final DOMDataChangeListener listener, final DataChangeScope triggeringScope) {
115
116         DOMStore potentialStore = datastores.get(store);
117         checkState(potentialStore != null, "Requested logical data store is not available.");
118         return potentialStore.registerChangeListener(path, listener, triggeringScope);
119     }
120
121     private ListenableFuture<RpcResult<TransactionStatus>> submit(
122             final WriteTransactionImpl<? extends DOMStoreWriteTransaction> transaction) {
123         LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
124         return executor.submit(new CommitCoordination(transaction));
125     }
126
127     private abstract static class AbstractCompositeTransaction<K, T extends DOMStoreTransaction> implements
128             AsyncTransaction<InstanceIdentifier, NormalizedNode<?, ?>> {
129
130         private final ImmutableMap<K, T> backingTxs;
131         private final Object identifier;
132
133         protected AbstractCompositeTransaction(final Object identifier, final ImmutableMap<K, T> backingTxs) {
134             this.identifier = checkNotNull(identifier, "Identifier should not be null");
135             this.backingTxs = checkNotNull(backingTxs, "Backing transactions should not be null");
136         }
137
138         protected T getSubtransaction(final K key) {
139             return backingTxs.get(key);
140         }
141
142         public Iterable<T> getSubtransactions() {
143             return backingTxs.values();
144         }
145
146         @Override
147         public Object getIdentifier() {
148             return identifier;
149         }
150
151         @Override
152         public void close() {
153             try {
154                 for (T subtransaction : backingTxs.values()) {
155                     subtransaction.close();
156                 }
157             } catch (Exception e) {
158                 throw new IllegalStateException("Uncaught exception occured during closing transaction.", e);
159             }
160         }
161
162     }
163
164     private static class ReadOnlyTransactionImpl extends
165             AbstractCompositeTransaction<LogicalDatastoreType, DOMStoreReadTransaction> implements
166             DOMDataReadTransaction {
167
168         protected ReadOnlyTransactionImpl(final Object identifier,
169                 final ImmutableMap<LogicalDatastoreType, DOMStoreReadTransaction> backingTxs) {
170             super(identifier, backingTxs);
171         }
172
173         @Override
174         public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store,
175                 final InstanceIdentifier path) {
176             return getSubtransaction(store).read(path);
177         }
178
179     }
180
181     private static class WriteTransactionImpl<T extends DOMStoreWriteTransaction> extends
182             AbstractCompositeTransaction<LogicalDatastoreType, T> implements DOMDataWriteTransaction {
183
184         private final DOMDataBrokerImpl broker;
185         private ImmutableList<DOMStoreThreePhaseCommitCohort> cohorts;
186
187         protected WriteTransactionImpl(final Object identifier, final ImmutableMap<LogicalDatastoreType, T> backingTxs,
188                 final DOMDataBrokerImpl broker) {
189             super(identifier, backingTxs);
190             this.broker = broker;
191         }
192
193         public Iterable<DOMStoreThreePhaseCommitCohort> ready() {
194             checkState(cohorts == null, "Transaction was already marked as ready.");
195             ImmutableList.Builder<DOMStoreThreePhaseCommitCohort> cohortsBuilder = ImmutableList.builder();
196             for (DOMStoreWriteTransaction subTx : getSubtransactions()) {
197                 cohortsBuilder.add(subTx.ready());
198             }
199             cohorts = cohortsBuilder.build();
200             return cohorts;
201         }
202
203         protected ImmutableList<DOMStoreThreePhaseCommitCohort> getCohorts() {
204             return cohorts;
205         }
206
207         @Override
208         public void put(final LogicalDatastoreType store, final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
209             getSubtransaction(store).write(path, data);
210         }
211
212         @Override
213         public void delete(final LogicalDatastoreType store, final InstanceIdentifier path) {
214             getSubtransaction(store).delete(path);
215         }
216
217         @Override
218         public void merge(final LogicalDatastoreType store, final InstanceIdentifier path,
219                 final NormalizedNode<?, ?> data) {
220             // TODO Auto-generated method stub
221             throw new UnsupportedOperationException("Not implemented yet.");
222         }
223
224         @Override
225         public void cancel() {
226             // TODO Auto-generated method stub
227
228         }
229
230         @Override
231         public ListenableFuture<RpcResult<TransactionStatus>> commit() {
232
233             ready();
234             return broker.submit(this);
235         }
236
237     }
238
239     private static class ReadWriteTransactionImpl extends WriteTransactionImpl<DOMStoreReadWriteTransaction> implements
240             DOMDataReadWriteTransaction {
241
242         protected ReadWriteTransactionImpl(final Object identifier,
243                 final ImmutableMap<LogicalDatastoreType, DOMStoreReadWriteTransaction> backingTxs,
244                 final DOMDataBrokerImpl broker) {
245             // super(identifier, backingTxs);
246             super(identifier, backingTxs, broker);
247         }
248
249         @Override
250         public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store,
251                 final InstanceIdentifier path) {
252             return getSubtransaction(store).read(path);
253         }
254
255         @Override
256         public void merge(final LogicalDatastoreType store, final InstanceIdentifier path,
257                 final NormalizedNode<?, ?> data) {
258
259         }
260     }
261
262     private final class CommitCoordination implements Callable<RpcResult<TransactionStatus>> {
263
264         private final WriteTransactionImpl<? extends DOMStoreWriteTransaction> transaction;
265
266         public CommitCoordination(final WriteTransactionImpl<? extends DOMStoreWriteTransaction> transaction) {
267             this.transaction = transaction;
268         }
269
270         @Override
271         public RpcResult<TransactionStatus> call() throws Exception {
272
273             try {
274                 Boolean canCommit = canCommit().get();
275
276                 if (canCommit) {
277                     try {
278                         preCommit().get();
279                         try {
280                             commit().get();
281                             COORDINATOR_LOG.debug("Tx: {} Is commited.", transaction.getIdentifier());
282                             return Rpcs.getRpcResult(true, TransactionStatus.COMMITED,
283                                     Collections.<RpcError> emptySet());
284
285                         } catch (InterruptedException | ExecutionException e) {
286                             COORDINATOR_LOG.error("Tx: {} Error during commit", transaction.getIdentifier(), e);
287                         }
288
289                     } catch (InterruptedException | ExecutionException e) {
290                         COORDINATOR_LOG.warn("Tx: {} Error during preCommit, starting Abort",
291                                 transaction.getIdentifier(), e);
292                     }
293                 } else {
294                     COORDINATOR_LOG.info("Tx: {} Did not pass canCommit phase.", transaction.getIdentifier());
295                     abort().get();
296                 }
297             } catch (InterruptedException | ExecutionException e) {
298                 COORDINATOR_LOG.warn("Tx: {} Error during canCommit, starting Abort", transaction.getIdentifier(), e);
299
300             }
301             try {
302                 abort().get();
303             } catch (InterruptedException | ExecutionException e) {
304                 COORDINATOR_LOG.error("Tx: {} Error during abort", transaction.getIdentifier(), e);
305             }
306             return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.<RpcError> emptySet());
307         }
308
309         public ListenableFuture<Void> preCommit() {
310             COORDINATOR_LOG.debug("Transaction {}: PreCommit Started ", transaction.getIdentifier());
311             Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
312             for (DOMStoreThreePhaseCommitCohort cohort : transaction.getCohorts()) {
313                 ops.add(cohort.preCommit());
314             }
315             return (ListenableFuture) Futures.allAsList(ops.build());
316         }
317
318         public ListenableFuture<Void> commit() {
319             COORDINATOR_LOG.debug("Transaction {}: Commit Started ", transaction.getIdentifier());
320             Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
321             for (DOMStoreThreePhaseCommitCohort cohort : transaction.getCohorts()) {
322                 ops.add(cohort.commit());
323             }
324             return (ListenableFuture) Futures.allAsList(ops.build());
325         }
326
327         public ListenableFuture<Boolean> canCommit() {
328             COORDINATOR_LOG.debug("Transaction {}: CanCommit Started ", transaction.getIdentifier());
329             Builder<ListenableFuture<Boolean>> canCommitOperations = ImmutableList.builder();
330             for (DOMStoreThreePhaseCommitCohort cohort : transaction.getCohorts()) {
331                 canCommitOperations.add(cohort.canCommit());
332             }
333             ListenableFuture<List<Boolean>> allCanCommits = Futures.allAsList(canCommitOperations.build());
334             return Futures.transform(allCanCommits, AND_FUNCTION);
335         }
336
337         public ListenableFuture<Void> abort() {
338             COORDINATOR_LOG.debug("Transaction {}: Abort Started ", transaction.getIdentifier());
339             Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
340             for (DOMStoreThreePhaseCommitCohort cohort : transaction.getCohorts()) {
341                 ops.add(cohort.abort());
342             }
343             return (ListenableFuture) Futures.allAsList(ops.build());
344         };
345
346     }
347
348     @Override
349     public void close() throws Exception {
350
351     }
352
353 }