/* * Copyright (c) 2017 Red Hat, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.mdsal.binding.util; import static java.util.Objects.requireNonNull; import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.MoreExecutors; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.function.Function; import org.opendaylight.mdsal.common.api.OptimisticLockFailedException; import org.opendaylight.mdsal.common.api.ReadFailedException; /** * Implementation of {@link ManagedNewTransactionRunner} with automatic transparent retries on transaction failure * ({@link OptimisticLockFailedException} on write transactions and {@link ReadFailedException} on read transactions * will cause the operation constructing the transaction to be re-run). * This is a package local private internal class; end-users use the {@link RetryingManagedNewTransactionRunner}. * @see RetryingManagedNewTransactionRunner * * @author Michael Vorburger.ch & Stephen Kitt, with input from Tom Pantelis re. catchingAsync & direct Executor */ // intentionally package local class RetryingManagedNewTransactionRunnerImpl implements ManagedNewTransactionRunner { // NB: The RetryingManagedNewTransactionRunnerTest is in mdsalutil-testutils's src/test, not this project's // duplicated in SingleTransactionDataBroker private static final int DEFAULT_RETRIES = 3; private final int maxRetries; private final ManagedNewTransactionRunner delegate; private final Executor executor; RetryingManagedNewTransactionRunnerImpl(final ManagedNewTransactionRunner delegate) { this(delegate, MoreExecutors.directExecutor(), DEFAULT_RETRIES); } RetryingManagedNewTransactionRunnerImpl(final ManagedNewTransactionRunner delegate, final int maxRetries) { this(delegate, MoreExecutors.directExecutor(), maxRetries); } RetryingManagedNewTransactionRunnerImpl(final ManagedNewTransactionRunner delegate, final Executor executor, final int maxRetries) { this.delegate = requireNonNull(delegate, "delegate must not be null"); this.executor = requireNonNull(executor, "executor must not be null"); this.maxRetries = maxRetries; } @Override public R applyInterruptiblyWithNewReadOnlyTransactionAndClose( final D datastore, final InterruptibleCheckedFunction, R, E> txFunction) throws E, InterruptedException { return applyInterruptiblyWithNewReadOnlyTransactionAndClose(datastore, txFunction, maxRetries); } @SuppressWarnings("checkstyle:IllegalCatch") private R applyInterruptiblyWithNewReadOnlyTransactionAndClose( final D datastore, final InterruptibleCheckedFunction, R, E> txFunction, final int tries) throws E, InterruptedException { try { return delegate.applyInterruptiblyWithNewReadOnlyTransactionAndClose(datastore, txFunction); } catch (Exception e) { if (isRetriableException(e) && tries - 1 > 0) { return applyInterruptiblyWithNewReadOnlyTransactionAndClose(datastore, txFunction, tries - 1); } else { throw e; } } } @Override public R applyWithNewReadOnlyTransactionAndClose(final D datastore, final CheckedFunction, R, E> txFunction) throws E { return applyWithNewReadOnlyTransactionAndClose(datastore, txFunction, maxRetries); } @SuppressWarnings("checkstyle:IllegalCatch") private R applyWithNewReadOnlyTransactionAndClose(final D datastore, final CheckedFunction, R, E> txFunction, final int tries) throws E { try { return delegate.applyWithNewReadOnlyTransactionAndClose(datastore, txFunction); } catch (Exception e) { if (isRetriableException(e) && tries - 1 > 0) { return applyWithNewReadOnlyTransactionAndClose(datastore, txFunction, tries - 1); } else { throw e; } } } @Override public FluentFuture applyWithNewReadWriteTransactionAndSubmit( final D datastore, final InterruptibleCheckedFunction, R, E> txFunction) { return applyWithNewReadWriteTransactionAndSubmit(datastore, txFunction, maxRetries); } private FluentFuture applyWithNewReadWriteTransactionAndSubmit( final D datastore, final InterruptibleCheckedFunction, R, E> txRunner, final int tries) { FluentFuture future = requireNonNull( delegate.applyWithNewReadWriteTransactionAndSubmit(datastore, txRunner), "delegate.callWithNewReadWriteTransactionAndSubmit() == null"); return future.catchingAsync(Exception.class, exception -> { if (isRetriableException(exception) && tries - 1 > 0) { return applyWithNewReadWriteTransactionAndSubmit(datastore, txRunner, tries - 1); } else { throw exception; } }, executor); } @Override public R applyWithNewTransactionChainAndClose(final Function chainConsumer) { throw new UnsupportedOperationException("The retrying transaction manager doesn't support transaction chains"); } @Override public void callInterruptiblyWithNewReadOnlyTransactionAndClose( final D datastore, final InterruptibleCheckedConsumer, E> txConsumer) throws E, InterruptedException { callInterruptiblyWithNewReadOnlyTransactionAndClose(datastore, txConsumer, maxRetries); } @SuppressWarnings("checkstyle:IllegalCatch") private void callInterruptiblyWithNewReadOnlyTransactionAndClose( final D datastore, final InterruptibleCheckedConsumer, E> txConsumer, final int tries) throws E, InterruptedException { try { delegate.callInterruptiblyWithNewReadOnlyTransactionAndClose(datastore, txConsumer); } catch (Exception e) { if (isRetriableException(e) && tries - 1 > 0) { callInterruptiblyWithNewReadOnlyTransactionAndClose(datastore, txConsumer, tries - 1); } else { throw e; } } } @Override public void callWithNewReadOnlyTransactionAndClose(final D datastore, final CheckedConsumer, E> txConsumer) throws E { callWithNewReadOnlyTransactionAndClose(datastore, txConsumer, maxRetries); } @SuppressWarnings("checkstyle:IllegalCatch") private void callWithNewReadOnlyTransactionAndClose(final D datastore, final CheckedConsumer, E> txConsumer, final int tries) throws E { try { delegate.callWithNewReadOnlyTransactionAndClose(datastore, txConsumer); } catch (Exception e) { if (isRetriableException(e) && tries - 1 > 0) { callWithNewReadOnlyTransactionAndClose(datastore, txConsumer, tries - 1); } else { throw e; } } } @Override public FluentFuture callWithNewReadWriteTransactionAndSubmit(final D datastore, final InterruptibleCheckedConsumer, E> txConsumer) { return callWithNewReadWriteTransactionAndSubmit(datastore, txConsumer, maxRetries); } private FluentFuture callWithNewReadWriteTransactionAndSubmit( final D datastore, final InterruptibleCheckedConsumer, E> txRunner, final int tries) { return (FluentFuture) requireNonNull( delegate.callWithNewReadWriteTransactionAndSubmit(datastore, txRunner), "delegate.callWithNewWriteOnlyTransactionAndSubmit() == null") .catchingAsync(Exception.class, exception -> { // as per AsyncWriteTransaction.submit()'s JavaDoc re. retries if (isRetriableException(exception) && tries - 1 > 0) { return callWithNewReadWriteTransactionAndSubmit(datastore, txRunner, tries - 1); } else { // out of retries, so propagate the exception throw exception; } }, executor); } @Override public FluentFuture callWithNewWriteOnlyTransactionAndSubmit(final D datastore, final InterruptibleCheckedConsumer, E> txConsumer) { return callWithNewWriteOnlyTransactionAndSubmit(datastore, txConsumer, maxRetries); } private FluentFuture callWithNewWriteOnlyTransactionAndSubmit( final D datastore, final InterruptibleCheckedConsumer, E> txRunner, final int tries) { return (FluentFuture) requireNonNull( delegate.callWithNewWriteOnlyTransactionAndSubmit(datastore, txRunner), "delegate.callWithNewWriteOnlyTransactionAndSubmit() == null") .catchingAsync(OptimisticLockFailedException.class, optimisticLockFailedException -> { // as per AsyncWriteTransaction.submit()'s JavaDoc re. retries if (tries - 1 > 0) { return callWithNewWriteOnlyTransactionAndSubmit(datastore, txRunner, tries - 1); } else { // out of retries, so propagate the OptimisticLockFailedException throw optimisticLockFailedException; } }, executor); } private boolean isRetriableException(final Throwable throwable) { return throwable instanceof OptimisticLockFailedException || throwable instanceof ReadFailedException || throwable instanceof ExecutionException && isRetriableException(throwable.getCause()); } }