Merge "Bug 499: Added support for change listeners."
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / broker / impl / DOMDataBrokerImpl.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.dom.broker.impl;
9
10 import static com.google.common.base.Preconditions.checkNotNull;
11 import static com.google.common.base.Preconditions.checkState;
12
13 import java.util.List;
14 import java.util.Map.Entry;
15 import java.util.concurrent.Callable;
16 import java.util.concurrent.ExecutionException;
17
18 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
19 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
20 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
21 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
22 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
23 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
24 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
25 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
26 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
27 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
28 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
31 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
32 import org.opendaylight.yangtools.concepts.ListenerRegistration;
33 import org.opendaylight.yangtools.yang.common.RpcResult;
34 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
35 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 import com.google.common.base.Function;
40 import com.google.common.base.Optional;
41 import com.google.common.collect.ImmutableList;
42 import com.google.common.collect.ImmutableList.Builder;
43 import com.google.common.collect.ImmutableMap;
44 import com.google.common.util.concurrent.Futures;
45 import com.google.common.util.concurrent.ListenableFuture;
46 import com.google.common.util.concurrent.ListeningExecutorService;
47
48 public class DOMDataBrokerImpl implements DOMDataBroker {
49
50     private static final Logger LOG = LoggerFactory.getLogger(DOMDataBrokerImpl.class);
51     private static final Logger COORDINATOR_LOG = LoggerFactory.getLogger(CommitCoordination.class);
52     private final ImmutableMap<LogicalDatastoreType, DOMStore> datastores;
53     private final ListeningExecutorService executor;
54
55     public DOMDataBrokerImpl(final ImmutableMap<LogicalDatastoreType, DOMStore> datastores,
56             final ListeningExecutorService executor) {
57         super();
58         this.datastores = datastores;
59         this.executor = executor;
60     }
61
62     private static final Function<Iterable<Boolean>, Boolean> AND_FUNCTION = new Function<Iterable<Boolean>, Boolean>() {
63
64         @Override
65         public Boolean apply(final Iterable<Boolean> input) {
66
67             for (Boolean value : input) {
68                 if (value == false) {
69                     return Boolean.FALSE;
70                 }
71             }
72             return Boolean.TRUE;
73         }
74     };
75
76     @Override
77     public DOMDataReadTransaction newReadOnlyTransaction() {
78         ImmutableMap.Builder<LogicalDatastoreType, DOMStoreReadTransaction> builder = ImmutableMap.builder();
79         for (Entry<LogicalDatastoreType, DOMStore> store : datastores.entrySet()) {
80             builder.put(store.getKey(), store.getValue().newReadOnlyTransaction());
81         }
82         return new ReadOnlyTransactionImpl(newTransactionIdentifier(), builder.build());
83     }
84
85     private Object newTransactionIdentifier() {
86         return new Object();
87     }
88
89     @Override
90     public DOMDataReadWriteTransaction newReadWriteTransaction() {
91         ImmutableMap.Builder<LogicalDatastoreType, DOMStoreReadWriteTransaction> builder = ImmutableMap.builder();
92         for (Entry<LogicalDatastoreType, DOMStore> store : datastores.entrySet()) {
93             builder.put(store.getKey(), store.getValue().newReadWriteTransaction());
94         }
95         return new ReadWriteTransactionImpl(newTransactionIdentifier(), builder.build(), this);
96     }
97
98     @Override
99     public DOMDataWriteTransaction newWriteOnlyTransaction() {
100         ImmutableMap.Builder<LogicalDatastoreType, DOMStoreWriteTransaction> builder = ImmutableMap.builder();
101         for (Entry<LogicalDatastoreType, DOMStore> store : datastores.entrySet()) {
102             builder.put(store.getKey(), store.getValue().newWriteOnlyTransaction());
103         }
104         return new WriteTransactionImpl<DOMStoreWriteTransaction>(newTransactionIdentifier(), builder.build(), this);
105     }
106
107     @Override
108     public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(final LogicalDatastoreType store,
109             final InstanceIdentifier path, final DOMDataChangeListener listener, final DataChangeScope triggeringScope) {
110
111         DOMStore potentialStore = datastores.get(store);
112         checkState(potentialStore != null, "Requested logical data store is not available.");
113         return potentialStore.registerChangeListener(path, listener, triggeringScope);
114     }
115
116     private ListenableFuture<RpcResult<TransactionStatus>> submit(
117             final WriteTransactionImpl<? extends DOMStoreWriteTransaction> transaction) {
118         return executor.submit(new CommitCoordination(transaction));
119     }
120
121     private abstract static class AbstractCompositeTransaction<K, T extends DOMStoreTransaction> implements
122             AsyncTransaction<InstanceIdentifier, NormalizedNode<?, ?>> {
123
124         private final ImmutableMap<K, T> backingTxs;
125         private final Object identifier;
126
127         protected AbstractCompositeTransaction(final Object identifier, final ImmutableMap<K, T> backingTxs) {
128             this.identifier = checkNotNull(identifier, "Identifier should not be null");
129             this.backingTxs = checkNotNull(backingTxs, "Backing transactions should not be null");
130         }
131
132         protected T getSubtransaction(final K key) {
133             return backingTxs.get(key);
134         }
135
136         public Iterable<T> getSubtransactions() {
137             return backingTxs.values();
138         }
139
140         @Override
141         public Object getIdentifier() {
142             return identifier;
143         }
144
145         @Override
146         public void close() {
147             try {
148                 for (T subtransaction : backingTxs.values()) {
149                     subtransaction.close();
150                 }
151             } catch (Exception e) {
152                 throw new IllegalStateException("Uncaught exception occured during closing transaction.", e);
153             }
154         }
155
156     }
157
158     private static class ReadOnlyTransactionImpl extends
159             AbstractCompositeTransaction<LogicalDatastoreType, DOMStoreReadTransaction> implements
160             DOMDataReadTransaction {
161
162         protected ReadOnlyTransactionImpl(final Object identifier,
163                 final ImmutableMap<LogicalDatastoreType, DOMStoreReadTransaction> backingTxs) {
164             super(identifier, backingTxs);
165         }
166
167         @Override
168         public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store,
169                 final InstanceIdentifier path) {
170             return getSubtransaction(store).read(path);
171         }
172
173     }
174
175     private static class WriteTransactionImpl<T extends DOMStoreWriteTransaction> extends
176             AbstractCompositeTransaction<LogicalDatastoreType, T> implements DOMDataWriteTransaction {
177
178         private final DOMDataBrokerImpl broker;
179         private ImmutableList<DOMStoreThreePhaseCommitCohort> cohorts;
180
181         protected WriteTransactionImpl(final Object identifier, final ImmutableMap<LogicalDatastoreType, T> backingTxs,
182                 final DOMDataBrokerImpl broker) {
183             super(identifier, backingTxs);
184             this.broker = broker;
185         }
186
187         public Iterable<DOMStoreThreePhaseCommitCohort> ready() {
188             checkState(cohorts == null, "Transaction was already marked as ready.");
189             ImmutableList.Builder<DOMStoreThreePhaseCommitCohort> cohortsBuilder = ImmutableList.builder();
190             for (DOMStoreWriteTransaction subTx : getSubtransactions()) {
191                 cohortsBuilder.add(subTx.ready());
192             }
193             cohorts = cohortsBuilder.build();
194             return cohorts;
195         }
196
197         protected ImmutableList<DOMStoreThreePhaseCommitCohort> getCohorts() {
198             return cohorts;
199         }
200
201         @Override
202         public void put(final LogicalDatastoreType store, final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
203             getSubtransaction(store).write(path, data);
204         }
205
206         @Override
207         public void delete(final LogicalDatastoreType store, final InstanceIdentifier path) {
208             getSubtransaction(store).delete(path);
209         }
210
211         @Override
212         public void merge(final LogicalDatastoreType store, final InstanceIdentifier path,
213                 final NormalizedNode<?, ?> data) {
214             // TODO Auto-generated method stub
215             throw new UnsupportedOperationException("Not implemented yet.");
216         }
217
218         @Override
219         public void cancel() {
220             // TODO Auto-generated method stub
221
222         }
223
224         @Override
225         public ListenableFuture<RpcResult<TransactionStatus>> commit() {
226
227             ready();
228             return broker.submit(this);
229         }
230
231     }
232
233     private static class ReadWriteTransactionImpl extends WriteTransactionImpl<DOMStoreReadWriteTransaction> implements
234             DOMDataReadWriteTransaction {
235
236         protected ReadWriteTransactionImpl(final Object identifier,
237                 final ImmutableMap<LogicalDatastoreType, DOMStoreReadWriteTransaction> backingTxs,
238                 final DOMDataBrokerImpl broker) {
239             // super(identifier, backingTxs);
240             super(identifier, backingTxs, broker);
241         }
242
243         @Override
244         public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store,
245                 final InstanceIdentifier path) {
246             return getSubtransaction(store).read(path);
247         }
248     }
249
250     private final class CommitCoordination implements Callable<RpcResult<TransactionStatus>> {
251
252         private final WriteTransactionImpl<? extends DOMStoreWriteTransaction> transaction;
253
254         public CommitCoordination(final WriteTransactionImpl<? extends DOMStoreWriteTransaction> transaction) {
255             this.transaction = transaction;
256         }
257
258         @Override
259         public RpcResult<TransactionStatus> call() throws Exception {
260
261             Boolean canCommit = canCommit().get();
262             try {
263                 if (canCommit) {
264                     try {
265                         preCommit().get();
266                         try {
267                             commit().get();
268                             return null;
269                         } catch (InterruptedException | ExecutionException e) {
270                             COORDINATOR_LOG.error("Tx: {} Error during commit", transaction.getIdentifier(), e);
271                         }
272
273                     } catch (InterruptedException | ExecutionException e) {
274                         COORDINATOR_LOG.warn("Tx: {} Error during preCommit, starting Abort",
275                                 transaction.getIdentifier(), e);
276                     }
277                 } else {
278                     abort().get();
279                 }
280             } catch (InterruptedException | ExecutionException e) {
281                 COORDINATOR_LOG.warn("Tx: {} Error during canCommit, starting Abort", transaction.getIdentifier(), e);
282
283             }
284             try {
285                 abort().get();
286             } catch (InterruptedException | ExecutionException e) {
287                 COORDINATOR_LOG.error("Tx: {} Error during abort", transaction.getIdentifier(), e);
288             }
289             return null;
290         }
291
292         public ListenableFuture<Void> preCommit() {
293             COORDINATOR_LOG.debug("Transaction {}: PreCommit Started ", transaction.getIdentifier());
294             Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
295             for (DOMStoreThreePhaseCommitCohort cohort : transaction.getCohorts()) {
296                 ops.add(cohort.preCommit());
297             }
298             return (ListenableFuture) Futures.allAsList(ops.build());
299         }
300
301         public ListenableFuture<Void> commit() {
302             COORDINATOR_LOG.debug("Transaction {}: Commit Started ", transaction.getIdentifier());
303             Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
304             for (DOMStoreThreePhaseCommitCohort cohort : transaction.getCohorts()) {
305                 ops.add(cohort.commit());
306             }
307             return (ListenableFuture) Futures.allAsList(ops.build());
308         }
309
310         public ListenableFuture<Boolean> canCommit() {
311             COORDINATOR_LOG.debug("Transaction {}: CanCommit Started ", transaction.getIdentifier());
312             Builder<ListenableFuture<Boolean>> canCommitOperations = ImmutableList.builder();
313             for (DOMStoreThreePhaseCommitCohort cohort : transaction.getCohorts()) {
314                 canCommitOperations.add(cohort.canCommit());
315             }
316             ListenableFuture<List<Boolean>> allCanCommits = Futures.allAsList(canCommitOperations.build());
317             return Futures.transform(allCanCommits, AND_FUNCTION);
318         }
319
320         public ListenableFuture<Void> abort() {
321             COORDINATOR_LOG.debug("Transaction {}: Abort Started ", transaction.getIdentifier());
322             Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
323             for (DOMStoreThreePhaseCommitCohort cohort : transaction.getCohorts()) {
324                 ops.add(cohort.abort());
325             }
326             return (ListenableFuture) Futures.allAsList(ops.build());
327         };
328
329     }
330
331 }