<dependency>
<groupId>orbit</groupId>
<artifactId>org.apache.catalina</artifactId>
- <version>7.0.53.v201406061610</version>
</dependency>
<dependency>
<groupId>orbit</groupId>
<artifactId>org.apache.catalina.ha</artifactId>
- <version>7.0.53.v201406070630</version>
</dependency>
<dependency>
<groupId>orbit</groupId>
<artifactId>org.apache.catalina.tribes</artifactId>
- <version>7.0.53.v201406070630</version>
</dependency>
<dependency>
<groupId>orbit</groupId>
<artifactId>org.apache.coyote</artifactId>
- <version>7.0.53.v201406070630</version>
</dependency>
<dependency>
<groupId>orbit</groupId>
<artifactId>org.apache.el</artifactId>
- <version>7.0.53.v201406060720</version>
</dependency>
<dependency>
<groupId>orbit</groupId>
<artifactId>org.apache.jasper</artifactId>
- <version>7.0.53.v201406070630</version>
</dependency>
<dependency>
<groupId>orbit</groupId>
<artifactId>org.apache.juli.extras</artifactId>
- <version>7.0.53.v201406060720</version>
</dependency>
<dependency>
<groupId>orbit</groupId>
<artifactId>org.apache.tomcat.api</artifactId>
- <version>7.0.53.v201406060720</version>
</dependency>
<dependency>
<groupId>orbit</groupId>
<artifactId>org.apache.tomcat.util</artifactId>
- <version>7.0.53.v201406070630</version>
</dependency>
<dependency>
<groupId>org.aopalliance</groupId>
<feature name="odl-base-tomcat" description="OpenDaylight Tomcat" version="7.0.53">
<feature>odl-base-gemini-web</feature>
<feature>odl-base-eclipselink-persistence</feature>
- <bundle start="true">mvn:orbit/org.apache.catalina/${commons.karaf.catalina}</bundle>
+ <bundle start="true">mvn:orbit/org.apache.catalina/${commons.catalina}</bundle>
<bundle start="true">mvn:geminiweb/org.eclipse.gemini.web.tomcat/${geminiweb.version}</bundle>
- <bundle start="true">mvn:orbit/org.apache.catalina.ha/${commons.karaf.catalina.ha}</bundle>
- <bundle start="true">mvn:orbit/org.apache.catalina.tribes/${commons.karaf.catalina.tribes}</bundle>
- <bundle start="true">mvn:orbit/org.apache.coyote/${commons.karaf.coyote}</bundle>
- <bundle start="true">mvn:orbit/org.apache.el/${commons.karaf.el}</bundle>
- <bundle start="true">mvn:orbit/org.apache.jasper/${commons.karaf.jasper}</bundle>
- <bundle start="true">mvn:orbit/org.apache.juli.extras/${commons.karaf.juli.version}</bundle>
- <bundle start="true">mvn:orbit/org.apache.tomcat.api/${commons.karaf.tomcat.api}</bundle>
- <bundle start="true">mvn:orbit/org.apache.tomcat.util/${commons.karaf.tomcat.util}</bundle>
+ <bundle start="true">mvn:orbit/org.apache.catalina.ha/${commons.catalina.ha}</bundle>
+ <bundle start="true">mvn:orbit/org.apache.catalina.tribes/${commons.catalina.tribes}</bundle>
+ <bundle start="true">mvn:orbit/org.apache.coyote/${commons.coyote}</bundle>
+ <bundle start="true">mvn:orbit/org.apache.el/${commons.el}</bundle>
+ <bundle start="true">mvn:orbit/org.apache.jasper/${commons.jasper}</bundle>
+ <bundle start="true">mvn:orbit/org.apache.juli.extras/${commons.juli.version}</bundle>
+ <bundle start="true">mvn:orbit/org.apache.tomcat.api/${commons.tomcat.api}</bundle>
+ <bundle start="true">mvn:orbit/org.apache.tomcat.util/${commons.tomcat.util}</bundle>
<bundle start="true" >mvn:org.opendaylight.controller/karaf-tomcat-security/${karaf.security.version}</bundle>
<bundle start="true">wrap:mvn:virgomirror/org.eclipse.jdt.core.compiler.batch/${eclipse.jdt.core.compiler.batch.version}</bundle>
</feature>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>sal-dom-xsql-config</artifactId>
+ <classifier>config</classifier>
+ <type>xml</type>
</dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<feature>war</feature>
<bundle>mvn:org.opendaylight.controller/sal-rest-connector/${project.version}</bundle>
<bundle>mvn:com.google.code.gson/gson/${gson.version}</bundle>
+ <bundle>mvn:org.opendaylight.yangtools/yang-data-codec-gson/${yangtools.version}</bundle>
<bundle>mvn:com.sun.jersey/jersey-core/${jersey.version}</bundle>
<bundle>mvn:com.sun.jersey/jersey-server/${jersey.version}</bundle>
<bundle>mvn:com.sun.jersey/jersey-servlet/${jersey.version}</bundle>
<groupId>org.opendaylight.controller</groupId>
<artifactId>netconf-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>netconf-auth</artifactId>
+ </dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>ietf-netconf-monitoring</artifactId>
<commmons.northbound.version>0.4.2-SNAPSHOT</commmons.northbound.version>
<!-- Third Party Versions -->
<codahale.metrics.version>3.0.1</codahale.metrics.version>
- <commons.catalina>7.0.32.v201211201336</commons.catalina>
- <commons.catalina.ha>7.0.32.v201211201952</commons.catalina.ha>
- <commons.catalina.tribes>7.0.32.v201211201952</commons.catalina.tribes>
- <commons.coyote>7.0.32.v201211201952</commons.coyote>
- <commons.el>7.0.32.v201211081135</commons.el>
- <commons.jasper>7.0.32.v201211201952</commons.jasper>
- <commons.juli.version>7.0.32.v201211081135</commons.juli.version>
- <commons.tomcat.api>7.0.32.v201211081135</commons.tomcat.api>
- <commons.tomcat.util>7.0.32.v201211201952</commons.tomcat.util>
- <commons.karaf.catalina>7.0.53.v201406061610</commons.karaf.catalina>
- <commons.karaf.catalina.ha>7.0.53.v201406070630</commons.karaf.catalina.ha>
- <commons.karaf.catalina.tribes>7.0.53.v201406070630</commons.karaf.catalina.tribes>
- <commons.karaf.coyote>7.0.53.v201406070630</commons.karaf.coyote>
- <commons.karaf.el>7.0.53.v201406060720</commons.karaf.el>
- <commons.karaf.jasper>7.0.53.v201406070630</commons.karaf.jasper>
- <commons.karaf.juli.version>7.0.53.v201406060720</commons.karaf.juli.version>
- <commons.karaf.tomcat.api>7.0.53.v201406060720</commons.karaf.tomcat.api>
- <commons.karaf.tomcat.util>7.0.53.v201406070630</commons.karaf.tomcat.util>
+ <commons.catalina>7.0.53.v201406061610</commons.catalina>
+ <commons.catalina.ha>7.0.53.v201406070630</commons.catalina.ha>
+ <commons.catalina.tribes>7.0.53.v201406070630</commons.catalina.tribes>
+ <commons.coyote>7.0.53.v201406070630</commons.coyote>
+ <commons.el>7.0.53.v201406060720</commons.el>
+ <commons.jasper>7.0.53.v201406070630</commons.jasper>
+ <commons.juli.version>7.0.53.v201406060720</commons.juli.version>
+ <commons.tomcat.api>7.0.53.v201406060720</commons.tomcat.api>
+ <commons.tomcat.util>7.0.53.v201406070630</commons.tomcat.util>
<commons.checkstyle.version>0.0.3-SNAPSHOT</commons.checkstyle.version>
<commons.fileupload.version>1.2.2</commons.fileupload.version>
<groupId>org.opendaylight.controller</groupId>
<artifactId>sal-dom-xsql-config</artifactId>
<version>${mdsal.version}</version>
+ <classifier>config</classifier>
+ <type>xml</type>
</dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
--- /dev/null
+package org.opendaylight.controller.config.manager.impl.osgi;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.controller.config.manager.impl.osgi.mapping.RefreshingSCPModuleInfoRegistry;
+import org.opendaylight.yangtools.concepts.ObjectRegistration;
+import org.opendaylight.yangtools.sal.binding.generator.api.ModuleInfoRegistry;
+import org.opendaylight.yangtools.yang.binding.YangModuleInfo;
+import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+
+import java.util.*;
+
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+public class RefreshingSCPModuleInfoRegistryTest {
+ @Test
+ public void testConstructor() throws Exception {
+ ModuleInfoRegistry reg = mock(ModuleInfoRegistry.class);
+ SchemaContextProvider prov = mock(SchemaContextProvider.class);
+ doReturn("string").when(prov).toString();
+
+ BundleContext ctxt = mock(BundleContext.class);
+ Dictionary dict = new Hashtable();
+ ServiceRegistration servReg = mock(ServiceRegistration.class);
+ doReturn(servReg).when(ctxt).registerService(Mockito.any(Class.class), Mockito.any(SchemaContextProvider.class), Mockito.any(Dictionary.class));
+ doReturn(servReg).when(ctxt).registerService(Mockito.anyString(), Mockito.any(Object.class), Mockito.any(Dictionary.class));
+ RefreshingSCPModuleInfoRegistry scpreg = new RefreshingSCPModuleInfoRegistry(reg, prov, ctxt);
+
+ YangModuleInfo modInfo = mock(YangModuleInfo.class);
+ doNothing().when(servReg).setProperties(null);
+ doNothing().when(servReg).unregister();
+ doReturn("").when(modInfo).toString();
+ ObjectRegistration<YangModuleInfo> ymi = mock(ObjectRegistration.class);
+ doReturn(ymi).when(reg).registerModuleInfo(modInfo);
+
+ scpreg.registerModuleInfo(modInfo);
+ scpreg.close();
+
+ Mockito.verify(servReg, Mockito.times(1)).setProperties(null);
+ Mockito.verify(servReg, Mockito.times(1)).unregister();
+ }
+}
<groupId>org.opendaylight.controller.thirdparty</groupId>
<artifactId>net.sf.jung2</artifactId>
</dependency>
- <dependency>
- <groupId>org.opendaylight.controller.thirdparty</groupId>
- <artifactId>org.apache.catalina.filters.CorsFilter</artifactId>
- </dependency>
<dependency>
<groupId>org.opendaylight.controller.thirdparty</groupId>
<artifactId>org.openflow.openflowj</artifactId>
-Xmx*) jvmMaxMemory="$1"; shift;;
-D*) extraJVMOpts="${extraJVMOpts} $1"; shift;;
-X*) extraJVMOpts="${extraJVMOpts} $1"; shift;;
+ -J*) extraJVMOpts="${extraJVMOpts} -$(echo "$1" | cut -d'J' -f2)"; shift;;
-agentpath:*) agentPath="$1"; shift;;
"") break ;;
*) echo "Unknown option $1"; unknown_option=1; break ;;
public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
bindingDataChangeListener.onDataChanged(new TranslatedDataChangeEvent(change, path));
}
+
+ @Override
+ public String toString() {
+ return bindingDataChangeListener.getClass().getName();
+ }
}
private class TranslatedDataChangeEvent implements AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> {
*/
package org.opendaylight.controller.md.sal.binding.impl;
-import java.util.Map;
-import java.util.WeakHashMap;
-
-import javax.annotation.concurrent.GuardedBy;
-
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
import org.opendaylight.yangtools.concepts.Delegator;
-import com.google.common.base.Preconditions;
-
class BindingTranslatedTransactionChain implements BindingTransactionChain, Delegator<DOMTransactionChain> {
private final DOMTransactionChain delegate;
-
- @GuardedBy("this")
- private final Map<AsyncTransaction<?, ?>, AsyncTransaction<?, ?>> delegateTxToBindingTx = new WeakHashMap<>();
private final BindingToNormalizedNodeCodec codec;
+ private final DelegateChainListener delegatingListener;
+ private final TransactionChainListener listener;
public BindingTranslatedTransactionChain(final DOMDataBroker chainFactory,
final BindingToNormalizedNodeCodec codec, final TransactionChainListener listener) {
Preconditions.checkNotNull(chainFactory, "DOM Transaction chain factory must not be null");
- this.delegate = chainFactory.createTransactionChain(new ListenerInvoker(listener));
+ this.delegatingListener = new DelegateChainListener();
+ this.listener = listener;
+ this.delegate = chainFactory.createTransactionChain(listener);
this.codec = codec;
}
public ReadOnlyTransaction newReadOnlyTransaction() {
DOMDataReadOnlyTransaction delegateTx = delegate.newReadOnlyTransaction();
ReadOnlyTransaction bindingTx = new BindingDataReadTransactionImpl(delegateTx, codec);
- putDelegateToBinding(delegateTx, bindingTx);
return bindingTx;
}
@Override
public ReadWriteTransaction newReadWriteTransaction() {
DOMDataReadWriteTransaction delegateTx = delegate.newReadWriteTransaction();
- ReadWriteTransaction bindingTx = new BindingDataReadWriteTransactionImpl(delegateTx, codec);
- putDelegateToBinding(delegateTx, bindingTx);
+ ReadWriteTransaction bindingTx = new BindingDataReadWriteTransactionImpl(delegateTx, codec) {
+
+ @Override
+ public CheckedFuture<Void, TransactionCommitFailedException> submit() {
+ return listenForFailure(this,super.submit());
+ }
+
+ };
return bindingTx;
}
@Override
public WriteTransaction newWriteOnlyTransaction() {
- DOMDataWriteTransaction delegateTx = delegate.newWriteOnlyTransaction();
- WriteTransaction bindingTx = new BindingDataWriteTransactionImpl<>(delegateTx, codec);
- putDelegateToBinding(delegateTx, bindingTx);
+ final DOMDataWriteTransaction delegateTx = delegate.newWriteOnlyTransaction();
+ WriteTransaction bindingTx = new BindingDataWriteTransactionImpl<DOMDataWriteTransaction>(delegateTx, codec) {
+
+ @Override
+ public CheckedFuture<Void,TransactionCommitFailedException> submit() {
+ return listenForFailure(this,super.submit());
+ };
+
+ };
return bindingTx;
}
- @Override
- public void close() {
- delegate.close();
+ protected CheckedFuture<Void, TransactionCommitFailedException> listenForFailure(
+ final WriteTransaction tx, CheckedFuture<Void, TransactionCommitFailedException> future) {
+ Futures.addCallback(future, new FutureCallback<Void>() {
+ @Override
+ public void onFailure(Throwable t) {
+ failTransactionChain(tx,t);
+ }
+
+ @Override
+ public void onSuccess(Void result) {
+ // Intentionally NOOP
+ }
+ });
+
+ return future;
}
- private synchronized void putDelegateToBinding(final AsyncTransaction<?, ?> domTx,
- final AsyncTransaction<?, ?> bindingTx) {
- final Object previous = delegateTxToBindingTx.put(domTx, bindingTx);
- Preconditions.checkState(previous == null, "DOM Transaction %s has already associated binding transation %s",domTx,previous);
+ protected void failTransactionChain(WriteTransaction tx, Throwable t) {
+ // We asume correct state change for underlaying transaction
+ // chain, so we are not changing any of our internal state
+ // to mark that we failed.
+ this.delegatingListener.onTransactionChainFailed(this, tx, t);
}
- private synchronized AsyncTransaction<?, ?> getBindingTransaction(final AsyncTransaction<?, ?> transaction) {
- return delegateTxToBindingTx.get(transaction);
+ @Override
+ public void close() {
+ delegate.close();
}
- private final class ListenerInvoker implements TransactionChainListener {
-
- private final TransactionChainListener listener;
-
- public ListenerInvoker(final TransactionChainListener listener) {
- this.listener = Preconditions.checkNotNull(listener, "Listener must not be null.");
- }
+ private final class DelegateChainListener implements TransactionChainListener {
@Override
public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
final AsyncTransaction<?, ?> transaction, final Throwable cause) {
- Preconditions.checkState(delegate.equals(chain),
- "Illegal state - listener for %s was invoked for incorrect chain %s.", delegate, chain);
- AsyncTransaction<?, ?> bindingTx = getBindingTransaction(transaction);
- listener.onTransactionChainFailed(chain, bindingTx, cause);
+ /*
+ * Intentionally NOOP, callback for failure, since we
+ * are also listening on each transaction for failure.
+ *
+ * by listening on submit future for Binding transaction
+ * in order to provide Binding transaction (which was seen by client
+ * of this transaction chain, instead of
+ */
}
@Override
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.common.util.jmx;
+
+import java.lang.management.ManagementFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanRegistrationException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * Abstract base for an MXBean implementation class.
+ * <p>
+ * This class is not intended for use outside of MD-SAL and its part of private
+ * implementation (still exported as public to be reused across MD-SAL implementation
+ * components) and may be removed in subsequent
+ * releases.
+ *
+ * @author Thomas Pantelis
+ */
+@Beta
+public abstract class AbstractMXBean {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractMXBean.class);
+
+ public static String BASE_JMX_PREFIX = "org.opendaylight.controller:";
+
+ private final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+
+ private final String mBeanName;
+ private final String mBeanType;
+ private final String mBeanCategory;
+
+ /**
+ * Constructor.
+ *
+ * @param mBeanName Used as the <code>name</code> property in the bean's ObjectName.
+ * @param mBeanType Used as the <code>type</code> property in the bean's ObjectName.
+ * @param mBeanCategory Used as the <code>Category</code> property in the bean's ObjectName.
+ */
+ protected AbstractMXBean(@Nonnull String mBeanName, @Nonnull String mBeanType,
+ @Nullable String mBeanCategory) {
+ this.mBeanName = mBeanName;
+ this.mBeanType = mBeanType;
+ this.mBeanCategory = mBeanCategory;
+ }
+
+ private ObjectName getMBeanObjectName() throws MalformedObjectNameException {
+ StringBuilder builder = new StringBuilder(BASE_JMX_PREFIX)
+ .append("type=").append(getMBeanType());
+
+ if(getMBeanCategory() != null) {
+ builder.append(",Category=").append(getMBeanCategory());
+ }
+
+ builder.append(",name=").append(getMBeanName());
+ return new ObjectName(builder.toString());
+ }
+
+ /**
+ * Registers this bean with the platform MBean server with the domain defined by
+ * {@link #BASE_JMX_PREFIX}.
+ *
+ * @return true is successfully registered, false otherwise.
+ */
+ public boolean registerMBean() {
+ boolean registered = false;
+ try {
+ // Object to identify MBean
+ final ObjectName mbeanName = this.getMBeanObjectName();
+
+ LOG.debug("Register MBean {}", mbeanName);
+
+ // unregistered if already registered
+ if(server.isRegistered(mbeanName)) {
+
+ LOG.debug("MBean {} found to be already registered", mbeanName);
+
+ try {
+ unregisterMBean(mbeanName);
+ } catch(Exception e) {
+
+ LOG.warn("unregister mbean {} resulted in exception {} ", mbeanName, e);
+ }
+ }
+ server.registerMBean(this, mbeanName);
+ registered = true;
+
+ LOG.debug("MBean {} registered successfully", mbeanName.getCanonicalName());
+ } catch(Exception e) {
+
+ LOG.error("registration failed {}", e);
+
+ }
+ return registered;
+ }
+
+ /**
+ * Unregisters this bean with the platform MBean server.
+ *
+ * @return true is successfully unregistered, false otherwise.
+ */
+ public boolean unregisterMBean() {
+ boolean unregister = false;
+ try {
+ ObjectName mbeanName = this.getMBeanObjectName();
+ unregisterMBean(mbeanName);
+ unregister = true;
+ } catch(Exception e) {
+
+ LOG.error("Failed when unregistering MBean {}", e);
+ }
+
+ return unregister;
+ }
+
+ private void unregisterMBean(ObjectName mbeanName) throws MBeanRegistrationException,
+ InstanceNotFoundException {
+ server.unregisterMBean(mbeanName);
+ }
+
+ /**
+ * Returns the <code>name</code> property of the bean's ObjectName.
+ */
+ public String getMBeanName() {
+ return mBeanName;
+ }
+
+ /**
+ * Returns the <code>type</code> property of the bean's ObjectName.
+ */
+ public String getMBeanType() {
+ return mBeanType;
+ }
+
+ /**
+ * Returns the <code>Category</code> property of the bean's ObjectName.
+ */
+ public String getMBeanCategory() {
+ return mBeanCategory;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.common.util.jmx;
+
+import java.util.List;
+
+import org.opendaylight.yangtools.util.concurrent.ListenerNotificationQueueStats;
+
+/**
+ * MXBean interface for {@link QueuedNotificationManager} statistic metrics.
+ *
+ * @author Thomas Pantelis
+ */
+public interface QueuedNotificationManagerMXBean {
+
+ /**
+ * Returns a list of stat instances for each current listener notification task in progress.
+ */
+ List<ListenerNotificationQueueStats> getCurrentListenerQueueStats();
+
+ /**
+ * Returns the configured maximum listener queue size.
+ */
+ int getMaxListenerQueueSize();
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.common.util.jmx;
+
+import java.util.List;
+
+import org.opendaylight.yangtools.util.concurrent.ListenerNotificationQueueStats;
+import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Implementation of the QueuedNotificationManagerMXBean interface.
+ *
+ * <p>
+ * This class is not intended for use outside of MD-SAL and its part of private
+ * implementation (still exported as public to be reused across MD-SAL implementation
+ * components) and may be removed in subsequent
+ * releases.
+ *
+ * @author Thomas Pantelis
+ */
+public class QueuedNotificationManagerMXBeanImpl extends AbstractMXBean
+ implements QueuedNotificationManagerMXBean {
+
+ private final QueuedNotificationManager<?,?> manager;
+
+ public QueuedNotificationManagerMXBeanImpl( QueuedNotificationManager<?,?> manager,
+ String mBeanName, String mBeanType, String mBeanCategory ) {
+ super(mBeanName, mBeanType, mBeanCategory);
+ this.manager = Preconditions.checkNotNull( manager );
+ }
+
+ @Override
+ public List<ListenerNotificationQueueStats> getCurrentListenerQueueStats() {
+ return manager.getListenerNotificationQueueStats();
+ }
+
+ @Override
+ public int getMaxListenerQueueSize() {
+ return manager.getMaxQueueCapacity();
+ }
+
+ public QueuedNotificationManagerStats toQueuedNotificationManagerStats() {
+ return new QueuedNotificationManagerStats( getMaxListenerQueueSize(),
+ getCurrentListenerQueueStats() );
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.common.util.jmx;
+
+import java.beans.ConstructorProperties;
+import java.util.List;
+
+import org.opendaylight.yangtools.util.concurrent.ListenerNotificationQueueStats;
+
+/**
+ * A bean class that holds various QueuedNotificationManager statistic metrics. This class is
+ * suitable for mapping to the MXBean CompositeDataSupport type.
+ *
+ * <p>
+ * This class is not intended for use outside of MD-SAL and its part of private
+ * implementation (still exported as public to be reused across MD-SAL implementation
+ * components) and may be removed in subsequent
+ * releases.
+ * @author Thomas Pantelis
+ * @see QueuedNotificationManagerMXBeanImpl
+ */
+public class QueuedNotificationManagerStats {
+
+ private final int maxListenerQueueSize;
+ private final List<ListenerNotificationQueueStats> currentListenerQueueStats;
+
+ @ConstructorProperties({"maxListenerQueueSize","currentListenerQueueStats"})
+ public QueuedNotificationManagerStats( int maxListenerQueueSize,
+ List<ListenerNotificationQueueStats> currentListenerQueueStats ) {
+ super();
+ this.maxListenerQueueSize = maxListenerQueueSize;
+ this.currentListenerQueueStats = currentListenerQueueStats;
+ }
+
+ public List<ListenerNotificationQueueStats> getCurrentListenerQueueStats() {
+ return currentListenerQueueStats;
+ }
+
+ public int getMaxListenerQueueSize() {
+ return maxListenerQueueSize;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.common.util.jmx;
+
+import java.beans.ConstructorProperties;
+
+/**
+ * A bean class that holds various thread executor statistic metrics. This class is suitable for
+ * mapping to the MXBean CompositeDataSupport type;
+ *
+ * @author Thomas Pantelis
+ * @see ThreadExecutorStatsMXBeanImpl
+ */
+public class ThreadExecutorStats {
+
+ private final long activeThreadCount;
+ private final long completedTaskCount;
+ private final long currentQueueSize;
+ private final long maxThreadPoolSize;
+ private final long totalTaskCount;
+ private final long largestThreadPoolSize;
+ private final long maxQueueSize;
+ private final long currentThreadPoolSize;
+
+ // The following fields are defined as Long because they may be null if we can't a value
+ // from the underlying executor.
+ private final Long largestQueueSize;
+ private final Long rejectedTaskCount;
+
+ @ConstructorProperties({"activeThreadCount","currentThreadPoolSize","largestThreadPoolSize",
+ "maxThreadPoolSize","currentQueueSize","largestQueueSize","maxQueueSize",
+ "completedTaskCount","totalTaskCount","rejectedTaskCount"})
+ public ThreadExecutorStats(long activeThreadCount, long currentThreadPoolSize,
+ long largestThreadPoolSize, long maxThreadPoolSize, long currentQueueSize,
+ Long largestQueueSize, long maxQueueSize, long completedTaskCount,
+ long totalTaskCount, Long rejectedTaskCount) {
+ this.activeThreadCount = activeThreadCount;
+ this.currentThreadPoolSize = currentThreadPoolSize;
+ this.largestQueueSize = largestQueueSize;
+ this.largestThreadPoolSize = largestThreadPoolSize;
+ this.maxThreadPoolSize = maxThreadPoolSize;
+ this.currentQueueSize = currentQueueSize;
+ this.maxQueueSize = maxQueueSize;
+ this.completedTaskCount = completedTaskCount;
+ this.totalTaskCount = totalTaskCount;
+ this.rejectedTaskCount = rejectedTaskCount;
+ }
+
+ public long getActiveThreadCount() {
+ return activeThreadCount;
+ }
+
+ public long getCompletedTaskCount() {
+ return completedTaskCount;
+ }
+
+ public Long getRejectedTaskCount() {
+ return rejectedTaskCount;
+ }
+
+ public long getCurrentQueueSize() {
+ return currentQueueSize;
+ }
+
+ public Long getLargestQueueSize() {
+ return largestQueueSize;
+ }
+
+ public long getMaxThreadPoolSize() {
+ return maxThreadPoolSize;
+ }
+
+ public long getTotalTaskCount() {
+ return totalTaskCount;
+ }
+
+ public long getLargestThreadPoolSize() {
+ return largestThreadPoolSize;
+ }
+
+ public long getMaxQueueSize() {
+ return maxQueueSize;
+ }
+
+ public long getCurrentThreadPoolSize() {
+ return currentThreadPoolSize;
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.common.util.jmx;
+
+/**
+ * MXBean interface for thread executor statistic metrics.
+ *
+ * @author Thomas Pantelis
+ */
+public interface ThreadExecutorStatsMXBean {
+
+ /**
+ * Returns the current thread pool size.
+ */
+ long getCurrentThreadPoolSize();
+
+ /**
+ * Returns the largest thread pool size.
+ */
+ long getLargestThreadPoolSize();
+
+ /**
+ * Returns the maximum thread pool size.
+ */
+ long getMaxThreadPoolSize();
+
+ /**
+ * Returns the current queue size.
+ */
+ long getCurrentQueueSize();
+
+ /**
+ * Returns the largest queue size, if available.
+ */
+ Long getLargestQueueSize();
+
+ /**
+ * Returns the maximum queue size.
+ */
+ long getMaxQueueSize();
+
+ /**
+ * Returns the active thread count.
+ */
+ long getActiveThreadCount();
+
+ /**
+ * Returns the completed task count.
+ */
+ long getCompletedTaskCount();
+
+ /**
+ * Returns the total task count.
+ */
+ long getTotalTaskCount();
+
+ /**
+ * Returns the rejected task count, if available.
+ */
+ Long getRejectedTaskCount();
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.common.util.jmx;
+
+import com.google.common.base.Preconditions;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import javax.annotation.Nullable;
+import org.opendaylight.yangtools.util.concurrent.CountingRejectedExecutionHandler;
+import org.opendaylight.yangtools.util.concurrent.TrackingLinkedBlockingQueue;
+
+/**
+ * MXBean implementation of the ThreadExecutorStatsMXBean interface that retrieves statistics
+ * from a backing {@link java.util.concurrent.ExecutorService}.
+ *
+ * @author Thomas Pantelis
+ */
+public class ThreadExecutorStatsMXBeanImpl extends AbstractMXBean
+ implements ThreadExecutorStatsMXBean {
+
+ private final ThreadPoolExecutor executor;
+
+ /**
+ * Constructs an instance for the given {@link Executor}.
+ *
+ * @param executor the backing {@link Executor}
+ * @param mBeanName Used as the <code>name</code> property in the bean's ObjectName.
+ * @param mBeanType Used as the <code>type</code> property in the bean's ObjectName.
+ * @param mBeanCategory Used as the <code>Category</code> property in the bean's ObjectName.
+ */
+ public ThreadExecutorStatsMXBeanImpl(Executor executor, String mBeanName,
+ String mBeanType, @Nullable String mBeanCategory) {
+ super(mBeanName, mBeanType, mBeanCategory);
+
+ Preconditions.checkArgument(executor instanceof ThreadPoolExecutor,
+ "The ExecutorService of type {} is not an instanceof ThreadPoolExecutor",
+ executor.getClass());
+ this.executor = (ThreadPoolExecutor)executor;
+ }
+
+ @Override
+ public long getCurrentThreadPoolSize() {
+ return executor.getPoolSize();
+ }
+
+ @Override
+ public long getLargestThreadPoolSize() {
+ return executor.getLargestPoolSize();
+ }
+
+ @Override
+ public long getMaxThreadPoolSize() {
+ return executor.getMaximumPoolSize();
+ }
+
+ @Override
+ public long getCurrentQueueSize() {
+ return executor.getQueue().size();
+ }
+
+ @Override
+ public Long getLargestQueueSize() {
+ BlockingQueue<Runnable> queue = executor.getQueue();
+ if(queue instanceof TrackingLinkedBlockingQueue) {
+ return Long.valueOf(((TrackingLinkedBlockingQueue<?>)queue).getLargestQueueSize());
+ }
+
+ return null;
+ }
+
+ @Override
+ public long getMaxQueueSize() {
+ long queueSize = executor.getQueue().size();
+ return executor.getQueue().remainingCapacity() + queueSize;
+ }
+
+ @Override
+ public long getActiveThreadCount() {
+ return executor.getActiveCount();
+ }
+
+ @Override
+ public long getCompletedTaskCount() {
+ return executor.getCompletedTaskCount();
+ }
+
+ @Override
+ public long getTotalTaskCount() {
+ return executor.getTaskCount();
+ }
+
+ @Override
+ public Long getRejectedTaskCount() {
+ RejectedExecutionHandler rejectedHandler = executor.getRejectedExecutionHandler();
+ if(rejectedHandler instanceof CountingRejectedExecutionHandler) {
+ return Long.valueOf(((CountingRejectedExecutionHandler)rejectedHandler)
+ .getRejectedTaskCount());
+ }
+
+ return null;
+ }
+
+ /**
+ * Returns a {@link ThreadExecutorStats} instance containing a snapshot of the statistic
+ * metrics.
+ */
+ public ThreadExecutorStats toThreadExecutorStats() {
+ return new ThreadExecutorStats(getActiveThreadCount(), getCurrentThreadPoolSize(),
+ getLargestThreadPoolSize(), getMaxThreadPoolSize(), getCurrentQueueSize(),
+ getLargestQueueSize(), getMaxQueueSize(), getCompletedTaskCount(),
+ getTotalTaskCount(), getRejectedTaskCount());
+ }
+}
<Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
<Export-package></Export-package>
<Private-Package></Private-Package>
- <Import-Package>!*snappy;!org.jboss.*;!com.jcraft.*;*</Import-Package>
+ <Import-Package>!*snappy;!org.jboss.*;!com.jcraft.*;!*jetty*;!sun.security.*;*</Import-Package>
<Embed-Dependency>
sal-clustering-commons;
sal-akka-raft;
actorSystem, actorSystem.actorOf(
ShardManager.props(type, cluster, configuration, datastoreContext).
withMailbox(ActorContext.MAILBOX), shardManagerId ), cluster, configuration);
+
+ actorContext.setOperationTimeout(dataStoreProperties.getOperationTimeoutInSeconds());
}
public DistributedDataStore(ActorContext actorContext) {
String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
Object result = actorContext.executeLocalShardOperation(shardName,
- new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
- ActorContext.ASK_DURATION);
+ new RegisterChangeListener(path, dataChangeListenerActor.path(), scope));
if (result != null) {
RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
private final int maxShardDataChangeExecutorQueueSize;
private final int maxShardDataChangeExecutorPoolSize;
private final int shardTransactionIdleTimeoutInMinutes;
+ private final int operationTimeoutInSeconds;
public DistributedDataStoreProperties() {
maxShardDataChangeListenerQueueSize = 1000;
maxShardDataChangeExecutorQueueSize = 1000;
maxShardDataChangeExecutorPoolSize = 20;
shardTransactionIdleTimeoutInMinutes = 10;
+ operationTimeoutInSeconds = 5;
}
public DistributedDataStoreProperties(int maxShardDataChangeListenerQueueSize,
int maxShardDataChangeExecutorQueueSize, int maxShardDataChangeExecutorPoolSize,
- int shardTransactionIdleTimeoutInMinutes) {
+ int shardTransactionIdleTimeoutInMinutes, int operationTimeoutInSeconds) {
this.maxShardDataChangeListenerQueueSize = maxShardDataChangeListenerQueueSize;
this.maxShardDataChangeExecutorQueueSize = maxShardDataChangeExecutorQueueSize;
this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
this.shardTransactionIdleTimeoutInMinutes = shardTransactionIdleTimeoutInMinutes;
+ this.operationTimeoutInSeconds = operationTimeoutInSeconds;
}
public int getMaxShardDataChangeListenerQueueSize() {
public int getShardTransactionIdleTimeoutInMinutes() {
return shardTransactionIdleTimeoutInMinutes;
}
+
+ public int getOperationTimeoutInSeconds() {
+ return operationTimeoutInSeconds;
+ }
}
ActorSelection cohort = actorContext.actorSelection(actorPath);
- futureList.add(actorContext.executeRemoteOperationAsync(cohort, message,
- ActorContext.ASK_DURATION));
+ futureList.add(actorContext.executeRemoteOperationAsync(cohort, message));
}
return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher());
try {
Object response = actorContext.executeShardOperation(shardName,
- new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable(),
- ActorContext.ASK_DURATION);
+ new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable());
if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
CreateTransactionReply reply =
CreateTransactionReply.fromSerializable(response);
// Send the ReadyTransaction message to the Tx actor.
final Future<Object> replyFuture = actorContext.executeRemoteOperationAsync(getActor(),
- new ReadyTransaction().toSerializable(), ActorContext.ASK_DURATION);
+ new ReadyTransaction().toSerializable());
// Combine all the previously recorded put/merge/delete operation reply Futures and the
// ReadyTransactionReply Future into one Future. If any one fails then the combined
public void deleteData(YangInstanceIdentifier path) {
LOG.debug("Tx {} deleteData called path = {}", identifier, path);
recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
- new DeleteData(path).toSerializable(), ActorContext.ASK_DURATION ));
+ new DeleteData(path).toSerializable() ));
}
@Override
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} mergeData called path = {}", identifier, path);
recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
- new MergeData(path, data, schemaContext).toSerializable(),
- ActorContext.ASK_DURATION));
+ new MergeData(path, data, schemaContext).toSerializable()));
}
@Override
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} writeData called path = {}", identifier, path);
recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
- new WriteData(path, data, schemaContext).toSerializable(),
- ActorContext.ASK_DURATION));
+ new WriteData(path, data, schemaContext).toSerializable()));
}
@Override
};
Future<Object> readFuture = actorContext.executeRemoteOperationAsync(getActor(),
- new ReadData(path).toSerializable(), ActorContext.ASK_DURATION);
+ new ReadData(path).toSerializable());
readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
}
};
Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
- new DataExists(path).toSerializable(), ActorContext.ASK_DURATION);
+ new DataExists(path).toSerializable());
future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
}
}
private static final Logger
LOG = LoggerFactory.getLogger(ActorContext.class);
- public static final FiniteDuration ASK_DURATION =
- Duration.create(5, TimeUnit.SECONDS);
- public static final Duration AWAIT_DURATION =
- Duration.create(5, TimeUnit.SECONDS);
+ private static final FiniteDuration DEFAULT_OPER_DURATION = Duration.create(5, TimeUnit.SECONDS);
public static final String MAILBOX = "bounded-mailbox";
private final ClusterWrapper clusterWrapper;
private final Configuration configuration;
private volatile SchemaContext schemaContext;
+ private FiniteDuration operationDuration = DEFAULT_OPER_DURATION;
+ private Timeout operationTimeout = new Timeout(operationDuration);
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
ClusterWrapper clusterWrapper,
}
}
+ public void setOperationTimeout(int timeoutInSeconds) {
+ operationDuration = Duration.create(timeoutInSeconds, TimeUnit.SECONDS);
+ operationTimeout = new Timeout(operationDuration);
+ }
+
public SchemaContext getSchemaContext() {
return schemaContext;
}
*/
public ActorRef findLocalShard(String shardName) {
Object result = executeLocalOperation(shardManager,
- new FindLocalShard(shardName), ASK_DURATION);
+ new FindLocalShard(shardName));
if (result instanceof LocalShardFound) {
LocalShardFound found = (LocalShardFound) result;
public String findPrimaryPath(String shardName) {
Object result = executeLocalOperation(shardManager,
- new FindPrimary(shardName).toSerializable(), ASK_DURATION);
+ new FindPrimary(shardName).toSerializable());
if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
PrimaryFound found = PrimaryFound.fromSerializable(result);
*
* @param actor
* @param message
- * @param duration
* @return The response of the operation
*/
- public Object executeLocalOperation(ActorRef actor, Object message,
- FiniteDuration duration) {
- Future<Object> future =
- ask(actor, message, new Timeout(duration));
+ public Object executeLocalOperation(ActorRef actor, Object message) {
+ Future<Object> future = ask(actor, message, operationTimeout);
try {
- return Await.result(future, AWAIT_DURATION);
+ return Await.result(future, operationDuration);
} catch (Exception e) {
throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
}
*
* @param actor
* @param message
- * @param duration
* @return
*/
- public Object executeRemoteOperation(ActorSelection actor, Object message,
- FiniteDuration duration) {
+ public Object executeRemoteOperation(ActorSelection actor, Object message) {
LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
- Future<Object> future =
- ask(actor, message, new Timeout(duration));
+ Future<Object> future = ask(actor, message, operationTimeout);
try {
- return Await.result(future, AWAIT_DURATION);
+ return Await.result(future, operationDuration);
} catch (Exception e) {
- throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
+ throw new TimeoutException("Sending message " + message.getClass().toString() +
+ " to actor " + actor.toString() + " failed" , e);
}
}
*
* @param actor the ActorSelection
* @param message the message to send
- * @param duration the maximum amount of time to send he message
* @return a Future containing the eventual result
*/
- public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message,
- FiniteDuration duration) {
+ public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message) {
LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
- return ask(actor, message, new Timeout(duration));
+ return ask(actor, message, operationTimeout);
}
/**
*
* @param shardName
* @param message
- * @param duration
* @return
* @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException if the message to the remote shard times out
* @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
*/
- public Object executeShardOperation(String shardName, Object message,
- FiniteDuration duration) {
+ public Object executeShardOperation(String shardName, Object message) {
ActorSelection primary = findPrimary(shardName);
- return executeRemoteOperation(primary, message, duration);
+ return executeRemoteOperation(primary, message);
}
/**
*
* @param shardName the name of the shard on which the operation needs to be executed
* @param message the message that needs to be sent to the shard
- * @param duration the time duration in which this operation should complete
* @return the message that was returned by the local actor on which the
* the operation was executed. If a local shard was not found then
* null is returned
* @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException
* if the operation does not complete in a specified time duration
*/
- public Object executeLocalShardOperation(String shardName, Object message,
- FiniteDuration duration) {
+ public Object executeLocalShardOperation(String shardName, Object message) {
ActorRef local = findLocalShard(shardName);
if(local != null) {
- return executeLocalOperation(local, message, duration);
+ return executeLocalOperation(local, message);
}
return null;
}
return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
- new DistributedDataStoreProperties(props.getMaxShardDataChangeExecutorPoolSize(),
- props.getMaxShardDataChangeExecutorQueueSize(),
- props.getMaxShardDataChangeListenerQueueSize(),
- props.getShardTransactionIdleTimeoutInMinutes()));
+ new DistributedDataStoreProperties(
+ props.getMaxShardDataChangeExecutorPoolSize().getValue(),
+ props.getMaxShardDataChangeExecutorQueueSize().getValue(),
+ props.getMaxShardDataChangeListenerQueueSize().getValue(),
+ props.getShardTransactionIdleTimeoutInMinutes().getValue(),
+ props.getOperationTimeoutInSeconds().getValue()));
}
}
return DistributedDataStoreFactory.createInstance("operational",
getOperationalSchemaServiceDependency(),
- new DistributedDataStoreProperties(props.getMaxShardDataChangeExecutorPoolSize(),
- props.getMaxShardDataChangeExecutorQueueSize(),
- props.getMaxShardDataChangeListenerQueueSize(),
- props.getShardTransactionIdleTimeoutInMinutes()));
+ new DistributedDataStoreProperties(
+ props.getMaxShardDataChangeExecutorPoolSize().getValue(),
+ props.getMaxShardDataChangeExecutorQueueSize().getValue(),
+ props.getMaxShardDataChangeListenerQueueSize().getValue(),
+ props.getShardTransactionIdleTimeoutInMinutes().getValue(),
+ props.getOperationTimeoutInSeconds().getValue()));
}
}
config:java-name-prefix DistributedOperationalDataStoreProvider;
}
+ typedef non-zero-uint16-type {
+ type uint16 {
+ range "1..max";
+ }
+ }
+
+ typedef operation-timeout-type {
+ type uint16 {
+ range "5..max";
+ }
+ }
+
grouping data-store-properties {
leaf max-shard-data-change-executor-queue-size {
default 1000;
- type uint16;
+ type non-zero-uint16-type;
description "The maximum queue size for each shard's data store data change notification executor.";
}
leaf max-shard-data-change-executor-pool-size {
default 20;
- type uint16;
+ type non-zero-uint16-type;
description "The maximum thread pool size for each shard's data store data change notification executor.";
}
leaf max-shard-data-change-listener-queue-size {
default 1000;
- type uint16;
+ type non-zero-uint16-type;
description "The maximum queue size for each shard's data store data change listeners.";
}
leaf shard-transaction-idle-timeout-in-minutes {
default 10;
- type uint16;
+ type non-zero-uint16-type;
description "The maximum amount of time a shard transaction can be idle without receiving any messages before it self-destructs.";
}
+
+ leaf operation-timeout-in-seconds {
+ default 5;
+ type operation-timeout-type;
+ description "The maximum amount of time for akka operations (remote or local) to complete before failing.";
+ }
}
// Augments the 'configuration' choice node under modules/module.
ActorContext
testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockClusterWrapper(), new MockConfiguration());
Object messages = testContext
- .executeLocalOperation(actorRef, "messages",
- ActorContext.ASK_DURATION);
+ .executeLocalOperation(actorRef, "messages");
Assert.assertNotNull(messages);
ActorContext
testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)),new MockClusterWrapper(), new MockConfiguration());
Object messages = testContext
- .executeLocalOperation(actorRef, "messages",
- ActorContext.ASK_DURATION);
+ .executeLocalOperation(actorRef, "messages");
Assert.assertNotNull(messages);
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.TestActorRef;
+import akka.util.Timeout;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningExecutorService;
).build();
+ Timeout askTimeout = new Timeout(ASK_RESULT_DURATION);
+
//This is done so that Modification list is updated which is used during commit
- Future future =
- akka.pattern.Patterns.ask(shardTransaction, writeData, 3000);
+ Future<Object> future = akka.pattern.Patterns.ask(shardTransaction, writeData, askTimeout);
//ready transaction creates the cohort so that we get into the
//block where in commmit is done
ShardTransactionMessages.ReadyTransaction readyTransaction =
ShardTransactionMessages.ReadyTransaction.newBuilder().build();
- future =
- akka.pattern.Patterns.ask(shardTransaction, readyTransaction, 3000);
+ future = akka.pattern.Patterns.ask(shardTransaction, readyTransaction, askTimeout);
//but when the message is sent it will have the MockCommit object
//so that we can simulate throwing of exception
when(mockModification.toSerializable()).thenReturn(
PersistentMessages.CompositeModification.newBuilder().build());
- future =
- akka.pattern.Patterns.ask(subject,
- mockForwardCommitTransaction
- , 3000);
+ future = akka.pattern.Patterns.ask(subject, mockForwardCommitTransaction, askTimeout);
Await.result(future, ASK_RESULT_DURATION);
}
import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
import java.util.List;
import java.util.concurrent.ExecutionException;
}
stubber.when(actorContext).executeRemoteOperationAsync(any(ActorSelection.class),
- isA(requestType), any(FiniteDuration.class));
+ isA(requestType));
}
private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
verify(actorContext, times(nCohorts)).executeRemoteOperationAsync(
- any(ActorSelection.class), isA(requestType), any(FiniteDuration.class));
+ any(ActorSelection.class), isA(requestType));
}
private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
-
import java.util.List;
import java.util.concurrent.TimeUnit;
return getSystem().actorSelection(actorRef.path());
}
- private FiniteDuration anyDuration() {
- return any(FiniteDuration.class);
- }
-
private CreateTransactionReply createTransactionReply(ActorRef actorRef){
return CreateTransactionReply.newBuilder()
.setTransactionActorPath(actorRef.path().toString())
when(mockActorContext).actorSelection(actorRef.path().toString());
doReturn(createTransactionReply(actorRef)).when(mockActorContext).
executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD),
- eqCreateTransaction(memberName, type), anyDuration());
+ eqCreateTransaction(memberName, type));
doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(
anyString(), eq(actorRef.path().toString()));
doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString());
READ_ONLY);
doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+ eq(actorSelection(actorRef)), eqReadData());
Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+ eq(actorSelection(actorRef)), eqReadData());
readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
setupActorContextWithInitialCreateTransaction(READ_ONLY);
doReturn(Futures.successful(new Object())).when(mockActorContext).
- executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
+ executeRemoteOperationAsync(any(ActorSelection.class), any());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
setupActorContextWithInitialCreateTransaction(READ_ONLY);
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
+ executeRemoteOperationAsync(any(ActorSelection.class), any());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
throws Throwable {
doThrow(exToThrow).when(mockActorContext).executeShardOperation(
- anyString(), any(), anyDuration());
+ anyString(), any());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(),
- anyDuration());
+ executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+ eq(actorSelection(actorRef)), eqReadData());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_WRITE);
propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
} finally {
verify(mockActorContext, times(0)).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+ eq(actorSelection(actorRef)), eqReadData());
}
}
NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(expectedNode), anyDuration());
+ eq(actorSelection(actorRef)), eqWriteData(expectedNode));
doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+ eq(actorSelection(actorRef)), eqReadData());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_WRITE);
READ_ONLY);
doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+ eq(actorSelection(actorRef)), eqDataExists());
Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
assertEquals("Exists response", false, exists);
doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+ eq(actorSelection(actorRef)), eqDataExists());
exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
setupActorContextWithInitialCreateTransaction(READ_ONLY);
doReturn(Futures.successful(new Object())).when(mockActorContext).
- executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
+ executeRemoteOperationAsync(any(ActorSelection.class), any());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
setupActorContextWithInitialCreateTransaction(READ_ONLY);
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
+ executeRemoteOperationAsync(any(ActorSelection.class), any());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(),
- anyDuration());
+ executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+ eq(actorSelection(actorRef)), eqDataExists());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_WRITE);
propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
} finally {
verify(mockActorContext, times(0)).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+ eq(actorSelection(actorRef)), eqDataExists());
}
}
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+ eq(actorSelection(actorRef)), eqDataExists());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_WRITE);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
verify(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
WriteDataReply.SERIALIZABLE_CLASS);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+ eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY);
transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
verify(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+ eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
MergeDataReply.SERIALIZABLE_CLASS);
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
doReturn(deleteDataReply()).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqDeleteData(), anyDuration());
+ eq(actorSelection(actorRef)), eqDeleteData());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY);
transactionProxy.delete(TestModel.TEST_PATH);
verify(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqDeleteData(), anyDuration());
+ eq(actorSelection(actorRef)), eqDeleteData());
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
DeleteDataReply.SERIALIZABLE_CLASS);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+ eq(actorSelection(actorRef)), eqReadData());
doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+ eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_WRITE);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+ eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite),
- anyDuration());
+ executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+ eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+ eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
doReturn(Futures.failed(new TestException())).when(mockActorContext).
executeRemoteOperationAsync(eq(actorSelection(actorRef)),
- isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+ isA(ReadyTransaction.SERIALIZABLE_CLASS));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY);
public void testReadyWithInitialCreateTransactionFailure() throws Exception {
doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation(
- anyString(), any(), anyDuration());
+ anyString(), any());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
doReturn(Futures.successful(new Object())).when(mockActorContext).
executeRemoteOperationAsync(eq(actorSelection(actorRef)),
- isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+ isA(ReadyTransaction.SERIALIZABLE_CLASS));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY);
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+ eq(actorSelection(actorRef)), eqReadData());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_WRITE);
new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
mock(Configuration.class));
- Object out = actorContext.executeLocalShardOperation("default", "hello", duration("1 seconds"));
+ Object out = actorContext.executeLocalShardOperation("default", "hello");
assertEquals("hello", out);
new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
mock(Configuration.class));
- Object out = actorContext.executeLocalShardOperation("default", "hello", duration("1 seconds"));
+ Object out = actorContext.executeLocalShardOperation("default", "hello");
assertNull(out);
ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
- Object out = actorContext.executeRemoteOperation(actor, "hello", duration("3 seconds"));
+ Object out = actorContext.executeRemoteOperation(actor, "hello");
assertEquals("hello", out);
ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
- Future<Object> future = actorContext.executeRemoteOperationAsync(actor, "hello",
- Duration.create(3, TimeUnit.SECONDS));
+ Future<Object> future = actorContext.executeRemoteOperationAsync(actor, "hello");
try {
Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
-import scala.concurrent.duration.FiniteDuration;
public class MockActorContext extends ActorContext {
@Override public Object executeShardOperation(String shardName,
- Object message, FiniteDuration duration) {
+ Object message) {
return executeShardOperationResponse;
}
@Override public Object executeRemoteOperation(ActorSelection actor,
- Object message, FiniteDuration duration) {
+ Object message) {
return executeRemoteOperationResponse;
}
@Override
public Object executeLocalOperation(ActorRef actor,
- Object message, FiniteDuration duration) {
+ Object message) {
return this.executeLocalOperationResponse;
}
@Override
public Object executeLocalShardOperation(String shardName,
- Object message, FiniteDuration duration) {
+ Object message) {
return this.executeLocalShardOperationResponse;
}
}
ActorContext testContext = new ActorContext(actorSystem, actorSystem.actorOf(
Props.create(DoNothingActor.class)), new MockClusterWrapper(), new MockConfiguration());
Object messages = testContext
- .executeLocalOperation(actorRef, "messages",
- ActorContext.ASK_DURATION);
+ .executeLocalOperation(actorRef, "messages");
Assert.assertNotNull(messages);
*/
package org.opendaylight.controller.config.yang.md.sal.dom.impl;
-import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
+
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
+import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl;
import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataBrokerImpl;
+import org.opendaylight.controller.md.sal.dom.broker.impl.jmx.CommitStatsMXBeanImpl;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
public final class DomInmemoryDataBrokerModule extends
org.opendaylight.controller.config.yang.md.sal.dom.impl.AbstractDomInmemoryDataBrokerModule {
+ private static final String JMX_BEAN_TYPE = "DOMDataBroker";
+
public DomInmemoryDataBrokerModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier,
final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
super(identifier, dependencyResolver);
* nothing on success. The executor queue capacity is bounded and, if the capacity is
* reached, subsequent submitted tasks will block the caller.
*/
- Executor listenableFutureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(
+ ExecutorService listenableFutureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(
getMaxDataBrokerFutureCallbackPoolSize(), getMaxDataBrokerFutureCallbackQueueSize(),
"CommitFutures");
TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION,
listenableFutureExecutor));
+ final CommitStatsMXBeanImpl commitStatsMXBean = new CommitStatsMXBeanImpl(
+ newDataBroker.getCommitStatsTracker(), JMX_BEAN_TYPE);
+ commitStatsMXBean.registerMBean();
+
+ final ThreadExecutorStatsMXBeanImpl commitExecutorStatsMXBean =
+ new ThreadExecutorStatsMXBeanImpl(commitExecutor, "CommitExecutorStats",
+ JMX_BEAN_TYPE, null);
+ commitExecutorStatsMXBean.registerMBean();
+
+ final ThreadExecutorStatsMXBeanImpl commitFutureStatsMXBean =
+ new ThreadExecutorStatsMXBeanImpl(listenableFutureExecutor,
+ "CommitFutureExecutorStats", JMX_BEAN_TYPE, null);
+ commitFutureStatsMXBean.registerMBean();
+
+ newDataBroker.setCloseable(new AutoCloseable() {
+ @Override
+ public void close() {
+ commitStatsMXBean.unregisterMBean();
+ commitExecutorStatsMXBean.unregisterMBean();
+ commitFutureStatsMXBean.unregisterMBean();
+ }
+ });
+
return newDataBroker;
}
}
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.util.DurationStatsTracker;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final DOMDataCommitCoordinatorImpl coordinator;
private final AtomicLong txNum = new AtomicLong();
private final AtomicLong chainNum = new AtomicLong();
+ private volatile AutoCloseable closeable;
public DOMDataBrokerImpl(final ImmutableMap<LogicalDatastoreType, DOMStore> datastores,
final ListeningExecutorService executor) {
this.coordinator = new DOMDataCommitCoordinatorImpl(executor);
}
+ public void setCloseable(AutoCloseable closeable) {
+ this.closeable = closeable;
+ }
+
+ public DurationStatsTracker getCommitStatsTracker() {
+ return coordinator.getCommitStatsTracker();
+ }
+
+ @Override
+ public void close() {
+ super.close();
+
+ if(closeable != null) {
+ try {
+ closeable.close();
+ } catch(Exception e) {
+ LOG.debug("Error closing instance", e);
+ }
+ }
+ }
+
@Override
protected Object newTransactionIdentifier() {
return "DOM-" + txNum.getAndIncrement();
LOG.debug("Transaction: {} submitted with cohorts {}.", transaction.getIdentifier(), cohorts);
return coordinator.submit(transaction, cohorts, Optional.<DOMDataCommitErrorListener> absent());
}
-
}
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.util.DurationStatsTracker;
import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final ListeningExecutorService executor;
+ private final DurationStatsTracker commitStatsTracker = new DurationStatsTracker();
+
/**
*
* Construct DOMDataCommitCoordinator which uses supplied executor to
this.executor = Preconditions.checkNotNull(executor, "executor must not be null.");
}
+ public DurationStatsTracker getCommitStatsTracker() {
+ return commitStatsTracker;
+ }
+
@Override
public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final Optional<DOMDataCommitErrorListener> listener) {
ListenableFuture<Void> commitFuture = null;
try {
- commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts, listener));
+ commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts,
+ listener, commitStatsTracker));
} catch(RejectedExecutionException e) {
LOG.error("The commit executor's queue is full - submit task was rejected. \n" +
executor, e);
private final DOMDataWriteTransaction tx;
private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
+ private final DurationStatsTracker commitStatTracker;
@GuardedBy("this")
private CommitPhase currentPhase;
public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
- final Optional<DOMDataCommitErrorListener> listener) {
+ final Optional<DOMDataCommitErrorListener> listener,
+ final DurationStatsTracker commitStatTracker) {
this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
this.currentPhase = CommitPhase.SUBMITTED;
+ this.commitStatTracker = commitStatTracker;
}
@Override
public Void call() throws TransactionCommitFailedException {
+ long startTime = System.nanoTime();
try {
canCommitBlocking();
preCommitBlocking();
LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e);
abortBlocking(e);
throw e;
+ } finally {
+ if(commitStatTracker != null) {
+ commitStatTracker.addDuration(System.nanoTime() - startTime);
+ }
}
}
}
}
+ @Override
+ public String toString() {
+ return getDelegate().getClass().getName();
+ }
+
static final class TranslatingConfigListenerInvoker extends TranslatingListenerInvoker {
public TranslatingConfigListenerInvoker(final DataChangeListener listener, final DataNormalizer normalizer) {
super(listener, normalizer);
}
+ @Override
DataChangeEvent<YangInstanceIdentifier, CompositeNode> getLegacyEvent(final DataNormalizer normalizer, final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> normalizedChange) {
return TranslatingDataChangeEvent.createConfiguration(normalizedChange, normalizer);
}
super(listener, normalizer);
}
+ @Override
DataChangeEvent<YangInstanceIdentifier, CompositeNode> getLegacyEvent(final DataNormalizer normalizer, final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> normalizedChange) {
return TranslatingDataChangeEvent.createOperational(normalizedChange, normalizer);
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.dom.broker.impl.jmx;
+
+/**
+ * MXBean interface for retrieving write Tx commit statistics.
+ *
+ * @author Thomas Pantelis
+ */
+public interface CommitStatsMXBean {
+
+ /**
+ * Returns the total number of commits that have occurred.
+ */
+ long getTotalCommits();
+
+ /**
+ * Returns a string representing the time duration of the longest commit, in the appropriate
+ * scaled units, along with the date/time that it occurred.
+ */
+ String getLongestCommitTime();
+
+ /**
+ * Returns a string representing the time duration of the shortest commit, in the appropriate
+ * scaled units, along with the date/time that it occurred.
+ */
+ String getShortestCommitTime();
+
+ /**
+ * Returns a string representing average commit time duration, in the appropriate
+ * scaled units.
+ */
+ String getAverageCommitTime();
+
+ /**
+ * Clears the current stats to their defaults.
+ */
+ void clearStats();
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.dom.broker.impl.jmx;
+
+import javax.annotation.Nonnull;
+
+import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
+import org.opendaylight.yangtools.util.DurationStatsTracker;
+
+/**
+ * Implementation of the CommitStatsMXBean interface.
+ *
+ * @author Thomas Pantelis
+ */
+public class CommitStatsMXBeanImpl extends AbstractMXBean implements CommitStatsMXBean {
+
+ private final DurationStatsTracker commitStatsTracker;
+
+ /**
+ * Constructor.
+ *
+ * @param commitStatsTracker the DurationStatsTracker used to obtain the stats.
+ * @param mBeanType mBeanType Used as the <code>type</code> property in the bean's ObjectName.
+ */
+ public CommitStatsMXBeanImpl(@Nonnull DurationStatsTracker commitStatsTracker,
+ @Nonnull String mBeanType) {
+ super("CommitStats", mBeanType, null);
+ this.commitStatsTracker = commitStatsTracker;
+ }
+
+ @Override
+ public long getTotalCommits() {
+ return commitStatsTracker.getTotalDurations();
+ }
+
+ @Override
+ public String getLongestCommitTime() {
+ return commitStatsTracker.getDisplayableLongestDuration();
+ }
+
+ @Override
+ public String getShortestCommitTime() {
+ return commitStatsTracker.getDisplayableShortestDuration();
+ }
+
+ @Override
+ public String getAverageCommitTime() {
+ return commitStatsTracker.getDisplayableAverageDuration();
+ }
+
+ @Override
+ public void clearStats() {
+ commitStatsTracker.reset();
+ }
+}
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import org.opendaylight.controller.sal.dom.broker.impl.SchemaContextProvider;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.concepts.Registration;
-import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
+import org.opendaylight.yangtools.util.ListenerRegistry;
import org.opendaylight.yangtools.yang.model.api.Module;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
public class GlobalBundleScanningSchemaServiceImpl implements SchemaContextProvider, SchemaService, ServiceTrackerCustomizer<SchemaContextListener, SchemaContextListener>, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(GlobalBundleScanningSchemaServiceImpl.class);
@VisibleForTesting
public static synchronized void destroyInstance() {
- instance = null;
+ try {
+ instance.close();
+ } finally {
+ instance = null;
+ }
}
public BundleContext getContext() {
}
@Override
- public void close() throws Exception {
+ public void close() {
if (bundleTracker != null) {
bundleTracker.close();
}
if (listenerTracker != null) {
listenerTracker.close();
}
- // FIXME: Add listeners.close();
- }
+ for (ListenerRegistration<SchemaContextListener> l : listeners.getListeners()) {
+ l.close();
+ }
+ }
private synchronized void updateContext(final SchemaContext snapshot) {
Object[] services = listenerTracker.getServices();
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.dom.broker.impl.jmx;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+import org.opendaylight.yangtools.util.DurationStatsTracker;
+
+/**
+ * Unit tests for CommitStatsMXBeanImpl.
+ *
+ * @author Thomas Pantelis
+ */
+public class CommitStatsMXBeanImplTest {
+
+ @Test
+ public void test() {
+
+ DurationStatsTracker commitStatsTracker = new DurationStatsTracker();
+ CommitStatsMXBeanImpl bean =
+ new CommitStatsMXBeanImpl(commitStatsTracker, "Test");
+
+ commitStatsTracker.addDuration(100);
+
+ String prefix = "100.0 ns";
+ assertEquals("getTotalCommits", 1L, bean.getTotalCommits());
+ assertEquals("getLongestCommitTime starts with \"" + prefix + "\"", true,
+ bean.getLongestCommitTime().startsWith("100.0 ns"));
+ assertEquals("getShortestCommitTime starts with \"" + prefix + "\"", true,
+ bean.getShortestCommitTime().startsWith(prefix));
+ assertEquals("getAverageCommitTime starts with \"" + prefix + "\"", true,
+ bean.getAverageCommitTime().startsWith(prefix));
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.core.spi.data.statistics;
+
+import java.util.concurrent.ExecutorService;
+
+import javax.annotation.Nonnull;
+
+import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
+
+/**
+ * Interface for a class that tracks statistics for a data store.
+ *
+ * @author Thomas Pantelis
+ */
+public interface DOMStoreStatsTracker {
+
+ /**
+ * Sets the executor used for DataChangeListener notifications.
+ *
+ * @param dclExecutor the executor
+ */
+ void setDataChangeListenerExecutor( @Nonnull ExecutorService dclExecutor );
+
+ /**
+ * Sets the executor used internally by the data store.
+ *
+ * @param dsExecutor the executor
+ */
+ void setDataStoreExecutor( @Nonnull ExecutorService dsExecutor );
+
+ /**
+ * Sets the QueuedNotificationManager use for DataChangeListener notifications,
+ *
+ * @param manager the manager
+ */
+ void setNotificationManager( @Nonnull QueuedNotificationManager<?, ?> manager );
+}
package org.opendaylight.controller.config.yang.inmemory_datastore_provider;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
+import org.opendaylight.controller.md.sal.dom.store.impl.jmx.InMemoryDataStoreStats;
public class InMemoryConfigDataStoreProviderModule extends org.opendaylight.controller.config.yang.inmemory_datastore_provider.AbstractInMemoryConfigDataStoreProviderModule {
@Override
public java.lang.AutoCloseable createInstance() {
- return InMemoryDOMDataStoreFactory.create("DOM-CFG", getSchemaServiceDependency(),
+
+ InMemoryDOMDataStore dataStore = InMemoryDOMDataStoreFactory.create(
+ "DOM-CFG", getSchemaServiceDependency(),
InMemoryDOMDataStoreConfigProperties.create(getMaxDataChangeExecutorPoolSize(),
- getMaxDataChangeExecutorQueueSize(), getMaxDataChangeListenerQueueSize()));
+ getMaxDataChangeExecutorQueueSize(), getMaxDataChangeListenerQueueSize(),
+ getMaxDataStoreExecutorQueueSize()));
+
+ InMemoryDataStoreStats statsBean = new InMemoryDataStoreStats("InMemoryConfigDataStore",
+ dataStore.getDataChangeListenerNotificationManager(), dataStore.getDomStoreExecutor());
+
+ dataStore.setCloseable(statsBean);
+
+ return dataStore;
}
}
package org.opendaylight.controller.config.yang.inmemory_datastore_provider;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
+import org.opendaylight.controller.md.sal.dom.store.impl.jmx.InMemoryDataStoreStats;
public class InMemoryOperationalDataStoreProviderModule extends org.opendaylight.controller.config.yang.inmemory_datastore_provider.AbstractInMemoryOperationalDataStoreProviderModule {
@Override
public java.lang.AutoCloseable createInstance() {
- return InMemoryDOMDataStoreFactory.create("DOM-OPER", getSchemaServiceDependency(),
+ InMemoryDOMDataStore dataStore = InMemoryDOMDataStoreFactory.create("DOM-OPER", getSchemaServiceDependency(),
InMemoryDOMDataStoreConfigProperties.create(getMaxDataChangeExecutorPoolSize(),
- getMaxDataChangeExecutorQueueSize(), getMaxDataChangeListenerQueueSize()));
- }
+ getMaxDataChangeExecutorQueueSize(), getMaxDataChangeListenerQueueSize(),
+ getMaxDataStoreExecutorQueueSize()));
+
+
+ InMemoryDataStoreStats statsBean = new InMemoryDataStoreStats("InMemoryOperationalDataStore",
+ dataStore.getDataChangeListenerNotificationManager(), dataStore.getDomStoreExecutor());
+ dataStore.setCloseable(statsBean);
+
+ return dataStore;
+ }
}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.sal.dom.store.impl;
-
-import com.google.common.base.Preconditions;
-
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.yangtools.util.concurrent.NotificationManager;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-class ChangeListenerNotifyTask implements Runnable {
- private static final Logger LOG = LoggerFactory.getLogger(ChangeListenerNotifyTask.class);
-
- @SuppressWarnings("rawtypes")
- private final NotificationManager<AsyncDataChangeListener,AsyncDataChangeEvent> notificationMgr;
- private final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> event;
- private final DataChangeListenerRegistration<?> listener;
-
- @SuppressWarnings("rawtypes")
- public ChangeListenerNotifyTask(final DataChangeListenerRegistration<?> listener,
- final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> event,
- final NotificationManager<AsyncDataChangeListener,AsyncDataChangeEvent> notificationMgr) {
- this.notificationMgr = Preconditions.checkNotNull(notificationMgr);
- this.listener = Preconditions.checkNotNull(listener);
- this.event = Preconditions.checkNotNull(event);
- }
-
- @Override
- public void run() {
- final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> l = listener.getInstance();
- if (l == null) {
- LOG.trace("Skipping event delivery to unregistered listener {}", l);
- return;
- }
- LOG.trace("Listener {} event {}", l, event);
-
- // FIXME: Yo dawg I heard you like queues, so this was queued to be queued
- notificationMgr.submitNotification(l, event);
- }
-
- @Override
- public String toString() {
- return "ChangeListenerNotifyTask [listener=" + listener + ", event=" + event + "]";
- }
-}
*/
package org.opendaylight.controller.md.sal.dom.store.impl;
+import static com.google.common.base.Preconditions.checkState;
+
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
-import org.opendaylight.yangtools.util.ExecutorServiceUtil;
-import org.opendaylight.yangtools.util.concurrent.NotificationManager;
-import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
-import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.util.ExecutorServiceUtil;
+import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
+import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.Invoker;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.concurrent.GuardedBy;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static com.google.common.base.Preconditions.checkState;
-
/**
* In-memory DOM Data Store
*
public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener,
TransactionReadyPrototype,AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
+ private static final ListenableFuture<Void> SUCCESSFUL_FUTURE = Futures.immediateFuture(null);
- @SuppressWarnings("rawtypes")
- private static final QueuedNotificationManager.Invoker<AsyncDataChangeListener,
- AsyncDataChangeEvent> DCL_NOTIFICATION_MGR_INVOKER =
- new QueuedNotificationManager.Invoker<AsyncDataChangeListener,
- AsyncDataChangeEvent>() {
-
- @SuppressWarnings("unchecked")
+ private static final Invoker<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> DCL_NOTIFICATION_MGR_INVOKER =
+ new Invoker<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent>() {
@Override
- public void invokeListener( AsyncDataChangeListener listener,
- AsyncDataChangeEvent notification ) {
- listener.onDataChanged(notification);
+ public void invokeListener(final DataChangeListenerRegistration<?> listener,
+ final DOMImmutableDataChangeEvent notification ) {
+ final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> inst = listener.getInstance();
+ if (inst != null) {
+ inst.onDataChanged(notification);
+ }
}
};
private final AtomicLong txCounter = new AtomicLong(0);
private final ListeningExecutorService listeningExecutor;
- @SuppressWarnings("rawtypes")
- private final NotificationManager<AsyncDataChangeListener,AsyncDataChangeEvent>
- dataChangeListenerNotificationManager;
+ private final QueuedNotificationManager<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> dataChangeListenerNotificationManager;
private final ExecutorService dataChangeListenerExecutor;
+ private final ExecutorService domStoreExecutor;
+
private final String name;
- public InMemoryDOMDataStore(final String name, final ListeningExecutorService listeningExecutor,
+ private volatile AutoCloseable closeable;
+
+ public InMemoryDOMDataStore(final String name, final ExecutorService domStoreExecutor,
final ExecutorService dataChangeListenerExecutor) {
- this(name, listeningExecutor, dataChangeListenerExecutor,
- InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE);
+ this(name, domStoreExecutor, dataChangeListenerExecutor,
+ InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE);
}
- public InMemoryDOMDataStore(final String name, final ListeningExecutorService listeningExecutor,
- final ExecutorService dataChangeListenerExecutor, int maxDataChangeListenerQueueSize) {
+ public InMemoryDOMDataStore(final String name, final ExecutorService domStoreExecutor,
+ final ExecutorService dataChangeListenerExecutor, final int maxDataChangeListenerQueueSize) {
this.name = Preconditions.checkNotNull(name);
- this.listeningExecutor = Preconditions.checkNotNull(listeningExecutor);
-
+ this.domStoreExecutor = Preconditions.checkNotNull(domStoreExecutor);
+ this.listeningExecutor = MoreExecutors.listeningDecorator(this.domStoreExecutor);
this.dataChangeListenerExecutor = Preconditions.checkNotNull(dataChangeListenerExecutor);
dataChangeListenerNotificationManager =
"DataChangeListenerQueueMgr");
}
+ public void setCloseable(AutoCloseable closeable) {
+ this.closeable = closeable;
+ }
+
+ public QueuedNotificationManager<?, ?> getDataChangeListenerNotificationManager() {
+ return dataChangeListenerNotificationManager;
+ }
+
+ public ExecutorService getDomStoreExecutor() {
+ return domStoreExecutor;
+ }
+
@Override
public final String getIdentifier() {
return name;
public void close() {
ExecutorServiceUtil.tryGracefulShutdown(listeningExecutor, 30, TimeUnit.SECONDS);
ExecutorServiceUtil.tryGracefulShutdown(dataChangeListenerExecutor, 30, TimeUnit.SECONDS);
+
+ if(closeable != null) {
+ try {
+ closeable.close();
+ } catch(Exception e) {
+ LOG.debug("Error closing instance", e);
+ }
+ }
}
+
@Override
public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
final YangInstanceIdentifier path, final L listener, final DataChangeScope scope) {
.addCreated(path, data) //
.build();
- new ChangeListenerNotifyTask(reg, event,
- dataChangeListenerNotificationManager).run();
+ dataChangeListenerNotificationManager.submitNotification(reg, event);
}
}
}
public synchronized void onTransactionCommited(final SnapshotBackedWriteTransaction transaction) {
- // If commited transaction is latestOutstandingTx we clear
+ // If committed transaction is latestOutstandingTx we clear
// latestOutstandingTx
// field in order to base new transactions on Datastore Data Tree
// directly.
@Override
public Void call() {
candidate = dataTree.prepare(modification);
- listenerResolver = ResolveDataChangeEventsTask.create(candidate, listenerTree,
- dataChangeListenerNotificationManager);
+ listenerResolver = ResolveDataChangeEventsTask.create(candidate, listenerTree);
return null;
}
});
@Override
public ListenableFuture<Void> abort() {
candidate = null;
- return Futures.immediateFuture(null);
+ return SUCCESSFUL_FUTURE;
}
@Override
*/
synchronized (this) {
dataTree.commit(candidate);
-
- for (ChangeListenerNotifyTask task : listenerResolver.call()) {
- LOG.trace("Scheduling invocation of listeners: {}", task);
- task.run();
- }
+ listenerResolver.resolve(dataChangeListenerNotificationManager);
}
- return Futures.immediateFuture(null);
+ return SUCCESSFUL_FUTURE;
}
}
}
public static final int DEFAULT_MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE = 1000;
public static final int DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE = 20;
public static final int DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE = 1000;
+ public static final int DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE = 5000;
private static final InMemoryDOMDataStoreConfigProperties DEFAULT =
create(DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE,
DEFAULT_MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE,
- DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE);
+ DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE,
+ DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE);
private final int maxDataChangeExecutorQueueSize;
private final int maxDataChangeExecutorPoolSize;
private final int maxDataChangeListenerQueueSize;
+ private final int maxDataStoreExecutorQueueSize;
/**
* Constructs an instance with the given property values.
* maximum queue size for the data change notification executor.
* @param maxDataChangeListenerQueueSize
* maximum queue size for the data change listeners.
+ * @param maxDataStoreExecutorQueueSize
+ * maximum queue size for the data store executor.
*/
+ public static InMemoryDOMDataStoreConfigProperties create(int maxDataChangeExecutorPoolSize,
+ int maxDataChangeExecutorQueueSize, int maxDataChangeListenerQueueSize,
+ int maxDataStoreExecutorQueueSize) {
+ return new InMemoryDOMDataStoreConfigProperties(maxDataChangeExecutorPoolSize,
+ maxDataChangeExecutorQueueSize, maxDataChangeListenerQueueSize,
+ maxDataStoreExecutorQueueSize);
+ }
+
public static InMemoryDOMDataStoreConfigProperties create(int maxDataChangeExecutorPoolSize,
int maxDataChangeExecutorQueueSize, int maxDataChangeListenerQueueSize) {
return new InMemoryDOMDataStoreConfigProperties(maxDataChangeExecutorPoolSize,
- maxDataChangeExecutorQueueSize, maxDataChangeListenerQueueSize);
+ maxDataChangeExecutorQueueSize, maxDataChangeListenerQueueSize,
+ DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE);
}
/**
}
private InMemoryDOMDataStoreConfigProperties(int maxDataChangeExecutorPoolSize,
- int maxDataChangeExecutorQueueSize, int maxDataChangeListenerQueueSize) {
+ int maxDataChangeExecutorQueueSize, int maxDataChangeListenerQueueSize,
+ int maxDataStoreExecutorQueueSize) {
this.maxDataChangeExecutorQueueSize = maxDataChangeExecutorQueueSize;
this.maxDataChangeExecutorPoolSize = maxDataChangeExecutorPoolSize;
this.maxDataChangeListenerQueueSize = maxDataChangeListenerQueueSize;
+ this.maxDataStoreExecutorQueueSize = maxDataStoreExecutorQueueSize;
}
/**
public int getMaxDataChangeListenerQueueSize() {
return maxDataChangeListenerQueueSize;
}
+
+ /**
+ * Returns the maximum queue size for the data store executor.
+ */
+ public int getMaxDataStoreExecutorQueueSize() {
+ return maxDataStoreExecutorQueueSize;
+ }
}
package org.opendaylight.controller.md.sal.dom.store.impl;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
-import com.google.common.util.concurrent.MoreExecutors;
/**
* A factory for creating InMemoryDOMDataStore instances.
ExecutorService dataChangeListenerExecutor = SpecialExecutors.newBlockingBoundedFastThreadPool(
dclExecutorMaxPoolSize, dclExecutorMaxQueueSize, name + "-DCL" );
+ ExecutorService domStoreExecutor = SpecialExecutors.newBoundedSingleThreadExecutor(
+ actualProperties.getMaxDataStoreExecutorQueueSize(), "DOMStore-" + name );
+
InMemoryDOMDataStore dataStore = new InMemoryDOMDataStore(name,
- MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()),
- dataChangeListenerExecutor, actualProperties.getMaxDataChangeListenerQueueSize());
+ domStoreExecutor, dataChangeListenerExecutor,
+ actualProperties.getMaxDataChangeListenerQueueSize());
if(schemaService != null) {
schemaService.registerSchemaContextListener(dataStore);
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Map.Entry;
-import java.util.concurrent.Callable;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.Builder;
import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.SimpleEventFactory;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
* Computes data change events for all affected registered listeners in data
* tree.
*/
-final class ResolveDataChangeEventsTask implements Callable<Iterable<ChangeListenerNotifyTask>> {
+final class ResolveDataChangeEventsTask {
private static final Logger LOG = LoggerFactory.getLogger(ResolveDataChangeEventsTask.class);
- @SuppressWarnings("rawtypes")
- private final NotificationManager<AsyncDataChangeListener, AsyncDataChangeEvent> notificationMgr;
private final DataTreeCandidate candidate;
private final ListenerTree listenerRoot;
private Multimap<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> collectedEvents;
- @SuppressWarnings("rawtypes")
- public ResolveDataChangeEventsTask(final DataTreeCandidate candidate, final ListenerTree listenerTree,
- final NotificationManager<AsyncDataChangeListener, AsyncDataChangeEvent> notificationMgr) {
+ public ResolveDataChangeEventsTask(final DataTreeCandidate candidate, final ListenerTree listenerTree) {
this.candidate = Preconditions.checkNotNull(candidate);
this.listenerRoot = Preconditions.checkNotNull(listenerTree);
- this.notificationMgr = Preconditions.checkNotNull(notificationMgr);
}
/**
- * Resolves and creates Notification Tasks
- *
- * Implementation of done as Map-Reduce with two steps: 1. resolving events
- * and their mapping to listeners 2. merging events affecting same listener
- *
- * @return An {@link Iterable} of Notification Tasks which needs to be executed in
- * order to delivery data change events.
+ * Resolves and submits notification tasks to the specified manager.
*/
- @Override
- public synchronized Iterable<ChangeListenerNotifyTask> call() {
+ public synchronized void resolve(final NotificationManager<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> manager) {
try (final Walker w = listenerRoot.getWalker()) {
// Defensive: reset internal state
collectedEvents = ArrayListMultimap.create();
* Convert to tasks, but be mindful of multiple values -- those indicate multiple
* wildcard matches, which need to be merged.
*/
- final Collection<ChangeListenerNotifyTask> ret = new ArrayList<>();
for (Entry<DataChangeListenerRegistration<?>, Collection<DOMImmutableDataChangeEvent>> e : collectedEvents.asMap().entrySet()) {
final Collection<DOMImmutableDataChangeEvent> col = e.getValue();
final DOMImmutableDataChangeEvent event;
event = col.iterator().next();
}
- ret.add(new ChangeListenerNotifyTask(e.getKey(), event, notificationMgr));
+ manager.submitNotification(e.getKey(), event);
}
-
- // FIXME: so now we have tasks to submit tasks... Inception-style!
- LOG.debug("Created tasks {}", ret);
- return ret;
}
}
return scope != null;
}
- @SuppressWarnings("rawtypes")
- public static ResolveDataChangeEventsTask create(final DataTreeCandidate candidate,
- final ListenerTree listenerTree,
- final NotificationManager<AsyncDataChangeListener,AsyncDataChangeEvent> notificationMgr) {
- return new ResolveDataChangeEventsTask(candidate, listenerTree, notificationMgr);
+ public static ResolveDataChangeEventsTask create(final DataTreeCandidate candidate, final ListenerTree listenerTree) {
+ return new ResolveDataChangeEventsTask(candidate, listenerTree);
}
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.dom.store.impl.jmx;
+
+import java.util.concurrent.ExecutorService;
+
+import org.opendaylight.controller.md.sal.common.util.jmx.QueuedNotificationManagerMXBeanImpl;
+import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl;
+import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
+
+/**
+ * Wrapper class for data store MXbeans.
+ *
+ * @author Thomas Pantelis
+ */
+public class InMemoryDataStoreStats implements AutoCloseable {
+
+ private final ThreadExecutorStatsMXBeanImpl notificationExecutorStatsBean;
+ private final ThreadExecutorStatsMXBeanImpl dataStoreExecutorStatsBean;
+ private final QueuedNotificationManagerMXBeanImpl notificationManagerStatsBean;
+
+ public InMemoryDataStoreStats(String mBeanType, QueuedNotificationManager<?, ?> manager,
+ ExecutorService dataStoreExecutor) {
+
+ this.notificationManagerStatsBean = new QueuedNotificationManagerMXBeanImpl(manager,
+ "notification-manager", mBeanType, null);
+ notificationManagerStatsBean.registerMBean();
+
+ this.notificationExecutorStatsBean = new ThreadExecutorStatsMXBeanImpl(manager.getExecutor(),
+ "notification-executor", mBeanType, null);
+ this.notificationExecutorStatsBean.registerMBean();
+
+ this.dataStoreExecutorStatsBean = new ThreadExecutorStatsMXBeanImpl(dataStoreExecutor,
+ "data-store-executor", mBeanType, null);
+ this.dataStoreExecutorStatsBean.registerMBean();
+ }
+
+ @Override
+ public void close() throws Exception {
+ if(notificationExecutorStatsBean != null) {
+ notificationExecutorStatsBean.unregisterMBean();
+ }
+
+ if(dataStoreExecutorStatsBean != null) {
+ dataStoreExecutorStatsBean.unregisterMBean();
+ }
+
+ if(notificationManagerStatsBean != null) {
+ notificationManagerStatsBean.unregisterMBean();
+ }
+ }
+}
type uint16;
description "The maximum queue size for the data change listeners.";
}
+
+ leaf max-data-store-executor-queue-size {
+ default 5000;
+ type uint16;
+ description "The maximum queue size for the data store executor.";
+ }
}
// Augments the 'configuration' choice node under modules/module.
QName childNode = NetconfMessageTransformUtil.IETF_NETCONF_MONITORING_SCHEMA_FORMAT.withoutRevision();
- final String formatAsString = getSingleChildNodeValue(schemaNode, childNode).get();
+ String formatAsString = getSingleChildNodeValue(schemaNode, childNode).get();
+ //This is HotFix for situations where format statement in netconf-monitoring might be passed with prefix.
+ if (formatAsString.contains(":")) {
+ String[] prefixedString = formatAsString.split(":");
+ //FIXME: might be good idea to check prefix against model namespace
+ formatAsString = prefixedString[1];
+ }
if(formatAsString.equals(Yang.QNAME.getLocalName()) == false) {
logger.debug("{}: Ignoring schema due to unsupported format: {}", id, formatAsString);
return Optional.absent();
<Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
<Export-package></Export-package>
<Private-Package></Private-Package>
- <Import-Package>!org.iq80.*;!*snappy;!org.jboss.*;!com.jcraft.*;!org.fusesource.*;*</Import-Package>
+ <Import-Package>!org.iq80.*;!*snappy;!org.jboss.*;!com.jcraft.*;!org.fusesource.*;!*jetty*;!sun.security.*;*</Import-Package>
<Embed-Dependency>
sal-clustering-commons;
sal-akka-raft;
package org.opendaylight.controller.config.yang.md.sal.rest.connector;
-import org.opendaylight.controller.sal.rest.impl.RestconfProviderImpl;
+import org.opendaylight.controller.sal.restconf.impl.RestconfProviderImpl;
public class RestConnectorModule extends org.opendaylight.controller.config.yang.md.sal.rest.connector.AbstractRestConnectorModule {
instance.setWebsocketPort(getWebsocketPort());
// Register it with the Broker
getDomBrokerDependency().registerProvider(instance);
+
+
+ getRootRuntimeBeanRegistratorWrapper().register(instance);
+
return instance;
}
}
import java.lang.annotation.Annotation;
import java.lang.reflect.Type;
import java.net.URI;
+import java.util.Iterator;
import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import org.opendaylight.controller.sal.restconf.impl.InstanceIdentifierContext;
import org.opendaylight.controller.sal.restconf.impl.NormalizedNodeContext;
import org.opendaylight.controller.sal.restconf.impl.RestconfDocumentedException;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
throw new RestconfDocumentedException(Response.Status.NOT_FOUND);
}
-
+ boolean isDataRoot = false;
URI initialNs = null;
outputWriter.write('{');
- if (!SchemaPath.ROOT.equals(path)) {
+ if (SchemaPath.ROOT.equals(path)) {
+ isDataRoot = true;
+ } else {
path = path.getParent();
// FIXME: Add proper handling of reading root.
}
- if(data instanceof MapEntryNode) {
- data = ImmutableNodes.mapNodeBuilder(data.getNodeType()).withChild(((MapEntryNode) data)).build();
- }
if(!schema.isAugmenting() && !(schema instanceof SchemaContext)) {
initialNs = schema.getQName().getNamespace();
}
NormalizedNodeStreamWriter jsonWriter = JSONNormalizedNodeStreamWriter.create(context.getSchemaContext(),path,initialNs,outputWriter);
NormalizedNodeWriter nnWriter = NormalizedNodeWriter.forStreamWriter(jsonWriter);
- nnWriter.write(data);
+ if(isDataRoot) {
+ writeDataRoot(outputWriter,nnWriter,(ContainerNode) data);
+ } else {
+ if(data instanceof MapEntryNode) {
+ data = ImmutableNodes.mapNodeBuilder(data.getNodeType()).withChild(((MapEntryNode) data)).build();
+ }
+ nnWriter.write(data);
+ }
nnWriter.flush();
-
outputWriter.write('}');
outputWriter.flush();
}
+ private void writeDataRoot(OutputStreamWriter outputWriter, NormalizedNodeWriter nnWriter, ContainerNode data) throws IOException {
+ Iterator<DataContainerChild<? extends PathArgument, ?>> iterator = data.getValue().iterator();
+ while(iterator.hasNext()) {
+ DataContainerChild<? extends PathArgument, ?> child = iterator.next();
+ nnWriter.write(child);
+ nnWriter.flush();
+ if(iterator.hasNext()) {
+ outputWriter.write(",");
+ }
+ }
+ }
+
}
*/
package org.opendaylight.controller.sal.rest.impl;
+import com.google.common.base.Throwables;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.annotation.Annotation;
import org.opendaylight.controller.sal.restconf.impl.InstanceIdentifierContext;
import org.opendaylight.controller.sal.restconf.impl.NormalizedNodeContext;
import org.opendaylight.controller.sal.restconf.impl.RestconfDocumentedException;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
@Provider
@Produces({ Draft02.MediaTypes.API + RestconfService.XML, Draft02.MediaTypes.DATA + RestconfService.XML,
- Draft02.MediaTypes.OPERATION + RestconfService.XML, MediaType.APPLICATION_XML, MediaType.TEXT_XML })
-
+ Draft02.MediaTypes.OPERATION + RestconfService.XML, MediaType.APPLICATION_XML, MediaType.TEXT_XML })
public class NormalizedNodeXmlBodyWriter implements MessageBodyWriter<NormalizedNodeContext> {
-
private static final XMLOutputFactory XML_FACTORY;
static {
- XML_FACTORY = XMLOutputFactory.newFactory();
+ XML_FACTORY = XMLOutputFactory.newFactory();
XML_FACTORY.setProperty(XMLOutputFactory.IS_REPAIRING_NAMESPACES, true);
}
-
@Override
- public boolean isWriteable(final Class<?> type, final Type genericType, final Annotation[] annotations, final MediaType mediaType) {
+ public boolean isWriteable(final Class<?> type, final Type genericType, final Annotation[] annotations,
+ final MediaType mediaType) {
return type.equals(NormalizedNodeContext.class);
}
@Override
- public long getSize(final NormalizedNodeContext t, final Class<?> type, final Type genericType, final Annotation[] annotations, final MediaType mediaType) {
+ public long getSize(final NormalizedNodeContext t, final Class<?> type, final Type genericType,
+ final Annotation[] annotations, final MediaType mediaType) {
return -1;
}
@Override
- public void writeTo(final NormalizedNodeContext t, final Class<?> type, final Type genericType, final Annotation[] annotations,
- final MediaType mediaType, final MultivaluedMap<String, Object> httpHeaders, final OutputStream entityStream)
- throws IOException, WebApplicationException {
+ public void writeTo(final NormalizedNodeContext t, final Class<?> type, final Type genericType,
+ final Annotation[] annotations, final MediaType mediaType,
+ final MultivaluedMap<String, Object> httpHeaders, final OutputStream entityStream) throws IOException,
+ WebApplicationException {
InstanceIdentifierContext pathContext = t.getInstanceIdentifierContext();
if (t.getData() == null) {
throw new RestconfDocumentedException(Response.Status.NOT_FOUND);
throw new IllegalStateException(e);
}
NormalizedNode<?, ?> data = t.getData();
- SchemaPath schemaPath = pathContext.getSchemaNode().getPath().getParent();
- if(data instanceof MapEntryNode) {
- data = ImmutableNodes.mapNodeBuilder(data.getNodeType()).addChild((MapEntryNode) data).build();
- //schemaPath = pathContext.getSchemaNode().getPath();
+ SchemaPath schemaPath = pathContext.getSchemaNode().getPath();
+
+ boolean isDataRoot = false;
+ if (SchemaPath.ROOT.equals(schemaPath)) {
+ isDataRoot = true;
+ } else {
+ schemaPath = schemaPath.getParent();
}
- NormalizedNodeStreamWriter jsonWriter = XMLStreamNormalizedNodeStreamWriter.create(xmlWriter,pathContext.getSchemaContext(),schemaPath);
+ NormalizedNodeStreamWriter jsonWriter = XMLStreamNormalizedNodeStreamWriter.create(xmlWriter,
+ pathContext.getSchemaContext(), schemaPath);
NormalizedNodeWriter nnWriter = NormalizedNodeWriter.forStreamWriter(jsonWriter);
+ if (isDataRoot) {
+ writeRootElement(xmlWriter, nnWriter, (ContainerNode) data);
+ } else {
+ if (data instanceof MapEntryNode) {
+ // Restconf allows returning one list item. We need to wrap it
+ // in map node in order to serialize it properly
+ data = ImmutableNodes.mapNodeBuilder(data.getNodeType()).addChild((MapEntryNode) data).build();
+ }
+ nnWriter.write(data);
+ nnWriter.flush();
+ }
+ }
- nnWriter.write(data);
- nnWriter.flush();
+ private void writeRootElement(XMLStreamWriter xmlWriter, NormalizedNodeWriter nnWriter, ContainerNode data)
+ throws IOException {
+ try {
+ QName name = SchemaContext.NAME;
+ xmlWriter.writeStartElement(name.getNamespace().toString(), name.getLocalName());
+ for (DataContainerChild<? extends PathArgument, ?> child : data.getValue()) {
+ nnWriter.write(child);
+ }
+ nnWriter.flush();
+ xmlWriter.writeEndElement();
+ xmlWriter.flush();
+ } catch (XMLStreamException e) {
+ Throwables.propagate(e);
+ }
}
}
import org.opendaylight.controller.sal.restconf.impl.BrokerFacade;
import org.opendaylight.controller.sal.restconf.impl.ControllerContext;
import org.opendaylight.controller.sal.restconf.impl.RestconfImpl;
+import org.opendaylight.controller.sal.restconf.impl.StatisticsRestconfServiceWrapper;
public class RestconfApplication extends Application {
restconfImpl.setControllerContext(controllerContext);
singletons.add(controllerContext);
singletons.add(brokerFacade);
- singletons.add(restconfImpl);
+ singletons.add(StatisticsRestconfServiceWrapper.getInstance());
singletons.add(StructuredDataToXmlProvider.INSTANCE);
singletons.add(StructuredDataToJsonProvider.INSTANCE);
singletons.add(JsonToCompositeNodeProvider.INSTANCE);
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import java.math.BigInteger;
import java.net.URI;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import org.slf4j.LoggerFactory;
public class RestconfImpl implements RestconfService {
+
private enum UriParameters {
PRETTY_PRINT("prettyPrint"),
DEPTH("depth");
}
}
+
+
private final static RestconfImpl INSTANCE = new RestconfImpl();
private static final int NOTIFICATION_PORT = 8181;
}
return false;
}
+
+ public BigInteger getOperationalReceived() {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.sal.rest.impl;
+package org.opendaylight.controller.sal.restconf.impl;
+import java.math.BigInteger;
import java.util.Collection;
import java.util.Collections;
+import org.opendaylight.controller.config.yang.md.sal.rest.connector.Config;
+import org.opendaylight.controller.config.yang.md.sal.rest.connector.Get;
+import org.opendaylight.controller.config.yang.md.sal.rest.connector.Operational;
+import org.opendaylight.controller.config.yang.md.sal.rest.connector.Post;
+import org.opendaylight.controller.config.yang.md.sal.rest.connector.Put;
+import org.opendaylight.controller.config.yang.md.sal.rest.connector.RestConnectorRuntimeMXBean;
+import org.opendaylight.controller.config.yang.md.sal.rest.connector.Rpcs;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
import org.opendaylight.controller.sal.core.api.Provider;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
import org.opendaylight.controller.sal.rest.api.RestConnector;
-import org.opendaylight.controller.sal.restconf.impl.BrokerFacade;
-import org.opendaylight.controller.sal.restconf.impl.ControllerContext;
import org.opendaylight.controller.sal.streams.websockets.WebSocketServer;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.PortNumber;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
-public class RestconfProviderImpl implements Provider, AutoCloseable, RestConnector {
+public class RestconfProviderImpl implements Provider, AutoCloseable, RestConnector, RestConnectorRuntimeMXBean {
public final static String NOT_INITALIZED_MSG = "Restconf is not initialized yet. Please try again later";
+ private final StatisticsRestconfServiceWrapper stats = StatisticsRestconfServiceWrapper.getInstance();
private ListenerRegistration<SchemaContextListener> listenerRegistration;
private PortNumber port;
public void setWebsocketPort(PortNumber port) {
}
webSocketServerThread.interrupt();
}
+
+ @Override
+ public Config getConfig() {
+ Config config = new Config();
+ Get get = new Get();
+ get.setReceivedRequests(stats.getConfigGet());
+ config.setGet(get);
+ Post post = new Post();
+ post.setReceivedRequests(stats.getConfigPost());
+ config.setPost(post);
+ Put put = new Put();
+ put.setReceivedRequests(stats.getConfigPut());
+ config.setPut(put);
+ return config;
+ }
+
+ @Override
+ public Operational getOperational() {
+ BigInteger opGet = stats.getOperationalGet();
+ Operational operational = new Operational();
+ Get get = new Get();
+ get.setReceivedRequests(opGet);
+ operational.setGet(get);
+ return operational;
+ }
+
+ @Override
+ public Rpcs getRpcs() {
+ BigInteger rpcInvoke = stats.getRpc();
+ Rpcs rpcs = new Rpcs();
+ rpcs.setReceivedRequests(rpcInvoke);
+ return rpcs ;
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.restconf.impl;
+
+import java.math.BigInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriInfo;
+import org.opendaylight.controller.sal.rest.api.RestconfService;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.Node;
+
+public class StatisticsRestconfServiceWrapper implements RestconfService {
+
+ AtomicLong operationalGet = new AtomicLong();
+ AtomicLong configGet = new AtomicLong();
+ AtomicLong rpc = new AtomicLong();
+ AtomicLong configPost = new AtomicLong();
+ AtomicLong configPut = new AtomicLong();
+ AtomicLong configDelete = new AtomicLong();
+
+ private static final StatisticsRestconfServiceWrapper INSTANCE = new StatisticsRestconfServiceWrapper(RestconfImpl.getInstance());
+
+ final RestconfService delegate;
+
+ private StatisticsRestconfServiceWrapper(RestconfService delegate) {
+ this.delegate = delegate;
+ }
+
+ public static StatisticsRestconfServiceWrapper getInstance() {
+ return INSTANCE;
+ }
+
+ @Override
+ public Object getRoot() {
+ return delegate.getRoot();
+ }
+
+ @Override
+ public StructuredData getModules(UriInfo uriInfo) {
+ return delegate.getModules(uriInfo);
+ }
+
+ @Override
+ public StructuredData getModules(String identifier, UriInfo uriInfo) {
+ return delegate.getModules(identifier, uriInfo);
+ }
+
+ @Override
+ public StructuredData getModule(String identifier, UriInfo uriInfo) {
+ return delegate.getModule(identifier, uriInfo);
+ }
+
+ @Override
+ public StructuredData getOperations(UriInfo uriInfo) {
+ return delegate.getOperations(uriInfo);
+ }
+
+ @Override
+ public StructuredData getOperations(String identifier, UriInfo uriInfo) {
+ return delegate.getOperations(identifier, uriInfo);
+ }
+
+ @Override
+ public StructuredData invokeRpc(String identifier, CompositeNode payload, UriInfo uriInfo) {
+ rpc.incrementAndGet();
+ return delegate.invokeRpc(identifier, payload, uriInfo);
+ }
+
+ @Override
+ public StructuredData invokeRpc(String identifier, String noPayload, UriInfo uriInfo) {
+ rpc.incrementAndGet();
+ return delegate.invokeRpc(identifier, noPayload, uriInfo);
+ }
+
+ @Override
+ public NormalizedNodeContext readConfigurationData(String identifier, UriInfo uriInfo) {
+ configGet.incrementAndGet();
+ return delegate.readConfigurationData(identifier, uriInfo);
+ }
+
+ @Override
+ public NormalizedNodeContext readOperationalData(String identifier, UriInfo uriInfo) {
+ operationalGet.incrementAndGet();
+ return delegate.readOperationalData(identifier, uriInfo);
+ }
+
+ @Override
+ public Response updateConfigurationData(String identifier, Node<?> payload) {
+ configPut.incrementAndGet();
+ return delegate.updateConfigurationData(identifier, payload);
+ }
+
+ @Override
+ public Response createConfigurationData(String identifier, Node<?> payload) {
+ configPost.incrementAndGet();
+ return delegate.createConfigurationData(identifier, payload);
+ }
+
+ @Override
+ public Response createConfigurationData(Node<?> payload) {
+ configPost.incrementAndGet();
+ return delegate.createConfigurationData(payload);
+ }
+
+ @Override
+ public Response deleteConfigurationData(String identifier) {
+ return delegate.deleteConfigurationData(identifier);
+ }
+
+ @Override
+ public Response subscribeToStream(String identifier, UriInfo uriInfo) {
+ return delegate.subscribeToStream(identifier, uriInfo);
+ }
+
+ @Override
+ public StructuredData getAvailableStreams(UriInfo uriInfo) {
+ return delegate.getAvailableStreams(uriInfo);
+ }
+
+ public BigInteger getConfigDelete() {
+ return BigInteger.valueOf(configDelete.get());
+ }
+
+ public BigInteger getConfigGet() {
+ return BigInteger.valueOf(configGet.get());
+ }
+
+ public BigInteger getConfigPost() {
+ return BigInteger.valueOf(configPost.get());
+ }
+
+ public BigInteger getConfigPut() {
+ return BigInteger.valueOf(configPut.get());
+ }
+
+ public BigInteger getOperationalGet() {
+ return BigInteger.valueOf(operationalGet.get());
+ }
+
+ public BigInteger getRpc() {
+ return BigInteger.valueOf(rpc.get());
+ }
+
+}
config:java-name-prefix RestConnector;
}
+ grouping statistics {
+ leaf received-requests {
+ type uint64;
+ }
+ }
+
augment "/config:modules/config:module/config:configuration" {
case rest-connector-impl {
when "/config:modules/config:module/config:type = 'rest-connector-impl'";
}
}
}
+
+ augment "/config:modules/config:module/config:state" {
+ case rest-connector-impl {
+ when "/config:modules/config:module/config:type = 'rest-connector-impl'";
+ container rpcs {
+ uses statistics;
+ }
+
+ container config {
+ container get {
+ uses statistics;
+ }
+
+ container post {
+ uses statistics;
+ }
+
+ container put {
+ uses statistics;
+ }
+ }
+
+ container operational {
+ container get {
+ uses statistics;
+ }
+ }
+ }
+ }
}
\ No newline at end of file
*/
package org.opendaylight.controller.netconf.it;
-import java.net.InetSocketAddress;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anySetOf;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.local.LocalAddress;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import org.apache.commons.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.opendaylight.controller.config.manager.impl.AbstractConfigTest;
+import org.opendaylight.controller.config.manager.impl.factoriesresolver.HardcodedModuleFactoriesResolver;
+import org.opendaylight.controller.config.spi.ModuleFactory;
+import org.opendaylight.controller.config.yang.test.impl.DepTestImplModuleFactory;
+import org.opendaylight.controller.config.yang.test.impl.IdentityTestModuleFactory;
+import org.opendaylight.controller.config.yang.test.impl.MultipleDependenciesModuleFactory;
+import org.opendaylight.controller.config.yang.test.impl.NetconfTestImplModuleFactory;
+import org.opendaylight.controller.config.yang.test.impl.TestImplModuleFactory;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl;
import org.opendaylight.controller.netconf.client.SimpleNetconfClientSessionListener;
import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
import org.opendaylight.controller.netconf.client.conf.NetconfClientConfigurationBuilder;
+import org.opendaylight.controller.netconf.confignetconfconnector.osgi.NetconfOperationServiceFactoryImpl;
+import org.opendaylight.controller.netconf.confignetconfconnector.osgi.YangStoreException;
+import org.opendaylight.controller.netconf.confignetconfconnector.osgi.YangStoreService;
+import org.opendaylight.controller.netconf.confignetconfconnector.osgi.YangStoreServiceImpl;
+import org.opendaylight.controller.netconf.confignetconfconnector.osgi.YangStoreSnapshot;
import org.opendaylight.controller.netconf.impl.DefaultCommitNotificationProducer;
import org.opendaylight.controller.netconf.impl.NetconfServerDispatcher;
import org.opendaylight.controller.netconf.impl.NetconfServerSessionNegotiatorFactory;
import org.opendaylight.controller.netconf.impl.SessionIdProvider;
+import org.opendaylight.controller.netconf.impl.osgi.NetconfMonitoringServiceImpl;
import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
+import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceSnapshotImpl;
import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationProvider;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
+import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
import org.opendaylight.protocol.framework.NeverReconnectStrategy;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
+import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
+import org.w3c.dom.Element;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.util.HashedWheelTimer;
-import io.netty.util.concurrent.GlobalEventExecutor;
+public abstract class AbstractNetconfConfigTest extends AbstractConfigTest {
-public class AbstractNetconfConfigTest extends AbstractConfigTest {
+ public static final String LOOPBACK_ADDRESS = "127.0.0.1";
+ public static final int SERVER_CONNECTION_TIMEOUT_MILLIS = 5000;
+
+ static ModuleFactory[] FACTORIES = { new TestImplModuleFactory(),
+ new DepTestImplModuleFactory(), new NetconfTestImplModuleFactory(),
+ new IdentityTestModuleFactory(), new MultipleDependenciesModuleFactory() };
private EventLoopGroup nettyThreadgroup;
private HashedWheelTimer hashedWheelTimer;
+ private NetconfClientDispatcherImpl clientDispatcher;
+ private Channel serverTcpChannel;
+
+ private NetconfMessage getConfig;
+ private NetconfMessage get;
+
+ /**
+ * @Before in subclasses is called after this method.
+ */
@Before
- public void setUpAbstractNetconfConfigTest() {
+ public void setUpAbstractNetconfConfigTest() throws Exception {
+ super.initConfigTransactionManagerImpl(new HardcodedModuleFactoriesResolver(mockedContext, FACTORIES));
+
nettyThreadgroup = new NioEventLoopGroup();
hashedWheelTimer = new HashedWheelTimer();
+
+ loadMessages();
+
+ setUpTestInitial();
+
+ final NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
+ factoriesListener.onAddNetconfOperationServiceFactory(new NetconfOperationServiceFactoryImpl(getYangStore()));
+
+ for (final NetconfOperationServiceFactory netconfOperationServiceFactory : getAdditionalServiceFactories()) {
+ factoriesListener.onAddNetconfOperationServiceFactory(netconfOperationServiceFactory);
+ }
+
+ serverTcpChannel = startNetconfTcpServer(factoriesListener);
+ clientDispatcher = new NetconfClientDispatcherImpl(getNettyThreadgroup(), getNettyThreadgroup(), getHashedWheelTimer());
+ }
+
+ /**
+ * Called before setUp method is executed, so test classes can set up resources before setUpAbstractNetconfConfigTest method is called.
+ */
+ protected void setUpTestInitial() throws Exception {}
+
+ private void loadMessages() throws Exception {
+ this.getConfig = XmlFileLoader.xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
+ this.get = XmlFileLoader.xmlFileToNetconfMessage("netconfMessages/get.xml");
+ }
+
+ public NetconfMessage getGetConfig() {
+ return getConfig;
+ }
+
+ public NetconfMessage getGet() {
+ return get;
+ }
+
+ private Channel startNetconfTcpServer(final NetconfOperationServiceFactoryListenerImpl factoriesListener) throws Exception {
+ final NetconfServerDispatcher dispatch = createDispatcher(factoriesListener, getNetconfMonitoringService(), getNotificationProducer());
+
+ final ChannelFuture s;
+ if(getTcpServerAddress() instanceof LocalAddress) {
+ s = dispatch.createLocalServer(((LocalAddress) getTcpServerAddress()));
+ } else {
+ s = dispatch.createServer(((InetSocketAddress) getTcpServerAddress()));
+ }
+ s.await();
+ return s.channel();
+ }
+
+ protected DefaultCommitNotificationProducer getNotificationProducer() {
+ final DefaultCommitNotificationProducer notificationProducer = mock(DefaultCommitNotificationProducer.class);
+ doNothing().when(notificationProducer).close();
+ doNothing().when(notificationProducer).sendCommitNotification(anyString(), any(Element.class), anySetOf(String.class));
+ return notificationProducer;
+ }
+
+ protected Iterable<NetconfOperationServiceFactory> getAdditionalServiceFactories() {
+ return Collections.emptySet();
+ }
+
+ protected SessionMonitoringService getNetconfMonitoringService() throws Exception {
+ final NetconfOperationProvider netconfOperationProvider = mock(NetconfOperationProvider.class);
+ final NetconfOperationServiceSnapshotImpl snap = mock(NetconfOperationServiceSnapshotImpl.class);
+ doReturn(Collections.<NetconfOperationService>emptySet()).when(snap).getServices();
+ doReturn(snap).when(netconfOperationProvider).openSnapshot(anyString());
+ return new NetconfMonitoringServiceImpl(netconfOperationProvider);
+ }
+
+ protected abstract SocketAddress getTcpServerAddress();
+
+ public NetconfClientDispatcherImpl getClientDispatcher() {
+ return clientDispatcher;
+ }
+
+ private HardcodedYangStoreService getYangStore() throws YangStoreException, IOException {
+ final Collection<InputStream> yangDependencies = getBasicYangs();
+ return new HardcodedYangStoreService(yangDependencies);
+ }
+
+ static Collection<InputStream> getBasicYangs() throws IOException {
+
+ final List<String> paths = Arrays.asList(
+ "/META-INF/yang/config.yang",
+ "/META-INF/yang/rpc-context.yang",
+ "/META-INF/yang/config-test.yang",
+ "/META-INF/yang/config-test-impl.yang",
+ "/META-INF/yang/test-types.yang",
+ "/META-INF/yang/ietf-inet-types.yang");
+
+ final Collection<InputStream> yangDependencies = new ArrayList<>();
+ final List<String> failedToFind = new ArrayList<>();
+ for (final String path : paths) {
+ final InputStream resourceAsStream = NetconfITTest.class.getResourceAsStream(path);
+ if (resourceAsStream == null) {
+ failedToFind.add(path);
+ } else {
+ yangDependencies.add(resourceAsStream);
+ }
+ }
+ assertEquals("Some yang files were not found", Collections.<String>emptyList(), failedToFind);
+ return yangDependencies;
}
protected NetconfServerDispatcher createDispatcher(
- NetconfOperationServiceFactoryListenerImpl factoriesListener, SessionMonitoringService sessionMonitoringService,
- DefaultCommitNotificationProducer commitNotifier) {
- SessionIdProvider idProvider = new SessionIdProvider();
+ final NetconfOperationServiceFactoryListenerImpl factoriesListener, final SessionMonitoringService sessionMonitoringService,
+ final DefaultCommitNotificationProducer commitNotifier) {
+ final SessionIdProvider idProvider = new SessionIdProvider();
- NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
- hashedWheelTimer, factoriesListener, idProvider, 5000, commitNotifier, sessionMonitoringService);
+ final NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
+ hashedWheelTimer, factoriesListener, idProvider, SERVER_CONNECTION_TIMEOUT_MILLIS, commitNotifier, sessionMonitoringService);
- NetconfServerDispatcher.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerChannelInitializer(
+ final NetconfServerDispatcher.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerChannelInitializer(
serverNegotiatorFactory);
return new NetconfServerDispatcher(serverChannelInitializer, nettyThreadgroup, nettyThreadgroup);
}
return nettyThreadgroup;
}
+ /**
+ * @After in subclasses is be called before this.
+ */
@After
- public void cleanUpTimer() {
+ public void cleanUpNetconf() throws Exception {
+ serverTcpChannel.close().await();
hashedWheelTimer.stop();
- nettyThreadgroup.shutdownGracefully();
+ nettyThreadgroup.shutdownGracefully().await();
}
public NetconfClientConfiguration getClientConfiguration(final InetSocketAddress tcpAddress, final int timeout) {
final NetconfClientConfigurationBuilder b = NetconfClientConfigurationBuilder.create();
b.withAddress(tcpAddress);
b.withSessionListener(new SimpleNetconfClientSessionListener());
- b.withReconnectStrategy(new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE,
- timeout));
+ b.withReconnectStrategy(new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, timeout));
b.withConnectionTimeoutMillis(timeout);
return b.build();
}
+
+ public static final class HardcodedYangStoreService implements YangStoreService {
+
+ private final List<InputStream> byteArrayInputStreams;
+
+ public HardcodedYangStoreService(final Collection<? extends InputStream> inputStreams) throws YangStoreException, IOException {
+ byteArrayInputStreams = new ArrayList<>();
+ for (final InputStream inputStream : inputStreams) {
+ assertNotNull(inputStream);
+ final byte[] content = IOUtils.toByteArray(inputStream);
+ final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(content);
+ byteArrayInputStreams.add(byteArrayInputStream);
+ }
+ }
+
+ @Override
+ public YangStoreSnapshot getYangStoreSnapshot() throws YangStoreException {
+ for (final InputStream inputStream : byteArrayInputStreams) {
+ try {
+ inputStream.reset();
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ final YangParserImpl yangParser = new YangParserImpl();
+ final SchemaContext schemaContext = yangParser.resolveSchemaContext(new HashSet<>(yangParser.parseYangModelsFromStreamsMapped(byteArrayInputStreams).values()));
+ final YangStoreServiceImpl yangStoreService = new YangStoreServiceImpl(new SchemaContextProvider() {
+ @Override
+ public SchemaContext getSchemaContext() {
+ return schemaContext ;
+ }
+ });
+ return yangStoreService.getYangStoreSnapshot();
+ }
+ }
}
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.netconf.it;
-
-import static org.junit.Assert.assertNotNull;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.commons.io.IOUtils;
-import org.opendaylight.controller.netconf.confignetconfconnector.osgi.YangStoreException;
-import org.opendaylight.controller.netconf.confignetconfconnector.osgi.YangStoreService;
-import org.opendaylight.controller.netconf.confignetconfconnector.osgi.YangStoreServiceImpl;
-import org.opendaylight.controller.netconf.confignetconfconnector.osgi.YangStoreSnapshot;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
-import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
-
-public class HardcodedYangStoreService implements YangStoreService {
-
- private final List<InputStream> byteArrayInputStreams;
-
- public HardcodedYangStoreService(
- Collection<? extends InputStream> inputStreams)
- throws YangStoreException, IOException {
- byteArrayInputStreams = new ArrayList<>();
- for (InputStream inputStream : inputStreams) {
- assertNotNull(inputStream);
- byte[] content = IOUtils.toByteArray(inputStream);
- ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(
- content);
- byteArrayInputStreams.add(byteArrayInputStream);
- }
- }
-
- @Override
- public YangStoreSnapshot getYangStoreSnapshot() throws YangStoreException {
- for (InputStream inputStream : byteArrayInputStreams) {
- try {
- inputStream.reset();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- YangParserImpl yangParser = new YangParserImpl();
- final SchemaContext schemaContext = yangParser.resolveSchemaContext(new HashSet<>(yangParser.parseYangModelsFromStreamsMapped(byteArrayInputStreams).values()));
- YangStoreServiceImpl yangStoreService = new YangStoreServiceImpl(new SchemaContextProvider() {
- @Override
- public SchemaContext getSchemaContext() {
- return schemaContext ;
- }
- });
- return yangStoreService.getYangStoreSnapshot();
- }
-}
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.opendaylight.controller.netconf.util.test.XmlUnitUtil.assertContainsElementWithName;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import io.netty.channel.ChannelFuture;
import java.io.IOException;
-import java.io.InputStream;
+import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
-import java.util.Collection;
+import java.net.SocketAddress;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
import javax.management.InstanceNotFoundException;
import javax.management.Notification;
import javax.management.NotificationListener;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import org.opendaylight.controller.config.manager.impl.factoriesresolver.HardcodedModuleFactoriesResolver;
import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
import org.opendaylight.controller.config.persist.api.Persister;
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.api.jmx.CommitJMXNotification;
-import org.opendaylight.controller.netconf.api.monitoring.NetconfManagementSession;
-import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
-import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl;
import org.opendaylight.controller.netconf.client.test.TestingNetconfClient;
-import org.opendaylight.controller.netconf.confignetconfconnector.osgi.NetconfOperationServiceFactoryImpl;
-import org.opendaylight.controller.netconf.confignetconfconnector.osgi.YangStoreException;
import org.opendaylight.controller.netconf.impl.DefaultCommitNotificationProducer;
-import org.opendaylight.controller.netconf.impl.NetconfServerDispatcher;
import org.opendaylight.controller.netconf.impl.osgi.NetconfMonitoringServiceImpl;
-import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceSnapshotImpl;
import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService;
import org.opendaylight.controller.netconf.mapping.api.Capability;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperationProvider;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
import org.opendaylight.controller.netconf.monitoring.osgi.NetconfMonitoringActivator;
import org.opendaylight.controller.netconf.monitoring.osgi.NetconfMonitoringOperationService;
import org.opendaylight.controller.netconf.persist.impl.ConfigPersisterNotificationHandler;
public class NetconfConfigPersisterITTest extends AbstractNetconfConfigTest {
- private static final InetSocketAddress tcpAddress = new InetSocketAddress("127.0.0.1", 12023);
+ public static final int PORT = 12026;
+ private static final InetSocketAddress TCP_ADDRESS = new InetSocketAddress(LOOPBACK_ADDRESS, PORT);
- private NetconfClientDispatcher clientDispatcher;
- private DefaultCommitNotificationProducer commitNotifier;
+ private NetconfMonitoringServiceImpl netconfMonitoringService;
- @Before
- public void setUp() throws Exception {
- super.initConfigTransactionManagerImpl(new HardcodedModuleFactoriesResolver(mockedContext,NetconfITTest.FACTORIES));
-
- NetconfMonitoringServiceImpl monitoringService = new NetconfMonitoringServiceImpl(getNetconfOperationProvider());
-
- NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
- factoriesListener.onAddNetconfOperationServiceFactory(new NetconfOperationServiceFactoryImpl(getYangStore()));
- factoriesListener
- .onAddNetconfOperationServiceFactory(new NetconfMonitoringActivator.NetconfMonitoringOperationServiceFactory(
- new NetconfMonitoringOperationService(monitoringService)));
-
-
- commitNotifier = new DefaultCommitNotificationProducer(platformMBeanServer);
- NetconfServerDispatcher dispatch = createDispatcher(factoriesListener, mockSessionMonitoringService(), commitNotifier);
- ChannelFuture s = dispatch.createServer(tcpAddress);
- s.await();
-
- clientDispatcher = new NetconfClientDispatcherImpl(getNettyThreadgroup(), getNettyThreadgroup(), getHashedWheelTimer());
+ @Override
+ protected void setUpTestInitial() {
+ netconfMonitoringService = new NetconfMonitoringServiceImpl(getNetconfOperationProvider());
}
- @After
- public void cleanUp(){
- commitNotifier.close();
+ @Override
+ protected SessionMonitoringService getNetconfMonitoringService() throws Exception {
+ return netconfMonitoringService;
}
- private HardcodedYangStoreService getYangStore() throws YangStoreException, IOException {
- final Collection<InputStream> yangDependencies = NetconfITTest.getBasicYangs();
- return new HardcodedYangStoreService(yangDependencies);
+ @Override
+ protected SocketAddress getTcpServerAddress() {
+ return TCP_ADDRESS;
}
-
- protected SessionMonitoringService mockSessionMonitoringService() {
- SessionMonitoringService mockedSessionMonitor = mock(SessionMonitoringService.class);
- doNothing().when(mockedSessionMonitor).onSessionUp(any(NetconfManagementSession.class));
- doNothing().when(mockedSessionMonitor).onSessionDown(any(NetconfManagementSession.class));
- return mockedSessionMonitor;
+ @Override
+ protected Iterable<NetconfOperationServiceFactory> getAdditionalServiceFactories() {
+ return Collections.<NetconfOperationServiceFactory>singletonList(new NetconfMonitoringActivator.NetconfMonitoringOperationServiceFactory(
+ new NetconfMonitoringOperationService(netconfMonitoringService)));
}
-
+ @Override
+ protected DefaultCommitNotificationProducer getNotificationProducer() {
+ return new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
+ }
@Test
public void testNetconfCommitNotifications() throws Exception {
+ final VerifyingNotificationListener notificationVerifier = createCommitNotificationListener();
+ final VerifyingPersister mockedAggregator = mockAggregator();
- VerifyingNotificationListener notificationVerifier = createCommitNotificationListener();
- VerifyingPersister mockedAggregator = mockAggregator();
-
- try (TestingNetconfClient persisterClient = new TestingNetconfClient("persister", clientDispatcher, getClientConfiguration(tcpAddress, 4000))) {
+ try (TestingNetconfClient persisterClient = new TestingNetconfClient("persister", getClientDispatcher(), getClientConfiguration(TCP_ADDRESS, 4000))) {
try (ConfigPersisterNotificationHandler configPersisterNotificationHandler = new ConfigPersisterNotificationHandler(
platformMBeanServer, mockedAggregator)) {
- try (TestingNetconfClient netconfClient = new TestingNetconfClient("client", clientDispatcher, getClientConfiguration(tcpAddress, 4000))) {
+ try (TestingNetconfClient netconfClient = new TestingNetconfClient("client", getClientDispatcher(), getClientConfiguration(TCP_ADDRESS, 4000))) {
NetconfMessage response = netconfClient.sendMessage(loadGetConfigMessage());
assertContainsElementWithName(response.getDocument(), "modules");
assertContainsElementWithName(response.getDocument(), "services");
}
private VerifyingNotificationListener createCommitNotificationListener() throws InstanceNotFoundException {
- VerifyingNotificationListener listener = new VerifyingNotificationListener();
+ final VerifyingNotificationListener listener = new VerifyingNotificationListener();
platformMBeanServer.addNotificationListener(DefaultCommitNotificationProducer.OBJECT_NAME, listener, null, null);
return listener;
}
public NetconfOperationProvider getNetconfOperationProvider() {
- NetconfOperationProvider factoriesListener = mock(NetconfOperationProvider.class);
- NetconfOperationServiceSnapshotImpl snap = mock(NetconfOperationServiceSnapshotImpl.class);
- NetconfOperationService service = mock(NetconfOperationService.class);
- Set<Capability> caps = Sets.newHashSet();
+ final NetconfOperationProvider factoriesListener = mock(NetconfOperationProvider.class);
+ final NetconfOperationServiceSnapshotImpl snap = mock(NetconfOperationServiceSnapshotImpl.class);
+ final NetconfOperationService service = mock(NetconfOperationService.class);
+ final Set<Capability> caps = Sets.newHashSet();
doReturn(caps).when(service).getCapabilities();
- Set<NetconfOperationService> services = Sets.newHashSet(service);
+ final Set<NetconfOperationService> services = Sets.newHashSet(service);
doReturn(services).when(snap).getServices();
doReturn(snap).when(factoriesListener).openSnapshot(anyString());
public List<Notification> notifications = Lists.newArrayList();
@Override
- public void handleNotification(Notification notification, Object handback) {
+ public void handleNotification(final Notification notification, final Object handback) {
this.notifications.add(notification);
}
- void assertNotificationCount(Object size) {
+ void assertNotificationCount(final Object size) {
assertEquals(size, notifications.size());
}
- void assertNotificationContent(int notificationIndex, int expectedModulesSize, int expectedServicesSize, int expectedCapsSize) {
- Notification notification = notifications.get(notificationIndex);
+ void assertNotificationContent(final int notificationIndex, final int expectedModulesSize, final int expectedServicesSize, final int expectedCapsSize) {
+ final Notification notification = notifications.get(notificationIndex);
assertEquals(CommitJMXNotification.class, notification.getClass());
- int capsSize = ((CommitJMXNotification) notification).getCapabilities().size();
+ final int capsSize = ((CommitJMXNotification) notification).getCapabilities().size();
assertEquals("Expected capabilities count", expectedCapsSize, capsSize);
- Element configSnapshot = ((CommitJMXNotification) notification).getConfigSnapshot();
- int modulesSize = configSnapshot.getElementsByTagName("module").getLength();
+ final Element configSnapshot = ((CommitJMXNotification) notification).getConfigSnapshot();
+ final int modulesSize = configSnapshot.getElementsByTagName("module").getLength();
assertEquals("Expected modules count", expectedModulesSize, modulesSize);
- int servicesSize = configSnapshot.getElementsByTagName("instance").getLength();
+ final int servicesSize = configSnapshot.getElementsByTagName("instance").getLength();
assertEquals("Expected services count", expectedServicesSize, servicesSize);
}
}
private Persister mockedPersister;
public VerifyingPersister() throws IOException {
- Persister mockedAggregator = mock(Persister.class);
+ final Persister mockedAggregator = mock(Persister.class);
doAnswer(new Answer<Object>() {
@Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- ConfigSnapshotHolder configSnapshot = (ConfigSnapshotHolder) invocation.getArguments()[0];
+ public Object answer(final InvocationOnMock invocation) throws Throwable {
+ final ConfigSnapshotHolder configSnapshot = (ConfigSnapshotHolder) invocation.getArguments()[0];
snapshots.add(configSnapshot);
return null;
}
this.mockedPersister = mockedAggregator;
}
- void assertSnapshotCount(Object size) {
+ void assertSnapshotCount(final Object size) {
assertEquals(size, snapshots.size());
}
- void assertSnapshotContent(int notificationIndex, int expectedModulesSize, int expectedServicesSize, int expectedCapsSize)
+ void assertSnapshotContent(final int notificationIndex, final int expectedModulesSize, final int expectedServicesSize, final int expectedCapsSize)
throws SAXException, IOException {
- ConfigSnapshotHolder snapshot = snapshots.get(notificationIndex);
- int capsSize = snapshot.getCapabilities().size();
+ final ConfigSnapshotHolder snapshot = snapshots.get(notificationIndex);
+ final int capsSize = snapshot.getCapabilities().size();
assertEquals("Expected capabilities count", expectedCapsSize, capsSize);
- Document configSnapshot = readXmlToDocument(snapshot.getConfigSnapshot());
+ final Document configSnapshot = readXmlToDocument(snapshot.getConfigSnapshot());
assertElementsCount(configSnapshot, "module", expectedModulesSize);
assertElementsCount(configSnapshot, "instance", expectedServicesSize);
}
@Override
- public void persistConfig(ConfigSnapshotHolder configSnapshotHolder) throws IOException {
+ public void persistConfig(final ConfigSnapshotHolder configSnapshotHolder) throws IOException {
mockedPersister.persistConfig(configSnapshotHolder);
}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.netconf.it;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.opendaylight.controller.netconf.util.test.XmlUnitUtil.assertContainsElementWithText;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import org.junit.Test;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.api.monitoring.NetconfManagementSession;
+import org.opendaylight.controller.netconf.client.test.TestingNetconfClient;
+import org.opendaylight.controller.netconf.impl.osgi.NetconfMonitoringServiceImpl;
+import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceSnapshotImpl;
+import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService;
+import org.opendaylight.controller.netconf.mapping.api.Capability;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationProvider;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
+import org.opendaylight.controller.netconf.monitoring.osgi.NetconfMonitoringActivator;
+import org.opendaylight.controller.netconf.monitoring.osgi.NetconfMonitoringOperationService;
+import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.slf4j.Logger;
+import org.w3c.dom.Document;
+
+public class NetconfITMonitoringTest extends AbstractNetconfConfigTest {
+
+ public static final int PORT = 12025;
+ public static final InetSocketAddress TCP_ADDRESS = new InetSocketAddress(LOOPBACK_ADDRESS, PORT);
+ public static final TestingCapability TESTING_CAPABILITY = new TestingCapability();
+
+ private NetconfMonitoringServiceImpl netconfMonitoringService;
+
+ @Override
+ protected void setUpTestInitial() {
+ netconfMonitoringService = new NetconfMonitoringServiceImpl(getNetconfOperationProvider());
+ }
+
+ @Override
+ protected SessionMonitoringService getNetconfMonitoringService() throws Exception {
+ return netconfMonitoringService;
+ }
+
+ @Override
+ protected Iterable<NetconfOperationServiceFactory> getAdditionalServiceFactories() {
+ return Collections.<NetconfOperationServiceFactory>singletonList(new NetconfMonitoringActivator.NetconfMonitoringOperationServiceFactory(
+ new NetconfMonitoringOperationService(netconfMonitoringService)));
+ }
+
+ @Override
+ protected InetSocketAddress getTcpServerAddress() {
+ return TCP_ADDRESS;
+ }
+
+ static SessionMonitoringService getNetconfMonitoringListenerService(final Logger logger, final NetconfMonitoringServiceImpl monitor) {
+ return new SessionMonitoringService() {
+ @Override
+ public void onSessionUp(final NetconfManagementSession session) {
+ logger.debug("Management session up {}", session);
+ monitor.onSessionUp(session);
+ }
+
+ @Override
+ public void onSessionDown(final NetconfManagementSession session) {
+ logger.debug("Management session down {}", session);
+ monitor.onSessionDown(session);
+ }
+ };
+ }
+
+ @Test
+ public void testGetResponseFromMonitoring() throws Exception {
+ try (TestingNetconfClient netconfClient = new TestingNetconfClient("client-monitoring", getClientDispatcher(), getClientConfiguration(TCP_ADDRESS, 10000))) {
+ try (TestingNetconfClient netconfClient2 = new TestingNetconfClient("client-monitoring2", getClientDispatcher(), getClientConfiguration(TCP_ADDRESS, 10000))) {
+ Thread.sleep(500);
+ final NetconfMessage response = netconfClient2.sendMessage(getGet());
+ assertSessionElementsInResponse(response.getDocument(), 2);
+ }
+ Thread.sleep(500);
+ final NetconfMessage response = netconfClient.sendMessage(getGet());
+ assertSessionElementsInResponse(response.getDocument(), 1);
+ }
+ }
+
+
+ @Test(timeout = 13 * 10000)
+ public void testClientHelloWithAuth() throws Exception {
+ String fileName = "netconfMessages/client_hello_with_auth.xml";
+ final String hello = XmlFileLoader.fileToString(fileName);
+
+ fileName = "netconfMessages/get.xml";
+ final String get = XmlFileLoader.fileToString(fileName);
+
+ final Socket sock = new Socket(TCP_ADDRESS.getHostName(), TCP_ADDRESS.getPort());
+ sock.getOutputStream().write(hello.getBytes(Charsets.UTF_8));
+ final String separator = "]]>]]>";
+
+ sock.getOutputStream().write(separator.getBytes(Charsets.UTF_8));
+ sock.getOutputStream().write(get.getBytes(Charsets.UTF_8));
+ sock.getOutputStream().write(separator.getBytes(Charsets.UTF_8));
+
+ final StringBuilder responseBuilder = new StringBuilder();
+
+ try (InputStream inputStream = sock.getInputStream();
+ InputStreamReader reader = new InputStreamReader(inputStream);
+ BufferedReader buff = new BufferedReader(reader)) {
+ String line;
+ while ((line = buff.readLine()) != null) {
+
+ responseBuilder.append(line);
+ responseBuilder.append(System.lineSeparator());
+
+ if(line.contains("</rpc-reply>"))
+ break;
+ }
+ }
+
+ sock.close();
+
+ final String helloMsg = responseBuilder.substring(0, responseBuilder.indexOf(separator));
+ Document doc = XmlUtil.readXmlToDocument(helloMsg);
+ assertContainsElementWithText(doc, "urn:ietf:params:netconf:capability:candidate:1.0");
+
+ final String replyMsg = responseBuilder.substring(responseBuilder.indexOf(separator) + separator.length());
+ doc = XmlUtil.readXmlToDocument(replyMsg);
+ assertContainsElementWithText(doc, "tomas");
+ }
+
+ private void assertSessionElementsInResponse(final Document document, final int i) {
+ final int elementSize = document.getElementsByTagName("session-id").getLength();
+ assertEquals("Incorrect number of session-id tags in " + XmlUtil.toString(document), i, elementSize);
+ }
+
+ public static NetconfOperationProvider getNetconfOperationProvider() {
+ final NetconfOperationProvider factoriesListener = mock(NetconfOperationProvider.class);
+ final NetconfOperationServiceSnapshotImpl snap = mock(NetconfOperationServiceSnapshotImpl.class);
+ try {
+ doNothing().when(snap).close();
+ } catch (final Exception e) {
+ // not happening
+ throw new IllegalStateException(e);
+ }
+ final NetconfOperationService service = mock(NetconfOperationService.class);
+ final Set<Capability> caps = Sets.newHashSet();
+ caps.add(TESTING_CAPABILITY);
+
+ doReturn(caps).when(service).getCapabilities();
+ final Set<NetconfOperationService> services = Sets.newHashSet(service);
+ doReturn(services).when(snap).getServices();
+ doReturn(snap).when(factoriesListener).openSnapshot(anyString());
+
+ return factoriesListener;
+ }
+
+ private static class TestingCapability implements Capability {
+ @Override
+ public String getCapabilityUri() {
+ return "namespaceModuleRevision";
+ }
+
+ @Override
+ public Optional<String> getModuleNamespace() {
+ return Optional.of("namespace");
+ }
+
+ @Override
+ public Optional<String> getModuleName() {
+ return Optional.of("name");
+ }
+
+ @Override
+ public Optional<String> getRevision() {
+ return Optional.of("revision");
+ }
+
+ @Override
+ public Optional<String> getCapabilitySchema() {
+ return Optional.of("content");
+ }
+
+ @Override
+ public Optional<List<String>> getLocation() {
+ return Optional.absent();
+ }
+ }
+}
package org.opendaylight.controller.netconf.it;
-import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import com.google.common.collect.Lists;
+import io.netty.channel.local.LocalAddress;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.util.concurrent.GlobalEventExecutor;
import java.io.IOException;
-import java.io.InputStream;
-import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
-import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
-
-import junit.framework.Assert;
-
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.opendaylight.controller.config.manager.impl.factoriesresolver.HardcodedModuleFactoriesResolver;
-import org.opendaylight.controller.config.spi.ModuleFactory;
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.auth.AuthProvider;
import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
import org.opendaylight.controller.netconf.client.conf.NetconfClientConfigurationBuilder;
import org.opendaylight.controller.netconf.client.test.TestingNetconfClient;
-import org.opendaylight.controller.netconf.confignetconfconnector.osgi.NetconfOperationServiceFactoryImpl;
-import org.opendaylight.controller.netconf.confignetconfconnector.osgi.YangStoreException;
-import org.opendaylight.controller.netconf.impl.DefaultCommitNotificationProducer;
-import org.opendaylight.controller.netconf.impl.NetconfServerDispatcher;
-import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.LoginPassword;
import org.opendaylight.controller.netconf.ssh.NetconfSSHServer;
import org.opendaylight.controller.netconf.ssh.authentication.PEMGenerator;
import org.opendaylight.controller.netconf.util.messages.NetconfMessageUtil;
import org.opendaylight.controller.netconf.util.osgi.NetconfConfigUtil;
-import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.opendaylight.protocol.framework.NeverReconnectStrategy;
-import com.google.common.collect.Lists;
-
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
-import io.netty.util.concurrent.GlobalEventExecutor;
-
public class NetconfITSecureTest extends AbstractNetconfConfigTest {
- private static final InetSocketAddress tlsAddress = new InetSocketAddress("127.0.0.1", 12024);
+ public static final int PORT = 12024;
+ private static final InetSocketAddress TLS_ADDRESS = new InetSocketAddress("127.0.0.1", PORT);
+
+ public static final String USERNAME = "user";
+ public static final String PASSWORD = "pwd";
- private DefaultCommitNotificationProducer commitNot;
private NetconfSSHServer sshServer;
- private NetconfMessage getConfig;
@Before
public void setUp() throws Exception {
- this.getConfig = XmlFileLoader.xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
-
- super.initConfigTransactionManagerImpl(new HardcodedModuleFactoriesResolver(mockedContext, getModuleFactories().toArray(
- new ModuleFactory[0])));
-
- final NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
- factoriesListener.onAddNetconfOperationServiceFactory(new NetconfOperationServiceFactoryImpl(getYangStore()));
-
- commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
-
-
- final NetconfServerDispatcher dispatchS = createDispatcher(factoriesListener);
- ChannelFuture s = dispatchS.createLocalServer(NetconfConfigUtil.getNetconfLocalAddress());
- s.await();
- EventLoopGroup bossGroup = new NioEventLoopGroup();
-
final char[] pem = PEMGenerator.generate().toCharArray();
- sshServer = NetconfSSHServer.start(tlsAddress.getPort(), NetconfConfigUtil.getNetconfLocalAddress(), bossGroup, pem);
+ sshServer = NetconfSSHServer.start(TLS_ADDRESS.getPort(), NetconfConfigUtil.getNetconfLocalAddress(), getNettyThreadgroup(), pem);
sshServer.setAuthProvider(getAuthProvider());
}
- private NetconfServerDispatcher createDispatcher(final NetconfOperationServiceFactoryListenerImpl factoriesListener) {
- return super.createDispatcher(factoriesListener, NetconfITTest.getNetconfMonitoringListenerService(), commitNot);
- }
-
@After
public void tearDown() throws Exception {
sshServer.close();
- commitNot.close();
sshServer.join();
}
- private HardcodedYangStoreService getYangStore() throws YangStoreException, IOException {
- final Collection<InputStream> yangDependencies = NetconfITTest.getBasicYangs();
- return new HardcodedYangStoreService(yangDependencies);
- }
-
- protected List<ModuleFactory> getModuleFactories() {
- return asList(NetconfITTest.FACTORIES);
- }
-
@Test
public void testSecure() throws Exception {
final NetconfClientDispatcher dispatch = new NetconfClientDispatcherImpl(getNettyThreadgroup(), getNettyThreadgroup(), getHashedWheelTimer());
try (TestingNetconfClient netconfClient = new TestingNetconfClient("testing-ssh-client", dispatch, getClientConfiguration())) {
- NetconfMessage response = netconfClient.sendMessage(getConfig);
- Assert.assertFalse("Unexpected error message " + XmlUtil.toString(response.getDocument()),
+ NetconfMessage response = netconfClient.sendMessage(getGetConfig());
+ assertFalse("Unexpected error message " + XmlUtil.toString(response.getDocument()),
NetconfMessageUtil.isErrorMessage(response));
final NetconfMessage gs = new NetconfMessage(XmlUtil.readXmlToDocument("<rpc message-id=\"2\"\n" +
"</rpc>\n"));
response = netconfClient.sendMessage(gs);
- Assert.assertFalse("Unexpected error message " + XmlUtil.toString(response.getDocument()),
+ assertFalse("Unexpected error message " + XmlUtil.toString(response.getDocument()),
NetconfMessageUtil.isErrorMessage(response));
}
}
final int requests = 1000;
for (int i = 0; i < requests; i++) {
- final Future<NetconfMessage> netconfMessageFuture = netconfClient.sendRequest(getConfig);
+ final Future<NetconfMessage> netconfMessageFuture = netconfClient.sendRequest(getGetConfig());
futures.add(netconfMessageFuture);
netconfMessageFuture.addListener(new GenericFutureListener<Future<? super NetconfMessage>>() {
@Override
// Give future listeners some time to finish counter incrementation
Thread.sleep(5000);
- org.junit.Assert.assertEquals(requests, responseCounter.get());
+ assertEquals(requests, responseCounter.get());
}
}
public NetconfClientConfiguration getClientConfiguration() throws IOException {
final NetconfClientConfigurationBuilder b = NetconfClientConfigurationBuilder.create();
- b.withAddress(tlsAddress);
+ b.withAddress(TLS_ADDRESS);
b.withSessionListener(new SimpleNetconfClientSessionListener());
b.withReconnectStrategy(new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000));
b.withProtocol(NetconfClientConfiguration.NetconfClientProtocol.SSH);
}
public AuthProvider getAuthProvider() throws Exception {
- AuthProvider mock = mock(AuthProvider.class);
- doReturn(true).when(mock).authenticated(anyString(), anyString());
- return mock;
+ final AuthProvider mockAuth = mock(AuthProvider.class);
+ doReturn("mockedAuth").when(mockAuth).toString();
+ doReturn(true).when(mockAuth).authenticated(anyString(), anyString());
+ return mockAuth;
}
public AuthenticationHandler getAuthHandler() throws IOException {
- return new LoginPassword("user", "pwd");
+ return new LoginPassword(USERNAME, PASSWORD);
+ }
+
+ @Override
+ protected LocalAddress getTcpServerAddress() {
+ return NetconfConfigUtil.getNetconfLocalAddress();
}
}
package org.opendaylight.controller.netconf.it;
+import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import io.netty.channel.ChannelFuture;
import java.io.IOException;
-import java.io.InputStream;
-import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import javax.management.ObjectName;
import javax.xml.parsers.ParserConfigurationException;
-import org.junit.After;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
-import org.junit.matchers.JUnitMatchers;
import org.opendaylight.controller.config.api.jmx.ObjectNameUtil;
-import org.opendaylight.controller.config.manager.impl.factoriesresolver.HardcodedModuleFactoriesResolver;
-import org.opendaylight.controller.config.spi.ModuleFactory;
import org.opendaylight.controller.config.util.ConfigTransactionJMXClient;
import org.opendaylight.controller.config.yang.test.impl.DepTestImplModuleFactory;
-import org.opendaylight.controller.config.yang.test.impl.IdentityTestModuleFactory;
import org.opendaylight.controller.config.yang.test.impl.MultipleDependenciesModuleFactory;
import org.opendaylight.controller.config.yang.test.impl.MultipleDependenciesModuleMXBean;
import org.opendaylight.controller.config.yang.test.impl.NetconfTestImplModuleFactory;
import org.opendaylight.controller.config.yang.test.impl.NetconfTestImplModuleMXBean;
-import org.opendaylight.controller.config.yang.test.impl.TestImplModuleFactory;
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.api.NetconfMessage;
-import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl;
+import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
import org.opendaylight.controller.netconf.client.test.TestingNetconfClient;
-import org.opendaylight.controller.netconf.confignetconfconnector.osgi.NetconfOperationServiceFactoryImpl;
-import org.opendaylight.controller.netconf.confignetconfconnector.osgi.YangStoreException;
-import org.opendaylight.controller.netconf.impl.DefaultCommitNotificationProducer;
-import org.opendaylight.controller.netconf.impl.NetconfServerDispatcher;
-import org.opendaylight.controller.netconf.impl.osgi.NetconfMonitoringServiceImpl;
-import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
-import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceSnapshotImpl;
-import org.opendaylight.controller.netconf.mapping.api.NetconfOperationProvider;
-import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
public class NetconfITTest extends AbstractNetconfConfigTest {
- // TODO refactor, pull common code up to AbstractNetconfITTest
+ public static final int PORT = 12023;
+ public static final InetSocketAddress TCP_ADDRESS = new InetSocketAddress(LOOPBACK_ADDRESS, PORT);
- private static final InetSocketAddress tcpAddress = new InetSocketAddress("127.0.0.1", 12023);
-
-
- private NetconfMessage getConfig, getConfigCandidate, editConfig, closeSession;
- private DefaultCommitNotificationProducer commitNotificationProducer;
- private NetconfServerDispatcher dispatch;
-
- private NetconfClientDispatcherImpl clientDispatcher;
-
- static ModuleFactory[] FACTORIES = {new TestImplModuleFactory(), new DepTestImplModuleFactory(),
- new NetconfTestImplModuleFactory(), new IdentityTestModuleFactory(),
- new MultipleDependenciesModuleFactory()};
+ private NetconfMessage getConfigCandidate, editConfig, closeSession;
+ private NetconfClientDispatcher clientDispatcher;
@Before
public void setUp() throws Exception {
- initConfigTransactionManagerImpl(new HardcodedModuleFactoriesResolver(mockedContext,
- FACTORIES
- ));
-
loadMessages();
-
- NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
- factoriesListener.onAddNetconfOperationServiceFactory(new NetconfOperationServiceFactoryImpl(getYangStore()));
-
- commitNotificationProducer = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
-
- dispatch = createDispatcher(factoriesListener);
- ChannelFuture s = dispatch.createServer(tcpAddress);
- s.await();
-
- clientDispatcher = new NetconfClientDispatcherImpl(getNettyThreadgroup(), getNettyThreadgroup(), getHashedWheelTimer());
- }
-
- private NetconfServerDispatcher createDispatcher(NetconfOperationServiceFactoryListenerImpl factoriesListener) {
- return super.createDispatcher(factoriesListener, getNetconfMonitoringListenerService(), commitNotificationProducer);
+ clientDispatcher = getClientDispatcher();
}
- static NetconfMonitoringServiceImpl getNetconfMonitoringListenerService() {
- NetconfOperationProvider netconfOperationProvider = mock(NetconfOperationProvider.class);
- NetconfOperationServiceSnapshotImpl snap = mock(NetconfOperationServiceSnapshotImpl.class);
- doReturn(Collections.<NetconfOperationService>emptySet()).when(snap).getServices();
- doReturn(snap).when(netconfOperationProvider).openSnapshot(anyString());
- return new NetconfMonitoringServiceImpl(netconfOperationProvider);
- }
-
- @After
- public void tearDown() throws Exception {
- commitNotificationProducer.close();
- clientDispatcher.close();
+ @Override
+ protected InetSocketAddress getTcpServerAddress() {
+ return TCP_ADDRESS;
}
private void loadMessages() throws IOException, SAXException, ParserConfigurationException {
this.editConfig = XmlFileLoader.xmlFileToNetconfMessage("netconfMessages/edit_config.xml");
- this.getConfig = XmlFileLoader.xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
this.getConfigCandidate = XmlFileLoader.xmlFileToNetconfMessage("netconfMessages/getConfig_candidate.xml");
this.closeSession = XmlFileLoader.xmlFileToNetconfMessage("netconfMessages/closeSession.xml");
}
- private HardcodedYangStoreService getYangStore() throws YangStoreException, IOException {
- final Collection<InputStream> yangDependencies = getBasicYangs();
- return new HardcodedYangStoreService(yangDependencies);
- }
-
- static Collection<InputStream> getBasicYangs() throws IOException {
-
- List<String> paths = Arrays.asList("/META-INF/yang/config.yang", "/META-INF/yang/rpc-context.yang",
- "/META-INF/yang/config-test.yang", "/META-INF/yang/config-test-impl.yang", "/META-INF/yang/test-types.yang",
- "/META-INF/yang/ietf-inet-types.yang");
- final Collection<InputStream> yangDependencies = new ArrayList<>();
- List<String> failedToFind = new ArrayList<>();
- for (String path : paths) {
- InputStream resourceAsStream = NetconfITTest.class.getResourceAsStream(path);
- if (resourceAsStream == null) {
- failedToFind.add(path);
- } else {
- yangDependencies.add(resourceAsStream);
- }
- }
- assertEquals("Some yang files were not found", Collections.<String>emptyList(), failedToFind);
- return yangDependencies;
- }
-
-
@Test
public void testNetconfClientDemonstration() throws Exception {
- try (TestingNetconfClient netconfClient = new TestingNetconfClient("client", clientDispatcher, getClientConfiguration(tcpAddress, 4000))) {
+ try (TestingNetconfClient netconfClient = new TestingNetconfClient("client", clientDispatcher, getClientConfiguration(TCP_ADDRESS, 4000))) {
- Set<String> capabilitiesFromNetconfServer = netconfClient.getCapabilities();
- long sessionId = netconfClient.getSessionId();
+ final Set<String> capabilitiesFromNetconfServer = netconfClient.getCapabilities();
+ final long sessionId = netconfClient.getSessionId();
// NetconfMessage can be created :
// new NetconfMessage(XmlUtil.readXmlToDocument("<xml/>"));
- NetconfMessage response = netconfClient.sendMessage(getConfig);
+ final NetconfMessage response = netconfClient.sendMessage(getGetConfig());
response.getDocument();
}
}
@Test
public void testTwoSessions() throws Exception {
- try (TestingNetconfClient netconfClient = new TestingNetconfClient("1", clientDispatcher, getClientConfiguration(tcpAddress, 10000))) {
- try (TestingNetconfClient netconfClient2 = new TestingNetconfClient("2", clientDispatcher, getClientConfiguration(tcpAddress, 10000))) {
+ try (TestingNetconfClient netconfClient = new TestingNetconfClient("1", clientDispatcher, getClientConfiguration(TCP_ADDRESS, 10000))) {
+ try (TestingNetconfClient netconfClient2 = new TestingNetconfClient("2", clientDispatcher, getClientConfiguration(TCP_ADDRESS, 10000))) {
assertNotNull(netconfClient2.getCapabilities());
}
}
}
- @Ignore
- @Test
- public void waitingTest() throws Exception {
- final ConfigTransactionJMXClient transaction = this.configRegistryClient.createTransaction();
- transaction.createModule(DepTestImplModuleFactory.NAME, "eb");
- transaction.commit();
- Thread.currentThread().suspend();
- }
-
@Test
public void rpcReplyContainsAllAttributesTest() throws Exception {
- try (TestingNetconfClient netconfClient = createSession(tcpAddress, "1")) {
- final String rpc = "<rpc message-id=\"5\" a=\"a\" b=\"44\" xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">"
- + "<get/>" + "</rpc>";
+ try (TestingNetconfClient netconfClient = createSession(TCP_ADDRESS, "1")) {
+ final String rpc = "<rpc message-id=\"5\" a=\"a\" b=\"44\" xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"><get/>" + "</rpc>";
final Document doc = XmlUtil.readXmlToDocument(rpc);
final NetconfMessage message = netconfClient.sendMessage(new NetconfMessage(doc));
assertNotNull(message);
@Test
public void rpcReplyErrorContainsAllAttributesTest() throws Exception {
- try (TestingNetconfClient netconfClient = createSession(tcpAddress, "1")) {
- final String rpc = "<rpc message-id=\"1\" a=\"adada\" b=\"4\" xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">"
- + "<commit/>" + "</rpc>";
+ try (TestingNetconfClient netconfClient = createSession(TCP_ADDRESS, "1")) {
+ final String rpc = "<rpc message-id=\"1\" a=\"adada\" b=\"4\" xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"><commit/>" + "</rpc>";
final Document doc = XmlUtil.readXmlToDocument(rpc);
final NetconfMessage message = netconfClient.sendMessage(new NetconfMessage(doc));
final NamedNodeMap expectedAttributes = doc.getDocumentElement().getAttributes();
@Test
public void rpcOutputContainsCorrectNamespace() throws Exception {
final ConfigTransactionJMXClient transaction = this.configRegistryClient.createTransaction();
- ObjectName dep = transaction.createModule(DepTestImplModuleFactory.NAME, "instanceD");
- ObjectName impl = transaction.createModule(NetconfTestImplModuleFactory.NAME, "instance");
- NetconfTestImplModuleMXBean proxy = configRegistryClient
+ final ObjectName dep = transaction.createModule(DepTestImplModuleFactory.NAME, "instanceD");
+ final ObjectName impl = transaction.createModule(NetconfTestImplModuleFactory.NAME, "instance");
+ final NetconfTestImplModuleMXBean proxy = configRegistryClient
.newMXBeanProxy(impl, NetconfTestImplModuleMXBean.class);
proxy.setTestingDep(dep);
proxy.setSimpleShort((short) 0);
transaction.commit();
- try (TestingNetconfClient netconfClient = createSession(tcpAddress, "1")) {
+ try (TestingNetconfClient netconfClient = createSession(TCP_ADDRESS, "1")) {
final String expectedNamespace = "urn:opendaylight:params:xml:ns:yang:controller:test:impl";
final String rpc = "<rpc message-id=\"5\" xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">"
@Test
public void testCloseSession() throws Exception {
- try (TestingNetconfClient netconfClient = createSession(tcpAddress, "1")) {
+ try (TestingNetconfClient netconfClient = createSession(TCP_ADDRESS, "1")) {
// edit config
Document rpcReply = netconfClient.sendMessage(this.editConfig)
@Test
public void testEditConfig() throws Exception {
- try (TestingNetconfClient netconfClient = createSession(tcpAddress, "1")) {
+ try (TestingNetconfClient netconfClient = createSession(TCP_ADDRESS, "1")) {
// send edit_config.xml
final Document rpcReply = netconfClient.sendMessage(this.editConfig).getDocument();
assertIsOK(rpcReply);
@Test
public void testValidate() throws Exception {
- try (TestingNetconfClient netconfClient = createSession(tcpAddress, "1")) {
+ try (TestingNetconfClient netconfClient = createSession(TCP_ADDRESS, "1")) {
// begin transaction
Document rpcReply = netconfClient.sendMessage(getConfigCandidate).getDocument();
assertEquals("data", XmlElement.fromDomDocument(rpcReply).getOnlyChildElement().getName());
}
private Document assertGetConfigWorks(final TestingNetconfClient netconfClient) throws InterruptedException, ExecutionException, TimeoutException, NetconfDocumentedException {
- return assertGetConfigWorks(netconfClient, this.getConfig);
+ return assertGetConfigWorks(netconfClient, getGetConfig());
}
private Document assertGetConfigWorks(final TestingNetconfClient netconfClient, final NetconfMessage getConfigMessage)
@Test
public void testGetConfig() throws Exception {
- try (TestingNetconfClient netconfClient = createSession(tcpAddress, "1")) {
+ try (TestingNetconfClient netconfClient = createSession(TCP_ADDRESS, "1")) {
assertGetConfigWorks(netconfClient);
}
}
@Test
public void createYangTestBasedOnYuma() throws Exception {
- try (TestingNetconfClient netconfClient = createSession(tcpAddress, "1")) {
+ try (TestingNetconfClient netconfClient = createSession(TCP_ADDRESS, "1")) {
Document rpcReply = netconfClient.sendMessage(
XmlFileLoader.xmlFileToNetconfMessage("netconfMessages/editConfig_merge_yang-test.xml"))
.getDocument();
final ObjectName on = new ObjectName(
"org.opendaylight.controller:instanceName=impl-dep-instance,type=Module,moduleFactoryName=impl-dep");
- Set<ObjectName> cfgBeans = configRegistryClient.lookupConfigBeans();
+ final Set<ObjectName> cfgBeans = configRegistryClient.lookupConfigBeans();
assertEquals(cfgBeans, Sets.newHashSet(on));
}
}
@Test
public void testIdRef() throws Exception {
- NetconfMessage editId = XmlFileLoader.xmlFileToNetconfMessage("netconfMessages/editConfig_identities.xml");
- NetconfMessage commit = XmlFileLoader.xmlFileToNetconfMessage("netconfMessages/commit.xml");
+ final NetconfMessage editId = XmlFileLoader.xmlFileToNetconfMessage("netconfMessages/editConfig_identities.xml");
+ final NetconfMessage commit = XmlFileLoader.xmlFileToNetconfMessage("netconfMessages/commit.xml");
- try (TestingNetconfClient netconfClient = createSession(tcpAddress, "1")) {
+ try (TestingNetconfClient netconfClient = createSession(TCP_ADDRESS, "1")) {
assertIsOK(netconfClient.sendMessage(editId).getDocument());
assertIsOK(netconfClient.sendMessage(commit).getDocument());
- NetconfMessage response = netconfClient.sendMessage(getConfig);
+ final NetconfMessage response = netconfClient.sendMessage(getGetConfig());
- assertThat(XmlUtil.toString(response.getDocument()), JUnitMatchers.containsString("<afi xmlns:prefix=\"urn:opendaylight:params:xml:ns:yang:controller:config:test:types\">prefix:test-identity1</afi>"));
- assertThat(XmlUtil.toString(response.getDocument()), JUnitMatchers.containsString("<afi xmlns:prefix=\"urn:opendaylight:params:xml:ns:yang:controller:config:test:types\">prefix:test-identity2</afi>"));
- assertThat(XmlUtil.toString(response.getDocument()), JUnitMatchers.containsString("<safi xmlns:prefix=\"urn:opendaylight:params:xml:ns:yang:controller:config:test:types\">prefix:test-identity2</safi>"));
- assertThat(XmlUtil.toString(response.getDocument()), JUnitMatchers.containsString("<safi xmlns:prefix=\"urn:opendaylight:params:xml:ns:yang:controller:config:test:types\">prefix:test-identity1</safi>"));
+ assertThat(XmlUtil.toString(response.getDocument()), containsString("<afi xmlns:prefix=\"urn:opendaylight:params:xml:ns:yang:controller:config:test:types\">prefix:test-identity1</afi>"));
+ assertThat(XmlUtil.toString(response.getDocument()), containsString("<afi xmlns:prefix=\"urn:opendaylight:params:xml:ns:yang:controller:config:test:types\">prefix:test-identity2</afi>"));
+ assertThat(XmlUtil.toString(response.getDocument()), containsString("<safi xmlns:prefix=\"urn:opendaylight:params:xml:ns:yang:controller:config:test:types\">prefix:test-identity2</safi>"));
+ assertThat(XmlUtil.toString(response.getDocument()), containsString("<safi xmlns:prefix=\"urn:opendaylight:params:xml:ns:yang:controller:config:test:types\">prefix:test-identity1</safi>"));
- } catch (Exception e) {
+ } catch (final Exception e) {
fail(Throwables.getStackTraceAsString(e));
}
}
return ret;
}
-
@Test
public void testMultipleDependencies() throws Exception {
// push first xml, should add parent and d1,d2 dependencies
- try (TestingNetconfClient netconfClient = createSession(tcpAddress, "1")) {
- Document rpcReply = netconfClient.sendMessage(
+ try (TestingNetconfClient netconfClient = createSession(TCP_ADDRESS, "1")) {
+ final Document rpcReply = netconfClient.sendMessage(
XmlFileLoader.xmlFileToNetconfMessage("netconfMessages/editConfig_merge_multiple-deps1.xml"))
.getDocument();
assertIsOK(rpcReply);
commit(netconfClient);
}
// verify that parent.getTestingDeps == d1,d2
- MultipleDependenciesModuleMXBean parentProxy = configRegistryClient.newMXBeanProxy(
+ final MultipleDependenciesModuleMXBean parentProxy = configRegistryClient.newMXBeanProxy(
configRegistryClient.lookupConfigBean(MultipleDependenciesModuleFactory.NAME, "parent"),
MultipleDependenciesModuleMXBean.class);
{
- List<ObjectName> testingDeps = parentProxy.getTestingDeps();
+ final List<ObjectName> testingDeps = parentProxy.getTestingDeps();
assertEquals(2, testingDeps.size());
- Set<String> actualRefs = getServiceReferences(testingDeps);
+ final Set<String> actualRefs = getServiceReferences(testingDeps);
assertEquals(Sets.newHashSet("ref_d1", "ref_d2"), actualRefs);
}
mergeD3(parentProxy);
}
- public void mergeD3(MultipleDependenciesModuleMXBean parentProxy) throws Exception {
+ public void mergeD3(final MultipleDependenciesModuleMXBean parentProxy) throws Exception {
try (TestingNetconfClient netconfClient = new TestingNetconfClient(
- "test " + tcpAddress.toString(), clientDispatcher, getClientConfiguration(tcpAddress, 5000))) {
+ "test " + TCP_ADDRESS.toString(), clientDispatcher, getClientConfiguration(TCP_ADDRESS, 5000))) {
- Document rpcReply = netconfClient.sendMessage(
+ final Document rpcReply = netconfClient.sendMessage(
XmlFileLoader.xmlFileToNetconfMessage("netconfMessages/editConfig_merge_multiple-deps2.xml"))
.getDocument();
assertIsOK(rpcReply);
commit(netconfClient);
}
{
- List<ObjectName> testingDeps = parentProxy.getTestingDeps();
+ final List<ObjectName> testingDeps = parentProxy.getTestingDeps();
assertEquals(3, testingDeps.size());
- Set<String> actualRefs = getServiceReferences(testingDeps);
+ final Set<String> actualRefs = getServiceReferences(testingDeps);
assertEquals(Sets.newHashSet("ref_d1", "ref_d2", "ref_d3"), actualRefs);
}
}
- public Set<String> getServiceReferences(List<ObjectName> testingDeps) {
+ public Set<String> getServiceReferences(final List<ObjectName> testingDeps) {
return new HashSet<>(Lists.transform(testingDeps, new Function<ObjectName, String>() {
@Override
- public String apply(ObjectName input) {
+ public String apply(final ObjectName input) {
return ObjectNameUtil.getReferenceName(input);
}
}));
}
- public void commit(TestingNetconfClient netconfClient) throws Exception {
- Document rpcReply;
+ public void commit(final TestingNetconfClient netconfClient) throws Exception {
+ final Document rpcReply;
rpcReply = netconfClient.sendMessage(XmlFileLoader.xmlFileToNetconfMessage("netconfMessages/commit.xml"))
.getDocument();
assertIsOK(rpcReply);
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.netconf.it;
-
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.opendaylight.controller.netconf.util.test.XmlUnitUtil.assertContainsElementWithText;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Optional;
-import com.google.common.collect.Sets;
-import io.netty.channel.ChannelFuture;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-import junit.framework.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mock;
-import org.opendaylight.controller.config.manager.impl.factoriesresolver.HardcodedModuleFactoriesResolver;
-import org.opendaylight.controller.netconf.api.NetconfMessage;
-import org.opendaylight.controller.netconf.api.monitoring.NetconfManagementSession;
-import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl;
-import org.opendaylight.controller.netconf.client.test.TestingNetconfClient;
-import org.opendaylight.controller.netconf.confignetconfconnector.osgi.NetconfOperationServiceFactoryImpl;
-import org.opendaylight.controller.netconf.confignetconfconnector.osgi.YangStoreException;
-import org.opendaylight.controller.netconf.impl.DefaultCommitNotificationProducer;
-import org.opendaylight.controller.netconf.impl.NetconfServerDispatcher;
-import org.opendaylight.controller.netconf.impl.osgi.NetconfMonitoringServiceImpl;
-import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
-import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceSnapshotImpl;
-import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService;
-import org.opendaylight.controller.netconf.mapping.api.Capability;
-import org.opendaylight.controller.netconf.mapping.api.NetconfOperationProvider;
-import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
-import org.opendaylight.controller.netconf.monitoring.osgi.NetconfMonitoringActivator;
-import org.opendaylight.controller.netconf.monitoring.osgi.NetconfMonitoringOperationService;
-import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
-import org.opendaylight.controller.netconf.util.xml.XmlUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.w3c.dom.Document;
-
-public class NetconfMonitoringITTest extends AbstractNetconfConfigTest {
-
- private static final Logger logger = LoggerFactory.getLogger(NetconfITTest.class);
-
- private static final InetSocketAddress tcpAddress = new InetSocketAddress("127.0.0.1", 12023);
-
- @Mock
- private DefaultCommitNotificationProducer commitNot;
- private NetconfServerDispatcher dispatch;
-
- private NetconfClientDispatcherImpl clientDispatcher;
-
- private NetconfMonitoringServiceImpl monitoringService;
-
- @Before
- public void setUp() throws Exception {
- super.initConfigTransactionManagerImpl(new HardcodedModuleFactoriesResolver(mockedContext, NetconfITTest.FACTORIES));
-
- monitoringService = new NetconfMonitoringServiceImpl(getNetconfOperationProvider());
-
- NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
- factoriesListener.onAddNetconfOperationServiceFactory(new NetconfOperationServiceFactoryImpl(getYangStore()));
- factoriesListener
- .onAddNetconfOperationServiceFactory(new NetconfMonitoringActivator.NetconfMonitoringOperationServiceFactory(
- new NetconfMonitoringOperationService(monitoringService)));
-
-
- dispatch = createDispatcher(factoriesListener);
- ChannelFuture s = dispatch.createServer(tcpAddress);
- s.await();
-
- clientDispatcher = new NetconfClientDispatcherImpl(getNettyThreadgroup(), getNettyThreadgroup(), getHashedWheelTimer());
- }
-
- private HardcodedYangStoreService getYangStore() throws YangStoreException, IOException {
- final Collection<InputStream> yangDependencies = NetconfITTest.getBasicYangs();
- return new HardcodedYangStoreService(yangDependencies);
- }
-
- private NetconfServerDispatcher createDispatcher(
- NetconfOperationServiceFactoryListenerImpl factoriesListener) {
- return super.createDispatcher(factoriesListener, getNetconfMonitoringListenerService(logger, monitoringService), commitNot);
- }
-
- static SessionMonitoringService getNetconfMonitoringListenerService(final Logger logger, final NetconfMonitoringServiceImpl monitor) {
- return new SessionMonitoringService() {
- @Override
- public void onSessionUp(NetconfManagementSession session) {
- logger.debug("Management session up {}", session);
- monitor.onSessionUp(session);
- }
-
- @Override
- public void onSessionDown(NetconfManagementSession session) {
- logger.debug("Management session down {}", session);
- monitor.onSessionDown(session);
- }
- };
- }
-
-
- @Test
- public void testGetResponseFromMonitoring() throws Exception {
- try (TestingNetconfClient netconfClient = new TestingNetconfClient("client-monitoring", clientDispatcher, getClientConfiguration(tcpAddress, 4000))) {
- try (TestingNetconfClient netconfClient2 = new TestingNetconfClient("client-monitoring2", clientDispatcher, getClientConfiguration(tcpAddress, 4000))) {
- NetconfMessage response = netconfClient.sendMessage(loadGetMessage());
- assertSessionElementsInResponse(response.getDocument(), 2);
- }
- NetconfMessage response = netconfClient.sendMessage(loadGetMessage());
- assertSessionElementsInResponse(response.getDocument(), 1);
- }
- }
-
-
- @Test(timeout = 13 * 10000)
- public void testClientHelloWithAuth() throws Exception {
- String fileName = "netconfMessages/client_hello_with_auth.xml";
- String hello = XmlFileLoader.fileToString(fileName);
-
- fileName = "netconfMessages/get.xml";
- String get = XmlFileLoader.fileToString(fileName);
-
- Socket sock = new Socket(tcpAddress.getHostName(), tcpAddress.getPort());
- sock.getOutputStream().write(hello.getBytes(Charsets.UTF_8));
- String separator = "]]>]]>";
-
- sock.getOutputStream().write(separator.getBytes(Charsets.UTF_8));
- sock.getOutputStream().write(get.getBytes(Charsets.UTF_8));
- sock.getOutputStream().write(separator.getBytes(Charsets.UTF_8));
-
- StringBuilder responseBuilder = new StringBuilder();
-
- try (InputStream inputStream = sock.getInputStream();
- InputStreamReader reader = new InputStreamReader(inputStream);
- BufferedReader buff = new BufferedReader(reader)) {
- String line;
- while ((line = buff.readLine()) != null) {
-
- responseBuilder.append(line);
- responseBuilder.append(System.lineSeparator());
-
- if(line.contains("</rpc-reply>"))
- break;
- }
- }
-
- sock.close();
-
- String helloMsg = responseBuilder.substring(0, responseBuilder.indexOf(separator));
- Document doc = XmlUtil.readXmlToDocument(helloMsg);
- assertContainsElementWithText(doc, "urn:ietf:params:netconf:capability:candidate:1.0");
-
- String replyMsg = responseBuilder.substring(responseBuilder.indexOf(separator) + separator.length());
- doc = XmlUtil.readXmlToDocument(replyMsg);
- assertContainsElementWithText(doc, "tomas");
- }
-
- private void assertSessionElementsInResponse(Document document, int i) {
- int elementSize = document.getElementsByTagName("session-id").getLength();
- Assert.assertEquals("Incorrect number of session-id tags in " + XmlUtil.toString(document),i, elementSize);
- }
-
- private NetconfMessage loadGetMessage() throws Exception {
- return XmlFileLoader.xmlFileToNetconfMessage("netconfMessages/get.xml");
- }
-
- public static NetconfOperationProvider getNetconfOperationProvider() throws Exception {
- NetconfOperationProvider factoriesListener = mock(NetconfOperationProvider.class);
- NetconfOperationServiceSnapshotImpl snap = mock(NetconfOperationServiceSnapshotImpl.class);
- doNothing().when(snap).close();
- NetconfOperationService service = mock(NetconfOperationService.class);
- Set<Capability> caps = Sets.newHashSet();
- caps.add(new Capability() {
- @Override
- public String getCapabilityUri() {
- return "namespaceModuleRevision";
- }
-
- @Override
- public Optional<String> getModuleNamespace() {
- return Optional.of("namespace");
- }
-
- @Override
- public Optional<String> getModuleName() {
- return Optional.of("name");
- }
-
- @Override
- public Optional<String> getRevision() {
- return Optional.of("revision");
- }
-
- @Override
- public Optional<String> getCapabilitySchema() {
- return Optional.of("content");
- }
-
- @Override
- public Optional<List<String>> getLocation() {
- return Optional.absent();
- }
- });
-
- doReturn(caps).when(service).getCapabilities();
- Set<NetconfOperationService> services = Sets.newHashSet(service);
- doReturn(services).when(snap).getServices();
- doReturn(snap).when(factoriesListener).openSnapshot(anyString());
-
- return factoriesListener;
- }
-
-
-}
public final class SSLUtil {
- private SSLUtil() {
- }
+ private SSLUtil() {}
public static SSLContext initializeSecureContext(final String pass, final InputStream ksKeysFile, final InputStream ksTrustFile,
final String algorithm) throws KeyStoreException, NoSuchAlgorithmException, CertificateException, IOException,
package org.opendaylight.controller.netconf.test.tool;
-import java.io.File;
-import java.io.IOException;
-import org.opendaylight.controller.netconf.ssh.authentication.AuthProvider;
-import org.opendaylight.controller.netconf.ssh.authentication.PEMGenerator;
+import org.opendaylight.controller.netconf.auth.AuthProvider;
class AcceptingAuthProvider implements AuthProvider {
- private final String privateKeyPEMString;
-
- public AcceptingAuthProvider() {
- try {
- this.privateKeyPEMString = PEMGenerator.readOrGeneratePK(new File("PK"));
- } catch (final IOException e) {
- throw new RuntimeException(e);
- }
- }
@Override
public synchronized boolean authenticated(final String username, final String password) {
return true;
}
- @Override
- public char[] getPEMAsCharArray() {
- return privateKeyPEMString.toCharArray();
- }
}
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.HashedWheelTimer;
import java.io.Closeable;
+import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.management.ManagementFactory;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceSnapshot;
import org.opendaylight.controller.netconf.monitoring.osgi.NetconfMonitoringOperationService;
import org.opendaylight.controller.netconf.ssh.NetconfSSHServer;
+import org.opendaylight.controller.netconf.ssh.authentication.PEMGenerator;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceException;
import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceRepresentation;
server = dispatcher.createLocalServer(tcpLocalAddress);
try {
- NetconfSSHServer.start(currentPort, tcpLocalAddress, new AcceptingAuthProvider(), nettyThreadgroup);
+ final NetconfSSHServer sshServer = NetconfSSHServer.start(currentPort, tcpLocalAddress, nettyThreadgroup, getPemArray());
+ sshServer.setAuthProvider(new AcceptingAuthProvider());
} catch (final Exception e) {
LOG.warn("Cannot start simulated device on {}, skipping", address, e);
// Close local server and continue
return openDevices;
}
+ private char[] getPemArray() {
+ try {
+ return PEMGenerator.readOrGeneratePK(new File("PK")).toCharArray();
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
private Map<ModuleBuilder, String> parseSchemasToModuleBuilders(final Main.Params params) {
final SharedSchemaRepository consumer = new SharedSchemaRepository("netconf-simulator");
consumer.registerSchemaSourceListener(TextToASTTransformer.create(consumer, consumer));