<arphandler.version>0.5.2-SNAPSHOT</arphandler.version>
<forwarding.staticrouting>0.5.2-SNAPSHOT</forwarding.staticrouting>
<samples.loadbalancer>0.5.2-SNAPSHOT</samples.loadbalancer>
- <config.version>0.2.4-SNAPSHOT</config.version>
- <netconf.version>0.2.4-SNAPSHOT</netconf.version>
+ <config.version>0.2.5-SNAPSHOT</config.version>
+ <netconf.version>0.2.5-SNAPSHOT</netconf.version>
<mdsal.version>1.1-SNAPSHOT</mdsal.version>
<containermanager.version>0.5.2-SNAPSHOT</containermanager.version>
<containermanager.it.version>0.5.2-SNAPSHOT</containermanager.it.version>
<parent>
<artifactId>config-subsystem</artifactId>
<groupId>org.opendaylight.controller</groupId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
</parent>
<artifactId>config-api</artifactId>
* transaction was committed after creating this transaction. Clients can create
* new transaction and merge the changes.
*/
-public class ConflictingVersionException extends RuntimeException {
+public class ConflictingVersionException extends Exception {
private static final long serialVersionUID = 1L;
public ConflictingVersionException() {
/**
* This exception is not intended to be used while implementing modules,
- * itaggregates validation exceptions and sends them back to the user.
+ * it aggregates validation exceptions and sends them back to the user.
+ * Use {@link org.opendaylight.controller.config.api.JmxAttributeValidationException} for
+ * validating modules instead.
*/
-public class ValidationException extends RuntimeException {
+public class ValidationException extends Exception {
private static final long serialVersionUID = -6072893219820274247L;
private final Map<String/* module name */, Map<String/* instance name */, ExceptionMessageWithStackTrace>> failedValidations;
<parent>
<artifactId>config-subsystem</artifactId>
<groupId>org.opendaylight.controller</groupId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>config-manager</artifactId>
try {
validate();
} catch (Exception e) {
- throw ValidationException.createForSingleException(
- moduleIdentifier, e);
+
+ throw new MBeanException(ValidationException.createForSingleException(
+ moduleIdentifier, e));
}
return Void.TYPE;
}
package org.opendaylight.controller.config.manager.impl.osgi;
import org.opendaylight.controller.config.api.ConflictingVersionException;
+import org.opendaylight.controller.config.api.ValidationException;
import org.opendaylight.controller.config.api.jmx.CommitStatus;
import org.opendaylight.controller.config.manager.impl.ConfigRegistryImpl;
import org.opendaylight.controller.config.spi.ModuleFactory;
synchronized void blankTransaction() {
// race condition check: config-persister might push new configuration while server is starting up.
ConflictingVersionException lastException = null;
- for (int i = 0; i < 10; i++) {
+ int maxAttempts = 10;
+ for (int i = 0; i < maxAttempts; i++) {
try {
// create transaction
boolean blankTransaction = true;
Thread.currentThread().interrupt();
throw new IllegalStateException(interruptedException);
}
+ } catch (ValidationException e) {
+ logger.error("Validation exception while running blank transaction indicates programming error", e);
+ throw new RuntimeException("Validation exception while running blank transaction indicates programming error", e);
}
}
- throw lastException;
+ throw new RuntimeException("Maximal number of attempts reached and still cannot get optimistic lock from " +
+ "config manager",lastException);
}
@Override
import javax.management.MBeanException;
import javax.management.ObjectName;
import javax.management.ReflectionException;
-import javax.management.RuntimeMBeanException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
private void testValidation(ConfigTransactionClient transaction)
throws InstanceAlreadyExistsException, ReflectionException,
- InstanceNotFoundException, MBeanException {
+ InstanceNotFoundException, MBeanException, ConflictingVersionException {
ObjectName fixed1names = transaction.createModule(
TestingFixedThreadPoolModuleFactory.NAME, fixed1);
// call validate on config bean
platformMBeanServer.invoke(fixed1names, "validate", new Object[0],
new String[0]);
fail();
- } catch (RuntimeMBeanException e) {
- RuntimeException targetException = e.getTargetException();
+ } catch (MBeanException e) {
+ Exception targetException = e.getTargetException();
assertNotNull(targetException);
assertEquals(ValidationException.class, targetException.getClass());
}
<parent>
<groupId>org.opendaylight.controller</groupId>
<artifactId>config-subsystem</artifactId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
</parent>
<artifactId>config-module-archetype</artifactId>
<parent>
<artifactId>config-subsystem</artifactId>
<groupId>org.opendaylight.controller</groupId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>config-persister-api</artifactId>
<parent>
<artifactId>config-subsystem</artifactId>
<groupId>org.opendaylight.controller</groupId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>config-persister-directory-adapter</artifactId>
<parent>
<artifactId>config-subsystem</artifactId>
<groupId>org.opendaylight.controller</groupId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>config-persister-directory-autodetect-adapter</artifactId>
<parent>
<artifactId>config-subsystem</artifactId>
<groupId>org.opendaylight.controller</groupId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>config-persister-directory-xml-adapter</artifactId>
<parent>
<artifactId>config-subsystem</artifactId>
<groupId>org.opendaylight.controller</groupId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>config-persister-file-adapter</artifactId>
<parent>
<artifactId>config-subsystem</artifactId>
<groupId>org.opendaylight.controller</groupId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>config-persister-file-xml-adapter</artifactId>
<parent>
<groupId>org.opendaylight.controller</groupId>
<artifactId>config-subsystem</artifactId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>config-plugin-parent</artifactId>
<properties>
<jmxGeneratorPath>${project.build.directory}/generated-sources/config</jmxGeneratorPath>
- <config.version>0.2.4-SNAPSHOT</config.version>
</properties>
<build>
<parent>
<artifactId>config-subsystem</artifactId>
<groupId>org.opendaylight.controller</groupId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>config-util</artifactId>
*/
package org.opendaylight.controller.config.util;
-import java.util.Map;
-import java.util.Set;
+import org.opendaylight.controller.config.api.ConflictingVersionException;
+import org.opendaylight.controller.config.api.ValidationException;
+import org.opendaylight.controller.config.api.jmx.CommitStatus;
+import org.opendaylight.controller.config.api.jmx.ConfigRegistryMXBean;
+import org.opendaylight.controller.config.api.jmx.ConfigTransactionControllerMXBean;
+import org.opendaylight.controller.config.api.jmx.ObjectNameUtil;
import javax.management.Attribute;
import javax.management.InstanceAlreadyExistsException;
import javax.management.InstanceNotFoundException;
import javax.management.JMException;
import javax.management.JMX;
+import javax.management.MBeanException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import javax.management.RuntimeMBeanException;
-
-import org.opendaylight.controller.config.api.ConflictingVersionException;
-import org.opendaylight.controller.config.api.ValidationException;
-import org.opendaylight.controller.config.api.jmx.CommitStatus;
-import org.opendaylight.controller.config.api.jmx.ConfigRegistryMXBean;
-import org.opendaylight.controller.config.api.jmx.ConfigTransactionControllerMXBean;
-import org.opendaylight.controller.config.api.jmx.ObjectNameUtil;
+import java.util.Map;
+import java.util.Set;
public class ConfigTransactionJMXClient implements ConfigTransactionClient {
private final ConfigRegistryMXBean configRegistryMXBeanProxy;
throws ValidationException {
try {
configMBeanServer.invoke(configBeanON, "validate", null, null);
+ } catch (MBeanException e) {
+ Exception targetException = e.getTargetException();
+ if (targetException instanceof ValidationException){
+ throw (ValidationException) targetException;
+ } else {
+ throw new RuntimeException(e);
+ }
} catch (JMException e) {
throw new RuntimeException(e);
- } catch (RuntimeMBeanException e) {
- throw e.getTargetException();
}
}
<parent>
<groupId>org.opendaylight.controller</groupId>
<artifactId>config-plugin-parent</artifactId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
<relativePath>../config-plugin-parent</relativePath>
</parent>
<artifactId>logback-config</artifactId>
*/
package org.opendaylight.controller.config.yang.logback.config;
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import javax.management.InstanceAlreadyExistsException;
-import javax.management.InstanceNotFoundException;
-import javax.management.JMX;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.encoder.PatternLayoutEncoder;
+import ch.qos.logback.classic.joran.JoranConfigurator;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.joran.spi.JoranException;
+import ch.qos.logback.core.rolling.FixedWindowRollingPolicy;
+import ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy;
+import com.google.common.collect.Lists;
import org.apache.commons.io.FileUtils;
import org.junit.Before;
import org.junit.Test;
+import org.opendaylight.controller.config.api.ConflictingVersionException;
+import org.opendaylight.controller.config.api.ValidationException;
import org.opendaylight.controller.config.api.jmx.ObjectNameUtil;
import org.opendaylight.controller.config.manager.impl.AbstractConfigTest;
import org.opendaylight.controller.config.manager.impl.factoriesresolver.HardcodedModuleFactoriesResolver;
import org.opendaylight.controller.config.util.ConfigTransactionJMXClient;
import org.slf4j.LoggerFactory;
-import ch.qos.logback.classic.LoggerContext;
-import ch.qos.logback.classic.encoder.PatternLayoutEncoder;
-import ch.qos.logback.classic.joran.JoranConfigurator;
-import ch.qos.logback.classic.spi.ILoggingEvent;
-import ch.qos.logback.core.joran.spi.JoranException;
-import ch.qos.logback.core.rolling.FixedWindowRollingPolicy;
-import ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.InstanceNotFoundException;
+import javax.management.JMX;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
-import com.google.common.collect.Lists;
+import static org.junit.Assert.assertEquals;
public class LogbackModuleWithInitialConfigurationTest extends AbstractConfigTest {
}
public ObjectName createBeans() throws JoranException, InstanceAlreadyExistsException, IOException,
- MalformedObjectNameException, InstanceNotFoundException {
+ MalformedObjectNameException, InstanceNotFoundException, ValidationException, ConflictingVersionException {
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
*/
package org.opendaylight.controller.config.yang.logback.config;
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.List;
-
-import javax.management.InstanceAlreadyExistsException;
-import javax.management.InstanceNotFoundException;
-import javax.management.JMX;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.joran.JoranConfigurator;
+import ch.qos.logback.core.joran.spi.JoranException;
+import com.google.common.collect.Lists;
import org.apache.commons.io.FileUtils;
import org.junit.Before;
import org.junit.Test;
+import org.opendaylight.controller.config.api.ConflictingVersionException;
+import org.opendaylight.controller.config.api.ValidationException;
import org.opendaylight.controller.config.manager.impl.AbstractConfigTest;
import org.opendaylight.controller.config.manager.impl.factoriesresolver.HardcodedModuleFactoriesResolver;
import org.opendaylight.controller.config.util.ConfigTransactionJMXClient;
import org.slf4j.LoggerFactory;
-import ch.qos.logback.classic.LoggerContext;
-import ch.qos.logback.classic.joran.JoranConfigurator;
-import ch.qos.logback.core.joran.spi.JoranException;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.InstanceNotFoundException;
+import javax.management.JMX;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.List;
-import com.google.common.collect.Lists;
+import static org.junit.Assert.assertEquals;
public class LogbackWithXmlConfigModuleTest extends AbstractConfigTest {
* @throws MalformedObjectNameException
*/
@Test
- public void test() throws InstanceAlreadyExistsException, InstanceNotFoundException, MalformedObjectNameException {
+ public void test() throws InstanceAlreadyExistsException, InstanceNotFoundException, MalformedObjectNameException, ValidationException, ConflictingVersionException {
ConfigTransactionJMXClient transaction = configRegistryClient.createTransaction();
ObjectName nameRetrieved = transaction.lookupConfigBean(factory.getImplementationName(), LogbackModuleFactory.INSTANCE_NAME);
*/
@Test
public void testAddNewLogger() throws InstanceAlreadyExistsException, InstanceNotFoundException,
- MalformedObjectNameException {
+ MalformedObjectNameException, ValidationException, ConflictingVersionException {
ConfigTransactionJMXClient transaction = configRegistryClient.createTransaction();
ObjectName nameRetrieved = transaction.lookupConfigBean(factory.getImplementationName(), LogbackModuleFactory.INSTANCE_NAME);
<parent>
<groupId>org.opendaylight.controller</groupId>
<artifactId>config-plugin-parent</artifactId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
<relativePath>../config-plugin-parent</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.opendaylight.controller</groupId>
<artifactId>config-plugin-parent</artifactId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
<relativePath>../config-plugin-parent</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.opendaylight.controller</groupId>
<artifactId>config-plugin-parent</artifactId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
<relativePath>../config-plugin-parent</relativePath>
</parent>
<parent>
<groupId>org.opendaylight.controller</groupId>
<artifactId>config-plugin-parent</artifactId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
<relativePath>../config-plugin-parent</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
</parent>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
<artifactId>config-subsystem</artifactId>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
<jacoco.version>0.6.2.201302030002</jacoco.version>
<slf4j.version>1.7.2</slf4j.version>
<salGeneratorPath>${project.build.directory}/generated-sources/sal</salGeneratorPath>
- <config.version>0.2.4-SNAPSHOT</config.version>
</properties>
<dependencies>
<parent>
<artifactId>config-plugin-parent</artifactId>
<groupId>org.opendaylight.controller</groupId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
<relativePath>../config-plugin-parent</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>config-plugin-parent</artifactId>
<groupId>org.opendaylight.controller</groupId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
<relativePath>../config-plugin-parent</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.config.api.ConflictingVersionException;
+import org.opendaylight.controller.config.api.ValidationException;
import org.opendaylight.controller.config.api.jmx.ObjectNameUtil;
import org.opendaylight.controller.config.manager.impl.AbstractConfigTest;
import org.opendaylight.controller.config.manager.impl.factoriesresolver.HardcodedModuleFactoriesResolver;
shutdownViaRuntimeJMX(secret);
}
- private void setSecret(String secret) throws InstanceNotFoundException {
+ private void setSecret(String secret) throws InstanceNotFoundException, ValidationException, ConflictingVersionException {
ConfigTransactionJMXClient transaction = configRegistryClient.createTransaction();
ObjectName on = transaction.lookupConfigBean(NAME, NAME);
ShutdownModuleMXBean proxy = transaction.newMXBeanProxy(on, ShutdownModuleMXBean.class);
<parent>
<groupId>org.opendaylight.controller</groupId>
<artifactId>config-plugin-parent</artifactId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
<relativePath>../config-plugin-parent</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.opendaylight.controller</groupId>
<artifactId>config-plugin-parent</artifactId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
<relativePath>../config-plugin-parent</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>config-subsystem</artifactId>
<groupId>org.opendaylight.controller</groupId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
</parent>
<artifactId>yang-jmx-generator-it</artifactId>
<parent>
<artifactId>config-subsystem</artifactId>
<groupId>org.opendaylight.controller</groupId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>yang-jmx-generator-plugin</artifactId>
<parent>
<artifactId>config-subsystem</artifactId>
<groupId>org.opendaylight.controller</groupId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
</parent>
<artifactId>yang-jmx-generator</artifactId>
<parent>
<artifactId>config-subsystem</artifactId>
<groupId>org.opendaylight.controller</groupId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>yang-store-api</artifactId>
<parent>
<artifactId>config-subsystem</artifactId>
<groupId>org.opendaylight.controller</groupId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>yang-store-impl</artifactId>
<parent>
<groupId>org.opendaylight.controller</groupId>
<artifactId>config-plugin-parent</artifactId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
<relativePath>../config-plugin-parent</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.opendaylight.controller</groupId>
<artifactId>config-plugin-parent</artifactId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
<relativePath>../config-plugin-parent</relativePath>
</parent>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>yang-jmx-generator-plugin</artifactId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>${config.version}</version>
</dependency>
<dependency>
<groupId>org.opendaylight.yangtools</groupId>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>yang-jmx-generator-plugin</artifactId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>${config.version}</version>
</dependency>
<dependency>
<groupId>org.opendaylight.yangtools</groupId>
<artifactId>maven-sal-api-gen-plugin</artifactId>
- <version>${yangtools.version}</version>
+ <version>${yangtools.version}</version>
<type>jar</type>
</dependency>
</dependencies>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>yang-jmx-generator-plugin</artifactId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>${config.version}</version>
</dependency>
<dependency>
<groupId>org.opendaylight.yangtools</groupId>
/**
* Core component of the SAL layer responsible for wiring the SAL consumers.
- *
+ *
* The responsibility of the broker is to maintain registration of SAL
* functionality {@link Consumer}s and {@link Provider}s, store provider and
* consumer specific context and functionality registration via
* {@link ConsumerSession} and provide access to infrastructure services, which
* removes direct dependencies between providers and consumers.
- *
- *
+ *
+ *
* <h3>Infrastructure services</h3> Some examples of infrastructure services:
- *
+ *
* <ul>
* <li>RPC Invocation - see {@link ConsumerSession#rpc(QName, CompositeNode)},
* {@link ProviderSession#addRpcImplementation(QName, RpcImplementation)} and
* <li>Data Store access and modification - see {@link DataBrokerService} and
* {@link DataProviderService}
* </ul>
- *
+ *
* The services are exposed via session.
- *
+ *
* <h3>Session-based access</h3>
- *
+ *
* The providers and consumers needs to register in order to use the
* binding-independent SAL layer and to expose functionality via SAL layer.
- *
+ *
* For more information about session-based access see {@link ConsumerSession}
* and {@link ProviderSession}
- *
- *
- *
+ *
+ *
+ *
*/
public interface Broker {
/**
* Registers the {@link Consumer}, which will use the SAL layer.
- *
+ *
* <p>
* During the registration, the broker obtains the initial functionality
* from consumer, using the {@link Consumer#getConsumerFunctionality()}, and
* register that functionality into system and concrete infrastructure
* services.
- *
+ *
* <p>
* Note that consumer could register additional functionality at later point
* by using service and functionality specific APIs.
- *
+ *
* <p>
* The consumer is required to use returned session for all communication
* with broker or one of the broker services. The session is announced to
* the consumer by invoking
* {@link Consumer#onSessionInitiated(ConsumerSession)}.
- *
+ *
* @param cons
* Consumer to be registered.
* @param context
/**
* Registers the {@link Provider}, which will use the SAL layer.
- *
+ *
* <p>
* During the registration, the broker obtains the initial functionality
* from consumer, using the {@link Provider#getProviderFunctionality()}, and
* register that functionality into system and concrete infrastructure
* services.
- *
+ *
* <p>
* Note that consumer could register additional functionality at later point
* by using service and functionality specific APIs (e.g.
* {@link ProviderSession#addRpcImplementation(QName, RpcImplementation)}
- *
+ *
* <p>
* The consumer is <b>required to use</b> returned session for all
* communication with broker or one of the broker services. The session is
* announced to the consumer by invoking
* {@link Provider#onSessionInitiated(ProviderSession)}.
- *
- *
+ *
+ *
* @param prov
* Provider to be registered.
* @param context
/**
* {@link Consumer} specific access to the SAL functionality.
- *
+ *
* <p>
* ConsumerSession is {@link Consumer}-specific access to the SAL
* functionality and infrastructure services.
- *
+ *
* <p>
* The session serves to store SAL context (e.g. registration of
* functionality) for the consumer and provides access to the SAL
* infrastructure services and other functionality provided by
* {@link Provider}s.
- *
- *
- *
+ *
+ *
+ *
*/
public interface ConsumerSession {
/**
* Sends an RPC to other components registered to the broker.
- *
+ *
* @see RpcImplementation
* @param rpc
* Name of RPC
/**
* Returns a session specific instance (implementation) of requested
* service
- *
+ *
* @param service
* Broker service
* @return Session specific implementation of service
/**
* Closes a session between consumer and broker.
- *
+ *
* <p>
* The close operation unregisters a consumer and remove all registered
* functionality of the consumer from the system.
- *
+ *
*/
void close();
}
/**
* {@link Provider} specific access to the SAL functionality.
- *
+ *
* <p>
* ProviderSession is {@link Provider}-specific access to the SAL
* functionality and infrastructure services, which also allows for exposing
* the provider's functionality to the other {@link Consumer}s.
- *
+ *
* <p>
* The session serves to store SAL context (e.g. registration of
* functionality) for the providers and exposes access to the SAL
* infrastructure services, dynamic functionality registration and any other
* functionality provided by other {@link Provider}s.
- *
+ *
*/
public interface ProviderSession extends ConsumerSession {
/**
* Registers an implementation of the rpc.
- *
+ *
* <p>
* The registered rpc functionality will be available to all other
* consumers and providers registered to the broker, which are aware of
* the {@link QName} assigned to the rpc.
- *
+ *
* <p>
* There is no assumption that rpc type is in the set returned by
* invoking {@link RpcImplementation#getSupportedRpcs()}. This allows
* for dynamic rpc implementations.
- *
+ *
* @param rpcType
* Name of Rpc
* @param implementation
/**
* Closes a session between provider and SAL.
- *
+ *
* <p>
* The close operation unregisters a provider and remove all registered
* functionality of the provider from the system.
boolean isClosed();
Set<QName> getSupportedRpcs();
-
+
ListenerRegistration<RpcRegistrationListener> addRpcRegistrationListener(RpcRegistrationListener listener);
}
public interface RpcRegistration extends Registration<RpcImplementation> {
QName getType();
+
+ @Override
+ void close();
}
public interface RoutedRpcRegistration extends RpcRegistration,
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>yang-jmx-generator-plugin</artifactId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>${config.version}</version>
</dependency>
<dependency>
<groupId>org.opendaylight.yangtools</groupId>
<artifactId>sal-parent</artifactId>
<version>1.1-SNAPSHOT</version>
</parent>
- <properties>
- <netconf.version>0.2.4-SNAPSHOT</netconf.version>
- </properties>
+
<artifactId>sal-netconf-connector</artifactId>
<scm>
<connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>yang-jmx-generator-plugin</artifactId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>${config.version}</version>
</dependency>
<dependency>
<groupId>org.opendaylight.yangtools</groupId>
<artifactId>maven-sal-api-gen-plugin</artifactId>
- <version>${yangtools.version}</version>
+ <version>${yangtools.version}</version>
<type>jar</type>
</dependency>
</dependencies>
val listener = new NetconfDeviceListener(this);
val task = startClientTask(dispatcher, listener)
- if (mountInstance != null) {
- commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this)
- }
return processingExecutor.submit(task) as Future<Void>;
- //commitHandlerReg = mountInstance.registerCommitHandler(path,this);
}
def Optional<SchemaContext> getSchemaContext() {
deviceContextProvider.createContextFromCapabilities(initialCapabilities);
if (mountInstance != null && schemaContext.isPresent) {
mountInstance.schemaContext = schemaContext.get();
+ val operations = schemaContext.get().operations;
+ for (rpc : operations) {
+ mountInstance.addRpcImplementation(rpc.QName, this);
+ }
}
updateDeviceState()
if (mountInstance != null && confReaderReg == null && operReaderReg == null) {
confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
+ commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this);
}
} catch (Exception e) {
logger.error("Netconf client NOT started. ", e)
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
+/**
+ * {@link ListenerAdapter} is responsible to track events, which occurred by
+ * changing data in data source.
+ */
public class ListenerAdapter implements DataChangeListener {
- private static final Logger logger = LoggerFactory.getLogger(ListenerAdapter.class);
- private final XmlMapper xmlMapper = new XmlMapper();
- private final SimpleDateFormat rfc3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ");
-
- private final InstanceIdentifier path;
- private ListenerRegistration<DataChangeListener> registration;
- private final String streamName;
- private Set<Channel> subscribers = new ConcurrentSet<>();
- private final EventBus eventBus;
- private final EventBusChangeRecorder eventBusChangeRecorder;
-
- ListenerAdapter(InstanceIdentifier path, String streamName) {
- Preconditions.checkNotNull(path);
- Preconditions.checkArgument(streamName != null && !streamName.isEmpty());
- this.path = path;
- this.streamName = streamName;
- eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
- eventBusChangeRecorder = new EventBusChangeRecorder();
- eventBus.register(eventBusChangeRecorder);
- }
-
- @Override
- public void onDataChanged(DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
- if (!change.getCreatedConfigurationData().isEmpty() || !change.getCreatedOperationalData().isEmpty()
- || !change.getUpdatedConfigurationData().isEmpty() || !change.getUpdatedOperationalData().isEmpty()
- || !change.getRemovedConfigurationData().isEmpty() || !change.getRemovedOperationalData().isEmpty()) {
- String xml = prepareXmlFrom(change);
- Event event = new Event(EventType.NOTIFY);
- event.setData(xml);
- eventBus.post(event);
- }
- }
-
- private final class EventBusChangeRecorder {
- @Subscribe public void recordCustomerChange(Event event) {
- if (event.getType() == EventType.REGISTER) {
- Channel subscriber = event.getSubscriber();
- if (!subscribers.contains(subscriber)) {
- subscribers.add(subscriber);
- }
- } else if (event.getType() == EventType.DEREGISTER) {
- subscribers.remove(event.getSubscriber());
- Notificator.removeListenerIfNoSubscriberExists(ListenerAdapter.this);
- } else if (event.getType() == EventType.NOTIFY) {
- for (Channel subscriber : subscribers) {
- if (subscriber.isActive()) {
- logger.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
- subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
- } else {
- logger.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
- subscribers.remove(subscriber);
- }
- }
- }
- }
- }
-
- private final class Event {
- private final EventType type;
- private Channel subscriber;
- private String data;
-
- public Event(EventType type) {
- this.type = type;
- }
-
- public Channel getSubscriber() {
- return subscriber;
- }
-
- public void setSubscriber(Channel subscriber) {
- this.subscriber = subscriber;
- }
-
- public String getData() {
- return data;
- }
-
- public void setData(String data) {
- this.data = data;
- }
-
- public EventType getType() {
- return type;
- }
- }
-
- private enum EventType {
- REGISTER,
- DEREGISTER,
- NOTIFY;
- }
-
- private String prepareXmlFrom(DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
- Document doc = createDocument();
- Element notificationElement = doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
- "notification");
- doc.appendChild(notificationElement);
-
- Element eventTimeElement = doc.createElement("eventTime");
- eventTimeElement.setTextContent(toRFC3339(new Date()));
- notificationElement.appendChild(eventTimeElement);
-
- Element dataChangedNotificationEventElement = doc.createElementNS(
- "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "data-changed-notification");
- addValuesToDataChangedNotificationEventElement(doc, dataChangedNotificationEventElement, change);
- notificationElement.appendChild(dataChangedNotificationEventElement);
-
- try {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- TransformerFactory tf = TransformerFactory.newInstance();
- Transformer transformer = tf.newTransformer();
- transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
- transformer.setOutputProperty(OutputKeys.METHOD, "xml");
- transformer.setOutputProperty(OutputKeys.INDENT, "yes");
- transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
- transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
- transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, "UTF-8")));
- byte[] charData = out.toByteArray();
- return new String(charData, "UTF-8");
- } catch (TransformerException | UnsupportedEncodingException e) {
- String msg = "Error during transformation of Document into String";
- logger.error(msg, e);
- return msg;
- }
- }
-
- private String toRFC3339(Date d) {
- return rfc3339.format(d).replaceAll("(\\d\\d)(\\d\\d)$", "$1:$2");
- }
-
- private Document createDocument() {
- DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
- Document doc = null;
- try {
- DocumentBuilder bob = dbf.newDocumentBuilder();
- doc = bob.newDocument();
- } catch (ParserConfigurationException e) {
- return null;
- }
- return doc;
- }
-
- private void addValuesToDataChangedNotificationEventElement(Document doc,
- Element dataChangedNotificationEventElement, DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
- addValuesFromDataToElement(doc, change.getCreatedConfigurationData(), dataChangedNotificationEventElement, Store.CONFIG, Operation.CREATED);
- addValuesFromDataToElement(doc, change.getCreatedOperationalData(), dataChangedNotificationEventElement, Store.OPERATION, Operation.CREATED);
- if (change.getCreatedConfigurationData().isEmpty()) {
- addValuesFromDataToElement(doc, change.getUpdatedConfigurationData(), dataChangedNotificationEventElement, Store.CONFIG, Operation.UPDATED);
- }
- if (change.getCreatedOperationalData().isEmpty()) {
- addValuesFromDataToElement(doc, change.getUpdatedOperationalData(), dataChangedNotificationEventElement, Store.OPERATION, Operation.UPDATED);
- }
- addValuesFromDataToElement(doc, change.getRemovedConfigurationData(), dataChangedNotificationEventElement, Store.CONFIG, Operation.DELETED);
- addValuesFromDataToElement(doc, change.getRemovedOperationalData(), dataChangedNotificationEventElement, Store.OPERATION, Operation.DELETED);
- }
-
- private void addValuesFromDataToElement(Document doc, Set<InstanceIdentifier> data, Element element, Store store,
- Operation operation) {
- if (data == null || data.isEmpty()) {
- return;
- }
- for (InstanceIdentifier path : data) {
- Node node = createDataChangeEventElement(doc, path, null, store, operation);
- element.appendChild(node);
- }
- }
-
- private void addValuesFromDataToElement(Document doc, Map<InstanceIdentifier, CompositeNode> data, Element element, Store store,
- Operation operation) {
- if (data == null || data.isEmpty()) {
- return;
- }
- for (Entry<InstanceIdentifier, CompositeNode> entry : data.entrySet()) {
- Node node = createDataChangeEventElement(doc, entry.getKey(), entry.getValue(), store, operation);
- element.appendChild(node);
- }
- }
-
- private Node createDataChangeEventElement(Document doc, InstanceIdentifier path, CompositeNode data, Store store,
- Operation operation) {
- Element dataChangeEventElement = doc.createElement("data-change-event");
-
- Element pathElement = doc.createElement("path");
- addPathAsValueToElement(path, pathElement);
- dataChangeEventElement.appendChild(pathElement);
-
- Element storeElement = doc.createElement("store");
- storeElement.setTextContent(store.value);
- dataChangeEventElement.appendChild(storeElement);
-
- Element operationElement = doc.createElement("operation");
- operationElement.setTextContent(operation.value);
- dataChangeEventElement.appendChild(operationElement);
-
- if (data != null) {
- Element dataElement = doc.createElement("data");
- Node dataAnyXml = translateToXml(path, data);
- Node adoptedNode = doc.adoptNode(dataAnyXml);
- dataElement.appendChild(adoptedNode);
- dataChangeEventElement.appendChild(dataElement);
- }
-
- return dataChangeEventElement;
- }
-
- private Node translateToXml(InstanceIdentifier path, CompositeNode data) {
- DataNodeContainer schemaNode = ControllerContext.getInstance().getDataNodeContainerFor(path);
- if (schemaNode == null) {
- logger.info("Path '{}' contains node with unsupported type (supported type is Container or List) or some node was not found.", path);
- return null;
- }
- try {
- Document xml = xmlMapper.write(data, schemaNode);
- return xml.getFirstChild();
- } catch (UnsupportedDataTypeException e) {
- logger.error("Error occured during translation of notification to XML.", e);
- return null;
- }
- }
-
- private void addPathAsValueToElement(InstanceIdentifier path, Element element) {
- // Map< key = namespace, value = prefix>
- Map<String, String> prefixes = new HashMap<>();
- InstanceIdentifier instanceIdentifier = path;
- StringBuilder textContent = new StringBuilder();
- for (PathArgument pathArgument : instanceIdentifier.getPath()) {
- textContent.append("/");
- writeIdentifierWithNamespacePrefix(element, textContent, pathArgument.getNodeType(), prefixes);
- if (pathArgument instanceof NodeIdentifierWithPredicates) {
- Map<QName, Object> predicates = ((NodeIdentifierWithPredicates) pathArgument).getKeyValues();
- for (QName keyValue : predicates.keySet()) {
- String predicateValue = String.valueOf(predicates.get(keyValue));
- textContent.append("[");
- writeIdentifierWithNamespacePrefix(element, textContent, keyValue, prefixes);
- textContent.append("='");
- textContent.append(predicateValue);
- textContent.append("'");
- textContent.append("]");
- }
- } else if (pathArgument instanceof NodeWithValue) {
- textContent.append("[.='");
- textContent.append(((NodeWithValue)pathArgument).getValue());
- textContent.append("'");
- textContent.append("]");
- }
- }
- element.setTextContent(textContent.toString());
- }
-
- private static void writeIdentifierWithNamespacePrefix(Element element, StringBuilder textContent, QName qName,
- Map<String, String> prefixes) {
- String namespace = qName.getNamespace().toString();
- String prefix = prefixes.get(namespace);
- if (prefix == null) {
- prefix = qName.getPrefix();
- if (prefix == null || prefix.isEmpty() || prefixes.containsValue(prefix)) {
- prefix = generateNewPrefix(prefixes.values());
- }
- }
-
- element.setAttribute("xmlns:" + prefix, namespace.toString());
- textContent.append(prefix);
- prefixes.put(namespace, prefix);
-
- textContent.append(":");
- textContent.append(qName.getLocalName());
- }
-
- private static String generateNewPrefix(Collection<String> prefixes) {
- StringBuilder result = null;
- Random random = new Random();
- do {
- result = new StringBuilder();
- for (int i = 0; i < 4; i++) {
- int randomNumber = 0x61 + (Math.abs(random.nextInt()) % 26);
- result.append(Character.toChars(randomNumber));
- }
- } while (prefixes.contains(result.toString()));
-
- return result.toString();
- }
-
- public InstanceIdentifier getPath() {
- return path;
- }
-
- public void setRegistration(ListenerRegistration<DataChangeListener> registration) {
- this.registration = registration;
- }
-
- public String getStreamName() {
- return streamName;
- }
-
- public void close() throws Exception {
- subscribers = new ConcurrentSet<>();
- registration.close();
- registration = null;
- eventBus.unregister(eventBusChangeRecorder);
- }
-
- public boolean isListening() {
- return registration == null ? false : true;
- }
-
- public void addSubscriber(Channel subscriber) {
- if (!subscriber.isActive()) {
- logger.debug("Channel is not active between websocket server and subscriber {}"
- + subscriber.remoteAddress());
- }
- Event event = new Event(EventType.REGISTER);
- event.setSubscriber(subscriber);
- eventBus.post(event);
- }
-
- public void removeSubscriber(Channel subscriber) {
- logger.debug("Subscriber {} is removed.", subscriber.remoteAddress());
- Event event = new Event(EventType.DEREGISTER);
- event.setSubscriber(subscriber);
- eventBus.post(event);
- }
-
- public boolean hasSubscribers() {
- return !subscribers.isEmpty();
- }
-
- private static enum Store {
- CONFIG("config"),
- OPERATION("operation");
-
- private final String value;
-
- private Store(String value) {
- this.value = value;
- }
- }
-
- private static enum Operation {
- CREATED("created"),
- UPDATED("updated"),
- DELETED("deleted");
-
- private final String value;
-
- private Operation(String value) {
- this.value = value;
- }
- }
+ private static final Logger logger = LoggerFactory
+ .getLogger(ListenerAdapter.class);
+ private final XmlMapper xmlMapper = new XmlMapper();
+ private final SimpleDateFormat rfc3339 = new SimpleDateFormat(
+ "yyyy-MM-dd'T'hh:mm:ssZ");
+
+ private final InstanceIdentifier path;
+ private ListenerRegistration<DataChangeListener> registration;
+ private final String streamName;
+ private Set<Channel> subscribers = new ConcurrentSet<>();
+ private final EventBus eventBus;
+ private final EventBusChangeRecorder eventBusChangeRecorder;
+
+ /**
+ * Creates new {@link ListenerAdapter} listener specified by path and stream
+ * name.
+ *
+ * @param path
+ * Path to data in data store.
+ * @param streamName
+ * The name of the stream.
+ */
+ ListenerAdapter(InstanceIdentifier path, String streamName) {
+ Preconditions.checkNotNull(path);
+ Preconditions
+ .checkArgument(streamName != null && !streamName.isEmpty());
+ this.path = path;
+ this.streamName = streamName;
+ eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
+ eventBusChangeRecorder = new EventBusChangeRecorder();
+ eventBus.register(eventBusChangeRecorder);
+ }
+
+ @Override
+ public void onDataChanged(
+ DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
+ if (!change.getCreatedConfigurationData().isEmpty()
+ || !change.getCreatedOperationalData().isEmpty()
+ || !change.getUpdatedConfigurationData().isEmpty()
+ || !change.getUpdatedOperationalData().isEmpty()
+ || !change.getRemovedConfigurationData().isEmpty()
+ || !change.getRemovedOperationalData().isEmpty()) {
+ String xml = prepareXmlFrom(change);
+ Event event = new Event(EventType.NOTIFY);
+ event.setData(xml);
+ eventBus.post(event);
+ }
+ }
+
+ /**
+ * Tracks events of data change by customer.
+ */
+ private final class EventBusChangeRecorder {
+ @Subscribe
+ public void recordCustomerChange(Event event) {
+ if (event.getType() == EventType.REGISTER) {
+ Channel subscriber = event.getSubscriber();
+ if (!subscribers.contains(subscriber)) {
+ subscribers.add(subscriber);
+ }
+ } else if (event.getType() == EventType.DEREGISTER) {
+ subscribers.remove(event.getSubscriber());
+ Notificator
+ .removeListenerIfNoSubscriberExists(ListenerAdapter.this);
+ } else if (event.getType() == EventType.NOTIFY) {
+ for (Channel subscriber : subscribers) {
+ if (subscriber.isActive()) {
+ logger.debug("Data are sent to subscriber {}:",
+ subscriber.remoteAddress());
+ subscriber.writeAndFlush(new TextWebSocketFrame(event
+ .getData()));
+ } else {
+ logger.debug(
+ "Subscriber {} is removed - channel is not active yet.",
+ subscriber.remoteAddress());
+ subscribers.remove(subscriber);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Represents event of specific {@link EventType} type, holds data and
+ * {@link Channel} subscriber.
+ */
+ private final class Event {
+ private final EventType type;
+ private Channel subscriber;
+ private String data;
+
+ /**
+ * Creates new event specified by {@link EventType} type.
+ *
+ * @param type
+ * EventType
+ */
+ public Event(EventType type) {
+ this.type = type;
+ }
+
+ /**
+ * Gets the {@link Channel} subscriber.
+ *
+ * @return Channel
+ */
+ public Channel getSubscriber() {
+ return subscriber;
+ }
+
+ /**
+ * Sets subscriber for event.
+ *
+ * @param subscriber
+ * Channel
+ */
+ public void setSubscriber(Channel subscriber) {
+ this.subscriber = subscriber;
+ }
+
+ /**
+ * Gets event data.
+ *
+ * @return String representation of event data.
+ */
+ public String getData() {
+ return data;
+ }
+
+ /**
+ * Sets event data.
+ *
+ * @param String
+ * data.
+ */
+ public void setData(String data) {
+ this.data = data;
+ }
+
+ /**
+ * Gets event type.
+ *
+ * @return The type of the event.
+ */
+ public EventType getType() {
+ return type;
+ }
+ }
+
+ /**
+ * Type of the event.
+ */
+ private enum EventType {
+ REGISTER, DEREGISTER, NOTIFY;
+ }
+
+ /**
+ * Prepare data in printable form and transform it to String.
+ *
+ * @param change
+ * DataChangeEvent
+ * @return Data in printable form.
+ */
+ private String prepareXmlFrom(
+ DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
+ Document doc = createDocument();
+ Element notificationElement = doc.createElementNS(
+ "urn:ietf:params:xml:ns:netconf:notification:1.0",
+ "notification");
+ doc.appendChild(notificationElement);
+
+ Element eventTimeElement = doc.createElement("eventTime");
+ eventTimeElement.setTextContent(toRFC3339(new Date()));
+ notificationElement.appendChild(eventTimeElement);
+
+ Element dataChangedNotificationEventElement = doc.createElementNS(
+ "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote",
+ "data-changed-notification");
+ addValuesToDataChangedNotificationEventElement(doc,
+ dataChangedNotificationEventElement, change);
+ notificationElement.appendChild(dataChangedNotificationEventElement);
+
+ try {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ TransformerFactory tf = TransformerFactory.newInstance();
+ Transformer transformer = tf.newTransformer();
+ transformer
+ .setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
+ transformer.setOutputProperty(OutputKeys.METHOD, "xml");
+ transformer.setOutputProperty(OutputKeys.INDENT, "yes");
+ transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
+ transformer.setOutputProperty(
+ "{http://xml.apache.org/xslt}indent-amount", "4");
+ transformer.transform(new DOMSource(doc), new StreamResult(
+ new OutputStreamWriter(out, "UTF-8")));
+ byte[] charData = out.toByteArray();
+ return new String(charData, "UTF-8");
+ } catch (TransformerException | UnsupportedEncodingException e) {
+ String msg = "Error during transformation of Document into String";
+ logger.error(msg, e);
+ return msg;
+ }
+ }
+
+ /**
+ * Formats data specified by RFC3339.
+ *
+ * @param d
+ * Date
+ * @return Data specified by RFC3339.
+ */
+ private String toRFC3339(Date d) {
+ return rfc3339.format(d).replaceAll("(\\d\\d)(\\d\\d)$", "$1:$2");
+ }
+
+ /**
+ * Creates {@link Document} document.
+ *
+ * @return {@link Document} document.
+ */
+ private Document createDocument() {
+ DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+ Document doc = null;
+ try {
+ DocumentBuilder bob = dbf.newDocumentBuilder();
+ doc = bob.newDocument();
+ } catch (ParserConfigurationException e) {
+ return null;
+ }
+ return doc;
+ }
+
+ /**
+ * Adds values to data changed notification event element.
+ *
+ * @param doc
+ * {@link Document}
+ * @param dataChangedNotificationEventElement
+ * {@link Element}
+ * @param change
+ * {@link DataChangeEvent}
+ */
+ private void addValuesToDataChangedNotificationEventElement(Document doc,
+ Element dataChangedNotificationEventElement,
+ DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
+ addValuesFromDataToElement(doc, change.getCreatedConfigurationData(),
+ dataChangedNotificationEventElement, Store.CONFIG,
+ Operation.CREATED);
+ addValuesFromDataToElement(doc, change.getCreatedOperationalData(),
+ dataChangedNotificationEventElement, Store.OPERATION,
+ Operation.CREATED);
+ if (change.getCreatedConfigurationData().isEmpty()) {
+ addValuesFromDataToElement(doc,
+ change.getUpdatedConfigurationData(),
+ dataChangedNotificationEventElement, Store.CONFIG,
+ Operation.UPDATED);
+ }
+ if (change.getCreatedOperationalData().isEmpty()) {
+ addValuesFromDataToElement(doc, change.getUpdatedOperationalData(),
+ dataChangedNotificationEventElement, Store.OPERATION,
+ Operation.UPDATED);
+ }
+ addValuesFromDataToElement(doc, change.getRemovedConfigurationData(),
+ dataChangedNotificationEventElement, Store.CONFIG,
+ Operation.DELETED);
+ addValuesFromDataToElement(doc, change.getRemovedOperationalData(),
+ dataChangedNotificationEventElement, Store.OPERATION,
+ Operation.DELETED);
+ }
+
+ /**
+ * Adds values from data to element.
+ *
+ * @param doc
+ * {@link Document}
+ * @param data
+ * Set of {@link InstanceIdentifier}.
+ * @param element
+ * {@link Element}
+ * @param store
+ * {@link Store}
+ * @param operation
+ * {@link Operation}
+ */
+ private void addValuesFromDataToElement(Document doc,
+ Set<InstanceIdentifier> data, Element element, Store store,
+ Operation operation) {
+ if (data == null || data.isEmpty()) {
+ return;
+ }
+ for (InstanceIdentifier path : data) {
+ Node node = createDataChangeEventElement(doc, path, null, store,
+ operation);
+ element.appendChild(node);
+ }
+ }
+
+ /**
+ * Adds values from data to element.
+ *
+ * @param doc
+ * {@link Document}
+ * @param data
+ * Map of {@link InstanceIdentifier} and {@link CompositeNode}.
+ * @param element
+ * {@link Element}
+ * @param store
+ * {@link Store}
+ * @param operation
+ * {@link Operation}
+ */
+ private void addValuesFromDataToElement(Document doc,
+ Map<InstanceIdentifier, CompositeNode> data, Element element,
+ Store store, Operation operation) {
+ if (data == null || data.isEmpty()) {
+ return;
+ }
+ for (Entry<InstanceIdentifier, CompositeNode> entry : data.entrySet()) {
+ Node node = createDataChangeEventElement(doc, entry.getKey(),
+ entry.getValue(), store, operation);
+ element.appendChild(node);
+ }
+ }
+
+ /**
+ * Creates changed event element from data.
+ *
+ * @param doc
+ * {@link Document}
+ * @param path
+ * Path to data in data store.
+ * @param data
+ * {@link CompositeNode}
+ * @param store
+ * {@link Store}
+ * @param operation
+ * {@link Operation}
+ * @return {@link Node} node represented by changed event element.
+ */
+ private Node createDataChangeEventElement(Document doc,
+ InstanceIdentifier path, CompositeNode data, Store store,
+ Operation operation) {
+ Element dataChangeEventElement = doc.createElement("data-change-event");
+
+ Element pathElement = doc.createElement("path");
+ addPathAsValueToElement(path, pathElement);
+ dataChangeEventElement.appendChild(pathElement);
+
+ Element storeElement = doc.createElement("store");
+ storeElement.setTextContent(store.value);
+ dataChangeEventElement.appendChild(storeElement);
+
+ Element operationElement = doc.createElement("operation");
+ operationElement.setTextContent(operation.value);
+ dataChangeEventElement.appendChild(operationElement);
+
+ if (data != null) {
+ Element dataElement = doc.createElement("data");
+ Node dataAnyXml = translateToXml(path, data);
+ Node adoptedNode = doc.adoptNode(dataAnyXml);
+ dataElement.appendChild(adoptedNode);
+ dataChangeEventElement.appendChild(dataElement);
+ }
+
+ return dataChangeEventElement;
+ }
+
+ /**
+ * Translates {@link CompositeNode} data to XML format.
+ *
+ * @param path
+ * Path to data in data store.
+ * @param data
+ * {@link CompositeNode}
+ * @return Data in XML format.
+ */
+ private Node translateToXml(InstanceIdentifier path, CompositeNode data) {
+ DataNodeContainer schemaNode = ControllerContext.getInstance()
+ .getDataNodeContainerFor(path);
+ if (schemaNode == null) {
+ logger.info(
+ "Path '{}' contains node with unsupported type (supported type is Container or List) or some node was not found.",
+ path);
+ return null;
+ }
+ try {
+ Document xml = xmlMapper.write(data, schemaNode);
+ return xml.getFirstChild();
+ } catch (UnsupportedDataTypeException e) {
+ logger.error(
+ "Error occured during translation of notification to XML.",
+ e);
+ return null;
+ }
+ }
+
+ /**
+ * Adds path as value to element.
+ *
+ * @param path
+ * Path to data in data store.
+ * @param element
+ * {@link Element}
+ */
+ private void addPathAsValueToElement(InstanceIdentifier path,
+ Element element) {
+ // Map< key = namespace, value = prefix>
+ Map<String, String> prefixes = new HashMap<>();
+ InstanceIdentifier instanceIdentifier = path;
+ StringBuilder textContent = new StringBuilder();
+ for (PathArgument pathArgument : instanceIdentifier.getPath()) {
+ textContent.append("/");
+ writeIdentifierWithNamespacePrefix(element, textContent,
+ pathArgument.getNodeType(), prefixes);
+ if (pathArgument instanceof NodeIdentifierWithPredicates) {
+ Map<QName, Object> predicates = ((NodeIdentifierWithPredicates) pathArgument)
+ .getKeyValues();
+ for (QName keyValue : predicates.keySet()) {
+ String predicateValue = String.valueOf(predicates
+ .get(keyValue));
+ textContent.append("[");
+ writeIdentifierWithNamespacePrefix(element, textContent,
+ keyValue, prefixes);
+ textContent.append("='");
+ textContent.append(predicateValue);
+ textContent.append("'");
+ textContent.append("]");
+ }
+ } else if (pathArgument instanceof NodeWithValue) {
+ textContent.append("[.='");
+ textContent.append(((NodeWithValue) pathArgument).getValue());
+ textContent.append("'");
+ textContent.append("]");
+ }
+ }
+ element.setTextContent(textContent.toString());
+ }
+
+ /**
+ * Writes identifier that consists of prefix and QName.
+ *
+ * @param element
+ * {@link Element}
+ * @param textContent
+ * StringBuilder
+ * @param qName
+ * QName
+ * @param prefixes
+ * Map of namespaces and prefixes.
+ */
+ private static void writeIdentifierWithNamespacePrefix(Element element,
+ StringBuilder textContent, QName qName, Map<String, String> prefixes) {
+ String namespace = qName.getNamespace().toString();
+ String prefix = prefixes.get(namespace);
+ if (prefix == null) {
+ prefix = qName.getPrefix();
+ if (prefix == null || prefix.isEmpty()
+ || prefixes.containsValue(prefix)) {
+ prefix = generateNewPrefix(prefixes.values());
+ }
+ }
+
+ element.setAttribute("xmlns:" + prefix, namespace.toString());
+ textContent.append(prefix);
+ prefixes.put(namespace, prefix);
+
+ textContent.append(":");
+ textContent.append(qName.getLocalName());
+ }
+
+ /**
+ * Generates new prefix which consists of four random characters <a-z>.
+ *
+ * @param prefixes
+ * Collection of prefixes.
+ * @return New prefix which consists of four random characters <a-z>.
+ */
+ private static String generateNewPrefix(Collection<String> prefixes) {
+ StringBuilder result = null;
+ Random random = new Random();
+ do {
+ result = new StringBuilder();
+ for (int i = 0; i < 4; i++) {
+ int randomNumber = 0x61 + (Math.abs(random.nextInt()) % 26);
+ result.append(Character.toChars(randomNumber));
+ }
+ } while (prefixes.contains(result.toString()));
+
+ return result.toString();
+ }
+
+ /**
+ * Gets path pointed to data in data store.
+ *
+ * @return Path pointed to data in data store.
+ */
+ public InstanceIdentifier getPath() {
+ return path;
+ }
+
+ /**
+ * Sets {@link ListenerRegistration} registration.
+ *
+ * @param registration
+ * ListenerRegistration<DataChangeListener>
+ */
+ public void setRegistration(
+ ListenerRegistration<DataChangeListener> registration) {
+ this.registration = registration;
+ }
+
+ /**
+ * Gets the name of the stream.
+ *
+ * @return The name of the stream.
+ */
+ public String getStreamName() {
+ return streamName;
+ }
+
+ /**
+ * Removes all subscribers and unregisters event bus change recorder form
+ * event bus.
+ */
+ public void close() throws Exception {
+ subscribers = new ConcurrentSet<>();
+ registration.close();
+ registration = null;
+ eventBus.unregister(eventBusChangeRecorder);
+ }
+
+ /**
+ * Checks if {@link ListenerRegistration} registration exist.
+ *
+ * @return True if exist, false otherwise.
+ */
+ public boolean isListening() {
+ return registration == null ? false : true;
+ }
+
+ /**
+ * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
+ * subscriber to the event and post event into event bus.
+ *
+ * @param subscriber
+ * Channel
+ */
+ public void addSubscriber(Channel subscriber) {
+ if (!subscriber.isActive()) {
+ logger.debug("Channel is not active between websocket server and subscriber {}"
+ + subscriber.remoteAddress());
+ }
+ Event event = new Event(EventType.REGISTER);
+ event.setSubscriber(subscriber);
+ eventBus.post(event);
+ }
+
+ /**
+ * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
+ * subscriber to the event and posts event into event bus.
+ *
+ * @param subscriber
+ */
+ public void removeSubscriber(Channel subscriber) {
+ logger.debug("Subscriber {} is removed.", subscriber.remoteAddress());
+ Event event = new Event(EventType.DEREGISTER);
+ event.setSubscriber(subscriber);
+ eventBus.post(event);
+ }
+
+ /**
+ * Checks if exists at least one {@link Channel} subscriber.
+ *
+ * @return True if exist at least one {@link Channel} subscriber, false
+ * otherwise.
+ */
+ public boolean hasSubscribers() {
+ return !subscribers.isEmpty();
+ }
+
+ /**
+ * Consists of two types {@link Store#CONFIG} and {@link Store#OPERATION}.
+ */
+ private static enum Store {
+ CONFIG("config"), OPERATION("operation");
+
+ private final String value;
+
+ private Store(String value) {
+ this.value = value;
+ }
+ }
+
+ /**
+ * Consists of three types {@link Operation#CREATED},
+ * {@link Operation#UPDATED} and {@link Operation#DELETED}.
+ */
+ private static enum Operation {
+ CREATED("created"), UPDATED("updated"), DELETED("deleted");
+
+ private final String value;
+
+ private Operation(String value) {
+ this.value = value;
+ }
+ }
}
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+/**
+ * {@link Notificator} is responsible to create, remove and find {@link ListenerAdapter} listener.
+ */
public class Notificator {
- private static Map<String, ListenerAdapter> listenersByStreamName = new ConcurrentHashMap<>();
- private static Map<InstanceIdentifier, ListenerAdapter> listenersByInstanceIdentifier = new ConcurrentHashMap<>();
- private static final Lock lock = new ReentrantLock();
+ private static Map<String, ListenerAdapter> listenersByStreamName = new ConcurrentHashMap<>();
+ private static Map<InstanceIdentifier, ListenerAdapter> listenersByInstanceIdentifier = new ConcurrentHashMap<>();
+ private static final Lock lock = new ReentrantLock();
- private Notificator() {
- }
+ private Notificator() {
+ }
- public static ListenerAdapter getListenerFor(String streamName) {
- return listenersByStreamName.get(streamName);
- }
+ /**
+ * Gets {@link ListenerAdapter} specified by stream name.
+ *
+ * @param streamName
+ * The name of the stream.
+ * @return {@link ListenerAdapter} specified by stream name.
+ */
+ public static ListenerAdapter getListenerFor(String streamName) {
+ return listenersByStreamName.get(streamName);
+ }
- public static ListenerAdapter getListenerFor(InstanceIdentifier path) {
- return listenersByInstanceIdentifier.get(path);
- }
+ /**
+ * Gets {@link ListenerAdapter} listener specified by
+ * {@link InstanceIdentifier} path.
+ *
+ * @param path
+ * Path to data in data repository.
+ * @return ListenerAdapter
+ */
+ public static ListenerAdapter getListenerFor(InstanceIdentifier path) {
+ return listenersByInstanceIdentifier.get(path);
+ }
- public static boolean existListenerFor(InstanceIdentifier path) {
- return listenersByInstanceIdentifier.containsKey(path);
- }
+ /**
+ * Checks if the listener specified by {@link InstanceIdentifier} path
+ * exist.
+ *
+ * @param path
+ * Path to data in data repository.
+ * @return True if the listener exist, false otherwise.
+ */
+ public static boolean existListenerFor(InstanceIdentifier path) {
+ return listenersByInstanceIdentifier.containsKey(path);
+ }
- public static ListenerAdapter createListener(InstanceIdentifier path, String streamName) {
- ListenerAdapter listener = new ListenerAdapter(path, streamName);
- try {
- lock.lock();
- listenersByInstanceIdentifier.put(path, listener);
- listenersByStreamName.put(streamName, listener);
- } finally {
- lock.unlock();
- }
- return listener;
- }
+ /**
+ * Creates new {@link ListenerAdapter} listener from
+ * {@link InstanceIdentifier} path and stream name.
+ *
+ * @param path
+ * Path to data in data repository.
+ * @param streamName
+ * The name of the stream.
+ * @return New {@link ListenerAdapter} listener from
+ * {@link InstanceIdentifier} path and stream name.
+ */
+ public static ListenerAdapter createListener(InstanceIdentifier path,
+ String streamName) {
+ ListenerAdapter listener = new ListenerAdapter(path, streamName);
+ try {
+ lock.lock();
+ listenersByInstanceIdentifier.put(path, listener);
+ listenersByStreamName.put(streamName, listener);
+ } finally {
+ lock.unlock();
+ }
+ return listener;
+ }
- public static void removeListener(InstanceIdentifier path) {
- ListenerAdapter listener = listenersByInstanceIdentifier.get(path);
- deleteListener(listener);
- }
+ /**
+ * Looks for listener determined by {@link InstanceIdentifier} path and
+ * removes it.
+ *
+ * @param path
+ * InstanceIdentifier
+ */
+ public static void removeListener(InstanceIdentifier path) {
+ ListenerAdapter listener = listenersByInstanceIdentifier.get(path);
+ deleteListener(listener);
+ }
- public static String createStreamNameFromUri(String uri) {
- if (uri == null) {
- return null;
- }
- String result = uri;
- if (result.startsWith("/")) {
- result = result.substring(1);
- }
- if (result.endsWith("/")) {
- result = result.substring(0, result.length());
- }
- return result;
- }
+ /**
+ * Creates String representation of stream name from URI. Removes slash from
+ * URI in start and end position.
+ *
+ * @param uri
+ * URI for creation stream name.
+ * @return String representation of stream name.
+ */
+ public static String createStreamNameFromUri(String uri) {
+ if (uri == null) {
+ return null;
+ }
+ String result = uri;
+ if (result.startsWith("/")) {
+ result = result.substring(1);
+ }
+ if (result.endsWith("/")) {
+ result = result.substring(0, result.length());
+ }
+ return result;
+ }
- public static void removeAllListeners() {
- for (ListenerAdapter listener : listenersByInstanceIdentifier.values()) {
- try {
- listener.close();
- } catch (Exception e) {
- }
- }
- try {
- lock.lock();
- listenersByStreamName = new ConcurrentHashMap<>();
- listenersByInstanceIdentifier = new ConcurrentHashMap<>();
- } finally {
- lock.unlock();
- }
- }
+ /**
+ * Removes all listeners.
+ */
+ public static void removeAllListeners() {
+ for (ListenerAdapter listener : listenersByInstanceIdentifier.values()) {
+ try {
+ listener.close();
+ } catch (Exception e) {
+ }
+ }
+ try {
+ lock.lock();
+ listenersByStreamName = new ConcurrentHashMap<>();
+ listenersByInstanceIdentifier = new ConcurrentHashMap<>();
+ } finally {
+ lock.unlock();
+ }
+ }
- public static void removeListenerIfNoSubscriberExists(ListenerAdapter listener) {
- if (!listener.hasSubscribers()) {
- deleteListener(listener);
- }
- }
+ /**
+ * Checks if listener has at least one subscriber. In case it has any, delete
+ * listener.
+ *
+ * @param listener
+ * ListenerAdapter
+ */
+ public static void removeListenerIfNoSubscriberExists(
+ ListenerAdapter listener) {
+ if (!listener.hasSubscribers()) {
+ deleteListener(listener);
+ }
+ }
- private static void deleteListener(ListenerAdapter listener) {
- if (listener != null) {
- try {
- listener.close();
- } catch (Exception e) {
- }
- try {
- lock.lock();
- listenersByInstanceIdentifier.remove(listener.getPath());
- listenersByStreamName.remove(listener.getStreamName());
- } finally {
- lock.unlock();
- }
- }
- }
+ /**
+ * Delete {@link ListenerAdapter} listener specified in parameter.
+ *
+ * @param listener
+ * ListenerAdapter
+ */
+ private static void deleteListener(ListenerAdapter listener) {
+ if (listener != null) {
+ try {
+ listener.close();
+ } catch (Exception e) {
+ }
+ try {
+ lock.lock();
+ listenersByInstanceIdentifier.remove(listener.getPath());
+ listenersByStreamName.remove(listener.getStreamName());
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
}
\ No newline at end of file
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
+/**
+ * {@link WebSocketServer} is responsible to start and stop web socket server at
+ * {@link #PORT}.
+ */
public class WebSocketServer implements Runnable {
- private static final Logger logger = LoggerFactory.getLogger(WebSocketServer.class);
-
- public static final int PORT = 8181;
- private EventLoopGroup bossGroup;
- private EventLoopGroup workerGroup;
-
- @Override
- public void run() {
- bossGroup = new NioEventLoopGroup();
- workerGroup = new NioEventLoopGroup();
- try {
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .childHandler(new WebSocketServerInitializer());
-
- Channel ch = b.bind(PORT).sync().channel();
- logger.info("Web socket server started at port {}.", PORT);
-
- ch.closeFuture().sync();
- } catch (InterruptedException e) {
- // NOOP
- } finally {
- stop();
- }
- }
-
- private void stop() {
- Notificator.removeAllListeners();
- if (bossGroup != null) {
- bossGroup.shutdownGracefully();
- }
- if (workerGroup != null) {
- workerGroup.shutdownGracefully();
- }
- }
+ private static final Logger logger = LoggerFactory
+ .getLogger(WebSocketServer.class);
+
+ public static final int PORT = 8181;
+ private EventLoopGroup bossGroup;
+ private EventLoopGroup workerGroup;
+
+ @Override
+ public void run() {
+ bossGroup = new NioEventLoopGroup();
+ workerGroup = new NioEventLoopGroup();
+ try {
+ ServerBootstrap b = new ServerBootstrap();
+ b.group(bossGroup, workerGroup)
+ .channel(NioServerSocketChannel.class)
+ .childHandler(new WebSocketServerInitializer());
+
+ Channel ch = b.bind(PORT).sync().channel();
+ logger.info("Web socket server started at port {}.", PORT);
+
+ ch.closeFuture().sync();
+ } catch (InterruptedException e) {
+ // NOOP
+ } finally {
+ stop();
+ }
+ }
+
+ /**
+ * Stops the web socket server and removes all listeners.
+ */
+ private void stop() {
+ Notificator.removeAllListeners();
+ if (bossGroup != null) {
+ bossGroup.shutdownGracefully();
+ }
+ if (workerGroup != null) {
+ workerGroup.shutdownGracefully();
+ }
+ }
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * {@link WebSocketServerHandler} is implementation of
+ * {@link SimpleChannelInboundHandler} which allow handle
+ * {@link FullHttpRequest} and {@link WebSocketFrame} messages.
+ */
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
- private static final Logger logger = LoggerFactory.getLogger(WebSocketServerHandler.class);
-
- private WebSocketServerHandshaker handshaker;
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
- if (msg instanceof FullHttpRequest) {
- handleHttpRequest(ctx, (FullHttpRequest) msg);
- } else if (msg instanceof WebSocketFrame) {
- handleWebSocketFrame(ctx, (WebSocketFrame) msg);
- }
- }
-
- private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req)
- throws Exception {
- // Handle a bad request.
- if (!req.getDecoderResult().isSuccess()) {
- sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
- return;
- }
-
- // Allow only GET methods.
- if (req.getMethod() != GET) {
- sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN));
- return;
- }
-
- String streamName = Notificator.createStreamNameFromUri(req.getUri());
- ListenerAdapter listener = Notificator.getListenerFor(streamName);
- if (listener != null) {
- listener.addSubscriber(ctx.channel());
- logger.debug("Subscriber successfully registered.");
- } else {
- logger.error("Listener for stream with name '{}' was not found.", streamName);
- sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR));
- }
-
- // Handshake
- WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
- getWebSocketLocation(req), null, false);
- handshaker = wsFactory.newHandshaker(req);
- if (handshaker == null) {
- WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
- } else {
- handshaker.handshake(ctx.channel(), req);
- }
-
- }
-
- private static void sendHttpResponse(ChannelHandlerContext ctx,
- HttpRequest req, FullHttpResponse res) {
- // Generate an error page if response getStatus code is not OK (200).
- if (res.getStatus().code() != 200) {
- ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
- res.content().writeBytes(buf);
- buf.release();
- setContentLength(res, res.content().readableBytes());
- }
-
- // Send the response and close the connection if necessary.
- ChannelFuture f = ctx.channel().writeAndFlush(res);
- if (!isKeepAlive(req) || res.getStatus().code() != 200) {
- f.addListener(ChannelFutureListener.CLOSE);
- }
- }
-
- private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws IOException {
- if (frame instanceof CloseWebSocketFrame) {
- handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
- String streamName = Notificator.createStreamNameFromUri(((CloseWebSocketFrame) frame).reasonText());
- ListenerAdapter listener = Notificator.getListenerFor(streamName);
- if (listener != null) {
- listener.removeSubscriber(ctx.channel());
- logger.debug("Subscriber successfully registered.");
- }
- Notificator.removeListenerIfNoSubscriberExists(listener);
- return;
- } else if (frame instanceof PingWebSocketFrame) {
- ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
- return;
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
- throws Exception {
- if (cause instanceof java.nio.channels.ClosedChannelException == false) {
- //cause.printStackTrace();
- }
- ctx.close();
- }
-
- private static String getWebSocketLocation(HttpRequest req) {
- return "http://" + req.headers().get(HOST) + req.getUri();
- }
+ private static final Logger logger = LoggerFactory
+ .getLogger(WebSocketServerHandler.class);
+
+ private WebSocketServerHandshaker handshaker;
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, Object msg)
+ throws Exception {
+ if (msg instanceof FullHttpRequest) {
+ handleHttpRequest(ctx, (FullHttpRequest) msg);
+ } else if (msg instanceof WebSocketFrame) {
+ handleWebSocketFrame(ctx, (WebSocketFrame) msg);
+ }
+ }
+
+ /**
+ * Checks if HTTP request method is GET and if is possible to decode HTTP
+ * result of request.
+ *
+ * @param ctx
+ * ChannelHandlerContext
+ * @param req
+ * FullHttpRequest
+ */
+ private void handleHttpRequest(ChannelHandlerContext ctx,
+ FullHttpRequest req) throws Exception {
+ // Handle a bad request.
+ if (!req.getDecoderResult().isSuccess()) {
+ sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1,
+ BAD_REQUEST));
+ return;
+ }
+
+ // Allow only GET methods.
+ if (req.getMethod() != GET) {
+ sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1,
+ FORBIDDEN));
+ return;
+ }
+
+ String streamName = Notificator.createStreamNameFromUri(req.getUri());
+ ListenerAdapter listener = Notificator.getListenerFor(streamName);
+ if (listener != null) {
+ listener.addSubscriber(ctx.channel());
+ logger.debug("Subscriber successfully registered.");
+ } else {
+ logger.error("Listener for stream with name '{}' was not found.",
+ streamName);
+ sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1,
+ INTERNAL_SERVER_ERROR));
+ }
+
+ // Handshake
+ WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
+ getWebSocketLocation(req), null, false);
+ handshaker = wsFactory.newHandshaker(req);
+ if (handshaker == null) {
+ WebSocketServerHandshakerFactory
+ .sendUnsupportedWebSocketVersionResponse(ctx.channel());
+ } else {
+ handshaker.handshake(ctx.channel(), req);
+ }
+
+ }
+
+ /**
+ * Checks response status, send response and close connection if necessary
+ *
+ * @param ctx
+ * ChannelHandlerContext
+ * @param req
+ * HttpRequest
+ * @param res
+ * FullHttpResponse
+ */
+ private static void sendHttpResponse(ChannelHandlerContext ctx,
+ HttpRequest req, FullHttpResponse res) {
+ // Generate an error page if response getStatus code is not OK (200).
+ if (res.getStatus().code() != 200) {
+ ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(),
+ CharsetUtil.UTF_8);
+ res.content().writeBytes(buf);
+ buf.release();
+ setContentLength(res, res.content().readableBytes());
+ }
+
+ // Send the response and close the connection if necessary.
+ ChannelFuture f = ctx.channel().writeAndFlush(res);
+ if (!isKeepAlive(req) || res.getStatus().code() != 200) {
+ f.addListener(ChannelFutureListener.CLOSE);
+ }
+ }
+
+ /**
+ * Handles web socket frame.
+ *
+ * @param ctx
+ * {@link ChannelHandlerContext}
+ * @param frame
+ * {@link WebSocketFrame}
+ */
+ private void handleWebSocketFrame(ChannelHandlerContext ctx,
+ WebSocketFrame frame) throws IOException {
+ if (frame instanceof CloseWebSocketFrame) {
+ handshaker.close(ctx.channel(),
+ (CloseWebSocketFrame) frame.retain());
+ String streamName = Notificator
+ .createStreamNameFromUri(((CloseWebSocketFrame) frame)
+ .reasonText());
+ ListenerAdapter listener = Notificator.getListenerFor(streamName);
+ if (listener != null) {
+ listener.removeSubscriber(ctx.channel());
+ logger.debug("Subscriber successfully registered.");
+ }
+ Notificator.removeListenerIfNoSubscriberExists(listener);
+ return;
+ } else if (frame instanceof PingWebSocketFrame) {
+ ctx.channel().write(
+ new PongWebSocketFrame(frame.content().retain()));
+ return;
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+ throws Exception {
+ if (cause instanceof java.nio.channels.ClosedChannelException == false) {
+ // cause.printStackTrace();
+ }
+ ctx.close();
+ }
+
+ /**
+ * Get web socket location from HTTP request.
+ *
+ * @param req
+ * HTTP request from which the location will be returned
+ * @return String representation of web socket location.
+ */
+ private static String getWebSocketLocation(HttpRequest req) {
+ return "http://" + req.headers().get(HOST) + req.getUri();
+ }
}
package org.opendaylight.controller.sal.streams.websockets;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
-public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {
+/**
+ * {@link WebSocketServerInitializer} is used to setup the
+ * {@link ChannelPipeline} of a {@link Channel}.
+ */
+public class WebSocketServerInitializer extends
+ ChannelInitializer<SocketChannel> {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline pipeline = ch.pipeline();
- pipeline.addLast("codec-http", new HttpServerCodec());
- pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
- pipeline.addLast("handler", new WebSocketServerHandler());
- }
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline pipeline = ch.pipeline();
+ pipeline.addLast("codec-http", new HttpServerCodec());
+ pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
+ pipeline.addLast("handler", new WebSocketServerHandler());
+ }
}
*/
package org.opendaylight.controller.sal.restconf.broker.impl;
-import com.google.common.base.Optional;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+
import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Optional;
+
public class DataBrokerServiceImpl implements DataBrokerService {
private static final Logger logger = LoggerFactory.getLogger(DataBrokerServiceImpl.class.toString());
- private RestconfClientContext restconfClientContext;
- private SalRemoteService salRemoteService;
+ private final RestconfClientContext restconfClientContext;
+ private final SalRemoteService salRemoteService;
public DataBrokerServiceImpl(RestconfClientContext restconfClientContext) {
this.restconfClientContext = restconfClientContext;
}
private class SalRemoteDataListenerRegistration implements ListenerRegistration<DataChangeListener> {
- private DataChangeListener dataChangeListener;
+ private final DataChangeListener dataChangeListener;
public SalRemoteDataListenerRegistration(DataChangeListener dataChangeListener){
this.dataChangeListener = dataChangeListener;
}
return this.dataChangeListener;
}
@Override
- public void close() throws Exception {
+ public void close() {
//noop
}
}
<parent>
<artifactId>netconf-subsystem</artifactId>
<groupId>org.opendaylight.controller</groupId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
</parent>
<artifactId>config-netconf-connector</artifactId>
<name>${project.artifactId}</name>
package org.opendaylight.controller.netconf.confignetconfconnector.operations;
-import java.util.HashMap;
-import java.util.Map;
-
+import org.opendaylight.controller.config.api.ConflictingVersionException;
+import org.opendaylight.controller.config.api.ValidationException;
import org.opendaylight.controller.config.api.jmx.CommitStatus;
import org.opendaylight.controller.config.util.ConfigRegistryClient;
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
+import java.util.HashMap;
+import java.util.Map;
+
public class Commit extends AbstractConfigNetconfOperation {
private static final Logger logger = LoggerFactory.getLogger(Commit.class);
try {
status = this.transactionProvider.commitTransaction();
} catch (final IllegalStateException e) {
+ // FIXME: when can IllegalStateException occur?
logger.warn("Commit failed: ", e);
final Map<String, String> errorInfo = new HashMap<>();
errorInfo.put(ErrorTag.operation_failed.name(),
"Operation failed. Use 'get-config' or 'edit-config' before triggering 'commit' operation");
throw new NetconfDocumentedException(e.getMessage(), e, ErrorType.application, ErrorTag.operation_failed,
ErrorSeverity.error, errorInfo);
- } catch (final NetconfDocumentedException e) {
- throw new NetconfDocumentedException(
- "Unable to retrieve config snapshot after commit for persister, details: " + e.getMessage(),
- ErrorType.application, ErrorTag.operation_failed, ErrorSeverity.error, e.getErrorInfo());
+ } catch (ValidationException e) {
+ throw NetconfDocumentedException.wrap(e);
+ } catch (ConflictingVersionException e) {
+ throw NetconfDocumentedException.wrap(e);
+
}
logger.trace("Datastore {} committed successfully: {}", Datastore.candidate, status);
package org.opendaylight.controller.netconf.confignetconfconnector.operations;
-import java.util.HashMap;
-import java.util.Map;
-
import org.opendaylight.controller.config.util.ConfigRegistryClient;
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.api.NetconfDocumentedException.ErrorSeverity;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
+import java.util.HashMap;
+import java.util.Map;
+
public class DiscardChanges extends AbstractConfigNetconfOperation {
public static final String DISCARD = "discard-changes";
try {
fromXml(xml);
} catch (final IllegalArgumentException e) {
+ //FIXME where can IllegalStateException be thrown?
logger.warn("Rpc error: {}", ErrorTag.bad_attribute, e);
final Map<String, String> errorInfo = new HashMap<>();
errorInfo.put(ErrorTag.bad_attribute.name(), e.getMessage());
try {
this.transactionProvider.abortTransaction();
} catch (final IllegalStateException e) {
+ //FIXME where can IllegalStateException be thrown?
logger.warn("Abort failed: ", e);
final Map<String, String> errorInfo = new HashMap<>();
errorInfo
try {
checkXml(xml);
} catch (IllegalStateException e) {
+ //FIXME where can IllegalStateException be thrown? I see precondition that guards for programming bugs..
logger.warn("Rpc error: {}", ErrorTag.missing_attribute, e);
final Map<String, String> errorInfo = new HashMap<>();
errorInfo.put(ErrorTag.missing_attribute.name(), "Missing value of datastore attribute");
throw new NetconfDocumentedException(e.getMessage(), e, ErrorType.rpc, ErrorTag.missing_attribute,
ErrorSeverity.error, errorInfo);
} catch (final IllegalArgumentException e) {
+ // FIXME use checked exception if it has domain meaning
logger.warn("Rpc error: {}", ErrorTag.bad_attribute, e);
final Map<String, String> errorInfo = new HashMap<>();
errorInfo.put(ErrorTag.bad_attribute.name(), e.getMessage());
transactionProvider.validateTransaction();
} catch (ValidationException e) {
logger.warn("Validation failed", e);
- final Map<String, String> errorInfo = new HashMap<>();
- errorInfo.put(ErrorTag.operation_failed.name(), "Validation failed");
- throw new NetconfDocumentedException(e.getMessage(), e, ErrorType.application, ErrorTag.operation_failed,
- ErrorSeverity.error, errorInfo);
+ throw NetconfDocumentedException.wrap(e);
} catch (IllegalStateException e) {
logger.warn("Validation failed", e);
final Map<String, String> errorInfo = new HashMap<>();
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
-import org.opendaylight.controller.config.api.JmxAttributeValidationException;
import org.opendaylight.controller.config.api.ValidationException;
import org.opendaylight.controller.config.util.ConfigRegistryClient;
import org.opendaylight.controller.config.util.ConfigTransactionClient;
EditConfigXmlParser.EditConfigExecution editConfigExecution) throws NetconfDocumentedException {
try {
set(configRegistryClient, editConfigExecution);
- } catch (IllegalStateException | JmxAttributeValidationException | ValidationException e) {
+
+ } catch (IllegalStateException e) {
+ //FIXME: when can IllegalStateException be thrown?
+ // JmxAttributeValidationException is wrapped in DynamicWritableWrapper with ValidationException
+ // ValidationException is not thrown until validate or commit is issued
logger.warn("Set phase for {} failed", EditConfigXmlParser.EDIT_CONFIG, e);
final Map<String, String> errorInfo = new HashMap<>();
errorInfo.put(ErrorTag.operation_failed.name(), e.getMessage());
EditConfigExecution editConfigExecution) throws NetconfDocumentedException {
try {
test(configRegistryClient, editConfigExecution, editConfigExecution.getDefaultStrategy());
- } catch (IllegalStateException | JmxAttributeValidationException | ValidationException e) {
+ } catch (IllegalStateException | ValidationException e) {
+ //FIXME: when can IllegalStateException be thrown?
logger.warn("Test phase for {} failed", EditConfigXmlParser.EDIT_CONFIG, e);
final Map<String, String> errorInfo = new HashMap<>();
errorInfo.put(ErrorTag.operation_failed.name(), e.getMessage());
}
private void test(ConfigRegistryClient configRegistryClient, EditConfigExecution execution,
- EditStrategyType editStrategyType) {
+ EditStrategyType editStrategyType) throws ValidationException {
ObjectName taON = transactionProvider.getTestTransaction();
try {
return identityNameToSchemaNode.containsKey(idName);
}
+ // FIXME method never used
public IdentitySchemaNode getIdentitySchemaNode(String idName) {
Preconditions.checkState(identityNameToSchemaNode.containsKey(idName), "No identity under name %s", idName);
return identityNameToSchemaNode.get(idName);
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import org.opendaylight.controller.config.api.ConflictingVersionException;
import org.opendaylight.controller.config.api.ValidationException;
import org.opendaylight.controller.config.api.jmx.CommitStatus;
import org.opendaylight.controller.config.util.ConfigRegistryClient;
import org.opendaylight.controller.config.util.ConfigTransactionClient;
-import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Commit and notification send must be atomic
*/
- public synchronized CommitStatus commitTransaction() throws NetconfDocumentedException {
+ public synchronized CommitStatus commitTransaction() throws ValidationException, ConflictingVersionException {
final Optional<ObjectName> maybeTaON = getTransaction();
Preconditions.checkState(maybeTaON.isPresent(), "No transaction found for session " + netconfSessionIdForReporting);
ObjectName taON = maybeTaON.get();
// no clean up: user can reconfigure and recover this transaction
logger.warn("Transaction {} failed on {}", taON, validationException.toString());
throw validationException;
- } catch (Exception e) {
+ } catch (ConflictingVersionException e) {
logger.error("Exception while commit of {}, aborting transaction", taON, e);
// clean up
abortTransaction();
transactionClient.validateConfig();
}
- public void validateTestTransaction(ObjectName taON) {
+ public void validateTestTransaction(ObjectName taON) throws ValidationException {
ConfigTransactionClient transactionClient = configRegistryClient.getConfigTransactionClient(taON);
transactionClient.validateConfig();
}
package org.opendaylight.controller.netconf.confignetconfconnector;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.math.BigInteger;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.management.InstanceAlreadyExistsException;
-import javax.management.InstanceNotFoundException;
-import javax.management.ObjectName;
-import javax.xml.parsers.ParserConfigurationException;
-
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.matchers.JUnitMatchers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.config.api.ConflictingVersionException;
+import org.opendaylight.controller.config.api.ValidationException;
import org.opendaylight.controller.config.api.annotations.AbstractServiceInterface;
import org.opendaylight.controller.config.api.annotations.ServiceInterfaceAnnotation;
import org.opendaylight.controller.config.manager.impl.AbstractConfigTest;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.InstanceNotFoundException;
+import javax.management.ObjectName;
+import javax.xml.parsers.ParserConfigurationException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
public class NetconfMappingTest extends AbstractConfigTest {
transactionProvider = new TransactionProvider(this.configRegistryClient, NETCONF_SESSION_ID);
}
- private ObjectName createModule(final String instanceName) throws InstanceAlreadyExistsException, InstanceNotFoundException, URISyntaxException {
+ private ObjectName createModule(final String instanceName) throws InstanceAlreadyExistsException, InstanceNotFoundException, URISyntaxException, ValidationException, ConflictingVersionException {
final ConfigTransactionJMXClient transaction = this.configRegistryClient.createTransaction();
final ObjectName on = transaction.createModule(this.factory.getImplementationName(), instanceName);
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.config.api.ServiceReferenceReadableRegistry;
+import org.opendaylight.controller.config.api.ValidationException;
import org.opendaylight.controller.config.util.ConfigRegistryClient;
import org.opendaylight.controller.config.util.ConfigTransactionClient;
import org.opendaylight.controller.config.yang.store.api.YangStoreSnapshot;
}
@Test
- public void test() throws NetconfDocumentedException {
+ public void test() throws NetconfDocumentedException, ValidationException {
EditConfig edit = new EditConfig(yangStoreSnapshot, provider, configRegistry,
ValidateTest.NETCONF_SESSION_ID_FOR_REPORTING);
EditConfigStrategy editStrat = mock(EditConfigStrategy.class);
<parent>
<artifactId>netconf-subsystem</artifactId>
<groupId>org.opendaylight.controller</groupId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
</parent>
<artifactId>config-persister-impl</artifactId>
<name>${project.artifactId}</name>
package org.opendaylight.controller.netconf.persist.impl;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import javax.annotation.concurrent.Immutable;
-
+import com.google.common.base.Preconditions;
+import io.netty.channel.EventLoopGroup;
import org.opendaylight.controller.config.api.ConflictingVersionException;
import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.w3c.dom.Element;
import org.xml.sax.SAXException;
-import com.google.common.base.Preconditions;
-import io.netty.channel.EventLoopGroup;
+import javax.annotation.concurrent.Immutable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
@Immutable
public class ConfigPusher {
- private static final Logger logger = LoggerFactory.getLogger(ConfigPusher.class);
+ private static final Logger logger = LoggerFactory.getLogger(ConfigPersisterNotificationHandler.class);
private static final int NETCONF_SEND_ATTEMPT_MS_DELAY = 1000;
private static final int NETCONF_SEND_ATTEMPTS = 20;
}
public ConfigPusher(InetSocketAddress address, EventLoopGroup nettyThreadGroup,
- long maxWaitForCapabilitiesMillis, long connectionTimeoutMillis) {
+ long maxWaitForCapabilitiesMillis, long connectionTimeoutMillis) {
this.address = address;
this.nettyThreadGroup = nettyThreadGroup;
this.maxWaitForCapabilitiesMillis = maxWaitForCapabilitiesMillis;
EditAndCommitResponse editAndCommitResponse = pushLastConfig(configSnapshotHolder, netconfClient);
return new EditAndCommitResponseWithRetries(editAndCommitResponse, retryAttempt);
} catch (ConflictingVersionException e) {
+ logger.debug("Conflicting version detected, will retry after timeout");
lastException = e;
Thread.sleep(1000);
} catch (RuntimeException e) {
logger.trace("Session id received from netconf server: {}", netconfClient.getClientSession());
return netconfClient;
}
- logger.debug("Polling hello from netconf, attempt {}, capabilities {}", attempt, latestCapabilities);
+ Set<String> allNotFound = computeNotFoundCapabilities(expectedCaps, latestCapabilities);
+ logger.debug("Netconf server did not provide required capabilities. Attempt {}. " +
+ "Expected but not found: {}, all expected {}, current {}",
+ attempt, allNotFound, expectedCaps, latestCapabilities);
Util.closeClientAndDispatcher(netconfClient);
Thread.sleep(delayMillis);
}
logger.error("Could not connect to the server in {} ms", maxWaitForCapabilitiesMillis);
throw new RuntimeException("Could not connect to netconf server");
}
- Set<String> allNotFound = new HashSet<>(expectedCaps);
- allNotFound.removeAll(latestCapabilities);
+ Set<String> allNotFound = computeNotFoundCapabilities(expectedCaps, latestCapabilities);
logger.error("Netconf server did not provide required capabilities. Expected but not found: {}, all expected {}, current {}",
allNotFound, expectedCaps, latestCapabilities);
throw new RuntimeException("Netconf server did not provide required capabilities. Expected but not found:" + allNotFound);
}
+ private static Set<String> computeNotFoundCapabilities(Set<String> expectedCaps, Set<String> latestCapabilities) {
+ Set<String> allNotFound = new HashSet<>(expectedCaps);
+ allNotFound.removeAll(latestCapabilities);
+ return allNotFound;
+ }
+
/**
* Sends two RPCs to the netconf server: edit-config and commit.
}
- private static NetconfMessage sendRequestGetResponseCheckIsOK(NetconfMessage request, NetconfClient netconfClient) throws IOException {
+ private static NetconfMessage sendRequestGetResponseCheckIsOK(NetconfMessage request, NetconfClient netconfClient)
+ throws ConflictingVersionException, IOException {
try {
NetconfMessage netconfMessage = netconfClient.sendMessage(request, NETCONF_SEND_ATTEMPTS, NETCONF_SEND_ATTEMPT_MS_DELAY);
NetconfUtil.checkIsMessageOk(netconfMessage);
return netconfMessage;
- } catch (RuntimeException | ExecutionException | InterruptedException | TimeoutException e) {
+ }catch(ConflictingVersionException e) {
+ logger.trace("conflicting version detected: {}", e.toString());
+ throw e;
+ } catch (RuntimeException | ExecutionException | InterruptedException | TimeoutException e) { // TODO: change NetconfClient#sendMessage to throw checked exceptions
logger.debug("Error while executing netconf transaction {} to {}", request, netconfClient, e);
throw new IOException("Failed to execute netconf transaction", e);
}
}
+
// load editConfig.xml template, populate /rpc/edit-config/config with parameter
private static NetconfMessage createEditConfigMessage(Element dataElement) {
String editConfigResourcePath = "/netconfOp/editConfig.xml";
<parent>
<artifactId>netconf-subsystem</artifactId>
<groupId>org.opendaylight.controller</groupId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>ietf-netconf-monitoring-extension</artifactId>
<parent>
<artifactId>netconf-subsystem</artifactId>
<groupId>org.opendaylight.controller</groupId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>ietf-netconf-monitoring</artifactId>
<parent>
<artifactId>netconf-subsystem</artifactId>
<groupId>org.opendaylight.controller</groupId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>netconf-api</artifactId>
</Private-Package>
<Import-Package>
javax.management,
+ org.opendaylight.controller.config.api,
org.opendaylight.controller.config.api.jmx,
org.opendaylight.protocol.framework,
io.netty.channel,
package org.opendaylight.controller.netconf.api;
+import org.opendaylight.controller.config.api.ConflictingVersionException;
+import org.opendaylight.controller.config.api.ValidationException;
+
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
/**
private static final long serialVersionUID = 1L;
+
+
public enum ErrorType {
transport, rpc, protocol, application;
this.errorInfo = errorInfo;
}
+ public static NetconfDocumentedException wrap(ValidationException e) throws NetconfDocumentedException {
+ final Map<String, String> errorInfo = new HashMap<>();
+ errorInfo.put(ErrorTag.operation_failed.name(), "Validation failed");
+ throw new NetconfDocumentedException(e.getMessage(), e, ErrorType.application, ErrorTag.operation_failed,
+ ErrorSeverity.error, errorInfo);
+ }
+
+ public static NetconfDocumentedException wrap(ConflictingVersionException e) throws NetconfDocumentedException {
+ final Map<String, String> errorInfo = new HashMap<>();
+ errorInfo.put(ErrorTag.operation_failed.name(), "Optimistic lock failed");
+ throw new NetconfDocumentedException(e.getMessage(), e, ErrorType.application, ErrorTag.operation_failed,
+ ErrorSeverity.error, errorInfo);
+ }
+
public ErrorType getErrorType() {
return this.errorType;
}
<parent>
<artifactId>netconf-subsystem</artifactId>
<groupId>org.opendaylight.controller</groupId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
</parent>
<artifactId>netconf-client</artifactId>
<name>${project.artifactId}</name>
<parent>
<artifactId>netconf-subsystem</artifactId>
<groupId>org.opendaylight.controller</groupId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>netconf-impl</artifactId>
<parent>
<artifactId>netconf-subsystem</artifactId>
<groupId>org.opendaylight.controller</groupId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
</parent>
<artifactId>netconf-it</artifactId>
<parent>
<artifactId>netconf-subsystem</artifactId>
<groupId>org.opendaylight.controller</groupId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>netconf-mapping-api</artifactId>
<parent>
<artifactId>netconf-subsystem</artifactId>
<groupId>org.opendaylight.controller</groupId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>netconf-monitoring</artifactId>
<parent>
<artifactId>netconf-subsystem</artifactId>
<groupId>org.opendaylight.controller</groupId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>netconf-subsystem</artifactId>
<groupId>org.opendaylight.controller</groupId>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
</parent>
<artifactId>netconf-util</artifactId>
<name>${project.artifactId}</name>
}
}, connectionTimeoutMillis, TimeUnit.MILLISECONDS);
- sendMessage(helloMessage);
+ // FIXME, make sessionPreferences return HelloMessage, move NetconfHelloMessage to API
+ sendMessage((NetconfHelloMessage)helloMessage);
changeState(State.OPEN_WAIT);
}
}
}
- private void sendMessage(NetconfMessage message) {
- this.channel.writeAndFlush(message);
- }
-
@Override
protected void handleMessage(NetconfHelloMessage netconfMessage) {
Preconditions.checkNotNull(netconfMessage != null, "netconfMessage");
package org.opendaylight.controller.netconf.util.handler;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ByteToMessageDecoder;
-
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+
public class NetconfChunkAggregator extends ByteToMessageDecoder {
private final static Logger logger = LoggerFactory.getLogger(NetconfChunkAggregator.class);
public static final int DEFAULT_MAXIMUM_CHUNK_SIZE = 16 * 1024 * 1024;
private final int maxChunkSize = DEFAULT_MAXIMUM_CHUNK_SIZE;
private State state = State.HEADER_ONE;
private long chunkSize;
- private ByteBuf chunk;
+ private CompositeByteBuf chunk;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
}
state = State.HEADER_TWO;
+
+ initChunk();
break;
}
case HEADER_TWO:
case HEADER_LENGTH_FIRST:
{
final byte b = in.readByte();
- if (b < '1' || b > '9') {
- logger.debug("Got byte {} while waiting for {}-{}", b, (byte)'1', (byte)'9');
- throw new IllegalStateException("Invalid chunk size encountered (byte 0)");
- }
-
- chunkSize = b - '0';
+ chunkSize = processHeaderLengthFirst(b);
state = State.HEADER_LENGTH_OTHER;
break;
}
return;
}
- chunk = in.readBytes((int)chunkSize);
+ aggregateChunks(in.readBytes((int) chunkSize));
state = State.FOOTER_ONE;
break;
case FOOTER_ONE:
}
state = State.FOOTER_TWO;
+ chunkSize = 0;
break;
}
case FOOTER_TWO:
{
final byte b = in.readByte();
+
if (b != '#') {
logger.debug("Got byte {} while waiting for {}", b, (byte)'#');
throw new IllegalStateException("Malformed chunk footer encountered (byte 1)");
case FOOTER_THREE:
{
final byte b = in.readByte();
- if (b != '#') {
- logger.debug("Got byte {} while waiting for {}", b, (byte)'#');
+
+ // In this state, either header-of-new-chunk or message-end is expected
+ // Depends on the next character
+
+ if (isHeaderLengthFirst(b)) {
+ // Extract header length#1 from new chunk
+ chunkSize = processHeaderLengthFirst(b);
+ // Proceed with next chunk processing
+ state = State.HEADER_LENGTH_OTHER;
+ } else if (b == '#') {
+ state = State.FOOTER_FOUR;
+ } else {
+ logger.debug("Got byte {} while waiting for {} or {}-{}", b, (byte) '#', (byte) '1', (byte) '9');
throw new IllegalStateException("Malformed chunk footer encountered (byte 2)");
}
- state = State.FOOTER_FOUR;
break;
}
case FOOTER_FOUR:
state = State.HEADER_ONE;
out.add(chunk);
- chunkSize = 0;
chunk = null;
break;
}
in.discardReadBytes();
}
+
+ private void initChunk() {
+ chunk = Unpooled.compositeBuffer();
+ }
+
+ private void aggregateChunks(ByteBuf newChunk) {
+ chunk.addComponent(chunk.numComponents(), newChunk);
+
+ // Update writer index, addComponent does not update it
+ chunk.writerIndex(chunk.writerIndex() + newChunk.readableBytes());
+ }
+
+ private static int processHeaderLengthFirst(byte b) {
+ if (isHeaderLengthFirst(b) == false) {
+ logger.debug("Got byte {} while waiting for {}-{}", b, (byte)'1', (byte)'9');
+ throw new IllegalStateException("Invalid chunk size encountered (byte 0)");
+ }
+
+ return b - '0';
+ }
+
+ private static boolean isHeaderLengthFirst(byte b) {
+ return b >= '1' && b <= '9';
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.netconf.util.handler;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import junit.framework.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+
+public class NetconfChunkAggregatorTest {
+
+ private static final String CHUNKED_MESSAGE = "\n#4\n" +
+ "<rpc" +
+ "\n#18\n" +
+ " message-id=\"102\"\n" +
+ "\n#79\n" +
+ " xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
+ " <close-session/>\n" +
+ "</rpc>" +
+ "\n##\n";
+
+ public static final String EXPECTED_MESSAGE = "<rpc message-id=\"102\"\n" +
+ " xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
+ " <close-session/>\n" +
+ "</rpc>";
+
+ private static final String CHUNKED_MESSAGE_ONE = "\n#101\n" + EXPECTED_MESSAGE + "\n##\n";
+
+ private static NetconfChunkAggregator agr;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ agr = new NetconfChunkAggregator();
+ }
+
+ @Test
+ public void testMultipleChunks() throws Exception {
+ List<Object> output = Lists.newArrayList();
+ ByteBuf input = Unpooled.copiedBuffer(CHUNKED_MESSAGE.getBytes(Charsets.UTF_8));
+ agr.decode(null, input, output);
+
+ Assert.assertEquals(1, output.size());
+ ByteBuf chunk = (ByteBuf) output.get(0);
+
+ Assert.assertEquals(EXPECTED_MESSAGE, chunk.toString(Charsets.UTF_8));
+ }
+
+ @Test
+ public void testOneChunks() throws Exception {
+ List<Object> output = Lists.newArrayList();
+ ByteBuf input = Unpooled.copiedBuffer(CHUNKED_MESSAGE_ONE.getBytes(Charsets.UTF_8));
+ agr.decode(null, input, output);
+
+ Assert.assertEquals(1, output.size());
+ ByteBuf chunk = (ByteBuf) output.get(0);
+
+ Assert.assertEquals(EXPECTED_MESSAGE, chunk.toString(Charsets.UTF_8));
+ }
+
+
+}
<relativePath>../commons/opendaylight</relativePath>
</parent>
- <version>0.2.4-SNAPSHOT</version>
+ <version>0.2.5-SNAPSHOT</version>
<artifactId>netconf-subsystem</artifactId>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
<maven.bundle.version>2.4.0</maven.bundle.version>
<slf4j.version>1.7.2</slf4j.version>
<netconf.netty.version>4.0.10.Final</netconf.netty.version>
- <netconf.version>0.2.4-SNAPSHOT</netconf.version>
- <config.version>0.2.4-SNAPSHOT</config.version>
<salGeneratorPath>${project.build.directory}/generated-sources/sal</salGeneratorPath>
</properties>
}
public byte[] getSalt() {
- return salt.clone();
+ return (salt == null) ? null : salt.clone();
}
@Override