import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataBrokerImpl;
+import org.opendaylight.controller.md.sal.dom.broker.impl.SerializedDOMDataBroker;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.openjdk.jmh.annotations.BenchmarkMode;
LogicalDatastoreType.OPERATIONAL, (DOMStore)operStore,
LogicalDatastoreType.CONFIGURATION, configStore);
- domBroker = new DOMDataBrokerImpl(datastores, executor);
+ domBroker = new SerializedDOMDataBroker(datastores, executor);
schemaContext = BenchmarkModel.createTestContext();
configStore.onGlobalContextUpdated(schemaContext);
operStore.onGlobalContextUpdated(schemaContext);
import org.opendaylight.controller.md.sal.binding.impl.ForwardedBindingDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
-import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataBrokerImpl;
+import org.opendaylight.controller.md.sal.dom.broker.impl.SerializedDOMDataBroker;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
}
public DOMDataBroker createDOMDataBroker() {
- return new DOMDataBrokerImpl(getDatastores(), getCommitCoordinatorExecutor());
+ return new SerializedDOMDataBroker(getDatastores(), getCommitCoordinatorExecutor());
}
public ListeningExecutorService getCommitCoordinatorExecutor() {
import org.opendaylight.controller.md.sal.binding.impl.ForwardedBindingDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
-import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataBrokerImpl;
+import org.opendaylight.controller.md.sal.dom.broker.impl.SerializedDOMDataBroker;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.sal.binding.test.util.MockSchemaService;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
}
public DOMDataBroker createDOMDataBroker() {
- return new DOMDataBrokerImpl(getDatastores(), getCommitCoordinatorExecutor());
+ return new SerializedDOMDataBroker(getDatastores(), getCommitCoordinatorExecutor());
}
public ListeningExecutorService getCommitCoordinatorExecutor() {
import org.opendaylight.controller.md.sal.binding.impl.ForwardedBindingDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
-import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataBrokerImpl;
+import org.opendaylight.controller.md.sal.dom.broker.impl.SerializedDOMDataBroker;
import org.opendaylight.controller.md.sal.dom.broker.impl.compat.BackwardsCompatibleDataBroker;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
.put(LogicalDatastoreType.CONFIGURATION, configStore)
.build();
- newDOMDataBroker = new DOMDataBrokerImpl(newDatastores, executor);
+ newDOMDataBroker = new SerializedDOMDataBroker(newDatastores, executor);
biCompatibleBroker = new BackwardsCompatibleDataBroker(newDOMDataBroker,mockSchemaService);
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl;
-import org.opendaylight.controller.md.sal.dom.broker.impl.DOMConcurrentDataCommitCoordinator;
-import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataBrokerImpl;
-import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataCommitCoordinatorImpl;
-import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataCommitExecutor;
+import org.opendaylight.controller.md.sal.dom.broker.impl.ConcurrentDOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.broker.impl.AbstractDOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.broker.impl.SerializedDOMDataBroker;
import org.opendaylight.controller.md.sal.dom.broker.impl.jmx.CommitStatsMXBeanImpl;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
final List<AbstractMXBean> mBeans = Lists.newArrayList();
- DOMDataCommitExecutor commitCoordinator;
- DurationStatisticsTracker commitStatsTracker = null;
+ final DurationStatisticsTracker commitStatsTracker;
+ final AbstractDOMDataBroker broker;
- if(getAllowConcurrentCommits()) {
- DOMConcurrentDataCommitCoordinator coordinator =
- new DOMConcurrentDataCommitCoordinator(listenableFutureExecutor);
- commitStatsTracker = coordinator.getCommitStatsTracker();
- commitCoordinator = coordinator;
+ if (getAllowConcurrentCommits()) {
+ final ConcurrentDOMDataBroker cdb = new ConcurrentDOMDataBroker(datastores, listenableFutureExecutor);
+ commitStatsTracker = cdb.getCommitStatsTracker();
+ broker = cdb;
} else {
/*
* We use a single-threaded executor for commits with a bounded queue capacity. If the
ExecutorService commitExecutor = SpecialExecutors.newBoundedSingleThreadExecutor(
getMaxDataBrokerCommitQueueSize(), "WriteTxCommit");
- DOMDataCommitCoordinatorImpl coordinator = new DOMDataCommitCoordinatorImpl(
+ SerializedDOMDataBroker sdb = new SerializedDOMDataBroker(datastores,
new DeadlockDetectingListeningExecutorService(commitExecutor,
TransactionCommitDeadlockException.DEADLOCK_EXCEPTION_SUPPLIER,
listenableFutureExecutor));
-
- commitStatsTracker = coordinator.getCommitStatsTracker();
- commitCoordinator = coordinator;
+ commitStatsTracker = sdb.getCommitStatsTracker();
+ broker = sdb;
final AbstractMXBean commitExecutorStatsMXBean =
ThreadExecutorStatsMXBeanImpl.create(commitExecutor, "CommitExecutorStats",
}
}
- DOMDataBrokerImpl newDataBroker = new DOMDataBrokerImpl(datastores, commitCoordinator);
-
if(commitStatsTracker != null) {
final CommitStatsMXBeanImpl commitStatsMXBean = new CommitStatsMXBeanImpl(
commitStatsTracker, JMX_BEAN_TYPE);
mBeans.add(commitFutureStatsMXBean);
}
- newDataBroker.setCloseable(new AutoCloseable() {
+ broker.setCloseable(new AutoCloseable() {
@Override
public void close() {
for(AbstractMXBean mBean: mBeans) {
}
});
- return newDataBroker;
+ return broker;
}
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicLong;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractDOMDataBroker extends AbstractDOMForwardedTransactionFactory<DOMStore> implements DOMDataBroker, AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractDOMDataBroker.class);
+
+ private final AtomicLong txNum = new AtomicLong();
+ private final AtomicLong chainNum = new AtomicLong();
+ private volatile AutoCloseable closeable;
+
+ protected AbstractDOMDataBroker(final Map<LogicalDatastoreType, DOMStore> datastores) {
+ super(datastores);
+ }
+
+ public void setCloseable(final AutoCloseable closeable) {
+ this.closeable = closeable;
+ }
+
+ @Override
+ public void close() {
+ super.close();
+
+ if(closeable != null) {
+ try {
+ closeable.close();
+ } catch(Exception e) {
+ LOG.debug("Error closing instance", e);
+ }
+ }
+ }
+
+ @Override
+ protected Object newTransactionIdentifier() {
+ return "DOM-" + txNum.getAndIncrement();
+ }
+
+ @Override
+ public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(final LogicalDatastoreType store,
+ final YangInstanceIdentifier path, final DOMDataChangeListener listener, final DataChangeScope triggeringScope) {
+
+ DOMStore potentialStore = getTxFactories().get(store);
+ checkState(potentialStore != null, "Requested logical data store is not available.");
+ return potentialStore.registerChangeListener(path, listener, triggeringScope);
+ }
+
+ @Override
+ public DOMTransactionChain createTransactionChain(final TransactionChainListener listener) {
+ checkNotClosed();
+
+ final Map<LogicalDatastoreType, DOMStoreTransactionChain> backingChains = new EnumMap<>(LogicalDatastoreType.class);
+ for (Entry<LogicalDatastoreType, DOMStore> entry : getTxFactories().entrySet()) {
+ backingChains.put(entry.getKey(), entry.getValue().createTransactionChain());
+ }
+
+ final long chainId = chainNum.getAndIncrement();
+ LOG.debug("Transactoin chain {} created with listener {}, backing store chains {}", chainId, listener,
+ backingChains);
+ return new DOMDataBrokerTransactionChainImpl(chainId, backingChains, this, listener);
+ }
+}
package org.opendaylight.controller.md.sal.dom.broker.impl;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
import java.util.EnumMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
* @param <T>
* Type of {@link DOMStoreTransactionFactory} factory.
*/
-abstract class AbstractDOMForwardedTransactionFactory<T extends DOMStoreTransactionFactory> implements DOMDataCommitImplementation, AutoCloseable {
+abstract class AbstractDOMForwardedTransactionFactory<T extends DOMStoreTransactionFactory> implements AutoCloseable {
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<AbstractDOMForwardedTransactionFactory> UPDATER =
AtomicIntegerFieldUpdater.newUpdater(AbstractDOMForwardedTransactionFactory.class, "closed");
*/
protected abstract Object newTransactionIdentifier();
+ /**
+ * User-supplied implementation of {@link DOMDataWriteTransaction#submit()}
+ * for transaction.
+ *
+ * Callback invoked when {@link DOMDataWriteTransaction#submit()} is invoked
+ * on transaction created by this factory.
+ *
+ * @param transaction
+ * Transaction on which {@link DOMDataWriteTransaction#commit()}
+ * was invoked.
+ * @param cohorts
+ * Iteratable of cohorts for subtransactions associated with
+ * the transaction being committed.
+ * @return a CheckedFuture. if commit coordination on cohorts finished successfully,
+ * nothing is returned from the Future, On failure,
+ * the Future fails with a {@link TransactionCommitFailedException}.
+ */
+ protected abstract CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
+ final Iterable<DOMStoreThreePhaseCommitCohort> cohorts);
+
/**
* Creates a new composite read-only transaction
*
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.util.DurationStatisticsTracker;
import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
*
* @author Thomas Pantelis
*/
-public class DOMConcurrentDataCommitCoordinator implements DOMDataCommitExecutor {
-
+public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker {
+ private static final Logger LOG = LoggerFactory.getLogger(ConcurrentDOMDataBroker.class);
private static final String CAN_COMMIT = "CAN_COMMIT";
private static final String PRE_COMMIT = "PRE_COMMIT";
private static final String COMMIT = "COMMIT";
- private static final Logger LOG = LoggerFactory.getLogger(DOMConcurrentDataCommitCoordinator.class);
-
private final DurationStatisticsTracker commitStatsTracker = DurationStatisticsTracker.createConcurrent();
/**
*/
private final ExecutorService internalFutureCallbackExecutor = new SimpleSameThreadExecutor();
- public DOMConcurrentDataCommitCoordinator(ExecutorService listenableFutureExecutor) {
+ public ConcurrentDOMDataBroker(final Map<LogicalDatastoreType, DOMStore> datastores, ExecutorService listenableFutureExecutor) {
+ super(datastores);
this.clientFutureCallbackExecutor = Preconditions.checkNotNull(listenableFutureExecutor);
}
*/
package org.opendaylight.controller.md.sal.dom.broker.impl;
-import static com.google.common.base.Preconditions.checkState;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
-import java.util.EnumMap;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public class DOMDataBrokerImpl extends AbstractDOMForwardedTransactionFactory<DOMStore> implements DOMDataBroker,
- AutoCloseable {
-
- private static final Logger LOG = LoggerFactory.getLogger(DOMDataBrokerImpl.class);
-
- private final DOMDataCommitExecutor coordinator;
- private final AtomicLong txNum = new AtomicLong();
- private final AtomicLong chainNum = new AtomicLong();
- private volatile AutoCloseable closeable;
-
- public DOMDataBrokerImpl(final Map<LogicalDatastoreType, DOMStore> datastores,
- final ListeningExecutorService executor) {
- this(datastores, new DOMDataCommitCoordinatorImpl(executor));
- }
-
- public DOMDataBrokerImpl(final Map<LogicalDatastoreType, DOMStore> datastores,
- final DOMDataCommitExecutor coordinator) {
- super(datastores);
- this.coordinator = Preconditions.checkNotNull(coordinator);
- }
-
- public void setCloseable(final AutoCloseable closeable) {
- this.closeable = closeable;
- }
-
- @Override
- public void close() {
- super.close();
-
- if(closeable != null) {
- try {
- closeable.close();
- } catch(Exception e) {
- LOG.debug("Error closing instance", e);
- }
- }
- }
-
- @Override
- protected Object newTransactionIdentifier() {
- return "DOM-" + txNum.getAndIncrement();
- }
-
- @Override
- public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(final LogicalDatastoreType store,
- final YangInstanceIdentifier path, final DOMDataChangeListener listener, final DataChangeScope triggeringScope) {
-
- DOMStore potentialStore = getTxFactories().get(store);
- checkState(potentialStore != null, "Requested logical data store is not available.");
- return potentialStore.registerChangeListener(path, listener, triggeringScope);
- }
-
- @Override
- public DOMTransactionChain createTransactionChain(final TransactionChainListener listener) {
- checkNotClosed();
-
- final Map<LogicalDatastoreType, DOMStoreTransactionChain> backingChains = new EnumMap<>(LogicalDatastoreType.class);
- for (Entry<LogicalDatastoreType, DOMStore> entry : getTxFactories().entrySet()) {
- backingChains.put(entry.getKey(), entry.getValue().createTransactionChain());
- }
-
- final long chainId = chainNum.getAndIncrement();
- LOG.debug("Transactoin chain {} created with listener {}, backing store chains {}", chainId, listener,
- backingChains);
- return new DOMDataBrokerTransactionChainImpl(chainId, backingChains, coordinator, listener);
-
- }
-
- @Override
- public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
- final Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
- LOG.debug("Transaction: {} submitted with cohorts {}.", transaction.getIdentifier(), cohorts);
- return coordinator.submit(transaction, cohorts);
+/**
+ * @deprecated Compatibility wrapper around {@link SerializedDOMDataBroker}.
+ */
+@Deprecated
+public final class DOMDataBrokerImpl extends SerializedDOMDataBroker {
+ public DOMDataBrokerImpl(final Map<LogicalDatastoreType, DOMStore> datastores, final ListeningExecutorService executor) {
+ super(datastores, executor);
}
}
AtomicReferenceFieldUpdater.newUpdater(DOMDataBrokerTransactionChainImpl.class, State.class, "state");
private static final Logger LOG = LoggerFactory.getLogger(DOMDataBrokerTransactionChainImpl.class);
private final AtomicLong txNum = new AtomicLong();
- private final DOMDataCommitExecutor coordinator;
+ private final AbstractDOMDataBroker broker;
private final TransactionChainListener listener;
private final long chainId;
*/
public DOMDataBrokerTransactionChainImpl(final long chainId,
final Map<LogicalDatastoreType, DOMStoreTransactionChain> chains,
- final DOMDataCommitExecutor coordinator, final TransactionChainListener listener) {
+ final AbstractDOMDataBroker broker, final TransactionChainListener listener) {
super(chains);
this.chainId = chainId;
- this.coordinator = Preconditions.checkNotNull(coordinator);
+ this.broker = Preconditions.checkNotNull(broker);
this.listener = Preconditions.checkNotNull(listener);
}
checkNotFailed();
checkNotClosed();
- final CheckedFuture<Void, TransactionCommitFailedException> ret = coordinator.submit(transaction, cohorts);
+ final CheckedFuture<Void, TransactionCommitFailedException> ret = broker.submit(transaction, cohorts);
COUNTER_UPDATER.incrementAndGet(this);
Futures.addCallback(ret, new FutureCallback<Void>() {
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.sal.dom.broker.impl;
-
-import com.google.common.util.concurrent.CheckedFuture;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-
-/**
- * Executor of Three Phase Commit coordination for
- * {@link DOMDataWriteTransaction} transactions.
- *
- * Implementations are responsible for executing implementation of three-phase
- * commit protocol on supplied {@link DOMStoreThreePhaseCommitCohort}s.
- *
- *
- */
-public interface DOMDataCommitExecutor {
-
- /**
- * Submits supplied transaction to be executed in context of provided
- * cohorts.
- *
- * Transaction is used only as a context, cohorts should be associated with
- * this transaction.
- *
- * @param tx
- * Transaction to be used as context for reporting
- * @param cohort
- * DOM Store cohorts representing provided transaction, its
- * subtransactions.
- * @return a CheckedFuture. if commit coordination on cohorts finished successfully,
- * nothing is returned from the Future, On failure,
- * the Future fails with a {@link TransactionCommitFailedException}.
- *
- */
- CheckedFuture<Void,TransactionCommitFailedException> submit(DOMDataWriteTransaction tx,
- Iterable<DOMStoreThreePhaseCommitCohort> cohort);
-
-}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.sal.dom.broker.impl;
-
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import com.google.common.util.concurrent.CheckedFuture;
-
-/**
- *
- * Implementation prototype of commit method for
- * {@link DOMForwardedWriteTransaction}.
- *
- */
-public interface DOMDataCommitImplementation {
-
- /**
- * User-supplied implementation of {@link DOMDataWriteTransaction#submit()}
- * for transaction.
- *
- * Callback invoked when {@link DOMDataWriteTransaction#submit()} is invoked
- * on transaction created by this factory.
- *
- * @param transaction
- * Transaction on which {@link DOMDataWriteTransaction#commit()}
- * was invoked.
- * @param cohorts
- * Iteration of cohorts for subtransactions associated with
- * commited transaction.
- *
- */
- CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
- final Iterable<DOMStoreThreePhaseCommitCohort> cohorts);
-}
-
final class DOMForwardedReadWriteTransaction extends DOMForwardedWriteTransaction<DOMStoreReadWriteTransaction> implements DOMDataReadWriteTransaction {
protected DOMForwardedReadWriteTransaction(final Object identifier,
final Map<LogicalDatastoreType, DOMStoreReadWriteTransaction> backingTxs,
- final DOMDataCommitImplementation commitImpl) {
+ final AbstractDOMForwardedTransactionFactory<?> commitImpl) {
super(identifier, backingTxs, commitImpl);
}
class DOMForwardedWriteTransaction<T extends DOMStoreWriteTransaction> extends
AbstractDOMForwardedCompositeTransaction<LogicalDatastoreType, T> implements DOMDataWriteTransaction {
@SuppressWarnings("rawtypes")
- private static final AtomicReferenceFieldUpdater<DOMForwardedWriteTransaction, DOMDataCommitImplementation> IMPL_UPDATER =
- AtomicReferenceFieldUpdater.newUpdater(DOMForwardedWriteTransaction.class, DOMDataCommitImplementation.class, "commitImpl");
+ private static final AtomicReferenceFieldUpdater<DOMForwardedWriteTransaction, AbstractDOMForwardedTransactionFactory> IMPL_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(DOMForwardedWriteTransaction.class, AbstractDOMForwardedTransactionFactory.class, "commitImpl");
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<DOMForwardedWriteTransaction, Future> FUTURE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(DOMForwardedWriteTransaction.class, Future.class, "commitFuture");
* the transaction is running -- which we flip atomically using
* {@link #IMPL_UPDATER}.
*/
- private volatile DOMDataCommitImplementation commitImpl;
+ private volatile AbstractDOMForwardedTransactionFactory<?> commitImpl;
/**
* Future task of transaction commit. It starts off as null, but is
private volatile Future<?> commitFuture;
protected DOMForwardedWriteTransaction(final Object identifier,
- final Map<LogicalDatastoreType, T> backingTxs, final DOMDataCommitImplementation commitImpl) {
+ final Map<LogicalDatastoreType, T> backingTxs, final AbstractDOMForwardedTransactionFactory<?> commitImpl) {
super(identifier, backingTxs);
this.commitImpl = Preconditions.checkNotNull(commitImpl, "commitImpl must not be null.");
}
@Override
public boolean cancel() {
- final DOMDataCommitImplementation impl = IMPL_UPDATER.getAndSet(this, null);
+ final AbstractDOMForwardedTransactionFactory<?> impl = IMPL_UPDATER.getAndSet(this, null);
if (impl != null) {
LOG.trace("Transaction {} cancelled before submit", getIdentifier());
FUTURE_UPDATER.lazySet(this, CANCELLED_FUTURE);
return future.cancel(false);
}
+ @Deprecated
@Override
public ListenableFuture<RpcResult<TransactionStatus>> commit() {
return AbstractDataTransaction.convertToLegacyCommitFuture(submit());
@Override
public CheckedFuture<Void, TransactionCommitFailedException> submit() {
- final DOMDataCommitImplementation impl = IMPL_UPDATER.getAndSet(this, null);
+ final AbstractDOMForwardedTransactionFactory<?> impl = IMPL_UPDATER.getAndSet(this, null);
checkRunning(impl);
final Collection<T> txns = getSubtransactions();
return ret;
}
- private void checkRunning(final DOMDataCommitImplementation impl) {
+ private void checkRunning(final AbstractDOMForwardedTransactionFactory<?> impl) {
Preconditions.checkState(impl != null, "Transaction %s is no longer running", getIdentifier());
}
}
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.Map;
import java.util.concurrent.RejectedExecutionException;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.util.DurationStatisticsTracker;
import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
import org.slf4j.LoggerFactory;
/**
- *
* Implementation of blocking three phase commit coordinator, which which
* supports coordination on multiple {@link DOMStoreThreePhaseCommitCohort}.
*
- * This implementation does not support cancelation of commit,
+ * This implementation does not support cancellation of commit,
*
* In order to advance to next phase of three phase commit all subtasks of
* previous step must be finish.
*
* This executor does not have an upper bound on subtask timeout.
- *
- *
*/
-public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
-
- private static final Logger LOG = LoggerFactory.getLogger(DOMDataCommitCoordinatorImpl.class);
+public class SerializedDOMDataBroker extends AbstractDOMDataBroker {
+ private static final Logger LOG = LoggerFactory.getLogger(SerializedDOMDataBroker.class);
private final DurationStatisticsTracker commitStatsTracker = DurationStatisticsTracker.createConcurrent();
private final ListeningExecutorService executor;
*
* @param executor
*/
- public DOMDataCommitCoordinatorImpl(final ListeningExecutorService executor) {
+ public SerializedDOMDataBroker(final Map<LogicalDatastoreType, DOMStore> datastores, final ListeningExecutorService executor) {
+ super(datastores);
this.executor = Preconditions.checkNotNull(executor, "executor must not be null.");
}
}
@Override
- public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
+ protected CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
final Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
}
private SchemaContext schemaContext;
- private DOMDataBrokerImpl domBroker;
+ private AbstractDOMDataBroker domBroker;
private static <V> V measure(final String name, final Callable<V> callable) throws Exception {
// TODO Auto-generated method stub
.put(OPERATIONAL, operStore) //
.build();
ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
- domBroker = new DOMDataBrokerImpl(stores, executor);
+ domBroker = new SerializedDOMDataBroker(stores, executor);
}
@Test
public class DOMBrokerTest {
private SchemaContext schemaContext;
- private DOMDataBrokerImpl domBroker;
+ private AbstractDOMDataBroker domBroker;
private ListeningExecutorService executor;
private ExecutorService futureExecutor;
private CommitExecutorService commitExecutor;
futureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(1, 5, "FCB");
executor = new DeadlockDetectingListeningExecutorService(commitExecutor,
TransactionCommitDeadlockException.DEADLOCK_EXCEPTION_SUPPLIER, futureExecutor);
- domBroker = new DOMDataBrokerImpl(stores, executor);
+ domBroker = new SerializedDOMDataBroker(stores, executor);
}
@After
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import org.mockito.InOrder;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import com.google.common.util.concurrent.Uninterruptibles;
/**
* Unit tests for DOMConcurrentDataCommitCoordinator.
private final DOMStoreThreePhaseCommitCohort mockCohort2 = mock(DOMStoreThreePhaseCommitCohort.class);
private final ThreadPoolExecutor futureExecutor =
new ThreadPoolExecutor(0, 1, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
- private final DOMConcurrentDataCommitCoordinator coordinator =
- new DOMConcurrentDataCommitCoordinator(futureExecutor);
+ private ConcurrentDOMDataBroker coordinator;
@Before
public void setup() {
doReturn("tx").when(transaction).getIdentifier();
+
+ DOMStore store = new InMemoryDOMDataStore("OPER",
+ MoreExecutors.sameThreadExecutor());
+
+ coordinator = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL, store), futureExecutor);
}
@After
final CountDownLatch asyncCanCommitContinue = new CountDownLatch(1);
Answer<ListenableFuture<Boolean>> asyncCanCommit = new Answer<ListenableFuture<Boolean>>() {
@Override
- public ListenableFuture<Boolean> answer(InvocationOnMock invocation) {
+ public ListenableFuture<Boolean> answer(final InvocationOnMock invocation) {
final SettableFuture<Boolean> future = SettableFuture.create();
if(doAsync) {
new Thread() {
final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
Futures.addCallback(future, new FutureCallback<Void>() {
@Override
- public void onSuccess(Void result) {
+ public void onSuccess(final Void result) {
doneLatch.countDown();
}
@Override
- public void onFailure(Throwable t) {
+ public void onFailure(final Throwable t) {
caughtEx.set(t);
doneLatch.countDown();
}
assertFailure(future, null, mockCohort1, mockCohort2, mockCohort3);
}
- private void assertFailure(CheckedFuture<Void, TransactionCommitFailedException> future,
- Exception expCause, DOMStoreThreePhaseCommitCohort... mockCohorts)
+ private void assertFailure(final CheckedFuture<Void, TransactionCommitFailedException> future,
+ final Exception expCause, final DOMStoreThreePhaseCommitCohort... mockCohorts)
throws Exception {
try {
future.checkedGet(5, TimeUnit.SECONDS);
public class DOMTransactionChainTest {
private SchemaContext schemaContext;
- private DOMDataBrokerImpl domBroker;
+ private AbstractDOMDataBroker domBroker;
@Before
public void setupStore() {
.build();
ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
- domBroker = new DOMDataBrokerImpl(stores, executor);
+ domBroker = new SerializedDOMDataBroker(stores, executor);
}
@Test
import org.opendaylight.controller.md.sal.binding.impl.ForwardedBindingDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
-import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataBrokerImpl;
+import org.opendaylight.controller.md.sal.dom.broker.impl.SerializedDOMDataBroker;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
}
public DOMDataBroker createDOMDataBroker() {
- return new DOMDataBrokerImpl(getDatastores(), getCommitCoordinatorExecutor());
+ return new SerializedDOMDataBroker(getDatastores(), getCommitCoordinatorExecutor());
}
public ListeningExecutorService getCommitCoordinatorExecutor() {