import java.io.Closeable;
import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.config.threadpool.ThreadPool;
+import com.google.common.base.Optional;
+
/**
* Implementation of {@link ThreadPool} using flexible number of threads wraps
* {@link ExecutorService}.
public FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit,
ThreadFactory threadFactory) {
+ this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(Optional.<Integer>absent()));
+ }
+
+ public FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit,
+ ThreadFactory threadFactory, Optional<Integer> queueCapacity) {
+ this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(queueCapacity));
+ }
+
+ private FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit,
+ ThreadFactory threadFactory, BlockingQueue<Runnable> queue) {
executor = new ThreadPoolExecutor(minThreadCount, maxThreadCount, keepAlive, timeUnit,
- new SynchronousQueue<Runnable>(), threadFactory);
+ queue, threadFactory, new FlexibleRejectionHandler());
executor.prestartAllCoreThreads();
}
+ /**
+ * Overriding the queue:
+ * ThreadPoolExecutor would not create new threads if the queue is not full, thus adding
+ * occurs in RejectedExecutionHandler.
+ * This impl saturates threadpool first, then queue. When both are full caller will get blocked.
+ */
+ private static ForwardingBlockingQueue getQueue(Optional<Integer> capacity) {
+ final BlockingQueue<Runnable> delegate = capacity.isPresent() ? new LinkedBlockingQueue<Runnable>(capacity.get()) : new LinkedBlockingQueue<Runnable>();
+ return new ForwardingBlockingQueue(delegate);
+ }
+
@Override
public ExecutorService getExecutor() {
return Executors.unconfigurableExecutorService(executor);
executor.shutdown();
}
+ /**
+ * if the max threads are met, then it will raise a rejectedExecution. We then push to the queue.
+ */
+ private static class FlexibleRejectionHandler implements RejectedExecutionHandler {
+ @Override
+ public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
+ try {
+ executor.getQueue().put(r);
+ } catch (InterruptedException e) {
+ throw new RejectedExecutionException("Interrupted while waiting on the queue", e);
+ }
+ }
+ }
+
+ private static class ForwardingBlockingQueue extends com.google.common.util.concurrent.ForwardingBlockingQueue<Runnable> {
+ private final BlockingQueue<Runnable> delegate;
+
+ public ForwardingBlockingQueue(BlockingQueue<Runnable> delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ protected BlockingQueue<Runnable> delegate() {
+ return delegate;
+ }
+
+ @Override
+ public boolean offer(final Runnable r) {
+ // ThreadPoolExecutor will spawn a new thread after core size is reached only
+ // if the queue.offer returns false.
+ return false;
+ }
+ }
}
*/
package org.opendaylight.controller.config.yang.threadpool.impl.flexible;
+import com.google.common.base.Optional;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.config.api.JmxAttributeValidationException;
JmxAttributeValidationException.checkNotNull(getMaxThreadCount(), maxThreadCountJmxAttribute);
JmxAttributeValidationException.checkCondition(getMaxThreadCount() > 0, "must be greater than zero",
maxThreadCountJmxAttribute);
+
+ if(getQueueCapacity() != null) {
+ JmxAttributeValidationException.checkCondition(getQueueCapacity() > 0, "Queue capacity cannot be < 1", queueCapacityJmxAttribute);
+ }
}
@Override
public java.lang.AutoCloseable createInstance() {
return new FlexibleThreadPoolWrapper(getMinThreadCount(), getMaxThreadCount(), getKeepAliveMillis(),
- TimeUnit.MILLISECONDS, getThreadFactoryDependency());
+ TimeUnit.MILLISECONDS, getThreadFactoryDependency(), Optional.fromNullable(getQueueCapacity()));
}
}
type uint32;
}
+ leaf queueCapacity {
+ type uint16;
+ mandatory false;
+ description "Capacity of queue that holds waiting tasks";
+ }
+
container threadFactory {
uses config:service-ref {
refine type {
<data xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<modules xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
- <!-- Netconf dispatcher to be used by all netconf-connectors -->
+ <!-- Netconf dispatcher to be used by all netconf-connectors -->
<module>
<type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:netconf:client:dispatcher">prefix:netconf-client-dispatcher</type>
<name>global-netconf-dispatcher</name>
</timer>
</module>
- <!-- Thread factory to be used by all threadpools in netconf-connectors -->
+ <!-- Thread factory to be used by all threadpools in netconf-connectors -->
<module>
<type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl">prefix:threadfactory-naming</type>
<name>global-netconf-processing-executor-threadfactory</name>
<name-prefix xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl">remote-connector-processing-executor</name-prefix>
- </module>
- <!-- Flexible threadpool for all netconf connectors, Max thread count is set to 4 -->
+ </module>
+ <!-- flexible threadpool for all netconf connectors, Max thread count is set to 4. -->
<module>
<type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">prefix:threadpool-flexible</type>
<name>global-netconf-processing-executor</name>
<minThreadCount xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">1</minThreadCount>
<max-thread-count xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">4</max-thread-count>
<keepAliveMillis xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">600000</keepAliveMillis>
+
<threadFactory xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">
<type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool">prefix:threadfactory</type>
<name>global-netconf-processing-executor-threadfactory</name>
</threadFactory>
- </module>
+ </module>
</modules>
<services xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListenableFuture;
+/**
+ * A transaction that provides read access to a logical data store.
+ * <p>
+ * For more information on usage and examples, please see the documentation in {@link AsyncReadTransaction}.
+ */
public interface ReadTransaction extends AsyncReadTransaction<InstanceIdentifier<?>, DataObject> {
- @Override
- ListenableFuture<Optional<DataObject>> read(LogicalDatastoreType store, InstanceIdentifier<?> path);
+
+ /**
+ * Reads data from the provided logical data store located at the provided path.
+ *<p>
+ * If the target is a subtree, then the whole subtree is read (and will be
+ * accessible from the returned data object).
+ *
+ * @param store
+ * Logical data store from which read should occur.
+ * @param path
+ * Path which uniquely identifies subtree which client want to
+ * read
+ * @return Listenable Future which contains read result
+ * <ul>
+ * <li>If data at supplied path exists the
+ * {@link ListeblaFuture#get()} returns Optional object containing
+ * data once read is done.
+ * <li>If data at supplied path does not exists the
+ * {@link ListenbleFuture#get()} returns {@link Optional#absent()}.
+ * </ul>
+ */
+ <T extends DataObject> ListenableFuture<Optional<T>> read(LogicalDatastoreType store, InstanceIdentifier<T> path);
}
* For more information on usage and examples, please see the documentation in {@link AsyncWriteTransaction}.
*/
public interface WriteTransaction extends AsyncWriteTransaction<InstanceIdentifier<?>, DataObject> {
- @Override
- void put(LogicalDatastoreType store, InstanceIdentifier<?> path, DataObject data);
+
+ /**
+ * Stores a piece of data at the specified path. This acts as an add / replace
+ * operation, which is to say that whole subtree will be replaced by the specified data.
+ * <p>
+ * For more information on usage and examples, please see the documentation in {@link AsyncWriteTransaction}.
+ * <p>
+ * If you need to make sure that a parent object exists but you do not want modify
+ * its pre-existing state by using put, consider using {@link #merge} instead.
+ *
+ * @param store
+ * the logical data store which should be modified
+ * @param path
+ * the data object path
+ * @param data
+ * the data object to be written to the specified path
+ * @throws IllegalStateException
+ * if the transaction has already been submitted
+ */
+ <T extends DataObject> void put(LogicalDatastoreType store, InstanceIdentifier<T> path, T data);
+
+ /**
+ * Merges a piece of data with the existing data at a specified path. Any pre-existing data
+ * which is not explicitly overwritten will be preserved. This means that if you store a container,
+ * its child lists will be merged.
+ * <p>
+ * For more information on usage and examples, please see the documentation in {@link AsyncWriteTransaction}.
+ *<p>
+ * If you require an explicit replace operation, use {@link #put} instead.
+ *
+ * @param store
+ * the logical data store which should be modified
+ * @param path
+ * the data object path
+ * @param data
+ * the data object to be merged to the specified path
+ * @throws IllegalStateException
+ * if the transaction has already been submitted
+ */
+ <T extends DataObject> void merge(LogicalDatastoreType store, InstanceIdentifier<T> path, T data);
@Override
void delete(LogicalDatastoreType store, InstanceIdentifier<?> path);
return codec;
}
- protected final ListenableFuture<Optional<DataObject>> doRead(final DOMDataReadTransaction readTx,
- final LogicalDatastoreType store, final org.opendaylight.yangtools.yang.binding.InstanceIdentifier<?> path) {
+ protected final <T extends DataObject> ListenableFuture<Optional<T>> doRead(final DOMDataReadTransaction readTx,
+ final LogicalDatastoreType store, final org.opendaylight.yangtools.yang.binding.InstanceIdentifier<T> path) {
return Futures.transform(readTx.read(store, codec.toNormalized(path)), codec.deserializeFunction(path));
}
}
import java.util.Collections;
import java.util.Map.Entry;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.Identifiable;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.CheckedFuture;
/**
*
getDelegate().delete(store, normalized);
}
- protected final ListenableFuture<RpcResult<TransactionStatus>> doCommit() {
- return getDelegate().commit();
+ protected final CheckedFuture<Void,TransactionCommitFailedException> doSubmit() {
+ return getDelegate().submit();
}
protected final boolean doCancel() {
}
@Override
- public ListenableFuture<Optional<DataObject>> read(final LogicalDatastoreType store,
- final InstanceIdentifier<?> path) {
+ public <T extends DataObject> ListenableFuture<Optional<T>> read(final LogicalDatastoreType store,
+ final InstanceIdentifier<T> path) {
return doRead(getDelegate(),store, path);
}
}
@Override
- public ListenableFuture<Optional<DataObject>> read(final LogicalDatastoreType store,
- final InstanceIdentifier<?> path) {
+ public <T extends DataObject> ListenableFuture<Optional<T>> read(final LogicalDatastoreType store,
+ final InstanceIdentifier<T> path) {
return doRead(getDelegate(), store, path);
}
}
\ No newline at end of file
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.common.impl.service.AbstractDataTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcResult;
-
+import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.ListenableFuture;
class BindingDataWriteTransactionImpl<T extends DOMDataWriteTransaction> extends
super(delegateTx, codec);
}
-
-
@Override
- public void put(final LogicalDatastoreType store, final InstanceIdentifier<?> path, final DataObject data) {
+ public <T extends DataObject> void put(final LogicalDatastoreType store, final InstanceIdentifier<T> path,
+ final T data) {
doPut(store, path, data);
}
@Override
- public void merge(final LogicalDatastoreType store, final InstanceIdentifier<?> path, final DataObject data) {
+ public <T extends DataObject> void merge(final LogicalDatastoreType store, final InstanceIdentifier<T> path,
+ final T data) {
doMerge(store, path, data);
}
@Override
public ListenableFuture<RpcResult<TransactionStatus>> commit() {
- return doCommit();
+ return AbstractDataTransaction.convertToLegacyCommitFuture(submit());
+ }
+
+ @Override
+ public CheckedFuture<Void,TransactionCommitFailedException> submit() {
+ return doSubmit();
}
@Override
*/
public Optional<InstanceIdentifier<? extends DataObject>> toBinding(
final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier normalized)
- throws DeserializationException {
+ throws DeserializationException {
PathArgument lastArgument = Iterables.getLast(normalized.getPathArguments());
// Used instance-identifier codec do not support serialization of last
private Optional<InstanceIdentifier<? extends DataObject>> toBindingAugmented(
final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier normalized)
- throws DeserializationException {
+ throws DeserializationException {
Optional<InstanceIdentifier<? extends DataObject>> potential = toBindingImpl(normalized);
// Shorthand check, if codec already supports deserialization
// of AugmentationIdentifier we will return
private Optional<InstanceIdentifier<? extends DataObject>> toBindingImpl(
final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier normalized)
- throws DeserializationException {
+ throws DeserializationException {
org.opendaylight.yangtools.yang.data.api.InstanceIdentifier legacyPath;
try {
private DataNormalizationOperation<?> findNormalizationOperation(
final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier normalized)
- throws DataNormalizationException {
+ throws DataNormalizationException {
DataNormalizationOperation<?> current = legacyToNormalized.getRootOperation();
for (PathArgument arg : normalized.getPathArguments()) {
current = current.getChild(arg);
public Optional<Entry<org.opendaylight.yangtools.yang.binding.InstanceIdentifier<? extends DataObject>, DataObject>> toBinding(
final Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, ? extends NormalizedNode<?, ?>> normalized)
- throws DeserializationException {
+ throws DeserializationException {
Optional<InstanceIdentifier<? extends DataObject>> potentialPath = toBinding(normalized.getKey());
if (potentialPath.isPresent()) {
InstanceIdentifier<? extends DataObject> bindingPath = potentialPath.get();
return Optional.absent();
}
- private Optional<AugmentationSchema> findAugmentation(final Class targetType,
+ private Optional<AugmentationSchema> findAugmentation(final Class<?> targetType,
final Set<AugmentationSchema> augmentations) {
YangModuleInfo moduleInfo;
try {
if (isAugmentation(arg.getType())) {
count++;
}
+
}
return count;
}
return count;
}
- public Function<Optional<NormalizedNode<?, ?>>, Optional<DataObject>> deserializeFunction(
- final InstanceIdentifier<?> path) {
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public <T extends DataObject> Function<Optional<NormalizedNode<?, ?>>, Optional<T>> deserializeFunction(final InstanceIdentifier<T> path) {
return new DeserializeFunction(this, path);
}
- private static class DeserializeFunction implements Function<Optional<NormalizedNode<?, ?>>, Optional<DataObject>> {
+ private static class DeserializeFunction<T extends DataObject> implements Function<Optional<NormalizedNode<?, ?>>, Optional<T>> {
private final BindingToNormalizedNodeCodec codec;
private final InstanceIdentifier<?> path;
this.path = Preconditions.checkNotNull(path, "Path must not be null");
}
+ @SuppressWarnings("rawtypes")
@Nullable
@Override
- public Optional<DataObject> apply(@Nullable final Optional<NormalizedNode<?, ?>> normalizedNode) {
+ public Optional apply(@Nullable final Optional<NormalizedNode<?, ?>> normalizedNode) {
if (normalizedNode.isPresent()) {
final DataObject dataObject;
try {
/**
* Returns an default object according to YANG schema for supplied path.
*
- * @param path
- * DOM Path
+ * @param path DOM Path
* @return Node with defaults set on.
*/
public NormalizedNode<?, ?> getDefaultNodeFor(final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier path) {
import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
import org.opendaylight.controller.md.sal.common.api.data.DataReader;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.impl.service.AbstractDataTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
@Override
public ListenableFuture<RpcResult<TransactionStatus>> apply(final Boolean requestCommitSuccess) throws Exception {
if(requestCommitSuccess) {
- return tx.getDelegate().commit();
+ return AbstractDataTransaction.convertToLegacyCommitFuture(tx.getDelegate().submit());
}
return Futures.immediateFuture(RpcResultBuilder.<TransactionStatus>failed().withResult(TransactionStatus.FAILED).build());
}
import static com.google.common.base.Preconditions.checkState;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
import java.util.EventListener;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class RpcProviderRegistryImpl implements //
- RpcProviderRegistry, //
- RouteChangePublisher<RpcContextIdentifier, InstanceIdentifier<?>> {
+public class RpcProviderRegistryImpl implements RpcProviderRegistry, RouteChangePublisher<RpcContextIdentifier, InstanceIdentifier<?>> {
private RuntimeCodeGenerator rpcFactory = SingletonHolder.RPC_GENERATOR_IMPL;
- // publicProxies is a cache of proxy objects where each value in the map corresponds to a specific RpcService
- private final Map<Class<? extends RpcService>, RpcService> publicProxies = new WeakHashMap<>();
+ // cache of proxy objects where each value in the map corresponds to a specific RpcService
+ private final LoadingCache<Class<? extends RpcService>, RpcService> publicProxies = CacheBuilder.newBuilder().weakKeys().
+ build(new CacheLoader<Class<? extends RpcService>, RpcService>() {
+ @Override
+ public RpcService load(final Class<? extends RpcService> type) {
+ final RpcService proxy = rpcFactory.getDirectProxyFor(type);
+ LOG.debug("Created {} as public proxy for {} in {}", proxy, type.getSimpleName(), this);
+ return proxy;
+ }
+ });
+
private final Map<Class<? extends RpcService>, RpcRouter<?>> rpcRouters = new WeakHashMap<>();
private final ListenerRegistry<RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>> routeChangeListeners = ListenerRegistry
.create();
@SuppressWarnings("unchecked")
@Override
public final <T extends RpcService> T getRpcService(final Class<T> type) {
-
- T potentialProxy = (T) publicProxies.get(type);
- if (potentialProxy != null) {
- return potentialProxy;
- }
- synchronized (this) {
- /**
- * Potential proxy could be instantiated by other thread while we
- * were waiting for the lock.
- */
-
- potentialProxy = (T) publicProxies.get(type);
- if (potentialProxy != null) {
- return potentialProxy;
- }
- T proxy = rpcFactory.getDirectProxyFor(type);
- LOG.debug("Created {} as public proxy for {} in {}", proxy, type.getSimpleName(), this);
- publicProxies.put(type, proxy);
- return proxy;
- }
+ return (T) publicProxies.getUnchecked(type);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
}
- private class RouteChangeForwarder<T extends RpcService> implements
- RouteChangeListener<Class<? extends BaseIdentity>, InstanceIdentifier<?>> {
+ private class RouteChangeForwarder<T extends RpcService> implements RouteChangeListener<Class<? extends BaseIdentity>, InstanceIdentifier<?>> {
private final Class<T> type;
}
}
- public static class RpcProxyRegistration<T extends RpcService> extends AbstractObjectRegistration<T> implements
- RpcRegistration<T> {
+ public static class RpcProxyRegistration<T extends RpcService> extends AbstractObjectRegistration<T> implements RpcRegistration<T> {
private final Class<T> serviceType;
private RpcProviderRegistryImpl registry;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
+
import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
import org.opendaylight.controller.md.sal.common.api.data.DataModification;
import org.opendaylight.controller.sal.common.util.CommitHandlerTransactions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * @deprecated This is part of the legacy DataBrokerService
+ */
+@Deprecated
class BindingToDomCommitHandler implements
DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> {
this.biDataService = biDataService;
}
- public void setMappingService(BindingIndependentMappingService mappingService) {
+ public void setMappingService(final BindingIndependentMappingService mappingService) {
this.mappingService = mappingService;
}
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
+
import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * @deprecated This is part of the legacy DataBrokerService
+ */
+@Deprecated
class DomToBindingCommitHandler implements //
RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>>, //
DataCommitHandler<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> {
private DataProviderService baDataService;
private BindingIndependentMappingService mappingService;
- public void setBindingAwareDataService(DataProviderService baDataService) {
+ public void setBindingAwareDataService(final DataProviderService baDataService) {
this.baDataService = baDataService;
}
- public void setMappingService(BindingIndependentMappingService mappingService) {
+ public void setMappingService(final BindingIndependentMappingService mappingService) {
this.mappingService = mappingService;
}
"foo").build()).build();
initialTx.put(LogicalDatastoreType.OPERATIONAL, path(TOP_FOO_KEY),
topLevelList(TOP_FOO_KEY, fooAugment));
- assertCommit(initialTx.commit());
+ assertCommit(initialTx.submit());
}
private void delete(final InstanceIdentifier<?> path) {
WriteTransaction tx = getDataBroker().newWriteOnlyTransaction();
tx.delete(LogicalDatastoreType.OPERATIONAL, path);
- assertCommit(tx.commit());
+ assertCommit(tx.submit());
}
private void verifyRemoved(
protected void setupWithDataBroker(final DataBroker dataBroker) {
WriteTransaction initialTx = dataBroker.newWriteOnlyTransaction();
initialTx.put(CONFIGURATION, TOP, top(topLevelList(TOP_FOO_KEY)));
- assertCommit(initialTx.commit());
+ assertCommit(initialTx.submit());
}
@Test
ReadWriteTransaction writeTx = getDataBroker().newReadWriteTransaction();
writeTx.put(CONFIGURATION, TOP, top(topLevelList(TOP_BAR_KEY)));
- assertCommit(writeTx.commit());
+ assertCommit(writeTx.submit());
AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> top = topListener.event();
AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> all = allListener.event();
AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> foo = fooListener.event();
ReadWriteTransaction writeTx = getDataBroker().newReadWriteTransaction();
writeTx.merge(CONFIGURATION, TOP, top(topLevelList(TOP_BAR_KEY)));
- assertCommit(writeTx.commit());
+ assertCommit(writeTx.submit());
verifyBarOnlyAdded(topListener,allListener,fooListener,barListener);
}
ReadWriteTransaction writeTx = getDataBroker().newReadWriteTransaction();
writeTx.put(CONFIGURATION, TOP_BAR, topLevelList(TOP_BAR_KEY));
- assertCommit(writeTx.commit());
+ assertCommit(writeTx.submit());
verifyBarOnlyAdded(topListener,allListener,fooListener,barListener);
}
ReadWriteTransaction writeTx = getDataBroker().newReadWriteTransaction();
writeTx.merge(CONFIGURATION, TOP_BAR, topLevelList(TOP_BAR_KEY));
- assertCommit(writeTx.commit());
+ assertCommit(writeTx.submit());
verifyBarOnlyAdded(topListener,allListener,fooListener,barListener);
}
*/
package org.opendaylight.controller.md.sal.binding.impl.test;
-import static org.junit.Assert.assertEquals;
-
import java.util.concurrent.ExecutionException;
import org.junit.Test;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.binding.test.AbstractDataBrokerTest;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.Top;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.TopBuilder;
WriteTransaction writeTx = getDataBroker().newWriteOnlyTransaction();
writeTx.put(LogicalDatastoreType.OPERATIONAL, TOP_PATH, new TopBuilder().build());
writeTx.put(LogicalDatastoreType.OPERATIONAL, NODE_PATH, new TopLevelListBuilder().setKey(TOP_LIST_KEY).build());
- assertEquals(TransactionStatus.COMMITED, writeTx.commit().get().getResult());
+ writeTx.submit().get();
}
}
*/
package org.opendaylight.controller.md.sal.binding.test;
-import static org.junit.Assert.assertEquals;
-
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
-import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import com.google.common.util.concurrent.ListenableFuture;
return domBroker;
}
- protected static final void assertCommit(final ListenableFuture<RpcResult<TransactionStatus>> commit) {
+ protected static final void assertCommit(final ListenableFuture<Void> commit) {
try {
- assertEquals(TransactionStatus.COMMITED,commit.get(500, TimeUnit.MILLISECONDS).getResult());
+ commit.get(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new IllegalStateException(e);
}
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+/**
+ *
+ *
+ * @deprecated Use
+ * {@link org.opendaylight.controller.md.sal.binding.api.ReadTransaction#read(org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType, InstanceIdentifier)}
+ * instead.
+ */
+@Deprecated
public final class TypeSafeDataReader {
private final DataReader<InstanceIdentifier<? extends DataObject>, DataObject> delegate;
return delegate;
}
- public TypeSafeDataReader(DataReader<InstanceIdentifier<? extends DataObject>, DataObject> delegate) {
+ public TypeSafeDataReader(
+ final DataReader<InstanceIdentifier<? extends DataObject>, DataObject> delegate) {
this.delegate = delegate;
}
@SuppressWarnings("unchecked")
- public <D extends DataObject> D readConfigurationData(InstanceIdentifier<D> path) {
+ public <D extends DataObject> D readConfigurationData(
+ final InstanceIdentifier<D> path) {
return (D) delegate.readConfigurationData(path);
}
@SuppressWarnings("unchecked")
- public <D extends DataObject> D readOperationalData(InstanceIdentifier<D> path) {
+ public <D extends DataObject> D readOperationalData(
+ final InstanceIdentifier<D> path) {
return (D) delegate.readOperationalData(path);
}
- public static TypeSafeDataReader forReader(DataReader<InstanceIdentifier<? extends DataObject>, DataObject> delegate) {
+ public static TypeSafeDataReader forReader(
+ final DataReader<InstanceIdentifier<? extends DataObject>, DataObject> delegate) {
return new TypeSafeDataReader(delegate);
}
}
import org.opendaylight.yangtools.concepts.Path;
/**
- * Read-only transaction, which provides stable view of data
- * and is {@link AutoCloseable} resource.
+ * Marker interface for a read-only view of the data tree.
*
* @see AsyncReadTransaction
*
-* @param <P>
+ * @param <P>
* Type of path (subtree identifier), which represents location in
* tree
* @param <D>
import org.opendaylight.yangtools.concepts.Path;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ListenableFuture;
-
/**
*
- * Provides a stateful read view of the data tree.
+ * Marker interface for stateful read view of the data tree.
*
* <p>
* View of the data tree is a stable point-in-time snapshot of the current data tree state when
* <p>
* <b>Note:</b> example contains blocking calls on future only to illustrate
* that action happened after other asynchronous action. Use of blocking call
- * {@link ListenableFuture#get()} is discouraged for most uses and you should
- * use
- * {@link com.google.common.util.concurrent.Futures#addCallback(ListenableFuture, com.google.common.util.concurrent.FutureCallback)}
+ * {@link com.google.common.util.concurrent.ListenableFuture#get()} is discouraged for most
+ * uses and you should use
+ * {@link com.google.common.util.concurrent.Futures#addCallback(com.google.common.util.concurrent.ListenableFuture, com.google.common.util.concurrent.FutureCallback)}
* or other functions from {@link com.google.common.util.concurrent.Futures} to
* register more specific listeners.
*
* tree
* @param <D>
* Type of data (payload), which represents data payload
+ *
+ * @see org.opendaylight.controller.md.sal.binding.api.ReadTransaction
+ * @see org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction
*/
public interface AsyncReadTransaction<P extends Path<P>, D> extends AsyncTransaction<P, D> {
- /**
- *
- * Reads data from provided logical data store located at the provided path.
- *<p>
- * If the target is a subtree, then the whole subtree is read (and will be
- * accessible from the returned data object).
- *
- * @param store
- * Logical data store from which read should occur.
- * @param path
- * Path which uniquely identifies subtree which client want to
- * read
- * @return Listenable Future which contains read result
- * <ul>
- * <li>If data at supplied path exists the
- * {@link ListeblaFuture#get()} returns Optional object containing
- * data once read is done.
- * <li>If data at supplied path does not exists the
- * {@link ListenbleFuture#get()} returns {@link Optional#absent()}.
- * </ul>
- */
- ListenableFuture<Optional<D>> read(LogicalDatastoreType store, P path);
-
}
import org.opendaylight.yangtools.concepts.Path;
import org.opendaylight.yangtools.yang.common.RpcResult;
+import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.ListenableFuture;
/**
* change for the data tree and it is not visible to any other concurrently running
* transaction.
* <p>
- * Applications publish the changes proposed in the transaction by calling {@link #commit}
- * on the transaction. This seals the transaction
+ * Applications make changes to the local data tree in the transaction by via the
+ * <b>put</b>, <b>merge</b>, and <b>delete</b> operations.
+ *
+ * <h2>Put operation</h2>
+ * Stores a piece of data at a specified path. This acts as an add / replace
+ * operation, which is to say that whole subtree will be replaced by the
+ * specified data.
+ * <p>
+ * Performing the following put operations:
+ *
+ * <pre>
+ * 1) container { list [ a ] }
+ * 2) container { list [ b ] }
+ * </pre>
+ *
+ * will result in the following data being present:
+ *
+ * <pre>
+ * container { list [ b ] }
+ * </pre>
+ * <h2>Merge operation</h2>
+ * Merges a piece of data with the existing data at a specified path. Any pre-existing data
+ * which is not explicitly overwritten will be preserved. This means that if you store a container,
+ * its child lists will be merged.
+ * <p>
+ * Performing the following merge operations:
+ *
+ * <pre>
+ * 1) container { list [ a ] }
+ * 2) container { list [ b ] }
+ * </pre>
+ *
+ * will result in the following data being present:
+ *
+ * <pre>
+ * container { list [ a, b ] }
+ * </pre>
+ *
+ * This also means that storing the container will preserve any
+ * augmentations which have been attached to it.
+ *
+ * <h2>Delete operation</h2>
+ * Removes a piece of data from a specified path.
+ * <p>
+ * After applying changes to the local data tree, applications publish the changes proposed in the
+ * transaction by calling {@link #submit} on the transaction. This seals the transaction
* (preventing any further writes using this transaction) and submits it to be
* processed and applied to global conceptual data tree.
* <p>
* The transaction commit may fail due to a concurrent transaction modifying and committing data in
- * an incompatible way. See {@link #commit()} for more concrete commit failure examples.
- *
- *
+ * an incompatible way. See {@link #submit} for more concrete commit failure examples.
* <p>
* <b>Implementation Note:</b> This interface is not intended to be implemented
* by users of MD-SAL, but only to be consumed by them.
* {@link TransactionStatus#CANCELED} will have no effect, and transaction
* is considered cancelled.
*
- * Invoking cancel() on finished transaction (future returned by {@link #commit()}
+ * Invoking cancel() on finished transaction (future returned by {@link #submit()}
* already completed with {@link TransactionStatus#COMMITED}) will always
* fail (return false).
*
* <tt>true</tt> otherwise
*
*/
- public boolean cancel();
-
- /**
- * Store a piece of data at specified path. This acts as an add / replace
- * operation, which is to say that whole subtree will be replaced by
- * specified path. Performing the following put operations:
- *
- * <pre>
- * 1) container { list [ a ] }
- * 2) container { list [ b ] }
- * </pre>
- *
- * will result in the following data being present:
- *
- * <pre>
- * container { list [ b ] }
- * </pre>
- *
- *
- * If you need to make sure that a parent object exists, but you do not want modify
- * its preexisting state by using put, consider using
- * {@link #merge(LogicalDatastoreType, Path, Object)}
- *
- * @param store
- * Logical data store which should be modified
- * @param path
- * Data object path
- * @param data
- * Data object to be written to specified path
- * @throws IllegalStateException
- * if the transaction is no longer {@link TransactionStatus#NEW}
- */
- public void put(LogicalDatastoreType store, P path, D data);
-
- /**
- * Store a piece of data at the specified path. This acts as a merge operation,
- * which is to say that any pre-existing data which is not explicitly
- * overwritten will be preserved. This means that if you store a container,
- * its child lists will be merged. Performing the following merge
- * operations:
- *
- * <pre>
- * 1) container { list [ a ] }
- * 2) container { list [ b ] }
- * </pre>
- *
- * will result in the following data being present:
- *
- * <pre>
- * container { list [ a, b ] }
- * </pre>
- *
- * This also means that storing the container will preserve any
- * augmentations which have been attached to it.
- *<p>
- * If you require an explicit replace operation, use
- * {@link #put(LogicalDatastoreType, Path, Object)} instead.
- *
- * @param store
- * Logical data store which should be modified
- * @param path
- * Data object path
- * @param data
- * Data object to be written to specified path
- * @throws IllegalStateException
- * if the transaction is no longer {@link TransactionStatus#NEW}
- */
- public void merge(LogicalDatastoreType store, P path, D data);
+ boolean cancel();
/**
- * Remove a piece of data from specified path. This operation does not fail
+ * Removes a piece of data from specified path. This operation does not fail
* if the specified path does not exist.
*
* @param store
* @throws IllegalStateException
* if the transaction is no longer {@link TransactionStatus#NEW}
*/
- public void delete(LogicalDatastoreType store, P path);
+ void delete(LogicalDatastoreType store, P path);
/**
- * Submits transaction to be applied to update logical data tree.
+ * Submits this transaction to be asynchronously applied to update the logical data tree.
+ * The returned CheckedFuture conveys the result of applying the data changes.
+ * <p>
+ * <b>Note:</b> It is strongly recommended to process the CheckedFuture result in an asynchronous
+ * manner rather than using the blocking get() method. See example usage below.
* <p>
* This call logically seals the transaction, which prevents the client from
* further changing data tree using this transaction. Any subsequent calls to
* {@link IllegalStateException}.
*
* The transaction is marked as {@link TransactionStatus#SUBMITED} and
- * enqueued into the data store backed for processing.
+ * enqueued into the data store back-end for processing.
*
* <p>
* Whether or not the commit is successful is determined by versioning
- * of data tree and validation of registered commit participants
- * {@link AsyncConfigurationCommitHandler}
- * if transaction changes {@link LogicalDatastoreType#CONFIGURATION} data tree.
- *<p>
- * The effects of successful commit of data depends on
- * other data change listeners {@link AsyncDataChangeListener} and
- * {@link AsyncConfigurationCommitHandler}, which was registered to the
- * same {@link AsyncDataBroker}, to which this transaction belongs.
- *
+ * of the data tree and validation of registered commit participants
+ * ({@link AsyncConfigurationCommitHandler})
+ * if the transaction changes the data tree.
+ * <p>
+ * The effects of a successful commit of data depends on data change listeners
+ * ({@link AsyncDataChangeListener}) and commit participants
+ * ({@link AsyncConfigurationCommitHandler}) that are registered with the data broker.
+ * <p>
+ * <h3>Example usage:</h3>
+ * <pre>
+ * private void doWrite( final int tries ) {
+ * WriteTransaction writeTx = dataBroker.newWriteOnlyTransaction();
+ *
+ * MyDataObject data = ...;
+ * InstanceIdentifier<MyDataObject> path = ...;
+ * writeTx.put( LogicalDatastoreType.OPERATIONAL, path, data );
+ *
+ * Futures.addCallback( writeTx.submit(), new FutureCallback<Void>() {
+ * public void onSuccess( Void result ) {
+ * // succeeded
+ * }
+ *
+ * public void onFailure( Throwable t ) {
+ * if( t instanceof OptimisticLockFailedException ) {
+ * if( ( tries - 1 ) > 0 ) {
+ * // do retry
+ * doWrite( tries - 1 );
+ * } else {
+ * // out of retries
+ * }
+ * } else {
+ * // failed due to another type of TransactionCommitFailedException.
+ * }
+ * } );
+ * }
+ * ...
+ * doWrite( 2 );
+ * </pre>
* <h2>Failure scenarios</h2>
* <p>
* Transaction may fail because of multiple reasons, such as
* <ul>
- * <li>Another transaction finished earlier and modified the same node in
- * non-compatible way (see below). In this case the returned future will fail with
+ * <li>Another transaction finished earlier and modified the same node in a
+ * non-compatible way (see below). In this case the returned future will fail with an
* {@link OptimisticLockFailedException}. It is the responsibility of the
* caller to create a new transaction and submit the same modification again in
- * order to update data tree.</li>
+ * order to update data tree. <i><b>Warning</b>: In most cases, retrying after an
+ * OptimisticLockFailedException will result in a high probability of success.
+ * However, there are scenarios, albeit unusual, where any number of retries will
+ * not succeed. Therefore it is strongly recommended to limit the number of retries (2 or 3)
+ * to avoid an endless loop.</i>
+ * </li>
* <li>Data change introduced by this transaction did not pass validation by
* commit handlers or data was incorrectly structured. Returned future will
- * fail with {@link DataValidationFailedException}. User should not retry to
+ * fail with a {@link DataValidationFailedException}. User should not retry to
* create new transaction with same data, since it probably will fail again.
* </li>
* </ul>
* txA.put(CONFIGURATION, PATH, A); // writes to PATH value A
* txB.put(CONFIGURATION, PATH, B) // writes to PATH value B
*
- * ListenableFuture futureA = txA.commit(); // transaction A is sealed and committed
- * ListenebleFuture futureB = txB.commit(); // transaction B is sealed and committed
+ * ListenableFuture futureA = txA.submit(); // transaction A is sealed and submitted
+ * ListenebleFuture futureB = txB.submit(); // transaction B is sealed and submitted
* </pre>
*
* Commit of transaction A will be processed asynchronously and data tree
* with {@link OptimisticLockFailedException} exception, which indicates to
* client that concurrent transaction prevented the submitted transaction from being
* applied.
- *
- * @return Result of the Commit, containing success information or list of
- * encountered errors, if commit was not successful. The Future
- * blocks until {@link TransactionStatus#COMMITED} is reached.
- * Future will fail with {@link TransactionCommitFailedException} if
- * Commit of this transaction failed. TODO: Usability: Consider
- * change from ListenableFuture to
- * {@link com.google.common.util.concurrent.CheckedFuture} which
- * will throw {@link TransactionCommitFailedException}.
+ * <br>
+ * @return a CheckFuture containing the result of the commit. The Future blocks until the
+ * commit operation is complete. A successful commit returns nothing. On failure,
+ * the Future will fail with a {@link TransactionCommitFailedException} or an exception
+ * derived from TransactionCommitFailedException.
*
* @throws IllegalStateException
* if the transaction is not {@link TransactionStatus#NEW}
*/
- public ListenableFuture<RpcResult<TransactionStatus>> commit();
+ CheckedFuture<Void,TransactionCommitFailedException> submit();
+
+ /**
+ * @deprecated Use {@link #submit()} instead.
+ */
+ @Deprecated
+ ListenableFuture<RpcResult<TransactionStatus>> commit();
}
package org.opendaylight.controller.md.sal.common.api.data;
import org.opendaylight.yangtools.concepts.Path;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import com.google.common.base.Preconditions;
private Class<? extends Path<?>> pathType;
- public <P extends Path<P>> DataValidationFailedException(final Class<P> pathType,final P path, final String message, final Throwable cause) {
- super(message, cause);
+ public <P extends Path<P>> DataValidationFailedException(final Class<P> pathType,final P path,
+ final String message, final Throwable cause) {
+ super(message, cause, RpcResultBuilder.newError(ErrorType.APPLICATION, "invalid-value", message, null,
+ path != null ? path.toString() : null, cause));
this.pathType = Preconditions.checkNotNull(pathType, "path type must not be null");
this.path = Preconditions.checkNotNull(path,"path must not be null.");
}
- public <P extends Path<P>> DataValidationFailedException(final Class<P> pathType,final P path,final String message) {
- this(pathType,path,message,null);
+ public <P extends Path<P>> DataValidationFailedException(final Class<P> pathType,final P path,
+ final String message) {
+ this(pathType, path, message, null);
}
public final Path<?> getPath() {
package org.opendaylight.controller.md.sal.common.api.data;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+
/**
*
* Failure of asynchronous transaction commit caused by failure
private static final long serialVersionUID = 1L;
- protected OptimisticLockFailedException(final String message, final Throwable cause, final boolean enableSuppression,
- final boolean writableStackTrace) {
- super(message, cause, enableSuppression, writableStackTrace);
- }
-
public OptimisticLockFailedException(final String message, final Throwable cause) {
- super(message, cause);
+ super(message, cause, RpcResultBuilder.newError(ErrorType.APPLICATION, "resource-denied",
+ message, null, null, cause));
}
public OptimisticLockFailedException(final String message) {
- super(message);
+ this(message, null);
}
}
*/
package org.opendaylight.controller.md.sal.common.api.data;
+import java.util.Arrays;
+import java.util.List;
+
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+
+import com.google.common.collect.ImmutableList;
+
/**
*
* Failed commit of asynchronous transaction
*/
public class TransactionCommitFailedException extends Exception {
- private static final long serialVersionUID = -6138306275373237068L;
+ private static final long serialVersionUID = 1L;
- protected TransactionCommitFailedException(final String message, final Throwable cause, final boolean enableSuppression, final boolean writableStackTrace) {
- super(message, cause, enableSuppression, writableStackTrace);
+ private final List<RpcError> errorList;
+
+ public TransactionCommitFailedException(final String message, final RpcError... errors) {
+ this(message, null, errors);
}
- public TransactionCommitFailedException(final String message, final Throwable cause) {
+ public TransactionCommitFailedException(final String message, final Throwable cause,
+ final RpcError... errors) {
super(message, cause);
+
+ if( errors != null && errors.length > 0 ) {
+ errorList = ImmutableList.<RpcError>builder().addAll( Arrays.asList( errors ) ).build();
+ }
+ else {
+ // Add a default RpcError.
+ errorList = ImmutableList.of(RpcResultBuilder.newError(ErrorType.APPLICATION, null,
+ getMessage(), null, null, getCause()));
+ }
}
- public TransactionCommitFailedException(final String message) {
- super(message);
+ /**
+ * Returns additional error information about this exception.
+ *
+ * @return a List of RpcErrors. There is always at least one RpcError.
+ */
+ public List<RpcError> getErrorList() {
+ return errorList;
}
+ @Override
+ public String getMessage() {
+ return new StringBuilder( super.getMessage() ).append(", errors: ").append( errorList ).toString();
+ }
}
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification;
import org.opendaylight.yangtools.concepts.Path;
import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
public abstract class AbstractDataTransaction<P extends Path<P>, D extends Object> extends
AbstractDataModification<P, D> {
@Override
public boolean equals(Object obj) {
- if (this == obj)
+ if (this == obj) {
return true;
- if (obj == null)
+ }
+ if (obj == null) {
return false;
- if (getClass() != obj.getClass())
+ }
+ if (getClass() != obj.getClass()) {
return false;
+ }
AbstractDataTransaction<?, ?> other = (AbstractDataTransaction<?, ?>) obj;
if (identifier == null) {
- if (other.identifier != null)
+ if (other.identifier != null) {
return false;
- } else if (!identifier.equals(other.identifier))
+ }
+ } else if (!identifier.equals(other.identifier)) {
return false;
+ }
return true;
}
this.status = status;
this.onStatusChange(status);
}
+
+ public static ListenableFuture<RpcResult<TransactionStatus>> convertToLegacyCommitFuture(
+ CheckedFuture<Void,TransactionCommitFailedException> from ) {
+ return Futures.transform(from, new AsyncFunction<Void, RpcResult<TransactionStatus>>() {
+ @Override
+ public ListenableFuture<RpcResult<TransactionStatus>> apply(Void input) throws Exception {
+ return Futures.immediateFuture(RpcResultBuilder.<TransactionStatus>
+ success(TransactionStatus.COMMITED).build());
+ }
+ } );
+ }
}
package org.opendaylight.controller.md.sal.dom.api;
import org.opendaylight.controller.md.sal.common.api.data.AsyncReadTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * A transaction that provides read access to a logical data store.
+ * <p>
+ * For more information on usage and examples, please see the documentation in {@link AsyncReadTransaction}.
+ */
public interface DOMDataReadTransaction extends AsyncReadTransaction<InstanceIdentifier, NormalizedNode<?, ?>> {
+ /**
+ * Reads data from provided logical data store located at the provided path.
+ *<p>
+ * If the target is a subtree, then the whole subtree is read (and will be
+ * accessible from the returned data object).
+ *
+ * @param store
+ * Logical data store from which read should occur.
+ * @param path
+ * Path which uniquely identifies subtree which client want to
+ * read
+ * @return Listenable Future which contains read result
+ * <ul>
+ * <li>If data at supplied path exists the
+ * {@link ListeblaFuture#get()} returns Optional object containing
+ * data once read is done.
+ * <li>If data at supplied path does not exists the
+ * {@link ListenbleFuture#get()} returns {@link Optional#absent()}.
+ * </ul>
+ */
+ ListenableFuture<Optional<NormalizedNode<?,?>>> read(LogicalDatastoreType store,InstanceIdentifier path);
}
package org.opendaylight.controller.md.sal.dom.api;
import org.opendaylight.controller.md.sal.common.api.data.AsyncWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+/**
+ * A transaction that provides mutation capabilities on a data tree.
+ * <p>
+ * For more information on usage and examples, please see the documentation in {@link AsyncWriteTransaction}.
+ */
public interface DOMDataWriteTransaction extends AsyncWriteTransaction<InstanceIdentifier, NormalizedNode<?, ?>> {
+ /**
+ * Stores a piece of data at the specified path. This acts as an add / replace
+ * operation, which is to say that whole subtree will be replaced by the specified data.
+ * <p>
+ * For more information on usage and examples, please see the documentation in {@link AsyncWriteTransaction}.
+ * <p>
+ * If you need to make sure that a parent object exists but you do not want modify
+ * its pre-existing state by using put, consider using {@link #merge} instead.
+ *
+ * @param store
+ * the logical data store which should be modified
+ * @param path
+ * the data object path
+ * @param data
+ * the data object to be written to the specified path
+ * @throws IllegalStateException
+ * if the transaction has already been submitted
+ */
+ void put(LogicalDatastoreType store, InstanceIdentifier path, NormalizedNode<?, ?> data);
+
+ /**
+ * Merges a piece of data with the existing data at a specified path. Any pre-existing data
+ * which is not explicitly overwritten will be preserved. This means that if you store a container,
+ * its child lists will be merged.
+ * <p>
+ * For more information on usage and examples, please see the documentation in {@link AsyncWriteTransaction}.
+ *<p>
+ * If you require an explicit replace operation, use {@link #put} instead.
+ *
+ * @param store
+ * the logical data store which should be modified
+ * @param path
+ * the data object path
+ * @param data
+ * the data object to be merged to the specified path
+ * @throws IllegalStateException
+ * if the transaction has already been submitted
+ */
+ void merge(LogicalDatastoreType store, InstanceIdentifier path, NormalizedNode<?, ?> data);
}
* <li> {@link DOMDataWriteTransaction#commit()} - results in invoking
* {@link DOMStoreWriteTransaction#ready()}, gathering all resulting cohorts
* and then invoking finalized implementation callback
- * {@link #commit(DOMDataWriteTransaction, Iterable)} with transaction which
+ * {@link #submit(DOMDataWriteTransaction, Iterable)} with transaction which
* was commited and gathered results.
* </ul>
*
* <li> {@link DOMDataWriteTransaction#commit()} - results in invoking
* {@link DOMStoreWriteTransaction#ready()}, gathering all resulting cohorts
* and then invoking finalized implementation callback
- * {@link #commit(DOMDataWriteTransaction, Iterable)} with transaction which
+ * {@link #submit(DOMDataWriteTransaction, Iterable)} with transaction which
* was commited and gathered results.
* <li>
* </ul>
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicLong;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
public class DOMDataBrokerImpl extends AbstractDOMForwardedTransactionFactory<DOMStore> implements DOMDataBroker,
}
@Override
- public ListenableFuture<RpcResult<TransactionStatus>> commit(final DOMDataWriteTransaction transaction,
+ public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
final Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
LOG.debug("Transaction: {} submitted with cohorts {}.", transaction.getIdentifier(), cohorts);
return coordinator.submit(transaction, cohorts, Optional.<DOMDataCommitErrorListener> absent());
import javax.annotation.concurrent.GuardedBy;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
-import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.CheckedFuture;
/**
* NormalizedNode implementation of {@link org.opendaylight.controller.md.sal.common.api.data.TransactionChain} which is backed
}
@Override
- public synchronized ListenableFuture<RpcResult<TransactionStatus>> commit(
+ public synchronized CheckedFuture<Void,TransactionCommitFailedException> submit(
final DOMDataWriteTransaction transaction, final Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
return coordinator.submit(transaction, cohorts, Optional.<DOMDataCommitErrorListener> of(this));
}
import javax.annotation.concurrent.GuardedBy;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
}
@Override
- public ListenableFuture<RpcResult<TransactionStatus>> submit(final DOMDataWriteTransaction transaction,
+ public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final Optional<DOMDataCommitErrorListener> listener) {
Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
Preconditions.checkArgument(listener != null, "Listener must not be null");
LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
- ListenableFuture<RpcResult<TransactionStatus>> commitFuture = executor.submit(new CommitCoordinationTask(
+ ListenableFuture<Void> commitFuture = executor.submit(new CommitCoordinationTask(
transaction, cohorts, listener));
if (listener.isPresent()) {
Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
}
- return commitFuture;
+
+ return Futures.makeChecked(commitFuture, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
}
/**
* support of cancelation.
*
*/
- private static class CommitCoordinationTask implements Callable<RpcResult<TransactionStatus>> {
+ private static class CommitCoordinationTask implements Callable<Void> {
private final DOMDataWriteTransaction tx;
private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
}
@Override
- public RpcResult<TransactionStatus> call() throws TransactionCommitFailedException {
+ public Void call() throws TransactionCommitFailedException {
try {
canCommitBlocking();
preCommitBlocking();
- return commitBlocking();
+ commitBlocking();
+ return null;
} catch (TransactionCommitFailedException e) {
LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e);
abortBlocking(e);
* If one of cohorts failed preCommit
*
*/
- private RpcResult<TransactionStatus> commitBlocking() throws TransactionCommitFailedException {
+ private void commitBlocking() throws TransactionCommitFailedException {
commitAll().checkedGet();
- return RpcResultBuilder.<TransactionStatus>success(TransactionStatus.COMMITED).build();
}
/**
*/
package org.opendaylight.controller.md.sal.dom.broker.impl;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
* callback is invoked with associated transaction and throwable is invoked on listener.
*
*/
-class DOMDataCommitErrorInvoker implements FutureCallback<RpcResult<TransactionStatus>> {
+class DOMDataCommitErrorInvoker implements FutureCallback<Void> {
private final DOMDataWriteTransaction tx;
private final DOMDataCommitErrorListener listener;
}
@Override
- public void onSuccess(RpcResult<TransactionStatus> result) {
+ public void onSuccess(Void result) {
// NOOP
}
}
\ No newline at end of file
*/
package org.opendaylight.controller.md.sal.dom.broker.impl;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-
import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.CheckedFuture;
/**
* Executor of Three Phase Commit coordination for
* subtransactoins.
* @param listener
* Error listener which should be notified if transaction failed.
- * @return ListenableFuture which contains RpcResult with
- * {@link TransactionStatus#COMMITED} if commit coordination on
- * cohorts finished successfully.
+ * @return a CheckedFuture. if commit coordination on cohorts finished successfully,
+ * nothing is returned from the Future, On failure,
+ * the Future fails with a {@link TransactionCommitFailedException}.
*
*/
- ListenableFuture<RpcResult<TransactionStatus>> submit(DOMDataWriteTransaction tx,
+ CheckedFuture<Void,TransactionCommitFailedException> submit(DOMDataWriteTransaction tx,
Iterable<DOMStoreThreePhaseCommitCohort> cohort, Optional<DOMDataCommitErrorListener> listener);
}
*/
package org.opendaylight.controller.md.sal.dom.broker.impl;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.CheckedFuture;
/**
*
public interface DOMDataCommitImplementation {
/**
- * User-supplied implementation of {@link DOMDataWriteTransaction#commit()}
+ * User-supplied implementation of {@link DOMDataWriteTransaction#submit()}
* for transaction.
*
- * Callback invoked when {@link DOMDataWriteTransaction#commit()} is invoked
+ * Callback invoked when {@link DOMDataWriteTransaction#submit()} is invoked
* on transaction created by this factory.
*
* @param transaction
* commited transaction.
*
*/
- ListenableFuture<RpcResult<TransactionStatus>> commit(final DOMDataWriteTransaction transaction,
+ CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
final Iterable<DOMStoreThreePhaseCommitCohort> cohorts);
}
* <li>{@link #merge(LogicalDatastoreType, InstanceIdentifier, NormalizedNode)}
* </ul>
* {@link #commit()} will result in invocation of
- * {@link DOMDataCommitImplementation#commit(org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction, Iterable)}
+ * {@link DOMDataCommitImplementation#submit(org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction, Iterable)}
* invocation with all {@link org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort} for underlying
* transactions.
*
import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.common.impl.service.AbstractDataTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.ListenableFuture;
/**
* </ul>
* <p>
* {@link #commit()} will result in invocation of
- * {@link DOMDataCommitImplementation#commit(org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction, Iterable)}
+ * {@link DOMDataCommitImplementation#submit(org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction, Iterable)}
* invocation with all {@link org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort} for underlying
* transactions.
*
*
*/
@GuardedBy("this")
- private volatile ListenableFuture<RpcResult<TransactionStatus>> commitFuture;
+ private volatile CheckedFuture<Void, TransactionCommitFailedException> commitFuture;
protected DOMForwardedWriteTransaction(final Object identifier,
final ImmutableMap<LogicalDatastoreType, T> backingTxs, final DOMDataCommitImplementation commitImpl) {
@Override
public synchronized ListenableFuture<RpcResult<TransactionStatus>> commit() {
+ return AbstractDataTransaction.convertToLegacyCommitFuture(submit());
+ }
+
+ @Override
+ public CheckedFuture<Void,TransactionCommitFailedException> submit() {
checkNotReady();
ImmutableList.Builder<DOMStoreThreePhaseCommitCohort> cohortsBuilder = ImmutableList.builder();
cohortsBuilder.add(subTx.ready());
}
ImmutableList<DOMStoreThreePhaseCommitCohort> cohorts = cohortsBuilder.build();
- commitFuture = commitImpl.commit(this, cohorts);
+ commitFuture = commitImpl.submit(this, cohorts);
/*
*We remove reference to Commit Implementation in order
private void checkNotCommited() {
checkState(commitFuture == null, "Transaction was already submited.");
}
-
}
\ No newline at end of file
import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.impl.service.AbstractDataTransaction;
import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationException;
import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationOperation;
import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer;
public Future<RpcResult<TransactionStatus>> commit() {
Preconditions.checkState(status == TransactionStatus.NEW);
status = TransactionStatus.SUBMITED;
- return getDelegate().commit();
+ return AbstractDataTransaction.convertToLegacyCommitFuture(getDelegate().submit());
}
@Override
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.dom.broker;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.opendaylight.controller.sal.core.api.Broker.ConsumerSession;
-import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
-import org.opendaylight.controller.sal.core.api.BrokerService;
-import org.opendaylight.controller.sal.core.api.Consumer.ConsumerFunctionality;
-import org.opendaylight.controller.sal.core.api.Provider.ProviderFunctionality;
-import org.opendaylight.controller.sal.core.api.notify.NotificationListener;
-import org.opendaylight.controller.sal.core.api.notify.NotificationPublishService;
-import org.opendaylight.controller.sal.core.api.notify.NotificationService;
-import org.opendaylight.controller.sal.core.spi.BrokerModule;
-import org.opendaylight.yangtools.concepts.Registration;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Multimap;
-
-public class NotificationModule implements BrokerModule {
- private static Logger log = LoggerFactory
- .getLogger(NotificationModule.class);
-
- private final Multimap<QName, NotificationListener> listeners = HashMultimap
- .create();
-
- private static final Set<Class<? extends BrokerService>> PROVIDED_SERVICE_TYPE = ImmutableSet
- .<Class<? extends BrokerService>>of(NotificationService.class,
- NotificationPublishService.class);
-
- private static final Set<Class<? extends ConsumerFunctionality>> SUPPORTED_CONSUMER_FUNCTIONALITY = ImmutableSet
- .of((Class<? extends ConsumerFunctionality>) NotificationListener.class,
- NotificationListener.class); // Workaround: if we use the
- // version of method with only
- // one argument, the generics
- // inference will not work
-
- @Override
- public Set<Class<? extends BrokerService>> getProvidedServices() {
- return PROVIDED_SERVICE_TYPE;
- }
-
- @Override
- public Set<Class<? extends ConsumerFunctionality>> getSupportedConsumerFunctionality() {
- return SUPPORTED_CONSUMER_FUNCTIONALITY;
- }
-
- @Override
- public <T extends BrokerService> T getServiceForSession(Class<T> service,
- ConsumerSession session) {
- if (NotificationPublishService.class.equals(service)
- && session instanceof ProviderSession) {
- @SuppressWarnings("unchecked")
- T ret = (T) newNotificationPublishService(session);
- return ret;
- } else if (NotificationService.class.equals(service)) {
-
- @SuppressWarnings("unchecked")
- T ret = (T) newNotificationConsumerService(session);
- return ret;
- }
-
- throw new IllegalArgumentException(
- "The requested session-specific service is not provided by this module.");
- }
-
- private void sendNotification(CompositeNode notification) {
- QName type = notification.getNodeType();
- Collection<NotificationListener> toNotify = listeners.get(type);
- log.trace("Publishing notification " + type);
-
- if (toNotify == null) {
- // No listeners were registered - returns.
- return;
- }
-
- for (NotificationListener listener : toNotify) {
- try {
- // FIXME: ensure that notification is immutable
- listener.onNotification(notification);
- } catch (Exception e) {
- log.error("Uncaught exception in NotificationListener", e);
- }
- }
-
- }
-
- private NotificationService newNotificationConsumerService(
- ConsumerSession session) {
- return new NotificationConsumerSessionImpl();
- }
-
- private NotificationPublishService newNotificationPublishService(
- ConsumerSession session) {
- return new NotificationProviderSessionImpl();
- }
-
- private class NotificationConsumerSessionImpl implements
- NotificationService {
-
- private final Multimap<QName, NotificationListener> consumerListeners = HashMultimap
- .create();
- private boolean closed = false;
-
-
- @Override
- public Registration<NotificationListener> addNotificationListener(QName notification,
- NotificationListener listener) {
- checkSessionState();
- if (notification == null) {
- throw new IllegalArgumentException(
- "Notification type must not be null.");
- }
- if (listener == null) {
- throw new IllegalArgumentException("Listener must not be null.");
- }
-
- consumerListeners.put(notification, listener);
- listeners.put(notification, listener);
- log.trace("Registered listener for notification: " + notification);
- return null; // Return registration Object.
- }
-
- public void removeNotificationListener(QName notification,
- NotificationListener listener) {
- checkSessionState();
- if (notification == null) {
- throw new IllegalArgumentException(
- "Notification type must not be null.");
- }
- if (listener == null) {
- throw new IllegalArgumentException("Listener must not be null.");
- }
- consumerListeners.remove(notification, listener);
- listeners.remove(notification, listener);
- }
-
- public void closeSession() {
- closed = true;
- Map<QName, Collection<NotificationListener>> toRemove = consumerListeners
- .asMap();
- for (Entry<QName, Collection<NotificationListener>> entry : toRemove
- .entrySet()) {
- listeners.remove(entry.getKey(), entry.getValue());
- }
- }
-
- protected void checkSessionState() {
- if (closed)
- throw new IllegalStateException("Session is closed");
- }
- }
-
- private class NotificationProviderSessionImpl extends
- NotificationConsumerSessionImpl implements
- NotificationPublishService {
-
- @Override
- public void publish(CompositeNode notification) {
- checkSessionState();
- if (notification == null)
- throw new IllegalArgumentException(
- "Notification must not be null.");
- NotificationModule.this.sendNotification(notification);
- }
- }
-
- @Override
- public Set<Class<? extends ProviderFunctionality>> getSupportedProviderFunctionality() {
- return Collections.emptySet();
- }
-}
public List<ListenableFuture<?>> call() throws Exception {
List<ListenableFuture<?>> builder = new ArrayList<>(txNum);
for (DOMDataReadWriteTransaction tx :transactions) {
- builder.add(tx.commit());
+ builder.add(tx.submit());
}
return builder;
}
measure("Txs:1 Submit", new Callable<ListenableFuture<?>>() {
@Override
public ListenableFuture<?> call() throws Exception {
- return writeTx.commit();
+ return writeTx.submit();
}
}).get();
return null;
TestModel.TEST_PATH);
assertTrue(writeTxContainer.get().isPresent());
- writeTx.commit().get();
+ writeTx.submit().get();
Optional<NormalizedNode<?, ?>> afterCommitRead = domBroker.newReadOnlyTransaction()
.read(OPERATIONAL, TestModel.TEST_PATH).get();
*/
package org.opendaylight.controller.md.sal.dom.broker.impl;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.junit.Before;
import org.junit.Test;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
-import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
* First transaction is marked as ready, we are able to allocate chained
* transactions
*/
- ListenableFuture<RpcResult<TransactionStatus>> firstWriteTxFuture = firstTx.commit();
+ ListenableFuture<Void> firstWriteTxFuture = firstTx.submit();
/**
* We alocate chained transaction - read transaction.
/**
* third transaction is sealed and commited
*/
- ListenableFuture<RpcResult<TransactionStatus>> thirdDeleteTxFuture = thirdDeleteTx.commit();
+ ListenableFuture<Void> thirdDeleteTxFuture = thirdDeleteTx.submit();
assertCommitSuccessful(thirdDeleteTxFuture);
/**
return tx;
}
- private static void assertCommitSuccessful(final ListenableFuture<RpcResult<TransactionStatus>> future)
+ private static void assertCommitSuccessful(final ListenableFuture<Void> future)
throws InterruptedException, ExecutionException {
- RpcResult<TransactionStatus> rpcResult = future.get();
- assertTrue(rpcResult.isSuccessful());
- assertEquals(TransactionStatus.COMMITED, rpcResult.getResult());
+ future.get();
}
private static void assertTestContainerExists(final DOMDataReadTransaction readTx) throws InterruptedException,
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.bi.ba.rpcservice.rev140701.RockTheHouseInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.Top;
+import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext;
+import org.opendaylight.yangtools.yang.binding.YangModuleInfo;
+import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.MoreExecutors;
+
+public class SchemaUpdateForTransactionTest {
+
+ private static final InstanceIdentifier TOP_PATH = InstanceIdentifier.of(Top.QNAME);
+ private SchemaContext schemaContext;
+ private InMemoryDOMDataStore domStore;
+
+ @Before
+ public void setupStore() {
+ domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor());
+ loadSchemas(RockTheHouseInput.class);
+ }
+
+ public void loadSchemas(final Class<?>... classes) {
+ YangModuleInfo moduleInfo;
+ try {
+ ModuleInfoBackedContext context = ModuleInfoBackedContext.create();
+ for (Class<?> clz : classes) {
+ moduleInfo = BindingReflections.getModuleInfo(clz);
+
+ context.registerModuleInfo(moduleInfo);
+ }
+ schemaContext = context.tryToCreateSchemaContext().get();
+ domStore.onGlobalContextUpdated(schemaContext);
+ } catch (Exception e) {
+ Throwables.propagateIfPossible(e);
+ }
+ }
+
+ /**
+ * Test suite tests allocating transaction when schema context
+ * does not contain module necessary for client write,
+ * then triggering update of global schema context
+ * and then performing write (according to new module).
+ *
+ * If transaction between allocation and schema context was
+ * unmodified, it is safe to change its schema context
+ * to new one (e.g. it will be same as if allocated after
+ * schema context update.)
+ *
+ * @throws InterruptedException
+ * @throws ExecutionException
+ */
+ @Test
+ public void testTransactionSchemaUpdate() throws InterruptedException, ExecutionException {
+
+ assertNotNull(domStore);
+
+ // We allocate transaction, initial schema context does not
+ // contain Lists model
+ DOMStoreReadWriteTransaction writeTx = domStore.newReadWriteTransaction();
+ assertNotNull(writeTx);
+
+ // we trigger schema context update to contain Lists model
+ loadSchemas(RockTheHouseInput.class, Top.class);
+
+ /**
+ *
+ * Writes /test in writeTx, this write should not fail
+ * with IllegalArgumentException since /test is in
+ * schema context.
+ *
+ */
+ writeTx.write(TOP_PATH, ImmutableNodes.containerNode(Top.QNAME));
+
+ }
+
+}
*/
package org.opendaylight.controller.sal.connect.netconf;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.InputStream;
+import java.util.LinkedList;
+import java.util.List;
import java.util.concurrent.ExecutorService;
-
import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.opendaylight.controller.sal.connect.api.MessageTransformer;
import org.opendaylight.controller.sal.connect.api.RemoteDevice;
import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
/**
* This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade
*/
private final MessageTransformer<NetconfMessage> messageTransformer;
private final SchemaContextProviderFactory schemaContextProviderFactory;
private final SchemaSourceProviderFactory<InputStream> sourceProviderFactory;
+ private final NotificationHandler notificationHandler;
public static NetconfDevice createNetconfDevice(final RemoteDeviceId id,
final AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider,
this.sourceProviderFactory = sourceProviderFactory;
this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor);
this.schemaContextProviderFactory = schemaContextProviderFactory;
+ this.notificationHandler = new NotificationHandler(salFacade, messageTransformer, id);
}
@Override
final SchemaContextProvider schemaContextProvider = setUpSchemaContext(delegate, remoteSessionCapabilities);
updateMessageTransformer(schemaContextProvider);
salFacade.onDeviceConnected(schemaContextProvider, remoteSessionCapabilities, deviceRpc);
+ notificationHandler.onRemoteSchemaUp();
}
});
@Override
public void onNotification(final NetconfMessage notification) {
- final CompositeNode parsedNotification = messageTransformer.toNotification(notification);
- salFacade.onNotification(parsedNotification);
+ notificationHandler.handleNotification(notification);
+ }
+
+ /**
+ * Handles incoming notifications. Either caches them(until onRemoteSchemaUp is called) or passes to sal Facade.
+ */
+ private final static class NotificationHandler {
+
+ private final RemoteDeviceHandler<?> salFacade;
+ private final List<NetconfMessage> cache = new LinkedList<>();
+ private final MessageTransformer<NetconfMessage> messageTransformer;
+ private boolean passNotifications = false;
+ private final RemoteDeviceId id;
+
+ NotificationHandler(final RemoteDeviceHandler<?> salFacade, final MessageTransformer<NetconfMessage> messageTransformer, final RemoteDeviceId id) {
+ this.salFacade = salFacade;
+ this.messageTransformer = messageTransformer;
+ this.id = id;
+ }
+
+ synchronized void handleNotification(final NetconfMessage notification) {
+ if(passNotifications) {
+ passNotification(messageTransformer.toNotification(notification));
+ } else {
+ cacheNotification(notification);
+ }
+ }
+
+ /**
+ * Forward all cached notifications and pass all notifications from this point directly to sal facade.
+ */
+ synchronized void onRemoteSchemaUp() {
+ passNotifications = true;
+
+ for (final NetconfMessage cachedNotification : cache) {
+ passNotification(messageTransformer.toNotification(cachedNotification));
+ }
+
+ cache.clear();
+ }
+
+ private void cacheNotification(final NetconfMessage notification) {
+ Preconditions.checkState(passNotifications == false);
+
+ logger.debug("{}: Caching notification {}, remote schema not yet fully built", id, notification);
+ if(logger.isTraceEnabled()) {
+ logger.trace("{}: Caching notification {}", id, XmlUtil.toString(notification.getDocument()));
+ }
+
+ cache.add(notification);
+ }
+
+ private void passNotification(final CompositeNode parsedNotification) {
+ logger.debug("{}: Forwarding notification {}", id, parsedNotification);
+ Preconditions.checkNotNull(parsedNotification);
+ salFacade.onNotification(parsedNotification);
+ }
+
}
}
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeIdentifierWithPredicates;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.ModifyAction;
import org.opendaylight.yangtools.yang.data.api.Node;
import org.opendaylight.yangtools.yang.data.api.SimpleNode;
import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
}
private void sendMerge(final InstanceIdentifier key, final CompositeNode value) throws InterruptedException, ExecutionException {
- sendEditRpc(createEditConfigStructure(key, Optional.<String>absent(), Optional.of(value)), Optional.<String>absent());
+ sendEditRpc(createEditConfigStructure(key, Optional.<ModifyAction>absent(), Optional.of(value)), Optional.<ModifyAction>absent());
}
private void sendDelete(final InstanceIdentifier toDelete) throws InterruptedException, ExecutionException {
- // FIXME use org.opendaylight.yangtools.yang.data.api.ModifyAction instead of strings
- // TODO add string lowercase value to ModifyAction enum entries
- sendEditRpc(createEditConfigStructure(toDelete, Optional.of("delete"), Optional.<CompositeNode>absent()), Optional.of("none"));
+ sendEditRpc(createEditConfigStructure(toDelete, Optional.of(ModifyAction.DELETE), Optional.<CompositeNode>absent()), Optional.of(ModifyAction.NONE));
}
- private void sendEditRpc(final CompositeNode editStructure, final Optional<String> defaultOperation) throws InterruptedException, ExecutionException {
+ private void sendEditRpc(final CompositeNode editStructure, final Optional<ModifyAction> defaultOperation) throws InterruptedException, ExecutionException {
final ImmutableCompositeNode editConfigRequest = createEditConfigRequest(editStructure, defaultOperation);
final RpcResult<CompositeNode> rpcResult = rpc.invokeRpc(NETCONF_EDIT_CONFIG_QNAME, editConfigRequest).get();
}
}
- private ImmutableCompositeNode createEditConfigRequest(final CompositeNode editStructure, final Optional<String> defaultOperation) {
+ private ImmutableCompositeNode createEditConfigRequest(final CompositeNode editStructure, final Optional<ModifyAction> defaultOperation) {
final CompositeNodeBuilder<ImmutableCompositeNode> ret = ImmutableCompositeNode.builder();
// Target
// Default operation
if(defaultOperation.isPresent()) {
- SimpleNode<String> defOp = NodeFactory.createImmutableSimpleNode(NETCONF_DEFAULT_OPERATION_QNAME, null, defaultOperation.get());
+ final SimpleNode<String> defOp = NodeFactory.createImmutableSimpleNode(NETCONF_DEFAULT_OPERATION_QNAME, null, modifyOperationToXmlString(defaultOperation.get()));
ret.add(defOp);
}
return ret.toInstance();
}
- private CompositeNode createEditConfigStructure(final InstanceIdentifier dataPath, final Optional<String> operation,
+ private CompositeNode createEditConfigStructure(final InstanceIdentifier dataPath, final Optional<ModifyAction> operation,
final Optional<CompositeNode> lastChildOverride) {
Preconditions.checkArgument(Iterables.isEmpty(dataPath.getPathArguments()) == false, "Instance identifier with empty path %s", dataPath);
return predicates;
}
- private CompositeNode getDeepestEditElement(final PathArgument arg, final Optional<String> operation, final Optional<CompositeNode> lastChildOverride) {
+ private CompositeNode getDeepestEditElement(final PathArgument arg, final Optional<ModifyAction> operation, final Optional<CompositeNode> lastChildOverride) {
final CompositeNodeBuilder<ImmutableCompositeNode> builder = ImmutableCompositeNode.builder();
builder.setQName(arg.getNodeType());
addPredicatesToCompositeNodeBuilder(predicates, builder);
if (operation.isPresent()) {
- builder.setAttribute(NETCONF_OPERATION_QNAME, operation.get());
+ builder.setAttribute(NETCONF_OPERATION_QNAME, modifyOperationToXmlString(operation.get()));
}
if (lastChildOverride.isPresent()) {
final List<Node<?>> children = lastChildOverride.get().getValue();
return builder.toInstance();
}
+ private String modifyOperationToXmlString(final ModifyAction operation) {
+ return operation.name().toLowerCase();
+ }
+
/**
* Send commit rpc to finish the transaction
* In case of failure or unexpected error response, ExecutionException is thrown
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.InputStream;
Mockito.verify(facade, Mockito.timeout(5000)).onDeviceDisconnected();
}
+ @Test
+ public void testNotificationBeforeSchema() throws Exception {
+ final RemoteDeviceHandler<NetconfSessionCapabilities> facade = getFacade();
+ final RemoteDeviceCommunicator<NetconfMessage> listener = getListener();
+
+ final MessageTransformer<NetconfMessage> messageTransformer = getMessageTransformer();
+ final NetconfDevice device = new NetconfDevice(getId(), facade, getExecutor(), messageTransformer, getSchemaContextProviderFactory(), getSourceProviderFactory());
+
+ device.onNotification(netconfMessage);
+ device.onNotification(netconfMessage);
+
+ verify(facade, times(0)).onNotification(any(CompositeNode.class));
+
+ final NetconfSessionCapabilities sessionCaps = getSessionCaps(true,
+ Lists.newArrayList(TEST_NAMESPACE + "?module=" + TEST_MODULE + "&revision=" + TEST_REVISION));
+
+ device.onRemoteSessionUp(sessionCaps, listener);
+
+ verify(messageTransformer, timeout(10000).times(2)).toNotification(netconfMessage);
+ verify(facade, times(2)).onNotification(compositeNode);
+
+ device.onNotification(netconfMessage);
+ verify(messageTransformer, times(3)).toNotification(netconfMessage);
+ verify(facade, times(3)).onNotification(compositeNode);
+ }
+
@Test
public void testNetconfDeviceReconnect() throws Exception {
final RemoteDeviceHandler<NetconfSessionCapabilities> facade = getFacade();
final RemoteDeviceHandler<NetconfSessionCapabilities> remoteDeviceHandler = mockCloseableClass(RemoteDeviceHandler.class);
doNothing().when(remoteDeviceHandler).onDeviceConnected(any(SchemaContextProvider.class), any(NetconfSessionCapabilities.class), any(RpcImplementation.class));
doNothing().when(remoteDeviceHandler).onDeviceDisconnected();
+ doNothing().when(remoteDeviceHandler).onNotification(any(CompositeNode.class));
return remoteDeviceHandler;
}
final MessageTransformer<NetconfMessage> messageTransformer = mockClass(MessageTransformer.class);
doReturn(netconfMessage).when(messageTransformer).toRpcRequest(any(QName.class), any(CompositeNode.class));
doReturn(rpcResultC).when(messageTransformer).toRpcResult(any(NetconfMessage.class), any(QName.class));
+ doReturn(compositeNode).when(messageTransformer).toNotification(any(NetconfMessage.class));
doNothing().when(messageTransformer).onGlobalContextUpdated(any(SchemaContext.class));
return messageTransformer;
}
doReturn(Futures.immediateFuture(rpcResult)).when(remoteDeviceCommunicator).sendRequest(any(NetconfMessage.class), any(QName.class));
return remoteDeviceCommunicator;
}
-}
\ No newline at end of file
+}
*/
package org.opendaylight.controller.cluster.datastore.util;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.net.URI;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.TransformerFactoryConfigurationError;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+
import org.custommonkey.xmlunit.Diff;
import org.custommonkey.xmlunit.XMLUnit;
import org.junit.Test;
import org.w3c.dom.Element;
import org.xml.sax.SAXException;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.transform.OutputKeys;
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerException;
-import javax.xml.transform.TransformerFactory;
-import javax.xml.transform.TransformerFactoryConfigurationError;
-import javax.xml.transform.dom.DOMSource;
-import javax.xml.transform.stream.StreamResult;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.net.URI;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
/**
}
}
- public static DataSchemaNode getSchemaNode(SchemaContext context,
- String moduleName, String childNodeName) {
+ public static DataSchemaNode getSchemaNode(final SchemaContext context,
+ final String moduleName, final String childNodeName) {
for (Module module : context.getModules()) {
if (module.getName().equals(moduleName)) {
DataSchemaNode found =
+ childNodeName);
}
- static DataSchemaNode findChildNode(Set<DataSchemaNode> children, String name) {
+ static DataSchemaNode findChildNode(final Set<DataSchemaNode> children, final String name) {
List<DataNodeContainer> containers = Lists.newArrayList();
for (DataSchemaNode dataSchemaNode : children) {
- if (dataSchemaNode.getQName().getLocalName().equals(name))
+ if (dataSchemaNode.getQName().getLocalName().equals(name)) {
return dataSchemaNode;
+ }
if (dataSchemaNode instanceof DataNodeContainer) {
containers.add((DataNodeContainer) dataSchemaNode);
} else if (dataSchemaNode instanceof ChoiceNode) {
}
private static InstanceIdentifier.NodeIdentifier getNodeIdentifier(
- String localName) {
- return new InstanceIdentifier.NodeIdentifier(new QName(
+ final String localName) {
+ return new InstanceIdentifier.NodeIdentifier(QName.create(
URI.create(NAMESPACE), revision, localName));
}
public static InstanceIdentifier.AugmentationIdentifier getAugmentIdentifier(
- String... childNames) {
+ final String... childNames) {
Set<QName> qn = Sets.newHashSet();
for (String childName : childNames) {
- public void init(String yangPath, String xmlPath, ContainerNode expectedNode)
+ public void init(final String yangPath, final String xmlPath, final ContainerNode expectedNode)
throws Exception {
SchemaContext schema = parseTestSchema(yangPath);
this.xmlPath = xmlPath;
this.expectedNode = expectedNode;
}
- SchemaContext parseTestSchema(String yangPath) throws Exception {
+ SchemaContext parseTestSchema(final String yangPath) throws Exception {
YangParserImpl yangParserImpl = new YangParserImpl();
InputStream stream =
.parse(Collections.singletonList(doc.getDocumentElement()),
containerNode);
- if (expectedNode != null)
- junit.framework.Assert.assertEquals(expectedNode, built);
+ if (expectedNode != null) {
+ junit.framework.Assert.assertEquals(expectedNode, built);
+ }
logger.info("{}", built);
System.out.println(toString(doc.getDocumentElement()));
System.out.println(toString(el));
- boolean diff =
- new Diff(
- XMLUnit.buildControlDocument(toString(doc.getDocumentElement())),
- XMLUnit.buildTestDocument(toString(el))).similar();
+ new Diff(
+ XMLUnit.buildControlDocument(toString(doc.getDocumentElement())),
+ XMLUnit.buildTestDocument(toString(el))).similar();
}
private static ContainerNode listLeafListWithAttributes() {
.parse(Collections.singletonList(doc.getDocumentElement()),
containerNode);
- if (expectedNode != null)
- junit.framework.Assert.assertEquals(expectedNode, built);
+ if (expectedNode != null) {
+ junit.framework.Assert.assertEquals(expectedNode, built);
+ }
logger.info("{}", built);
System.out.println(toString(doc.getDocumentElement()));
System.out.println(toString(el));
- boolean diff =
- new Diff(
- XMLUnit.buildControlDocument(toString(doc.getDocumentElement())),
- XMLUnit.buildTestDocument(toString(el))).similar();
+ new Diff(
+ XMLUnit.buildControlDocument(toString(doc.getDocumentElement())),
+ XMLUnit.buildTestDocument(toString(el))).similar();
}
- private Document loadDocument(String xmlPath) throws Exception {
+ private Document loadDocument(final String xmlPath) throws Exception {
InputStream resourceAsStream =
NormalizedNodeXmlConverterTest.class.getResourceAsStream(xmlPath);
BUILDERFACTORY = factory;
}
- private Document readXmlToDocument(InputStream xmlContent)
+ private Document readXmlToDocument(final InputStream xmlContent)
throws IOException, SAXException {
DocumentBuilder dBuilder;
try {
return doc;
}
- public static String toString(Element xml) {
+ public static String toString(final Element xml) {
try {
Transformer transformer =
TransformerFactory.newInstance().newTransformer();
System.out.println(toString(convertedDoc.getDocumentElement()));
XMLUnit.setIgnoreWhitespace(true);
XMLUnit.setIgnoreComments(true);
- boolean diff =
- new Diff(XMLUnit.buildControlDocument(toString(expectedDoc
- .getDocumentElement())),
- XMLUnit.buildTestDocument(toString(convertedDoc
- .getDocumentElement()))).similar();
+ new Diff(XMLUnit.buildControlDocument(toString(expectedDoc
+ .getDocumentElement())),
+ XMLUnit.buildTestDocument(toString(convertedDoc
+ .getDocumentElement()))).similar();
System.out.println(toString(expectedDoc.getDocumentElement()));
}
System.out.println(toString(convertedDoc.getDocumentElement()));
XMLUnit.setIgnoreWhitespace(true);
XMLUnit.setIgnoreComments(true);
- boolean diff =
- new Diff(XMLUnit.buildControlDocument(toString(expectedDoc
- .getDocumentElement())),
- XMLUnit.buildTestDocument(toString(convertedDoc
- .getDocumentElement()))).similar();
+ new Diff(XMLUnit.buildControlDocument(toString(expectedDoc
+ .getDocumentElement())),
+ XMLUnit.buildTestDocument(toString(convertedDoc
+ .getDocumentElement()))).similar();
System.out.println(toString(expectedDoc.getDocumentElement()));
// now we will try to convert xml back to normalize node.
if (input instanceof IdentityValuesDTO) {
return identityrefCodec.deserialize(input);
}
- logger.info(
+ logger.debug(
"Value is not instance of IdentityrefTypeDefinition but is {}. Therefore NULL is used as translation of - {}",
input == null ? "null" : input.getClass(), String.valueOf(input));
return null;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+
import javax.ws.rs.core.Application;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
+
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.test.JerseyTest;
import org.junit.BeforeClass;
import org.opendaylight.yangtools.yang.data.api.Node;
import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
+import org.opendaylight.yangtools.yang.model.api.Module;
+import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
public class RestGetOperationTest extends JerseyTest {
static class NodeData {
private static SchemaContext schemaContextModules;
private static SchemaContext schemaContextBehindMountPoint;
+ private static final String RESTCONF_NS = "urn:ietf:params:xml:ns:yang:ietf-restconf";
+
@BeforeClass
public static void init() throws FileNotFoundException {
schemaContextYangsIetf = TestUtils.loadSchemaContext("/full-versions/yangs");
validateModulesResponseJson(response);
response = target(uri).request("application/yang.api+xml").get();
- validateModulesResponseXml(response);
+ validateModulesResponseXml(response,schemaContextModules);
}
// /streams/
assertTrue(responseBody.contains("streams"));
response = target(uri).request("application/yang.api+xml").get();
- responseBody = response.readEntity(String.class);
- assertNotNull(responseBody);
- assertTrue(responseBody.contains("<streams xmlns=\"urn:ietf:params:xml:ns:yang:ietf-restconf\""));
+ Document responseXmlBody = response.readEntity(Document.class);
+ assertNotNull(responseXmlBody);
+ Element rootNode = responseXmlBody.getDocumentElement();
+
+ assertEquals("streams", rootNode.getLocalName());
+ assertEquals(RESTCONF_NS, rootNode.getNamespaceURI());
}
// /modules/module
Response response = target(uri).request("application/yang.api+xml").get();
assertEquals(200, response.getStatus());
- String responseBody = response.readEntity(String.class);
- assertTrue("Module2 in xml wasn't found", prepareXmlRegex("module2", "2014-01-02", "module:2", responseBody)
- .find());
- String[] split = responseBody.split("<module");
- assertEquals("<module element is returned more then once", 2, split.length);
+ Document responseXml = response.readEntity(Document.class);
+
+
+
+ QName qname = assertedModuleXmlToModuleQName(responseXml.getDocumentElement());
+ assertNotNull(qname);
+
+ assertEquals("module2", qname.getLocalName());
+ assertEquals("module:2", qname.getNamespace().toString());
+ assertEquals("2014-01-02", qname.getFormattedRevision());
response = target(uri).request("application/yang.api+json").get();
assertEquals(200, response.getStatus());
- responseBody = response.readEntity(String.class);
+ String responseBody = response.readEntity(String.class);
assertTrue("Module2 in json wasn't found", prepareJsonRegex("module2", "2014-01-02", "module:2", responseBody)
.find());
- split = responseBody.split("\"module\"");
+ String[] split = responseBody.split("\"module\"");
assertEquals("\"module\" element is returned more then once", 2, split.length);
}
Response response = target(uri).request("application/yang.api+xml").get();
assertEquals(200, response.getStatus());
- String responseBody = response.readEntity(String.class);
- assertTrue("Xml response for /operations dummy-rpc1-module1 is incorrect",
- validateOperationsResponseXml(responseBody, "dummy-rpc1-module1", "module:1").find());
- assertTrue("Xml response for /operations dummy-rpc2-module1 is incorrect",
- validateOperationsResponseXml(responseBody, "dummy-rpc2-module1", "module:1").find());
- assertTrue("Xml response for /operations dummy-rpc1-module2 is incorrect",
- validateOperationsResponseXml(responseBody, "dummy-rpc1-module2", "module:2").find());
- assertTrue("Xml response for /operations dummy-rpc2-module2 is incorrect",
- validateOperationsResponseXml(responseBody, "dummy-rpc2-module2", "module:2").find());
+ Document responseDoc = response.readEntity(Document.class);
+ validateOperationsResponseXml(responseDoc, schemaContextModules);
response = target(uri).request("application/yang.api+json").get();
assertEquals(200, response.getStatus());
- responseBody = response.readEntity(String.class);
+ String responseBody = response.readEntity(String.class);
assertTrue("Json response for /operations dummy-rpc1-module1 is incorrect",
validateOperationsResponseJson(responseBody, "dummy-rpc1-module1", "module1").find());
assertTrue("Json response for /operations dummy-rpc2-module1 is incorrect",
}
+ private void validateOperationsResponseXml(final Document responseDoc, final SchemaContext schemaContext) {
+ Element operationsElem = responseDoc.getDocumentElement();
+ assertEquals(RESTCONF_NS, operationsElem.getNamespaceURI());
+ assertEquals("operations", operationsElem.getLocalName());
+
+
+ HashSet<QName> foundOperations = new HashSet<>();
+
+ NodeList operationsList = operationsElem.getChildNodes();
+ for(int i = 0;i < operationsList.getLength();i++) {
+ org.w3c.dom.Node operation = operationsList.item(i);
+
+ String namespace = operation.getNamespaceURI();
+ String name = operation.getLocalName();
+ QName opQName = QName.create(URI.create(namespace), null, name);
+ foundOperations.add(opQName);
+ }
+
+ for(RpcDefinition schemaOp : schemaContext.getOperations()) {
+ assertTrue(foundOperations.contains(schemaOp.getQName().withoutRevision()));
+ }
+
+ }
+
// /operations/pathToMountPoint/yang-ext:mount
@Test
public void getOperationsBehindMountPointTest() throws FileNotFoundException, UnsupportedEncodingException {
Response response = target(uri).request("application/yang.api+xml").get();
assertEquals(200, response.getStatus());
- String responseBody = response.readEntity(String.class);
- assertTrue("Xml response for /operations/mount_point rpc-behind-module1 is incorrect",
- validateOperationsResponseXml(responseBody, "rpc-behind-module1", "module:1:behind:mount:point").find());
- assertTrue("Xml response for /operations/mount_point rpc-behind-module2 is incorrect",
- validateOperationsResponseXml(responseBody, "rpc-behind-module2", "module:2:behind:mount:point").find());
+
+ Document responseDoc = response.readEntity(Document.class);
+ validateOperationsResponseXml(responseDoc, schemaContextBehindMountPoint);
response = target(uri).request("application/yang.api+json").get();
assertEquals(200, response.getStatus());
- responseBody = response.readEntity(String.class);
+ String responseBody = response.readEntity(String.class);
assertTrue("Json response for /operations/mount_point rpc-behind-module1 is incorrect",
validateOperationsResponseJson(responseBody, "rpc-behind-module1", "module1-behind-mount-point").find());
assertTrue("Json response for /operations/mount_point rpc-behind-module2 is incorrect",
response = target(uri).request("application/yang.api+xml").get();
assertEquals(200, response.getStatus());
- responseBody = response.readEntity(String.class);
- assertTrue(
- "module1-behind-mount-point in json wasn't found",
- prepareXmlRegex("module1-behind-mount-point", "2014-02-03", "module:1:behind:mount:point", responseBody)
- .find());
- assertTrue(
- "module2-behind-mount-point in json wasn't found",
- prepareXmlRegex("module2-behind-mount-point", "2014-02-04", "module:2:behind:mount:point", responseBody)
- .find());
+ validateModulesResponseXml(response, schemaContextBehindMountPoint);
}
response = target(uri).request("application/yang.api+xml").get();
assertEquals(200, response.getStatus());
- responseBody = response.readEntity(String.class);
- assertTrue(
- "module1-behind-mount-point in json wasn't found",
- prepareXmlRegex("module1-behind-mount-point", "2014-02-03", "module:1:behind:mount:point", responseBody)
- .find());
- split = responseBody.split("<module");
- assertEquals("<module element is returned more then once", 2, split.length);
+ Document responseXml = response.readEntity(Document.class);
+
+ QName module = assertedModuleXmlToModuleQName(responseXml.getDocumentElement());
+
+ assertEquals("module1-behind-mount-point", module.getLocalName());
+ assertEquals("2014-02-03", module.getFormattedRevision());
+ assertEquals("module:1:behind:mount:point", module.getNamespace().toString());
+
}
- private void validateModulesResponseXml(final Response response) {
+ private void validateModulesResponseXml(final Response response, final SchemaContext schemaContext) {
assertEquals(200, response.getStatus());
- String responseBody = response.readEntity(String.class);
+ Document responseBody = response.readEntity(Document.class);
+ NodeList moduleNodes = responseBody.getDocumentElement().getElementsByTagNameNS(RESTCONF_NS, "module");
- assertTrue("Module1 in xml wasn't found", prepareXmlRegex("module1", "2014-01-01", "module:1", responseBody)
- .find());
- assertTrue("Module2 in xml wasn't found", prepareXmlRegex("module2", "2014-01-02", "module:2", responseBody)
- .find());
- assertTrue("Module3 in xml wasn't found", prepareXmlRegex("module3", "2014-01-03", "module:3", responseBody)
- .find());
+ assertTrue(moduleNodes.getLength() > 0);
+
+ HashSet<QName> foundModules = new HashSet<>();
+
+ for(int i=0;i < moduleNodes.getLength();i++) {
+ org.w3c.dom.Node module = moduleNodes.item(i);
+
+ QName name = assertedModuleXmlToModuleQName(module);
+ foundModules.add(name);
+ }
+
+ assertAllModules(foundModules,schemaContext);
+ }
+
+ private void assertAllModules(final Set<QName> foundModules, final SchemaContext schemaContext) {
+ for(Module module : schemaContext.getModules()) {
+ QName current = QName.create(module.getQNameModule(),module.getName());
+ assertTrue("Module not found in response.",foundModules.contains(current));
+ }
+
+ }
+
+ private QName assertedModuleXmlToModuleQName(final org.w3c.dom.Node module) {
+ assertEquals("module", module.getLocalName());
+ assertEquals(RESTCONF_NS, module.getNamespaceURI());
+ String revision = null;
+ String namespace = null;
+ String name = null;
+
+
+ NodeList childNodes = module.getChildNodes();
+
+ for(int i =0;i < childNodes.getLength(); i++) {
+ org.w3c.dom.Node child = childNodes.item(i);
+ assertEquals(RESTCONF_NS, child.getNamespaceURI());
+
+ switch(child.getLocalName()) {
+ case "name":
+ assertNull("Name element appeared multiple times",name);
+ name = child.getTextContent().trim();
+ break;
+ case "revision":
+ assertNull("Revision element appeared multiple times",revision);
+ revision = child.getTextContent().trim();
+ break;
+
+ case "namespace":
+ assertNull("Namespace element appeared multiple times",namespace);
+ namespace = child.getTextContent().trim();
+ break;
+ }
+ }
+
+ assertNotNull("Revision was not part of xml",revision);
+ assertNotNull("Module namespace was not part of xml",namespace);
+ assertNotNull("Module identiffier was not part of xml",name);
+
+
+ // TODO Auto-generated method stub
+
+ return QName.create(namespace,revision,name);
}
private void validateModulesResponseJson(final Response response) {
}
- private Matcher prepareXmlRegex(final String module, final String revision, final String namespace,
- final String searchIn) {
- StringBuilder regex = new StringBuilder();
- regex.append("^");
-
- regex.append(".*<module.*");
- regex.append(".*>");
-
- regex.append(".*<name>");
- regex.append(".*" + module);
- regex.append(".*<\\/name>");
-
- regex.append(".*<revision>");
- regex.append(".*" + revision);
- regex.append(".*<\\/revision>");
-
- regex.append(".*<namespace>");
- regex.append(".*" + namespace);
- regex.append(".*<\\/namespace>");
-
- regex.append(".*<\\/module.*>");
-
- regex.append(".*");
- regex.append("$");
-
- Pattern ptrn = Pattern.compile(regex.toString(), Pattern.DOTALL);
- return ptrn.matcher(searchIn);
- }
private void prepareMockForModulesTest(final ControllerContext mockedControllerContext)
throws FileNotFoundException {
getDataWithUriIncludeWhiteCharsParameter("operational");
}
- private void getDataWithUriIncludeWhiteCharsParameter(String target) throws UnsupportedEncodingException {
+ private void getDataWithUriIncludeWhiteCharsParameter(final String target) throws UnsupportedEncodingException {
mockReadConfigurationDataMethod();
String uri = "/" + target + "/ietf-interfaces:interfaces/interface/eth0";
Response response = target(uri).queryParam("prettyPrint", "false").request("application/xml").get();
import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.DisplayString;
import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.MakeToastInput;
executor.shutdown();
if (dataProvider != null) {
- WriteTransaction t = dataProvider.newWriteOnlyTransaction();
- t.delete(LogicalDatastoreType.OPERATIONAL,TOASTER_IID);
- ListenableFuture<RpcResult<TransactionStatus>> future = t.commit();
- Futures.addCallback( future, new FutureCallback<RpcResult<TransactionStatus>>() {
+ WriteTransaction tx = dataProvider.newWriteOnlyTransaction();
+ tx.delete(LogicalDatastoreType.OPERATIONAL,TOASTER_IID);
+ Futures.addCallback( tx.submit(), new FutureCallback<Void>() {
@Override
- public void onSuccess( RpcResult<TransactionStatus> result ) {
+ public void onSuccess( final Void result ) {
LOG.debug( "Delete Toaster commit result: " + result );
}
@Override
- public void onFailure( Throwable t ) {
+ public void onFailure( final Throwable t ) {
LOG.error( "Delete of Toaster failed", t );
}
} );
}
}
- private Toaster buildToaster( ToasterStatus status ) {
+ private Toaster buildToaster( final ToasterStatus status ) {
// note - we are simulating a device whose manufacture and model are
// fixed (embedded) into the hardware.
final SettableFuture<RpcResult<Void>> futureResult = SettableFuture.create();
- checkStatusAndMakeToast( input, futureResult );
+ checkStatusAndMakeToast( input, futureResult, 2 );
return futureResult;
}
}
private void checkStatusAndMakeToast( final MakeToastInput input,
- final SettableFuture<RpcResult<Void>> futureResult ) {
+ final SettableFuture<RpcResult<Void>> futureResult,
+ final int tries ) {
// Read the ToasterStatus and, if currently Up, try to write the status to Down.
// If that succeeds, then we essentially have an exclusive lock and can proceed
// to make toast.
final ReadWriteTransaction tx = dataProvider.newReadWriteTransaction();
- ListenableFuture<Optional<DataObject>> readFuture =
+ ListenableFuture<Optional<Toaster>> readFuture =
tx.read( LogicalDatastoreType.OPERATIONAL, TOASTER_IID );
- final ListenableFuture<RpcResult<TransactionStatus>> commitFuture =
- Futures.transform( readFuture, new AsyncFunction<Optional<DataObject>,
- RpcResult<TransactionStatus>>() {
+ final ListenableFuture<Void> commitFuture =
+ Futures.transform( readFuture, new AsyncFunction<Optional<Toaster>,Void>() {
@Override
- public ListenableFuture<RpcResult<TransactionStatus>> apply(
- Optional<DataObject> toasterData ) throws Exception {
+ public ListenableFuture<Void> apply(
+ final Optional<Toaster> toasterData ) throws Exception {
ToasterStatus toasterStatus = ToasterStatus.Up;
if( toasterData.isPresent() ) {
- toasterStatus = ((Toaster)toasterData.get()).getToasterStatus();
+ toasterStatus = toasterData.get().getToasterStatus();
}
LOG.debug( "Read toaster status: {}", toasterStatus );
if( outOfBread() ) {
LOG.debug( "Toaster is out of bread" );
- return Futures.immediateFuture( RpcResultBuilder.<TransactionStatus>failed()
- .withRpcError( makeToasterOutOfBreadError() ).build() );
+ return Futures.immediateFailedCheckedFuture(
+ new TransactionCommitFailedException( "", makeToasterOutOfBreadError() ) );
}
LOG.debug( "Setting Toaster status to Down" );
// concurrent toasting.
tx.put( LogicalDatastoreType.OPERATIONAL, TOASTER_IID,
buildToaster( ToasterStatus.Down ) );
- return tx.commit();
+ return tx.submit();
}
LOG.debug( "Oops - already making toast!" );
// Return an error since we are already making toast. This will get
// propagated to the commitFuture below which will interpret the null
// TransactionStatus in the RpcResult as an error condition.
- return Futures.immediateFuture( RpcResultBuilder.<TransactionStatus>failed()
- .withRpcError( makeToasterInUseError() ).build() );
+ return Futures.immediateFailedCheckedFuture(
+ new TransactionCommitFailedException( "", makeToasterInUseError() ) );
}
} );
- Futures.addCallback( commitFuture, new FutureCallback<RpcResult<TransactionStatus>>() {
+ Futures.addCallback( commitFuture, new FutureCallback<Void>() {
@Override
- public void onSuccess( RpcResult<TransactionStatus> result ) {
- if( result.getResult() == TransactionStatus.COMMITED ) {
-
- // OK to make toast
- currentMakeToastTask.set( executor.submit(
- new MakeToastTask( input, futureResult ) ) );
- } else {
-
- LOG.debug( "Setting error result" );
-
- // Either the transaction failed to commit for some reason or, more likely,
- // the read above returned ToasterStatus.Down. Either way, fail the
- // futureResult and copy the errors.
-
- futureResult.set( RpcResultBuilder.<Void>failed().withRpcErrors(
- result.getErrors() ).build() );
- }
+ public void onSuccess( final Void result ) {
+ // OK to make toast
+ currentMakeToastTask.set( executor.submit( new MakeToastTask( input, futureResult ) ) );
}
@Override
- public void onFailure( Throwable ex ) {
+ public void onFailure( final Throwable ex ) {
if( ex instanceof OptimisticLockFailedException ) {
// Another thread is likely trying to make toast simultaneously and updated the
// status before us. Try reading the status again - if another make toast is
// now in progress, we should get ToasterStatus.Down and fail.
- LOG.debug( "Got OptimisticLockFailedException - trying again" );
+ if( ( tries - 1 ) > 0 ) {
+ LOG.debug( "Got OptimisticLockFailedException - trying again" );
- checkStatusAndMakeToast( input, futureResult );
+ checkStatusAndMakeToast( input, futureResult, tries - 1 );
+ }
+ else {
+ futureResult.set( RpcResultBuilder.<Void> failed()
+ .withError( ErrorType.APPLICATION, ex.getMessage() ).build() );
+ }
} else {
- LOG.error( "Failed to commit Toaster status", ex );
+ LOG.debug( "Failed to commit Toaster status", ex );
- // Got some unexpected error so fail.
+ // Probably already making toast.
futureResult.set( RpcResultBuilder.<Void> failed()
- .withError( ErrorType.APPLICATION, ex.getMessage() ).build() );
+ .withRpcErrors( ((TransactionCommitFailedException)ex).getErrorList() )
+ .build() );
}
}
} );
WriteTransaction tx = dataProvider.newWriteOnlyTransaction();
tx.put( LogicalDatastoreType.OPERATIONAL,TOASTER_IID, buildToaster( ToasterStatus.Up ) );
- ListenableFuture<RpcResult<TransactionStatus>> commitFuture = tx.commit();
-
- Futures.addCallback( commitFuture, new FutureCallback<RpcResult<TransactionStatus>>() {
+ Futures.addCallback( tx.submit(), new FutureCallback<Void>() {
@Override
- public void onSuccess( RpcResult<TransactionStatus> result ) {
- if( result.getResult() != TransactionStatus.COMMITED ) {
- LOG.error( "Failed to update toaster status: " + result.getErrors() );
- }
-
- notifyCallback( result.getResult() == TransactionStatus.COMMITED );
+ public void onSuccess( final Void result ) {
+ notifyCallback( true );
}
@Override
- public void onFailure( Throwable t ) {
+ public void onFailure( final Throwable t ) {
// We shouldn't get an OptimisticLockFailedException (or any ex) as no
// other component should be updating the operational state.
LOG.error( "Failed to update toaster status", t );
notifyCallback( false );
}
- void notifyCallback( boolean result ) {
+ void notifyCallback( final boolean result ) {
if( resultCallback != null ) {
resultCallback.apply( result );
}
setToasterStatusUp( new Function<Boolean,Void>() {
@Override
- public Void apply( Boolean result ) {
+ public Void apply( final Boolean result ) {
currentMakeToastTask.set( null );
package org.opendaylight.controller.netconf.confignetconfconnector.osgi;
import java.lang.ref.SoftReference;
-import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
+import java.util.concurrent.atomic.AtomicReference;
-import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class YangStoreServiceImpl implements YangStoreService {
+ private static final Logger LOG = LoggerFactory.getLogger(YangStoreServiceImpl.class);
+
+ /**
+ * This is a rather interesting locking model. We need to guard against both the
+ * cache expiring from GC and being invalidated by schema context change. The
+ * context can change while we are doing processing, so we do not want to block
+ * it, so no synchronization can happen on the methods.
+ *
+ * So what we are doing is the following:
+ *
+ * We synchronize with GC as usual, using a SoftReference.
+ *
+ * The atomic reference is used to synchronize with {@link #refresh()}, e.g. when
+ * refresh happens, it will push a SoftReference(null), e.g. simulate the GC. Now
+ * that may happen while the getter is already busy acting on the old schema context,
+ * so it needs to understand that a refresh has happened and retry. To do that, it
+ * attempts a CAS operation -- if it fails, in knows that the SoftReference has
+ * been replaced and thus it needs to retry.
+ *
+ * Note that {@link #getYangStoreSnapshot()} will still use synchronize() internally
+ * to stop multiple threads doing the same work.
+ */
+ private final AtomicReference<SoftReference<YangStoreSnapshotImpl>> ref = new AtomicReference<>(new SoftReference<YangStoreSnapshotImpl>(null));
private final SchemaContextProvider service;
- @GuardedBy("this")
- private SoftReference<YangStoreSnapshotImpl> cache = new SoftReference<>(null);
- public YangStoreServiceImpl(SchemaContextProvider service) {
+ public YangStoreServiceImpl(final SchemaContextProvider service) {
this.service = service;
}
@Override
public synchronized YangStoreSnapshotImpl getYangStoreSnapshot() throws YangStoreException {
- YangStoreSnapshotImpl yangStoreSnapshot = cache.get();
- if (yangStoreSnapshot == null) {
- yangStoreSnapshot = new YangStoreSnapshotImpl(service.getSchemaContext());
- cache = new SoftReference<>(yangStoreSnapshot);
+ SoftReference<YangStoreSnapshotImpl> r = ref.get();
+ YangStoreSnapshotImpl ret = r.get();
+
+ while (ret == null) {
+ // We need to be compute a new value
+ ret = new YangStoreSnapshotImpl(service.getSchemaContext());
+
+ if (!ref.compareAndSet(r, new SoftReference<>(ret))) {
+ LOG.debug("Concurrent refresh detected, recomputing snapshot");
+ r = ref.get();
+ ret = null;
+ }
}
- return yangStoreSnapshot;
+
+ return ret;
}
/**
* Called when schema context changes, invalidates cache.
*/
- public synchronized void refresh() {
- cache.clear();
+ public void refresh() {
+ ref.set(new SoftReference<YangStoreSnapshotImpl>(null));
}
}
*/
package org.opendaylight.controller.netconf.cli.commands.local;
-import com.google.common.collect.Lists;
import org.opendaylight.controller.netconf.cli.NetconfDeviceConnectionManager;
import org.opendaylight.controller.netconf.cli.commands.AbstractCommand;
import org.opendaylight.controller.netconf.cli.commands.Command;
import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl;
import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
+import com.google.common.collect.Lists;
+
/**
* Local disconnect command
*/
connectionManager.disconnect();
return new Output(new CompositeNodeTOImpl(getCommandId(), null,
- Lists.<Node<?>> newArrayList(new SimpleNodeTOImpl<>(new QName(getCommandId(), "status"), null,
+ Lists.<Node<?>> newArrayList(new SimpleNodeTOImpl<>(QName.create(getCommandId(), "status"), null,
"Connection disconnected"))));
}
*/
package org.opendaylight.controller.netconf.cli.io;
-import com.google.common.collect.Maps;
import java.util.Map;
+
import junit.framework.Assert;
+
import org.junit.Test;
import org.opendaylight.controller.netconf.cli.commands.CommandConstants;
import org.opendaylight.yangtools.yang.common.QName;
+import com.google.common.collect.Maps;
+
public class IOUtilTest {
@Test
public void testQNameFromKeyStringNew() throws Exception {
final String s = IOUtil.qNameToKeyString(CommandConstants.HELP_QNAME, "module");
final Map<String, QName> modulesMap = Maps.newHashMap();
- modulesMap.put("module", new QName(CommandConstants.HELP_QNAME, "module"));
+ modulesMap.put("module", QName.create(CommandConstants.HELP_QNAME, "module"));
final QName qName = IOUtil.qNameFromKeyString(s, modulesMap);
Assert.assertEquals(CommandConstants.HELP_QNAME, qName);
}
import io.netty.util.concurrent.Promise;
import java.io.IOException;
import org.opendaylight.controller.netconf.nettyutil.AbstractChannelInitializer;
-import org.opendaylight.controller.netconf.nettyutil.handler.ssh.SshHandler;
import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
-import org.opendaylight.controller.netconf.nettyutil.handler.ssh.client.Invoker;
+import org.opendaylight.controller.netconf.nettyutil.handler.ssh.client.SshHandler;
import org.opendaylight.protocol.framework.SessionListenerFactory;
final class SshClientChannelInitializer extends AbstractChannelInitializer<NetconfClientSession> {
@Override
public void initialize(final Channel ch, final Promise<NetconfClientSession> promise) {
try {
- final Invoker invoker = Invoker.subsystem("netconf");
- ch.pipeline().addFirst(new SshHandler(authenticationHandler, invoker));
+ ch.pipeline().addFirst(SshHandler.createForNetconfSubsystem(authenticationHandler));
super.initialize(ch,promise);
} catch (final IOException e) {
throw new RuntimeException(e);
this.label = clientLabel;
sessionListener = config.getSessionListener();
Future<NetconfClientSession> clientFuture = netconfClientDispatcher.createClient(config);
- clientSession = get(clientFuture);
+ clientSession = get(clientFuture);//TODO: make static
this.sessionId = clientSession.getSessionId();
}
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
private void start() {
final NetconfMessage helloMessage = this.sessionPreferences.getHelloMessage();
- logger.debug("Session negotiation started with hello message {}", XmlUtil.toString(helloMessage.getDocument()));
+ logger.debug("Session negotiation started with hello message {} on channel {}", XmlUtil.toString(helloMessage.getDocument()), channel);
channel.pipeline().addLast(NAME_OF_EXCEPTION_HANDLER, new ExceptionHandlingInboundChannelHandler());
// Do not fail negotiation if promise is done or canceled
// It would result in setting result of the promise second time and that throws exception
if (isPromiseFinished() == false) {
- // FIXME BUG-1365 calling "negotiation failed" closes the channel, but the channel does not get closed if data is still being transferred
- // Loopback connection initiation might
negotiationFailed(new IllegalStateException("Session was not established after " + timeout));
+ changeState(State.FAILED);
+
+ channel.closeFuture().addListener(new GenericFutureListener<ChannelFuture>() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if(future.isSuccess()) {
+ logger.debug("Channel {} closed: success", future.channel());
+ } else {
+ logger.warn("Channel {} closed: fail", future.channel());
+ }
+ }
+ });
}
-
- changeState(State.FAILED);
} else if(channel.isOpen()) {
channel.pipeline().remove(NAME_OF_EXCEPTION_HANDLER);
}
protected abstract S getSession(L sessionListener, Channel channel, NetconfHelloMessage message) throws NetconfDocumentedException;
private synchronized void changeState(final State newState) {
- logger.debug("Changing state from : {} to : {}", state, newState);
- Preconditions.checkState(isStateChangePermitted(state, newState), "Cannot change state from %s to %s", state,
- newState);
+ logger.debug("Changing state from : {} to : {} for channel: {}", state, newState, channel);
+ Preconditions.checkState(isStateChangePermitted(state, newState), "Cannot change state from %s to %s for chanel %s", state,
+ newState, channel);
this.state = newState;
}
package org.opendaylight.controller.netconf.nettyutil.handler;
import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ByteToMessageDecoder;
-
-import java.util.List;
-
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import com.google.common.base.Charsets;
+public class NetconfEOMAggregator extends DelimiterBasedFrameDecoder {
-public class NetconfEOMAggregator extends ByteToMessageDecoder {
- private final static Logger logger = LoggerFactory.getLogger(NetconfEOMAggregator.class);
+ public static final ByteBuf DELIMITER = Unpooled.wrappedBuffer(NetconfMessageConstants.END_OF_MESSAGE);
- @Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
- int index = indexOfSequence(in, NetconfMessageConstants.END_OF_MESSAGE);
- if (index == -1) {
- logger.debug("Message is not complete, read again.");
- if (logger.isTraceEnabled()) {
- String str = in.toString(Charsets.UTF_8);
- logger.trace("Message read so far: {}", str);
- }
- ctx.read();
- } else {
- ByteBuf msg = in.readBytes(index);
- in.readBytes(NetconfMessageConstants.END_OF_MESSAGE.length);
- in.discardReadBytes();
- logger.debug("Message is complete.");
- out.add(msg);
- }
+ public NetconfEOMAggregator() {
+ super(Integer.MAX_VALUE, DELIMITER);
}
-
- private int indexOfSequence(ByteBuf in, byte[] sequence) {
- int index = -1;
- for (int i = 0; i < in.readableBytes() - sequence.length + 1; i++) {
- if (in.getByte(i) == sequence[0]) {
- index = i;
- for (int j = 1; j < sequence.length; j++) {
- if (in.getByte(i + j) != sequence[j]) {
- index = -1;
- break;
- }
- }
- if (index != -1) {
- return index;
- }
- }
- }
- return index;
- }
-
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
+import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
transformer.setOutputProperty(OutputKeys.INDENT, "yes");
transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
- StreamResult result = new StreamResult(new OutputStreamWriter(os));
+ // Wrap OutputStreamWriter with BufferedWriter as suggested in javadoc for OutputStreamWriter
+ StreamResult result = new StreamResult(new BufferedWriter(new OutputStreamWriter(os)));
DOMSource source = new DOMSource(msg.getDocument());
transformer.transform(source, result);
}
*/
package org.opendaylight.controller.netconf.nettyutil.handler.exi;
-import org.opendaylight.controller.netconf.api.NetconfMessage;
+import com.google.common.base.Preconditions;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
import org.openexi.proc.common.AlignmentType;
import org.openexi.proc.common.EXIOptions;
import org.openexi.proc.common.EXIOptionsException;
-import com.google.common.base.Preconditions;
-
public final class EXIParameters {
private static final String EXI_PARAMETER_ALIGNMENT = "alignment";
private static final String EXI_PARAMETER_BYTE_ALIGNED = "byte-aligned";
private static final String EXI_FIDELITY_PIS = "pis";
private static final String EXI_FIDELITY_PREFIXES = "prefixes";
- private static final String EXI_PARAMETER_SCHEMA = "schema";
- private static final String EXI_PARAMETER_SCHEMA_NONE = "none";
- private static final String EXI_PARAMETER_SCHEMA_BUILT_IN = "builtin";
- private static final String EXI_PARAMETER_SCHEMA_BASE_1_1 = "base:1.1";
-
private final EXIOptions options;
private EXIParameters(final EXIOptions options) {
this.options = Preconditions.checkNotNull(options);
}
- public static EXIParameters fromNetconfMessage(final NetconfMessage root) throws EXIOptionsException {
- return fromXmlElement(XmlElement.fromDomDocument(root.getDocument()));
- }
public static EXIParameters fromXmlElement(final XmlElement root) throws EXIOptionsException {
final EXIOptions options = new EXIOptions();
options.setPreserveNS(true);
}
}
-
- if (root.getElementsByTagName(EXI_PARAMETER_SCHEMA).getLength() > 0) {
-/*
- GrammarFactory grammarFactory = GrammarFactory.newInstance();
- if (operationElement
- .getElementsByTagName(EXI_PARAMETER_SCHEMA_NONE)
- .getLength() > 0) {
- this.grammars = grammarFactory.createSchemaLessGrammars();
- }
-
- if (operationElement.getElementsByTagName(
- EXI_PARAMETER_SCHEMA_BUILT_IN).getLength() > 0) {
- this.grammars = grammarFactory.createXSDTypesOnlyGrammars();
- }
-
- if (operationElement.getElementsByTagName(
- EXI_PARAMETER_SCHEMA_BASE_1_1).getLength() > 0) {
- this.grammars = grammarFactory
- .createGrammars(NETCONF_XSD_LOCATION);
- }
-*/
-
- }
-
return new EXIParameters(options);
}
Element startExiElement = doc.createElementNS(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_EXI_1_0,
START_EXI);
- addAlignemnt(exiOptions, doc, startExiElement);
+ addAlignment(exiOptions, doc, startExiElement);
addFidelity(exiOptions, doc, startExiElement);
rpcElement.appendChild(startExiElement);
}
}
- private static void addAlignemnt(EXIOptions exiOptions, Document doc, Element startExiElement) {
+ private static void addAlignment(EXIOptions exiOptions, Document doc, Element startExiElement) {
Element alignmentElement = doc.createElementNS(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_EXI_1_0,
ALIGNMENT_KEY);
alignmentElement.setTextContent(exiOptions.getAlignmentType().toString());
/**
* Class Providing username/password authentication option to
- * {@link org.opendaylight.controller.netconf.nettyutil.handler.ssh.SshHandler}
+ * {@link org.opendaylight.controller.netconf.nettyutil.handler.ssh.client.SshHandler}
*/
public class LoginPassword extends AuthenticationHandler {
private final String username;
* Abstract class providing mechanism of invoking various SSH level services.
* Class is not allowed to be extended, as it provides its own implementations via instance initiators.
*/
-public abstract class Invoker {
+abstract class Invoker {
private boolean invoked = false;
- private Invoker(){}
+ private Invoker() {
+ }
protected boolean isInvoked() {
- // TODO invoked is always false
return invoked;
}
+ public void setInvoked() {
+ this.invoked = true;
+ }
+
abstract void invoke(SshSession session) throws IOException;
- /**
- * Invoker implementation to invokes subsystem SSH service.
- *
- * @param subsystem
- * @return
- */
+ public static Invoker netconfSubsystem(){
+ return subsystem("netconf");
+ }
+
public static Invoker subsystem(final String subsystem) {
return new Invoker() {
@Override
- void invoke(SshSession session) throws IOException {
+ synchronized void invoke(SshSession session) throws IOException {
if (isInvoked()) {
throw new IllegalStateException("Already invoked.");
}
-
- session.startSubSystem(subsystem);
+ try {
+ session.startSubSystem(subsystem);
+ } finally {
+ setInvoked();
+ }
}
};
}
import ch.ethz.ssh2.Connection;
import ch.ethz.ssh2.Session;
-import ch.ethz.ssh2.channel.Channel;
-import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
-import org.opendaylight.controller.netconf.nettyutil.handler.ssh.virtualsocket.VirtualSocket;
-
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
+import org.opendaylight.controller.netconf.nettyutil.handler.ssh.virtualsocket.VirtualSocket;
/**
* Wrapper class around GANYMED SSH java library.
*/
-public class SshClient {
+class SshClient {
private final VirtualSocket socket;
private final Map<Integer, SshSession> openSessions = new HashMap<>();
private final AuthenticationHandler authenticationHandler;
authenticationHandler.authenticate(connection);
}
- public void closeSession(SshSession session) {
- if (session.getState() == Channel.STATE_OPEN || session.getState() == Channel.STATE_OPENING) {
- session.close();
- }
- }
public void close() {
for (SshSession session : openSessions.values()){
- closeSession(session);
+ session.close();
}
openSessions.clear();
connection.close();
}
}
+
+ @Override
+ public String toString() {
+ return "SshClient{" +
+ "socket=" + socket +
+ '}';
+ }
}
package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.opendaylight.controller.netconf.nettyutil.handler.ssh.virtualsocket.VirtualSocketException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* Worker thread class. Handles all downstream and upstream events in SSH Netty
* pipeline.
*/
-public class SshClientAdapter implements Runnable {
+class SshClientAdapter implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(SshClientAdapter.class);
private static final int BUFFER_SIZE = 1024;
this.invoker = invoker;
}
+ // TODO: refactor
public void run() {
try {
SshSession session = sshClient.openSession();
byteBuf.writeBytes(tranBuff);
ctx.fireChannelRead(byteBuf);
}
-
- } catch (VirtualSocketException e) {
- // Netty closed connection prematurely.
- // Or maybe tried to open ganymed connection without having initialized session
- // (ctx.channel().remoteAddress() is null)
- // Just pass and move on.
} catch (Exception e) {
logger.error("Unexpected exception", e);
} finally {
}
}
- public void start(ChannelHandlerContext ctx) {
- if (this.ctx != null) {
- // context is already associated.
- return;
+ public Thread start(ChannelHandlerContext ctx, ChannelFuture channelFuture) {
+ checkArgument(channelFuture.isSuccess());
+ checkNotNull(ctx.channel().remoteAddress());
+ synchronized (this) {
+ checkState(this.ctx == null);
+ this.ctx = ctx;
}
- this.ctx = ctx;
- new Thread(this).start();
+ String threadName = toString();
+ Thread thread = new Thread(this, threadName);
+ thread.start();
+ return thread;
+ }
+
+ @Override
+ public String toString() {
+ return "SshClientAdapter{" +
+ "sshClient=" + sshClient +
+ '}';
}
}
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.netconf.nettyutil.handler.ssh;
+package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
-
import java.io.IOException;
import java.net.SocketAddress;
-
-import org.opendaylight.controller.netconf.nettyutil.handler.ssh.client.SshClient;
import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
-import org.opendaylight.controller.netconf.nettyutil.handler.ssh.client.Invoker;
-import org.opendaylight.controller.netconf.nettyutil.handler.ssh.client.SshClientAdapter;
import org.opendaylight.controller.netconf.nettyutil.handler.ssh.virtualsocket.VirtualSocket;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Netty SSH handler class. Acts as interface between Netty and SSH library. All standard Netty message handling
* stops at instance of this class. All downstream events are handed of to wrapped {@link org.opendaylight.controller.netconf.nettyutil.handler.ssh.client.SshClientAdapter};
*/
public class SshHandler extends ChannelOutboundHandlerAdapter {
+ private static final Logger logger = LoggerFactory.getLogger(SshHandler.class);
private static final String SOCKET = "socket";
private final VirtualSocket virtualSocket = new VirtualSocket();
private final SshClientAdapter sshClientAdapter;
+
+ public static SshHandler createForNetconfSubsystem(AuthenticationHandler authenticationHandler) throws IOException {
+ return new SshHandler(authenticationHandler, Invoker.netconfSubsystem());
+ }
+
+
public SshHandler(AuthenticationHandler authenticationHandler, Invoker invoker) throws IOException {
SshClient sshClient = new SshClient(virtualSocket, authenticationHandler);
this.sshClientAdapter = new SshClientAdapter(sshClient, invoker);
promise.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture channelFuture) {
- sshClientAdapter.start(ctx);
+ if (channelFuture.isSuccess()) {
+ sshClientAdapter.start(ctx, channelFuture);
+ } else {
+ logger.debug("Failed to connect to remote host");
+ }
}}
);
}
import ch.ethz.ssh2.Session;
import ch.ethz.ssh2.StreamGobbler;
+import ch.ethz.ssh2.channel.Channel;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
/**
* Wrapper class for proprietary SSH sessions implementations
*/
-public class SshSession implements Closeable {
+class SshSession implements Closeable {
private final Session session;
public SshSession(Session session) {
this.session = session;
}
- public void execCommand(String cmd) throws IOException {
- session.execCommand(cmd);
- }
-
- public void execCommand(String cmd, String charsetName) throws IOException {
- session.execCommand(cmd, charsetName);
- }
-
- public void startShell() throws IOException {
- session.startShell();
- }
public void startSubSystem(String name) throws IOException {
session.startSubSystem(name);
}
- public int getState() {
- return session.getState();
- }
-
public InputStream getStdout() {
return new StreamGobbler(session.getStdout());
}
return session.getStdin();
}
- public int waitUntilDataAvailable(long timeout) throws IOException {
- return session.waitUntilDataAvailable(timeout);
- }
-
- public int waitForCondition(int conditionSet, long timeout) {
- return session.waitForCondition(conditionSet, timeout);
- }
-
- public Integer getExitStatus() {
- return session.getExitStatus();
- }
-
- public String getExitSignal() {
- return session.getExitSignal();
- }
-
@Override
public void close() {
- session.close();
+ if (session.getState() == Channel.STATE_OPEN || session.getState() == Channel.STATE_OPENING) {
+ session.close();
+ }
}
}
* use OIO application in asynchronous environment and NIO EventLoop. Using VirtualSocket OIO applications
* are able to use full potential of NIO environment.
*/
+//TODO: refactor - socket should be created when connection is established
public class VirtualSocket extends Socket implements ChannelHandler {
private static final String INPUT_STREAM = "inputStream";
private static final String OUTPUT_STREAM = "outputStream";
- private final ChannelInputStream chis = new ChannelInputStream();
- private final ChannelOutputStream chos = new ChannelOutputStream();
+ private final ChannelInputStream chais = new ChannelInputStream();
+ private final ChannelOutputStream chaos = new ChannelOutputStream();
private ChannelHandlerContext ctx;
public InputStream getInputStream() {
- return this.chis;
+ return this.chais;
}
public OutputStream getOutputStream() {
- return this.chos;
+ return this.chaos;
}
public void handlerAdded(ChannelHandlerContext ctx) {
this.ctx = ctx;
if (ctx.channel().pipeline().get(OUTPUT_STREAM) == null) {
- ctx.channel().pipeline().addFirst(OUTPUT_STREAM, chos);
+ ctx.channel().pipeline().addFirst(OUTPUT_STREAM, chaos);
}
if (ctx.channel().pipeline().get(INPUT_STREAM) == null) {
- ctx.channel().pipeline().addFirst(INPUT_STREAM, chis);
+ ctx.channel().pipeline().addFirst(INPUT_STREAM, chais);
}
}
ctx.fireExceptionCaught(throwable);
}
- public VirtualSocket() {super();}
@Override
public void connect(SocketAddress endpoint) throws IOException {}
@Override
public InetAddress getInetAddress() {
InetSocketAddress isa = getInetSocketAddress();
-
- if (isa == null) {
- throw new VirtualSocketException();
- }
-
- return getInetSocketAddress().getAddress();
+ return isa.getAddress();
}
@Override
@Override
public String toString() {
- return "Virtual socket InetAdress["+getInetAddress()+"], Port["+getPort()+"]";
+ return "VirtualSocket{" + getInetAddress() + ":" + getPort() + "}";
}
@Override
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.netconf.nettyutil.handler.ssh.virtualsocket;
-
-/**
- * Exception class which provides notification about exceptional situations at the virtual socket layer.
- */
-// FIXME: Switch to checked exception, create a runtime exception to workaround Socket API
-public class VirtualSocketException extends RuntimeException {
- private static final long serialVersionUID = 1L;
-}
<artifactId>mockito-configuration</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>netconf-netty-util</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>netconf-client</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
*/
package org.opendaylight.controller.netconf.ssh;
+import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalAddress;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
logger.trace("SSH server socket closed.");
}
+ @VisibleForTesting
+ public InetSocketAddress getLocalSocketAddress() {
+ return (InetSocketAddress) serverSocket.getLocalSocketAddress();
+ }
+
@Override
public void run() {
while (up) {
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.handler.stream.ChunkedStream;
+import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
class SSHClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(SSHClientHandler.class);
private final AutoCloseable remoteConnection;
- private final OutputStream remoteOutputStream;
+ private final BufferedOutputStream remoteOutputStream;
private final String session;
private ChannelHandlerContext channelHandlerContext;
public SSHClientHandler(AutoCloseable remoteConnection, OutputStream remoteOutputStream,
String session) {
this.remoteConnection = remoteConnection;
- this.remoteOutputStream = remoteOutputStream;
+ this.remoteOutputStream = new BufferedOutputStream(remoteOutputStream);
this.session = session;
}
}
@Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
ByteBuf bb = (ByteBuf) msg;
// we can block the server here so that slow client does not cause memory pressure
try {
* traffic between the echo client and server by sending the first message to
* the server.
*/
-public class EchoClient implements Runnable {
+public class EchoClient extends Thread {
private static final Logger logger = LoggerFactory.getLogger(EchoClient.class);
- private final ChannelHandler clientHandler;
+ private final ChannelInitializer<LocalChannel> channelInitializer;
- public EchoClient(ChannelHandler clientHandler) {
- this.clientHandler = clientHandler;
+
+ public EchoClient(final ChannelHandler clientHandler) {
+ channelInitializer = new ChannelInitializer<LocalChannel>() {
+ @Override
+ public void initChannel(LocalChannel ch) throws Exception {
+ ch.pipeline().addLast(clientHandler);
+ }
+ };
+ }
+
+ public EchoClient(ChannelInitializer<LocalChannel> channelInitializer) {
+ this.channelInitializer = channelInitializer;
}
+ @Override
public void run() {
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
+
b.group(group)
.channel(LocalChannel.class)
- .handler(new ChannelInitializer<LocalChannel>() {
- @Override
- public void initChannel(LocalChannel ch) throws Exception {
- ch.pipeline().addLast(clientHandler);
- }
- });
+ .handler(channelInitializer);
// Start the client.
LocalAddress localAddress = new LocalAddress("foo");
import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
* traffic between the echo client and server by sending the first message to
* the server.
*/
-public class EchoClientHandler extends ChannelInboundHandlerAdapter {
+public class EchoClientHandler extends ChannelInboundHandlerAdapter implements ChannelFutureListener {
private static final Logger logger = LoggerFactory.getLogger(EchoClientHandler.class);
private ChannelHandlerContext ctx;
+ private final StringBuilder fromServer = new StringBuilder();
+
+ public static enum State {CONNECTING, CONNECTED, FAILED_TO_CONNECT, CONNECTION_CLOSED}
+
+
+ private State state = State.CONNECTING;
@Override
- public void channelActive(ChannelHandlerContext ctx) {
+ public synchronized void channelActive(ChannelHandlerContext ctx) {
checkState(this.ctx == null);
- logger.info("client active");
+ logger.info("channelActive");
this.ctx = ctx;
+ state = State.CONNECTED;
}
@Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- ByteBuf bb = (ByteBuf) msg;
- logger.info(">{}", bb.toString(Charsets.UTF_8));
- bb.release();
+ public synchronized void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ state = State.CONNECTION_CLOSED;
}
@Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+ public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ ByteBuf bb = (ByteBuf) msg;
+ String string = bb.toString(Charsets.UTF_8);
+ fromServer.append(string);
+ logger.info(">{}", string);
+ bb.release();
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ public synchronized void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
logger.warn("Unexpected exception from downstream.", cause);
checkState(this.ctx.equals(ctx));
this.ctx = null;
}
- public void write(String message) {
+ public synchronized void write(String message) {
ByteBuf byteBuf = Unpooled.copiedBuffer(message.getBytes());
ctx.writeAndFlush(byteBuf);
}
+
+ public synchronized boolean isConnected() {
+ return state == State.CONNECTED;
+ }
+
+ public synchronized String read() {
+ return fromServer.toString();
+ }
+
+ @Override
+ public synchronized void operationComplete(ChannelFuture future) throws Exception {
+ checkState(state == State.CONNECTING);
+ if (future.isSuccess()) {
+ logger.trace("Successfully connected, state will be switched in channelActive");
+ } else {
+ state = State.FAILED_TO_CONNECT;
+ }
+ }
+
+ public State getState() {
+ return state;
+ }
}
package org.opendaylight.controller.netconf.netty;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import com.google.common.base.Stopwatch;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.HashedWheelTimer;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
+import org.opendaylight.controller.netconf.netty.EchoClientHandler.State;
+import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.LoginPassword;
+import org.opendaylight.controller.netconf.nettyutil.handler.ssh.client.SshHandler;
import org.opendaylight.controller.netconf.ssh.NetconfSSHServer;
import org.opendaylight.controller.netconf.ssh.authentication.AuthProvider;
import org.opendaylight.controller.netconf.ssh.authentication.PEMGenerator;
public class SSHTest {
public static final Logger logger = LoggerFactory.getLogger(SSHTest.class);
+ public static final String AHOJ = "ahoj\n";
+ private EventLoopGroup nettyGroup;
+ HashedWheelTimer hashedWheelTimer;
+
+ @Before
+ public void setUp() throws Exception {
+ hashedWheelTimer = new HashedWheelTimer();
+ nettyGroup = new NioEventLoopGroup();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ hashedWheelTimer.stop();
+ nettyGroup.shutdownGracefully();
+ }
@Test
public void test() throws Exception {
AuthProvider authProvider = mock(AuthProvider.class);
doReturn(PEMGenerator.generate().toCharArray()).when(authProvider).getPEMAsCharArray();
doReturn(true).when(authProvider).authenticated(anyString(), anyString());
- NetconfSSHServer thread = NetconfSSHServer.start(10831, NetconfConfigUtil.getNetconfLocalAddress(), authProvider, new NioEventLoopGroup());
- Thread.sleep(2000);
- logger.info("Closing socket");
- thread.close();
- thread.join();
+ NetconfSSHServer netconfSSHServer = NetconfSSHServer.start(10831, NetconfConfigUtil.getNetconfLocalAddress(),
+ authProvider, new NioEventLoopGroup());
+
+ InetSocketAddress address = netconfSSHServer.getLocalSocketAddress();
+ final EchoClientHandler echoClientHandler = connectClient(address);
+ Stopwatch stopwatch = new Stopwatch().start();
+ while(echoClientHandler.isConnected() == false && stopwatch.elapsed(TimeUnit.SECONDS) < 5) {
+ Thread.sleep(100);
+ }
+ assertTrue(echoClientHandler.isConnected());
+ logger.info("connected, writing to client");
+ echoClientHandler.write(AHOJ);
+ // check that server sent back the same string
+ stopwatch = stopwatch.reset().start();
+ while (echoClientHandler.read().endsWith(AHOJ) == false && stopwatch.elapsed(TimeUnit.SECONDS) < 5) {
+ Thread.sleep(100);
+ }
+ try {
+ String read = echoClientHandler.read();
+ assertTrue(read + " should end with " + AHOJ, read.endsWith(AHOJ));
+ } finally {
+ logger.info("Closing socket");
+ netconfSSHServer.close();
+ netconfSSHServer.join();
+ }
}
+
+ public EchoClientHandler connectClient(InetSocketAddress address) {
+ final EchoClientHandler echoClientHandler = new EchoClientHandler();
+ ChannelInitializer<NioSocketChannel> channelInitializer = new ChannelInitializer<NioSocketChannel>() {
+ @Override
+ public void initChannel(NioSocketChannel ch) throws Exception {
+ ch.pipeline().addFirst(SshHandler.createForNetconfSubsystem(new LoginPassword("a", "a")));
+ ch.pipeline().addLast(echoClientHandler);
+ }
+ };
+ Bootstrap b = new Bootstrap();
+
+ b.group(nettyGroup)
+ .channel(NioSocketChannel.class)
+ .handler(channelInitializer);
+
+ // Start the client.
+ b.connect(address).addListener(echoClientHandler);
+ return echoClientHandler;
+ }
+
+ @Test
+ public void testClientWithoutServer() throws Exception {
+ InetSocketAddress address = new InetSocketAddress(12345);
+ final EchoClientHandler echoClientHandler = connectClient(address);
+ Stopwatch stopwatch = new Stopwatch().start();
+ while(echoClientHandler.getState() == State.CONNECTING && stopwatch.elapsed(TimeUnit.SECONDS) < 5) {
+ Thread.sleep(100);
+ }
+ assertFalse(echoClientHandler.isConnected());
+ assertEquals(State.FAILED_TO_CONNECT, echoClientHandler.getState());
+ }
+
}