--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.databroker;
+
+import static com.google.common.base.Preconditions.checkState;
+import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBrokerExtension;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractDOMBroker extends AbstractDOMTransactionFactory<DOMStore>
+ implements DOMDataBroker, AutoCloseable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractDOMBroker.class);
+
+ private final AtomicLong txNum = new AtomicLong();
+ private final AtomicLong chainNum = new AtomicLong();
+ private final Map<Class<? extends DOMDataBrokerExtension>, DOMDataBrokerExtension> extensions;
+ private volatile AutoCloseable closeable;
+
+ protected AbstractDOMBroker(final Map<LogicalDatastoreType, DOMStore> datastores) {
+ super(datastores);
+
+ boolean treeChange = true;
+ for (DOMStore ds : datastores.values()) {
+ if (!(ds instanceof DOMStoreTreeChangePublisher)) {
+ treeChange = false;
+ break;
+ }
+ }
+
+ if (treeChange) {
+ extensions = ImmutableMap.<Class<? extends DOMDataBrokerExtension>, DOMDataBrokerExtension>of(DOMDataTreeChangeService.class, new DOMDataTreeChangeService() {
+ @Override
+ public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerDataTreeChangeListener(final DOMDataTreeIdentifier treeId, final L listener) {
+ DOMStore publisher = getTxFactories().get(treeId.getDatastoreType());
+ checkState(publisher != null, "Requested logical data store is not available.");
+
+ return ((DOMStoreTreeChangePublisher) publisher).registerTreeChangeListener(treeId.getRootIdentifier(), listener);
+ }
+ });
+ } else {
+ extensions = Collections.emptyMap();
+ }
+ }
+
+ public void setCloseable(final AutoCloseable closeable) {
+ this.closeable = closeable;
+ }
+
+ @Override
+ public void close() {
+ super.close();
+
+ if (closeable != null) {
+ try {
+ closeable.close();
+ } catch (Exception e) {
+ LOG.debug("Error closing instance", e);
+ }
+ }
+ }
+
+ @Override
+ protected Object newTransactionIdentifier() {
+ return "DOM-" + txNum.getAndIncrement();
+ }
+
+ @Override
+ public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(final LogicalDatastoreType store,
+ final YangInstanceIdentifier path, final DOMDataChangeListener listener, final DataChangeScope triggeringScope) {
+
+ DOMStore potentialStore = getTxFactories().get(store);
+ checkState(potentialStore != null, "Requested logical data store is not available.");
+ return potentialStore.registerChangeListener(path, listener, triggeringScope);
+ }
+
+ @Override
+ public Map<Class<? extends DOMDataBrokerExtension>, DOMDataBrokerExtension> getSupportedExtensions() {
+ return extensions;
+ }
+
+ @Override
+ public DOMTransactionChain createTransactionChain(final TransactionChainListener listener) {
+ checkNotClosed();
+
+ final Map<LogicalDatastoreType, DOMStoreTransactionChain> backingChains = new EnumMap<>(LogicalDatastoreType.class);
+ for (Map.Entry<LogicalDatastoreType, DOMStore> entry : getTxFactories().entrySet()) {
+ backingChains.put(entry.getKey(), entry.getValue().createTransactionChain());
+ }
+
+ final long chainId = chainNum.getAndIncrement();
+ LOG.debug("Transaction chain {} created with listener {}, backing store chains {}", chainId, listener,
+ backingChains);
+ return new DOMBrokerTransactionChain(chainId, backingChains, this, listener);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.EnumMap;
+import java.util.Map;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+public abstract class AbstractDOMBrokerTransaction<K, T extends DOMStoreTransaction> implements
+ AsyncTransaction<YangInstanceIdentifier, NormalizedNode<?, ?>> {
+
+ private Map<K, T> backingTxs;
+ private final Object identifier;
+ private final Map<LogicalDatastoreType, ? extends DOMStoreTransactionFactory> storeTxFactories;
+
+ /**
+ *
+ * Creates new composite Transactions.
+ *
+ * @param identifier
+ * Identifier of transaction.
+ */
+ protected AbstractDOMBrokerTransaction(final Object identifier, Map<LogicalDatastoreType, ? extends DOMStoreTransactionFactory> storeTxFactories) {
+ this.identifier = Preconditions.checkNotNull(identifier, "Identifier should not be null");
+ this.storeTxFactories = Preconditions.checkNotNull(storeTxFactories, "Store Transaction Factories should not be null");
+ this.backingTxs = new EnumMap(LogicalDatastoreType.class);
+ }
+
+ /**
+ * Returns subtransaction associated with supplied key.
+ *
+ * @param key
+ * @return
+ * @throws NullPointerException
+ * if key is null
+ * @throws IllegalArgumentException
+ * if no subtransaction is associated with key.
+ */
+ protected final T getSubtransaction(final K key) {
+ Preconditions.checkNotNull(key, "key must not be null.");
+
+ T ret = backingTxs.get(key);
+ if(ret == null){
+ ret = createTransaction(key);
+ backingTxs.put(key, ret);
+ }
+ Preconditions.checkArgument(ret != null, "No subtransaction associated with %s", key);
+ return ret;
+ }
+
+ protected abstract T createTransaction(final K key);
+
+ /**
+ * Returns immutable Iterable of all subtransactions.
+ *
+ */
+ protected Collection<T> getSubtransactions() {
+ return backingTxs.values();
+ }
+
+ @Override
+ public Object getIdentifier() {
+ return identifier;
+ }
+
+ protected void closeSubtransactions() {
+ /*
+ * We share one exception for all failures, which are added
+ * as supressedExceptions to it.
+ */
+ IllegalStateException failure = null;
+ for (T subtransaction : backingTxs.values()) {
+ try {
+ subtransaction.close();
+ } catch (Exception e) {
+ // If we did not allocated failure we allocate it
+ if (failure == null) {
+ failure = new IllegalStateException("Uncaught exception occured during closing transaction", e);
+ } else {
+ // We update it with additional exceptions, which occurred during error.
+ failure.addSuppressed(e);
+ }
+ }
+ }
+ // If we have failure, we throw it at after all attempts to close.
+ if (failure != null) {
+ throw failure;
+ }
+ }
+
+ protected DOMStoreTransactionFactory getTxFactory(K type){
+ return storeTxFactories.get(type);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import java.util.Collection;
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
+
+public abstract class AbstractDOMTransactionFactory<T extends DOMStoreTransactionFactory> implements AutoCloseable {
+ private static final AtomicIntegerFieldUpdater<AbstractDOMTransactionFactory> UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(AbstractDOMTransactionFactory.class, "closed");
+ private final Map<LogicalDatastoreType, T> storeTxFactories;
+ private volatile int closed = 0;
+
+ protected AbstractDOMTransactionFactory(final Map<LogicalDatastoreType, T> txFactories) {
+ this.storeTxFactories = new EnumMap<>(txFactories);
+ }
+
+ /**
+ * Implementations must return unique identifier for each and every call of
+ * this method;
+ *
+ * @return new Unique transaction identifier.
+ */
+ protected abstract Object newTransactionIdentifier();
+
+ /**
+ *
+ * @param transaction
+ * @param cohorts
+ * @return
+ */
+ protected abstract CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
+ final Collection<DOMStoreThreePhaseCommitCohort> cohorts);
+
+ /**
+ *
+ * @return
+ */
+ public final DOMDataReadOnlyTransaction newReadOnlyTransaction() {
+ checkNotClosed();
+
+ return new DOMBrokerReadOnlyTransaction(newTransactionIdentifier(), storeTxFactories);
+ }
+
+
+ /**
+ *
+ * @return
+ */
+ public final DOMDataWriteTransaction newWriteOnlyTransaction() {
+ checkNotClosed();
+
+ return new DOMBrokerWriteOnlyTransaction(newTransactionIdentifier(), storeTxFactories, this);
+ }
+
+
+ /**
+ *
+ * @return
+ */
+ public final DOMDataReadWriteTransaction newReadWriteTransaction() {
+ checkNotClosed();
+
+ return new DOMBrokerReadWriteTransaction<>(newTransactionIdentifier(), storeTxFactories, this);
+ }
+
+ /**
+ * Convenience accessor of backing factories intended to be used only by
+ * finalization of this class.
+ *
+ * <b>Note:</b>
+ * Finalization of this class may want to access other functionality of
+ * supplied Transaction factories.
+ *
+ * @return Map of backing transaction factories.
+ */
+ protected final Map<LogicalDatastoreType, T> getTxFactories() {
+ return storeTxFactories;
+ }
+
+ /**
+ * Checks if instance is not closed.
+ *
+ * @throws IllegalStateException If instance of this class was closed.
+ *
+ */
+ protected final void checkNotClosed() {
+ Preconditions.checkState(closed == 0, "Transaction factory was closed. No further operations allowed.");
+ }
+
+ @Override
+ public void close() {
+ final boolean success = UPDATER.compareAndSet(this, 0, 1);
+ Preconditions.checkState(success, "Transaction factory was already closed");
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import java.util.Map;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+public class DOMBrokerReadOnlyTransaction<T extends DOMStoreReadTransaction>
+ extends AbstractDOMBrokerTransaction<LogicalDatastoreType, T>
+ implements DOMDataReadOnlyTransaction {
+ /**
+ * Creates new composite Transactions.
+ *
+ * @param identifier Identifier of transaction.
+ */
+ protected DOMBrokerReadOnlyTransaction(Object identifier, Map storeTxFactories) {
+ super(identifier, storeTxFactories);
+ }
+
+ @Override
+ public CheckedFuture<Optional<NormalizedNode<?,?>>, ReadFailedException> read(
+ final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+ return getSubtransaction(store).read(path);
+ }
+
+ @Override
+ public CheckedFuture<Boolean, ReadFailedException> exists(
+ final LogicalDatastoreType store,
+ final YangInstanceIdentifier path) {
+ return getSubtransaction(store).exists(path);
+ }
+
+ @Override
+ public void close() {
+ closeSubtransactions();
+ }
+
+ @Override
+ protected T createTransaction(LogicalDatastoreType key) {
+ return (T) getTxFactory(key).newReadOnlyTransaction();
+ }
+
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import java.util.Map;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+public class DOMBrokerReadWriteTransaction<T extends DOMStoreReadWriteTransaction>
+ extends DOMBrokerWriteOnlyTransaction<DOMStoreReadWriteTransaction> implements DOMDataReadWriteTransaction {
+ /**
+ * Creates new composite Transactions.
+ *
+ * @param identifier Identifier of transaction.
+ * @param storeTxFactories
+ */
+ protected DOMBrokerReadWriteTransaction(Object identifier, Map<LogicalDatastoreType, ? extends DOMStoreTransactionFactory> storeTxFactories, final AbstractDOMTransactionFactory<?> commitImpl) {
+ super(identifier, storeTxFactories, commitImpl);
+ }
+
+ @Override
+ public CheckedFuture<Optional<NormalizedNode<?,?>>, ReadFailedException> read(
+ final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+ return getSubtransaction(store).read(path);
+ }
+
+ @Override
+ public CheckedFuture<Boolean, ReadFailedException> exists(
+ final LogicalDatastoreType store,
+ final YangInstanceIdentifier path) {
+ return getSubtransaction(store).exists(path);
+ }
+
+ @Override
+ protected DOMStoreReadWriteTransaction createTransaction(LogicalDatastoreType key) {
+ return getTxFactory(key).newReadWriteTransaction();
+ }
+
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class DOMBrokerTransactionChain extends AbstractDOMTransactionFactory<DOMStoreTransactionChain>
+ implements DOMTransactionChain {
+ private static enum State {
+ RUNNING,
+ CLOSING,
+ CLOSED,
+ FAILED,
+ }
+
+ private static final AtomicIntegerFieldUpdater<DOMBrokerTransactionChain> COUNTER_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(DOMBrokerTransactionChain.class, "counter");
+ private static final AtomicReferenceFieldUpdater<DOMBrokerTransactionChain, State> STATE_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(DOMBrokerTransactionChain.class, State.class, "state");
+ private static final Logger LOG = LoggerFactory.getLogger(DOMBrokerTransactionChain.class);
+ private final AtomicLong txNum = new AtomicLong();
+ private final AbstractDOMBroker broker;
+ private final TransactionChainListener listener;
+ private final long chainId;
+
+ private volatile State state = State.RUNNING;
+ private volatile int counter = 0;
+
+ /**
+ *
+ * @param chainId
+ * ID of transaction chain
+ * @param chains
+ * Backing {@link DOMStoreTransactionChain}s.
+ * @param listener
+ * Listener, which listens on transaction chain events.
+ * @throws NullPointerException
+ * If any of arguments is null.
+ */
+ public DOMBrokerTransactionChain(final long chainId,
+ final Map<LogicalDatastoreType, DOMStoreTransactionChain> chains,
+ AbstractDOMBroker broker, final TransactionChainListener listener) {
+ super(chains);
+ this.chainId = chainId;
+ this.broker = Preconditions.checkNotNull(broker);
+ this.listener = Preconditions.checkNotNull(listener);
+ }
+
+ private void checkNotFailed() {
+ Preconditions.checkState(state != State.FAILED, "Transaction chain has failed");
+ }
+
+ @Override
+ protected Object newTransactionIdentifier() {
+ return "DOM-CHAIN-" + chainId + "-" + txNum.getAndIncrement();
+ }
+
+ @Override
+ public CheckedFuture<Void, TransactionCommitFailedException> submit(
+ final DOMDataWriteTransaction transaction, final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+ checkNotFailed();
+ checkNotClosed();
+
+ final CheckedFuture<Void, TransactionCommitFailedException> ret = broker.submit(transaction, cohorts);
+
+ COUNTER_UPDATER.incrementAndGet(this);
+ Futures.addCallback(ret, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ transactionCompleted();
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ transactionFailed(transaction, t);
+ }
+ });
+
+ return ret;
+ }
+
+ @Override
+ public void close() {
+ final boolean success = STATE_UPDATER.compareAndSet(this, State.RUNNING, State.CLOSING);
+ if (!success) {
+ LOG.debug("Chain {} is no longer running", this);
+ return;
+ }
+
+ super.close();
+ for (DOMStoreTransactionChain subChain : getTxFactories().values()) {
+ subChain.close();
+ }
+
+ if (counter == 0) {
+ finishClose();
+ }
+ }
+
+ private void finishClose() {
+ state = State.CLOSED;
+ listener.onTransactionChainSuccessful(this);
+ }
+
+ private void transactionCompleted() {
+ if (COUNTER_UPDATER.decrementAndGet(this) == 0 && state == State.CLOSING) {
+ finishClose();
+ }
+ }
+
+ private void transactionFailed(final DOMDataWriteTransaction tx, final Throwable cause) {
+ state = State.FAILED;
+ LOG.debug("Transaction chain {}Â failed.", this, cause);
+ listener.onTransactionChainFailed(this, tx, cause);
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.common.impl.service.AbstractDataTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DOMBrokerWriteOnlyTransaction<T extends DOMStoreWriteTransaction>
+ extends AbstractDOMBrokerTransaction<LogicalDatastoreType, T> implements DOMDataWriteTransaction {
+
+ private static final AtomicReferenceFieldUpdater<DOMBrokerWriteOnlyTransaction, AbstractDOMTransactionFactory> IMPL_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(DOMBrokerWriteOnlyTransaction.class, AbstractDOMTransactionFactory.class, "commitImpl");
+ @SuppressWarnings("rawtypes")
+ private static final AtomicReferenceFieldUpdater<DOMBrokerWriteOnlyTransaction, Future> FUTURE_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(DOMBrokerWriteOnlyTransaction.class, Future.class, "commitFuture");
+ private static final Logger LOG = LoggerFactory.getLogger(DOMBrokerWriteOnlyTransaction.class);
+ private static final Future<?> CANCELLED_FUTURE = Futures.immediateCancelledFuture();
+
+ /**
+ * Implementation of real commit. It also acts as an indication that
+ * the transaction is running -- which we flip atomically using
+ * {@link #IMPL_UPDATER}.
+ */
+ private volatile AbstractDOMTransactionFactory<?> commitImpl;
+
+ /**
+ * Future task of transaction commit. It starts off as null, but is
+ * set appropriately on {@link #submit()} and {@link #cancel()} via
+ * {@link AtomicReferenceFieldUpdater#lazySet(Object, Object)}.
+ *
+ * Lazy set is safe for use because it is only referenced to in the
+ * {@link #cancel()} slow path, where we will busy-wait for it. The
+ * fast path gets the benefit of a store-store barrier instead of the
+ * usual store-load barrier.
+ */
+ private volatile Future<?> commitFuture;
+
+ protected DOMBrokerWriteOnlyTransaction(final Object identifier,
+ Map storeTxFactories, final AbstractDOMTransactionFactory<?> commitImpl) {
+ super(identifier, storeTxFactories);
+ this.commitImpl = Preconditions.checkNotNull(commitImpl, "commitImpl must not be null.");
+ }
+
+ @Override
+ protected T createTransaction(LogicalDatastoreType key) {
+ // FIXME : Casting shouldn't be necessary here
+ return (T) getTxFactory(key).newWriteOnlyTransaction();
+ }
+
+ @Override
+ public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ checkRunning(commitImpl);
+ getSubtransaction(store).write(path, data);
+ }
+
+ @Override
+ public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+ checkRunning(commitImpl);
+ getSubtransaction(store).delete(path);
+ }
+
+ @Override
+ public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ checkRunning(commitImpl);
+ getSubtransaction(store).merge(path, data);
+ }
+
+ @Override
+ public boolean cancel() {
+ final AbstractDOMTransactionFactory<?> impl = IMPL_UPDATER.getAndSet(this, null);
+ if (impl != null) {
+ LOG.trace("Transaction {} cancelled before submit", getIdentifier());
+ FUTURE_UPDATER.lazySet(this, CANCELLED_FUTURE);
+ closeSubtransactions();
+ return true;
+ }
+
+ // The transaction is in process of being submitted or cancelled. Busy-wait
+ // for the corresponding future.
+ Future<?> future;
+ do {
+ future = commitFuture;
+ } while (future == null);
+
+ return future.cancel(false);
+ }
+
+ @Deprecated
+ @Override
+ public ListenableFuture<RpcResult<TransactionStatus>> commit() {
+ return AbstractDataTransaction.convertToLegacyCommitFuture(submit());
+ }
+
+ @Override
+ public CheckedFuture<Void, TransactionCommitFailedException> submit() {
+ final AbstractDOMTransactionFactory<?> impl = IMPL_UPDATER.getAndSet(this, null);
+ checkRunning(impl);
+
+ final Collection<T> txns = getSubtransactions();
+ final Collection<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>(txns.size());
+
+ // FIXME: deal with errors thrown by backed (ready and submit can fail in theory)
+ for (DOMStoreWriteTransaction txn : txns) {
+ cohorts.add(txn.ready());
+ }
+
+ final CheckedFuture<Void, TransactionCommitFailedException> ret = impl.submit(this, cohorts);
+ FUTURE_UPDATER.lazySet(this, ret);
+ return ret;
+ }
+
+ private void checkRunning(final AbstractDOMTransactionFactory<?> impl) {
+ Preconditions.checkState(impl != null, "Transaction %s is no longer running", getIdentifier());
+ }
+
+}
*/
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorSelection;
import akka.dispatch.OnComplete;
import java.util.List;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* previous Tx's ready operations haven't completed yet.
*/
@Override
- protected Future<ActorSelection> sendFindPrimaryShardAsync(final String shardName) {
+ protected Future<PrimaryShardInfo> sendFindPrimaryShardAsync(final String shardName) {
// Check if there are any previous ready Futures, otherwise let the super class handle it.
if(previousReadyFutures.isEmpty()) {
return super.sendFindPrimaryShardAsync(shardName);
previousReadyFutures, getActorContext().getClientDispatcher());
// Add a callback for completion of the combined Futures.
- final Promise<ActorSelection> returnPromise = akka.dispatch.Futures.promise();
+ final Promise<PrimaryShardInfo> returnPromise = akka.dispatch.Futures.promise();
OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
@Override
public void onComplete(Throwable failure, Iterable<Object> notUsed) {
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.databroker.AbstractDOMBroker;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.controller.md.sal.dom.broker.impl.AbstractDOMDataBroker;
import org.opendaylight.controller.md.sal.dom.broker.impl.TransactionCommitFailedExceptionMapper;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.slf4j.LoggerFactory;
/**
- * Implementation of DOMDataCommitExecutor that coordinates transaction commits concurrently. The 3
+ * ConcurrentDOMDataBroker commits transactions concurrently. The 3
* commit phases (canCommit, preCommit, and commit) are performed serially and non-blocking
* (ie async) per transaction but multiple transaction commits can run concurrent.
*
* @author Thomas Pantelis
*/
-public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker {
+public class ConcurrentDOMDataBroker extends AbstractDOMBroker {
private static final Logger LOG = LoggerFactory.getLogger(ConcurrentDOMDataBroker.class);
private static final String CAN_COMMIT = "CAN_COMMIT";
private static final String PRE_COMMIT = "PRE_COMMIT";
import akka.actor.ActorSelection;
import akka.actor.PoisonPill;
import akka.dispatch.OnComplete;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
} else {
RegisterDataTreeChangeListenerReply reply = (RegisterDataTreeChangeListenerReply) result;
setListenerRegistrationActor(actorContext.actorSelection(
- reply.getListenerRegistrationPath().path()));
+ reply.getListenerRegistrationPath()));
}
}
}, actorContext.getClientDispatcher());
}
+
+ @VisibleForTesting
+ ActorSelection getListenerRegistrationActor() {
+ return listenerRegistrationActor;
+ }
+
+ @VisibleForTesting
+ ActorRef getDataChangeListenerActor() {
+ return dataChangeListenerActor;
+ }
}
if (chain != null) {
chain.close();
} else {
- LOG.warn("Closing non-existent transaction chain {}", transactionChainId);
+ LOG.debug("Closing non-existent transaction chain {}", transactionChainId);
}
}
import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
return ShardStrategyFactory.getStrategy(path).findShard(path);
}
- protected Future<ActorSelection> sendFindPrimaryShardAsync(String shardName) {
+ protected Future<PrimaryShardInfo> sendFindPrimaryShardAsync(String shardName) {
return actorContext.findPrimaryShardAsync(shardName);
}
private TransactionFutureCallback getOrCreateTxFutureCallback(String shardName) {
TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
if(txFutureCallback == null) {
- Future<ActorSelection> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
+ Future<PrimaryShardInfo> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(this, shardName);
txFutureCallback = newTxFutureCallback;
txFutureCallbackMap.put(shardName, txFutureCallback);
- findPrimaryFuture.onComplete(new OnComplete<ActorSelection>() {
+ findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
@Override
- public void onComplete(Throwable failure, ActorSelection primaryShard) {
+ public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
if(failure != null) {
newTxFutureCallback.createTransactionContext(failure, null);
} else {
- newTxFutureCallback.setPrimaryShard(primaryShard);
+ newTxFutureCallback.setPrimaryShard(primaryShardInfo.getPrimaryShardActor());
}
}
}, actorContext.getClientDispatcher());
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import akka.actor.ActorSelection;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+
+/**
+ * Local message DTO that contains information about the primary shard.
+ *
+ * @author Thomas Pantelis
+ */
+public class PrimaryShardInfo {
+ private final ActorSelection primaryShardActor;
+ private final Optional<DataTree> localShardDataTree;
+
+ public PrimaryShardInfo(@Nonnull ActorSelection primaryShardActor, @Nonnull Optional<DataTree> localShardDataTree) {
+ this.primaryShardActor = Preconditions.checkNotNull(primaryShardActor);
+ this.localShardDataTree = Preconditions.checkNotNull(localShardDataTree);
+ }
+
+ /**
+ * Returns an ActorSelection representing the primary shard actor.
+ */
+ public @Nonnull ActorSelection getPrimaryShardActor() {
+ return primaryShardActor;
+ }
+
+ /**
+ * Returns an Optional whose value contains the primary shard's DataTree if the primary shard is local
+ * to the caller. Otherwise the Optional value is absent.
+ */
+ public @Nonnull Optional<DataTree> getLocalShardDataTree() {
+ return localShardDataTree;
+ }
+}
*/
package org.opendaylight.controller.cluster.datastore.messages;
+import akka.actor.ActorPath;
import akka.actor.ActorRef;
import com.google.common.base.Preconditions;
+
import java.io.Serializable;
/**
this.listenerRegistrationPath = Preconditions.checkNotNull(listenerRegistrationPath);
}
- public ActorRef getListenerRegistrationPath() {
- return listenerRegistrationPath;
+ public ActorPath getListenerRegistrationPath() {
+ return listenerRegistrationPath.path();
}
}
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.reporting.MetricsReporter;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private Timeout transactionCommitOperationTimeout;
private Timeout shardInitializationTimeout;
private final Dispatchers dispatchers;
- private Cache<String, Future<ActorSelection>> primaryShardActorSelectionCache;
+ private Cache<String, Future<PrimaryShardInfo>> primaryShardInfoCache;
private volatile SchemaContext schemaContext;
private volatile boolean updated;
shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
- primaryShardActorSelectionCache = CacheBuilder.newBuilder()
+ primaryShardInfoCache = CacheBuilder.newBuilder()
.expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
.build();
}
return schemaContext;
}
- public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
- Future<ActorSelection> ret = primaryShardActorSelectionCache.getIfPresent(shardName);
+ public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
+ Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
if(ret != null){
return ret;
}
Future<Object> future = executeOperationAsync(shardManager,
new FindPrimary(shardName, true), shardInitializationTimeout);
- return future.transform(new Mapper<Object, ActorSelection>() {
+ return future.transform(new Mapper<Object, PrimaryShardInfo>() {
@Override
- public ActorSelection checkedApply(Object response) throws Exception {
+ public PrimaryShardInfo checkedApply(Object response) throws Exception {
if(response instanceof PrimaryFound) {
PrimaryFound found = (PrimaryFound)response;
LOG.debug("Primary found {}", found.getPrimaryPath());
ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath());
- primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection));
- return actorSelection;
+ PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, Optional.<DataTree>absent());
+ primaryShardInfoCache.put(shardName, Futures.successful(info));
+ return info;
} else if(response instanceof NotInitializedException) {
throw (NotInitializedException)response;
} else if(response instanceof PrimaryNotFoundException) {
public void broadcast(final Object message){
for(final String shardName : configuration.getAllShardNames()){
- Future<ActorSelection> primaryFuture = findPrimaryShardAsync(shardName);
- primaryFuture.onComplete(new OnComplete<ActorSelection>() {
+ Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
+ primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
@Override
- public void onComplete(Throwable failure, ActorSelection primaryShard) {
+ public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
if(failure != null) {
LOG.warn("broadcast failed to send message {} to shard {}: {}",
message.getClass().getSimpleName(), shardName, failure);
} else {
- primaryShard.tell(message, ActorRef.noSender());
+ primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
}
}
}, getClientDispatcher());
}
@VisibleForTesting
- Cache<String, Future<ActorSelection>> getPrimaryShardActorSelectionCache() {
- return primaryShardActorSelectionCache;
+ Cache<String, Future<PrimaryShardInfo>> getPrimaryShardInfoCache() {
+ return primaryShardInfoCache;
}
}
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
return setupActorContextWithoutInitialCreateTransaction(actorSystem, DefaultShardStrategy.DEFAULT_SHARD);
}
+ protected Future<PrimaryShardInfo> primaryShardInfoReply(ActorSystem actorSystem, ActorRef actorRef) {
+ return Futures.successful(new PrimaryShardInfo(actorSystem.actorSelection(actorRef.path()),
+ Optional.<DataTree>absent()));
+ }
+
protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem, String shardName) {
ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
log.info("Created mock shard actor {}", actorRef);
doReturn(actorSystem.actorSelection(actorRef.path())).
when(mockActorContext).actorSelection(actorRef.path().toString());
- doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))).
+ doReturn(primaryShardInfoReply(actorSystem, actorRef)).
when(mockActorContext).findPrimaryShardAsync(eq(shardName));
doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
/**
* Unit tests for DOMConcurrentDataCommitCoordinator.
assertFailure(future, cause, mockCohort1, mockCohort2);
}
+
+ @Test
+ public void testCreateReadWriteTransaction(){
+ DOMStore domStore = mock(DOMStore.class);
+ ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL,
+ domStore, LogicalDatastoreType.CONFIGURATION, domStore), futureExecutor);
+ dataBroker.newReadWriteTransaction();
+
+ verify(domStore, never()).newReadWriteTransaction();
+ }
+
+
+ @Test
+ public void testCreateWriteOnlyTransaction(){
+ DOMStore domStore = mock(DOMStore.class);
+ ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL,
+ domStore, LogicalDatastoreType.CONFIGURATION, domStore), futureExecutor);
+ dataBroker.newWriteOnlyTransaction();
+
+ verify(domStore, never()).newWriteOnlyTransaction();
+ }
+
+ @Test
+ public void testCreateReadOnlyTransaction(){
+ DOMStore domStore = mock(DOMStore.class);
+ ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL,
+ domStore, LogicalDatastoreType.CONFIGURATION, domStore), futureExecutor);
+ dataBroker.newReadOnlyTransaction();
+
+ verify(domStore, never()).newReadOnlyTransaction();
+ }
+
+ @Test
+ public void testLazySubTransactionCreationForReadWriteTransactions(){
+ DOMStore configDomStore = mock(DOMStore.class);
+ DOMStore operationalDomStore = mock(DOMStore.class);
+ DOMStoreReadWriteTransaction storeTxn = mock(DOMStoreReadWriteTransaction.class);
+
+ doReturn(storeTxn).when(operationalDomStore).newReadWriteTransaction();
+ doReturn(storeTxn).when(configDomStore).newReadWriteTransaction();
+
+ ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL,
+ operationalDomStore, LogicalDatastoreType.CONFIGURATION, configDomStore), futureExecutor);
+ DOMDataReadWriteTransaction dataTxn = dataBroker.newReadWriteTransaction();
+
+ dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
+ dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
+ dataTxn.read(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build());
+
+ verify(configDomStore, never()).newReadWriteTransaction();
+ verify(operationalDomStore, times(1)).newReadWriteTransaction();
+
+ dataTxn.put(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
+
+ verify(configDomStore, times(1)).newReadWriteTransaction();
+ verify(operationalDomStore, times(1)).newReadWriteTransaction();
+
+ }
+
+ @Test
+ public void testLazySubTransactionCreationForWriteOnlyTransactions(){
+ DOMStore configDomStore = mock(DOMStore.class);
+ DOMStore operationalDomStore = mock(DOMStore.class);
+ DOMStoreWriteTransaction storeTxn = mock(DOMStoreWriteTransaction.class);
+
+ doReturn(storeTxn).when(operationalDomStore).newWriteOnlyTransaction();
+ doReturn(storeTxn).when(configDomStore).newWriteOnlyTransaction();
+
+ ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL,
+ operationalDomStore, LogicalDatastoreType.CONFIGURATION, configDomStore), futureExecutor);
+ DOMDataWriteTransaction dataTxn = dataBroker.newWriteOnlyTransaction();
+
+ dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
+ dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
+
+ verify(configDomStore, never()).newWriteOnlyTransaction();
+ verify(operationalDomStore, times(1)).newWriteOnlyTransaction();
+
+ dataTxn.put(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
+
+ verify(configDomStore, times(1)).newWriteOnlyTransaction();
+ verify(operationalDomStore, times(1)).newWriteOnlyTransaction();
+
+ }
+
+
+ @Test
+ public void testLazySubTransactionCreationForReadOnlyTransactions(){
+ DOMStore configDomStore = mock(DOMStore.class);
+ DOMStore operationalDomStore = mock(DOMStore.class);
+ DOMStoreReadTransaction storeTxn = mock(DOMStoreReadTransaction.class);
+
+ doReturn(storeTxn).when(operationalDomStore).newReadOnlyTransaction();
+ doReturn(storeTxn).when(configDomStore).newReadOnlyTransaction();
+
+ ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL,
+ operationalDomStore, LogicalDatastoreType.CONFIGURATION, configDomStore), futureExecutor);
+ DOMDataReadOnlyTransaction dataTxn = dataBroker.newReadOnlyTransaction();
+
+ dataTxn.read(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build());
+ dataTxn.read(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build());
+
+ verify(configDomStore, never()).newReadOnlyTransaction();
+ verify(operationalDomStore, times(1)).newReadOnlyTransaction();
+
+ dataTxn.read(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.builder().build());
+
+ verify(configDomStore, times(1)).newReadOnlyTransaction();
+ verify(operationalDomStore, times(1)).newReadOnlyTransaction();
+
+ }
+
+ @Test
+ public void testSubmitWithOnlyOneSubTransaction() throws InterruptedException {
+ DOMStore configDomStore = mock(DOMStore.class);
+ DOMStore operationalDomStore = mock(DOMStore.class);
+ DOMStoreReadWriteTransaction mockStoreReadWriteTransaction = mock(DOMStoreReadWriteTransaction.class);
+ DOMStoreThreePhaseCommitCohort mockCohort = mock(DOMStoreThreePhaseCommitCohort.class);
+
+ doReturn(mockStoreReadWriteTransaction).when(operationalDomStore).newReadWriteTransaction();
+ doReturn(mockCohort).when(mockStoreReadWriteTransaction).ready();
+ doReturn(Futures.immediateFuture(false)).when(mockCohort).canCommit();
+ doReturn(Futures.immediateFuture(null)).when(mockCohort).abort();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ final List<DOMStoreThreePhaseCommitCohort> commitCohorts = new ArrayList();
+
+ ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL,
+ operationalDomStore, LogicalDatastoreType.CONFIGURATION, configDomStore), futureExecutor) {
+ @Override
+ public CheckedFuture<Void, TransactionCommitFailedException> submit(DOMDataWriteTransaction transaction, Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+ commitCohorts.addAll(cohorts);
+ latch.countDown();
+ return super.submit(transaction, cohorts);
+ }
+ };
+ DOMDataReadWriteTransaction domDataReadWriteTransaction = dataBroker.newReadWriteTransaction();
+
+ domDataReadWriteTransaction.delete(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build());
+
+ domDataReadWriteTransaction.submit();
+
+ latch.await(10, TimeUnit.SECONDS);
+
+ assertTrue(commitCohorts.size() == 1);
+ }
+
+ @Test
+ public void testSubmitWithOnlyTwoSubTransactions() throws InterruptedException {
+ DOMStore configDomStore = mock(DOMStore.class);
+ DOMStore operationalDomStore = mock(DOMStore.class);
+ DOMStoreReadWriteTransaction operationalTransaction = mock(DOMStoreReadWriteTransaction.class);
+ DOMStoreReadWriteTransaction configTransaction = mock(DOMStoreReadWriteTransaction.class);
+ DOMStoreThreePhaseCommitCohort mockCohortOperational = mock(DOMStoreThreePhaseCommitCohort.class);
+ DOMStoreThreePhaseCommitCohort mockCohortConfig = mock(DOMStoreThreePhaseCommitCohort.class);
+
+ doReturn(operationalTransaction).when(operationalDomStore).newReadWriteTransaction();
+ doReturn(configTransaction).when(configDomStore).newReadWriteTransaction();
+
+ doReturn(mockCohortOperational).when(operationalTransaction).ready();
+ doReturn(Futures.immediateFuture(false)).when(mockCohortOperational).canCommit();
+ doReturn(Futures.immediateFuture(null)).when(mockCohortOperational).abort();
+
+ doReturn(mockCohortConfig).when(configTransaction).ready();
+ doReturn(Futures.immediateFuture(false)).when(mockCohortConfig).canCommit();
+ doReturn(Futures.immediateFuture(null)).when(mockCohortConfig).abort();
+
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ final List<DOMStoreThreePhaseCommitCohort> commitCohorts = new ArrayList();
+
+ ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL,
+ operationalDomStore, LogicalDatastoreType.CONFIGURATION, configDomStore), futureExecutor) {
+ @Override
+ public CheckedFuture<Void, TransactionCommitFailedException> submit(DOMDataWriteTransaction transaction, Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+ commitCohorts.addAll(cohorts);
+ latch.countDown();
+ return super.submit(transaction, cohorts);
+ }
+ };
+ DOMDataReadWriteTransaction domDataReadWriteTransaction = dataBroker.newReadWriteTransaction();
+
+ domDataReadWriteTransaction.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
+ domDataReadWriteTransaction.merge(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
+
+ domDataReadWriteTransaction.submit();
+
+ latch.await(10, TimeUnit.SECONDS);
+
+ assertTrue(commitCohorts.size() == 2);
+ }
+
+ @Test
+ public void testCreateTransactionChain(){
+ DOMStore domStore = mock(DOMStore.class);
+ ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL,
+ domStore, LogicalDatastoreType.CONFIGURATION, domStore), futureExecutor);
+
+ dataBroker.createTransactionChain(mock(TransactionChainListener.class));
+
+ verify(domStore, times(2)).createTransactionChain();
+
+ }
+
+ @Test
+ public void testCreateTransactionOnChain(){
+ DOMStore domStore = mock(DOMStore.class);
+ ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL,
+ domStore, LogicalDatastoreType.CONFIGURATION, domStore), futureExecutor);
+
+ DOMStoreReadWriteTransaction operationalTransaction = mock(DOMStoreReadWriteTransaction.class);
+ DOMStoreTransactionChain mockChain = mock(DOMStoreTransactionChain.class);
+
+ doReturn(mockChain).when(domStore).createTransactionChain();
+ doReturn(operationalTransaction).when(mockChain).newWriteOnlyTransaction();
+
+ DOMTransactionChain transactionChain = dataBroker.createTransactionChain(mock(TransactionChainListener.class));
+
+ DOMDataWriteTransaction domDataWriteTransaction = transactionChain.newWriteOnlyTransaction();
+
+ verify(mockChain, never()).newWriteOnlyTransaction();
+
+ domDataWriteTransaction.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
+ }
+
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.DeadLetter;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged;
+import org.opendaylight.controller.cluster.datastore.messages.DataTreeChangedReply;
+import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+
+public class DataTreeChangeListenerActorTest extends AbstractActorTest {
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test
+ public void testDataChangedWhenNotificationsAreEnabled(){
+ new JavaTestKit(getSystem()) {{
+ final DataTreeCandidate mockTreeCandidate = Mockito.mock(DataTreeCandidate.class);
+ final ImmutableList<DataTreeCandidate> mockCandidates = ImmutableList.of(mockTreeCandidate);
+ final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class);
+ final Props props = DataTreeChangeListenerActor.props(mockListener);
+ final ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedNotificationsEnabled");
+
+ // Let the DataChangeListener know that notifications should be enabled
+ subject.tell(new EnableNotification(true), getRef());
+
+ subject.tell(new DataTreeChanged(mockCandidates),
+ getRef());
+
+ expectMsgClass(DataTreeChangedReply.class);
+
+ Mockito.verify(mockListener).onDataTreeChanged(mockCandidates);
+ }};
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test
+ public void testDataChangedWhenNotificationsAreDisabled(){
+ new JavaTestKit(getSystem()) {{
+ final DataTreeCandidate mockTreeCandidate = Mockito.mock(DataTreeCandidate.class);
+ final ImmutableList<DataTreeCandidate> mockCandidates = ImmutableList.of(mockTreeCandidate);
+ final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class);
+ final Props props = DataTreeChangeListenerActor.props(mockListener);
+ final ActorRef subject =
+ getSystem().actorOf(props, "testDataTreeChangedNotificationsDisabled");
+
+ subject.tell(new DataTreeChanged(mockCandidates),
+ getRef());
+
+ new Within(duration("1 seconds")) {
+ @Override
+ protected void run() {
+ expectNoMsg();
+
+ Mockito.verify(mockListener, Mockito.never()).onDataTreeChanged(
+ Matchers.anyCollectionOf(DataTreeCandidate.class));
+ }
+ };
+ }};
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test
+ public void testDataChangedWithNoSender(){
+ new JavaTestKit(getSystem()) {{
+ final DataTreeCandidate mockTreeCandidate = Mockito.mock(DataTreeCandidate.class);
+ final ImmutableList<DataTreeCandidate> mockCandidates = ImmutableList.of(mockTreeCandidate);
+ final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class);
+ final Props props = DataTreeChangeListenerActor.props(mockListener);
+ final ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedWithNoSender");
+
+ getSystem().eventStream().subscribe(getRef(), DeadLetter.class);
+
+ subject.tell(new DataTreeChanged(mockCandidates), ActorRef.noSender());
+
+ // Make sure no DataChangedReply is sent to DeadLetters.
+ while(true) {
+ DeadLetter deadLetter;
+ try {
+ deadLetter = expectMsgClass(duration("1 seconds"), DeadLetter.class);
+ } catch (AssertionError e) {
+ // Timed out - got no DeadLetter - this is good
+ break;
+ }
+
+ // We may get DeadLetters for other messages we don't care about.
+ Assert.assertFalse("Unexpected DataTreeChangedReply",
+ deadLetter.message() instanceof DataTreeChangedReply);
+ }
+ }};
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test
+ public void testDataChangedWithListenerRuntimeEx(){
+ new JavaTestKit(getSystem()) {{
+ final DataTreeCandidate mockTreeCandidate1 = Mockito.mock(DataTreeCandidate.class);
+ final ImmutableList<DataTreeCandidate> mockCandidates1 = ImmutableList.of(mockTreeCandidate1);
+ final DataTreeCandidate mockTreeCandidate2 = Mockito.mock(DataTreeCandidate.class);
+ final ImmutableList<DataTreeCandidate> mockCandidates2 = ImmutableList.of(mockTreeCandidate2);
+ final DataTreeCandidate mockTreeCandidate3 = Mockito.mock(DataTreeCandidate.class);
+ final ImmutableList<DataTreeCandidate> mockCandidates3 = ImmutableList.of(mockTreeCandidate3);
+
+ final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class);
+ Mockito.doThrow(new RuntimeException("mock")).when(mockListener).onDataTreeChanged(mockCandidates2);
+
+ Props props = DataTreeChangeListenerActor.props(mockListener);
+ ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedWithListenerRuntimeEx");
+
+ // Let the DataChangeListener know that notifications should be enabled
+ subject.tell(new EnableNotification(true), getRef());
+
+ subject.tell(new DataTreeChanged(mockCandidates1),getRef());
+ expectMsgClass(DataTreeChangedReply.class);
+
+ subject.tell(new DataTreeChanged(mockCandidates2),getRef());
+ expectMsgClass(DataTreeChangedReply.class);
+
+ subject.tell(new DataTreeChanged(mockCandidates3),getRef());
+ expectMsgClass(DataTreeChangedReply.class);
+
+ Mockito.verify(mockListener).onDataTreeChanged(mockCandidates1);
+ Mockito.verify(mockListener).onDataTreeChanged(mockCandidates2);
+ Mockito.verify(mockListener).onDataTreeChanged(mockCandidates3);
+ }};
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.Terminated;
+import akka.dispatch.ExecutionContexts;
+import akka.dispatch.Futures;
+import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.concurrent.TimeUnit;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
+import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import scala.concurrent.ExecutionContextExecutor;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
+ @SuppressWarnings("unchecked")
+ private final DOMDataTreeChangeListener mockListener = mock(DOMDataTreeChangeListener.class);
+
+ @Test(timeout=10000)
+ public void testSuccessfulRegistration() {
+ new JavaTestKit(getSystem()) {{
+ ActorContext actorContext = new ActorContext(getSystem(), getRef(),
+ mock(ClusterWrapper.class), mock(Configuration.class));
+
+ final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy =
+ new DataTreeChangeListenerProxy<>(actorContext, mockListener);
+
+ final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+ new Thread() {
+ @Override
+ public void run() {
+ proxy.init("shard-1", path);
+ }
+
+ }.start();
+
+ FiniteDuration timeout = duration("5 seconds");
+ FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
+ Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
+
+ reply(new LocalShardFound(getRef()));
+
+ RegisterDataTreeChangeListener registerMsg = expectMsgClass(timeout, RegisterDataTreeChangeListener.class);
+ Assert.assertEquals("getPath", path, registerMsg.getPath());
+
+ reply(new RegisterDataTreeChangeListenerReply(getRef()));
+
+
+ for(int i = 0; (i < 20 * 5) && proxy.getListenerRegistrationActor() == null; i++) {
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }
+
+ Assert.assertEquals("getListenerRegistrationActor", getSystem().actorSelection(getRef().path()),
+ proxy.getListenerRegistrationActor());
+
+ watch(proxy.getDataChangeListenerActor());
+
+ proxy.close();
+
+ // The listener registration actor should get a Close message
+ expectMsgClass(timeout, CloseDataTreeChangeListenerRegistration.class);
+
+ // The DataChangeListener actor should be terminated
+ expectMsgClass(timeout, Terminated.class);
+
+ proxy.close();
+
+ expectNoMsg();
+ }};
+ }
+
+ @Test(timeout=10000)
+ public void testLocalShardNotFound() {
+ new JavaTestKit(getSystem()) {{
+ ActorContext actorContext = new ActorContext(getSystem(), getRef(),
+ mock(ClusterWrapper.class), mock(Configuration.class));
+
+ final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy =
+ new DataTreeChangeListenerProxy<>(actorContext, mockListener);
+
+ final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+ new Thread() {
+ @Override
+ public void run() {
+ proxy.init("shard-1", path);
+ }
+
+ }.start();
+
+ FiniteDuration timeout = duration("5 seconds");
+ FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
+ Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
+
+ reply(new LocalShardNotFound("shard-1"));
+
+ expectNoMsg(duration("1 seconds"));
+ }};
+ }
+
+ @Test(timeout=10000)
+ public void testLocalShardNotInitialized() {
+ new JavaTestKit(getSystem()) {{
+ ActorContext actorContext = new ActorContext(getSystem(), getRef(),
+ mock(ClusterWrapper.class), mock(Configuration.class));
+
+ final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy =
+ new DataTreeChangeListenerProxy<>(actorContext, mockListener);
+
+ final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+ new Thread() {
+ @Override
+ public void run() {
+ proxy.init("shard-1", path);
+ }
+
+ }.start();
+
+ FiniteDuration timeout = duration("5 seconds");
+ FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
+ Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
+
+ reply(new NotInitializedException("not initialized"));
+
+ new Within(duration("1 seconds")) {
+ @Override
+ protected void run() {
+ expectNoMsg();
+ }
+ };
+ }};
+ }
+
+ @Test
+ public void testFailedRegistration() {
+ new JavaTestKit(getSystem()) {{
+ ActorSystem mockActorSystem = mock(ActorSystem.class);
+
+ ActorRef mockActor = getSystem().actorOf(Props.create(DoNothingActor.class),
+ "testFailedRegistration");
+ doReturn(mockActor).when(mockActorSystem).actorOf(any(Props.class));
+ ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(
+ MoreExecutors.sameThreadExecutor());
+
+
+ ActorContext actorContext = mock(ActorContext.class);
+ final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+
+ doReturn(executor).when(actorContext).getClientDispatcher();
+ doReturn(mockActorSystem).when(actorContext).getActorSystem();
+
+ String shardName = "shard-1";
+ final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy =
+ new DataTreeChangeListenerProxy<>(actorContext, mockListener);
+
+ doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
+ doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName));
+ doReturn(Futures.failed(new RuntimeException("mock"))).
+ when(actorContext).executeOperationAsync(any(ActorRef.class),
+ any(Object.class), any(Timeout.class));
+ doReturn(mock(DatastoreContext.class)).when(actorContext).getDatastoreContext();
+
+ proxy.init("shard-1", path);
+
+ Assert.assertEquals("getListenerRegistrationActor", null,
+ proxy.getListenerRegistrationActor());
+ }};
+ }
+
+ @Test
+ public void testCloseBeforeRegistration() {
+ new JavaTestKit(getSystem()) {{
+ ActorContext actorContext = mock(ActorContext.class);
+
+ String shardName = "shard-1";
+
+ doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext();
+ doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
+ doReturn(getSystem()).when(actorContext).getActorSystem();
+ doReturn(Dispatchers.DEFAULT_DISPATCHER_PATH).when(actorContext).getNotificationDispatcherPath();
+ doReturn(getSystem().actorSelection(getRef().path())).
+ when(actorContext).actorSelection(getRef().path());
+ doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
+ doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName));
+
+ final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy =
+ new DataTreeChangeListenerProxy<>(actorContext, mockListener);
+
+
+ Answer<Future<Object>> answer = new Answer<Future<Object>>() {
+ @Override
+ public Future<Object> answer(InvocationOnMock invocation) {
+ proxy.close();
+ return Futures.successful((Object)new RegisterDataTreeChangeListenerReply(getRef()));
+ }
+ };
+
+ doAnswer(answer).when(actorContext).executeOperationAsync(any(ActorRef.class),
+ any(Object.class), any(Timeout.class));
+
+ proxy.init(shardName, YangInstanceIdentifier.of(TestModel.TEST_QNAME));
+
+ expectMsgClass(duration("5 seconds"), CloseDataTreeChangeListenerRegistration.class);
+
+ Assert.assertEquals("getListenerRegistrationActor", null,
+ proxy.getListenerRegistrationActor());
+ }};
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistrationReply;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+
+public class DataTreeChangeListenerRegistrationActorTest extends AbstractActorTest {
+ private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
+
+ static {
+ store.onGlobalContextUpdated(TestModel.createTestContext());
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testOnReceiveCloseListenerRegistration() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ListenerRegistration mockListenerReg = Mockito.mock(ListenerRegistration.class);
+ final Props props = DataTreeChangeListenerRegistrationActor.props(mockListenerReg);
+ final ActorRef subject = getSystem().actorOf(props, "testCloseListenerRegistration");
+
+ subject.tell(CloseDataTreeChangeListenerRegistration.getInstance(), getRef());
+
+ expectMsgClass(duration("1 second"), CloseDataTreeChangeListenerRegistrationReply.class);
+
+ Mockito.verify(mockListenerReg).close();
+ }};
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import java.util.Arrays;
+import java.util.Collection;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged;
+import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+
+public class ForwardingDataTreeChangeListenerTest extends AbstractActorTest {
+
+ @Test
+ public void testOnDataChanged() throws Exception {
+ final Props props = Props.create(MessageCollectorActor.class);
+ final ActorRef actorRef = getSystem().actorOf(props);
+
+ ForwardingDataTreeChangeListener forwardingListener = new ForwardingDataTreeChangeListener(
+ getSystem().actorSelection(actorRef.path()));
+
+ Collection<DataTreeCandidate> expected = Arrays.asList(Mockito.mock(DataTreeCandidate.class));
+ forwardingListener.onDataTreeChanged(expected);
+
+ DataTreeChanged actual = MessageCollectorActor.expectFirstMatching(actorRef, DataTreeChanged.class);
+ Assert.assertSame(expected, actual.getChanges());
+ }
+}
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
+import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
}};
}
+ @Test
+ public void testRegisterDataTreeChangeListener() throws Exception {
+ new ShardTestKit(getSystem()) {{
+ TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ newShardProps(), "testRegisterDataTreeChangeListener");
+
+ waitUntilLeader(shard);
+
+ shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
+
+ MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
+ ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
+ "testRegisterDataTreeChangeListener-DataTreeChangeListener");
+
+ shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef());
+
+ RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
+ RegisterDataTreeChangeListenerReply.class);
+ String replyPath = reply.getListenerRegistrationPath().toString();
+ assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
+ "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
+
+ YangInstanceIdentifier path = TestModel.TEST_PATH;
+ writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ listener.waitForChangeEvents();
+
+ dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
+ @SuppressWarnings("serial")
+ @Test
+ public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
+ new ShardTestKit(getSystem()) {{
+ final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
+ final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
+ Creator<Shard> creator = new Creator<Shard>() {
+ boolean firstElectionTimeout = true;
+
+ @Override
+ public Shard create() throws Exception {
+ return new Shard(shardID, Collections.<String,String>emptyMap(),
+ dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
+ @Override
+ public void onReceiveCommand(final Object message) throws Exception {
+ if(message instanceof ElectionTimeout && firstElectionTimeout) {
+ firstElectionTimeout = false;
+ final ActorRef self = getSelf();
+ new Thread() {
+ @Override
+ public void run() {
+ Uninterruptibles.awaitUninterruptibly(
+ onChangeListenerRegistered, 5, TimeUnit.SECONDS);
+ self.tell(message, self);
+ }
+ }.start();
+
+ onFirstElectionTimeout.countDown();
+ } else {
+ super.onReceiveCommand(message);
+ }
+ }
+ };
+ }
+ };
+
+ MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
+ ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
+ "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
+
+ TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ Props.create(new DelegatingShardCreator(creator)),
+ "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
+
+ YangInstanceIdentifier path = TestModel.TEST_PATH;
+ writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ assertEquals("Got first ElectionTimeout", true,
+ onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
+
+ shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
+ RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterDataTreeChangeListenerReply.class);
+ assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
+
+ shard.tell(new FindLeader(), getRef());
+ FindLeaderReply findLeadeReply =
+ expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
+ assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
+
+ writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ onChangeListenerRegistered.countDown();
+
+ // TODO: investigate why we do not receive data chage events
+ listener.waitForChangeEvents();
+
+ dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
@Test
public void testCreateTransaction(){
new ShardTestKit(getSystem()) {{
if (exToThrow instanceof PrimaryNotFoundException) {
doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
} else {
- doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
+ doReturn(primaryShardInfoReply(getSystem(), actorRef)).
when(mockActorContext).findPrimaryShardAsync(anyString());
}
doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
actorSelection(actorRef.path().toString());
- doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
+ doReturn(primaryShardInfoReply(getSystem(), actorRef)).
when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
when(mockActorContext).actorSelection(shardActorRef.path().toString());
if(shardFound) {
- doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
+ doReturn(primaryShardInfoReply(actorSystem, shardActorRef)).
when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
} else {
doReturn(Futures.failed(new PrimaryNotFoundException("test")))
doReturn(getSystem().actorSelection(shardActorRef.path())).
when(mockActorContext).actorSelection(shardActorRef.path().toString());
- doReturn(Futures.successful(getSystem().actorSelection(shardActorRef.path()))).
+ doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).
when(mockActorContext).findPrimaryShardAsync(eq(shardName));
doReturn(true).when(mockActorContext).isPathLocal(shardActorRef.path().toString());
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
+ final String expPrimaryPath = "akka://test-system/find-primary-shard";
ActorContext actorContext =
new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
mock(Configuration.class), dataStoreContext) {
@Override
protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
- return Futures.successful((Object) new PrimaryFound("akka://test-system/test"));
+ return Futures.successful((Object) new PrimaryFound(expPrimaryPath));
}
};
- Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
- ActorSelection actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
+ Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
+ PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
assertNotNull(actual);
+ assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
+ assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
+ expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
- Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+ Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
- ActorSelection cachedSelection = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
+ PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
- assertEquals(cachedSelection, actual);
+ assertEquals(cachedInfo, actual);
// Wait for 200 Milliseconds. The cached entry should have been removed.
Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
- cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+ cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
assertNull(cached);
};
- Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
+ Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
try {
Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
}
- Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+ Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
assertNull(cached);
}
};
- Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
+ Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
try {
Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
}
- Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+ Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
assertNull(cached);
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+
+import javax.annotation.Nonnull;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class MockDataTreeChangeListener implements DOMDataTreeChangeListener {
+
+ private final List<Collection<DataTreeCandidate>> changeList =
+ Collections.synchronizedList(Lists.<Collection<DataTreeCandidate>>newArrayList());
+
+ private volatile CountDownLatch changeLatch;
+ private int expChangeEventCount;
+
+ public MockDataTreeChangeListener(int expChangeEventCount) {
+ reset(expChangeEventCount);
+ }
+
+ public void reset(int expChangeEventCount) {
+ changeLatch = new CountDownLatch(expChangeEventCount);
+ this.expChangeEventCount = expChangeEventCount;
+ changeList.clear();
+ }
+
+ @Override
+ public void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes) {
+ changeList.add(changes);
+ changeLatch.countDown();
+ }
+
+ public void waitForChangeEvents() {
+ boolean done = Uninterruptibles.awaitUninterruptibly(changeLatch, 5, TimeUnit.SECONDS);
+ if(!done) {
+ fail(String.format("Missing change notifications. Expected: %d. Actual: %d",
+ expChangeEventCount, (expChangeEventCount - changeLatch.getCount())));
+ }
+ }
+
+ public void expectNoMoreChanges(String assertMsg) {
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+ assertEquals(assertMsg, expChangeEventCount, changeList.size());
+ }
+}
* Create rpc implementation capable of handling RPC for monitoring and notifications even before the schemas of remote device are downloaded
*/
static NetconfDeviceRpc getRpcForInitialization(final NetconfDeviceCommunicator listener) {
- return new NetconfDeviceRpc(INIT_SCHEMA_CTX, listener, new NetconfMessageTransformer(INIT_SCHEMA_CTX));
+ return new NetconfDeviceRpc(INIT_SCHEMA_CTX, listener, new NetconfMessageTransformer(INIT_SCHEMA_CTX, false));
}
@VisibleForTesting
void handleSalInitializationSuccess(final SchemaContext result, final NetconfSessionPreferences remoteSessionCapabilities, final DOMRpcService deviceRpc) {
- messageTransformer = new NetconfMessageTransformer(result);
+ messageTransformer = new NetconfMessageTransformer(result, true);
updateTransformer(messageTransformer);
// salFacade.onDeviceConnected has to be called before the notification handler is initialized
}
private NetconfDeviceRpc getDeviceSpecificRpc(final SchemaContext result) {
- return new NetconfDeviceRpc(result, listener, new NetconfMessageTransformer(result));
+ return new NetconfDeviceRpc(result, listener, new NetconfMessageTransformer(result, true));
}
private Collection<SourceIdentifier> stripMissingSource(final Collection<SourceIdentifier> requiredSources, final SourceIdentifier sIdToRemove) {
private final Multimap<QName, NotificationDefinition> mappedNotifications;
private final DomToNormalizedNodeParserFactory parserFactory;
- public NetconfMessageTransformer(final SchemaContext schemaContext) {
+ public NetconfMessageTransformer(final SchemaContext schemaContext, final boolean strictParsing) {
this.counter = new MessageCounter();
this.schemaContext = schemaContext;
- parserFactory = DomToNormalizedNodeParserFactory.getInstance(XmlUtils.DEFAULT_XML_CODEC_PROVIDER, schemaContext);
+ parserFactory = DomToNormalizedNodeParserFactory.getInstance(XmlUtils.DEFAULT_XML_CODEC_PROVIDER, schemaContext, strictParsing);
mappedRpcs = Maps.uniqueIndex(schemaContext.getOperations(), QNAME_FUNCTION);
mappedNotifications = Multimaps.index(schemaContext.getNotifications(), QNAME_NOREV_FUNCTION);
final DataSchemaNode schemasNode = ((ContainerSchemaNode) NetconfDevice.INIT_SCHEMA_CTX.getDataChildByName("netconf-state")).getDataChildByName("schemas");
final Document schemasXml = XmlUtil.readXmlToDocument(getClass().getResourceAsStream("/netconf-state.schemas.payload.xml"));
- final ToNormalizedNodeParser<Element, ContainerNode, ContainerSchemaNode> containerNodeParser = DomToNormalizedNodeParserFactory.getInstance(XmlUtils.DEFAULT_XML_CODEC_PROVIDER, NetconfDevice.INIT_SCHEMA_CTX).getContainerNodeParser();
+ final ToNormalizedNodeParser<Element, ContainerNode, ContainerSchemaNode> containerNodeParser = DomToNormalizedNodeParserFactory.getInstance(XmlUtils.DEFAULT_XML_CODEC_PROVIDER, NetconfDevice.INIT_SCHEMA_CTX, false).getContainerNodeParser();
final ContainerNode compositeNodeSchemas = containerNodeParser.parse(Collections.singleton(schemasXml.getDocumentElement()), (ContainerSchemaNode) schemasNode);
final NetconfStateSchemas schemas = NetconfStateSchemas.create(new RemoteDeviceId("device", new InetSocketAddress(99)), compositeNodeSchemas);
public void setup() throws Exception {
final SchemaContext schemaContext = getNotificationSchemaContext(getClass());
- messageTransformer = new NetconfMessageTransformer(schemaContext);
+ messageTransformer = new NetconfMessageTransformer(schemaContext, true);
final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
factory.setNamespaceAware(true);
cfgCtx = parser.resolveSchemaContext(Sets.union(configModules, notifModules));
assertNotNull(cfgCtx);
- messageTransformer = new NetconfMessageTransformer(cfgCtx);
+ messageTransformer = new NetconfMessageTransformer(cfgCtx, true);
}
private LeafNode<Object> buildLeaf(final QName running, final Object value) {
}
private NetconfMessageTransformer getTransformer(final SchemaContext schema) {
- return new NetconfMessageTransformer(schema);
+ return new NetconfMessageTransformer(schema, true);
}
@Test
import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import javax.ws.rs.core.Response.Status;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationException;
-import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationOperation;
-import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// PUT configuration
public CheckedFuture<Void, TransactionCommitFailedException> commitConfigurationDataPut(
- final YangInstanceIdentifier path, final NormalizedNode<?, ?> payload) {
+ final SchemaContext globalSchema, final YangInstanceIdentifier path, final NormalizedNode<?, ?> payload) {
checkPreconditions();
- final DataNormalizationOperation<?> rootOp = ControllerContext.getInstance().getRootOperation();
- return putDataViaTransaction(domDataBroker.newReadWriteTransaction(), CONFIGURATION, path, payload, rootOp);
+ return putDataViaTransaction(domDataBroker.newReadWriteTransaction(), CONFIGURATION, path, payload, globalSchema);
}
public CheckedFuture<Void, TransactionCommitFailedException> commitConfigurationDataPut(
final DOMMountPoint mountPoint, final YangInstanceIdentifier path, final NormalizedNode<?, ?> payload) {
final Optional<DOMDataBroker> domDataBrokerService = mountPoint.getService(DOMDataBroker.class);
if (domDataBrokerService.isPresent()) {
- final DataNormalizationOperation<?> rootOp = new DataNormalizer(mountPoint.getSchemaContext()).getRootOperation();
return putDataViaTransaction(domDataBrokerService.get().newReadWriteTransaction(), CONFIGURATION, path,
- payload, rootOp);
+ payload, mountPoint.getSchemaContext());
}
throw new RestconfDocumentedException("DOM data broker service isn't available for mount point.");
}
// POST configuration
public CheckedFuture<Void, TransactionCommitFailedException> commitConfigurationDataPost(
- final YangInstanceIdentifier path, final NormalizedNode<?, ?> payload) {
+ final SchemaContext globalSchema, final YangInstanceIdentifier path, final NormalizedNode<?, ?> payload) {
checkPreconditions();
- final DataNormalizationOperation<?> rootOp = ControllerContext.getInstance().getRootOperation();
- return postDataViaTransaction(domDataBroker.newReadWriteTransaction(), CONFIGURATION, path, payload, rootOp);
+ return postDataViaTransaction(domDataBroker.newReadWriteTransaction(), CONFIGURATION, path, payload, globalSchema);
}
public CheckedFuture<Void, TransactionCommitFailedException> commitConfigurationDataPost(
final DOMMountPoint mountPoint, final YangInstanceIdentifier path, final NormalizedNode<?, ?> payload) {
final Optional<DOMDataBroker> domDataBrokerService = mountPoint.getService(DOMDataBroker.class);
if (domDataBrokerService.isPresent()) {
- final DataNormalizationOperation<?> rootOp = new DataNormalizer(mountPoint.getSchemaContext()).getRootOperation();
return postDataViaTransaction(domDataBrokerService.get().newReadWriteTransaction(), CONFIGURATION, path,
- payload, rootOp);
+ payload, mountPoint.getSchemaContext());
}
throw new RestconfDocumentedException("DOM data broker service isn't available for mount point.");
}
private CheckedFuture<Void, TransactionCommitFailedException> postDataViaTransaction(
final DOMDataReadWriteTransaction rWTransaction, final LogicalDatastoreType datastore,
- final YangInstanceIdentifier parentPath, final NormalizedNode<?, ?> payload, final DataNormalizationOperation<?> root) {
+ final YangInstanceIdentifier parentPath, final NormalizedNode<?, ?> payload, final SchemaContext schemaContext) {
// FIXME: This is doing correct post for container and list children
// not sure if this will work for choice case
final YangInstanceIdentifier path;
LOG.trace("It wasn't possible to get data loaded from datastore at path " + path);
}
- ensureParentsByMerge(datastore, path, rWTransaction, root);
+ ensureParentsByMerge(datastore, path, rWTransaction, schemaContext);
rWTransaction.merge(datastore, path, payload);
LOG.trace("Post " + datastore.name() + " via Restconf: {}", path);
return rWTransaction.submit();
private CheckedFuture<Void, TransactionCommitFailedException> putDataViaTransaction(
final DOMDataReadWriteTransaction writeTransaction, final LogicalDatastoreType datastore,
- final YangInstanceIdentifier path, final NormalizedNode<?, ?> payload, final DataNormalizationOperation<?> root) {
+ final YangInstanceIdentifier path, final NormalizedNode<?, ?> payload, final SchemaContext schemaContext) {
LOG.trace("Put " + datastore.name() + " via Restconf: {}", path);
- ensureParentsByMerge(datastore, path, writeTransaction, root);
+ ensureParentsByMerge(datastore, path, writeTransaction, schemaContext);
writeTransaction.put(datastore, path, payload);
return writeTransaction.submit();
}
this.domDataBroker = domDataBroker;
}
- private final void ensureParentsByMerge(final LogicalDatastoreType store,
- final YangInstanceIdentifier normalizedPath, final DOMDataReadWriteTransaction rwTx,
- final DataNormalizationOperation<?> root) {
- final List<PathArgument> currentArguments = new ArrayList<>();
- final Iterator<PathArgument> iterator = normalizedPath.getPathArguments().iterator();
- DataNormalizationOperation<?> currentOp = root;
- while (iterator.hasNext()) {
- final PathArgument currentArg = iterator.next();
- try {
- currentOp = currentOp.getChild(currentArg);
- } catch (final DataNormalizationException e) {
- rwTx.cancel();
- throw new IllegalArgumentException(
- String.format("Invalid child encountered in path %s", normalizedPath), e);
- }
- currentArguments.add(currentArg);
- final YangInstanceIdentifier currentPath = YangInstanceIdentifier.create(currentArguments);
+ private void ensureParentsByMerge(final LogicalDatastoreType store,
+ final YangInstanceIdentifier normalizedPath, final DOMDataReadWriteTransaction rwTx, final SchemaContext schemaContext) {
+ final List<PathArgument> normalizedPathWithoutChildArgs = new ArrayList<>();
+ YangInstanceIdentifier rootNormalizedPath = null;
- final Boolean exists;
+ final Iterator<PathArgument> it = normalizedPath.getPathArguments().iterator();
- try {
-
- final CheckedFuture<Boolean, ReadFailedException> future = rwTx.exists(store, currentPath);
- exists = future.checkedGet();
- } catch (final ReadFailedException e) {
- LOG.error("Failed to read pre-existing data from store {} path {}", store, currentPath, e);
- rwTx.cancel();
- throw new IllegalStateException("Failed to read pre-existing data", e);
+ while(it.hasNext()) {
+ final PathArgument pathArgument = it.next();
+ if(rootNormalizedPath == null) {
+ rootNormalizedPath = YangInstanceIdentifier.create(pathArgument);
}
- if (!exists && iterator.hasNext()) {
- rwTx.merge(store, currentPath, currentOp.createDefault(currentArg));
+ // Skip last element, its not a parent
+ if(it.hasNext()) {
+ normalizedPathWithoutChildArgs.add(pathArgument);
}
}
+
+ // No parent structure involved, no need to ensure parents
+ if(normalizedPathWithoutChildArgs.isEmpty()) {
+ return;
+ }
+
+ Preconditions.checkArgument(rootNormalizedPath != null, "Empty path received");
+
+ final NormalizedNode<?, ?> parentStructure =
+ ImmutableNodes.fromInstanceId(schemaContext, YangInstanceIdentifier.create(normalizedPathWithoutChildArgs));
+ rwTx.merge(store, rootNormalizedPath, parentStructure);
}
}
if (mountPoint != null) {
broker.commitConfigurationDataPut(mountPoint, normalizedII, payload.getData()).checkedGet();
} else {
- broker.commitConfigurationDataPut(normalizedII, payload.getData()).checkedGet();
+ broker.commitConfigurationDataPut(controllerContext.getGlobalSchema(), normalizedII, payload.getData()).checkedGet();
}
break;
if (mountPoint != null) {
broker.commitConfigurationDataPost(mountPoint, normalizedII, payload.getData()).checkedGet();
} else {
- broker.commitConfigurationDataPost(normalizedII, payload.getData()).checkedGet();
+ broker.commitConfigurationDataPost(controllerContext.getGlobalSchema(), normalizedII, payload.getData()).checkedGet();
}
} catch(final RestconfDocumentedException e) {
throw e;
restconfImpl = RestconfImpl.getInstance();
restconfImpl.setBroker(brokerFacade);
restconfImpl.setControllerContext(controllerContext);
- when(brokerFacade.commitConfigurationDataPut(any(YangInstanceIdentifier.class), any(NormalizedNode.class)))
+ when(brokerFacade.commitConfigurationDataPut(any(SchemaContext.class), any(YangInstanceIdentifier.class), any(NormalizedNode.class)))
.thenReturn(mock(CheckedFuture.class));
}
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
/**
when(wTransaction.submit()).thenReturn(expFuture);
- final Future<Void> actualFuture = brokerFacade.commitConfigurationDataPut(instanceID, dummyNode);
+ final Future<Void> actualFuture = brokerFacade.commitConfigurationDataPut((SchemaContext)null, instanceID, dummyNode);
assertSame("commitConfigurationDataPut", expFuture, actualFuture);
when(rwTransaction.submit()).thenReturn(expFuture);
final CheckedFuture<Void, TransactionCommitFailedException> actualFuture = brokerFacade.commitConfigurationDataPost(
- YangInstanceIdentifier.builder().build(), dummyNode);
+ (SchemaContext)null, YangInstanceIdentifier.builder().build(), dummyNode);
assertSame("commitConfigurationDataPost", expFuture, actualFuture);
when(rwTransaction.read(eq(LogicalDatastoreType.CONFIGURATION), any(YangInstanceIdentifier.class))).thenReturn(
dummyNodeInFuture);
try {
- brokerFacade.commitConfigurationDataPost(instanceID, dummyNode);
+ // Schema context is only necessary for ensuring parent structure
+ brokerFacade.commitConfigurationDataPost((SchemaContext)null, instanceID, dummyNode);
} catch (final RestconfDocumentedException e) {
assertEquals("getErrorTag", RestconfError.ErrorTag.DATA_EXISTS, e.getErrors().get(0).getErrorTag());
throw e;
final RpcResult<TransactionStatus> rpcResult = new DummyRpcResult.Builder<TransactionStatus>().result(
TransactionStatus.COMMITED).build();
- when(brokerFacade.commitConfigurationDataPost(any(YangInstanceIdentifier.class), any(NormalizedNode.class)))
+ when(brokerFacade.commitConfigurationDataPost((SchemaContext)null, any(YangInstanceIdentifier.class), any(NormalizedNode.class)))
.thenReturn(mock(CheckedFuture.class));
final ArgumentCaptor<YangInstanceIdentifier> instanceIdCaptor = ArgumentCaptor.forClass(YangInstanceIdentifier.class);
// FIXME : NEVER test a nr. of call some service in complex test suite
// verify(brokerFacade, times(2))
verify(brokerFacade, times(1))
- .commitConfigurationDataPost(instanceIdCaptor.capture(), compNodeCaptor.capture());
+ .commitConfigurationDataPost((SchemaContext)null, instanceIdCaptor.capture(), compNodeCaptor.capture());
// identifier = "[(urn:ietf:params:xml:ns:yang:test-interface?revision=2014-07-01)interfaces, (urn:ietf:params:xml:ns:yang:test-interface?revision=2014-07-01)block]";
assertEquals(identifier, ImmutableList.copyOf(instanceIdCaptor.getValue().getPathArguments()).toString());
}
public void createConfigurationDataNullTest() throws UnsupportedEncodingException {
initMocking();
- when(brokerFacade.commitConfigurationDataPost(any(YangInstanceIdentifier.class),any(NormalizedNode.class)))
+ when(brokerFacade.commitConfigurationDataPost(any(SchemaContext.class), any(YangInstanceIdentifier.class),any(NormalizedNode.class)))
.thenReturn(Futures.<Void, TransactionCommitFailedException>immediateCheckedFuture(null));
//FIXME : find who is set schemaContext
doThrow(OptimisticLockFailedException.class).
when(brokerFacade).commitConfigurationDataPut(
- any(YangInstanceIdentifier.class), any(NormalizedNode.class));
+ any(SchemaContext.class), any(YangInstanceIdentifier.class), any(NormalizedNode.class));
assertEquals(500, put(uri, MediaType.APPLICATION_XML, xmlData));
doThrow(OptimisticLockFailedException.class).doReturn(mock(CheckedFuture.class)).
when(brokerFacade).commitConfigurationDataPut(
- any(YangInstanceIdentifier.class), any(NormalizedNode.class));
+ any(SchemaContext.class), any(YangInstanceIdentifier.class), any(NormalizedNode.class));
assertEquals(200, put(uri, MediaType.APPLICATION_XML, xmlData));
}
doThrow(TransactionCommitFailedException.class).
when(brokerFacade).commitConfigurationDataPut(
- any(YangInstanceIdentifier.class), any(NormalizedNode.class));
+ (SchemaContext)null, any(YangInstanceIdentifier.class), any(NormalizedNode.class));
assertEquals(500, put(uri, MediaType.APPLICATION_XML, xmlData));
}
private void mockCommitConfigurationDataPutMethod(final boolean noErrors) {
if (noErrors) {
doReturn(mock(CheckedFuture.class)).when(brokerFacade).commitConfigurationDataPut(
- any(YangInstanceIdentifier.class), any(NormalizedNode.class));
+ any(SchemaContext.class), any(YangInstanceIdentifier.class), any(NormalizedNode.class));
} else {
doThrow(RestconfDocumentedException.class).when(brokerFacade).commitConfigurationDataPut(
- any(YangInstanceIdentifier.class), any(NormalizedNode.class));
+ any(SchemaContext.class), any(YangInstanceIdentifier.class), any(NormalizedNode.class));
}
}
private static final Logger LOG = LoggerFactory.getLogger(SubtreeFilter.class);
static Document applySubtreeFilter(Document requestDocument, Document rpcReply) throws NetconfDocumentedException {
- // FIXME: rpcReply document must be reread otherwise some nodes do not inherit namespaces. (services/service)
- try {
- rpcReply = XmlUtil.readXmlToDocument(XmlUtil.toString(rpcReply, true));
- } catch (SAXException | IOException e) {
- LOG.error("Cannot transform document", e);
- throw new NetconfDocumentedException("Cannot transform document");
- }
-
OperationNameAndNamespace operationNameAndNamespace = new OperationNameAndNamespace(requestDocument);
if (XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0.equals(operationNameAndNamespace.getNamespace()) &&
XmlNetconfConstants.GET.equals(operationNameAndNamespace.getOperationName()) ||
// not implement filtering.
Optional<XmlElement> maybeFilter = operationNameAndNamespace.getOperationElement().getOnlyChildElementOptionally(
XmlNetconfConstants.FILTER, XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0);
- if (maybeFilter.isPresent() && (
- "subtree".equals(maybeFilter.get().getAttribute("type"))||
- "subtree".equals(maybeFilter.get().getAttribute("type", XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0)))
- ) {
+ if (!maybeFilter.isPresent()) {
+ return rpcReply;
+ }
+ // FIXME: rpcReply document must be reread otherwise some nodes do not inherit namespaces. (services/service)
+ try {
+ rpcReply = XmlUtil.readXmlToDocument(XmlUtil.toString(rpcReply, true));
+ } catch (SAXException | IOException e) {
+ LOG.error("Cannot transform document", e);
+ throw new NetconfDocumentedException("Cannot transform document" + e);
+ }
+ XmlElement filter = maybeFilter.get();
+ if ("subtree".equals(filter.getAttribute("type"))||
+ "subtree".equals(filter.getAttribute("type", XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0))) {
// do
return filtered(maybeFilter.get(), rpcReply);
}
}
+
return rpcReply; // return identical document
}