2 * Copyright (c) 2017 Red Hat, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.mdsal.binding.util;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.util.concurrent.FluentFuture;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import java.util.concurrent.ExecutionException;
15 import java.util.concurrent.Executor;
16 import java.util.function.Function;
17 import org.opendaylight.mdsal.common.api.OptimisticLockFailedException;
18 import org.opendaylight.mdsal.common.api.ReadFailedException;
21 * Implementation of {@link ManagedNewTransactionRunner} with automatic transparent retries on transaction failure
22 * ({@link OptimisticLockFailedException} on write transactions and {@link ReadFailedException} on read transactions
23 * will cause the operation constructing the transaction to be re-run).
24 * This is a package local private internal class; end-users use the {@link RetryingManagedNewTransactionRunner}.
25 * @see RetryingManagedNewTransactionRunner
27 * @author Michael Vorburger.ch & Stephen Kitt, with input from Tom Pantelis re. catchingAsync & direct Executor
29 // intentionally package local
30 class RetryingManagedNewTransactionRunnerImpl implements ManagedNewTransactionRunner {
32 // NB: The RetryingManagedNewTransactionRunnerTest is in mdsalutil-testutils's src/test, not this project's
34 // duplicated in SingleTransactionDataBroker
35 private static final int DEFAULT_RETRIES = 3;
37 private final int maxRetries;
39 private final ManagedNewTransactionRunner delegate;
41 private final Executor executor;
43 RetryingManagedNewTransactionRunnerImpl(final ManagedNewTransactionRunner delegate) {
44 this(delegate, MoreExecutors.directExecutor(), DEFAULT_RETRIES);
47 RetryingManagedNewTransactionRunnerImpl(final ManagedNewTransactionRunner delegate, final int maxRetries) {
48 this(delegate, MoreExecutors.directExecutor(), maxRetries);
51 RetryingManagedNewTransactionRunnerImpl(final ManagedNewTransactionRunner delegate, final Executor executor,
52 final int maxRetries) {
53 this.delegate = requireNonNull(delegate, "delegate must not be null");
54 this.executor = requireNonNull(executor, "executor must not be null");
55 this.maxRetries = maxRetries;
59 public <D extends Datastore, E extends Exception, R> R applyInterruptiblyWithNewReadOnlyTransactionAndClose(
60 final D datastore, final InterruptibleCheckedFunction<TypedReadTransaction<D>, R, E> txFunction)
61 throws E, InterruptedException {
62 return applyInterruptiblyWithNewReadOnlyTransactionAndClose(datastore, txFunction, maxRetries);
65 @SuppressWarnings("checkstyle:IllegalCatch")
66 private <R, D extends Datastore, E extends Exception> R applyInterruptiblyWithNewReadOnlyTransactionAndClose(
67 final D datastore, final InterruptibleCheckedFunction<TypedReadTransaction<D>, R, E> txFunction,
68 final int tries) throws E, InterruptedException {
70 return delegate.applyInterruptiblyWithNewReadOnlyTransactionAndClose(datastore, txFunction);
71 } catch (Exception e) {
72 if (isRetriableException(e) && tries - 1 > 0) {
73 return applyInterruptiblyWithNewReadOnlyTransactionAndClose(datastore, txFunction, tries - 1);
81 public <D extends Datastore, E extends Exception, R> R applyWithNewReadOnlyTransactionAndClose(final D datastore,
82 final CheckedFunction<TypedReadTransaction<D>, R, E> txFunction) throws E {
83 return applyWithNewReadOnlyTransactionAndClose(datastore, txFunction, maxRetries);
86 @SuppressWarnings("checkstyle:IllegalCatch")
87 private <R, D extends Datastore, E extends Exception> R applyWithNewReadOnlyTransactionAndClose(final D datastore,
88 final CheckedFunction<TypedReadTransaction<D>, R, E> txFunction, final int tries) throws E {
90 return delegate.applyWithNewReadOnlyTransactionAndClose(datastore, txFunction);
91 } catch (Exception e) {
92 if (isRetriableException(e) && tries - 1 > 0) {
93 return applyWithNewReadOnlyTransactionAndClose(datastore, txFunction, tries - 1);
101 public <D extends Datastore, E extends Exception, R> FluentFuture<R> applyWithNewReadWriteTransactionAndSubmit(
102 final D datastore, final InterruptibleCheckedFunction<TypedReadWriteTransaction<D>, R, E> txFunction) {
103 return applyWithNewReadWriteTransactionAndSubmit(datastore, txFunction, maxRetries);
106 private <D extends Datastore, E extends Exception, R> FluentFuture<R> applyWithNewReadWriteTransactionAndSubmit(
107 final D datastore, final InterruptibleCheckedFunction<TypedReadWriteTransaction<D>, R, E> txRunner,
109 FluentFuture<R> future = requireNonNull(
110 delegate.applyWithNewReadWriteTransactionAndSubmit(datastore, txRunner),
111 "delegate.callWithNewReadWriteTransactionAndSubmit() == null");
112 return future.catchingAsync(Exception.class, exception -> {
113 if (isRetriableException(exception) && tries - 1 > 0) {
114 return applyWithNewReadWriteTransactionAndSubmit(datastore, txRunner, tries - 1);
122 public <R> R applyWithNewTransactionChainAndClose(final Function<ManagedTransactionChain, R> chainConsumer) {
123 throw new UnsupportedOperationException("The retrying transaction manager doesn't support transaction chains");
127 public <D extends Datastore, E extends Exception> void callInterruptiblyWithNewReadOnlyTransactionAndClose(
128 final D datastore, final InterruptibleCheckedConsumer<TypedReadTransaction<D>, E> txConsumer)
129 throws E, InterruptedException {
130 callInterruptiblyWithNewReadOnlyTransactionAndClose(datastore, txConsumer, maxRetries);
133 @SuppressWarnings("checkstyle:IllegalCatch")
134 private <D extends Datastore, E extends Exception> void callInterruptiblyWithNewReadOnlyTransactionAndClose(
135 final D datastore, final InterruptibleCheckedConsumer<TypedReadTransaction<D>, E> txConsumer,
136 final int tries) throws E, InterruptedException {
138 delegate.callInterruptiblyWithNewReadOnlyTransactionAndClose(datastore, txConsumer);
139 } catch (Exception e) {
140 if (isRetriableException(e) && tries - 1 > 0) {
141 callInterruptiblyWithNewReadOnlyTransactionAndClose(datastore, txConsumer, tries - 1);
149 public <D extends Datastore, E extends Exception> void callWithNewReadOnlyTransactionAndClose(final D datastore,
150 final CheckedConsumer<TypedReadTransaction<D>, E> txConsumer) throws E {
151 callWithNewReadOnlyTransactionAndClose(datastore, txConsumer, maxRetries);
154 @SuppressWarnings("checkstyle:IllegalCatch")
155 private <D extends Datastore, E extends Exception> void callWithNewReadOnlyTransactionAndClose(final D datastore,
156 final CheckedConsumer<TypedReadTransaction<D>, E> txConsumer, final int tries) throws E {
158 delegate.callWithNewReadOnlyTransactionAndClose(datastore, txConsumer);
159 } catch (Exception e) {
160 if (isRetriableException(e) && tries - 1 > 0) {
161 callWithNewReadOnlyTransactionAndClose(datastore, txConsumer, tries - 1);
169 public <D extends Datastore, E extends Exception>
170 FluentFuture<? extends Object> callWithNewReadWriteTransactionAndSubmit(final D datastore,
171 final InterruptibleCheckedConsumer<TypedReadWriteTransaction<D>, E> txConsumer) {
172 return callWithNewReadWriteTransactionAndSubmit(datastore, txConsumer, maxRetries);
175 private <D extends Datastore, E extends Exception, T> FluentFuture<T> callWithNewReadWriteTransactionAndSubmit(
176 final D datastore, final InterruptibleCheckedConsumer<TypedReadWriteTransaction<D>, E> txRunner,
178 return (FluentFuture<T>) requireNonNull(
179 delegate.callWithNewReadWriteTransactionAndSubmit(datastore, txRunner),
180 "delegate.callWithNewWriteOnlyTransactionAndSubmit() == null")
181 .catchingAsync(Exception.class, exception -> {
182 // as per AsyncWriteTransaction.submit()'s JavaDoc re. retries
183 if (isRetriableException(exception) && tries - 1 > 0) {
184 return callWithNewReadWriteTransactionAndSubmit(datastore, txRunner, tries - 1);
186 // out of retries, so propagate the exception
193 public <D extends Datastore, E extends Exception>
194 FluentFuture<? extends Object> callWithNewWriteOnlyTransactionAndSubmit(final D datastore,
195 final InterruptibleCheckedConsumer<TypedWriteTransaction<D>, E> txConsumer) {
196 return callWithNewWriteOnlyTransactionAndSubmit(datastore, txConsumer, maxRetries);
199 private <D extends Datastore, E extends Exception, T> FluentFuture<T> callWithNewWriteOnlyTransactionAndSubmit(
200 final D datastore, final InterruptibleCheckedConsumer<TypedWriteTransaction<D>, E> txRunner,
202 return (FluentFuture<T>) requireNonNull(
203 delegate.callWithNewWriteOnlyTransactionAndSubmit(datastore, txRunner),
204 "delegate.callWithNewWriteOnlyTransactionAndSubmit() == null")
205 .catchingAsync(OptimisticLockFailedException.class, optimisticLockFailedException -> {
206 // as per AsyncWriteTransaction.submit()'s JavaDoc re. retries
208 return callWithNewWriteOnlyTransactionAndSubmit(datastore, txRunner, tries - 1);
210 // out of retries, so propagate the OptimisticLockFailedException
211 throw optimisticLockFailedException;
216 private boolean isRetriableException(final Throwable throwable) {
217 return throwable instanceof OptimisticLockFailedException || throwable instanceof ReadFailedException
218 || throwable instanceof ExecutionException && isRetriableException(throwable.getCause());