/*
 * Copyright (c) 2017 Red Hat, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.genius.infra;
10 import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
12 import com.google.common.annotations.Beta;
13 import com.google.common.util.concurrent.FluentFuture;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import java.util.function.Function;
18 import javax.inject.Inject;
19 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
20 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
21 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
22 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
23 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
24 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
25 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
26 import org.opendaylight.infrautils.utils.function.InterruptibleCheckedConsumer;
27 import org.opendaylight.infrautils.utils.function.InterruptibleCheckedFunction;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
/**
 * Implementation of {@link ManagedNewTransactionRunner}. This is based on {@link ManagedTransactionFactoryImpl} but
 * re-implements operations based on read-write transactions to cancel transactions which don't end up making any
 * changes to the datastore.
 */
// Do *NOT* mark this as @Singleton, because users choose the Impl; and, as long as this is in the API, because of https://wiki.opendaylight.org/view/BestPractices/DI_Guidelines#Nota_Bene
38 public class ManagedNewTransactionRunnerImpl extends ManagedTransactionFactoryImpl
39 implements ManagedNewTransactionRunner {
// Class-wide logger; the methods of this class use it to report failed cancel() calls and transaction chain errors.
41 private static final Logger LOG = LoggerFactory.getLogger(ManagedNewTransactionRunnerImpl.class);
// Broker from which the methods of this class open their transactions and transaction chains.
43 private final DataBroker broker;
46 public ManagedNewTransactionRunnerImpl(DataBroker broker) {
52 @SuppressWarnings("checkstyle:IllegalCatch")
53 public <E extends Exception> ListenableFuture<Void>
54 callWithNewWriteOnlyTransactionAndSubmit(InterruptibleCheckedConsumer<WriteTransaction, E> txConsumer) {
55 WriteTransaction realTx = broker.newWriteOnlyTransaction();
56 WriteTransaction wrappedTx = new NonSubmitCancelableWriteTransaction(realTx);
58 txConsumer.accept(wrappedTx);
59 return realTx.submit();
60 // catch Exception for both the <E extends Exception> thrown by accept() as well as any RuntimeException
61 } catch (Exception e) {
62 if (!realTx.cancel()) {
63 LOG.error("Transaction.cancel() return false - this should never happen (here)");
65 return immediateFailedFuture(e);
70 @SuppressWarnings("checkstyle:IllegalCatch")
71 public <E extends Exception> ListenableFuture<Void>
72 callWithNewReadWriteTransactionAndSubmit(InterruptibleCheckedConsumer<ReadWriteTransaction, E> txConsumer) {
73 ReadWriteTransaction realTx = broker.newReadWriteTransaction();
74 WriteTrackingReadWriteTransaction wrappedTx = new NonSubmitCancelableReadWriteTransaction(realTx);
76 txConsumer.accept(wrappedTx);
77 if (wrappedTx.isWritten()) {
78 // The transaction contains changes, commit it
79 return realTx.submit();
81 // The transaction only handled reads, cancel it
83 return Futures.immediateCheckedFuture(null);
85 // catch Exception for both the <E extends Exception> thrown by accept() as well as any RuntimeException
86 } catch (Exception e) {
87 if (!realTx.cancel()) {
88 LOG.error("Transaction.cancel() returned false, which should never happen here");
90 return immediateFailedFuture(e);
95 @SuppressWarnings("checkstyle:IllegalCatch")
96 public <D extends Datastore, E extends Exception> FluentFuture<Void>
97 callWithNewReadWriteTransactionAndSubmit(Class<D> datastoreType,
98 InterruptibleCheckedConsumer<TypedReadWriteTransaction<D>, E> txConsumer) {
99 ReadWriteTransaction realTx = broker.newReadWriteTransaction();
100 WriteTrackingTypedReadWriteTransactionImpl<D> wrappedTx =
101 new WriteTrackingTypedReadWriteTransactionImpl<>(datastoreType, realTx);
103 txConsumer.accept(wrappedTx);
104 if (wrappedTx.isWritten()) {
105 // The transaction contains changes, commit it
106 return realTx.commit().transform(v -> null, MoreExecutors.directExecutor());
108 // The transaction only handled reads, cancel it
110 return FluentFuture.from(Futures.immediateFuture(null));
112 // catch Exception for both the <E extends Exception> thrown by accept() as well as any RuntimeException
113 } catch (Exception e) {
114 if (!realTx.cancel()) {
115 LOG.error("Transaction.cancel() returned false, which should never happen here");
117 return FluentFuture.from(immediateFailedFuture(e));
122 @SuppressWarnings("checkstyle:IllegalCatch")
123 public <D extends Datastore, E extends Exception, R> FluentFuture<R> applyWithNewReadWriteTransactionAndSubmit(
124 Class<D> datastoreType, InterruptibleCheckedFunction<TypedReadWriteTransaction<D>, R, E> txFunction) {
125 ReadWriteTransaction realTx = broker.newReadWriteTransaction();
126 WriteTrackingTypedReadWriteTransactionImpl<D> wrappedTx =
127 new WriteTrackingTypedReadWriteTransactionImpl<>(datastoreType, realTx);
129 R result = txFunction.apply(wrappedTx);
130 if (wrappedTx.isWritten()) {
131 // The transaction contains changes, commit it
132 return realTx.commit().transform(v -> result, MoreExecutors.directExecutor());
134 // The transaction only handled reads, cancel it
136 return FluentFuture.from(Futures.immediateFuture(result));
138 // catch Exception for both the <E extends Exception> thrown by accept() as well as any RuntimeException
139 } catch (Exception e) {
140 if (!realTx.cancel()) {
141 LOG.error("Transaction.cancel() returned false, which should never happen here");
143 return FluentFuture.from(immediateFailedFuture(e));
148 public <R> R applyWithNewTransactionChainAndClose(Function<ManagedTransactionChain, R> chainConsumer) {
149 try (BindingTransactionChain realTxChain = broker.createTransactionChain(new TransactionChainListener() {
151 public void onTransactionChainFailed(TransactionChain<?, ?> chain, AsyncTransaction<?, ?> transaction,
153 LOG.error("Error handling a transaction chain", cause);
157 public void onTransactionChainSuccessful(TransactionChain<?, ?> chain) {
161 return chainConsumer.apply(new ManagedTransactionChainImpl(realTxChain));