2 * Copyright (c) 2017 Red Hat, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.genius.infra;
10 import com.google.common.annotations.Beta;
11 import com.google.common.util.concurrent.FluentFuture;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import edu.umd.cs.findbugs.annotations.CheckReturnValue;
15 import java.util.function.Function;
16 import javax.inject.Inject;
17 import org.opendaylight.infrautils.utils.function.InterruptibleCheckedConsumer;
18 import org.opendaylight.infrautils.utils.function.InterruptibleCheckedFunction;
19 import org.opendaylight.mdsal.binding.api.DataBroker;
20 import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
21 import org.opendaylight.mdsal.binding.api.Transaction;
22 import org.opendaylight.mdsal.binding.api.TransactionChain;
23 import org.opendaylight.mdsal.binding.api.TransactionChainListener;
24 import org.opendaylight.mdsal.binding.api.WriteTransaction;
25 import org.opendaylight.mdsal.common.api.CommitInfo;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
31 * Implementation of {@link ManagedNewTransactionRunner}. This is based on {@link ManagedTransactionFactoryImpl} but
32 * re-implements operations based on (read-)write transactions to cancel transactions which don't end up making any
33 * changes to the datastore.
36 // Do *NOT* mark this as @Singleton, because users choose Impl; and as long as this in API, because of https://wiki.opendaylight.org/view/BestPractices/DI_Guidelines#Nota_Bene
37 public class ManagedNewTransactionRunnerImpl extends ManagedTransactionFactoryImpl
38 implements ManagedNewTransactionRunner {
40 private static final Logger LOG = LoggerFactory.getLogger(ManagedNewTransactionRunnerImpl.class);
42 private final DataBroker broker;
45 public ManagedNewTransactionRunnerImpl(DataBroker broker) {
52 public <E extends Exception> ListenableFuture<Void>
53 callWithNewWriteOnlyTransactionAndSubmit(InterruptibleCheckedConsumer<WriteTransaction, E> txConsumer) {
54 return callWithNewTransactionAndSubmit(Datastore.class, broker::newWriteOnlyTransaction,
55 (datastoreType, realTx) -> new NonSubmitCancelableWriteTransaction(realTx),
56 txConsumer::accept, (realTx, wrappedTx) -> realTx.commit());
61 public <D extends Datastore, E extends Exception> FluentFuture<Void> callWithNewWriteOnlyTransactionAndSubmit(
62 Class<D> datastoreType, InterruptibleCheckedConsumer<TypedWriteTransaction<D>, E> txConsumer) {
63 return callWithNewTransactionAndSubmit(datastoreType, broker::newWriteOnlyTransaction,
64 WriteTrackingTypedWriteTransactionImpl::new, txConsumer::accept, this::commit);
69 public <E extends Exception> ListenableFuture<Void>
70 callWithNewReadWriteTransactionAndSubmit(InterruptibleCheckedConsumer<ReadWriteTransaction, E> txConsumer) {
71 return callWithNewTransactionAndSubmit(Datastore.class, broker::newReadWriteTransaction,
72 (datastoreType, realTx) -> new NonSubmitCancelableReadWriteTransaction(realTx), txConsumer::accept,
78 public <D extends Datastore, E extends Exception> FluentFuture<Void>
79 callWithNewReadWriteTransactionAndSubmit(Class<D> datastoreType,
80 InterruptibleCheckedConsumer<TypedReadWriteTransaction<D>, E> txConsumer) {
81 return callWithNewTransactionAndSubmit(datastoreType, broker::newReadWriteTransaction,
82 WriteTrackingTypedReadWriteTransactionImpl::new, txConsumer::accept, this::commit);
87 public <D extends Datastore, E extends Exception, R> FluentFuture<R> applyWithNewReadWriteTransactionAndSubmit(
88 Class<D> datastoreType, InterruptibleCheckedFunction<TypedReadWriteTransaction<D>, R, E> txFunction) {
89 return super.applyWithNewTransactionAndSubmit(datastoreType, broker::newReadWriteTransaction,
90 WriteTrackingTypedReadWriteTransactionImpl::new, txFunction::apply, this::commit);
94 public <R> R applyWithNewTransactionChainAndClose(Function<ManagedTransactionChain, R> chainConsumer) {
95 try (TransactionChain realTxChain = broker.createTransactionChain(new TransactionChainListener() {
97 public void onTransactionChainFailed(TransactionChain chain, Transaction transaction,
99 LOG.error("Error handling a transaction chain", cause);
103 public void onTransactionChainSuccessful(TransactionChain chain) {
107 return chainConsumer.apply(new ManagedTransactionChainImpl(realTxChain));
112 private FluentFuture<? extends CommitInfo> commit(WriteTransaction realTx, WriteTrackingTransaction wrappedTx) {
113 if (wrappedTx.isWritten()) {
114 // The transaction contains changes, commit it
115 return realTx.commit();
117 // The transaction only handled reads, cancel it
119 return FluentFuture.from(Futures.immediateFuture(CommitInfo.empty()));