X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fsal%2Fdom%2Fbroker%2Fimpl%2FDOMDataBrokerImpl.java;h=7e37a1e3a3467837b16963a8026236738535f599;hp=608ac9bc68d120c39d96b0389bbed324ba01652e;hb=2cf314976a3f81c7115f94c44fb37e967f8a4426;hpb=9d7a3392097e4c4a648327f77d7ca1a6aaf1b410 diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerImpl.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerImpl.java index 608ac9bc68..7e37a1e3a3 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerImpl.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerImpl.java @@ -7,340 +7,81 @@ */ package org.opendaylight.controller.md.sal.dom.broker.impl; -import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import java.util.Collections; -import java.util.List; import java.util.Map.Entry; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicLong; import org.opendaylight.controller.md.sal.common.api.TransactionStatus; -import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener; -import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction; -import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; -import org.opendaylight.controller.sal.common.util.Rpcs; +import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStore; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Function; import com.google.common.base.Optional; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableList.Builder; import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; -public class DOMDataBrokerImpl implements DOMDataBroker, AutoCloseable { +public class DOMDataBrokerImpl extends AbstractDOMForwardedTransactionFactory implements DOMDataBroker, + AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(DOMDataBrokerImpl.class); - private static final Logger COORDINATOR_LOG = LoggerFactory.getLogger(CommitCoordination.class); - private final ImmutableMap datastores; - private final ListeningExecutorService executor; + + private final DOMDataCommitCoordinatorImpl coordinator; private final AtomicLong txNum = new AtomicLong(); + private final AtomicLong chainNum = new AtomicLong(); public DOMDataBrokerImpl(final ImmutableMap datastores, final ListeningExecutorService executor) { - super(); - this.datastores = datastores; - this.executor = executor; + super(datastores); + this.coordinator = new DOMDataCommitCoordinatorImpl(executor); } - private static final Function, Boolean> AND_FUNCTION = new Function, Boolean>() { - - @Override - public Boolean apply(final Iterable input) { - - for (Boolean value : input) { - if (value == false) { - return Boolean.FALSE; - } - } - return Boolean.TRUE; - } - }; - @Override - public DOMDataReadTransaction newReadOnlyTransaction() { - ImmutableMap.Builder builder = ImmutableMap.builder(); - for (Entry store : datastores.entrySet()) { - builder.put(store.getKey(), store.getValue().newReadOnlyTransaction()); - } - return new ReadOnlyTransactionImpl(newTransactionIdentifier(), builder.build()); - } - - private Object newTransactionIdentifier() { + protected Object newTransactionIdentifier() { return "DOM-" + txNum.getAndIncrement(); } - @Override - public DOMDataReadWriteTransaction newReadWriteTransaction() { - ImmutableMap.Builder builder = ImmutableMap.builder(); - for (Entry store : datastores.entrySet()) { - builder.put(store.getKey(), store.getValue().newReadWriteTransaction()); - } - return new ReadWriteTransactionImpl(newTransactionIdentifier(), builder.build(), this); - } - - @Override - public DOMDataWriteTransaction newWriteOnlyTransaction() { - ImmutableMap.Builder builder = ImmutableMap.builder(); - for (Entry store : datastores.entrySet()) { - builder.put(store.getKey(), store.getValue().newWriteOnlyTransaction()); - } - return new WriteTransactionImpl(newTransactionIdentifier(), builder.build(), this); - } - @Override public ListenerRegistration registerDataChangeListener(final LogicalDatastoreType store, final InstanceIdentifier path, final DOMDataChangeListener listener, final DataChangeScope triggeringScope) { - DOMStore potentialStore = datastores.get(store); + DOMStore potentialStore = getTxFactories().get(store); checkState(potentialStore != null, "Requested logical data store is not available."); return potentialStore.registerChangeListener(path, listener, triggeringScope); } - private ListenableFuture> submit( - final WriteTransactionImpl transaction) { - LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier()); - return executor.submit(new CommitCoordination(transaction)); - } - - private abstract static class AbstractCompositeTransaction implements - AsyncTransaction> { - - private final ImmutableMap backingTxs; - private final Object identifier; - - protected AbstractCompositeTransaction(final Object identifier, final ImmutableMap backingTxs) { - this.identifier = checkNotNull(identifier, "Identifier should not be null"); - this.backingTxs = checkNotNull(backingTxs, "Backing transactions should not be null"); - } - - protected T getSubtransaction(final K key) { - return backingTxs.get(key); - } - - public Iterable getSubtransactions() { - return backingTxs.values(); - } - - @Override - public Object getIdentifier() { - return identifier; - } - - @Override - public void close() { - try { - for (T subtransaction : backingTxs.values()) { - subtransaction.close(); - } - } catch (Exception e) { - throw new IllegalStateException("Uncaught exception occured during closing transaction.", e); - } - } - - } - - private static class ReadOnlyTransactionImpl extends - AbstractCompositeTransaction implements - DOMDataReadTransaction { - - protected ReadOnlyTransactionImpl(final Object identifier, - final ImmutableMap backingTxs) { - super(identifier, backingTxs); - } - - @Override - public ListenableFuture>> read(final LogicalDatastoreType store, - final InstanceIdentifier path) { - return getSubtransaction(store).read(path); - } - - } - - private static class WriteTransactionImpl extends - AbstractCompositeTransaction implements DOMDataWriteTransaction { - - private final DOMDataBrokerImpl broker; - private ImmutableList cohorts; - - protected WriteTransactionImpl(final Object identifier, final ImmutableMap backingTxs, - final DOMDataBrokerImpl broker) { - super(identifier, backingTxs); - this.broker = broker; - } - - public synchronized Iterable ready() { - checkState(cohorts == null, "Transaction was already marked as ready."); - ImmutableList.Builder cohortsBuilder = ImmutableList.builder(); - for (DOMStoreWriteTransaction subTx : getSubtransactions()) { - cohortsBuilder.add(subTx.ready()); - } - cohorts = cohortsBuilder.build(); - return cohorts; - } - - protected ImmutableList getCohorts() { - return cohorts; - } - - @Override - public void put(final LogicalDatastoreType store, final InstanceIdentifier path, final NormalizedNode data) { - getSubtransaction(store).write(path, data); - } - - @Override - public void delete(final LogicalDatastoreType store, final InstanceIdentifier path) { - getSubtransaction(store).delete(path); - } - - @Override - public void merge(final LogicalDatastoreType store, final InstanceIdentifier path, - final NormalizedNode data) { - getSubtransaction(store).merge(path,data); - } - - @Override - public void cancel() { - // TODO Auto-generated method stub - - } - - @Override - public ListenableFuture> commit() { - - ready(); - return broker.submit(this); - } - - } - - private static class ReadWriteTransactionImpl extends WriteTransactionImpl implements - DOMDataReadWriteTransaction { - - protected ReadWriteTransactionImpl(final Object identifier, - final ImmutableMap backingTxs, - final DOMDataBrokerImpl broker) { - // super(identifier, backingTxs); - super(identifier, backingTxs, broker); - } - - @Override - public ListenableFuture>> read(final LogicalDatastoreType store, - final InstanceIdentifier path) { - return getSubtransaction(store).read(path); - } - } - - private final class CommitCoordination implements Callable> { - - private final WriteTransactionImpl transaction; - - public CommitCoordination(final WriteTransactionImpl transaction) { - this.transaction = transaction; - } - - @Override - public RpcResult call() throws Exception { - - try { - Boolean canCommit = canCommit().get(); - - if (canCommit) { - try { - preCommit().get(); - try { - commit().get(); - COORDINATOR_LOG.debug("Tx: {} Is commited.", transaction.getIdentifier()); - return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, - Collections. emptySet()); - - } catch (InterruptedException | ExecutionException e) { - COORDINATOR_LOG.error("Tx: {} Error during commit", transaction.getIdentifier(), e); - } - - } catch (InterruptedException | ExecutionException e) { - COORDINATOR_LOG.warn("Tx: {} Error during preCommit, starting Abort", - transaction.getIdentifier(), e); - } - } else { - COORDINATOR_LOG.info("Tx: {} Did not pass canCommit phase.", transaction.getIdentifier()); - abort().get(); - } - } catch (InterruptedException | ExecutionException e) { - COORDINATOR_LOG.warn("Tx: {} Error during canCommit, starting Abort", transaction.getIdentifier(), e); - - } - try { - abort().get(); - } catch (InterruptedException | ExecutionException e) { - COORDINATOR_LOG.error("Tx: {} Error during abort", transaction.getIdentifier(), e); - } - return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections. emptySet()); - } - - public ListenableFuture preCommit() { - COORDINATOR_LOG.debug("Transaction {}: PreCommit Started ", transaction.getIdentifier()); - Builder> ops = ImmutableList.builder(); - for (DOMStoreThreePhaseCommitCohort cohort : transaction.getCohorts()) { - ops.add(cohort.preCommit()); - } - return (ListenableFuture) Futures.allAsList(ops.build()); - } - - public ListenableFuture commit() { - COORDINATOR_LOG.debug("Transaction {}: Commit Started ", transaction.getIdentifier()); - Builder> ops = ImmutableList.builder(); - for (DOMStoreThreePhaseCommitCohort cohort : transaction.getCohorts()) { - ops.add(cohort.commit()); - } - return (ListenableFuture) Futures.allAsList(ops.build()); - } - - public ListenableFuture canCommit() { - COORDINATOR_LOG.debug("Transaction {}: CanCommit Started ", transaction.getIdentifier()); - Builder> canCommitOperations = ImmutableList.builder(); - for (DOMStoreThreePhaseCommitCohort cohort : transaction.getCohorts()) { - canCommitOperations.add(cohort.canCommit()); - } - ListenableFuture> allCanCommits = Futures.allAsList(canCommitOperations.build()); - return Futures.transform(allCanCommits, AND_FUNCTION); - } - - public ListenableFuture abort() { - COORDINATOR_LOG.debug("Transaction {}: Abort Started ", transaction.getIdentifier()); - Builder> ops = ImmutableList.builder(); - for (DOMStoreThreePhaseCommitCohort cohort : transaction.getCohorts()) { - ops.add(cohort.abort()); - } - return (ListenableFuture) Futures.allAsList(ops.build()); - }; + @Override + public DOMTransactionChain createTransactionChain(final TransactionChainListener listener) { + ImmutableMap.Builder backingChainsBuilder = ImmutableMap + .builder(); + for (Entry entry : getTxFactories().entrySet()) { + backingChainsBuilder.put(entry.getKey(), entry.getValue().createTransactionChain()); + } + long chainId = chainNum.getAndIncrement(); + ImmutableMap backingChains = backingChainsBuilder.build(); + LOG.debug("Transactoin chain {} created with listener {}, backing store chains {}", chainId, listener, + backingChains); + return new DOMDataBrokerTransactionChainImpl(chainId, backingChains, coordinator, listener); } @Override - public void close() throws Exception { - + public ListenableFuture> commit(final DOMDataWriteTransaction transaction, + final Iterable cohorts) { + LOG.debug("Transaction: {} submitted with cohorts {}.", transaction.getIdentifier(), cohorts); + return coordinator.submit(transaction, cohorts, Optional. absent()); } }