import java.io.Closeable;
import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.config.threadpool.ThreadPool;
+import com.google.common.base.Optional;
+
/**
* Implementation of {@link ThreadPool} using flexible number of threads wraps
* {@link ExecutorService}.
public FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit,
ThreadFactory threadFactory) {
+ this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(Optional.<Integer>absent()));
+ }
+
+ public FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit,
+ ThreadFactory threadFactory, Optional<Integer> queueCapacity) {
+ this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(queueCapacity));
+ }
+
+ private FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit,
+ ThreadFactory threadFactory, BlockingQueue<Runnable> queue) {
executor = new ThreadPoolExecutor(minThreadCount, maxThreadCount, keepAlive, timeUnit,
- new SynchronousQueue<Runnable>(), threadFactory);
+ queue, threadFactory, new FlexibleRejectionHandler());
executor.prestartAllCoreThreads();
}
+ /**
+ * Overriding the queue:
+ * ThreadPoolExecutor would not create new threads if the queue is not full, thus adding
+ * occurs in RejectedExecutionHandler.
+ * This impl saturates threadpool first, then queue. When both are full caller will get blocked.
+ */
+ private static ForwardingBlockingQueue getQueue(Optional<Integer> capacity) {
+ final BlockingQueue<Runnable> delegate = capacity.isPresent() ? new LinkedBlockingQueue<Runnable>(capacity.get()) : new LinkedBlockingQueue<Runnable>();
+ return new ForwardingBlockingQueue(delegate);
+ }
+
@Override
public ExecutorService getExecutor() {
return Executors.unconfigurableExecutorService(executor);
executor.shutdown();
}
+ /**
+ * if the max threads are met, then it will raise a rejectedExecution. We then push to the queue.
+ */
+ private static class FlexibleRejectionHandler implements RejectedExecutionHandler {
+ @Override
+ public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
+ try {
+ executor.getQueue().put(r);
+ } catch (InterruptedException e) {
+ throw new RejectedExecutionException("Interrupted while waiting on the queue", e);
+ }
+ }
+ }
+
+ private static class ForwardingBlockingQueue extends com.google.common.util.concurrent.ForwardingBlockingQueue<Runnable> {
+ private final BlockingQueue<Runnable> delegate;
+
+ public ForwardingBlockingQueue(BlockingQueue<Runnable> delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ protected BlockingQueue<Runnable> delegate() {
+ return delegate;
+ }
+
+ @Override
+ public boolean offer(final Runnable r) {
+ // ThreadPoolExecutor will spawn a new thread after core size is reached only
+ // if the queue.offer returns false.
+ return false;
+ }
+ }
}
*/
package org.opendaylight.controller.config.yang.threadpool.impl.flexible;
+import com.google.common.base.Optional;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.config.api.JmxAttributeValidationException;
JmxAttributeValidationException.checkNotNull(getMaxThreadCount(), maxThreadCountJmxAttribute);
JmxAttributeValidationException.checkCondition(getMaxThreadCount() > 0, "must be greater than zero",
maxThreadCountJmxAttribute);
+
+ if(getQueueCapacity() != null) {
+ JmxAttributeValidationException.checkCondition(getQueueCapacity() > 0, "Queue capacity cannot be < 1", queueCapacityJmxAttribute);
+ }
}
@Override
public java.lang.AutoCloseable createInstance() {
return new FlexibleThreadPoolWrapper(getMinThreadCount(), getMaxThreadCount(), getKeepAliveMillis(),
- TimeUnit.MILLISECONDS, getThreadFactoryDependency());
+ TimeUnit.MILLISECONDS, getThreadFactoryDependency(), Optional.fromNullable(getQueueCapacity()));
}
}
type uint32;
}
+ leaf queueCapacity {
+ type uint16;
+ mandatory false;
+ description "Capacity of queue that holds waiting tasks";
+ }
+
container threadFactory {
uses config:service-ref {
refine type {
<data xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<modules xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
- <!-- Netconf dispatcher to be used by all netconf-connectors -->
+ <!-- Netconf dispatcher to be used by all netconf-connectors -->
<module>
<type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:netconf:client:dispatcher">prefix:netconf-client-dispatcher</type>
<name>global-netconf-dispatcher</name>
</timer>
</module>
- <!-- Thread factory to be used by all threadpools in netconf-connectors -->
+ <!-- Thread factory to be used by all threadpools in netconf-connectors -->
<module>
<type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl">prefix:threadfactory-naming</type>
<name>global-netconf-processing-executor-threadfactory</name>
<name-prefix xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl">remote-connector-processing-executor</name-prefix>
- </module>
- <!-- Flexible threadpool for all netconf connectors, Max thread count is set to 4 -->
+ </module>
+ <!-- flexible threadpool for all netconf connectors, Max thread count is set to 4. -->
<module>
<type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">prefix:threadpool-flexible</type>
<name>global-netconf-processing-executor</name>
<minThreadCount xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">1</minThreadCount>
<max-thread-count xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">4</max-thread-count>
<keepAliveMillis xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">600000</keepAliveMillis>
+
<threadFactory xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">
<type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool">prefix:threadfactory</type>
<name>global-netconf-processing-executor-threadfactory</name>
</threadFactory>
- </module>
+ </module>
</modules>
<services xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
import static com.google.common.base.Preconditions.checkState;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
import java.util.EventListener;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class RpcProviderRegistryImpl implements //
- RpcProviderRegistry, //
- RouteChangePublisher<RpcContextIdentifier, InstanceIdentifier<?>> {
+public class RpcProviderRegistryImpl implements RpcProviderRegistry, RouteChangePublisher<RpcContextIdentifier, InstanceIdentifier<?>> {
private RuntimeCodeGenerator rpcFactory = SingletonHolder.RPC_GENERATOR_IMPL;
- // publicProxies is a cache of proxy objects where each value in the map corresponds to a specific RpcService
- private final Map<Class<? extends RpcService>, RpcService> publicProxies = new WeakHashMap<>();
+ // cache of proxy objects where each value in the map corresponds to a specific RpcService
+ private final LoadingCache<Class<? extends RpcService>, RpcService> publicProxies = CacheBuilder.newBuilder().weakKeys().
+ build(new CacheLoader<Class<? extends RpcService>, RpcService>() {
+ @Override
+ public RpcService load(final Class<? extends RpcService> type) {
+ final RpcService proxy = rpcFactory.getDirectProxyFor(type);
+ LOG.debug("Created {} as public proxy for {} in {}", proxy, type.getSimpleName(), this);
+ return proxy;
+ }
+ });
+
private final Map<Class<? extends RpcService>, RpcRouter<?>> rpcRouters = new WeakHashMap<>();
private final ListenerRegistry<RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>> routeChangeListeners = ListenerRegistry
.create();
@SuppressWarnings("unchecked")
@Override
public final <T extends RpcService> T getRpcService(final Class<T> type) {
-
- T potentialProxy = (T) publicProxies.get(type);
- if (potentialProxy != null) {
- return potentialProxy;
- }
- synchronized (this) {
- /**
- * Potential proxy could be instantiated by other thread while we
- * were waiting for the lock.
- */
-
- potentialProxy = (T) publicProxies.get(type);
- if (potentialProxy != null) {
- return potentialProxy;
- }
- T proxy = rpcFactory.getDirectProxyFor(type);
- LOG.debug("Created {} as public proxy for {} in {}", proxy, type.getSimpleName(), this);
- publicProxies.put(type, proxy);
- return proxy;
- }
+ return (T) publicProxies.getUnchecked(type);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
}
- private class RouteChangeForwarder<T extends RpcService> implements
- RouteChangeListener<Class<? extends BaseIdentity>, InstanceIdentifier<?>> {
+ private class RouteChangeForwarder<T extends RpcService> implements RouteChangeListener<Class<? extends BaseIdentity>, InstanceIdentifier<?>> {
private final Class<T> type;
}
}
- public static class RpcProxyRegistration<T extends RpcService> extends AbstractObjectRegistration<T> implements
- RpcRegistration<T> {
+ public static class RpcProxyRegistration<T extends RpcService> extends AbstractObjectRegistration<T> implements RpcRegistration<T> {
private final Class<T> serviceType;
private RpcProviderRegistryImpl registry;
package org.opendaylight.controller.config.yang.inmemory_datastore_provider;
-import com.google.common.util.concurrent.MoreExecutors;
+import java.util.concurrent.Executors;
+
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import com.google.common.util.concurrent.MoreExecutors;
+
public class InMemoryConfigDataStoreProviderModule extends org.opendaylight.controller.config.yang.inmemory_datastore_provider.AbstractInMemoryConfigDataStoreProviderModule {
- public InMemoryConfigDataStoreProviderModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
+ public InMemoryConfigDataStoreProviderModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier, final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
super(identifier, dependencyResolver);
}
- public InMemoryConfigDataStoreProviderModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, org.opendaylight.controller.config.yang.inmemory_datastore_provider.InMemoryConfigDataStoreProviderModule oldModule, java.lang.AutoCloseable oldInstance) {
+ public InMemoryConfigDataStoreProviderModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier, final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, final org.opendaylight.controller.config.yang.inmemory_datastore_provider.InMemoryConfigDataStoreProviderModule oldModule, final java.lang.AutoCloseable oldInstance) {
super(identifier, dependencyResolver, oldModule, oldInstance);
}
@Override
public java.lang.AutoCloseable createInstance() {
- InMemoryDOMDataStore ids = new InMemoryDOMDataStore("DOM-CFG", MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore ids = new InMemoryDOMDataStore("DOM-CFG", MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()));
getSchemaServiceDependency().registerSchemaServiceListener(ids);
return ids;
}
package org.opendaylight.controller.config.yang.inmemory_datastore_provider;
-import com.google.common.util.concurrent.MoreExecutors;
+import java.util.concurrent.Executors;
+
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import com.google.common.util.concurrent.MoreExecutors;
+
public class InMemoryOperationalDataStoreProviderModule extends org.opendaylight.controller.config.yang.inmemory_datastore_provider.AbstractInMemoryOperationalDataStoreProviderModule {
- public InMemoryOperationalDataStoreProviderModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
+ public InMemoryOperationalDataStoreProviderModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier, final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
super(identifier, dependencyResolver);
}
- public InMemoryOperationalDataStoreProviderModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, org.opendaylight.controller.config.yang.inmemory_datastore_provider.InMemoryOperationalDataStoreProviderModule oldModule, java.lang.AutoCloseable oldInstance) {
+ public InMemoryOperationalDataStoreProviderModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier, final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, final org.opendaylight.controller.config.yang.inmemory_datastore_provider.InMemoryOperationalDataStoreProviderModule oldModule, final java.lang.AutoCloseable oldInstance) {
super(identifier, dependencyResolver, oldModule, oldInstance);
}
@Override
public java.lang.AutoCloseable createInstance() {
- InMemoryDOMDataStore ids = new InMemoryDOMDataStore("DOM-OPER", MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore ids = new InMemoryDOMDataStore("DOM-OPER", MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()));
getSchemaServiceDependency().registerSchemaServiceListener(ids);
return ids;
}
*/
package org.opendaylight.controller.sal.connect.netconf;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.InputStream;
+import java.util.LinkedList;
+import java.util.List;
import java.util.concurrent.ExecutorService;
-
import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.opendaylight.controller.sal.connect.api.MessageTransformer;
import org.opendaylight.controller.sal.connect.api.RemoteDevice;
import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
/**
* This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade
*/
private final MessageTransformer<NetconfMessage> messageTransformer;
private final SchemaContextProviderFactory schemaContextProviderFactory;
private final SchemaSourceProviderFactory<InputStream> sourceProviderFactory;
+ private final NotificationHandler notificationHandler;
public static NetconfDevice createNetconfDevice(final RemoteDeviceId id,
final AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider,
this.sourceProviderFactory = sourceProviderFactory;
this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor);
this.schemaContextProviderFactory = schemaContextProviderFactory;
+ this.notificationHandler = new NotificationHandler(salFacade, messageTransformer, id);
}
@Override
final SchemaContextProvider schemaContextProvider = setUpSchemaContext(delegate, remoteSessionCapabilities);
updateMessageTransformer(schemaContextProvider);
salFacade.onDeviceConnected(schemaContextProvider, remoteSessionCapabilities, deviceRpc);
+ notificationHandler.onRemoteSchemaUp();
}
});
@Override
public void onNotification(final NetconfMessage notification) {
- final CompositeNode parsedNotification = messageTransformer.toNotification(notification);
- salFacade.onNotification(parsedNotification);
+ notificationHandler.handleNotification(notification);
+ }
+
+ /**
+ * Handles incoming notifications. Either caches them(until onRemoteSchemaUp is called) or passes to sal Facade.
+ */
+ private final static class NotificationHandler {
+
+ private final RemoteDeviceHandler<?> salFacade;
+ private final List<NetconfMessage> cache = new LinkedList<>();
+ private final MessageTransformer<NetconfMessage> messageTransformer;
+ private boolean passNotifications = false;
+ private final RemoteDeviceId id;
+
+ NotificationHandler(final RemoteDeviceHandler<?> salFacade, final MessageTransformer<NetconfMessage> messageTransformer, final RemoteDeviceId id) {
+ this.salFacade = salFacade;
+ this.messageTransformer = messageTransformer;
+ this.id = id;
+ }
+
+ synchronized void handleNotification(final NetconfMessage notification) {
+ if(passNotifications) {
+ passNotification(messageTransformer.toNotification(notification));
+ } else {
+ cacheNotification(notification);
+ }
+ }
+
+ /**
+ * Forward all cached notifications and pass all notifications from this point directly to sal facade.
+ */
+ synchronized void onRemoteSchemaUp() {
+ passNotifications = true;
+
+ for (final NetconfMessage cachedNotification : cache) {
+ passNotification(messageTransformer.toNotification(cachedNotification));
+ }
+
+ cache.clear();
+ }
+
+ private void cacheNotification(final NetconfMessage notification) {
+ Preconditions.checkState(passNotifications == false);
+
+ logger.debug("{}: Caching notification {}, remote schema not yet fully built", id, notification);
+ if(logger.isTraceEnabled()) {
+ logger.trace("{}: Caching notification {}", id, XmlUtil.toString(notification.getDocument()));
+ }
+
+ cache.add(notification);
+ }
+
+ private void passNotification(final CompositeNode parsedNotification) {
+ logger.debug("{}: Forwarding notification {}", id, parsedNotification);
+ Preconditions.checkNotNull(parsedNotification);
+ salFacade.onNotification(parsedNotification);
+ }
+
}
}
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeIdentifierWithPredicates;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.ModifyAction;
import org.opendaylight.yangtools.yang.data.api.Node;
import org.opendaylight.yangtools.yang.data.api.SimpleNode;
import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
}
private void sendMerge(final InstanceIdentifier key, final CompositeNode value) throws InterruptedException, ExecutionException {
- sendEditRpc(createEditConfigStructure(key, Optional.<String>absent(), Optional.of(value)), Optional.<String>absent());
+ sendEditRpc(createEditConfigStructure(key, Optional.<ModifyAction>absent(), Optional.of(value)), Optional.<ModifyAction>absent());
}
private void sendDelete(final InstanceIdentifier toDelete) throws InterruptedException, ExecutionException {
- // FIXME use org.opendaylight.yangtools.yang.data.api.ModifyAction instead of strings
- // TODO add string lowercase value to ModifyAction enum entries
- sendEditRpc(createEditConfigStructure(toDelete, Optional.of("delete"), Optional.<CompositeNode>absent()), Optional.of("none"));
+ sendEditRpc(createEditConfigStructure(toDelete, Optional.of(ModifyAction.DELETE), Optional.<CompositeNode>absent()), Optional.of(ModifyAction.NONE));
}
- private void sendEditRpc(final CompositeNode editStructure, final Optional<String> defaultOperation) throws InterruptedException, ExecutionException {
+ private void sendEditRpc(final CompositeNode editStructure, final Optional<ModifyAction> defaultOperation) throws InterruptedException, ExecutionException {
final ImmutableCompositeNode editConfigRequest = createEditConfigRequest(editStructure, defaultOperation);
final RpcResult<CompositeNode> rpcResult = rpc.invokeRpc(NETCONF_EDIT_CONFIG_QNAME, editConfigRequest).get();
}
}
- private ImmutableCompositeNode createEditConfigRequest(final CompositeNode editStructure, final Optional<String> defaultOperation) {
+ private ImmutableCompositeNode createEditConfigRequest(final CompositeNode editStructure, final Optional<ModifyAction> defaultOperation) {
final CompositeNodeBuilder<ImmutableCompositeNode> ret = ImmutableCompositeNode.builder();
// Target
// Default operation
if(defaultOperation.isPresent()) {
- SimpleNode<String> defOp = NodeFactory.createImmutableSimpleNode(NETCONF_DEFAULT_OPERATION_QNAME, null, defaultOperation.get());
+ final SimpleNode<String> defOp = NodeFactory.createImmutableSimpleNode(NETCONF_DEFAULT_OPERATION_QNAME, null, modifyOperationToXmlString(defaultOperation.get()));
ret.add(defOp);
}
return ret.toInstance();
}
- private CompositeNode createEditConfigStructure(final InstanceIdentifier dataPath, final Optional<String> operation,
+ private CompositeNode createEditConfigStructure(final InstanceIdentifier dataPath, final Optional<ModifyAction> operation,
final Optional<CompositeNode> lastChildOverride) {
Preconditions.checkArgument(Iterables.isEmpty(dataPath.getPathArguments()) == false, "Instance identifier with empty path %s", dataPath);
return predicates;
}
- private CompositeNode getDeepestEditElement(final PathArgument arg, final Optional<String> operation, final Optional<CompositeNode> lastChildOverride) {
+ private CompositeNode getDeepestEditElement(final PathArgument arg, final Optional<ModifyAction> operation, final Optional<CompositeNode> lastChildOverride) {
final CompositeNodeBuilder<ImmutableCompositeNode> builder = ImmutableCompositeNode.builder();
builder.setQName(arg.getNodeType());
addPredicatesToCompositeNodeBuilder(predicates, builder);
if (operation.isPresent()) {
- builder.setAttribute(NETCONF_OPERATION_QNAME, operation.get());
+ builder.setAttribute(NETCONF_OPERATION_QNAME, modifyOperationToXmlString(operation.get()));
}
if (lastChildOverride.isPresent()) {
final List<Node<?>> children = lastChildOverride.get().getValue();
return builder.toInstance();
}
+ private String modifyOperationToXmlString(final ModifyAction operation) {
+ return operation.name().toLowerCase();
+ }
+
/**
* Send commit rpc to finish the transaction
* In case of failure or unexpected error response, ExecutionException is thrown
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.InputStream;
Mockito.verify(facade, Mockito.timeout(5000)).onDeviceDisconnected();
}
+ @Test
+ public void testNotificationBeforeSchema() throws Exception {
+ final RemoteDeviceHandler<NetconfSessionCapabilities> facade = getFacade();
+ final RemoteDeviceCommunicator<NetconfMessage> listener = getListener();
+
+ final MessageTransformer<NetconfMessage> messageTransformer = getMessageTransformer();
+ final NetconfDevice device = new NetconfDevice(getId(), facade, getExecutor(), messageTransformer, getSchemaContextProviderFactory(), getSourceProviderFactory());
+
+ device.onNotification(netconfMessage);
+ device.onNotification(netconfMessage);
+
+ verify(facade, times(0)).onNotification(any(CompositeNode.class));
+
+ final NetconfSessionCapabilities sessionCaps = getSessionCaps(true,
+ Lists.newArrayList(TEST_NAMESPACE + "?module=" + TEST_MODULE + "&revision=" + TEST_REVISION));
+
+ device.onRemoteSessionUp(sessionCaps, listener);
+
+ verify(messageTransformer, timeout(10000).times(2)).toNotification(netconfMessage);
+ verify(facade, times(2)).onNotification(compositeNode);
+
+ device.onNotification(netconfMessage);
+ verify(messageTransformer, times(3)).toNotification(netconfMessage);
+ verify(facade, times(3)).onNotification(compositeNode);
+ }
+
@Test
public void testNetconfDeviceReconnect() throws Exception {
final RemoteDeviceHandler<NetconfSessionCapabilities> facade = getFacade();
final RemoteDeviceHandler<NetconfSessionCapabilities> remoteDeviceHandler = mockCloseableClass(RemoteDeviceHandler.class);
doNothing().when(remoteDeviceHandler).onDeviceConnected(any(SchemaContextProvider.class), any(NetconfSessionCapabilities.class), any(RpcImplementation.class));
doNothing().when(remoteDeviceHandler).onDeviceDisconnected();
+ doNothing().when(remoteDeviceHandler).onNotification(any(CompositeNode.class));
return remoteDeviceHandler;
}
final MessageTransformer<NetconfMessage> messageTransformer = mockClass(MessageTransformer.class);
doReturn(netconfMessage).when(messageTransformer).toRpcRequest(any(QName.class), any(CompositeNode.class));
doReturn(rpcResultC).when(messageTransformer).toRpcResult(any(NetconfMessage.class), any(QName.class));
+ doReturn(compositeNode).when(messageTransformer).toNotification(any(NetconfMessage.class));
doNothing().when(messageTransformer).onGlobalContextUpdated(any(SchemaContext.class));
return messageTransformer;
}
doReturn(Futures.immediateFuture(rpcResult)).when(remoteDeviceCommunicator).sendRequest(any(NetconfMessage.class), any(QName.class));
return remoteDeviceCommunicator;
}
-}
\ No newline at end of file
+}
if (statsFlow == storedFlow) {
return true;
}
+ if (storedFlow == null && statsFlow != null) return false;
+ if (statsFlow == null && storedFlow != null) return false;
if (storedFlow.getClass() != statsFlow.getClass()) {
return false;
}
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
private void start() {
final NetconfMessage helloMessage = this.sessionPreferences.getHelloMessage();
- logger.debug("Session negotiation started with hello message {}", XmlUtil.toString(helloMessage.getDocument()));
+ logger.debug("Session negotiation started with hello message {} on channel {}", XmlUtil.toString(helloMessage.getDocument()), channel);
channel.pipeline().addLast(NAME_OF_EXCEPTION_HANDLER, new ExceptionHandlingInboundChannelHandler());
// Do not fail negotiation if promise is done or canceled
// It would result in setting result of the promise second time and that throws exception
if (isPromiseFinished() == false) {
- // FIXME BUG-1365 calling "negotiation failed" closes the channel, but the channel does not get closed if data is still being transferred
- // Loopback connection initiation might
negotiationFailed(new IllegalStateException("Session was not established after " + timeout));
+ changeState(State.FAILED);
+
+ channel.closeFuture().addListener(new GenericFutureListener<ChannelFuture>() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if(future.isSuccess()) {
+ logger.debug("Channel {} closed: success", future.channel());
+ } else {
+ logger.warn("Channel {} closed: fail", future.channel());
+ }
+ }
+ });
}
-
- changeState(State.FAILED);
} else if(channel.isOpen()) {
channel.pipeline().remove(NAME_OF_EXCEPTION_HANDLER);
}
protected abstract S getSession(L sessionListener, Channel channel, NetconfHelloMessage message) throws NetconfDocumentedException;
private synchronized void changeState(final State newState) {
- logger.debug("Changing state from : {} to : {}", state, newState);
- Preconditions.checkState(isStateChangePermitted(state, newState), "Cannot change state from %s to %s", state,
- newState);
+ logger.debug("Changing state from : {} to : {} for channel: {}", state, newState, channel);
+ Preconditions.checkState(isStateChangePermitted(state, newState), "Cannot change state from %s to %s for chanel %s", state,
+ newState, channel);
this.state = newState;
}
package org.opendaylight.controller.netconf.nettyutil.handler;
import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ByteToMessageDecoder;
-
-import java.util.List;
-
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import com.google.common.base.Charsets;
+public class NetconfEOMAggregator extends DelimiterBasedFrameDecoder {
-public class NetconfEOMAggregator extends ByteToMessageDecoder {
- private final static Logger logger = LoggerFactory.getLogger(NetconfEOMAggregator.class);
+ public static final ByteBuf DELIMITER = Unpooled.wrappedBuffer(NetconfMessageConstants.END_OF_MESSAGE);
- @Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
- int index = indexOfSequence(in, NetconfMessageConstants.END_OF_MESSAGE);
- if (index == -1) {
- logger.debug("Message is not complete, read again.");
- if (logger.isTraceEnabled()) {
- String str = in.toString(Charsets.UTF_8);
- logger.trace("Message read so far: {}", str);
- }
- ctx.read();
- } else {
- ByteBuf msg = in.readBytes(index);
- in.readBytes(NetconfMessageConstants.END_OF_MESSAGE.length);
- in.discardReadBytes();
- logger.debug("Message is complete.");
- out.add(msg);
- }
+ public NetconfEOMAggregator() {
+ super(Integer.MAX_VALUE, DELIMITER);
}
-
- private int indexOfSequence(ByteBuf in, byte[] sequence) {
- int index = -1;
- for (int i = 0; i < in.readableBytes() - sequence.length + 1; i++) {
- if (in.getByte(i) == sequence[0]) {
- index = i;
- for (int j = 1; j < sequence.length; j++) {
- if (in.getByte(i + j) != sequence[j]) {
- index = -1;
- break;
- }
- }
- if (index != -1) {
- return index;
- }
- }
- }
- return index;
- }
-
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
+import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
transformer.setOutputProperty(OutputKeys.INDENT, "yes");
transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
- StreamResult result = new StreamResult(new OutputStreamWriter(os));
+ // Wrap OutputStreamWriter with BufferedWriter as suggested in javadoc for OutputStreamWriter
+ StreamResult result = new StreamResult(new BufferedWriter(new OutputStreamWriter(os)));
DOMSource source = new DOMSource(msg.getDocument());
transformer.transform(source, result);
}