/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.md.sal.dom.broker.impl;
10 import static com.google.common.base.Preconditions.checkNotNull;
11 import static com.google.common.base.Preconditions.checkState;
13 import java.util.Collections;
14 import java.util.List;
15 import java.util.Map.Entry;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.atomic.AtomicLong;
20 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
21 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
22 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
23 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
24 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
25 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
26 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
27 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
28 import org.opendaylight.controller.sal.common.util.Rpcs;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
31 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
32 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
33 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
34 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
35 import org.opendaylight.yangtools.concepts.ListenerRegistration;
36 import org.opendaylight.yangtools.yang.common.RpcError;
37 import org.opendaylight.yangtools.yang.common.RpcResult;
38 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
39 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
43 import com.google.common.base.Function;
44 import com.google.common.base.Optional;
45 import com.google.common.collect.ImmutableList;
46 import com.google.common.collect.ImmutableList.Builder;
47 import com.google.common.collect.ImmutableMap;
48 import com.google.common.util.concurrent.Futures;
49 import com.google.common.util.concurrent.ListenableFuture;
50 import com.google.common.util.concurrent.ListeningExecutorService;
/**
 * {@link DOMDataBroker} implementation that composes its transactions from one
 * backing {@link DOMStore} transaction per {@link LogicalDatastoreType} and
 * hands commits to a {@link ListeningExecutorService}.
 */
52 public class DOMDataBrokerImpl implements DOMDataBroker, AutoCloseable {
// Logger for broker-level messages.
54 private static final Logger LOG = LoggerFactory.getLogger(DOMDataBrokerImpl.class);
// Separate logger, named after CommitCoordination, used for commit-phase messages.
55 private static final Logger COORDINATOR_LOG = LoggerFactory.getLogger(CommitCoordination.class);
// Backing store for each logical datastore type.
56 private final ImmutableMap<LogicalDatastoreType, DOMStore> datastores;
// Executor that commit coordination tasks are submitted to.
57 private final ListeningExecutorService executor;
// Monotonic counter used to build the "DOM-<n>" transaction identifiers.
58 private final AtomicLong txNum = new AtomicLong();
/**
 * Creates a broker on top of the supplied datastores.
 *
 * @param datastores backing store for each logical datastore type
 * @param executor executor on which commit coordination tasks are run
 * @throws NullPointerException if {@code datastores} or {@code executor} is null
 */
public DOMDataBrokerImpl(final ImmutableMap<LogicalDatastoreType, DOMStore> datastores,
        final ListeningExecutorService executor) {
    // Fail fast here rather than with an unexplained NullPointerException on
    // the first transaction or commit.
    this.datastores = checkNotNull(datastores, "Datastores should not be null");
    this.executor = checkNotNull(executor, "Executor should not be null");
}
// Folds an iterable of Boolean values into a single Boolean. canCommit() applies
// it to the collected per-cohort canCommit results.
// NOTE(review): the name suggests a logical AND over all values, but the loop
// body and the return statements are not visible here - confirm before relying
// on the exact semantics (e.g. handling of null elements or empty input).
67 private static final Function<Iterable<Boolean>, Boolean> AND_FUNCTION = new Function<Iterable<Boolean>, Boolean>() {
70 public Boolean apply(final Iterable<Boolean> input) {
72 for (Boolean value : input) {
82 public DOMDataReadTransaction newReadOnlyTransaction() {
83 ImmutableMap.Builder<LogicalDatastoreType, DOMStoreReadTransaction> builder = ImmutableMap.builder();
84 for (Entry<LogicalDatastoreType, DOMStore> store : datastores.entrySet()) {
85 builder.put(store.getKey(), store.getValue().newReadOnlyTransaction());
87 return new ReadOnlyTransactionImpl(newTransactionIdentifier(), builder.build());
90 private Object newTransactionIdentifier() {
91 return "DOM-" + txNum.getAndIncrement();
95 public DOMDataReadWriteTransaction newReadWriteTransaction() {
96 ImmutableMap.Builder<LogicalDatastoreType, DOMStoreReadWriteTransaction> builder = ImmutableMap.builder();
97 for (Entry<LogicalDatastoreType, DOMStore> store : datastores.entrySet()) {
98 builder.put(store.getKey(), store.getValue().newReadWriteTransaction());
100 return new ReadWriteTransactionImpl(newTransactionIdentifier(), builder.build(), this);
104 public DOMDataWriteTransaction newWriteOnlyTransaction() {
105 ImmutableMap.Builder<LogicalDatastoreType, DOMStoreWriteTransaction> builder = ImmutableMap.builder();
106 for (Entry<LogicalDatastoreType, DOMStore> store : datastores.entrySet()) {
107 builder.put(store.getKey(), store.getValue().newWriteOnlyTransaction());
109 return new WriteTransactionImpl<DOMStoreWriteTransaction>(newTransactionIdentifier(), builder.build(), this);
113 public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(final LogicalDatastoreType store,
114 final InstanceIdentifier path, final DOMDataChangeListener listener, final DataChangeScope triggeringScope) {
116 DOMStore potentialStore = datastores.get(store);
117 checkState(potentialStore != null, "Requested logical data store is not available.");
118 return potentialStore.registerChangeListener(path, listener, triggeringScope);
121 private ListenableFuture<RpcResult<TransactionStatus>> submit(
122 final WriteTransactionImpl<? extends DOMStoreWriteTransaction> transaction) {
123 LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
124 return executor.submit(new CommitCoordination(transaction));
127 private abstract static class AbstractCompositeTransaction<K, T extends DOMStoreTransaction> implements
128 AsyncTransaction<InstanceIdentifier, NormalizedNode<?, ?>> {
130 private final ImmutableMap<K, T> backingTxs;
131 private final Object identifier;
133 protected AbstractCompositeTransaction(final Object identifier, final ImmutableMap<K, T> backingTxs) {
134 this.identifier = checkNotNull(identifier, "Identifier should not be null");
135 this.backingTxs = checkNotNull(backingTxs, "Backing transactions should not be null");
138 protected T getSubtransaction(final K key) {
139 return backingTxs.get(key);
142 public Iterable<T> getSubtransactions() {
143 return backingTxs.values();
147 public Object getIdentifier() {
152 public void close() {
154 for (T subtransaction : backingTxs.values()) {
155 subtransaction.close();
157 } catch (Exception e) {
158 throw new IllegalStateException("Uncaught exception occured during closing transaction.", e);
164 private static class ReadOnlyTransactionImpl extends
165 AbstractCompositeTransaction<LogicalDatastoreType, DOMStoreReadTransaction> implements
166 DOMDataReadTransaction {
168 protected ReadOnlyTransactionImpl(final Object identifier,
169 final ImmutableMap<LogicalDatastoreType, DOMStoreReadTransaction> backingTxs) {
170 super(identifier, backingTxs);
174 public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store,
175 final InstanceIdentifier path) {
176 return getSubtransaction(store).read(path);
/**
 * Composite write transaction. Writes and deletes are forwarded to the backing
 * transaction of the addressed logical datastore; commit is delegated to the
 * owning broker.
 *
 * @param <T> type of the backing store write transactions
 */
181 private static class WriteTransactionImpl<T extends DOMStoreWriteTransaction> extends
182 AbstractCompositeTransaction<LogicalDatastoreType, T> implements DOMDataWriteTransaction {
// Broker that created this transaction; commit() submits the transaction to it.
184 private final DOMDataBrokerImpl broker;
// Commit cohorts collected by ready(); null until ready() has run.
185 private ImmutableList<DOMStoreThreePhaseCommitCohort> cohorts;
/**
 * @param identifier identifier of the composite transaction
 * @param backingTxs backing write transaction for each logical datastore
 * @param broker broker used to submit this transaction on commit
 */
187 protected WriteTransactionImpl(final Object identifier, final ImmutableMap<LogicalDatastoreType, T> backingTxs,
188 final DOMDataBrokerImpl broker) {
189 super(identifier, backingTxs);
190 this.broker = broker;
/**
 * Marks every backing transaction as ready and stores the cohorts they return.
 * May be called only once: checkState rejects a second call with an
 * IllegalStateException.
 * NOTE(review): the return statement is not visible here; presumably the
 * stored cohorts are returned - confirm.
 */
193 public Iterable<DOMStoreThreePhaseCommitCohort> ready() {
194 checkState(cohorts == null, "Transaction was already marked as ready.");
195 ImmutableList.Builder<DOMStoreThreePhaseCommitCohort> cohortsBuilder = ImmutableList.builder();
196 for (DOMStoreWriteTransaction subTx : getSubtransactions()) {
197 cohortsBuilder.add(subTx.ready());
199 cohorts = cohortsBuilder.build();
/**
 * Accessor used by the commit coordination to iterate over the cohorts.
 * NOTE(review): the body is not visible here; presumably it returns the
 * cohorts field, which is null before ready() has been called - confirm.
 */
203 protected ImmutableList<DOMStoreThreePhaseCommitCohort> getCohorts() {
/** Writes {@code data} at {@code path} through the backing transaction of {@code store}. */
208 public void put(final LogicalDatastoreType store, final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
209 getSubtransaction(store).write(path, data);
/** Deletes the node at {@code path} through the backing transaction of {@code store}. */
213 public void delete(final LogicalDatastoreType store, final InstanceIdentifier path) {
214 getSubtransaction(store).delete(path);
/**
 * Merge is not supported by this transaction.
 *
 * @throws UnsupportedOperationException always
 */
218 public void merge(final LogicalDatastoreType store, final InstanceIdentifier path,
219 final NormalizedNode<?, ?> data) {
220 // TODO Auto-generated method stub
221 throw new UnsupportedOperationException("Not implemented yet.");
/**
 * NOTE(review): still a generated stub; no cancellation logic is visible here,
 * so backing transactions do not appear to be cancelled or closed - confirm.
 */
225 public void cancel() {
226 // TODO Auto-generated method stub
/**
 * Submits this transaction to the owning broker and returns the broker's future.
 * NOTE(review): the commit coordination reads getCohorts(), which is populated
 * only by ready(); whether ready() is invoked here before the submit is not
 * visible - confirm.
 */
231 public ListenableFuture<RpcResult<TransactionStatus>> commit() {
234 return broker.submit(this);
/**
 * Composite read-write transaction: adds read access on top of
 * WriteTransactionImpl, using read-write backing transactions.
 */
239 private static class ReadWriteTransactionImpl extends WriteTransactionImpl<DOMStoreReadWriteTransaction> implements
240 DOMDataReadWriteTransaction {
/**
 * @param identifier identifier of the composite transaction
 * @param backingTxs backing read-write transaction for each logical datastore
 * @param broker broker used to submit this transaction on commit
 */
242 protected ReadWriteTransactionImpl(final Object identifier,
243 final ImmutableMap<LogicalDatastoreType, DOMStoreReadWriteTransaction> backingTxs,
244 final DOMDataBrokerImpl broker) {
245 // The broker is passed through so that the inherited commit() can submit this transaction.
246 super(identifier, backingTxs, broker);
/**
 * Reads the node at {@code path} through the backing transaction of {@code store}.
 *
 * @return future returned by the backing transaction's read
 */
250 public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store,
251 final InstanceIdentifier path) {
252 return getSubtransaction(store).read(path);
/**
 * Overrides the inherited merge, which always throws UnsupportedOperationException.
 * NOTE(review): the body of this override is not visible here; verify that it
 * does not silently discard the data.
 */
256 public void merge(final LogicalDatastoreType store, final InstanceIdentifier path,
257 final NormalizedNode<?, ?> data) {
/**
 * Task that drives the commit of a single write transaction across its cohorts
 * (canCommit, preCommit, commit, abort) and reports the outcome as an RpcResult.
 * Instances are created by the broker's submit() and run on its executor.
 */
262 private final class CommitCoordination implements Callable<RpcResult<TransactionStatus>> {
// Transaction whose cohorts are being coordinated.
264 private final WriteTransactionImpl<? extends DOMStoreWriteTransaction> transaction;
/**
 * @param transaction write transaction to commit
 */
266 public CommitCoordination(final WriteTransactionImpl<? extends DOMStoreWriteTransaction> transaction) {
267 this.transaction = transaction;
/**
 * Runs the commit protocol for the coordinated transaction.
 *
 * Visible outcomes: a successful result carrying TransactionStatus.COMMITED
 * once the commit is logged as done, and a failed result carrying
 * TransactionStatus.FAILED on the fall-through path at the end.
 *
 * NOTE(review): most of the control flow (phase invocations, try blocks, abort
 * call) is not visible here; the phase comments below are inferred from the log
 * messages and must be confirmed against the full method.
 * NOTE(review): the visible handlers for InterruptedException only log;
 * consider restoring the thread's interrupt flag.
 */
271 public RpcResult<TransactionStatus> call() throws Exception {
// Blocks until every cohort has answered canCommit.
274 Boolean canCommit = canCommit().get();
// Success path: report the transaction as committed.
281 COORDINATOR_LOG.debug("Tx: {} Is commited.", transaction.getIdentifier());
282 return Rpcs.getRpcResult(true, TransactionStatus.COMMITED,
283 Collections.<RpcError> emptySet());
// Failure while waiting for the commit phase.
285 } catch (InterruptedException | ExecutionException e) {
286 COORDINATOR_LOG.error("Tx: {} Error during commit", transaction.getIdentifier(), e);
// Failure while waiting for the preCommit phase.
289 } catch (InterruptedException | ExecutionException e) {
290 COORDINATOR_LOG.warn("Tx: {} Error during preCommit, starting Abort",
291 transaction.getIdentifier(), e);
// canCommit completed but the combined vote was negative.
294 COORDINATOR_LOG.info("Tx: {} Did not pass canCommit phase.", transaction.getIdentifier());
// Failure while waiting for the canCommit phase.
297 } catch (InterruptedException | ExecutionException e) {
298 COORDINATOR_LOG.warn("Tx: {} Error during canCommit, starting Abort", transaction.getIdentifier(), e);
// Failure while waiting for the abort itself.
303 } catch (InterruptedException | ExecutionException e) {
304 COORDINATOR_LOG.error("Tx: {} Error during abort", transaction.getIdentifier(), e);
// Every path that did not return the committed result ends up here.
306 return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.<RpcError> emptySet());
309 public ListenableFuture<Void> preCommit() {
310 COORDINATOR_LOG.debug("Transaction {}: PreCommit Started ", transaction.getIdentifier());
311 Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
312 for (DOMStoreThreePhaseCommitCohort cohort : transaction.getCohorts()) {
313 ops.add(cohort.preCommit());
315 return (ListenableFuture) Futures.allAsList(ops.build());
318 public ListenableFuture<Void> commit() {
319 COORDINATOR_LOG.debug("Transaction {}: Commit Started ", transaction.getIdentifier());
320 Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
321 for (DOMStoreThreePhaseCommitCohort cohort : transaction.getCohorts()) {
322 ops.add(cohort.commit());
324 return (ListenableFuture) Futures.allAsList(ops.build());
327 public ListenableFuture<Boolean> canCommit() {
328 COORDINATOR_LOG.debug("Transaction {}: CanCommit Started ", transaction.getIdentifier());
329 Builder<ListenableFuture<Boolean>> canCommitOperations = ImmutableList.builder();
330 for (DOMStoreThreePhaseCommitCohort cohort : transaction.getCohorts()) {
331 canCommitOperations.add(cohort.canCommit());
333 ListenableFuture<List<Boolean>> allCanCommits = Futures.allAsList(canCommitOperations.build());
334 return Futures.transform(allCanCommits, AND_FUNCTION);
337 public ListenableFuture<Void> abort() {
338 COORDINATOR_LOG.debug("Transaction {}: Abort Started ", transaction.getIdentifier());
339 Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
340 for (DOMStoreThreePhaseCommitCohort cohort : transaction.getCohorts()) {
341 ops.add(cohort.abort());
343 return (ListenableFuture) Futures.allAsList(ops.build());
349 public void close() throws Exception {