--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>\r
+<!--\r
+ Necessary TODO: Put your copyright here.\r
+\r
+ This program and the accompanying materials are made available under the\r
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
+ and is available at http://www.eclipse.org/legal/epl-v10.html\r
+--><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">\r
+ <modelVersion>4.0.0</modelVersion>\r
+ <parent>\r
+ <groupId>org.opendaylight.controller</groupId>\r
+ <artifactId>commons.opendaylight</artifactId>\r
+ <version>1.4.2-SNAPSHOT</version>\r
+ <relativePath>../../opendaylight/commons/opendaylight</relativePath>\r
+ </parent>\r
+ <artifactId>features-adsal-compatibility</artifactId>\r
+ <packaging>jar</packaging>\r
+ <properties>\r
+ <features.file>features.xml</features.file>\r
+ <feature.test.version>0.6.2-SNAPSHOT</feature.test.version>\r
+ </properties>\r
+ <dependencies>\r
+ <!--\r
+ Necessary TODO: Put dependencies on any feature repos\r
+ you use in your features.xml file.\r
+\r
+ Note: they will need to be <type>xml</type>\r
+ and <classifier>features</classifier>.\r
+ One other thing to watch for is to make sure they are\r
+ <scope>compile</scope>, which they should be by default,\r
+ but be cautious lest they be at a different scope in a parent pom.\r
+\r
+ Examples:\r
+ <dependency>\r
+ <groupId>org.opendaylight.yangtools</groupId>\r
+ <artifactId>features-yangtools</artifactId>\r
+ <version>0.6.2-SNAPSHOT</version>\r
+ <classifier>features</classifier>\r
+ <type>xml</type>\r
+ </dependency>\r
+ <dependency>\r
+ <groupId>org.opendaylight.controller</groupId>\r
+ <artifactId>features-mdsal</artifactId>\r
+ <version>1.1-SNAPSHOT</version>\r
+ <classifier>features</classifier>\r
+ <type>xml</type>\r
+ </dependency>\r
+ <dependency>\r
+ <groupId>org.opendaylight.openflowplugin</groupId>\r
+ <artifactId>features-openflowplugin</artifactId>\r
+ <version>0.0.3-SNAPSHOT</version>\r
+ <classifier>features</classifier>\r
+ <type>xml</type>\r
+ </dependency>\r
+ -->\r
+ <dependency>\r
+ <groupId>org.opendaylight.controller</groupId>\r
+ <artifactId>features-mdsal</artifactId>\r
+ <version>${mdsal.version}</version>\r
+ <classifier>features</classifier>\r
+ <type>xml</type>\r
+ </dependency>\r
+ <dependency>\r
+ <groupId>org.opendaylight.controller</groupId>\r
+ <artifactId>features-flow</artifactId>\r
+ <version>${mdsal.version}</version>\r
+ <classifier>features</classifier>\r
+ <type>xml</type>\r
+ </dependency>\r
+ <dependency>\r
+ <groupId>org.opendaylight.controller</groupId>\r
+ <artifactId>features-adsal</artifactId>\r
+ <version>${sal.version}</version>\r
+ <classifier>features</classifier>\r
+ <type>xml</type>\r
+ </dependency>\r
+\r
+ <!--\r
+ Necessary TODO: Put dependencies for bundles directly referenced\r
+ in your features.xml file. For every <bundle> reference in your\r
+ features.xml file, you need a corresponding dependency here.\r
+\r
+ Examples:\r
+ <dependency>\r
+ <groupId>org.opendaylight.controller</groupId>\r
+ <artifactId>controller-provider</artifactId>\r
+ <version>${project.version}</version>\r
+ </dependency>\r
+ <dependency>\r
+ <groupId>org.opendaylight.controller</groupId>\r
+ <artifactId>controller-model</artifactId>\r
+ <version>${project.version}</version>\r
+ </dependency>\r
+ -->\r
+ <dependency>\r
+ <groupId>org.opendaylight.controller</groupId>\r
+ <artifactId>sal-compatibility</artifactId>\r
+ <version>${mdsal.version}</version>\r
+ </dependency>\r
+\r
+ <!--\r
+ Necessary TODO: Put dependencies for configfiles directly referenced\r
+ in your features.xml file. For every <configfile> reference in your\r
+ features.xml file, you need a corresponding dependency here.\r
+\r
+ Example (presuming here version is coming from the parent pom):\r
+ <dependency>\r
+ <groupId>org.opendaylight.controller</groupId>\r
+ <artifactId>controller-config</artifactId>\r
+ <version>${project.version}</version>\r
+ <type>xml</type>\r
+ <classifier>config</classifier>\r
+ </dependency>\r
+ -->\r
+\r
+ <!--\r
+ Optional TODO: Remove TODO comments.\r
+ -->\r
+ <!-- test to validate features.xml -->\r
+ <dependency>\r
+ <groupId>org.opendaylight.yangtools</groupId>\r
+ <artifactId>features-test</artifactId>\r
+ <version>${feature.test.version}</version>\r
+ <scope>test</scope>\r
+ </dependency>\r
+ <!-- dependency for opendaylight-karaf-empty for use by testing -->\r
+ <dependency>\r
+ <groupId>org.opendaylight.controller</groupId>\r
+ <artifactId>opendaylight-karaf-empty</artifactId>\r
+ <version>${commons.opendaylight.version}</version>\r
+ <type>zip</type>\r
+ </dependency>\r
+ <!-- Uncomment this if you get an error : java.lang.NoSuchMethodError: org.slf4j.helpers.MessageFormatter.format(Ljava/lang/String;Ljava/lang/Object;Ljava/lang/Object;)Lorg/slf4j/helpers/FormattingTuple;\r
+ <dependency>\r
+ <groupId>org.slf4j</groupId>\r
+ <artifactId>slf4j-simple</artifactId>\r
+ <version>1.7.2</version>\r
+ </dependency>\r
+ -->\r
+\r
+ </dependencies>\r
+ <build>\r
+ <resources>\r
+ <resource>\r
+ <directory>src/main/resources</directory>\r
+ <filtering>true</filtering>\r
+ </resource>\r
+ </resources>\r
+ <plugins>\r
+ <plugin>\r
+ <groupId>org.apache.maven.plugins</groupId>\r
+ <artifactId>maven-resources-plugin</artifactId>\r
+ <executions>\r
+ <execution>\r
+ <id>filter</id>\r
+ <phase>generate-resources</phase>\r
+ <goals>\r
+ <goal>resources</goal>\r
+ </goals>\r
+ </execution>\r
+ </executions>\r
+ </plugin>\r
+ <plugin>\r
+ <groupId>org.codehaus.mojo</groupId>\r
+ <artifactId>build-helper-maven-plugin</artifactId>\r
+ <executions>\r
+ <execution>\r
+ <id>attach-artifacts</id>\r
+ <phase>package</phase>\r
+ <goals>\r
+ <goal>attach-artifact</goal>\r
+ </goals>\r
+ <configuration>\r
+ <artifacts>\r
+ <artifact>\r
+ <file>${project.build.directory}/classes/${features.file}</file>\r
+ <type>xml</type>\r
+ <classifier>features</classifier>\r
+ </artifact>\r
+ </artifacts>\r
+ </configuration>\r
+ </execution>\r
+ </executions>\r
+ </plugin>\r
+ <plugin>\r
+ <groupId>org.apache.maven.plugins</groupId>\r
+ <artifactId>maven-surefire-plugin</artifactId>\r
+ <version>${surefire.version}</version>\r
+ <configuration>\r
+ <systemPropertyVariables>\r
+ <karaf.distro.groupId>org.opendaylight.controller</karaf.distro.groupId>\r
+ <karaf.distro.artifactId>opendaylight-karaf-empty</karaf.distro.artifactId>\r
+ <karaf.distro.version>${commons.opendaylight.version}</karaf.distro.version>\r
+ </systemPropertyVariables>\r
+ <dependenciesToScan>\r
+ <dependency>org.opendaylight.yangtools:features-test</dependency>\r
+ </dependenciesToScan>\r
+ </configuration>\r
+ </plugin>\r
+ </plugins>\r
+ </build>\r
+ <scm>\r
+ <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>\r
+ <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>\r
+ <tag>HEAD</tag>\r
+ <url>https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=summary</url>\r
+ </scm>\r
+</project>\r
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- vi: set et smarttab sw=4 tabstop=4: -->
+<!--
+ Necessary TODO: Put your copyright statement here
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<features name="odl-adsal-compatibility-${sal.version}" xmlns="http://karaf.apache.org/xmlns/features/v1.2.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://karaf.apache.org/xmlns/features/v1.2.0 http://karaf.apache.org/xmlns/features/v1.2.0">
+ <repository>mvn:org.opendaylight.controller/features-mdsal/${mdsal.version}/xml/features</repository>
+ <repository>mvn:org.opendaylight.controller/features-adsal/${sal.version}/xml/features</repository>
+ <repository>mvn:org.opendaylight.controller/features-flow/${mdsal.version}/xml/features</repository>
+ <feature name='odl-adsal-compatibility-all' version='${project.version}' description='OpenDaylight :: controller :: All'>
+ <feature version='${sal.version}'>odl-adsal-compatibility</feature>
+ </feature>
+ <feature name='odl-adsal-compatibility' description="OpenDaylight :: AD-SAL :: Compatibility" version="${sal.version}">
+ <feature version="${mdsal.version}">odl-mdsal-broker</feature>
+ <feature version="${mdsal.version}">odl-flow-model</feature>
+ <feature version="${sal.version}">odl-adsal-all</feature>
+ <bundle>mvn:org.opendaylight.controller/sal-compatibility/${mdsal.version}</bundle>
+ </feature>
+</features>
<version>1.4.2-SNAPSHOT</version>
<type>zip</type>
</dependency>
+ <!-- Feature Dependencies -->
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>features-adsal</artifactId>
+ <version>${sal.version}</version>
+ <classifier>features</classifier>
+ <type>xml</type>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>features-base</artifactId>
+ <classifier>features</classifier>
+ <version>${commons.opendaylight.version}</version>
+ <type>xml</type>
+ </dependency>
<!-- Bundle Dependencies -->
<dependency>
<groupId>org.opendaylight.controller</groupId>
<module>flow</module>
<module>netconf</module>
<module>protocol-framework</module>
+ <module>adsal-compatibility</module>
</modules>
-</project>
+</project>
\ No newline at end of file
pending = this.dispatcher.createClient(this.address, cs, b, new AbstractDispatcher.PipelineInitializer<S>() {
@Override
public void initializeChannel(final SocketChannel channel, final Promise<S> promise) {
- initializer.initializeChannel(channel, promise);
-
// add closed channel handler
+ // This handler has to be added before initializer.initializeChannel is called
+ // Initializer might add some handlers using addFirst e.g. AsyncSshHandler and in that case
+ // closed channel handler is before the handler that invokes channel inactive event
channel.pipeline().addFirst(new ClosedChannelHandler(ReconnectPromise.this));
+
+ initializer.initializeChannel(channel, promise);
}
});
}
@Override
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+ // Pass info about disconnect further and then reconnect
+ super.channelInactive(ctx);
+
if (promise.isCancelled()) {
return;
}
<groupId>org.osgi</groupId>
<artifactId>org.osgi.core</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>mockito-configuration</artifactId>
+ </dependency>
</dependencies>
<build>
JmxAttribute that = (JmxAttribute) o;
- if (attributeName != null ? !attributeName.equals(that.attributeName)
- : that.attributeName != null) {
+ if (!attributeName.equals(that.attributeName)) {
return false;
}
@Override
public int hashCode() {
- return attributeName != null ? attributeName.hashCode() : 0;
+ return attributeName.hashCode();
}
@Override
--- /dev/null
+package org.opendaylight.controller.config.api;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.config.api.annotations.AbstractServiceInterface;
+import org.opendaylight.yangtools.yang.binding.BaseIdentity;
+
+import javax.management.*;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+public class IdentityAttributeRefTest {
+
+    // Shared fixture: a ref wrapping the identity QName string "attr".
+    IdentityAttributeRef attr = new IdentityAttributeRef("attr");
+
+    @Test
+    public void testConstructor() throws Exception {
+        // The constructor must store the QName string verbatim.
+        Assert.assertEquals("attr", attr.getqNameOfIdentity());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testConstructor2() throws Exception {
+        // A null QName must be rejected eagerly.
+        new IdentityAttributeRef(null);
+    }
+
+    @Test
+    public void testHashCode() throws Exception {
+        // hashCode delegates to the wrapped QName string's hashCode.
+        Assert.assertEquals("attr".hashCode(), attr.hashCode());
+    }
+
+    @Test
+    public void testEqual() throws Exception {
+        // equals is reflexive.
+        Assert.assertEquals(attr, attr);
+    }
+
+    @Test
+    public void testEqual2() throws Exception {
+        // Two refs wrapping the same QName string are equal.
+        Assert.assertEquals(new IdentityAttributeRef("attr"), attr);
+    }
+
+    @Test
+    public void testNotEqual() throws Exception {
+        Assert.assertNotEquals(new IdentityAttributeRef("different"), attr);
+    }
+
+    @Test
+    public void testResolveIdentity() throws Exception {
+        // resolveIdentity must delegate to the supplied DependencyResolver.
+        DependencyResolver res = mock(DependencyResolver.class);
+        IdentityAttributeRef a = new IdentityAttributeRef("abcd");
+        doReturn(SubIdentity.class).when(res).resolveIdentity(a, Identity.class);
+        a.resolveIdentity(res, Identity.class);
+        verify(res).resolveIdentity(a, Identity.class);
+    }
+
+    @Test
+    public void testValidateIdentity() throws Exception {
+        // validateIdentity must delegate to the supplied DependencyResolver.
+        DependencyResolver res = mock(DependencyResolver.class);
+        JmxAttribute jmxAttr = new JmxAttribute("abc");
+        doNothing().when(res).validateIdentity(attr, Identity.class, jmxAttr);
+        attr.validateIdentity(res, Identity.class, jmxAttr);
+        verify(res).validateIdentity(attr, Identity.class, jmxAttr);
+    }
+
+    // Minimal identity hierarchy used by the mock-based tests above.
+    static class Identity extends BaseIdentity {}
+
+    static class SubIdentity extends Identity {}
+}
--- /dev/null
+package org.opendaylight.controller.config.api;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class JmxAttributeTest {
+
+    @Test
+    public void testJmxAttribute() throws Exception {
+        // The attribute name passed to the constructor is stored verbatim.
+        JmxAttribute attr = new JmxAttribute("test");
+        assertEquals("test", attr.getAttributeName());
+    }
+
+    @Test
+    public void testToString() throws Exception {
+        // Equal attributes must render identical string representations.
+        assertEquals(new JmxAttribute("test").toString(), new JmxAttribute("test").toString());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testJmxAttributeInvalid() throws Exception {
+        // A null attribute name must be rejected eagerly.
+        new JmxAttribute(null);
+    }
+
+    @Test
+    public void testJmxAttributeEqual() throws Exception {
+        // Same name implies equality.
+        assertEquals(new JmxAttribute("test_string"), new JmxAttribute("test_string"));
+    }
+
+    @Test
+    public void testJmxAttributeNotEqual() throws Exception {
+        // Different names imply inequality.
+        assertNotEquals(new JmxAttribute("test_string"), new JmxAttribute("different"));
+    }
+
+    @Test
+    public void testJmxAttributeEqual2() throws Exception {
+        // Sanity check: construction with a valid name yields a non-null instance.
+        assertNotNull(new JmxAttribute("test_string"));
+    }
+
+    @Test
+    public void testJmxAttributeHashCode() throws Exception {
+        // hashCode delegates to the attribute name's hashCode.
+        assertEquals("test_string".hashCode(), new JmxAttribute("test_string").hashCode());
+    }
+}
package org.opendaylight.controller.config.api;
-import static org.junit.Assert.assertEquals;
+import java.nio.file.AccessDeniedException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import com.google.common.collect.Lists;
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import javax.management.Query;
+
+import static org.junit.Assert.*;
+
public class JmxAttributeValidationExceptionTest {
private JmxAttribute jmxAttribute = new JmxAttribute("attr1");
- private JmxAttribute jmxAttribute2 = new JmxAttribute("attr2");
- @Before
- public void setUp() throws Exception {
+ @Test
+ public void testJmxAttributeValidationExceptionElement() throws Exception {
+ JmxAttribute attributeName = new JmxAttribute("attr_name");
+ JmxAttributeValidationException e = new JmxAttributeValidationException(attributeName);
+ assertThat(e.getAttributeNames(), CoreMatchers.hasItem(attributeName));
+ }
+ @Test
+ public void testJmxAttributeValidationExceptionList() throws Exception {
+ List attributeNames = new ArrayList<JmxAttribute>();
+ attributeNames.add(new JmxAttribute("att1"));
+ attributeNames.add(new JmxAttribute("att2"));
+ attributeNames.add(new JmxAttribute("att3"));
+ JmxAttributeValidationException e = new JmxAttributeValidationException(attributeNames);
+ assertEquals(e.getAttributeNames(), attributeNames);
}
@Test
- public void testGetAttributeNames() throws Exception {
+ public void testJmxAttributeValidationExceptionList2() throws Exception {
+ List attributeNames = new ArrayList<JmxAttribute>();
+ attributeNames.add(new JmxAttribute("att1"));
+ attributeNames.add(new JmxAttribute("att2"));
+ attributeNames.add(new JmxAttribute("att3"));
+ JmxAttributeValidationException e = new JmxAttributeValidationException("exception str",
+ new AccessDeniedException(""), attributeNames);
+ assertEquals(e.getAttributeNames(), attributeNames);
+ }
+ @Test
+ public void testJmxAttributeValidationExceptionJmxElement() throws Exception {
+ JmxAttribute attributeName = new JmxAttribute("attr_name");
+ JmxAttributeValidationException e = new JmxAttributeValidationException("exception str",
+ new AccessDeniedException(""), attributeName);
+ assertEquals(e.getAttributeNames(), Arrays.asList(attributeName));
}
@Test
public void testCheckNotNull() throws Exception {
try {
- JmxAttributeValidationException.checkNotNull(false, "message", jmxAttribute);
+ JmxAttributeValidationException.checkNotNull(false, jmxAttribute);
} catch (JmxAttributeValidationException e) {
assertJmxEx(e, jmxAttribute.getAttributeName() + " " + "message", jmxAttribute);
}
}
@Test
- public void testWrap() throws Exception {
+ public void testCheckCondition() throws Exception {
+ JmxAttributeValidationException.checkCondition(true, "message", jmxAttribute);
+ }
+ @Test(expected = JmxAttributeValidationException.class)
+ public void testJmxAttributeValidationException() throws Exception {
+ JmxAttributeValidationException.wrap(new Exception("tmp"), jmxAttribute);
}
- @Test
- public void testCheckCondition() throws Exception {
- try {
- JmxAttributeValidationException.checkCondition(false, "message", jmxAttribute);
- } catch (JmxAttributeValidationException e) {
- assertJmxEx(e, jmxAttribute.getAttributeName() + " " + "message", jmxAttribute);
- }
+ @Test(expected = JmxAttributeValidationException.class)
+ public void testJmxAttributeValidationException2() throws Exception {
+ JmxAttributeValidationException.wrap(new Exception("tmp"), "message", jmxAttribute);
+ }
+
+ @Test(expected = JmxAttributeValidationException.class)
+ public void testCheckConditionFalse() throws Exception {
+ JmxAttributeValidationException.checkCondition(false, "message", jmxAttribute);
}
private void assertJmxEx(JmxAttributeValidationException e, String message, JmxAttribute... attrNames) {
--- /dev/null
+package org.opendaylight.controller.config.api;
+
+import junit.framework.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+
+public class ModuleIdentifierTest {
+
+    // Deliberately distinct String instances (not interned literals) so the
+    // equals()/hashCode() tests exercise value equality, not reference identity.
+    String fact = new String("factory");
+    String inst = new String("instance");
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testConstructor() throws Exception {
+        // A null factory name must be rejected.
+        new ModuleIdentifier(null, "instance");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testConstructor2() throws Exception {
+        // A null instance name must be rejected.
+        new ModuleIdentifier("name", null);
+    }
+
+    @Test
+    public void testEquals() throws Exception {
+        // Same factory and instance names imply equality.
+        assertEquals(new ModuleIdentifier(fact, inst), new ModuleIdentifier(fact, inst));
+    }
+
+    @Test
+    public void testEquals2() throws Exception {
+        assertNotEquals(new ModuleIdentifier(fact, inst), null);
+    }
+
+    @Test
+    public void testEquals3() throws Exception {
+        // Differs in instance name only.
+        assertNotEquals(new ModuleIdentifier(fact, inst), new ModuleIdentifier(fact, "i"));
+    }
+
+    @Test
+    public void testEquals4() throws Exception {
+        // Differs in factory name only.
+        assertNotEquals(new ModuleIdentifier(fact, inst), new ModuleIdentifier("f", inst));
+    }
+
+    @Test
+    public void testEquals5() throws Exception {
+        // equals is reflexive.
+        ModuleIdentifier m1 = new ModuleIdentifier(fact, inst);
+        assertEquals(m1, m1);
+    }
+
+    @Test
+    public void testHashCode() throws Exception {
+        // Equal identifiers must produce equal hash codes.
+        assertEquals(new ModuleIdentifier(fact, inst).hashCode(),
+                new ModuleIdentifier("factory", "instance").hashCode());
+    }
+
+    @Test
+    public void testToString() throws Exception {
+        // Equal identifiers must render identical string representations.
+        assertEquals(new ModuleIdentifier("factory", "instance").toString(),
+                new ModuleIdentifier("factory", "instance").toString());
+    }
+}
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertTrue;
import static junit.framework.Assert.fail;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThat;
import static org.junit.matchers.JUnitMatchers.containsString;
import com.google.common.collect.Lists;
import java.util.Map;
+
+import org.junit.Assert;
import org.junit.Test;
public class ValidationExceptionTest {
}
fail("Duplicate exception should have failed");
}
+
+ @Test
+ public void testGetTrace() throws Exception {
+ ValidationException.ExceptionMessageWithStackTrace exp = new ValidationException.ExceptionMessageWithStackTrace();
+ exp.setTrace("trace");
+ Assert.assertEquals(exp.getTrace(), "trace");
+ }
+
+ @Test
+ public void testSetMessage() throws Exception {
+ ValidationException.ExceptionMessageWithStackTrace exp = new ValidationException.ExceptionMessageWithStackTrace();
+ exp.setMessage("message");
+ Assert.assertEquals(exp.getMessage(), "message");
+ }
+
+ @Test
+ public void testHashCode() throws Exception {
+ ValidationException.ExceptionMessageWithStackTrace exp = new ValidationException.ExceptionMessageWithStackTrace();
+ Assert.assertEquals(exp.hashCode(), new ValidationException.ExceptionMessageWithStackTrace().hashCode());
+ }
+
+ @Test
+ public void testExceptionMessageWithStackTraceConstructor() throws Exception {
+ ValidationException.ExceptionMessageWithStackTrace exp =
+ new ValidationException.ExceptionMessageWithStackTrace("string1", "string2");
+ Assert.assertEquals(exp, exp);
+ }
+
+ @Test
+ public void testExceptionMessageWithStackTraceConstructor2() throws Exception {
+ ValidationException.ExceptionMessageWithStackTrace exp =
+ new ValidationException.ExceptionMessageWithStackTrace("string1", "string2");
+ Assert.assertNotEquals(exp, null);
+ }
+
+ @Test
+ public void testExceptionMessageWithStackTraceConstructor3() throws Exception {
+ ValidationException.ExceptionMessageWithStackTrace exp =
+ new ValidationException.ExceptionMessageWithStackTrace("string1", "string2");
+ Assert.assertNotEquals(exp, new Exception());
+ }
+
+ @Test
+ public void testExceptionMessageWithStackTraceConstructor4() throws Exception {
+ ValidationException.ExceptionMessageWithStackTrace exp =
+ new ValidationException.ExceptionMessageWithStackTrace("string1", "string2");
+ Assert.assertEquals(exp, new ValidationException.ExceptionMessageWithStackTrace("string1", "string2"));
+ }
+
+ @Test
+ public void testEqual() throws Exception {
+ ValidationException.ExceptionMessageWithStackTrace exp =
+ new ValidationException.ExceptionMessageWithStackTrace("string1", "string2");
+ ValidationException.ExceptionMessageWithStackTrace exp2 =
+ new ValidationException.ExceptionMessageWithStackTrace(null, "string2");
+ Assert.assertNotEquals(exp, exp2);
+ }
+
+ @Test
+ public void testEqual2() throws Exception {
+ ValidationException.ExceptionMessageWithStackTrace exp =
+ new ValidationException.ExceptionMessageWithStackTrace("string1", "string2");
+ ValidationException.ExceptionMessageWithStackTrace exp2 =
+ new ValidationException.ExceptionMessageWithStackTrace("different", "string2");
+ Assert.assertNotEquals(exp, exp2);
+ }
+
+
+ @Test
+ public void testEqual3() throws Exception {
+ ValidationException.ExceptionMessageWithStackTrace exp =
+ new ValidationException.ExceptionMessageWithStackTrace("string1", "string2");
+ ValidationException.ExceptionMessageWithStackTrace exp2 =
+ new ValidationException.ExceptionMessageWithStackTrace("string1", null);
+ Assert.assertNotEquals(exp, exp2);
+ }
+
+ @Test
+ public void testEqual4() throws Exception {
+ ValidationException.ExceptionMessageWithStackTrace exp =
+ new ValidationException.ExceptionMessageWithStackTrace("string1", "string2");
+ ValidationException.ExceptionMessageWithStackTrace exp2 =
+ new ValidationException.ExceptionMessageWithStackTrace("string1", "different");
+ Assert.assertNotEquals(exp, exp2);
+ }
+
+ @Test
+ public void testEqual5() throws Exception {
+ ValidationException.ExceptionMessageWithStackTrace exp =
+ new ValidationException.ExceptionMessageWithStackTrace(null, "string2");
+ ValidationException.ExceptionMessageWithStackTrace exp2 =
+ new ValidationException.ExceptionMessageWithStackTrace("string1", "string2");
+ Assert.assertNotEquals(exp, exp2);
+ }
+
+ @Test
+ public void testEqual6() throws Exception {
+ ValidationException.ExceptionMessageWithStackTrace exp =
+ new ValidationException.ExceptionMessageWithStackTrace("string1", null);
+ ValidationException.ExceptionMessageWithStackTrace exp2 =
+ new ValidationException.ExceptionMessageWithStackTrace("string1", "string2");
+ Assert.assertNotEquals(exp, exp2);
+ }
}
\ No newline at end of file
--- /dev/null
+package org.opendaylight.controller.config.api.jmx;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.management.ObjectName;
+import java.util.ArrayList;
+import java.util.List;
+
+public class CommitStatusTest {
+
+    // Typed lists (the originals were raw types) for the three commit categories.
+    List<ObjectName> newInst = new ArrayList<>();
+    List<ObjectName> reusedInst = new ArrayList<>();
+    List<ObjectName> recreatedInst = new ArrayList<>();
+
+    @Before
+    public void setUp() throws Exception {
+        newInst.add(new ObjectName("domain: key1 = value1 , key2 = value2"));
+        reusedInst.add(new ObjectName("o2: key = val"));
+        recreatedInst.add(new ObjectName("o3: key = k"));
+    }
+
+    @Test
+    public void testCommitStatus() throws Exception {
+        // Getters must return exactly the lists handed to the constructor.
+        CommitStatus status = new CommitStatus(newInst, reusedInst, recreatedInst);
+        Assert.assertEquals(newInst, status.getNewInstances());
+        Assert.assertEquals(recreatedInst, status.getRecreatedInstances());
+        Assert.assertEquals(reusedInst, status.getReusedInstances());
+    }
+
+    @Test
+    public void testEqual() throws Exception {
+        // Same lists imply equality, identical toString, and reflexivity.
+        CommitStatus status = new CommitStatus(newInst, reusedInst, recreatedInst);
+        Assert.assertEquals(new CommitStatus(newInst, reusedInst, recreatedInst), status);
+        Assert.assertEquals(new CommitStatus(newInst, reusedInst, recreatedInst).toString(), status.toString());
+        Assert.assertEquals(status, status);
+    }
+
+    @Test
+    public void testHashCode() throws Exception {
+        // Equal statuses must produce equal hash codes.
+        CommitStatus status = new CommitStatus(newInst, reusedInst, recreatedInst);
+        Assert.assertEquals(new CommitStatus(newInst, reusedInst, recreatedInst).hashCode(), status.hashCode());
+    }
+
+    @Test
+    public void testNotEqual() throws Exception {
+        // Differing content in any one of the three lists must break equality.
+        List<ObjectName> newInst2 = new ArrayList<>();
+        List<ObjectName> reusedInst2 = new ArrayList<>();
+        List<ObjectName> recreatedInst2 = new ArrayList<>();
+
+        newInst2.add(new ObjectName("first: key1 = value1"));
+        reusedInst2.add(new ObjectName("second: key = val"));
+        recreatedInst2.add(new ObjectName("third: key = k"));
+
+        CommitStatus status = new CommitStatus(newInst, reusedInst, recreatedInst);
+        Assert.assertNotEquals(status, null);
+        Assert.assertNotEquals(status, new Object());
+        Assert.assertNotEquals(status, new CommitStatus(newInst2, reusedInst, recreatedInst));
+        Assert.assertNotEquals(status, new CommitStatus(newInst, reusedInst2, recreatedInst));
+        Assert.assertNotEquals(status, new CommitStatus(newInst, reusedInst, recreatedInst2));
+    }
+}
--- /dev/null
+package org.opendaylight.controller.config.api.jmx;
+
+
+import org.junit.Test;
+import org.opendaylight.controller.config.api.jmx.constants.ConfigRegistryConstants;
+
+public class ConfigRegistryConstantsTest {
+
+    // createON must reject a domain containing characters that are illegal in a
+    // JMX ObjectName (here '<' and ':') by throwing IllegalArgumentException.
+    @Test(expected = IllegalArgumentException.class)
+    public void testCreateON() throws Exception {
+        ConfigRegistryConstants.createON("test.<:", "asd", "asd");
+    }
+}
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
+
+import java.util.HashMap;
import java.util.Map;
import javax.management.ObjectName;
import junit.framework.Assert;
fail(test + " should have failed on " + ex);
}
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testCreateON() throws Exception {
+ ObjectNameUtil.createON(">}+!#");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testCreateON2() throws Exception {
+ Map<String, String> map = new HashMap<>();
+ ObjectNameUtil.createON(">}+!#", map);
+ }
}
}
}
- rpc get-node-ip-address {
- input {
- uses "inv:node-context-ref";
- }
- output {
- uses ip-address-grouping;
- }
- }
-
grouping flow-node-connector {
uses port:flow-capable-port;
import java.util.Iterator;
import java.util.Map.Entry;
+import javax.annotation.Nonnull;
+
import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationException;
import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationOperation;
import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer;
import org.opendaylight.yangtools.sal.binding.generator.util.BindingRuntimeContext;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
}
- public org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier toNormalized(
- final InstanceIdentifier<? extends DataObject> binding) {
+ public YangInstanceIdentifier toNormalized(final InstanceIdentifier<? extends DataObject> binding) {
return codecRegistry.toYangInstanceIdentifier(binding);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
- public Entry<org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier, NormalizedNode<?, ?>> toNormalizedNode(
+ public Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> toNormalizedNode(
final InstanceIdentifier<? extends DataObject> bindingPath, final DataObject bindingObject) {
return codecRegistry.toNormalizedNode((InstanceIdentifier) bindingPath, bindingObject);
}
- public Entry<org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier, NormalizedNode<?, ?>> toNormalizedNode(
- final Entry<org.opendaylight.yangtools.yang.binding.InstanceIdentifier<? extends DataObject>, DataObject> binding) {
+ public Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> toNormalizedNode(
+ final Entry<InstanceIdentifier<? extends DataObject>, DataObject> binding) {
return toNormalizedNode(binding.getKey(),binding.getValue());
}
* augmentation.
*
*/
- public Optional<InstanceIdentifier<? extends DataObject>> toBinding(
- final org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier normalized)
+ public Optional<InstanceIdentifier<? extends DataObject>> toBinding(final YangInstanceIdentifier normalized)
throws DeserializationException {
try {
- return Optional.<InstanceIdentifier<? extends DataObject>>of(codecRegistry.fromYangInstanceIdentifier(normalized));
+ return Optional.<InstanceIdentifier<? extends DataObject>>fromNullable(codecRegistry.fromYangInstanceIdentifier(normalized));
} catch (IllegalArgumentException e) {
return Optional.absent();
}
return legacyToNormalized;
}
- @SuppressWarnings("unchecked")
- public Optional<Entry<org.opendaylight.yangtools.yang.binding.InstanceIdentifier<? extends DataObject>, DataObject>> toBinding(
- final Entry<org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier, ? extends NormalizedNode<?, ?>> normalized)
+ public Optional<Entry<InstanceIdentifier<? extends DataObject>, DataObject>> toBinding(
+ final @Nonnull Entry<YangInstanceIdentifier, ? extends NormalizedNode<?, ?>> normalized)
throws DeserializationException {
try {
- @SuppressWarnings("rawtypes")
- Entry binding = codecRegistry.fromNormalizedNode(normalized.getKey(), normalized.getValue());
- return Optional.<Entry<org.opendaylight.yangtools.yang.binding.InstanceIdentifier<? extends DataObject>, DataObject>>fromNullable(binding);
+ /*
+ * This cast is required, due to generics behaviour in openjdk / oracle javac
+ *
+ * InstanceIdentifier has definition InstanceIdentifier<T extends DataObject>,
+ * this means '?' is always  <? extends DataObject>. Eclipse compiler
+ * is able to determine this relationship and treats
+ * Entry<InstanceIdentifier<?>,DataObject> and Entry<InstanceIdentifier<? extends DataObject,DataObject>
+ * as assignable. However openjdk / oracle javac treats this two types
+ * as incompatible and issues a compile error.
+ *
+ * It is safe to loose generic information and cast it to other generic signature.
+ *
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ final Entry<InstanceIdentifier<? extends DataObject>, DataObject> binding = (Entry) codecRegistry.fromNormalizedNode(normalized.getKey(), normalized.getValue());
+ return Optional.fromNullable(binding);
} catch (IllegalArgumentException e) {
return Optional.absent();
}
* @param path DOM Path
* @return Node with defaults set on.
*/
- public NormalizedNode<?, ?> getDefaultNodeFor(final org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier path) {
+ public NormalizedNode<?, ?> getDefaultNodeFor(final YangInstanceIdentifier path) {
Iterator<PathArgument> iterator = path.getPathArguments().iterator();
DataNormalizationOperation<?> currentOp = legacyToNormalized.getRootOperation();
while (iterator.hasNext()) {
}
    @Override
    public void close() {
        // Intentionally a no-op: this class holds no resources that need
        // releasing, but close() must be implemented to satisfy the interface.
        // NOOP Intentionally
    }
}
package org.opendaylight.controller.md.sal.binding.impl.test;
+import com.google.common.util.concurrent.MoreExecutors;
import org.junit.Test;
import org.opendaylight.controller.md.sal.binding.impl.ForwardedBackwardsCompatibleDataBroker;
import org.opendaylight.controller.md.sal.binding.test.AbstractSchemaAwareTest;
testCustomizer = createDataBrokerTestCustomizer();
domBroker = testCustomizer.createDOMDataBroker();
- dataBroker = testCustomizer.createBackwardsCompatibleDataBroker();
+ dataBroker = createBackwardsCompatibleDataBroker();
testCustomizer.updateSchema(context);
}
+ public ForwardedBackwardsCompatibleDataBroker createBackwardsCompatibleDataBroker() {
+ return new ForwardedBackwardsCompatibleDataBroker(domBroker, testCustomizer.getBindingToNormalized(), testCustomizer.getSchemaService(), MoreExecutors
+ .sameThreadExecutor());
+ }
+
+
+
/**
* The purpose of this test is to exercise the backwards compatible broker
* @see org.opendaylight.controller.md.sal.binding.impl.AbstractReadWriteTransaction#ensureParentsByMerge(org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType, org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier, org.opendaylight.yangtools.yang.binding.InstanceIdentifier)
*/
@Test
- public void test() throws InterruptedException, ExecutionException {
+ public void testEnsureParentsByMerge() throws InterruptedException, ExecutionException {
DataModificationTransaction writeTx =
dataBroker.beginTransaction();
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
-
import javassist.ClassPool;
-
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec;
-import org.opendaylight.controller.md.sal.binding.impl.ForwardedBackwardsCompatibleDataBroker;
import org.opendaylight.controller.md.sal.binding.impl.ForwardedBindingDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
return new ForwardedBindingDataBroker(getDOMDataBroker(), bindingToNormalized, schemaService );
}
- public ForwardedBackwardsCompatibleDataBroker createBackwardsCompatibleDataBroker() {
- return new ForwardedBackwardsCompatibleDataBroker(getDOMDataBroker(), bindingToNormalized, getSchemaService(), MoreExecutors.sameThreadExecutor());
    /**
     * Returns the binding-to-normalized-node codec used by this test customizer,
     * exposed so tests can construct brokers wired to the same codec instance.
     */
    public BindingToNormalizedNodeCodec getBindingToNormalized() {
        return bindingToNormalized;
    }
    /**
     * Returns the schema service backing this test customizer. Public (was private)
     * so tests can wire their own brokers against the same schema service.
     */
    public SchemaService getSchemaService() {
        return schemaService;
    }
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
+import scala.runtime.AbstractFunction1;
import java.util.Collections;
import java.util.List;
*/
public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort{
- private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
private final ActorContext actorContext;
- private final List<ActorPath> cohortPaths;
+ private final List<Future<ActorPath>> cohortPathFutures;
+ private volatile List<ActorPath> cohortPaths;
private final String transactionId;
    /**
     * @param actorContext context used to resolve and message cohort actors
     * @param cohortPathFutures Futures, one per participating shard, each eventually
     *        yielding that shard's cohort actor path; they are combined lazily during
     *        canCommit rather than resolved up front
     * @param transactionId transaction identifier used for logging
     */
    public ThreePhaseCommitCohortProxy(ActorContext actorContext,
            List<Future<ActorPath>> cohortPathFutures, String transactionId) {
        this.actorContext = actorContext;
        this.cohortPathFutures = cohortPathFutures;
        this.transactionId = transactionId;
    }
    /**
     * Combines the per-shard cohort path Futures into a single Future and, on success,
     * caches the resolved paths in the (volatile) cohortPaths field. The returned Future
     * fails if any individual cohort path Future fails; SAME_FAILURE_TRANSFORMER ensures
     * the original failure cause is propagated unchanged.
     */
    private Future<Void> buildCohortPathsList() {

        Future<Iterable<ActorPath>> combinedFutures = Futures.sequence(cohortPathFutures,
                actorContext.getActorSystem().dispatcher());

        return combinedFutures.transform(new AbstractFunction1<Iterable<ActorPath>, Void>() {
            @Override
            public Void apply(Iterable<ActorPath> paths) {
                // Published via a volatile write; later phases (voidOperation) check this
                // field to avoid rebuilding the list.
                cohortPaths = Lists.newArrayList(paths);

                LOG.debug("Tx {} successfully built cohort path list: {}",
                        transactionId, cohortPaths);
                return null;
            }
        }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
    }
+
    /**
     * First phase of the 3PC protocol. Asynchronously resolves the cohort actor paths,
     * then (in finishCanCommit) asks every cohort whether it can commit. The returned
     * Future fails if path resolution or any cohort invocation fails.
     */
    @Override
    public ListenableFuture<Boolean> canCommit() {
        LOG.debug("Tx {} canCommit", transactionId);

        final SettableFuture<Boolean> returnFuture = SettableFuture.create();

        // The first phase of canCommit is to gather the list of cohort actor paths that will
        // participate in the commit. buildCohortPathsList combines the cohort path Futures into
        // one Future which we wait on asynchronously here. The cohort actor paths are
        // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
        // and passed to us from upstream processing. If any one fails then we'll fail canCommit.

        buildCohortPathsList().onComplete(new OnComplete<Void>() {
            @Override
            public void onComplete(Throwable failure, Void notUsed) throws Throwable {
                if(failure != null) {
                    LOG.debug("Tx {}: a cohort path Future failed: {}", transactionId, failure);
                    returnFuture.setException(failure);
                } else {
                    finishCanCommit(returnFuture);
                }
            }
        }, actorContext.getActorSystem().dispatcher());

        return returnFuture;
    }
+
+ private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
+
+ LOG.debug("Tx {} finishCanCommit", transactionId);
+
+ // The last phase of canCommit is to invoke all the cohort actors asynchronously to perform
+ // their canCommit processing. If any one fails then we'll fail canCommit.
Future<Iterable<Object>> combinedFuture =
invokeCohorts(new CanCommitTransaction().toSerializable());
- final SettableFuture<Boolean> returnFuture = SettableFuture.create();
-
combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
@Override
public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
if(failure != null) {
+ LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
returnFuture.setException(failure);
return;
}
}
}
+ LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
+
returnFuture.set(Boolean.valueOf(result));
}
}, actorContext.getActorSystem().dispatcher());
-
- return returnFuture;
}
private Future<Iterable<Object>> invokeCohorts(Object message) {
List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohortPaths.size());
for(ActorPath actorPath : cohortPaths) {
- LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath);
+ LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, actorPath);
ActorSelection cohort = actorContext.actorSelection(actorPath);
    /**
     * Second phase of 3PC: sends PreCommitTransaction to all cohorts. Failures are
     * propagated to the caller (propagateException = true).
     */
    @Override
    public ListenableFuture<Void> preCommit() {
        return voidOperation("preCommit", new PreCommitTransaction().toSerializable(),
                PreCommitTransactionReply.SERIALIZABLE_CLASS, true);
    }
    /**
     * Sends AbortTransaction to all cohorts. Failures here are deliberately swallowed
     * (propagateException = false) — see the inline comment for the rationale.
     */
    @Override
    public ListenableFuture<Void> abort() {
        // Note - we pass false for propagateException. In the front-end data broker, this method
        // is called when one of the 3 phases fails with an exception. We'd rather have that
        // original exception propagated to the client. If our abort fails and we propagate the
        // exception then that exception will supersede and suppress the original exception. But
        // it's the original exception that is the root cause and of more interest to the client.
        return voidOperation("abort", new AbortTransaction().toSerializable(),
                AbortTransactionReply.SERIALIZABLE_CLASS, false);
    }
    /**
     * Final phase of 3PC: sends CommitTransaction to all cohorts. Failures are
     * propagated to the caller (propagateException = true).
     */
    @Override
    public ListenableFuture<Void> commit() {
        return voidOperation("commit", new CommitTransaction().toSerializable(),
                CommitTransactionReply.SERIALIZABLE_CLASS, true);
    }
    /**
     * Common driver for the void 3PC phases (preCommit/commit/abort): ensures the cohort
     * path list is available, then invokes all cohorts with the given message via
     * finishVoidOperation.
     *
     * @param operationName phase name, used only for logging
     * @param message serialized message to send to every cohort
     * @param expectedResponseClass reply type each cohort is expected to return
     * @param propagateException if false, failures complete the returned Future
     *        successfully instead of propagating (used by abort)
     */
    private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
            final Class<?> expectedResponseClass, final boolean propagateException) {

        LOG.debug("Tx {} {}", transactionId, operationName);

        final SettableFuture<Void> returnFuture = SettableFuture.create();

        // The cohort actor list should already be built at this point by the canCommit phase but,
        // if not for some reason, we'll try to build it here.

        if(cohortPaths != null) {
            finishVoidOperation(operationName, message, expectedResponseClass, propagateException,
                    returnFuture);
        } else {
            buildCohortPathsList().onComplete(new OnComplete<Void>() {
                @Override
                public void onComplete(Throwable failure, Void notUsed) throws Throwable {
                    if(failure != null) {
                        LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId,
                                operationName, failure);

                        // Honor the phase's propagation policy even for path-resolution
                        // failures (abort must not mask the original error).
                        if(propagateException) {
                            returnFuture.setException(failure);
                        } else {
                            returnFuture.set(null);
                        }
                    } else {
                        finishVoidOperation(operationName, message, expectedResponseClass,
                                propagateException, returnFuture);
                    }
                }
            }, actorContext.getActorSystem().dispatcher());
        }

        return returnFuture;
    }
+
+ private void finishVoidOperation(final String operationName, final Object message,
+ final Class<?> expectedResponseClass, final boolean propagateException,
+ final SettableFuture<Void> returnFuture) {
+
+ LOG.debug("Tx {} finish {}", transactionId, operationName);
+
+ Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
+
combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
@Override
public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
}
if(exceptionToPropagate != null) {
+ LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
+ operationName, exceptionToPropagate);
+
if(propagateException) {
// We don't log the exception here to avoid redundant logging since we're
// propagating to the caller in MD-SAL core who will log it.
returnFuture.set(null);
}
} else {
+ LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
returnFuture.set(null);
}
}
}, actorContext.getActorSystem().dispatcher());
-
- return returnFuture;
}
    // Test hook: exposes the raw cohort path Futures (read-only view) so unit tests
    // can verify what was passed in. Package-private by design.
    @VisibleForTesting
    List<Future<ActorPath>> getCohortPathFutures() {
        return Collections.unmodifiableList(cohortPathFutures);
    }
}
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorPath;
-import akka.actor.ActorRef;
import akka.actor.ActorSelection;
-import akka.actor.Props;
import akka.dispatch.OnComplete;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.Function1;
import scala.concurrent.Future;
+import scala.runtime.AbstractFunction1;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
READ_WRITE
}
+ static Function1<Throwable, Throwable> SAME_FAILURE_TRANSFORMER = new AbstractFunction1<
+ Throwable, Throwable>() {
+ @Override
+ public Throwable apply(Throwable failure) {
+ return failure;
+ }
+ };
+
private static final AtomicLong counter = new AtomicLong();
private static final Logger
}
    // Test hook: gathers the recorded put/merge/delete operation Futures from every
    // per-shard transaction context into a single (fresh, caller-owned) list.
    @VisibleForTesting
    List<Future<Object>> getRecordedOperationFutures() {
        List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
        for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
            recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
        }

        return recordedOperationFutures;
    }
+
@Override
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
final YangInstanceIdentifier path) {
Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
"Read operation on write-only transaction is not allowed");
- LOG.debug("txn {} read {}", identifier, path);
+ LOG.debug("Tx {} read {}", identifier, path);
createTransactionIfMissing(actorContext, path);
Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
"Exists operation on write-only transaction is not allowed");
- LOG.debug("txn {} exists {}", identifier, path);
+ LOG.debug("Tx {} exists {}", identifier, path);
createTransactionIfMissing(actorContext, path);
checkModificationState();
- LOG.debug("txn {} write {}", identifier, path);
+ LOG.debug("Tx {} write {}", identifier, path);
createTransactionIfMissing(actorContext, path);
checkModificationState();
- LOG.debug("txn {} merge {}", identifier, path);
+ LOG.debug("Tx {} merge {}", identifier, path);
createTransactionIfMissing(actorContext, path);
checkModificationState();
- LOG.debug("txn {} delete {}", identifier, path);
+ LOG.debug("Tx {} delete {}", identifier, path);
createTransactionIfMissing(actorContext, path);
inReadyState = true;
- List<ActorPath> cohortPaths = new ArrayList<>();
-
- LOG.debug("txn {} Trying to get {} transactions ready for commit", identifier,
+ LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
remoteTransactionPaths.size());
+ List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
+
for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
- LOG.debug("txn {} Readying transaction for shard {}", identifier,
+ LOG.debug("Tx {} Readying transaction for shard {}", identifier,
transactionContext.getShardName());
- Object result = transactionContext.readyTransaction();
-
- if(result.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)){
- ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
- actorContext.getActorSystem(),result);
- String resolvedCohortPath = transactionContext.getResolvedCohortPath(
- reply.getCohortPath().toString());
- cohortPaths.add(actorContext.actorFor(resolvedCohortPath));
- } else {
- LOG.error("Was expecting {} but got {}", ReadyTransactionReply.SERIALIZABLE_CLASS,
- result.getClass());
- }
+ cohortPathFutures.add(transactionContext.readyTransaction());
}
- return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier.toString());
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures,
+ identifier.toString());
}
@Override
String transactionPath = reply.getTransactionPath();
- LOG.debug("txn {} Received transaction path = {}", identifier, transactionPath);
+ LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath);
ActorSelection transactionActor =
actorContext.actorSelection(transactionPath);
remoteTransactionPaths.put(shardName, transactionContext);
} else {
- LOG.error("Was expecting {} but got {}", CreateTransactionReply.SERIALIZABLE_CLASS,
- response.getClass());
+ throw new IllegalArgumentException(String.format(
+ "Invalid reply type {} for CreateTransaction", response.getClass()));
}
} catch(Exception e){
- LOG.error("txn {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
+ LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName, e));
}
}
private interface TransactionContext {
String getShardName();
- String getResolvedCohortPath(String cohortPath);
+ void closeTransaction();
- public void closeTransaction();
+ Future<ActorPath> readyTransaction();
- public Object readyTransaction();
+ void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
void deleteData(YangInstanceIdentifier path);
CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
final YangInstanceIdentifier path);
- void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
-
CheckedFuture<Boolean, ReadFailedException> dataExists(YangInstanceIdentifier path);
- }
+ List<Future<Object>> getRecordedOperationFutures();
+ }
- private class TransactionContextImpl implements TransactionContext {
- private final String shardName;
- private final String actorPath;
- private final ActorSelection actor;
+ private abstract class AbstractTransactionContext implements TransactionContext {
+ protected final String shardName;
+ protected final List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
- private TransactionContextImpl(String shardName, String actorPath,
- ActorSelection actor) {
+ AbstractTransactionContext(String shardName) {
this.shardName = shardName;
- this.actorPath = actorPath;
- this.actor = actor;
}
@Override
return shardName;
}
+ @Override
+ public List<Future<Object>> getRecordedOperationFutures() {
+ return recordedOperationFutures;
+ }
+ }
+
+ private class TransactionContextImpl extends AbstractTransactionContext {
+ private final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
+
+ private final String actorPath;
+ private final ActorSelection actor;
+
+ private TransactionContextImpl(String shardName, String actorPath,
+ ActorSelection actor) {
+ super(shardName);
+ this.actorPath = actorPath;
+ this.actor = actor;
+ }
+
private ActorSelection getActor() {
return actor;
}
- @Override
- public String getResolvedCohortPath(String cohortPath) {
+ private String getResolvedCohortPath(String cohortPath) {
return actorContext.resolvePath(actorPath, cohortPath);
}
@Override
public void closeTransaction() {
+ LOG.debug("Tx {} closeTransaction called", identifier);
actorContext.sendRemoteOperationAsync(getActor(), new CloseTransaction().toSerializable());
}
@Override
- public Object readyTransaction() {
- return actorContext.executeRemoteOperation(getActor(),
+ public Future<ActorPath> readyTransaction() {
+ LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
+ identifier, recordedOperationFutures.size());
+
+ // Send the ReadyTransaction message to the Tx actor.
+
+ final Future<Object> replyFuture = actorContext.executeRemoteOperationAsync(getActor(),
new ReadyTransaction().toSerializable(), ActorContext.ASK_DURATION);
+
+ // Combine all the previously recorded put/merge/delete operation reply Futures and the
+ // ReadyTransactionReply Future into one Future. If any one fails then the combined
+ // Future will fail. We need all prior operations and the ready operation to succeed
+ // in order to attempt commit.
+
+ List<Future<Object>> futureList =
+ Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
+ futureList.addAll(recordedOperationFutures);
+ futureList.add(replyFuture);
+
+ Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
+ actorContext.getActorSystem().dispatcher());
+
+ // Transform the combined Future into a Future that returns the cohort actor path from
+ // the ReadyTransactionReply. That's the end result of the ready operation.
+
+ return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, ActorPath>() {
+ @Override
+ public ActorPath apply(Iterable<Object> notUsed) {
+
+ LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
+ identifier);
+
+ // At this point all the Futures succeeded and we need to extract the cohort
+ // actor path from the ReadyTransactionReply. For the recorded operations, they
+ // don't return any data so we're only interested that they completed
+ // successfully. We could be paranoid and verify the correct reply types but
+ // that really should never happen so it's not worth the overhead of
+ // de-serializing each reply.
+
+ // Note the Future get call here won't block as it's complete.
+ Object serializedReadyReply = replyFuture.value().get().get();
+ if(serializedReadyReply.getClass().equals(
+ ReadyTransactionReply.SERIALIZABLE_CLASS)) {
+ ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
+ actorContext.getActorSystem(), serializedReadyReply);
+
+ String resolvedCohortPath = getResolvedCohortPath(
+ reply.getCohortPath().toString());
+
+ LOG.debug("Tx {} readyTransaction: resolved cohort path {}",
+ identifier, resolvedCohortPath);
+
+ return actorContext.actorFor(resolvedCohortPath);
+ } else {
+ // Throwing an exception here will fail the Future.
+
+ throw new IllegalArgumentException(String.format("Invalid reply type {}",
+ serializedReadyReply.getClass()));
+ }
+ }
+ }, SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
}
        /**
         * Asynchronously sends a DeleteData message to the Tx actor, recording the reply
         * Future so later read/ready operations can wait on it.
         */
        @Override
        public void deleteData(YangInstanceIdentifier path) {
            LOG.debug("Tx {} deleteData called path = {}", identifier, path);
            recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
                    new DeleteData(path).toSerializable(), ActorContext.ASK_DURATION ));
        }
        /**
         * Asynchronously sends a MergeData message to the Tx actor, recording the reply
         * Future so later read/ready operations can wait on it.
         */
        @Override
        public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
            LOG.debug("Tx {} mergeData called path = {}", identifier, path);
            recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
                    new MergeData(path, data, schemaContext).toSerializable(),
                    ActorContext.ASK_DURATION));
        }
        /**
         * Asynchronously sends a WriteData message to the Tx actor, recording the reply
         * Future so later read/ready operations can wait on it.
         */
        @Override
        public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
            LOG.debug("Tx {} writeData called path = {}", identifier, path);
            recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
                    new WriteData(path, data, schemaContext).toSerializable(),
                    ActorContext.ASK_DURATION));
        }
        /**
         * Reads data at the given path. To honor read-uncommitted semantics, first waits
         * for all previously recorded put/merge/delete operation Futures to complete
         * successfully; if any failed, the read fails with a ReadFailedException. The
         * actual read is performed by finishReadData.
         */
        @Override
        public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
                final YangInstanceIdentifier path) {

            LOG.debug("Tx {} readData called path = {}", identifier, path);

            final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();

            // If there were any previous recorded put/merge/delete operation reply Futures then we
            // must wait for them to successfully complete. This is necessary to honor the read
            // uncommitted semantics of the public API contract. If any one fails then fail the read.

            if(recordedOperationFutures.isEmpty()) {
                finishReadData(path, returnFuture);
            } else {
                LOG.debug("Tx {} readData: verifying {} previous recorded operations",
                    identifier, recordedOperationFutures.size());

                // Note: we make a copy of recordedOperationFutures to be on the safe side in case
                // Futures#sequence accesses the passed List on a different thread, as
                // recordedOperationFutures is not synchronized.

                Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
                        Lists.newArrayList(recordedOperationFutures),
                        actorContext.getActorSystem().dispatcher());
                OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
                    @Override
                    public void onComplete(Throwable failure, Iterable<Object> notUsed)
                            throws Throwable {
                        if(failure != null) {
                            LOG.debug("Tx {} readData: a recorded operation failed: {}",
                                identifier, failure);

                            returnFuture.setException(new ReadFailedException(
                                    "The read could not be performed because a previous put, merge,"
                                    + "or delete operation failed", failure));
                        } else {
                            finishReadData(path, returnFuture);
                        }
                    }
                };

                combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
            }

            return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
        }
+
+ private void finishReadData(final YangInstanceIdentifier path,
+ final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
+
+ LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
+
OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
- public void onComplete(Throwable failure, Object response) throws Throwable {
+ public void onComplete(Throwable failure, Object readResponse) throws Throwable {
if(failure != null) {
+ LOG.debug("Tx {} read operation failed: {}", identifier, failure);
+
returnFuture.setException(new ReadFailedException(
"Error reading data for path " + path, failure));
} else {
- if (response.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
+ LOG.debug("Tx {} read operation succeeded", identifier, failure);
+
+ if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,
- path, response);
+ path, readResponse);
if (reply.getNormalizedNode() == null) {
returnFuture.set(Optional.<NormalizedNode<?, ?>>absent());
} else {
}
};
- Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
+ Future<Object> readFuture = actorContext.executeRemoteOperationAsync(getActor(),
new ReadData(path).toSerializable(), ActorContext.ASK_DURATION);
- future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
-
- return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
- }
-
- @Override
- public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- actorContext.sendRemoteOperationAsync(getActor(),
- new WriteData(path, data, schemaContext).toSerializable());
+ readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
}
        /**
         * Checks whether data exists at the given path. Mirrors readData: to honor
         * read-uncommitted semantics, first waits for all previously recorded
         * put/merge/delete operation Futures to complete successfully before performing
         * the actual check in finishDataExists; any prior failure fails this request.
         */
        @Override
        public CheckedFuture<Boolean, ReadFailedException> dataExists(
                final YangInstanceIdentifier path) {

            LOG.debug("Tx {} dataExists called path = {}", identifier, path);

            final SettableFuture<Boolean> returnFuture = SettableFuture.create();

            // If there were any previous recorded put/merge/delete operation reply Futures then we
            // must wait for them to successfully complete. This is necessary to honor the read
            // uncommitted semantics of the public API contract. If any one fails then fail this
            // request.

            if(recordedOperationFutures.isEmpty()) {
                finishDataExists(path, returnFuture);
            } else {
                LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
                    identifier, recordedOperationFutures.size());

                // Note: we make a copy of recordedOperationFutures to be on the safe side in case
                // Futures#sequence accesses the passed List on a different thread, as
                // recordedOperationFutures is not synchronized.

                Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
                        Lists.newArrayList(recordedOperationFutures),
                        actorContext.getActorSystem().dispatcher());
                OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
                    @Override
                    public void onComplete(Throwable failure, Iterable<Object> notUsed)
                            throws Throwable {
                        if(failure != null) {
                            LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
                                identifier, failure);

                            returnFuture.setException(new ReadFailedException(
                                    "The data exists could not be performed because a previous "
                                    + "put, merge, or delete operation failed", failure));
                        } else {
                            finishDataExists(path, returnFuture);
                        }
                    }
                };

                combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
            }

            return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
        }
+
+ private void finishDataExists(final YangInstanceIdentifier path,
+ final SettableFuture<Boolean> returnFuture) {
+
+ LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
+
OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
public void onComplete(Throwable failure, Object response) throws Throwable {
if(failure != null) {
+ LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
+
returnFuture.setException(new ReadFailedException(
- "Error checking exists for path " + path, failure));
+ "Error checking data exists for path " + path, failure));
} else {
+ LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
+
if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
returnFuture.set(Boolean.valueOf(DataExistsReply.
fromSerializable(response).exists()));
Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
new DataExists(path).toSerializable(), ActorContext.ASK_DURATION);
future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
-
- return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
}
}
- private class NoOpTransactionContext implements TransactionContext {
+ private class NoOpTransactionContext extends AbstractTransactionContext {
- private final Logger
- LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
+ private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
- private final String shardName;
private final Exception failure;
- private ActorRef cohort;
-
public NoOpTransactionContext(String shardName, Exception failure){
- this.shardName = shardName;
+ super(shardName);
this.failure = failure;
}
@Override
- public String getShardName() {
- return shardName;
-
+ public void closeTransaction() {
+ LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
}
@Override
- public String getResolvedCohortPath(String cohortPath) {
- return cohort.path().toString();
+ public Future<ActorPath> readyTransaction() {
+ LOG.debug("Tx {} readyTransaction called", identifier);
+ return akka.dispatch.Futures.failed(failure);
}
@Override
- public void closeTransaction() {
- LOG.warn("txn {} closeTransaction called", identifier);
- }
-
- @Override public Object readyTransaction() {
- LOG.warn("txn {} readyTransaction called", identifier);
- cohort = actorContext.getActorSystem().actorOf(Props.create(NoOpCohort.class));
- return new ReadyTransactionReply(cohort.path()).toSerializable();
+ public void deleteData(YangInstanceIdentifier path) {
+ LOG.debug("Tx {} deleteData called path = {}", identifier, path);
}
@Override
- public void deleteData(YangInstanceIdentifier path) {
- LOG.warn("txt {} deleteData called path = {}", identifier, path);
+ public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ LOG.debug("Tx {} mergeData called path = {}", identifier, path);
}
@Override
- public void mergeData(YangInstanceIdentifier path,
- NormalizedNode<?, ?> data) {
- LOG.warn("txn {} mergeData called path = {}", identifier, path);
+ public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ LOG.debug("Tx {} writeData called path = {}", identifier, path);
}
@Override
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
YangInstanceIdentifier path) {
- LOG.warn("txn {} readData called path = {}", identifier, path);
+ LOG.debug("Tx {} readData called path = {}", identifier, path);
return Futures.immediateFailedCheckedFuture(new ReadFailedException(
"Error reading data for path " + path, failure));
}
- @Override public void writeData(YangInstanceIdentifier path,
- NormalizedNode<?, ?> data) {
- LOG.warn("txn {} writeData called path = {}", identifier, path);
- }
-
- @Override public CheckedFuture<Boolean, ReadFailedException> dataExists(
+ @Override
+ public CheckedFuture<Boolean, ReadFailedException> dataExists(
YangInstanceIdentifier path) {
- LOG.warn("txn {} dataExists called path = {}", identifier, path);
+ LOG.debug("Tx {} dataExists called path = {}", identifier, path);
return Futures.immediateFailedCheckedFuture(new ReadFailedException(
"Error checking exists for path " + path, failure));
}
}
-
-
-
}
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
+
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.isA;
import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
+import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
+
+import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import java.util.List;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
+ @SuppressWarnings("serial")
+ static class TestException extends RuntimeException {
+ }
+
@Mock
private ActorContext actorContext;
doReturn(getSystem()).when(actorContext).getActorSystem();
}
- private ThreePhaseCommitCohortProxy setupProxy(int nCohorts) {
- List<ActorPath> cohorts = Lists.newArrayList();
+ private Future<ActorPath> newCohortPath() {
+ ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
+ doReturn(mock(ActorSelection.class)).when(actorContext).actorSelection(path);
+ return Futures.successful(path);
+ }
+
+ private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
+ List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
for(int i = 1; i <= nCohorts; i++) {
- ActorPath path = getSystem().actorOf(Props.create(MessageCollectorActor.class)).path();
- cohorts.add(path);
- doReturn(mock(ActorSelection.class)).when(actorContext).actorSelection(path);
+ cohortPathFutures.add(newCohortPath());
}
- return new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures, "txn-1");
+ }
+
+ private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
+ throws Exception {
+ List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
+ cohortPathFutures.add(newCohortPath());
+ cohortPathFutures.add(Futures.<ActorPath>failed(new TestException()));
+
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures, "txn-1");
}
private void setupMockActorContext(Class<?> requestType, Object... responses) {
any(ActorSelection.class), isA(requestType), any(FiniteDuration.class));
}
+ private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
+
+ try {
+ future.get(5, TimeUnit.SECONDS);
+ fail("Expected ExecutionException");
+ } catch(ExecutionException e) {
+ throw e.getCause();
+ }
+ }
+
@Test
public void testCanCommitWithOneCohort() throws Exception {
ListenableFuture<Boolean> future = proxy.canCommit();
- assertEquals("canCommit", true, future.get());
+ assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
new CanCommitTransactionReply(false));
future = proxy.canCommit();
- assertEquals("canCommit", false, future.get());
+ assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
}
ListenableFuture<Boolean> future = proxy.canCommit();
- assertEquals("canCommit", true, future.get());
+ assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
}
ListenableFuture<Boolean> future = proxy.canCommit();
- assertEquals("canCommit", false, future.get());
+ assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS);
}
- @Test(expected = ExecutionException.class)
- public void testCanCommitWithExceptionFailure() throws Exception {
+ @Test(expected = TestException.class)
+ public void testCanCommitWithExceptionFailure() throws Throwable {
ThreePhaseCommitCohortProxy proxy = setupProxy(1);
- setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
+ setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new TestException());
- proxy.canCommit().get();
+ propagateExecutionExceptionCause(proxy.canCommit());
}
@Test(expected = ExecutionException.class)
setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
new PreCommitTransactionReply());
- proxy.canCommit().get();
+ proxy.canCommit().get(5, TimeUnit.SECONDS);
+ }
+
+ @Test(expected = TestException.class)
+ public void testCanCommitWithFailedCohortPath() throws Throwable {
+
+ ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
+
+ try {
+ propagateExecutionExceptionCause(proxy.canCommit());
+ } finally {
+ verifyCohortInvocations(0, CanCommitTransaction.SERIALIZABLE_CLASS);
+ }
}
@Test
setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
new PreCommitTransactionReply());
- proxy.preCommit().get();
+ proxy.preCommit().get(5, TimeUnit.SECONDS);
verifyCohortInvocations(1, PreCommitTransaction.SERIALIZABLE_CLASS);
}
setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
new PreCommitTransactionReply(), new RuntimeException("mock"));
- proxy.preCommit().get();
+ proxy.preCommit().get(5, TimeUnit.SECONDS);
}
@Test
setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply());
- proxy.abort().get();
+ proxy.abort().get(5, TimeUnit.SECONDS);
verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
}
setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
// The exception should not get propagated.
- proxy.abort().get();
+ proxy.abort().get(5, TimeUnit.SECONDS);
verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
}
+ @Test
+ public void testAbortWithFailedCohortPath() throws Throwable {
+
+ ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
+
+ // The exception should not get propagated.
+ proxy.abort().get(5, TimeUnit.SECONDS);
+
+ verifyCohortInvocations(0, AbortTransaction.SERIALIZABLE_CLASS);
+ }
+
@Test
public void testCommit() throws Exception {
setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
new CommitTransactionReply());
- proxy.commit().get();
+ proxy.commit().get(5, TimeUnit.SECONDS);
verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
}
- @Test(expected = ExecutionException.class)
- public void testCommitWithFailure() throws Exception {
+ @Test(expected = TestException.class)
+ public void testCommitWithFailure() throws Throwable {
ThreePhaseCommitCohortProxy proxy = setupProxy(2);
setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
- new RuntimeException("mock"));
+ new TestException());
- proxy.commit().get();
+ propagateExecutionExceptionCause(proxy.commit());
}
@Test(expected = ExecutionException.class)
- public void teseCommitWithInvalidResponseType() throws Exception {
+ public void testCommitWithInvalidResponseType() throws Exception {
ThreePhaseCommitCohortProxy proxy = setupProxy(1);
setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply());
- proxy.commit().get();
+ proxy.commit().get(5, TimeUnit.SECONDS);
+ }
+
+ @Test(expected = TestException.class)
+ public void testCommitWithFailedCohortPath() throws Throwable {
+
+ ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
+
+ try {
+ propagateExecutionExceptionCause(proxy.commit());
+ } finally {
+ verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
+ }
}
@Test
- public void testGetCohortPaths() {
+ public void testAllThreePhasesSuccessful() throws Exception {
ThreePhaseCommitCohortProxy proxy = setupProxy(2);
- List<ActorPath> paths = proxy.getCohortPaths();
- assertNotNull("getCohortPaths returned null", paths);
- assertEquals("getCohortPaths size", 2, paths.size());
+ setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
+ new CanCommitTransactionReply(true), new CanCommitTransactionReply(true));
+
+ setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
+ new PreCommitTransactionReply(), new PreCommitTransactionReply());
+
+ setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
+ new CommitTransactionReply(), new CommitTransactionReply());
+
+ proxy.canCommit().get(5, TimeUnit.SECONDS);
+ proxy.preCommit().get(5, TimeUnit.SECONDS);
+ proxy.commit().get(5, TimeUnit.SECONDS);
+
+ verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
+ verifyCohortInvocations(2, PreCommitTransaction.SERIALIZABLE_CLASS);
+ verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
}
}
import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.dispatch.Futures;
+
import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
+import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
import org.opendaylight.controller.cluster.datastore.messages.MergeData;
+import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.Await;
import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
-import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.isA;
+import static org.mockito.Mockito.times;
@SuppressWarnings("resource")
public class TransactionProxyTest extends AbstractActorTest {
}
static interface Invoker {
- void invoke(TransactionProxy proxy) throws Exception;
+ CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
}
private final Configuration configuration = new MockConfiguration();
schemaContext = TestModel.createTestContext();
doReturn(getSystem()).when(mockActorContext).getActorSystem();
+ doReturn(memberName).when(mockActorContext).getCurrentMemberName();
ShardStrategyFactory.setConfiguration(configuration);
}
ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
@Override
public boolean matches(Object argument) {
- DataExists obj = DataExists.fromSerializable(argument);
- return obj.getPath().equals(TestModel.TEST_PATH);
+ return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
+ DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
}
};
ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
@Override
public boolean matches(Object argument) {
- ReadData obj = ReadData.fromSerializable(argument);
- return obj.getPath().equals(TestModel.TEST_PATH);
+ return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
+ ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
}
};
ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
@Override
public boolean matches(Object argument) {
+ if(!WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
+ return false;
+ }
+
WriteData obj = WriteData.fromSerializable(argument, schemaContext);
return obj.getPath().equals(TestModel.TEST_PATH) &&
obj.getData().equals(nodeToWrite);
ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
@Override
public boolean matches(Object argument) {
+ if(!MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
+ return false;
+ }
+
MergeData obj = MergeData.fromSerializable(argument, schemaContext);
return obj.getPath().equals(TestModel.TEST_PATH) &&
obj.getData().equals(nodeToWrite);
ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
@Override
public boolean matches(Object argument) {
- DeleteData obj = DeleteData.fromSerializable(argument);
- return obj.getPath().equals(TestModel.TEST_PATH);
+ return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
+ DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
}
};
return argThat(matcher);
}
- private Object readyTxReply(ActorPath path) {
- return new ReadyTransactionReply(path).toSerializable();
+ private Future<Object> readyTxReply(ActorPath path) {
+ return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
}
private Future<Object> readDataReply(NormalizedNode<?, ?> data) {
- return Futures.successful(new ReadDataReply(schemaContext, data)
- .toSerializable());
+ return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable());
}
private Future<Object> dataExistsReply(boolean exists) {
return Futures.successful(new DataExistsReply(exists).toSerializable());
}
+ private Future<Object> writeDataReply() {
+ return Futures.successful(new WriteDataReply().toSerializable());
+ }
+
+ private Future<Object> mergeDataReply() {
+ return Futures.successful(new MergeDataReply().toSerializable());
+ }
+
+ private Future<Object> deleteDataReply() {
+ return Futures.successful(new DeleteDataReply().toSerializable());
+ }
+
private ActorSelection actorSelection(ActorRef actorRef) {
return getSystem().actorSelection(actorRef.path());
}
ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
doReturn(getSystem().actorSelection(actorRef.path())).
when(mockActorContext).actorSelection(actorRef.path().toString());
- doReturn(memberName).when(mockActorContext).getCurrentMemberName();
doReturn(createTransactionReply(actorRef)).when(mockActorContext).
executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD),
eqCreateTransaction(memberName, type), anyDuration());
return actorRef;
}
+ private void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
+ throws Throwable {
+
+ try {
+ future.checkedGet(5, TimeUnit.SECONDS);
+ fail("Expected ReadFailedException");
+ } catch(ReadFailedException e) {
+ throw e.getCause();
+ }
+ }
+
@Test
public void testRead() throws Exception {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
}
@Test(expected = ReadFailedException.class)
- public void testReadWhenAnInvalidMessageIsSentInReply() throws Exception {
+ public void testReadWithInvalidReplyMessageType() throws Exception {
setupActorContextWithInitialCreateTransaction(READ_ONLY);
doReturn(Futures.successful(new Object())).when(mockActorContext).
public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
setupActorContextWithInitialCreateTransaction(READ_ONLY);
- doThrow(new TestException()).when(mockActorContext).
+ doReturn(Futures.failed(new TestException())).when(mockActorContext).
executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY, schemaContext);
- try {
- transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
- fail("Expected ReadFailedException");
- } catch(ReadFailedException e) {
- // Expected - throw cause - expects TestException.
- throw e.getCause();
- }
+ propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
}
private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY, schemaContext);
- try {
- invoker.invoke(transactionProxy);
- fail("Expected ReadFailedException");
- } catch(ReadFailedException e) {
- // Expected - throw cause - expects TestException.
- throw e.getCause();
- }
+ propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
}
private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
@Override
- public void invoke(TransactionProxy proxy) throws Exception {
- proxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
+ public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
+ return proxy.read(TestModel.TEST_PATH);
}
});
}
testReadWithExceptionOnInitialCreateTransaction(new TestException());
}
+ @Test(expected = TestException.class)
+ public void testReadWithPriorRecordingOperationFailure() throws Throwable {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+ doReturn(Futures.failed(new TestException())).when(mockActorContext).
+ executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(),
+ anyDuration());
+
+ doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_WRITE, schemaContext);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ transactionProxy.delete(TestModel.TEST_PATH);
+
+ try {
+ propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
+ } finally {
+ verify(mockActorContext, times(0)).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+ }
+ }
+
+ @Test
+ public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
+
+ NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqWriteData(expectedNode), anyDuration());
+
+ doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_WRITE, schemaContext);
+
+ transactionProxy.write(TestModel.TEST_PATH, expectedNode);
+
+ Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
+ TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
+
+ assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+
+ assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
+ }
+
+ @Test(expected=IllegalStateException.class)
+ public void testReadPreConditionCheck() {
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ WRITE_ONLY, schemaContext);
+
+ transactionProxy.read(TestModel.TEST_PATH);
+ }
+
@Test
public void testExists() throws Exception {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
@Override
- public void invoke(TransactionProxy proxy) throws Exception {
- proxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
+ public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
+ return proxy.exists(TestModel.TEST_PATH);
}
});
}
@Test(expected = ReadFailedException.class)
- public void testExistsWhenAnInvalidMessageIsSentInReply() throws Exception {
+ public void testExistsWithInvalidReplyMessageType() throws Exception {
setupActorContextWithInitialCreateTransaction(READ_ONLY);
doReturn(Futures.successful(new Object())).when(mockActorContext).
public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
setupActorContextWithInitialCreateTransaction(READ_ONLY);
- doThrow(new TestException()).when(mockActorContext).
+ doReturn(Futures.failed(new TestException())).when(mockActorContext).
executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY, schemaContext);
+ propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
+ }
+
+ @Test(expected = TestException.class)
+ public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+ doReturn(Futures.failed(new TestException())).when(mockActorContext).
+ executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(),
+ anyDuration());
+
+ doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_WRITE, schemaContext);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ transactionProxy.delete(TestModel.TEST_PATH);
+
try {
- transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
- fail("Expected ReadFailedException");
- } catch(ReadFailedException e) {
- // Expected - throw cause - expects TestException.
- throw e.getCause();
+ propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
+ } finally {
+ verify(mockActorContext, times(0)).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
}
}
@Test
- public void testWrite() throws Exception {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+ public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+ doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_WRITE, schemaContext);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
+
+ assertEquals("Exists response", true, exists);
+ }
+
+ @Test(expected=IllegalStateException.class)
+ public void testxistsPreConditionCheck() {
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY, schemaContext);
+ transactionProxy.exists(TestModel.TEST_PATH);
+ }
+
+ private void verifyRecordingOperationFutures(List<Future<Object>> futures,
+ Class<?>... expResultTypes) throws Exception {
+ assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size());
+
+ int i = 0;
+ for( Future<Object> future: futures) {
+ assertNotNull("Recording operation Future is null", future);
+
+ Class<?> expResultType = expResultTypes[i++];
+ if(Throwable.class.isAssignableFrom(expResultType)) {
+ try {
+ Await.result(future, Duration.create(5, TimeUnit.SECONDS));
+ fail("Expected exception from recording operation Future");
+ } catch(Exception e) {
+ // Expected
+ }
+ } else {
+ assertEquals("Recording operation Future result type", expResultType,
+ Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
+ }
+ }
+ }
+
+ @Test
+ public void testWrite() throws Exception {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ WRITE_ONLY, schemaContext);
+
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
- verify(mockActorContext).sendRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
+ verify(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ WriteDataReply.SERIALIZABLE_CLASS);
+ }
+
+ @Test(expected=IllegalStateException.class)
+ public void testWritePreConditionCheck() {
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_ONLY, schemaContext);
+
+ transactionProxy.write(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ }
+
+ @Test(expected=IllegalStateException.class)
+ public void testWriteAfterReadyPreConditionCheck() {
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ WRITE_ONLY, schemaContext);
+
+ transactionProxy.ready();
+
+ transactionProxy.write(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME));
}
@Test
public void testMerge() throws Exception {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY, schemaContext);
- NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
- verify(mockActorContext).sendRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
+ verify(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ MergeDataReply.SERIALIZABLE_CLASS);
}
@Test
public void testDelete() throws Exception {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+ doReturn(deleteDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqDeleteData(), anyDuration());
+
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY, schemaContext);
transactionProxy.delete(TestModel.TEST_PATH);
- verify(mockActorContext).sendRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqDeleteData());
+ verify(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqDeleteData(), anyDuration());
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ DeleteDataReply.SERIALIZABLE_CLASS);
+ }
+
+ private void verifyCohortPathFutures(ThreePhaseCommitCohortProxy proxy,
+ Object... expReplies) throws Exception {
+ assertEquals("getReadyOperationFutures size", expReplies.length,
+ proxy.getCohortPathFutures().size());
+
+ int i = 0;
+ for( Future<ActorPath> future: proxy.getCohortPathFutures()) {
+ assertNotNull("Ready operation Future is null", future);
+
+ Object expReply = expReplies[i++];
+ if(expReply instanceof ActorPath) {
+ ActorPath actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
+ assertEquals("Cohort actor path", expReply, actual);
+ } else {
+ // Expecting exception.
+ try {
+ Await.result(future, Duration.create(5, TimeUnit.SECONDS));
+ fail("Expected exception from ready operation Future");
+ } catch(Exception e) {
+ // Expected
+ }
+ }
+ }
}
@SuppressWarnings("unchecked")
public void testReady() throws Exception {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
eq(actorSelection(actorRef)), eqReadData(), anyDuration());
- doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperation(
+ doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+ doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
transactionProxy.read(TestModel.TEST_PATH);
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+ ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ WriteDataReply.SERIALIZABLE_CLASS);
+
+ verifyCohortPathFutures(proxy, actorRef.path());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadyWithRecordingOperationFailure() throws Exception {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+
+ doReturn(Futures.failed(new TestException())).when(mockActorContext).
+ executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite),
+ anyDuration());
+
+ doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ WRITE_ONLY, schemaContext);
+
+ transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+ ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ MergeDataReply.SERIALIZABLE_CLASS, TestException.class);
+
+ verifyCohortPathFutures(proxy, TestException.class);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadyWithReplyFailure() throws Exception {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+
+ doReturn(Futures.failed(new TestException())).when(mockActorContext).
+ executeRemoteOperationAsync(eq(actorSelection(actorRef)),
+ isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ WRITE_ONLY, schemaContext);
+
+ transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+ ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ MergeDataReply.SERIALIZABLE_CLASS);
+
+ verifyCohortPathFutures(proxy, TestException.class);
+ }
+
+ @Test
+ public void testReadyWithInitialCreateTransactionFailure() throws Exception {
+
+ doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation(
+ anyString(), any(), anyDuration());
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ WRITE_ONLY, schemaContext);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ transactionProxy.delete(TestModel.TEST_PATH);
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+ ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+ verifyCohortPathFutures(proxy, PrimaryNotFoundException.class);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadyWithInvalidReplyMessageType() throws Exception {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+ doReturn(Futures.successful(new Object())).when(mockActorContext).
+ executeRemoteOperationAsync(eq(actorSelection(actorRef)),
+ isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ WRITE_ONLY, schemaContext);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
- assertEquals("getCohortPaths", Arrays.asList(actorRef.path()), proxy.getCohortPaths());
+ verifyCohortPathFutures(proxy, IllegalArgumentException.class);
}
@Test
LogicalDatastoreType store, YangInstanceIdentifier path);
/**
- * Checks if data is available in the logical data store located at provided path
+ *
+ * Checks if data is available in the logical data store located at provided path.
+ * <p>
*
+ * Note: a successful result from this method makes no guarantee that a subsequent call to {@link #read}
+ * will succeed. It is possible that the data resides in a data store on a remote node and, if that
+ * node goes down or a network failure occurs, a subsequent read would fail. Another scenario is if
+ * the data is deleted in between the calls to <code>exists</code> and <code>read</code>.
+ *
+ * @param store
+ * Logical data store from which read should occur.
* @param path
* Path which uniquely identifies subtree which client want to
* check existence of
CheckedFuture<Optional<NormalizedNode<?,?>>, ReadFailedException> read(YangInstanceIdentifier path);
/**
- * Checks if data is available in the logical data store located at provided path
+ * Checks if data is available in the logical data store located at provided path.
+ * <p>
+ *
+ * Note: a successful result from this method makes no guarantee that a subsequent call to {@link #read}
+ * will succeed. It is possible that the data resides in a data store on a remote node and, if that
+ * node goes down or a network failure occurs, a subsequent read would fail. Another scenario is if
+ * the data is deleted in between the calls to <code>exists</code> and <code>read</code>.
*
* @param path
* Path which uniquely identifies subtree which client want to
return true;
}
- // FIXME: do we need anything else? If not, flip this to 'false'
- return true;
+ return false;
}
/**
}
@Override
- public void onDeviceDisconnected() {
+ public synchronized void onDeviceDisconnected() {
salProvider.getDatastoreAdapter().updateDeviceState(false, Collections.<QName>emptySet());
salProvider.getMountInstance().onDeviceDisconnected();
}
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.io.InputStream;
-import java.util.concurrent.ExecutionException;
import org.apache.commons.io.IOUtils;
import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
rpc.invokeRpc(GET_SCHEMA_QNAME, getSchemaRequest),
new ResultToYangSourceTransformer(id, sourceIdentifier, moduleName, revision));
- // FIXME remove this get, it is only present to wait until source is retrieved
+ final CheckedFuture<YangTextSchemaSource, SchemaSourceException> checked = Futures.makeChecked(transformed, MAPPER);
+
+ // FIXME remove this get, it is only present to wait until source is retrieved
// (goal is to limit concurrent schema download, since NetconfDevice listener does not handle concurrent messages properly)
try {
logger.trace("{}: Blocking for {}", id, sourceIdentifier);
- transformed.get();
- } catch (final InterruptedException e) {
- throw new RuntimeException(e);
- } catch (final ExecutionException e) {
- throw new IllegalStateException(id + ": Failed while getting source: " + sourceIdentifier, e);
+ checked.checkedGet();
+ } catch (final SchemaSourceException e) {
+ return Futures.immediateFailedCheckedFuture(e);
}
- return Futures.makeChecked(transformed, MAPPER);
+ return checked;
}
/**
<groupId>org.osgi</groupId>
<artifactId>org.osgi.core</artifactId>
</dependency>
+
+ <!-- dependencies to use AbstractDataBrokerTest -->
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-binding-broker-impl</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-binding-broker-impl</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <artifactId>junit</artifactId>
+ <groupId>junit</groupId>
+ <scope>test</scope>
+ </dependency>
+ <!-- used to mock up classes -->
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
--- /dev/null
+/*
+* Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+*
+* This program and the accompanying materials are made available under the
+* terms of the Eclipse Public License v1.0 which accompanies this distribution,
+* and is available at http://www.eclipse.org/legal/epl-v10.html
+*/
+package org.opendaylight.controller.sample.toaster.provider;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.concurrent.Future;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.binding.test.AbstractDataBrokerTest;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.DisplayString;
+import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.MakeToastInput;
+import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.MakeToastInputBuilder;
+import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.Toaster;
+import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.WheatBread;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+import com.google.common.base.Optional;
+
+public class OpenDaylightToasterTest extends AbstractDataBrokerTest{
+
+ private static InstanceIdentifier<Toaster> TOASTER_IID =
+ InstanceIdentifier.builder( Toaster.class ).build();
+ OpendaylightToaster toaster;
+
+ @Override
+ protected void setupWithDataBroker(DataBroker dataBroker) {
+ toaster = new OpendaylightToaster();
+ toaster.setDataProvider( dataBroker );
+
+ /**
+ * Doesn't look like we have support for the NotificationProviderService yet, so mock it
+ * for now.
+ */
+ NotificationProviderService mockNotification = mock( NotificationProviderService.class );
+ toaster.setNotificationProvider( mockNotification );
+ }
+
+ @Test
+ public void testToasterInitOnStartUp() throws Exception {
+ DataBroker broker = getDataBroker();
+
+ ReadOnlyTransaction rTx = broker.newReadOnlyTransaction();
+ Optional<Toaster> optional = rTx.read( LogicalDatastoreType.OPERATIONAL, TOASTER_IID ).get();
+ assertNotNull( optional );
+ assertTrue( "Operational toaster not present", optional.isPresent() );
+
+ Toaster toaster = optional.get();
+
+ assertEquals( Toaster.ToasterStatus.Up, toaster.getToasterStatus() );
+ assertEquals( new DisplayString("Opendaylight"),
+ toaster.getToasterManufacturer() );
+ assertEquals( new DisplayString("Model 1 - Binding Aware"),
+ toaster.getToasterModelNumber() );
+
+ Optional<Toaster> configToaster =
+ rTx.read( LogicalDatastoreType.CONFIGURATION, TOASTER_IID ).get();
+ assertFalse( "Didn't expect config data for toaster.",
+ configToaster.isPresent() );
+ }
+
+ @Test
+ @Ignore //ignored because it is not an e2e test right now. Illustrative purposes only.
+ public void testSomething() throws Exception{
+ MakeToastInput toastInput = new MakeToastInputBuilder()
+ .setToasterDoneness( 1L )
+ .setToasterToastType( WheatBread.class )
+ .build();
+
+ //NOTE: In a real test we would want to override the Thread.sleep() to prevent our junit test
+ //from sleeping for a second...
+ Future<RpcResult<Void>> makeToast = toaster.makeToast( toastInput );
+
+ RpcResult<Void> rpcResult = makeToast.get();
+
+ assertNotNull( rpcResult );
+ assertTrue( rpcResult.isSuccessful() );
+ //etc
+ }
+
+}
import java.net.UnknownHostException;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv4Prefix;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev100924.MacAddress;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.MacAddressFilter;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.match.EthernetMatch;
import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.match.Layer3Match;
import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.match.layer._3.match.Ipv4Match;
import org.slf4j.Logger;
if (statsFlow.getEthernetMatch() != null) {
return false;
}
- } else if(!storedFlow.getEthernetMatch().equals(statsFlow.getEthernetMatch())) {
+ } else if(!ethernetMatchEquals(statsFlow.getEthernetMatch(),storedFlow.getEthernetMatch())) {
return false;
}
if (storedFlow.getIcmpv4Match()== null) {
return true;
}
+ /*
+ * Custom EthernetMatch is required because mac address string provided by user in EthernetMatch can be in
+ * any case (upper or lower or mix). Ethernet Match which controller receives from switch is always
+ * an upper case string. Default EthernetMatch equals doesn't use equalsIgnoreCase() and hence it fails.
+ * E.g. a user-provided mac address string in a flow match is aa:bb:cc:dd:ee:ff and when the controller fetches
+ * statistic data, the openflow driver library returns AA:BB:CC:DD:EE:FF and the default equals fails here.
+ */
+ @VisibleForTesting
+ static boolean ethernetMatchEquals(EthernetMatch statsEthernetMatch, EthernetMatch storedEthernetMatch){
+ boolean verdict = true;
+ Boolean checkNullValues = checkNullValues(statsEthernetMatch, storedEthernetMatch);
+ if (checkNullValues != null) {
+ verdict = checkNullValues;
+ } else {
+ if(verdict){
+ verdict = ethernetMatchFieldsEquals(statsEthernetMatch.getEthernetSource(),storedEthernetMatch.getEthernetSource());
+ }
+ if(verdict){
+ verdict = ethernetMatchFieldsEquals(statsEthernetMatch.getEthernetDestination(),storedEthernetMatch.getEthernetDestination());
+ }
+ if(verdict){
+ if(statsEthernetMatch.getEthernetType() == null){
+ if(storedEthernetMatch.getEthernetType() != null){
+ verdict = false;
+ }
+ }else{
+ verdict = statsEthernetMatch.getEthernetType().equals(storedEthernetMatch.getEthernetType());
+ }
+ }
+ }
+ return verdict;
+ }
+
+ private static boolean ethernetMatchFieldsEquals(MacAddressFilter statsEthernetMatchFields,
+ MacAddressFilter storedEthernetMatchFields){
+ boolean verdict = true;
+ Boolean checkNullValues = checkNullValues(statsEthernetMatchFields, storedEthernetMatchFields);
+ if (checkNullValues != null) {
+ verdict = checkNullValues;
+ } else {
+ if(verdict){
+ verdict = macAddressEquals(statsEthernetMatchFields.getAddress(),storedEthernetMatchFields.getAddress());
+ }
+ if(verdict){
+ verdict = macAddressEquals(statsEthernetMatchFields.getMask(),storedEthernetMatchFields.getMask());
+ }
+ }
+ return verdict;
+ }
+
+ private static boolean macAddressEquals(MacAddress statsMacAddress, MacAddress storedMacAddress){
+ boolean verdict = true;
+ Boolean checkNullValues = checkNullValues(statsMacAddress, storedMacAddress);
+ if (checkNullValues != null) {
+ verdict = checkNullValues;
+ } else {
+ verdict = statsMacAddress.getValue().equalsIgnoreCase(storedMacAddress.getValue());
+ }
+ return verdict;
+ }
+
@VisibleForTesting
static boolean layer3MatchEquals(Layer3Match statsLayer3Match, Layer3Match storedLayer3Match){
boolean verdict = true;
import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv4Prefix;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev100924.MacAddress;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.l2.types.rev130827.EtherType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.ethernet.match.fields.EthernetSourceBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.ethernet.match.fields.EthernetTypeBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.match.EthernetMatch;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.match.EthernetMatchBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.match.layer._3.match.Ipv4Match;
import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.match.layer._3.match.Ipv4MatchBuilder;
import org.slf4j.Logger;
return ipv4MatchBuilder.build();
}
+ /**
+ * Test method for {@link org.opendaylight.controller.md.statistics.manager.FlowComparator#ethernetMatchEquals(EthernetMatch, EthernetMatch)}
+ */
+ @Test
+ public void testEthernetMatchEquals() {
+ String[][][] ethernetMatchSeeds = new String[][][] {
+ {{"aa:bb:cc:dd:ee:ff", "ff:ff:ff:ff:ff:ff","0800"}, {"aa:bb:cc:dd:ee:ff", "ff:ff:ff:ff:ff:ff","0800"}},
+ {{"aa:bb:cc:dd:ee:ff", "ff:ff:ff:ff:ff:ff","0800"}, {"aa:bb:bc:cd:ee:ff", "ff:ff:ff:ff:ff:ff","0800"}},
+ {{"aa:bb:cc:dd:ee:ff", "ff:ff:ff:ff:ff:ff","0800"}, {"AA:BB:CC:DD:EE:FF", "ff:ff:ff:ff:ff:ff","0800"}},
+ {{"AA:BB:CC:dd:ee:ff", "ff:ff:ff:ff:ff:ff","0800"}, {"aa:bb:cc:dd:ee:ff", "ff:ff:ff:ff:ff:ff","0800"}},
+ {{"AA:BB:CC:dd:ee:ff", "ff:ff:ff:ff:ff:ff","0800"}, {"aa:bb:cc:dd:ee:ff", "FF:FF:FF:FF:FF:FF","0800"}},
+ {{"AA:BB:CC:dd:ee:ff", "ff:ff:ff:ee:ee:ee","0800"}, {"aa:bb:cc:dd:ee:ff", "FF:FF:FF:FF:FF:FF","0800"}},
+
+ {{"AA:BB:CC:dd:ee:ff", null,"0800"}, {"aa:bb:cc:dd:ee:ff", null,"0800"}},
+ {{"AA:BB:CC:dd:ee:ff", null,"0800"}, {"aa:bb:cc:dd:ee:ff", null,"0806"}},
+ {{"AA:BB:CC:dd:ee:ff", null,"0800"}, {"aa:bb:cc:dd:ee:ff", "FF:FF:FF:FF:FF:FF","0800"}},
+ {{"AA:BB:CC:dd:ee:ff", null,"0800"}, {null, "FF:FF:FF:FF:FF:FF","0800"}},
+
+ {{"AA:BB:CC:dd:ee:ff", "ff:ff:ff:ff:ff:ff",null}, {null, "FF:FF:FF:FF:FF:FF","0800"}},
+ {{"AA:BB:CC:dd:ee:ff", "ff:ff:ff:ff:ff:ff",null}, {"aa:bb:cc:dd:ee:ff", "FF:FF:FF:FF:FF:FF",null}},
+ {{"AA:BB:CC:dd:ee:ff", "ff:ff:ff:ff:ff:ff",null}, {null, "FF:FF:FF:FF:FF:FF",null}},
+
+ {{null, null,null}, {null, null,"0800"}},
+ {{null, null,null}, {null, null,null}},
+ };
+
+ boolean[] matches = new boolean[] {
+ true,
+ false,
+ true,
+ true,
+ true,
+ false,
+
+ true,
+ false,
+ false,
+ false,
+ false,
+ true,
+ false,
+
+ false,
+ true
+ };
+
+ for (int i = 0; i < matches.length; i++) {
+ checkComparisonOfEthernetMatch(
+ ethernetMatchSeeds[i][0][0], ethernetMatchSeeds[i][0][1],ethernetMatchSeeds[i][0][2],
+ ethernetMatchSeeds[i][1][0], ethernetMatchSeeds[i][1][1],ethernetMatchSeeds[i][1][2],
+ matches[i]);
+ }
+ }
+
+ /*
+ * @param ethernetMatch1
+ * @param ethernetMatch2
+ */
+ private static void checkComparisonOfEthernetMatch(String macAddress1, String macAddressMask1,String etherType1,
+ String macAddress2, String macAddressMask2,String etherType2, boolean expectedResult) {
+ EthernetMatch ethernetMatch1 = prepareEthernetMatch(macAddress1, macAddressMask1,etherType1);
+ EthernetMatch ethernetMatch2 = prepareEthernetMatch(macAddress2, macAddressMask2,etherType2);
+ boolean comparisonResult;
+ try {
+ comparisonResult = FlowComparator.ethernetMatchEquals(ethernetMatch1, ethernetMatch2);
+ Assert.assertEquals("failed to compare: "+ethernetMatch1+" vs. "+ethernetMatch2,
+ expectedResult, comparisonResult);
+ } catch (Exception e) {
+ LOG.error("failed to compare: {} vs. {}", ethernetMatch1, ethernetMatch2, e);
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ private static EthernetMatch prepareEthernetMatch(String macAddress, String macAddressMask, String etherType) {
+ EthernetMatchBuilder ethernetMatchBuilder = new EthernetMatchBuilder();
+ EthernetSourceBuilder ethernetSourceBuilder = new EthernetSourceBuilder();
+ if (macAddress != null) {
+ ethernetSourceBuilder.setAddress(new MacAddress(macAddress));
+ }
+ if (macAddressMask != null) {
+ ethernetSourceBuilder.setMask(new MacAddress(macAddressMask));
+ }
+ if(etherType != null){
+ EthernetTypeBuilder ethernetType = new EthernetTypeBuilder();
+ ethernetType.setType(new EtherType(Long.parseLong(etherType,16)));
+ ethernetMatchBuilder.setEthernetType(ethernetType.build());
+ }
+ ethernetMatchBuilder.setEthernetSource(ethernetSourceBuilder.build());
+
+ return ethernetMatchBuilder.build();
+ }
}
* of some module. Contains default value extracted from yang file.
*/
public class AttributeConfigElement {
- private final Object dafaultValue;
+ private final Object defaultValue;
private final Object value;
private Optional<?> resolvedValue;
private Object resolvedDefaultValue;
private String jmxName;
- public AttributeConfigElement(Object dafaultValue, Object value) {
- this.dafaultValue = dafaultValue;
+ public AttributeConfigElement(Object defaultValue, Object value) {
+ this.defaultValue = defaultValue;
this.value = value;
}
public void resolveValue(AttributeResolvingStrategy<?, ? extends OpenType<?>> attributeResolvingStrategy,
String attrName) throws NetconfDocumentedException {
resolvedValue = attributeResolvingStrategy.parseAttribute(attrName, value);
- Optional<?> resolvedDefault = attributeResolvingStrategy.parseAttribute(attrName, dafaultValue);
+ Optional<?> resolvedDefault = attributeResolvingStrategy.parseAttribute(attrName, defaultValue);
resolvedDefaultValue = resolvedDefault.isPresent() ? resolvedDefault.get() : null;
}
return value;
}
+ public Object getDefaultValue() {
+ return defaultValue;
+ }
+
public Optional<?> getResolvedValue() {
return resolvedValue;
}
@Override
public String toString() {
- return "AttributeConfigElement [dafaultValue=" + dafaultValue + ", value=" + value + "]";
+ return "AttributeConfigElement [defaultValue=" + defaultValue + ", value=" + value + "]";
}
}
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
List<XmlElement> recognisedChildren = Lists.newArrayList();
for (Entry<String, AttributeReadingStrategy> innerAttrEntry : innerStrategies.entrySet()) {
- List<XmlElement> childItem = null;
- childItem = complexElement.getChildElementsWithSameNamespace(innerAttrEntry.getKey());
+ List<XmlElement> childItem = complexElement.getChildElementsWithSameNamespace(
+ innerAttrEntry.getKey());
recognisedChildren.addAll(childItem);
AttributeConfigElement resolvedInner = innerAttrEntry.getValue().readElement(childItem);
- innerMap.put(innerAttrEntry.getKey(), resolvedInner.getValue());
+ Object value = resolvedInner.getValue();
+ if(value == null) {
+ value = resolvedInner.getDefaultValue();
+ }
+
+ innerMap.put(innerAttrEntry.getKey(), value);
}
complexElement.checkUnrecognisedElements(recognisedChildren);
@Override
protected AttributeReadingStrategy caseTOAttribute(CompositeType openType) {
- Preconditions.checkState(getLastAttribute() instanceof TOAttribute);
- Map<String, AttributeIfc> inner = ((TOAttribute)getLastAttribute()).getYangPropertiesToTypesMap();
+ AttributeIfc lastAttribute = getLastAttribute();
+ Preconditions.checkState(lastAttribute instanceof TOAttribute);
+ Map<String, AttributeIfc> inner = ((TOAttribute)lastAttribute).getYangPropertiesToTypesMap();
Map<String, AttributeReadingStrategy> innerStrategies = Maps.newHashMap();
innerStrategies.put(innerAttrEntry.getKey(), innerStrat);
}
- return new CompositeAttributeReadingStrategy(getLastAttribute().getNullableDefault(), innerStrategies);
+ return new CompositeAttributeReadingStrategy(lastAttribute.getNullableDefault(), innerStrategies);
}
@Override
protected AttributeReadingStrategy caseListAttribute(ArrayType<?> openType) {
- Preconditions.checkState(getLastAttribute() instanceof ListAttribute);
- AttributeReadingStrategy innerStrategy = prepareReadingStrategy(key, ((ListAttribute) getLastAttribute()).getInnerAttribute());
- return new ArrayAttributeReadingStrategy(getLastAttribute().getNullableDefault(), innerStrategy);
+ AttributeIfc lastAttribute = getLastAttribute();
+ Preconditions.checkState(lastAttribute instanceof ListAttribute);
+ AttributeReadingStrategy innerStrategy = prepareReadingStrategy(key, ((ListAttribute) lastAttribute).getInnerAttribute());
+ return new ArrayAttributeReadingStrategy(lastAttribute.getNullableDefault(), innerStrategy);
}
@Override
protected AttributeReadingStrategy caseListDependeciesAttribute(ArrayType<?> openType) {
- Preconditions.checkState(getLastAttribute() instanceof ListDependenciesAttribute);
+ AttributeIfc lastAttribute = getLastAttribute();
+ Preconditions.checkState(lastAttribute instanceof ListDependenciesAttribute);
AttributeReadingStrategy innerStrategy = caseDependencyAttribute(SimpleType.OBJECTNAME);
- return new ArrayAttributeReadingStrategy(getLastAttribute().getNullableDefault(), innerStrategy);
+ return new ArrayAttributeReadingStrategy(lastAttribute.getNullableDefault(), innerStrategy);
}
}
@Override
public void initialize(final Channel ch, final Promise<NetconfClientSession> promise) {
try {
+ // ssh handler has to be the first handler in pipeline
ch.pipeline().addFirst(AsyncSshHandler.createForNetconfSubsystem(authenticationHandler));
super.initialize(ch,promise);
} catch (final IOException e) {
connectPromise.setSuccess();
connectPromise = null;
- sshReadAsyncListener = new SshReadAsyncListener(ctx, channel.getAsyncOut());
+ sshReadAsyncListener = new SshReadAsyncListener(this, ctx, channel.getAsyncOut());
sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn());
ctx.fireChannelActive();
sshWriteAsyncHandler.write(ctx, msg, promise);
}
- private static void handleSshSessionClosed(final ChannelHandlerContext ctx) {
- logger.debug("SSH session closed on channel: {}", ctx.channel());
- ctx.fireChannelInactive();
- }
-
@Override
public synchronized void connect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) throws Exception {
this.connectPromise = promise;
channel = null;
promise.setSuccess();
- handleSshSessionClosed(ctx);
+ logger.debug("SSH session closed on channel: {}", ctx.channel());
+ ctx.fireChannelInactive();
}
/**
private static class SshReadAsyncListener implements SshFutureListener<IoReadFuture>, AutoCloseable {
private static final int BUFFER_SIZE = 8192;
+ private final ChannelOutboundHandler asyncSshHandler;
private final ChannelHandlerContext ctx;
private IoInputStream asyncOut;
private Buffer buf;
private IoReadFuture currentReadFuture;
- public SshReadAsyncListener(final ChannelHandlerContext ctx, final IoInputStream asyncOut) {
+ public SshReadAsyncListener(final ChannelOutboundHandler asyncSshHandler, final ChannelHandlerContext ctx, final IoInputStream asyncOut) {
+ this.asyncSshHandler = asyncSshHandler;
this.ctx = ctx;
this.asyncOut = asyncOut;
buf = new Buffer(BUFFER_SIZE);
if(future.getException() != null) {
if(asyncOut.isClosed() || asyncOut.isClosing()) {
- // We are closing
- handleSshSessionClosed(ctx);
+
+ // Ssh dropped
+ logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException());
+ invokeDisconnect();
+ return;
} else {
logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException());
- throw new IllegalStateException("Exception while reading from SSH remote on channel " + ctx.channel(), future.getException());
+ invokeDisconnect();
}
}
}
}
+ private void invokeDisconnect() {
+ try {
+ asyncSshHandler.disconnect(ctx, ctx.newPromise());
+ } catch (final Exception e) {
+ // This should not happen
+ throw new IllegalStateException(e);
+ }
+ }
+
@Override
public synchronized void close() {
// Remove self as listener on close to prevent reading from closed input
this.asyncIn = asyncIn;
}
+ int c = 0;
+
public synchronized void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
try {
- if(asyncIn.isClosed() || asyncIn.isClosing()) {
- handleSshSessionClosed(ctx);
+ if(asyncIn == null || asyncIn.isClosed() || asyncIn.isClosing()) {
+ // If we are closed/closing, set immediate fail
+ promise.setFailure(new IllegalStateException("Channel closed"));
} else {
lastWriteFuture = asyncIn.write(toBuffer(msg));
lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
// Notify success or failure
if (future.isWritten()) {
promise.setSuccess();
+ } else {
+ promise.setFailure(future.getException());
}
- promise.setFailure(future.getException());
// Reset last pending future
synchronized (SshWriteAsyncHandler.this) {
lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
@Override
public void operationComplete(final IoWriteFuture future) {
- if(future.isWritten()) {
+ if (future.isWritten()) {
synchronized (SshWriteAsyncHandler.this) {
// Pending done, decrease counter
pendingWriteCounter--;
new Thread(new EchoServer(), "EchoServer").start();
AuthProvider authProvider = mock(AuthProvider.class);
doReturn(true).when(authProvider).authenticated(anyString(), anyString());
+ doReturn("auth").when(authProvider).toString();
+
NetconfSSHServer netconfSSHServer = NetconfSSHServer.start(10831, NetconfConfigUtil.getNetconfLocalAddress(),
new NioEventLoopGroup(), PEMGenerator.generate().toCharArray());
netconfSSHServer.setAuthProvider(authProvider);