2 * Copyright (c) 2017 Red Hat, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.mdsal.binding.util;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.util.concurrent.FluentFuture;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import java.util.concurrent.ExecutionException;
15 import java.util.concurrent.Executor;
16 import java.util.function.Function;
17 import org.opendaylight.mdsal.common.api.OptimisticLockFailedException;
18 import org.opendaylight.mdsal.common.api.ReadFailedException;
/**
 * Implementation of {@link ManagedNewTransactionRunner} with automatic transparent retries on transaction failure
 * ({@link OptimisticLockFailedException} on write transactions and {@link ReadFailedException} on read transactions
 * will cause the operation constructing the transaction to be re-run).
 * This is a package-private internal class; end-users use the {@link RetryingManagedNewTransactionRunner}.
 *
 * @see RetryingManagedNewTransactionRunner
 *
 * @author Michael Vorburger.ch & Stephen Kitt, with input from Tom Pantelis re. catchingAsync & direct Executor
 */
29 // intentionally package local
30 class RetryingManagedNewTransactionRunnerImpl implements ManagedNewTransactionRunner {
32 // NB: The RetryingManagedNewTransactionRunnerTest is in mdsalutil-testutils's src/test, not this project's
34 // duplicated in SingleTransactionDataBroker
35 private static final int DEFAULT_RETRIES = 3;
37 private final int maxRetries;
39 private final ManagedNewTransactionRunner delegate;
41 private final Executor executor;
/**
 * Creates a retrying runner that uses a direct executor and the default attempt budget
 * ({@link #DEFAULT_RETRIES}).
 *
 * @param delegate runner performing each individual transaction attempt; must not be null
 */
RetryingManagedNewTransactionRunnerImpl(final ManagedNewTransactionRunner delegate) {
    this(delegate, MoreExecutors.directExecutor(), DEFAULT_RETRIES);
}
/**
 * Creates a retrying runner that uses a direct executor and a caller-chosen attempt budget.
 *
 * @param delegate runner performing each individual transaction attempt; must not be null
 * @param maxRetries initial value of the attempt counter (total attempts, first try included)
 */
RetryingManagedNewTransactionRunnerImpl(final ManagedNewTransactionRunner delegate, final int maxRetries) {
    this(delegate, MoreExecutors.directExecutor(), maxRetries);
}
/**
 * Creates a retrying runner with an explicit executor and attempt budget.
 *
 * @param delegate runner performing each individual transaction attempt
 * @param executor executor stored for use by this runner
 * @param maxRetries initial value of the attempt counter (total attempts, first try included)
 * @throws NullPointerException if {@code delegate} or {@code executor} is null
 */
RetryingManagedNewTransactionRunnerImpl(final ManagedNewTransactionRunner delegate, final Executor executor,
        final int maxRetries) {
    this.maxRetries = maxRetries;
    // The delegate is validated before the executor, so a null delegate is reported first.
    this.delegate = requireNonNull(delegate, "delegate must not be null");
    this.executor = requireNonNull(executor, "executor must not be null");
}
58 public <D extends Datastore, E extends Exception, R> R applyInterruptiblyWithNewReadOnlyTransactionAndClose(
59 Class<D> datastoreType, InterruptibleCheckedFunction<TypedReadTransaction<D>, R, E> txFunction)
60 throws E, InterruptedException {
61 return applyInterruptiblyWithNewReadOnlyTransactionAndClose(datastoreType, txFunction, maxRetries);
64 @SuppressWarnings("checkstyle:IllegalCatch")
65 private <R, D extends Datastore, E extends Exception> R applyInterruptiblyWithNewReadOnlyTransactionAndClose(
66 Class<D> datastoreType, InterruptibleCheckedFunction<TypedReadTransaction<D>, R, E> txFunction,
67 int tries) throws E, InterruptedException {
69 return delegate.applyInterruptiblyWithNewReadOnlyTransactionAndClose(datastoreType, txFunction);
70 } catch (Exception e) {
71 if (isRetriableException(e) && tries - 1 > 0) {
72 return applyInterruptiblyWithNewReadOnlyTransactionAndClose(datastoreType, txFunction, tries - 1);
80 public <D extends Datastore, E extends Exception, R> R applyWithNewReadOnlyTransactionAndClose(
81 Class<D> datastoreType, CheckedFunction<TypedReadTransaction<D>, R, E> txFunction) throws E {
82 return applyWithNewReadOnlyTransactionAndClose(datastoreType, txFunction, maxRetries);
85 @SuppressWarnings("checkstyle:IllegalCatch")
86 private <R, D extends Datastore, E extends Exception> R applyWithNewReadOnlyTransactionAndClose(
87 Class<D> datastoreType, CheckedFunction<TypedReadTransaction<D>, R, E> txFunction, int tries) throws E {
89 return delegate.applyWithNewReadOnlyTransactionAndClose(datastoreType, txFunction);
90 } catch (Exception e) {
91 if (isRetriableException(e) && tries - 1 > 0) {
92 return applyWithNewReadOnlyTransactionAndClose(datastoreType, txFunction, tries - 1);
100 public <D extends Datastore, E extends Exception, R> FluentFuture<R> applyWithNewReadWriteTransactionAndSubmit(
101 Class<D> datastoreType, InterruptibleCheckedFunction<TypedReadWriteTransaction<D>, R, E> txFunction) {
102 return applyWithNewReadWriteTransactionAndSubmit(datastoreType, txFunction, maxRetries);
105 private <D extends Datastore, E extends Exception, R> FluentFuture<R> applyWithNewReadWriteTransactionAndSubmit(
106 Class<D> datastoreType, InterruptibleCheckedFunction<TypedReadWriteTransaction<D>, R, E> txRunner,
108 FluentFuture<R> future = requireNonNull(
109 delegate.applyWithNewReadWriteTransactionAndSubmit(datastoreType, txRunner),
110 "delegate.callWithNewReadWriteTransactionAndSubmit() == null");
111 return future.catchingAsync(Exception.class, exception -> {
112 if (isRetriableException(exception) && tries - 1 > 0) {
113 return applyWithNewReadWriteTransactionAndSubmit(datastoreType, txRunner, tries - 1);
121 public <R> R applyWithNewTransactionChainAndClose(Function<ManagedTransactionChain, R> chainConsumer) {
122 throw new UnsupportedOperationException("The retrying transaction manager doesn't support transaction chains");
126 public <D extends Datastore, E extends Exception> void callInterruptiblyWithNewReadOnlyTransactionAndClose(
127 Class<D> datastoreType, InterruptibleCheckedConsumer<TypedReadTransaction<D>, E> txConsumer)
128 throws E, InterruptedException {
129 callInterruptiblyWithNewReadOnlyTransactionAndClose(datastoreType, txConsumer, maxRetries);
132 @SuppressWarnings("checkstyle:IllegalCatch")
133 private <D extends Datastore, E extends Exception> void callInterruptiblyWithNewReadOnlyTransactionAndClose(
134 Class<D> datastoreType, InterruptibleCheckedConsumer<TypedReadTransaction<D>, E> txConsumer, int tries)
135 throws E, InterruptedException {
137 delegate.callInterruptiblyWithNewReadOnlyTransactionAndClose(datastoreType, txConsumer);
138 } catch (Exception e) {
139 if (isRetriableException(e) && tries - 1 > 0) {
140 callInterruptiblyWithNewReadOnlyTransactionAndClose(datastoreType, txConsumer, tries - 1);
148 public <D extends Datastore, E extends Exception> void callWithNewReadOnlyTransactionAndClose(
149 Class<D> datastoreType, CheckedConsumer<TypedReadTransaction<D>, E> txConsumer) throws E {
150 callWithNewReadOnlyTransactionAndClose(datastoreType, txConsumer, maxRetries);
153 @SuppressWarnings("checkstyle:IllegalCatch")
154 private <D extends Datastore, E extends Exception> void callWithNewReadOnlyTransactionAndClose(
155 Class<D> datastoreType, CheckedConsumer<TypedReadTransaction<D>, E> txConsumer, int tries) throws E {
157 delegate.callWithNewReadOnlyTransactionAndClose(datastoreType, txConsumer);
158 } catch (Exception e) {
159 if (isRetriableException(e) && tries - 1 > 0) {
160 callWithNewReadOnlyTransactionAndClose(datastoreType, txConsumer, tries - 1);
168 public <D extends Datastore, E extends Exception> FluentFuture<? extends Object>
169 callWithNewReadWriteTransactionAndSubmit(
170 Class<D> datastoreType, InterruptibleCheckedConsumer<TypedReadWriteTransaction<D>, E> txConsumer) {
171 return callWithNewReadWriteTransactionAndSubmit(datastoreType, txConsumer, maxRetries);
174 private <D extends Datastore, E extends Exception, T> FluentFuture<T>
175 callWithNewReadWriteTransactionAndSubmit(Class<D> datastoreType,
176 InterruptibleCheckedConsumer<TypedReadWriteTransaction<D>, E> txRunner, int tries) {
178 return (FluentFuture<T>) requireNonNull(
179 delegate.callWithNewReadWriteTransactionAndSubmit(datastoreType, txRunner),
180 "delegate.callWithNewWriteOnlyTransactionAndSubmit() == null")
181 .catchingAsync(Exception.class, exception -> {
182 // as per AsyncWriteTransaction.submit()'s JavaDoc re. retries
183 if (isRetriableException(exception) && tries - 1 > 0) {
184 return callWithNewReadWriteTransactionAndSubmit(datastoreType, txRunner, tries - 1);
186 // out of retries, so propagate the exception
193 public <D extends Datastore, E extends Exception> FluentFuture<? extends Object>
194 callWithNewWriteOnlyTransactionAndSubmit(Class<D> datastoreType,
195 InterruptibleCheckedConsumer<TypedWriteTransaction<D>, E> txConsumer) {
196 return callWithNewWriteOnlyTransactionAndSubmit(datastoreType, txConsumer, maxRetries);
199 private <D extends Datastore, E extends Exception, T> FluentFuture<T>
200 callWithNewWriteOnlyTransactionAndSubmit(Class<D> datastoreType,
201 InterruptibleCheckedConsumer<TypedWriteTransaction<D>, E> txRunner, int tries) {
203 return (FluentFuture<T>) requireNonNull(
204 delegate.callWithNewWriteOnlyTransactionAndSubmit(datastoreType, txRunner),
205 "delegate.callWithNewWriteOnlyTransactionAndSubmit() == null")
206 .catchingAsync(OptimisticLockFailedException.class, optimisticLockFailedException -> {
207 // as per AsyncWriteTransaction.submit()'s JavaDoc re. retries
209 return callWithNewWriteOnlyTransactionAndSubmit(datastoreType, txRunner, tries - 1);
211 // out of retries, so propagate the OptimisticLockFailedException
212 throw optimisticLockFailedException;
217 private boolean isRetriableException(Throwable throwable) {
218 return throwable instanceof OptimisticLockFailedException || throwable instanceof ReadFailedException || (
219 throwable instanceof ExecutionException && isRetriableException(throwable.getCause()));