412bcbea68d574a820663b0dc2019169fadd9429
[mdsal.git] / binding / mdsal-binding-util / src / main / java / org / opendaylight / mdsal / binding / util / RetryingManagedNewTransactionRunnerImpl.java
1 /*
2  * Copyright (c) 2017 Red Hat, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.binding.util;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.util.concurrent.FluentFuture;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import java.util.concurrent.ExecutionException;
15 import java.util.concurrent.Executor;
16 import java.util.function.Function;
17 import org.opendaylight.mdsal.common.api.OptimisticLockFailedException;
18 import org.opendaylight.mdsal.common.api.ReadFailedException;
19
20 /**
21  * Implementation of {@link ManagedNewTransactionRunner} with automatic transparent retries on transaction failure
22  * ({@link OptimisticLockFailedException} on write transactions and {@link ReadFailedException} on read transactions
23  * will cause the operation constructing the transaction to be re-run).
24  * This is a package local private internal class; end-users use the {@link RetryingManagedNewTransactionRunner}.
25  * @see RetryingManagedNewTransactionRunner
26  *
27  * @author Michael Vorburger.ch & Stephen Kitt, with input from Tom Pantelis re. catchingAsync & direct Executor
28  */
29 // intentionally package local
30 class RetryingManagedNewTransactionRunnerImpl implements ManagedNewTransactionRunner {
31
32     // NB: The RetryingManagedNewTransactionRunnerTest is in mdsalutil-testutils's src/test, not this project's
33
34     private static final int DEFAULT_RETRIES = 3; // duplicated in SingleTransactionDataBroker
35
36     private final int maxRetries;
37
38     private final ManagedNewTransactionRunner delegate;
39
40     private final Executor executor;
41
42     RetryingManagedNewTransactionRunnerImpl(ManagedNewTransactionRunner delegate) {
43         this(delegate, MoreExecutors.directExecutor(), DEFAULT_RETRIES);
44     }
45
46     RetryingManagedNewTransactionRunnerImpl(ManagedNewTransactionRunner delegate, int maxRetries) {
47         this(delegate, MoreExecutors.directExecutor(), maxRetries);
48     }
49
50     RetryingManagedNewTransactionRunnerImpl(ManagedNewTransactionRunner delegate, Executor executor, int maxRetries) {
51         this.delegate = requireNonNull(delegate, "delegate must not be null");
52         this.executor = requireNonNull(executor, "executor must not be null");
53         this.maxRetries = maxRetries;
54     }
55
56     @Override
57     public <D extends Datastore, E extends Exception, R> R applyInterruptiblyWithNewReadOnlyTransactionAndClose(
58             Class<D> datastoreType, InterruptibleCheckedFunction<TypedReadTransaction<D>, R, E> txFunction)
59             throws E, InterruptedException {
60         return applyInterruptiblyWithNewReadOnlyTransactionAndClose(datastoreType, txFunction, maxRetries);
61     }
62
63     @SuppressWarnings("checkstyle:IllegalCatch")
64     private <R, D extends Datastore, E extends Exception> R applyInterruptiblyWithNewReadOnlyTransactionAndClose(
65             Class<D> datastoreType, InterruptibleCheckedFunction<TypedReadTransaction<D>, R, E> txFunction,
66             int tries) throws E, InterruptedException {
67         try {
68             return delegate.applyInterruptiblyWithNewReadOnlyTransactionAndClose(datastoreType, txFunction);
69         } catch (Exception e) {
70             if (isRetriableException(e) && tries - 1 > 0) {
71                 return applyInterruptiblyWithNewReadOnlyTransactionAndClose(datastoreType, txFunction, tries - 1);
72             } else {
73                 throw e;
74             }
75         }
76     }
77
78     @Override
79     public <D extends Datastore, E extends Exception, R> R applyWithNewReadOnlyTransactionAndClose(
80             Class<D> datastoreType, CheckedFunction<TypedReadTransaction<D>, R, E> txFunction) throws E {
81         return applyWithNewReadOnlyTransactionAndClose(datastoreType, txFunction, maxRetries);
82     }
83
84     @SuppressWarnings("checkstyle:IllegalCatch")
85     private <R, D extends Datastore, E extends Exception> R applyWithNewReadOnlyTransactionAndClose(
86         Class<D> datastoreType, CheckedFunction<TypedReadTransaction<D>, R, E> txFunction, int tries) throws E {
87         try {
88             return delegate.applyWithNewReadOnlyTransactionAndClose(datastoreType, txFunction);
89         } catch (Exception e) {
90             if (isRetriableException(e) && tries - 1 > 0) {
91                 return applyWithNewReadOnlyTransactionAndClose(datastoreType, txFunction, tries - 1);
92             } else {
93                 throw e;
94             }
95         }
96     }
97
98     @Override
99     public <D extends Datastore, E extends Exception, R> FluentFuture<R> applyWithNewReadWriteTransactionAndSubmit(
100             Class<D> datastoreType, InterruptibleCheckedFunction<TypedReadWriteTransaction<D>, R, E> txFunction) {
101         return applyWithNewReadWriteTransactionAndSubmit(datastoreType, txFunction, maxRetries);
102     }
103
104     private <D extends Datastore, E extends Exception, R> FluentFuture<R> applyWithNewReadWriteTransactionAndSubmit(
105             Class<D> datastoreType, InterruptibleCheckedFunction<TypedReadWriteTransaction<D>, R, E> txRunner,
106             int tries) {
107         FluentFuture<R> future = requireNonNull(
108             delegate.applyWithNewReadWriteTransactionAndSubmit(datastoreType, txRunner),
109             "delegate.callWithNewReadWriteTransactionAndSubmit() == null");
110         return future.catchingAsync(Exception.class, exception -> {
111             if (isRetriableException(exception) && tries - 1 > 0) {
112                 return applyWithNewReadWriteTransactionAndSubmit(datastoreType, txRunner, tries - 1);
113             } else {
114                 throw exception;
115             }
116         }, executor);
117     }
118
119     @Override
120     public <R> R applyWithNewTransactionChainAndClose(Function<ManagedTransactionChain, R> chainConsumer) {
121         throw new UnsupportedOperationException("The retrying transaction manager doesn't support transaction chains");
122     }
123
124     @Override
125     public <D extends Datastore, E extends Exception> void callInterruptiblyWithNewReadOnlyTransactionAndClose(
126             Class<D> datastoreType, InterruptibleCheckedConsumer<TypedReadTransaction<D>, E> txConsumer)
127             throws E, InterruptedException {
128         callInterruptiblyWithNewReadOnlyTransactionAndClose(datastoreType, txConsumer, maxRetries);
129     }
130
131     @SuppressWarnings("checkstyle:IllegalCatch")
132     private <D extends Datastore, E extends Exception> void callInterruptiblyWithNewReadOnlyTransactionAndClose(
133             Class<D> datastoreType, InterruptibleCheckedConsumer<TypedReadTransaction<D>, E> txConsumer, int tries)
134             throws E, InterruptedException {
135         try {
136             delegate.callInterruptiblyWithNewReadOnlyTransactionAndClose(datastoreType, txConsumer);
137         } catch (Exception e) {
138             if (isRetriableException(e) && tries - 1 > 0) {
139                 callInterruptiblyWithNewReadOnlyTransactionAndClose(datastoreType, txConsumer, tries - 1);
140             } else {
141                 throw e;
142             }
143         }
144     }
145
146     @Override
147     public <D extends Datastore, E extends Exception> void callWithNewReadOnlyTransactionAndClose(
148             Class<D> datastoreType, CheckedConsumer<TypedReadTransaction<D>, E> txConsumer) throws E {
149         callWithNewReadOnlyTransactionAndClose(datastoreType, txConsumer, maxRetries);
150     }
151
152     @SuppressWarnings("checkstyle:IllegalCatch")
153     private <D extends Datastore, E extends Exception> void callWithNewReadOnlyTransactionAndClose(
154             Class<D> datastoreType, CheckedConsumer<TypedReadTransaction<D>, E> txConsumer, int tries) throws E {
155         try {
156             delegate.callWithNewReadOnlyTransactionAndClose(datastoreType, txConsumer);
157         } catch (Exception e) {
158             if (isRetriableException(e) && tries - 1 > 0) {
159                 callWithNewReadOnlyTransactionAndClose(datastoreType, txConsumer, tries - 1);
160             } else {
161                 throw e;
162             }
163         }
164     }
165
166     @Override
167     public <D extends Datastore, E extends Exception> FluentFuture<? extends Object>
168         callWithNewReadWriteTransactionAndSubmit(
169             Class<D> datastoreType, InterruptibleCheckedConsumer<TypedReadWriteTransaction<D>, E> txConsumer) {
170         return callWithNewReadWriteTransactionAndSubmit(datastoreType, txConsumer, maxRetries);
171     }
172
173     private <D extends Datastore, E extends Exception, T> FluentFuture<T>
174         callWithNewReadWriteTransactionAndSubmit(Class<D> datastoreType,
175             InterruptibleCheckedConsumer<TypedReadWriteTransaction<D>, E> txRunner, int tries) {
176
177         return (FluentFuture<T>) requireNonNull(
178             delegate.callWithNewReadWriteTransactionAndSubmit(datastoreType, txRunner),
179             "delegate.callWithNewWriteOnlyTransactionAndSubmit() == null")
180             .catchingAsync(Exception.class, exception -> {
181                 // as per AsyncWriteTransaction.submit()'s JavaDoc re. retries
182                 if (isRetriableException(exception) && tries - 1 > 0) {
183                     return callWithNewReadWriteTransactionAndSubmit(datastoreType, txRunner, tries - 1);
184                 } else {
185                     // out of retries, so propagate the exception
186                     throw exception;
187                 }
188             }, executor);
189     }
190
191     @Override
192     public <D extends Datastore, E extends Exception> FluentFuture<? extends Object>
193         callWithNewWriteOnlyTransactionAndSubmit(Class<D> datastoreType,
194             InterruptibleCheckedConsumer<TypedWriteTransaction<D>, E> txConsumer) {
195         return callWithNewWriteOnlyTransactionAndSubmit(datastoreType, txConsumer, maxRetries);
196     }
197
198     private <D extends Datastore, E extends Exception, T> FluentFuture<T>
199         callWithNewWriteOnlyTransactionAndSubmit(Class<D> datastoreType,
200             InterruptibleCheckedConsumer<TypedWriteTransaction<D>, E> txRunner, int tries) {
201
202         return (FluentFuture<T>) requireNonNull(
203             delegate.callWithNewWriteOnlyTransactionAndSubmit(datastoreType, txRunner),
204             "delegate.callWithNewWriteOnlyTransactionAndSubmit() == null")
205             .catchingAsync(OptimisticLockFailedException.class, optimisticLockFailedException -> {
206                 // as per AsyncWriteTransaction.submit()'s JavaDoc re. retries
207                 if (tries - 1 > 0) {
208                     return callWithNewWriteOnlyTransactionAndSubmit(datastoreType, txRunner, tries - 1);
209                 } else {
210                     // out of retries, so propagate the OptimisticLockFailedException
211                     throw optimisticLockFailedException;
212                 }
213             }, executor);
214     }
215
216     private boolean isRetriableException(Throwable throwable) {
217         return throwable instanceof OptimisticLockFailedException || throwable instanceof ReadFailedException || (
218             throwable instanceof ExecutionException && isRetriableException(throwable.getCause()));
219     }
220 }