/*
 * Copyright (c) 2017 Red Hat, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.mdsal.binding.util;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.util.concurrent.FluentFuture;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import java.util.concurrent.ExecutionException;
15 import java.util.concurrent.Executor;
16 import java.util.function.Function;
17 import org.opendaylight.mdsal.common.api.OptimisticLockFailedException;
18 import org.opendaylight.mdsal.common.api.ReadFailedException;
21 * Implementation of {@link ManagedNewTransactionRunner} with automatic transparent retries on transaction failure
22 * ({@link OptimisticLockFailedException} on write transactions and {@link ReadFailedException} on read transactions
23 * will cause the operation constructing the transaction to be re-run).
24 * This is a package local private internal class; end-users use the {@link RetryingManagedNewTransactionRunner}.
25 * @see RetryingManagedNewTransactionRunner
27 * @author Michael Vorburger.ch & Stephen Kitt, with input from Tom Pantelis re. catchingAsync & direct Executor
29 // intentionally package local
30 class RetryingManagedNewTransactionRunnerImpl implements ManagedNewTransactionRunner {
32 // NB: The RetryingManagedNewTransactionRunnerTest is in mdsalutil-testutils's src/test, not this project's
34 private static final int DEFAULT_RETRIES = 3; // duplicated in SingleTransactionDataBroker
36 private final int maxRetries;
38 private final ManagedNewTransactionRunner delegate;
40 private final Executor executor;
42 RetryingManagedNewTransactionRunnerImpl(ManagedNewTransactionRunner delegate) {
43 this(delegate, MoreExecutors.directExecutor(), DEFAULT_RETRIES);
46 RetryingManagedNewTransactionRunnerImpl(ManagedNewTransactionRunner delegate, int maxRetries) {
47 this(delegate, MoreExecutors.directExecutor(), maxRetries);
50 RetryingManagedNewTransactionRunnerImpl(ManagedNewTransactionRunner delegate, Executor executor, int maxRetries) {
51 this.delegate = requireNonNull(delegate, "delegate must not be null");
52 this.executor = requireNonNull(executor, "executor must not be null");
53 this.maxRetries = maxRetries;
57 public <D extends Datastore, E extends Exception, R> R applyInterruptiblyWithNewReadOnlyTransactionAndClose(
58 Class<D> datastoreType, InterruptibleCheckedFunction<TypedReadTransaction<D>, R, E> txFunction)
59 throws E, InterruptedException {
60 return applyInterruptiblyWithNewReadOnlyTransactionAndClose(datastoreType, txFunction, maxRetries);
63 @SuppressWarnings("checkstyle:IllegalCatch")
64 private <R, D extends Datastore, E extends Exception> R applyInterruptiblyWithNewReadOnlyTransactionAndClose(
65 Class<D> datastoreType, InterruptibleCheckedFunction<TypedReadTransaction<D>, R, E> txFunction,
66 int tries) throws E, InterruptedException {
68 return delegate.applyInterruptiblyWithNewReadOnlyTransactionAndClose(datastoreType, txFunction);
69 } catch (Exception e) {
70 if (isRetriableException(e) && tries - 1 > 0) {
71 return applyInterruptiblyWithNewReadOnlyTransactionAndClose(datastoreType, txFunction, tries - 1);
79 public <D extends Datastore, E extends Exception, R> R applyWithNewReadOnlyTransactionAndClose(
80 Class<D> datastoreType, CheckedFunction<TypedReadTransaction<D>, R, E> txFunction) throws E {
81 return applyWithNewReadOnlyTransactionAndClose(datastoreType, txFunction, maxRetries);
84 @SuppressWarnings("checkstyle:IllegalCatch")
85 private <R, D extends Datastore, E extends Exception> R applyWithNewReadOnlyTransactionAndClose(
86 Class<D> datastoreType, CheckedFunction<TypedReadTransaction<D>, R, E> txFunction, int tries) throws E {
88 return delegate.applyWithNewReadOnlyTransactionAndClose(datastoreType, txFunction);
89 } catch (Exception e) {
90 if (isRetriableException(e) && tries - 1 > 0) {
91 return applyWithNewReadOnlyTransactionAndClose(datastoreType, txFunction, tries - 1);
99 public <D extends Datastore, E extends Exception, R> FluentFuture<R> applyWithNewReadWriteTransactionAndSubmit(
100 Class<D> datastoreType, InterruptibleCheckedFunction<TypedReadWriteTransaction<D>, R, E> txFunction) {
101 return applyWithNewReadWriteTransactionAndSubmit(datastoreType, txFunction, maxRetries);
104 private <D extends Datastore, E extends Exception, R> FluentFuture<R> applyWithNewReadWriteTransactionAndSubmit(
105 Class<D> datastoreType, InterruptibleCheckedFunction<TypedReadWriteTransaction<D>, R, E> txRunner,
107 FluentFuture<R> future = requireNonNull(
108 delegate.applyWithNewReadWriteTransactionAndSubmit(datastoreType, txRunner),
109 "delegate.callWithNewReadWriteTransactionAndSubmit() == null");
110 return future.catchingAsync(Exception.class, exception -> {
111 if (isRetriableException(exception) && tries - 1 > 0) {
112 return applyWithNewReadWriteTransactionAndSubmit(datastoreType, txRunner, tries - 1);
120 public <R> R applyWithNewTransactionChainAndClose(Function<ManagedTransactionChain, R> chainConsumer) {
121 throw new UnsupportedOperationException("The retrying transaction manager doesn't support transaction chains");
125 public <D extends Datastore, E extends Exception> void callInterruptiblyWithNewReadOnlyTransactionAndClose(
126 Class<D> datastoreType, InterruptibleCheckedConsumer<TypedReadTransaction<D>, E> txConsumer)
127 throws E, InterruptedException {
128 callInterruptiblyWithNewReadOnlyTransactionAndClose(datastoreType, txConsumer, maxRetries);
131 @SuppressWarnings("checkstyle:IllegalCatch")
132 private <D extends Datastore, E extends Exception> void callInterruptiblyWithNewReadOnlyTransactionAndClose(
133 Class<D> datastoreType, InterruptibleCheckedConsumer<TypedReadTransaction<D>, E> txConsumer, int tries)
134 throws E, InterruptedException {
136 delegate.callInterruptiblyWithNewReadOnlyTransactionAndClose(datastoreType, txConsumer);
137 } catch (Exception e) {
138 if (isRetriableException(e) && tries - 1 > 0) {
139 callInterruptiblyWithNewReadOnlyTransactionAndClose(datastoreType, txConsumer, tries - 1);
147 public <D extends Datastore, E extends Exception> void callWithNewReadOnlyTransactionAndClose(
148 Class<D> datastoreType, CheckedConsumer<TypedReadTransaction<D>, E> txConsumer) throws E {
149 callWithNewReadOnlyTransactionAndClose(datastoreType, txConsumer, maxRetries);
152 @SuppressWarnings("checkstyle:IllegalCatch")
153 private <D extends Datastore, E extends Exception> void callWithNewReadOnlyTransactionAndClose(
154 Class<D> datastoreType, CheckedConsumer<TypedReadTransaction<D>, E> txConsumer, int tries) throws E {
156 delegate.callWithNewReadOnlyTransactionAndClose(datastoreType, txConsumer);
157 } catch (Exception e) {
158 if (isRetriableException(e) && tries - 1 > 0) {
159 callWithNewReadOnlyTransactionAndClose(datastoreType, txConsumer, tries - 1);
167 public <D extends Datastore, E extends Exception> FluentFuture<? extends Object>
168 callWithNewReadWriteTransactionAndSubmit(
169 Class<D> datastoreType, InterruptibleCheckedConsumer<TypedReadWriteTransaction<D>, E> txConsumer) {
170 return callWithNewReadWriteTransactionAndSubmit(datastoreType, txConsumer, maxRetries);
173 private <D extends Datastore, E extends Exception, T> FluentFuture<T>
174 callWithNewReadWriteTransactionAndSubmit(Class<D> datastoreType,
175 InterruptibleCheckedConsumer<TypedReadWriteTransaction<D>, E> txRunner, int tries) {
177 return (FluentFuture<T>) requireNonNull(
178 delegate.callWithNewReadWriteTransactionAndSubmit(datastoreType, txRunner),
179 "delegate.callWithNewWriteOnlyTransactionAndSubmit() == null")
180 .catchingAsync(Exception.class, exception -> {
181 // as per AsyncWriteTransaction.submit()'s JavaDoc re. retries
182 if (isRetriableException(exception) && tries - 1 > 0) {
183 return callWithNewReadWriteTransactionAndSubmit(datastoreType, txRunner, tries - 1);
185 // out of retries, so propagate the exception
192 public <D extends Datastore, E extends Exception> FluentFuture<? extends Object>
193 callWithNewWriteOnlyTransactionAndSubmit(Class<D> datastoreType,
194 InterruptibleCheckedConsumer<TypedWriteTransaction<D>, E> txConsumer) {
195 return callWithNewWriteOnlyTransactionAndSubmit(datastoreType, txConsumer, maxRetries);
198 private <D extends Datastore, E extends Exception, T> FluentFuture<T>
199 callWithNewWriteOnlyTransactionAndSubmit(Class<D> datastoreType,
200 InterruptibleCheckedConsumer<TypedWriteTransaction<D>, E> txRunner, int tries) {
202 return (FluentFuture<T>) requireNonNull(
203 delegate.callWithNewWriteOnlyTransactionAndSubmit(datastoreType, txRunner),
204 "delegate.callWithNewWriteOnlyTransactionAndSubmit() == null")
205 .catchingAsync(OptimisticLockFailedException.class, optimisticLockFailedException -> {
206 // as per AsyncWriteTransaction.submit()'s JavaDoc re. retries
208 return callWithNewWriteOnlyTransactionAndSubmit(datastoreType, txRunner, tries - 1);
210 // out of retries, so propagate the OptimisticLockFailedException
211 throw optimisticLockFailedException;
216 private boolean isRetriableException(Throwable throwable) {
217 return throwable instanceof OptimisticLockFailedException || throwable instanceof ReadFailedException || (
218 throwable instanceof ExecutionException && isRetriableException(throwable.getCause()));