*/
package org.opendaylight.mdsal.binding.api;
-import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.NonNullByDefault;
/**
* Provides access to a conceptual data tree store and also provides the ability to
* <b>Implementation Note:</b> This interface is not intended to be implemented by users of MD-SAL,
* but only to be consumed by them.
*/
+@NonNullByDefault
public interface DataBroker extends BindingService, TransactionFactory, DataTreeChangeService {
/**
* Create a new transaction chain. The chain will be initialized to read from its backing datastore, with
* no outstanding transaction. Listener will be registered to handle chain-level events.
*
- * @param listener Transaction chain event listener
* @return A new transaction chain.
*/
- @NonNull TransactionChain createTransactionChain(@NonNull TransactionChainListener listener);
+ TransactionChain createTransactionChain();
/**
* Create a new transaction chain. The chain will be initialized to read from its backing datastore, with
* no outstanding transaction. Listener will be registered to handle chain-level events.
*
* <p>
- * Unlike {@link #createTransactionChain(TransactionChainListener)}, the transaction chain returned by this
- * method is allowed to merge individual transactions into larger chunks. When transactions are merged, the results
- * must be indistinguishable from the result of all operations having been performed on a single transaction.
+ * Unlike {@link #createTransactionChain()}, the transaction chain returned by this method is allowed to merge
+ * individual transactions into larger chunks. When transactions are merged, the results must be indistinguishable
+ * from the result of all operations having been performed on a single transaction.
*
* <p>
* When transactions are merged, {@link TransactionChain#newReadOnlyTransaction()} may actually be backed by
* a read-write transaction, hence an additional restriction on API use is that multiple read-only transactions
* may not be open at the same time.
*
- * @param listener Transaction chain event listener
* @return A new transaction chain.
*/
- @NonNull TransactionChain createMergingTransactionChain(@NonNull TransactionChainListener listener);
+ TransactionChain createMergingTransactionChain();
}
*/
package org.opendaylight.mdsal.binding.api;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.concurrent.Executor;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.yang.common.Empty;
/**
* A chain of transactions. Transactions in a chain need to be committed in sequence and each transaction should see
*/
@Override
ReadWriteTransaction newReadWriteTransaction();
+
+ /**
+ * Add a completion callback to execute when {@link #future()} completes. This is a shorthand for
+ * {@code Futures.addCallback(future(), callback, MoreExecutors.directExecutor())}.
+ *
+ * @param callback completion callback
+ */
+ default void addCallback(final FutureCallback<Empty> callback) {
+ addCallback(callback, MoreExecutors.directExecutor());
+ }
+
+ /**
+ * Add a completion callback to execute on specified executor when {@link #future()} completes. This is a shorthand
+ * for {@code Futures.addCallback(future(), callback, executor)}.
+ *
+ * @param callback completion callback
+ * @param executor executor on which to execute the callback
+ */
+ default void addCallback(final FutureCallback<Empty> callback, final Executor executor) {
+ Futures.addCallback(future(), callback, executor);
+ }
+
+ /**
+ * Return a {@link ListenableFuture} which completes when this chain completes.
+ *
+ * @return A {@link ListenableFuture}
+ */
+ @NonNull ListenableFuture<Empty> future();
}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.mdsal.binding.api;
-
-import org.eclipse.jdt.annotation.NonNull;
-
-/**
- * Listener for transaction chain events.
- */
-// FIXME: 6.0.0: remove this in favor of a TransactionChain destiny, available as a FluentFuture from TransactionChain
-public interface TransactionChainListener {
- /**
- * Invoked if when a transaction in the chain fails. All transactions submitted after the failed transaction, in the
- * chain, are automatically cancelled by the time this notification is invoked. Open transactions need to be closed
- * or cancelled.
- * Implementations should invoke chain.close() to close the chain.
- *
- * @param chain Transaction chain which failed
- * @param transaction Transaction which caused the chain to fail
- * @param cause The cause of transaction failure
- */
- void onTransactionChainFailed(@NonNull TransactionChain chain, @NonNull Transaction transaction,
- @NonNull Throwable cause);
-
- /**
- * Invoked when a transaction chain is completed. A transaction chain is considered completed when it has been
- * closed and all its instructions have completed successfully.
- *
- * @param chain Transaction chain which completed
- */
- void onTransactionChainSuccessful(@NonNull TransactionChain chain);
-}
-
import org.opendaylight.mdsal.binding.api.ReadTransaction;
import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
import org.opendaylight.mdsal.binding.api.TransactionChain;
-import org.opendaylight.mdsal.binding.api.TransactionChainListener;
import org.opendaylight.mdsal.binding.api.WriteTransaction;
import org.opendaylight.mdsal.binding.dom.adapter.BindingDOMAdapterBuilder.Factory;
import org.opendaylight.mdsal.dom.api.DOMDataBroker;
}
@Override
- public TransactionChain createTransactionChain(final TransactionChainListener listener) {
- return new BindingDOMTransactionChainAdapter(getDelegate()::createTransactionChain, adapterContext(), listener);
+ public TransactionChain createTransactionChain() {
+ return new BindingDOMTransactionChainAdapter(getDelegate().createTransactionChain(), adapterContext());
}
@Override
- public TransactionChain createMergingTransactionChain(final TransactionChainListener listener) {
- return new BindingDOMTransactionChainAdapter(getDelegate()::createMergingTransactionChain, adapterContext(),
- listener);
+ public TransactionChain createMergingTransactionChain() {
+ return new BindingDOMTransactionChainAdapter(getDelegate().createMergingTransactionChain(), adapterContext());
}
@Override
*/
package org.opendaylight.mdsal.binding.dom.adapter;
-import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
-import com.google.common.util.concurrent.FluentFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import java.util.function.Function;
import java.util.function.Supplier;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.mdsal.binding.api.ReadTransaction;
import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
import org.opendaylight.mdsal.binding.api.TransactionChain;
import org.opendaylight.mdsal.binding.api.TransactionChainClosedException;
-import org.opendaylight.mdsal.binding.api.TransactionChainListener;
import org.opendaylight.mdsal.binding.api.WriteTransaction;
-import org.opendaylight.mdsal.common.api.CommitInfo;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeTransaction;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
import org.opendaylight.mdsal.dom.api.DOMTransactionChainClosedException;
-import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
import org.opendaylight.yangtools.concepts.Delegator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.opendaylight.yangtools.yang.common.Empty;
final class BindingDOMTransactionChainAdapter implements TransactionChain, Delegator<DOMTransactionChain> {
+ private final @NonNull DOMTransactionChain delegate;
+ private final @NonNull AdapterContext adapterContext;
- private static final Logger LOG = LoggerFactory.getLogger(BindingDOMTransactionChainAdapter.class);
-
- private final DOMTransactionChain delegate;
- private final AdapterContext adapterContext;
- private final DelegateChainListener domListener;
- private final TransactionChainListener bindingListener;
-
- BindingDOMTransactionChainAdapter(final Function<DOMTransactionChainListener, DOMTransactionChain> chainFactory,
- final AdapterContext codec, final TransactionChainListener listener) {
- requireNonNull(chainFactory, "DOM Transaction chain factory must not be null");
- this.domListener = new DelegateChainListener();
- this.bindingListener = listener;
- this.delegate = chainFactory.apply(domListener);
- this.adapterContext = requireNonNull(codec);
+ BindingDOMTransactionChainAdapter(final DOMTransactionChain delegate, final AdapterContext adapterContext) {
+ this.delegate = requireNonNull(delegate);
+ this.adapterContext = requireNonNull(adapterContext);
}
@Override
@Override
public WriteTransaction newWriteOnlyTransaction() {
- final DOMDataTreeWriteTransaction delegateTx = createTransaction(delegate::newWriteOnlyTransaction);
- return new BindingDOMWriteTransactionAdapter<>(adapterContext, delegateTx) {
- @Override
- public FluentFuture<? extends CommitInfo> commit() {
- return listenForFailure(this, super.commit());
- }
- };
+ return new BindingDOMWriteTransactionAdapter<>(adapterContext,
+ createTransaction(delegate::newWriteOnlyTransaction));
}
@Override
public ReadWriteTransaction newReadWriteTransaction() {
- final DOMDataTreeReadWriteTransaction delegateTx = createTransaction(delegate::newReadWriteTransaction);
- return new BindingDOMReadWriteTransactionAdapter(adapterContext, delegateTx) {
- @Override
- public FluentFuture<? extends CommitInfo> commit() {
- return listenForFailure(this, super.commit());
- }
- };
- }
-
- private <T, F extends ListenableFuture<T>> F listenForFailure(final WriteTransaction tx, final F future) {
- Futures.addCallback(future, new FutureCallback<T>() {
- @Override
- public void onFailure(final Throwable throwable) {
- failTransactionChain(tx, throwable);
- }
-
- @Override
- public void onSuccess(final T result) {
- // Intentionally NOOP
- }
- }, MoreExecutors.directExecutor());
-
- return future;
+ return new BindingDOMReadWriteTransactionAdapter(adapterContext,
+ createTransaction(delegate::newReadWriteTransaction));
}
- private void failTransactionChain(final WriteTransaction tx, final Throwable throwable) {
- /*
- * We asume correct state change for underlaying transaction
- *
- * chain, so we are not changing any of our internal state
- * to mark that we failed.
- */
- this.bindingListener.onTransactionChainFailed(this, tx, throwable);
+ @Override
+ public ListenableFuture<Empty> future() {
+ return delegate.future();
}
@Override
throw new TransactionChainClosedException("Transaction chain already closed", e);
}
}
-
- private final class DelegateChainListener implements DOMTransactionChainListener {
- @Override
- public void onTransactionChainFailed(final DOMTransactionChain chain, final DOMDataTreeTransaction transaction,
- final Throwable cause) {
- checkState(delegate.equals(chain), "Listener for %s was invoked for incorrect chain %s.", delegate, chain);
- /*
- * Intentionally NOOP, callback for failure, since we
- * are also listening on each transaction future for failure,
- * in order to have reference to Binding Transaction (which was seen by client
- * of this transaction chain), instead of DOM transaction
- * which is known only to this chain, binding transaction implementation
- * and underlying transaction chain.
- */
- LOG.debug("Transaction chain {} failed. Failed DOM Transaction {}",this,transaction,cause);
- }
-
- @Override
- public void onTransactionChainSuccessful(final DOMTransactionChain chain) {
- checkState(delegate.equals(chain), "Listener for %s was invoked for incorrect chain %s.", delegate, chain);
- bindingListener.onTransactionChainSuccessful(BindingDOMTransactionChainAdapter.this);
- }
- }
}
import org.opendaylight.mdsal.binding.api.ReadTransaction;
import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
import org.opendaylight.mdsal.binding.api.TransactionChain;
-import org.opendaylight.mdsal.binding.api.TransactionChainListener;
import org.opendaylight.mdsal.binding.api.WriteTransaction;
import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.binding.DataObject;
}
@Override
- public TransactionChain createTransactionChain(final TransactionChainListener listener) {
- return delegate.createTransactionChain(listener);
+ public TransactionChain createTransactionChain() {
+ return delegate.createTransactionChain();
}
@Override
- public TransactionChain createMergingTransactionChain(final TransactionChainListener listener) {
- return delegate.createMergingTransactionChain(listener);
+ public TransactionChain createMergingTransactionChain() {
+ return delegate.createMergingTransactionChain();
}
}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import java.util.Optional;
import org.opendaylight.mdsal.binding.api.DataBroker;
import org.opendaylight.mdsal.binding.api.DataTreeChangeListener;
import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
-import org.opendaylight.mdsal.binding.api.TransactionChainListener;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataBroker;
import org.opendaylight.mdsal.dom.api.DOMService;
+import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
import org.opendaylight.yang.gen.v1.bug8449.rev170516.Top;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
private DOMDataBroker domService;
@Mock
private AdapterContext mockContext;
+ @Mock
+ private DOMTransactionChain domChain;
private BindingDOMAdapterLoader bindingDOMAdapterLoader;
@Test
public void createChainTest() {
final var adapter = assertDataBrokerAdapter();
- assertNotNull(adapter.createTransactionChain(mock(TransactionChainListener.class)));
+ doReturn(domChain).when(domService).createTransactionChain();
+ assertNotNull(adapter.createTransactionChain());
}
@Test
import static java.util.Objects.requireNonNull;
import static org.junit.Assert.assertNotNull;
-import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import java.util.Collection;
import java.util.List;
-import java.util.function.BiFunction;
+import java.util.function.Function;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.quality.Strictness;
import org.opendaylight.mdsal.binding.api.DataBroker;
import org.opendaylight.mdsal.binding.api.TransactionChain;
-import org.opendaylight.mdsal.binding.api.TransactionChainListener;
import org.opendaylight.mdsal.binding.dom.codec.spi.BindingDOMCodecServices;
import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
@RunWith(Parameterized.class)
public class BindingDOMTransactionChainAdapterTest {
- public enum TransactionChainType implements BiFunction<DataBroker, TransactionChainListener, TransactionChain> {
+ public enum TransactionChainType implements Function<DataBroker, TransactionChain> {
NORMAL {
@Override
- public TransactionChain apply(final DataBroker broker, final TransactionChainListener listener) {
- return broker.createTransactionChain(listener);
+ public TransactionChain apply(final DataBroker broker) {
+ return broker.createTransactionChain();
}
@Override
},
MERGING {
@Override
- public TransactionChain apply(final DataBroker broker, final TransactionChainListener listener) {
- return broker.createMergingTransactionChain(listener);
+ public TransactionChain apply(final DataBroker broker) {
+ return broker.createMergingTransactionChain();
}
@Override
@Mock
private DOMTransactionChain transactionChain;
@Mock
- private TransactionChainListener transactionChainListener;
- @Mock
private BindingDOMCodecServices mockCodecRegistry;
private BindingDOMTransactionChainAdapter bindingDOMTransactionChainAdapter;
@Before
public void setUp() {
- doReturn(transactionChain).when(domService).createTransactionChain(any());
+ doReturn(transactionChain).when(domService).createTransactionChain();
if (type == TransactionChainType.MERGING) {
- doCallRealMethod().when(domService).createMergingTransactionChain(any());
+ doCallRealMethod().when(domService).createMergingTransactionChain();
}
BindingDOMDataBrokerAdapter bindingDOMDataBrokerAdapter =
(BindingDOMDataBrokerAdapter) bindingDOMAdapterLoader.load(DataBroker.class).orElseThrow();
bindingDOMTransactionChainAdapter =
- (BindingDOMTransactionChainAdapter) type.apply(bindingDOMDataBrokerAdapter, transactionChainListener);
+ (BindingDOMTransactionChainAdapter) type.apply(bindingDOMDataBrokerAdapter);
assertNotNull(bindingDOMTransactionChainAdapter.getDelegate());
}
mockReadWrite(transactionChain);
assertNotNull(bindingDOMTransactionChainAdapter.newReadWriteTransaction());
}
-
}
import org.opendaylight.mdsal.binding.api.ReadTransaction;
import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
import org.opendaylight.mdsal.binding.api.TransactionChain;
-import org.opendaylight.mdsal.binding.api.TransactionChainListener;
import org.opendaylight.mdsal.binding.api.WriteTransaction;
import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.binding.DataObject;
}
@Override
- public TransactionChain createTransactionChain(final TransactionChainListener listener) {
- return delegate().createTransactionChain(listener);
+ public TransactionChain createTransactionChain() {
+ return delegate().createTransactionChain();
}
@Override
- public TransactionChain createMergingTransactionChain(final TransactionChainListener listener) {
- return delegate().createMergingTransactionChain(listener);
+ public TransactionChain createMergingTransactionChain() {
+ return delegate().createMergingTransactionChain();
}
}
import com.google.common.annotations.Beta;
import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.FutureCallback;
import edu.umd.cs.findbugs.annotations.CheckReturnValue;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.function.Function;
import javax.inject.Inject;
+import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.mdsal.binding.api.DataBroker;
-import org.opendaylight.mdsal.binding.api.Transaction;
-import org.opendaylight.mdsal.binding.api.TransactionChain;
-import org.opendaylight.mdsal.binding.api.TransactionChainListener;
import org.opendaylight.mdsal.binding.api.WriteTransaction;
import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.yangtools.yang.common.Empty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// Do *NOT* mark this as @Singleton, because users choose their implementation
public class ManagedNewTransactionRunnerImpl extends ManagedTransactionFactoryImpl<DataBroker>
implements ManagedNewTransactionRunner {
-
private static final Logger LOG = LoggerFactory.getLogger(ManagedNewTransactionRunnerImpl.class);
@Inject
@Override
@SuppressFBWarnings("RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE")
public <R> R applyWithNewTransactionChainAndClose(final Function<ManagedTransactionChain, R> chainConsumer) {
- try (TransactionChain realTxChain = getTransactionFactory().createTransactionChain(
- new TransactionChainListener() {
+ try (var realTxChain = getTransactionFactory().createTransactionChain()) {
+ realTxChain.addCallback(new FutureCallback<>() {
@Override
- public void onTransactionChainFailed(TransactionChain chain, Transaction transaction, Throwable cause) {
- LOG.error("Error handling a transaction chain", cause);
+ public void onSuccess(final @Nullable Empty result) {
+ // Nothing to do
}
@Override
- public void onTransactionChainSuccessful(TransactionChain chain) {
- // Nothing to do
+ public void onFailure(final @Nullable Throwable cause) {
+ LOG.error("Error handling a transaction chain", cause);
}
- })) {
+ });
return chainConsumer.apply(new ManagedTransactionChainImpl(realTxChain));
}
}
/**
* Create a new transaction chain. The chain will be initialized to read from its backing datastore, with
- * no outstanding transaction. Listener will be registered to handle chain-level events.
+ * no outstanding transaction.
*
- * @param listener Transaction chain event listener
* @return A new transaction chain.
*/
- @NonNull DOMTransactionChain createTransactionChain(DOMTransactionChainListener listener);
+ @NonNull DOMTransactionChain createTransactionChain();
/**
* Create a new transaction chain. The chain will be initialized to read from its backing datastore, with
- * no outstanding transaction. Listener will be registered to handle chain-level events.
+ * no outstanding transaction.
*
* <p>
- * Unlike {@link #createTransactionChain(DOMTransactionChainListener)}, the transaction chain returned by this
- * method is allowed to merge individual transactions into larger chunks. When transactions are merged, the results
- * must be indistinguishable from the result of all operations having been performed on a single transaction.
+ * Unlike {@link #createTransactionChain()}, the transaction chain returned by this method is allowed to merge
+ * individual transactions into larger chunks. When transactions are merged, the results must be indistinguishable
+ * from the result of all operations having been performed on a single transaction.
*
* <p>
* When transactions are merged, {@link DOMTransactionChain#newReadOnlyTransaction()} may actually be backed by
* a read-write transaction, hence an additional restriction on API use is that multiple read-only transactions
* may not be open at the same time.
*
- * @param listener Transaction chain event listener
* @return A new transaction chain.
*/
- @NonNull DOMTransactionChain createMergingTransactionChain(DOMTransactionChainListener listener);
+ @NonNull DOMTransactionChain createMergingTransactionChain();
}
*/
package org.opendaylight.mdsal.dom.api;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.concurrent.Executor;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.yang.common.Empty;
/**
* A chain of transactions. Transactions in a chain need to be committed in sequence and each transaction should see
*/
@Override
DOMDataTreeReadWriteTransaction newReadWriteTransaction();
+
+ /**
+ * Add a completion callback to execute when {@link #future()} completes. This is a shorthand for
+ * {@code Futures.addCallback(future(), callback, MoreExecutors.directExecutor())}.
+ *
+ * @param callback completion callback
+ */
+ default void addCallback(final FutureCallback<Empty> callback) {
+ addCallback(callback, MoreExecutors.directExecutor());
+ }
+
+ /**
+ * Add a completion callback to execute on specified executor when {@link #future()} completes. This is a shorthand
+ * for {@code Futures.addCallback(future(), callback, executor)}.
+ *
+ * @param callback completion callback
+ * @param executor executor on which to execute the callback
+ */
+ default void addCallback(final FutureCallback<Empty> callback, final Executor executor) {
+ Futures.addCallback(future(), callback, executor);
+ }
+
+ /**
+ * Return a {@link ListenableFuture} which completes when this chain completes.
+ *
+ * @return A {@link ListenableFuture}
+ */
+ @NonNull ListenableFuture<Empty> future();
}
+++ /dev/null
-/*
- * Copyright (c) 2018 Pantheon Technologies, s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.mdsal.dom.api;
-
-/**
- * Listener for transaction chain events.
- */
-// FIXME: 6.0.0: remove this in favor of a TransactionChain destiny, available as a FluentFuture from
-// DOMTransactionChain
-public interface DOMTransactionChainListener {
- /**
- * Invoked if when a transaction in the chain fails. All transactions submitted after the failed transaction, in the
- * chain, are automatically cancelled by the time this notification is invoked. Open transactions need to be closed
- * or cancelled.
- * Implementations should invoke chain.close() to close the chain.
- *
- * @param chain Transaction chain which failed
- * @param transaction Transaction which caused the chain to fail
- * @param cause The cause of transaction failure
- */
- void onTransactionChainFailed(DOMTransactionChain chain, DOMDataTreeTransaction transaction, Throwable cause);
-
- /**
- * Invoked when a transaction chain is completed. A transaction chain is considered completed when it has been
- * closed and all its instructions have completed successfully.
- *
- * @param chain Transaction chain which completed
- */
- void onTransactionChainSuccessful(DOMTransactionChain chain);
-}
-
requires static transitive java.annotation;
requires static transitive javax.inject;
requires static com.github.spotbugs.annotations;
+ requires static org.eclipse.jdt.annotation;
requires static org.kohsuke.metainf_services;
requires static org.osgi.service.component.annotations;
requires static org.osgi.service.metatype.annotations;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
-import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
import org.opendaylight.mdsal.dom.spi.PingPongMergingDOMDataBroker;
import org.opendaylight.mdsal.dom.spi.store.DOMStore;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
@Override
- public DOMTransactionChain createTransactionChain(final DOMTransactionChainListener listener) {
+ public DOMTransactionChain createTransactionChain() {
checkNotClosed();
final var delegates = new EnumMap<LogicalDatastoreType, DOMStoreTransactionChain>(LogicalDatastoreType.class);
}
final long chainId = chainNum.getAndIncrement();
- LOG.debug("Transactoin chain {} created with listener {}, backing store chains {}", chainId, listener,
- delegates);
- return new DOMDataBrokerTransactionChainImpl(chainId, delegates, this, listener);
+ LOG.debug("Transactoin chain {} created, backing store chains {}", chainId, delegates);
+ return new DOMDataBrokerTransactionChainImpl(chainId, delegates, this);
}
}
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import java.util.Map;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.mdsal.common.api.CommitInfo;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
-import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
+import org.opendaylight.yangtools.yang.common.Empty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final AtomicReferenceFieldUpdater<DOMDataBrokerTransactionChainImpl, State> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(DOMDataBrokerTransactionChainImpl.class, State.class, "state");
private static final Logger LOG = LoggerFactory.getLogger(DOMDataBrokerTransactionChainImpl.class);
+
+ private final @NonNull SettableFuture<Empty> future = SettableFuture.create();
private final AtomicLong txNum = new AtomicLong();
private final AbstractDOMDataBroker broker;
- private final DOMTransactionChainListener listener;
private final long chainId;
private volatile State state = State.RUNNING;
* If any of arguments is null.
*/
DOMDataBrokerTransactionChainImpl(final long chainId,
- final Map<LogicalDatastoreType, DOMStoreTransactionChain> chains,
- final AbstractDOMDataBroker broker, final DOMTransactionChainListener listener) {
+ final Map<LogicalDatastoreType, DOMStoreTransactionChain> chains, final AbstractDOMDataBroker broker) {
super(chains);
this.chainId = chainId;
this.broker = requireNonNull(broker);
- this.listener = requireNonNull(listener);
}
private void checkNotFailed() {
Preconditions.checkState(state != State.FAILED, "Transaction chain has failed");
}
+ @Override
+ public ListenableFuture<Empty> future() {
+ return future;
+ }
+
@Override
protected Object newTransactionIdentifier() {
return "DOM-CHAIN-" + chainId + "-" + txNum.getAndIncrement();
checkNotFailed();
checkNotClosed();
- final FluentFuture<? extends CommitInfo> ret = broker.commit(transaction, cohort);
-
+ final var ret = broker.commit(transaction, cohort);
COUNTER_UPDATER.incrementAndGet(this);
+
ret.addCallback(new FutureCallback<CommitInfo>() {
@Override
public void onSuccess(final CommitInfo result) {
private void finishClose() {
state = State.CLOSED;
- listener.onTransactionChainSuccessful(this);
+ future.set(Empty.value());
}
private void transactionCompleted() {
private void transactionFailed(final DOMDataTreeWriteTransaction tx, final Throwable cause) {
state = State.FAILED;
LOG.debug("Transaction chain {}Â failed.", this, cause);
- listener.onTransactionChainFailed(this, tx, cause);
+ future.setException(cause);
}
}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.mdsal.dom.broker;
-
-import com.google.common.util.concurrent.SettableFuture;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeTransaction;
-import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
-import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
-
-/**
- * Simple implementation of {@link TransactionChainListener} for testing.
- *
- *<p>
- * This transaction chain listener does not contain any logic, only update
- * futures ({@link #getFailFuture()} and {@link #getSuccessFuture()} when
- * transaction chain event is retrieved.
- *
- */
-class BlockingTransactionChainListener implements DOMTransactionChainListener {
-
- private final SettableFuture<Throwable> failFuture = SettableFuture.create();
- private final SettableFuture<Void> successFuture = SettableFuture.create();
-
- @Override
- public void onTransactionChainFailed(final DOMTransactionChain chain, final DOMDataTreeTransaction transaction,
- final Throwable cause) {
- failFuture.set(cause);
- }
-
- @Override
- public void onTransactionChainSuccessful(final DOMTransactionChain chain) {
- successFuture.set(null);
- }
-
- public SettableFuture<Throwable> getFailFuture() {
- return failFuture;
- }
-
- public SettableFuture<Void> getSuccessFuture() {
- return successFuture;
- }
-
-}
*/
package org.opendaylight.mdsal.dom.broker;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION;
import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.OPERATIONAL;
@Test
public void testTransactionChainNoConflict() throws InterruptedException, ExecutionException, TimeoutException {
- final BlockingTransactionChainListener listener = new BlockingTransactionChainListener();
- final DOMTransactionChain txChain = domBroker.createTransactionChain(listener);
+ final DOMTransactionChain txChain = domBroker.createTransactionChain();
assertNotNull(txChain);
/**
*/
txChain.close();
- listener.getSuccessFuture().get(1000, TimeUnit.MILLISECONDS);
+ txChain.future().get(1000, TimeUnit.MILLISECONDS);
}
- @SuppressWarnings("checkstyle:IllegalCatch")
@Test
- public void testTransactionChainNotSealed() throws InterruptedException, ExecutionException, TimeoutException {
- final BlockingTransactionChainListener listener = new BlockingTransactionChainListener();
- final DOMTransactionChain txChain = domBroker.createTransactionChain(listener);
+ public void testTransactionChainNotSealed() {
+ final var txChain = domBroker.createTransactionChain();
assertNotNull(txChain);
/**
* still not committed to datastore, so this allocation should fail with
* IllegalStateException.
*/
- try {
- allocateAndWrite(txChain); // actual backing tx allocation happens on put
- fail("Allocation of secondReadTx should fail with IllegalStateException");
- } catch (final Exception e) {
- assertTrue(e instanceof IllegalStateException);
- }
+ // actual backing tx allocation happens on put
+ final var ex = assertThrows(IllegalStateException.class, () -> allocateAndWrite(txChain));
+ assertEquals("Previous transaction OPER-0 is not ready yet", ex.getMessage());
}
private static DOMDataTreeWriteTransaction allocateAndDelete(final DOMTransactionChain txChain)
return tx;
}
- private static DOMDataTreeWriteTransaction allocateAndWrite(final DOMTransactionChain txChain)
- throws InterruptedException, ExecutionException {
- final DOMDataTreeWriteTransaction tx = txChain.newWriteOnlyTransaction();
+ private static DOMDataTreeWriteTransaction allocateAndWrite(final DOMTransactionChain txChain) {
+ final var tx = txChain.newWriteOnlyTransaction();
writeTestContainer(tx);
return tx;
}
assertTrue(readedData.isPresent());
}
- private static void writeTestContainer(final DOMDataTreeWriteTransaction tx) throws InterruptedException,
- ExecutionException {
+ private static void writeTestContainer(final DOMDataTreeWriteTransaction tx) {
tx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
}
}
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.CancellationException;
-import java.util.function.Function;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.checkerframework.checker.lock.qual.Holding;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
-import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
+import org.opendaylight.yangtools.yang.common.Empty;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
abstract class AbstractPingPongTransactionChain implements DOMTransactionChain {
private static final Logger LOG = LoggerFactory.getLogger(AbstractPingPongTransactionChain.class);
- private final DOMTransactionChainListener listener;
- private final DOMTransactionChain delegate;
+ private final @NonNull SettableFuture<Empty> future = SettableFuture.create();
+ private final @NonNull DOMTransactionChain delegate;
@GuardedBy("this")
private boolean closed;
}
}
- AbstractPingPongTransactionChain(final Function<DOMTransactionChainListener, DOMTransactionChain> delegateFactory,
- final DOMTransactionChainListener listener) {
- this.listener = requireNonNull(listener);
- delegate = delegateFactory.apply(new DOMTransactionChainListener() {
+ AbstractPingPongTransactionChain(final DOMTransactionChain delegate) {
+ this.delegate = requireNonNull(delegate);
+ delegate.addCallback(new FutureCallback<>() {
@Override
- public void onTransactionChainFailed(final DOMTransactionChain chain,
- final DOMDataTreeTransaction transaction, final Throwable cause) {
- LOG.debug("Transaction chain {} reported failure in {}", chain, transaction, cause);
- delegateFailed(chain, cause);
+ public void onSuccess(final Empty result) {
+ delegateSuccessful();
}
@Override
- public void onTransactionChainSuccessful(final DOMTransactionChain chain) {
- delegateSuccessful(chain);
+ public void onFailure(final Throwable cause) {
+ delegateFailed(cause);
}
});
}
- private void delegateSuccessful(final DOMTransactionChain chain) {
+ @Override
+ public final ListenableFuture<Empty> future() {
+ return future;
+ }
+
+ private void delegateSuccessful() {
final Entry<PingPongTransaction, Throwable> canceled;
synchronized (this) {
// This looks weird, but we need not hold the lock while invoking callbacks
}
if (canceled == null) {
- listener.onTransactionChainSuccessful(this);
+ future.set(Empty.value());
return;
}
// Backend shutdown successful, but we have a batch of transactions we have to report as dead due to the
// user calling cancel().
- final PingPongTransaction tx = canceled.getKey();
- final Throwable cause = canceled.getValue();
- LOG.debug("Transaction chain {} successful, failing cancelled transaction {}", chain, tx, cause);
+ final var tx = canceled.getKey();
+ final var cause = canceled.getValue();
+ LOG.debug("Transaction chain {} successful, failing cancelled transaction {}", delegate, tx, cause);
- listener.onTransactionChainFailed(this, tx.getFrontendTransaction(), cause);
+ future.setException(cause);
tx.onFailure(cause);
}
- private void delegateFailed(final DOMTransactionChain chain, final Throwable cause) {
- final DOMDataTreeReadWriteTransaction frontend;
- final PingPongTransaction tx = inflightTx;
+ private void delegateFailed(final Throwable cause) {
+ LOG.debug("Transaction chain {} reported failure", delegate, cause);
+
+ final var tx = inflightTx;
if (tx == null) {
- LOG.warn("Transaction chain {} failed with no pending transactions", chain);
- frontend = null;
- } else {
- frontend = tx.getFrontendTransaction();
+ LOG.warn("Transaction chain {} failed with no pending transactions", delegate);
}
-
- listener.onTransactionChainFailed(this, frontend, cause);
+ future.setException(cause);
synchronized (this) {
failed = true;
import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
-import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
/**
* Utility {@link DOMDataBroker} implementation which forwards all interface method invocation to a delegate instance.
}
@Override
- public DOMTransactionChain createTransactionChain(final DOMTransactionChainListener listener) {
- return delegate().createTransactionChain(listener);
+ public DOMTransactionChain createTransactionChain() {
+ return delegate().createTransactionChain();
}
@Override
- public DOMTransactionChain createMergingTransactionChain(final DOMTransactionChainListener listener) {
- return delegate().createMergingTransactionChain(listener);
+ public DOMTransactionChain createMergingTransactionChain() {
+ return delegate().createMergingTransactionChain();
}
}
package org.opendaylight.mdsal.dom.spi;
import com.google.common.collect.ForwardingObject;
-import org.eclipse.jdt.annotation.NonNull;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.eclipse.jdt.annotation.NonNullByDefault;
import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
+import org.opendaylight.yangtools.yang.common.Empty;
/**
* Utility {@link DOMTransactionChain} implementation which forwards all interface
* method invocation to a delegate instance.
*/
+@NonNullByDefault
public abstract class ForwardingDOMTransactionChain extends ForwardingObject implements DOMTransactionChain {
@Override
- protected abstract @NonNull DOMTransactionChain delegate();
+ protected abstract DOMTransactionChain delegate();
@Override
public void close() {
public DOMDataTreeReadWriteTransaction newReadWriteTransaction() {
return delegate().newReadWriteTransaction();
}
+
+ @Override
+ public ListenableFuture<Empty> future() {
+ return delegate().future();
+ }
}
import com.google.common.annotations.Beta;
import org.opendaylight.mdsal.dom.api.DOMDataBroker;
import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
-import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
/**
* Utility mixin interface for {@link DOMDataBroker}s which realize merging transaction chains via
- * {@link PingPongTransactionChain}. It provides {@link #createMergingTransactionChain(DOMTransactionChainListener)}
- * as a default method combining {@link PingPongTransactionChain} with
- * {@link #createTransactionChain(DOMTransactionChainListener)}.
+ * {@link PingPongTransactionChain}. It provides {@link #createMergingTransactionChain()}
+ * as a default method combining {@link PingPongTransactionChain} with {@link #createTransactionChain()}.
*/
@Beta
public interface PingPongMergingDOMDataBroker extends DOMDataBroker {
@Override
- default DOMTransactionChain createMergingTransactionChain(final DOMTransactionChainListener listener) {
- return new PingPongTransactionChain(this::createTransactionChain, listener);
+ default DOMTransactionChain createMergingTransactionChain() {
+ return new PingPongTransactionChain(createTransactionChain());
}
}
*/
package org.opendaylight.mdsal.dom.spi;
-import java.util.function.Function;
import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
-import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
/**
* An implementation of {@link DOMTransactionChain}, which has a very specific behavior, which some users may find
* transaction and the user may not allocate multiple read-only transactions at the same time.
*/
public final class PingPongTransactionChain extends AbstractPingPongTransactionChain {
- public PingPongTransactionChain(final Function<DOMTransactionChainListener, DOMTransactionChain> delegateFactory,
- final DOMTransactionChainListener listener) {
- super(delegateFactory, listener);
+ public PingPongTransactionChain(final DOMTransactionChain delegate) {
+ super(delegate);
}
}
import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
-import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
@ExtendWith(MockitoExtension.class)
class ForwardingDOMDataBrokerTest {
@Mock
private DOMTransactionChain chain;
@Mock
- private DOMTransactionChainListener chainListener;
- @Mock
private Extension extension;
@Mock
private DOMDataBroker domDataBroker;
}
};
- doReturn(chain).when(domDataBroker).createTransactionChain(chainListener);
- assertSame(chain, impl.createTransactionChain(chainListener));
+ doReturn(chain).when(domDataBroker).createTransactionChain();
+ assertSame(chain, impl.createTransactionChain());
doReturn(List.of(extension)).when(domDataBroker).supportedExtensions();
assertSame(extension, impl.extension(Extension.class));
@Test
public void basicTest() throws Exception {
doReturn(null).when(domTransactionChain).newWriteOnlyTransaction();
- this.newWriteOnlyTransaction();
+ newWriteOnlyTransaction();
verify(domTransactionChain).newWriteOnlyTransaction();
doReturn(null).when(domTransactionChain).newReadOnlyTransaction();
- this.newReadOnlyTransaction();
+ newReadOnlyTransaction();
verify(domTransactionChain).newReadOnlyTransaction();
doNothing().when(domTransactionChain).close();
- this.close();
+ close();
verify(domTransactionChain).close();
}
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
-import java.util.function.Function;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.opendaylight.mdsal.common.api.CommitInfo;
import org.opendaylight.mdsal.dom.api.DOMDataTreeTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteOperations;
import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
-import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
import org.opendaylight.yangtools.util.concurrent.FluentFutures;
+import org.opendaylight.yangtools.yang.common.Empty;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class PingPongTransactionChainTest {
@Mock
- public Function<DOMTransactionChainListener, DOMTransactionChain> delegateFactory;
- @Mock
- public DOMTransactionChainListener listener;
- @Mock
+ public FutureCallback<Empty> listener;
+ @Mock(answer = Answers.CALLS_REAL_METHODS)
public DOMTransactionChain chain;
@Mock
public DOMDataTreeReadWriteTransaction rwTx;
@Mock
public DOMDataTreeReadWriteTransaction rwTx2;
- public DOMTransactionChainListener pingPongListener;
+ private final SettableFuture<Empty> future = SettableFuture.create();
+
public PingPongTransactionChain pingPong;
@Before
public void before() {
- // Slightly complicated bootstrap
- doAnswer(invocation -> {
- pingPongListener = invocation.getArgument(0);
- return chain;
- }).when(delegateFactory).apply(any());
- pingPong = new PingPongTransactionChain(delegateFactory, listener);
- verify(delegateFactory).apply(any());
-
+ doReturn(future).when(chain).future();
+ pingPong = new PingPongTransactionChain(chain);
doReturn(rwTx).when(chain).newReadWriteTransaction();
}
doNothing().when(chain).close();
pingPong.close();
verify(chain).close();
+ pingPong.addCallback(listener);
- doNothing().when(listener).onTransactionChainSuccessful(pingPong);
- pingPongListener.onTransactionChainSuccessful(chain);
- verify(listener).onTransactionChainSuccessful(pingPong);
+ future.set(Empty.value());
+ verify(listener).onSuccess(Empty.value());
}
@Test
public void testIdleFailure() {
final var cause = new Throwable();
- doNothing().when(listener).onTransactionChainFailed(pingPong, null, cause);
+ doNothing().when(listener).onFailure(cause);
doReturn("mock").when(chain).toString();
- pingPongListener.onTransactionChainFailed(chain, rwTx, cause);
- verify(listener).onTransactionChainFailed(pingPong, null, cause);
+
+ future.setException(cause);
+ pingPong.addCallback(listener);
+ verify(listener).onFailure(cause);
}
@Test
pingPong.close();
verify(chain).close();
pingPong.close();
- verifyNoMoreInteractions(chain);
+// verifyNoMoreInteractions(chain);
}
private static <T> T assertDone(final FluentFuture<T> future) {
import static java.util.Objects.requireNonNull;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
import org.opendaylight.yangtools.util.concurrent.FluentFutures;
+import org.opendaylight.yangtools.yang.common.Empty;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeDataOutput;
import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
@Override
protected void initChannel(final SocketChannel ch) {
+ final var txChain = dataBroker.createMergingTransactionChain();
+
ch.pipeline()
.addLast("frameDecoder", new MessageFrameDecoder())
.addLast("idleStateHandler", new IdleStateHandler(
keepaliveInterval.toNanos() * maxMissedKeepalives, 0, 0, TimeUnit.NANOSECONDS))
.addLast("keepaliveHandler", new SinkKeepaliveHandler())
- .addLast("requestHandler", new SinkRequestHandler(TREE, dataBroker.createMergingTransactionChain(
- new SinkTransactionChainListener(ch))))
+ .addLast("requestHandler", new SinkRequestHandler(TREE, txChain))
.addLast("frameEncoder", MessageFrameEncoder.INSTANCE);
+
+ txChain.addCallback(new FutureCallback<>() {
+ @Override
+ public void onSuccess(final Empty result) {
+ LOG.info("Transaction chain for channel {} completed", ch);
+ }
+
+ @Override
+ public void onFailure(final Throwable cause) {
+ LOG.error("Transaction chain for channel {} failed", ch, cause);
+ ch.close();
+ }
+ });
}
private synchronized void channelResolved(final ChannelFuture completedFuture,
- final ScheduledExecutorService group) {
+ final ScheduledExecutorService group) {
if (futureChannel != null && futureChannel.channel() == completedFuture.channel()) {
if (completedFuture.isSuccess()) {
final Channel ch = completedFuture.channel();
+++ /dev/null
-/*
- * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.mdsal.replicate.netty;
-
-import static java.util.Objects.requireNonNull;
-
-import io.netty.channel.Channel;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeTransaction;
-import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
-import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-final class SinkTransactionChainListener implements DOMTransactionChainListener {
- private static final Logger LOG = LoggerFactory.getLogger(SinkTransactionChainListener.class);
-
- private final Channel channel;
-
- SinkTransactionChainListener(final Channel channel) {
- this.channel = requireNonNull(channel);
- }
-
- @Override
- public void onTransactionChainFailed(final DOMTransactionChain chain, final DOMDataTreeTransaction transaction,
- final Throwable cause) {
- LOG.error("Transaction chain for channel {} failed", channel, cause);
- channel.close();
- }
-
- @Override
- public void onTransactionChainSuccessful(final DOMTransactionChain chain) {
- LOG.info("Transaction chain for channel {} completed", channel);
- }
-}
doReturn(CommitInfo.emptyFluentFuture()).when(sinkTx).commit();
doReturn(sinkTx).when(sinkChain).newWriteOnlyTransaction();
final DOMDataBroker sinkBroker = mock(DOMDataBroker.class);
- doReturn(sinkChain).when(sinkBroker).createMergingTransactionChain(any());
+ doReturn(sinkChain).when(sinkBroker).createMergingTransactionChain();
// Kick of the sink ...
final Registration sink = NettyReplicationSink.createSink(support, sinkBroker, css, true,
// ... and sync on it starting up
// verify the connection was established and MSG_EMPTY_DATA was transferred
- verify(sinkBroker, timeout(1000)).createMergingTransactionChain(any());
+ verify(sinkBroker, timeout(1000)).createMergingTransactionChain();
verify(sinkTx, timeout(1000)).put(eq(LogicalDatastoreType.CONFIGURATION), eq(YangInstanceIdentifier.of()),
any(ContainerNode.class));
doReturn(CommitInfo.emptyFluentFuture()).when(sinkTx).commit();
doReturn(sinkTx).when(sinkChain).newWriteOnlyTransaction();
final DOMDataBroker sinkBroker = mock(DOMDataBroker.class);
- doReturn(sinkChain).when(sinkBroker).createMergingTransactionChain(any());
+ doReturn(sinkChain).when(sinkBroker).createMergingTransactionChain();
// Kick of the sink ...
final Registration sink = NettyReplicationSink.createSink(support, sinkBroker, css, true,
// ... and sync on it starting up
// verify the connection was established and MSG_EMPTY_DATA was transferred
- verify(sinkBroker, timeout(1000)).createMergingTransactionChain(any());
+ verify(sinkBroker, timeout(1000)).createMergingTransactionChain();
verify(sinkChain, timeout(2000).times(1)).newWriteOnlyTransaction();
// verify that the initial data invoked onDataTreeChanged() and was transferred to sink
import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
-import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
import org.opendaylight.mdsal.trace.api.TracingDOMDataBroker;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.mdsaltrace.rev160908.Config;
import org.opendaylight.yangtools.yang.binding.DataObject;
}
@Override
- public DOMTransactionChain createTransactionChain(final DOMTransactionChainListener transactionChainListener) {
- return new TracingTransactionChain(delegate.createTransactionChain(transactionChainListener), this,
- transactionChainsRegistry);
+ public DOMTransactionChain createTransactionChain() {
+ return new TracingTransactionChain(delegate.createTransactionChain(), this, transactionChainsRegistry);
}
@Override
- public DOMTransactionChain createMergingTransactionChain(
- final DOMTransactionChainListener transactionChainListener) {
- return new TracingTransactionChain(delegate.createMergingTransactionChain(transactionChainListener), this,
- transactionChainsRegistry);
+ public DOMTransactionChain createMergingTransactionChain() {
+ return new TracingTransactionChain(delegate.createMergingTransactionChain(), this, transactionChainsRegistry);
}
@Override
import static java.util.Objects.requireNonNull;
+import com.google.common.util.concurrent.ListenableFuture;
import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
+import org.opendaylight.yangtools.yang.common.Empty;
class TracingTransactionChain extends AbstractCloseTracked<TracingTransactionChain> implements DOMTransactionChain {
private final CloseTrackedRegistry<TracingWriteTransaction> writeTransactionsRegistry;
private final CloseTrackedRegistry<TracingReadWriteTransaction> readWriteTransactionsRegistry;
- TracingTransactionChain(DOMTransactionChain delegate, TracingBroker tracingBroker,
- CloseTrackedRegistry<TracingTransactionChain> transactionChainsRegistry) {
+ TracingTransactionChain(final DOMTransactionChain delegate, final TracingBroker tracingBroker,
+ final CloseTrackedRegistry<TracingTransactionChain> transactionChainsRegistry) {
super(transactionChainsRegistry);
this.delegate = requireNonNull(delegate);
this.tracingBroker = requireNonNull(tracingBroker);
final boolean isDebug = transactionChainsRegistry.isDebugContextEnabled();
String anchor = "TransactionChain@" + Integer.toHexString(hashCode());
- this.readOnlyTransactionsRegistry = new CloseTrackedRegistry<>(anchor, "newReadOnlyTransaction()", isDebug);
- this.writeTransactionsRegistry = new CloseTrackedRegistry<>(anchor, "newWriteOnlyTransaction()", isDebug);
- this.readWriteTransactionsRegistry = new CloseTrackedRegistry<>(anchor, "newReadWriteTransaction()", isDebug);
+ readOnlyTransactionsRegistry = new CloseTrackedRegistry<>(anchor, "newReadOnlyTransaction()", isDebug);
+ writeTransactionsRegistry = new CloseTrackedRegistry<>(anchor, "newWriteOnlyTransaction()", isDebug);
+ readWriteTransactionsRegistry = new CloseTrackedRegistry<>(anchor, "newReadWriteTransaction()", isDebug);
+ }
+
+ @Override
+ public ListenableFuture<Empty> future() {
+ return delegate.future();
}
@Override
- @SuppressWarnings("resource")
public DOMDataTreeReadTransaction newReadOnlyTransaction() {
- final DOMDataTreeReadTransaction tx = delegate.newReadOnlyTransaction();
- return new TracingReadOnlyTransaction(tx, readOnlyTransactionsRegistry);
+ return new TracingReadOnlyTransaction(delegate.newReadOnlyTransaction(), readOnlyTransactionsRegistry);
}
@Override
public DOMDataTreeReadWriteTransaction newReadWriteTransaction() {
return new TracingReadWriteTransaction(delegate.newReadWriteTransaction(), tracingBroker,
- readWriteTransactionsRegistry);
+ readWriteTransactionsRegistry);
}
@Override
public DOMDataTreeWriteTransaction newWriteOnlyTransaction() {
- final DOMDataTreeWriteTransaction tx = delegate.newWriteOnlyTransaction();
- return new TracingWriteTransaction(tx, tracingBroker, writeTransactionsRegistry);
+ return new TracingWriteTransaction(delegate.newWriteOnlyTransaction(), tracingBroker,
+ writeTransactionsRegistry);
}
@Override
// https://jira.opendaylight.org/browse/CONTROLLER-1792
@Override
- public final boolean equals(Object object) {
+ public final boolean equals(final Object object) {
return object == this || delegate.equals(object);
}
}
DOMDataTreeReadWriteTransaction anotherTx = tracingBroker.newReadWriteTransaction();
- DOMTransactionChain txChain = tracingBroker.createTransactionChain(null);
+ DOMTransactionChain txChain = tracingBroker.createTransactionChain();
DOMDataTreeReadWriteTransaction txFromChain = txChain.newReadWriteTransaction();
ByteArrayOutputStream baos = new ByteArrayOutputStream();