<artifactId>guava</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>mockito-configuration</artifactId>
+ </dependency>
</dependencies>
<build>
--- /dev/null
+package org.opendaylight.controller.config.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AttributeEntryTest {
+
+ private AttributeEntry attributeEntryClient;
+ private final String key = "myKey";
+ private final String description = "myDescription";
+ private final String type = "myType";
+ private final boolean boolValue = false;
+
+ @Before
+ public void setUp() throws Exception {
+ attributeEntryClient = new AttributeEntry("myKey", "myDescription", null, "myType", false);
+ }
+
+ @Test
+ public void testAttributeEntryGetters() throws Exception{
+ assertEquals(key, attributeEntryClient.getKey());
+ assertEquals(description, attributeEntryClient.getDescription());
+ final Object value = attributeEntryClient.getValue();
+ assertNull(value);
+ assertEquals(type, attributeEntryClient.getType());
+ assertEquals(boolValue, attributeEntryClient.isRw());
+ }
+}
*/
package org.opendaylight.controller.config.util;
-import com.google.common.collect.Sets;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.opendaylight.controller.config.api.ConfigRegistry;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.matchers.JUnitMatchers.hasItem;
+
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import java.lang.management.ManagementFactory;
-import java.util.Set;
-import static org.junit.Assert.assertEquals;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.config.api.ConfigRegistry;
+
+import com.google.common.collect.Sets;
public class ConfigRegistryClientsTest {
private ObjectName testingRegistryON;
private final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
private ConfigRegistryClient jmxRegistryClient;
+ private ConfigTransactionClient jmxTransactionClient;
+ private Map<String, ObjectName> map;
@Before
public void setUp() throws Exception {
mbs.registerMBean(testingRegistry, testingRegistryON);
jmxRegistryClient = new ConfigRegistryJMXClient(
ManagementFactory.getPlatformMBeanServer());
+ map = new HashMap<>();
}
@After
}
}
+ @Test
+ public void testCreateTransaction() throws Exception{
+ jmxTransactionClient = jmxRegistryClient.createTransaction();
+ assertNotNull(jmxTransactionClient);
+ }
+
+ @Test
+ public void testGetConfigTransactionClient2() throws Exception{
+ jmxTransactionClient = jmxRegistryClient.getConfigTransactionClient("transactionName");
+ assertNotNull(jmxTransactionClient);
+ }
+
+ @Test
+ public void testGetConfigTransactionClient() throws Exception{
+ jmxTransactionClient = jmxRegistryClient.getConfigTransactionClient(testingRegistryON);
+ assertNotNull(jmxTransactionClient);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testNewMXBeanProxy() throws Exception{
+ if (jmxRegistryClient instanceof ConfigRegistryJMXClient) {
+ ConfigRegistryJMXClient client = (ConfigRegistryJMXClient) jmxRegistryClient;
+ assertNull(client.newMXBeanProxy(testingRegistryON, String.class));
+ } else {
+ throw new AssertionError("brm msg");
+ }
+ }
+
+ @Test
+ public void testBeginConfig() throws Exception{
+ Assert.assertNotNull(jmxRegistryClient.beginConfig());
+ }
+
+ @Test
+ public void testCommitConfig() throws Exception{
+ assertNull(jmxRegistryClient.commitConfig(testingRegistryON));
+ }
+
+ @Test
+ public void testGetOpenConfigs() throws Exception{
+ assertNull(jmxRegistryClient.getOpenConfigs());
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testGetVersion() throws Exception{
+ assertEquals(3, jmxRegistryClient.getVersion());
+ }
+
+ @Test
+ public void testGetAvailableModuleNames() throws Exception{
+ assertNull(jmxRegistryClient.getAvailableModuleNames());
+ }
+
+ @Test
+ public void testIsHealthy() throws Exception{
+ assertEquals(false, jmxRegistryClient.isHealthy());
+ }
+
+ @Test
+ public void testLookupConfigBeans3() throws Exception{
+ Set<ObjectName> son = jmxRegistryClient.lookupConfigBeans();
+ assertEquals(3, son.size());
+ }
+
+ @Test
+ public void testLookupConfigBeans2() throws Exception{
+ Set<ObjectName> son = jmxRegistryClient.lookupConfigBeans(TestingConfigRegistry.moduleName1);
+ assertEquals(2, son.size());
+ }
+
+ @Test
+ public void testLookupConfigBeans() throws Exception{
+ Set<ObjectName> son = jmxRegistryClient.lookupConfigBeans(TestingConfigRegistry.moduleName1, TestingConfigRegistry.instName1);
+ Set<ObjectName> on = Sets.newHashSet(TestingConfigRegistry.conf2);
+ assertEquals(on, son);
+ }
+
+ @Test
+ public void testLookupConfigBean() throws Exception{
+ ObjectName on = jmxRegistryClient.lookupConfigBean(TestingConfigRegistry.moduleName1, null);
+ assertEquals(TestingConfigRegistry.conf3, on);
+ }
+
@Test
public void testLookupRuntimeBeans() throws Exception {
Set<ObjectName> jmxLookup = lookupRuntimeBeans(jmxRegistryClient);
}
return beans;
}
+
+ @Test
+ public void testCheckConfigBeanExists() throws Exception{
+ jmxRegistryClient.checkConfigBeanExists(testingRegistryON);
+ assertEquals(true, TestingConfigRegistry.checkBool);
+ }
+
+ @Test
+ public void testLookupConfigBeanByServiceInterfaceName() throws Exception{
+ ObjectName on = clientLookupConfigBeanByServiceInterfaceName();
+ assertEquals(TestingConfigRegistry.conf1, on);
+ }
+
+ private ObjectName clientLookupConfigBeanByServiceInterfaceName(){
+ return jmxRegistryClient.lookupConfigBeanByServiceInterfaceName("qnameA", "refA");
+ }
+
+ @Test
+ public void testGetServiceMapping() throws Exception{
+ assertNull(jmxRegistryClient.getServiceMapping());
+ }
+
+ @Test
+ public void testLookupServiceReferencesByServiceInterfaceName() throws Exception{
+ map.put("conf2", TestingConfigRegistry.conf2);
+ assertEquals(map, jmxRegistryClient.lookupServiceReferencesByServiceInterfaceName("qnameB"));
+ }
+
+ @Test
+ public void testLookupServiceInterfaceNames() throws Exception{
+ assertThat(clientLookupServiceInterfaceNames(testingRegistryON), hasItem(TestingConfigRegistry.serviceQName1));
+ assertThat(clientLookupServiceInterfaceNames(testingRegistryON), hasItem(TestingConfigRegistry.serviceQName2));
+ }
+
+ private Set<String> clientLookupServiceInterfaceNames(ObjectName client) throws InstanceNotFoundException{
+ return jmxRegistryClient.lookupServiceInterfaceNames(client);
+ }
+
+ @Test
+ public void testGetServiceInterfaceName() throws Exception{
+ assertNull(jmxRegistryClient.getServiceInterfaceName(null, null));
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testInvokeMethod() throws Exception{
+ assertNull(jmxRegistryClient.invokeMethod(testingRegistryON, "name", null, null));
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testGetAttributeCurrentValue() throws Exception{
+ assertNull(jmxRegistryClient.getAttributeCurrentValue(testingRegistryON, "attrName"));
+ }
+
+ @Test
+ public void testGetAvailableModuleFactoryQNames() throws Exception{
+ for(String str : jmxRegistryClient.getAvailableModuleFactoryQNames()){
+ if(!str.equals(TestingConfigRegistry.moduleName1)){
+ assertEquals(TestingConfigRegistry.moduleName2, str);
+ }
+ else{
+ assertEquals(TestingConfigRegistry.moduleName1, str);
+ }
+ }
+ }
+
+ @Test
+ public void testGetServiceReference() throws Exception{
+ Assert.assertNotNull(jmxRegistryClient.getServiceReference(null, null));
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testcheckServiceReferenceExists() throws Exception{
+ jmxRegistryClient.checkServiceReferenceExists(testingRegistryON);
+ }
}
*/
package org.opendaylight.controller.config.util;
-import com.google.common.collect.Sets;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+import java.lang.management.ManagementFactory;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import javax.management.Attribute;
+import javax.management.MBeanException;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
+import org.opendaylight.controller.config.api.ValidationException;
+import org.opendaylight.controller.config.api.ValidationException.ExceptionMessageWithStackTrace;
import org.opendaylight.controller.config.api.jmx.ObjectNameUtil;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import java.lang.management.ManagementFactory;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
+import com.google.common.collect.Sets;
public class ConfigTransactionClientsTest {
private final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
private TestingConfigTransactionController transactionController;
private ObjectName transactionControllerON;
private ConfigTransactionClient jmxTransactionClient;
+ Attribute attr;
+
@Before
public void setUp() throws Exception {
transactionControllerON = new ObjectName(ObjectNameUtil.ON_DOMAIN + ":"
+ ObjectNameUtil.TYPE_KEY + "=TransactionController");
mbs.registerMBean(transactionController, transactionControllerON);
- jmxTransactionClient = new ConfigTransactionJMXClient(null, transactionControllerON,
+ jmxTransactionClient = new ConfigTransactionJMXClient(null,
+ transactionControllerON,
ManagementFactory.getPlatformMBeanServer());
}
public void testLookupConfigBeans() throws Exception {
Set<ObjectName> jmxLookup = testClientLookupConfigBeans(jmxTransactionClient);
assertEquals(Sets.newHashSet(transactionController.conf1,
- transactionController.conf2, transactionController.conf3), jmxLookup);
+ transactionController.conf2, transactionController.conf3),
+ jmxLookup);
}
private Set<ObjectName> testClientLookupConfigBeans(
assertEquals(3, beans.size());
return beans;
}
+
+ @Test
+ public void testGetObjectName() throws Exception {
+ testClientGetObjectName(jmxTransactionClient);
+ assertEquals(true, testClientGetObjectName(jmxTransactionClient));
+ }
+
+ private boolean testClientGetObjectName(ConfigTransactionClient client) {
+ return transactionControllerON.equals(client.getObjectName());
+ }
+
+ @Test
+ public void testGetAvailableModuleNames() throws Exception {
+ Set<String> jmxMN = testClientGetAvailableModuleNames(jmxTransactionClient);
+ assertNull(jmxMN);
+ }
+
+ private Set<String> testClientGetAvailableModuleNames(
+ ConfigTransactionClient client) {
+ return client.getAvailableModuleNames();
+ }
+
+ @Test
+ public void testGetTransactionName() throws Exception {
+ String jmxTN = testClientGetTransactionName(jmxTransactionClient);
+ assertEquals("transactionName", jmxTN);
+ }
+
+ private String testClientGetTransactionName(ConfigTransactionClient client) {
+ return client.getTransactionName();
+ }
+
+ @Ignore
+ public void testGetVersion() throws Exception {
+ long jmxVersion = jmxTransactionClient.getVersion();
+ assertNull(jmxVersion);
+ }
+
+ @Ignore
+ public void testGetParentVersion() throws Exception {
+ long jmxParentVersion = jmxTransactionClient.getParentVersion();
+ assertNull(jmxParentVersion);
+ }
+
+ @Test
+ public void testValidateConfig() throws Exception {
+ jmxTransactionClient.validateConfig();
+ }
+
+ @Test
+ public void testAbortConfig() throws Exception {
+ jmxTransactionClient.abortConfig();
+ }
+
+ @Test
+ public void testDestroyModule2() throws Exception {
+ jmxTransactionClient.destroyModule("moduleB", "instB");
+ assertNull(transactionController.conf4);
+ }
+
+ @Test
+ public void testDestroyModule() throws Exception {
+ ObjectName on = testClientCreateModule(jmxTransactionClient);
+ jmxTransactionClient.destroyModule(on);
+ }
+
+ @Test
+ public void testCreateModule() throws Exception {
+ ObjectName on = testClientCreateModule(jmxTransactionClient);
+ Assert.assertNotNull(on);
+ }
+
+ private ObjectName testClientCreateModule(ConfigTransactionClient client)
+ throws Exception {
+ return client.createModule("testModuleName", "testInstanceName");
+ }
+
+ @Ignore
+ public void testAssertVersion() {
+ jmxTransactionClient.assertVersion((int)jmxTransactionClient.getParentVersion(),
+ (int)jmxTransactionClient.getVersion());
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testCommit() throws Exception {
+ jmxTransactionClient.commit();
+ }
+
+ @Test
+ public void testLookupConfigBeans2() throws Exception {
+ Set<ObjectName> jmxLookup = testClientLookupConfigBeans2(
+ jmxTransactionClient, "moduleB");
+ assertEquals(Sets.newHashSet(transactionController.conf3), jmxLookup);
+ }
+
+ private Set<ObjectName> testClientLookupConfigBeans2(
+ ConfigTransactionClient client, String moduleName) {
+ Set<ObjectName> beans = client.lookupConfigBeans(moduleName);
+ assertEquals(1, beans.size());
+ return beans;
+ }
+
+ @Test
+ public void testLookupConfigBean() throws Exception {
+ Set<ObjectName> jmxLookup = testClientLookupConfigBean(
+ jmxTransactionClient, "moduleB", "instB");
+ assertEquals(Sets.newHashSet(transactionController.conf3), jmxLookup);
+ }
+
+ private Set<ObjectName> testClientLookupConfigBean(
+ ConfigTransactionClient client, String moduleName,
+ String instanceName) {
+ Set<ObjectName> beans = client.lookupConfigBeans(moduleName,
+ instanceName);
+ assertEquals(1, beans.size());
+ return beans;
+ }
+
+ @Test
+ public void testLookupConfigBeans3() throws Exception {
+ Set<ObjectName> jmxLookup = testClientLookupConfigBeans3(
+ jmxTransactionClient, "moduleB", "instB");
+ assertEquals(Sets.newHashSet(transactionController.conf3), jmxLookup);
+ }
+
+ private Set<ObjectName> testClientLookupConfigBeans3(
+ ConfigTransactionClient client, String moduleName,
+ String instanceName) {
+ Set<ObjectName> beans = client.lookupConfigBeans(moduleName,
+ instanceName);
+ assertEquals(1, beans.size());
+ return beans;
+ }
+
+ @Test
+ public void testCheckConfigBeanExists() throws Exception {
+ jmxTransactionClient.checkConfigBeanExists(transactionControllerON);
+ assertEquals("configBeanExists", transactionController.check);
+ }
+
+ @Test
+ public void testSaveServiceReference() throws Exception {
+ assertEquals(transactionControllerON, jmxTransactionClient.saveServiceReference("serviceInterfaceName", "refName", transactionControllerON));
+ }
+
+ @Test
+ public void testRemoveServiceReference() throws Exception {
+ jmxTransactionClient.removeServiceReference("serviceInterface", "refName");
+ assertEquals("refName", transactionController.check);
+ }
+
+ @Test
+ public void testRemoveAllServiceReferences() throws Exception {
+ jmxTransactionClient.removeAllServiceReferences();
+ assertNull(transactionController.check);
+ }
+
+ @Test
+ public void testLookupConfigBeanByServiceInterfaceName() throws Exception {
+ assertEquals(transactionController.conf3, jmxTransactionClient.lookupConfigBeanByServiceInterfaceName("serviceInterface", "refName"));
+ }
+
+ @Test
+ public void testGetServiceMapping() throws Exception {
+ Assert.assertNotNull(jmxTransactionClient.getServiceMapping());
+ }
+
+ @Test
+ public void testLookupServiceReferencesByServiceInterfaceName() throws Exception {
+ Assert.assertNotNull(jmxTransactionClient.lookupServiceReferencesByServiceInterfaceName("serviceInterfaceQName"));
+ }
+
+ @Test
+ public void testLookupServiceInterfaceNames() throws Exception {
+ assertEquals(Sets.newHashSet("setA"), jmxTransactionClient.lookupServiceInterfaceNames(transactionControllerON));
+ }
+
+ @Test
+ public void testGetServiceInterfaceName() throws Exception {
+ assertEquals("namespace" + "localName", jmxTransactionClient.getServiceInterfaceName("namespace", "localName"));
+ }
+
+ @Test
+ public void removeServiceReferences() throws Exception {
+ assertEquals(true, jmxTransactionClient.removeServiceReferences(transactionControllerON));
+ }
+
+ @Test
+ public void testGetServiceReference() throws Exception {
+ assertEquals(transactionController.conf3, jmxTransactionClient.getServiceReference("serviceInterfaceQName", "refName"));
+ }
+
+ @Test
+ public void testCheckServiceReferenceExists() throws Exception {
+ jmxTransactionClient.checkServiceReferenceExists(transactionControllerON);
+ assertEquals("referenceExist", transactionController.check);
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testValidateBean() throws Exception {
+ jmxTransactionClient.validateBean(transactionControllerON);
+ }
+
+ @Test(expected = ValidationException.class)
+ public void testValidateBean2() throws Exception {
+ MBeanServer mbsLocal = mock(MBeanServer.class);
+ MBeanException mBeanException = new MBeanException(new ValidationException(
+ Collections.<String, Map<String, ExceptionMessageWithStackTrace>>emptyMap()));
+ doThrow(mBeanException).when(mbsLocal).invoke(transactionControllerON, "validate", null, null);
+
+ ConfigTransactionJMXClient jmxTransactionClientFake = new ConfigTransactionJMXClient(null,
+ transactionControllerON,
+ mbsLocal);
+ jmxTransactionClientFake.validateBean(transactionControllerON);
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testValidateBean3() throws Exception {
+ MBeanServer mbsLocal = mock(MBeanServer.class);
+ MBeanException mBeanException = new MBeanException(new RuntimeException());
+ doThrow(mBeanException).when(mbsLocal).invoke(transactionControllerON, "validate", null, null);
+ ConfigTransactionJMXClient jmxTransactionClientFake = new ConfigTransactionJMXClient(null,
+ transactionControllerON,
+ mbsLocal);
+ jmxTransactionClientFake.validateBean(transactionControllerON);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testSetAttribute() throws Exception {
+ attr = null;
+ jmxTransactionClient.setAttribute(transactionControllerON, "attrName", attr);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testGetAttribute() throws Exception {
+ attr = jmxTransactionClient.getAttribute(transactionController.conf3, "attrName");
+ assertNull(attr);
+ }
+
+ @Test
+ public void testGetAvailableModuleFactoryQNames() throws Exception {
+ Assert.assertNotNull(jmxTransactionClient.getAvailableModuleFactoryQNames());
+ }
}
*/
package org.opendaylight.controller.config.util;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class TestingConfigRegistry implements ConfigRegistryMXBean {
static final ObjectName conf1, conf2, conf3, run1, run2, run3;
+ public static String check;
+ public static boolean checkBool;
+ private Map<String, ObjectName> map = new HashMap<>();
public static final String moduleName1 = "moduleA";
public static final String moduleName2 = "moduleB";
public static final String instName1 = "instA";
public static final String instName2 = "instB";
+ public static final String refName1 = "refA";
+ public static final String refName2 = "refB";
+ public static final String serviceQName1 = "qnameA";
+ public static final String serviceQName2 = "qnameB";
static {
conf1 = ObjectNameUtil.createON(ObjectNameUtil.ON_DOMAIN
+ ":type=Module," + ObjectNameUtil.MODULE_FACTORY_NAME_KEY
- + "=" + moduleName1);
+ + "=" + moduleName1 + "," + ObjectNameUtil.SERVICE_QNAME_KEY
+ + "=" + serviceQName1 + "," + ObjectNameUtil.REF_NAME_KEY
+ + "=" + refName1);
conf2 = ObjectNameUtil.createON(ObjectNameUtil.ON_DOMAIN
+ ":type=Module," + ObjectNameUtil.MODULE_FACTORY_NAME_KEY
+ "=" + moduleName1 + "," + ObjectNameUtil.INSTANCE_NAME_KEY
- + "=" + instName1);
+ + "=" + instName1 + "," + ObjectNameUtil.SERVICE_QNAME_KEY
+ + "=" + serviceQName2 + "," + ObjectNameUtil.REF_NAME_KEY
+ + "=" + refName1);
conf3 = ObjectNameUtil.createON(ObjectNameUtil.ON_DOMAIN
+ ":type=Module," + ObjectNameUtil.MODULE_FACTORY_NAME_KEY
+ "=" + moduleName2 + "," + ObjectNameUtil.INSTANCE_NAME_KEY
+ ":type=RuntimeBean," + ObjectNameUtil.MODULE_FACTORY_NAME_KEY
+ "=" + moduleName2 + "," + ObjectNameUtil.INSTANCE_NAME_KEY
+ "=" + instName2);
+
+ check = null;
+ checkBool = false;
+
}
@Override
public ObjectName beginConfig() {
- return null;
+ return conf2;
}
@Override
@Override
public void checkConfigBeanExists(ObjectName objectName) throws InstanceNotFoundException {
- throw new UnsupportedOperationException();
+ Set<ObjectName> configBeans = Sets.<ObjectName> newHashSet(run1, run2, run3);
+ if(configBeans.size()>0){
+ checkBool = true;
+ }
}
@Override
public ObjectName lookupConfigBeanByServiceInterfaceName(String serviceInterfaceQName, String refName) {
- throw new UnsupportedOperationException();
+ if (serviceInterfaceQName.equals(serviceQName1) && refName.equals(refName1)) {
+ return conf1;
+ }
+ else{
+ return null;
+ }
}
@Override
public Map<String, Map<String, ObjectName>> getServiceMapping() {
- throw new UnsupportedOperationException();
+ return null;
}
@Override
public Map<String, ObjectName> lookupServiceReferencesByServiceInterfaceName(String serviceInterfaceQName) {
- throw new UnsupportedOperationException();
+
+ if(serviceInterfaceQName.equals(serviceQName1)){
+ map.put("conf1", conf1);
+ }
+ else if(serviceInterfaceQName.equals(serviceQName2)){
+ map.put("conf2", conf2);
+ }
+ else{
+ map.put("conf3", conf3);
+ }
+ return map;
}
@Override
public Set<String> lookupServiceInterfaceNames(ObjectName objectName) throws InstanceNotFoundException {
- throw new UnsupportedOperationException();
+ return Sets.<String> newHashSet(serviceQName1, serviceQName2);
}
@Override
public String getServiceInterfaceName(String namespace, String localName) {
- throw new UnsupportedOperationException();
+ return null;
}
@Override
public Set<String> getAvailableModuleFactoryQNames() {
- throw new UnsupportedOperationException();
+ return Sets.<String> newHashSet(moduleName1, moduleName2);
}
@Override
public ObjectName getServiceReference(String serviceInterfaceQName, String refName) throws InstanceNotFoundException {
- throw new UnsupportedOperationException();
+ return conf1;
}
@Override
*/
package org.opendaylight.controller.config.util;
+import java.util.HashMap;
import java.util.Map;
import java.util.Set;
ConfigTransactionControllerMXBean {
public final ObjectName conf1, conf2, conf3;
+ public ObjectName conf4;
+ public String check;
+ Map<String, ObjectName> mapSub;
+ Map<String, Map<String, ObjectName>> map;
public static final String moduleName1 = "moduleA";
public static final String moduleName2 = "moduleB";
+ ":type=Module," + ObjectNameUtil.MODULE_FACTORY_NAME_KEY
+ "=" + moduleName2 + "," + ObjectNameUtil.INSTANCE_NAME_KEY
+ "=" + instName2);
+ conf4 = ObjectNameUtil.createON(ObjectNameUtil.ON_DOMAIN
+ + ":type=Module," + ObjectNameUtil.MODULE_FACTORY_NAME_KEY
+ + "=" + moduleName2 + "," + ObjectNameUtil.INSTANCE_NAME_KEY
+ + "=" + instName2);
+ mapSub = new HashMap<String, ObjectName>();
+ map = new HashMap<String, Map<String,ObjectName>>();
}
@Override
public ObjectName createModule(String moduleName, String instanceName)
throws InstanceAlreadyExistsException {
- return null;
+ //return null;
+ return ObjectNameUtil.createON(ObjectNameUtil.ON_DOMAIN
+ + ":type=Module," + ObjectNameUtil.MODULE_FACTORY_NAME_KEY
+ + "=" + moduleName);
}
@Override
public void destroyModule(ObjectName objectName)
throws InstanceNotFoundException {
+ if(objectName != null){
+ conf4 = null;
+ }
}
@Override
@Override
public String getTransactionName() {
- return null;
+ //return null;
+ return "transactionName";
}
@Override
@Override
public void checkConfigBeanExists(ObjectName objectName) throws InstanceNotFoundException {
- throw new UnsupportedOperationException();
+ check = "configBeanExists";
}
@Override
public ObjectName saveServiceReference(String serviceInterfaceName, String refName, ObjectName moduleON) throws InstanceNotFoundException {
- throw new UnsupportedOperationException();
+ return moduleON;
}
@Override
public void removeServiceReference(String serviceInterfaceName, String refName) {
- throw new UnsupportedOperationException();
+ check = refName;
}
@Override
public void removeAllServiceReferences() {
- throw new UnsupportedOperationException();
+ check = null;
}
@Override
public ObjectName lookupConfigBeanByServiceInterfaceName(String serviceInterfaceQName, String refName) {
- throw new UnsupportedOperationException();
+ return conf3;
}
@Override
public Map<String, Map<String, ObjectName>> getServiceMapping() {
- throw new UnsupportedOperationException();
+ mapSub.put("A",conf2);
+ map.put("AA", mapSub);
+ return map;
}
@Override
public Map<String, ObjectName> lookupServiceReferencesByServiceInterfaceName(String serviceInterfaceQName) {
- throw new UnsupportedOperationException();
+ mapSub.put("A",conf2);
+ return mapSub;
}
@Override
public Set<String> lookupServiceInterfaceNames(ObjectName objectName) throws InstanceNotFoundException {
- throw new UnsupportedOperationException();
+ return Sets.newHashSet("setA");
}
@Override
public String getServiceInterfaceName(String namespace, String localName) {
- throw new UnsupportedOperationException();
+ check = namespace + localName; return check;
}
@Override
public boolean removeServiceReferences(ObjectName objectName) throws InstanceNotFoundException {
- throw new UnsupportedOperationException();
+ return true;
}
@Override
public Set<String> getAvailableModuleFactoryQNames() {
- throw new UnsupportedOperationException();
+ return Sets.newHashSet("availableModuleFactoryQNames");
}
@Override
public ObjectName getServiceReference(String serviceInterfaceQName, String refName) throws InstanceNotFoundException {
- throw new UnsupportedOperationException();
+ return conf3;
}
@Override
public void checkServiceReferenceExists(ObjectName objectName) throws InstanceNotFoundException {
- throw new UnsupportedOperationException();
+ check = "referenceExist";
}
}
enableStrongPasswordCheck = false
#Jolokia configurations
-org.jolokia.listenForHttpService=false
+#org.jolokia.listenForHttpService=false
# Logging configuration for Tomcat-JUL logging
java.util.logging.config.file=configuration/tomcat-logging.properties
applySnapshot(ByteString.copyFrom(snapshot.getState()));
} else if (message instanceof ReplicatedLogEntry) {
- replicatedLog.append((ReplicatedLogEntry) message);
+ ReplicatedLogEntry logEntry = (ReplicatedLogEntry) message;
+
+ // Apply State immediately
+ replicatedLog.append(logEntry);
+ applyState(null, "recovery", logEntry.getData());
+ context.setLastApplied(logEntry.getIndex());
+ context.setCommitIndex(logEntry.getIndex());
} else if (message instanceof DeleteEntries) {
replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
} else if (message instanceof UpdateElectionTerm) {
} else if (message instanceof RecoveryCompleted) {
LOG.debug(
"RecoveryCompleted - Switching actor to Follower - " +
- "Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
+ "Persistence Id = " + persistenceId() +
+ " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
"journal-size={}",
replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
replicatedLog.snapshotTerm, replicatedLog.size());
</dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-osgi_${scala.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-actor_${scala.version}</artifactId>
+ </dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
--- /dev/null
+package org.opendaylight.controller.cluster.common.actor;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public abstract class AbstractConfig implements UnifiedConfig {
+
+ private Config config;
+
+ public AbstractConfig(Config config){
+ this.config = config;
+ }
+
+ @Override
+ public Config get() {
+ return config;
+ }
+
+ public static abstract class Builder<T extends Builder>{
+ protected Map<String, Object> configHolder;
+ protected Config fallback;
+
+ private final String actorSystemName;
+
+ public Builder(String actorSystemName){
+ Preconditions.checkArgument(actorSystemName != null, "Actor system name must not be null");
+ this.actorSystemName = actorSystemName;
+ configHolder = new HashMap<>();
+ }
+
+ public T withConfigReader(AkkaConfigurationReader reader){
+ fallback = reader.read().getConfig(actorSystemName);
+ return (T)this;
+ }
+
+ protected Config merge(){
+ if (fallback == null)
+ fallback = ConfigFactory.load().getConfig(actorSystemName);
+
+ return ConfigFactory.parseMap(configHolder).withFallback(fallback);
+ }
+ }
+}
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.cluster.datastore;
+package org.opendaylight.controller.cluster.common.actor;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
-import org.opendaylight.controller.cluster.datastore.messages.Monitor;
public abstract class AbstractUntypedActor extends UntypedActor {
protected final LoggingAdapter LOG =
Logging.getLogger(getContext().system(), this);
-
public AbstractUntypedActor() {
LOG.debug("Actor created {}", getSelf());
getContext().
system().
actorSelection("user/termination-monitor").
tell(new Monitor(getSelf()), getSelf());
+
}
@Override public void onReceive(Object message) throws Exception {
- LOG.debug("Received message {}", message.getClass().getSimpleName());
+ final String messageType = message.getClass().getSimpleName();
+ LOG.debug("Received message {}", messageType);
+
handleReceive(message);
- LOG.debug("Done handling message {}",
- message.getClass().getSimpleName());
+
+ LOG.debug("Done handling message {}", messageType);
}
protected abstract void handleReceive(Object message) throws Exception;
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.common.actor;
+
+/**
+ * Actor with its behaviour metered. Metering is enabled by configuration.
+ */
+public abstract class AbstractUntypedActorWithMetering extends AbstractUntypedActor {
+
+ public AbstractUntypedActorWithMetering() {
+ if (isMetricsCaptureEnabled())
+ getContext().become(new MeteringBehavior(this));
+ }
+
+ private boolean isMetricsCaptureEnabled(){
+ CommonConfig config = new CommonConfig(getContext().system().settings().config());
+ return config.isMetricCaptureEnabled();
+ }
+}
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.remote.rpc.utils;
+package org.opendaylight.controller.cluster.common.actor;
import com.typesafe.config.Config;
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.common.actor;
+
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class CommonConfig extends AbstractConfig {
+
+ protected static final String TAG_ACTOR_SYSTEM_NAME = "actor-system-name";
+ protected static final String TAG_METRIC_CAPTURE_ENABLED = "metric-capture-enabled";
+ protected static final String TAG_MAILBOX_CAPACITY = "mailbox-capacity";
+ protected static final String TAG_MAILBOX = "bounded-mailbox";
+ protected static final String TAG_MAILBOX_PUSH_TIMEOUT = "mailbox-push-timeout-time";
+
+ //TODO: Ideally these defaults should go to reference.conf
+ // https://bugs.opendaylight.org/show_bug.cgi?id=1709
+ private static final int DEFAULT_MAILBOX_CAPACITY = 1000;
+ private static final int DEFAULT_MAILBOX_PUSH_TIMEOUT = 100;
+
+ //locally cached values
+ private FiniteDuration cachedMailBoxPushTimeout;
+ private Integer cachedMailBoxCapacity;
+ private Boolean cachedMetricCaptureEnableFlag;
+
+ public CommonConfig(Config config) {
+ super(config);
+ }
+
+ public String getActorSystemName() {
+ return get().getString(TAG_ACTOR_SYSTEM_NAME);
+ }
+
+ public boolean isMetricCaptureEnabled(){
+ if (cachedMetricCaptureEnableFlag != null){
+ return cachedMetricCaptureEnableFlag;
+ }
+
+ cachedMetricCaptureEnableFlag = get().hasPath(TAG_METRIC_CAPTURE_ENABLED)
+ ? get().getBoolean(TAG_METRIC_CAPTURE_ENABLED)
+ : false;
+
+ return cachedMetricCaptureEnableFlag;
+ }
+
+ public String getMailBoxName() {
+ return TAG_MAILBOX;
+ }
+
+ public Integer getMailBoxCapacity() {
+
+ if (cachedMailBoxCapacity != null) {
+ return cachedMailBoxCapacity;
+ }
+
+ final String PATH = new StringBuilder(TAG_MAILBOX).append(".").append(TAG_MAILBOX_CAPACITY).toString();
+ cachedMailBoxCapacity = get().hasPath(PATH)
+ ? get().getInt(PATH)
+ : DEFAULT_MAILBOX_CAPACITY;
+
+ return cachedMailBoxCapacity;
+ }
+
+ public FiniteDuration getMailBoxPushTimeout() {
+
+ if (cachedMailBoxPushTimeout != null) {
+ return cachedMailBoxPushTimeout;
+ }
+
+ final String PATH = new StringBuilder(TAG_MAILBOX).append(".").append(TAG_MAILBOX_PUSH_TIMEOUT).toString();
+
+ long timeout = get().hasPath(PATH)
+ ? get().getDuration(PATH, TimeUnit.NANOSECONDS)
+ : DEFAULT_MAILBOX_PUSH_TIMEOUT;
+
+ cachedMailBoxPushTimeout = new FiniteDuration(timeout, TimeUnit.NANOSECONDS);
+ return cachedMailBoxPushTimeout;
+ }
+
+ public static class Builder<T extends Builder> extends AbstractConfig.Builder<T>{
+
+ public Builder(String actorSystemName) {
+ super(actorSystemName);
+
+ //actor system config
+ configHolder.put(TAG_ACTOR_SYSTEM_NAME, actorSystemName);
+
+ //config for bounded mailbox
+ configHolder.put(TAG_MAILBOX, new HashMap<String, Object>());
+ }
+
+ public T metricCaptureEnabled(boolean enabled) {
+ configHolder.put(TAG_METRIC_CAPTURE_ENABLED, String.valueOf(enabled));
+ return (T)this;
+ }
+
+ public T mailboxCapacity(int capacity) {
+ Preconditions.checkArgument(capacity > 0, "mailbox capacity must be >0");
+
+ Map<String, Object> boundedMailbox = (Map) configHolder.get(TAG_MAILBOX);
+ boundedMailbox.put(TAG_MAILBOX_CAPACITY, capacity);
+ return (T)this;
+ }
+
+ public T mailboxPushTimeout(String timeout){
+ Duration pushTimeout = Duration.create(timeout);
+ Preconditions.checkArgument(pushTimeout.isFinite(), "invalid value for mailbox push timeout");
+
+ Map<String, Object> boundedMailbox = (Map) configHolder.get(TAG_MAILBOX);
+ boundedMailbox.put(TAG_MAILBOX_PUSH_TIMEOUT, timeout);
+ return (T)this;
+ }
+
+ public CommonConfig build() {
+ return new CommonConfig(merge());
+ }
+ }
+}
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.remote.rpc.utils;
+package org.opendaylight.controller.cluster.common.actor;
import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.common.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.dispatch.BoundedDequeBasedMailbox;
+import akka.dispatch.MailboxType;
+import akka.dispatch.ProducesMessageQueue;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.Config;
+import org.opendaylight.controller.cluster.reporting.MetricsReporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Bounded mailbox whose per-actor queue size is published as a gauge to the
+ * shared {@link MetricsReporter} registry. Capacity and push timeout come
+ * from {@link CommonConfig}.
+ */
+public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue<MeteredBoundedMailbox.MeteredMessageQueue> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MeteredBoundedMailbox.class);
+
+    // Suffix under which each actor's queue-size gauge is registered.
+    private static final String QUEUE_SIZE = "q-size";
+
+    private final Integer capacity;
+    private final FiniteDuration pushTimeOut;
+    private final MetricRegistry registry;
+
+    public MeteredBoundedMailbox(ActorSystem.Settings settings, Config config) {
+        CommonConfig commonConfig = new CommonConfig(settings.config());
+        this.capacity = commonConfig.getMailBoxCapacity();
+        this.pushTimeOut = commonConfig.getMailBoxPushTimeout();
+
+        this.registry = MetricsReporter.getInstance().getMetricsRegistry();
+    }
+
+    @Override
+    public MeteredMessageQueue create(final scala.Option<ActorRef> owner, scala.Option<ActorSystem> system) {
+        // One MailboxType instance serves many actors, so the queue is kept
+        // local rather than stored in a field (avoids retaining/overwriting
+        // the last-created queue across actors).
+        MeteredMessageQueue messageQueue = new MeteredMessageQueue(capacity, pushTimeOut);
+        monitorQueueSize(owner, messageQueue);
+        return messageQueue;
+    }
+
+    // Registers a queue-size gauge for the owning actor, once per actor path.
+    private void monitorQueueSize(scala.Option<ActorRef> owner, final MeteredMessageQueue monitoredQueue) {
+        if (owner.isEmpty()) {
+            return; //there's no actor to monitor
+        }
+        String actorName = owner.get().path().toStringWithoutAddress();
+        String metricName = MetricRegistry.name(actorName, QUEUE_SIZE);
+
+        if (registry.getMetrics().containsKey(metricName)) {
+            return; //already registered
+        }
+
+        registerQueueSizeMetric(metricName, getQueueSizeGauge(monitoredQueue));
+    }
+
+    public static class MeteredMessageQueue extends BoundedDequeBasedMailbox.MessageQueue {
+
+        public MeteredMessageQueue(int capacity, FiniteDuration pushTimeOut) {
+            super(capacity, pushTimeOut);
+        }
+    }
+
+    private Gauge<Integer> getQueueSizeGauge(final MeteredMessageQueue monitoredQueue) {
+        return new Gauge<Integer>() {
+            @Override
+            public Integer getValue() {
+                return monitoredQueue.size();
+            }
+        };
+    }
+
+    private void registerQueueSizeMetric(String metricName, Gauge<Integer> metric) {
+        try {
+            registry.register(metricName, metric);
+        } catch (IllegalArgumentException e) {
+            // Pass the exception as the final argument (not as a format
+            // parameter) so SLF4J records the full stack trace.
+            LOG.warn("Unable to register queue size in metrics registry", e);
+        }
+    }
+}
+
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.common.actor;
+
+import akka.actor.UntypedActor;
+import akka.japi.Procedure;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.reporting.MetricsReporter;
+
+/**
+ * Represents behaviour that can be exhibited by actors of type {@link akka.actor.UntypedActor}
+ * <p/>
+ * This behaviour meters actor's default behaviour. It captures 2 metrics:
+ * <ul>
+ * <li>message processing rate of actor's receive block</li>
+ * <li>message processing rate by message type</li>
+ * </ul>
+ *
+ * The information is reported to {@link org.opendaylight.controller.cluster.reporting.MetricsReporter}
+ */
+public class MeteringBehavior implements Procedure<Object> {
+
+    private static final String MSG_PROCESSING_RATE = "msg-rate";
+
+    private final UntypedActor meteredActor;
+    private final MetricRegistry metricRegistry = MetricsReporter.getInstance().getMetricsRegistry();
+    private final String actorName;
+    private final Timer msgProcessingTimer;
+
+    /**
+     *
+     * @param actor whose behaviour needs to be metered
+     */
+    public MeteringBehavior(UntypedActor actor) {
+        Preconditions.checkArgument(actor != null, "actor must not be null");
+
+        this.meteredActor = actor;
+        this.actorName = actor.getSelf().path().toStringWithoutAddress();
+        // Overall processing-rate timer for this actor; re-used across messages.
+        this.msgProcessingTimer = metricRegistry.timer(MetricRegistry.name(actorName, MSG_PROCESSING_RATE));
+    }
+
+    /**
+     * Uses 2 timers to measure message processing rate. One for overall message processing rate and
+     * another to measure rate by message type. The timers are re-used if they were previously created.
+     * <p/>
+     * {@link com.codahale.metrics.MetricRegistry} maintains a reservoir for different timers where
+     * collected timings are kept. It exposes various metrics for each timer based on collected
+     * data. Eg: count of messages, 99, 95, 50... percentiles, max, mean etc.
+     * <p/>
+     * These metrics are exposed as JMX bean.
+     *
+     * @see <a href="http://dropwizard.github.io/metrics/manual/core/#timers">
+     *     http://dropwizard.github.io/metrics/manual/core/#timers</a>
+     *
+     * @param message the message being delivered to the metered actor
+     * @throws Exception whatever the underlying actor's onReceive throws
+     */
+    @Override
+    public void apply(Object message) throws Exception {
+        final String messageType = message.getClass().getSimpleName();
+
+        final String msgProcessingTimeByMsgType =
+                MetricRegistry.name(actorName, MSG_PROCESSING_RATE, messageType);
+        final Timer msgProcessingTimerByMsgType = metricRegistry.timer(msgProcessingTimeByMsgType);
+
+        //start timers
+        final Timer.Context context = msgProcessingTimer.time();
+        final Timer.Context contextByMsgType = msgProcessingTimerByMsgType.time();
+
+        try {
+            meteredActor.onReceive(message);
+        } finally {
+            //stop timers even when onReceive throws, so failing messages
+            //are still accounted for instead of leaving contexts unclosed
+            contextByMsgType.stop();
+            context.stop();
+        }
+    }
+}
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.remote.rpc.messages;
+package org.opendaylight.controller.cluster.common.actor;
import akka.actor.ActorRef;
private final ActorRef actorRef;
// Captures the reference of the actor whose termination is to be monitored.
public Monitor(ActorRef actorRef){
-
this.actorRef = actorRef;
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.common.actor;
+
+import com.typesafe.config.Config;
+
+/**
+ * Represents a unified view of configuration.
+ * <p/>
+ * It merges configuration from:
+ * <ul>
+ * <li>Config subsystem</li>
+ * <li>Akka configuration files</li>
+ * </ul>
+ *
+ * Configurations defined in config subsystem takes precedence.
+ */
+public interface UnifiedConfig {
+
+    /**
+     * Returns an immutable instance of unified configuration
+     *
+     * @return the merged, immutable typesafe {@code Config} view
+     */
+    public Config get();
+}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.common.reporting;
+package org.opendaylight.controller.cluster.reporting;
import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.MetricRegistry;
public class MetricsReporter implements AutoCloseable{
private final MetricRegistry METRICS_REGISTRY = new MetricRegistry();
- private final String DOMAIN = "org.opendaylight.controller";
+ private final String DOMAIN = "org.opendaylight.controller.actor.metric";
public final JmxReporter jmxReporter = JmxReporter.forRegistry(METRICS_REGISTRY).inDomain(DOMAIN).build();
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.common.actor;
-
-import akka.actor.ActorPath;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.dispatch.BoundedDequeBasedMailbox;
-import akka.dispatch.MailboxType;
-import akka.dispatch.ProducesMessageQueue;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.MetricRegistry;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-import org.opendaylight.controller.common.reporting.MetricsReporter;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.concurrent.TimeUnit;
-
-public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue<MeteredBoundedMailbox.MeteredMessageQueue> {
-
- private MeteredMessageQueue queue;
- private Integer capacity;
- private FiniteDuration pushTimeOut;
- private ActorPath actorPath;
- private MetricsReporter reporter;
-
- private final String QUEUE_SIZE = "queue-size";
- private final String CAPACITY = "mailbox-capacity";
- private final String TIMEOUT = "mailbox-push-timeout-time";
- private final Long DEFAULT_TIMEOUT = 10L;
-
- public MeteredBoundedMailbox(ActorSystem.Settings settings, Config config) {
- Preconditions.checkArgument( config.hasPath("mailbox-capacity"), "Missing configuration [mailbox-capacity]" );
- this.capacity = config.getInt(CAPACITY);
- Preconditions.checkArgument( this.capacity > 0, "mailbox-capacity must be > 0");
-
- Long timeout = -1L;
- if ( config.hasPath(TIMEOUT) ){
- timeout = config.getDuration(TIMEOUT, TimeUnit.NANOSECONDS);
- } else {
- timeout = DEFAULT_TIMEOUT;
- }
- Preconditions.checkArgument( timeout > 0, "mailbox-push-timeout-time must be > 0");
- this.pushTimeOut = new FiniteDuration(timeout, TimeUnit.NANOSECONDS);
-
- reporter = MetricsReporter.getInstance();
- }
-
-
- @Override
- public MeteredMessageQueue create(final scala.Option<ActorRef> owner, scala.Option<ActorSystem> system) {
- this.queue = new MeteredMessageQueue(this.capacity, this.pushTimeOut);
- monitorQueueSize(owner, this.queue);
- return this.queue;
- }
-
- private void monitorQueueSize(scala.Option<ActorRef> owner, final MeteredMessageQueue monitoredQueue) {
- if (owner.isEmpty()) {
- return; //there's no actor to monitor
- }
- actorPath = owner.get().path();
- String actorInstanceId = Integer.toString(owner.get().hashCode());
-
- MetricRegistry registry = reporter.getMetricsRegistry();
- String actorName = registry.name(actorPath.toString(), actorInstanceId, QUEUE_SIZE);
-
- if (registry.getMetrics().containsKey(actorName))
- return; //already registered
-
- registry.register(actorName,
- new Gauge<Integer>() {
- @Override
- public Integer getValue() {
- return monitoredQueue.size();
- }
- });
- }
-
-
- public static class MeteredMessageQueue extends BoundedDequeBasedMailbox.MessageQueue {
-
- public MeteredMessageQueue(int capacity, FiniteDuration pushTimeOut) {
- super(capacity, pushTimeOut);
- }
- }
-
-}
-
}
public interface CloseTransactionChainOrBuilder
extends com.google.protobuf.MessageOrBuilder {
+
+ // optional string transactionChainId = 1;
+ /**
+ * <code>optional string transactionChainId = 1;</code>
+ */
+ boolean hasTransactionChainId();
+ /**
+ * <code>optional string transactionChainId = 1;</code>
+ */
+ java.lang.String getTransactionChainId();
+ /**
+ * <code>optional string transactionChainId = 1;</code>
+ */
+ com.google.protobuf.ByteString
+ getTransactionChainIdBytes();
}
/**
* Protobuf type {@code org.opendaylight.controller.mdsal.CloseTransactionChain}
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
initFields();
+ int mutable_bitField0_ = 0;
com.google.protobuf.UnknownFieldSet.Builder unknownFields =
com.google.protobuf.UnknownFieldSet.newBuilder();
try {
}
break;
}
+ case 10: {
+ bitField0_ |= 0x00000001;
+ transactionChainId_ = input.readBytes();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
return PARSER;
}
+ private int bitField0_;
+ // optional string transactionChainId = 1;
+ public static final int TRANSACTIONCHAINID_FIELD_NUMBER = 1;
+ private java.lang.Object transactionChainId_;
+ /**
+ * <code>optional string transactionChainId = 1;</code>
+ */
+ public boolean hasTransactionChainId() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * <code>optional string transactionChainId = 1;</code>
+ */
+ public java.lang.String getTransactionChainId() {
+ java.lang.Object ref = transactionChainId_;
+ if (ref instanceof java.lang.String) {
+ return (java.lang.String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ transactionChainId_ = s;
+ }
+ return s;
+ }
+ }
+ /**
+ * <code>optional string transactionChainId = 1;</code>
+ */
+ public com.google.protobuf.ByteString
+ getTransactionChainIdBytes() {
+ java.lang.Object ref = transactionChainId_;
+ if (ref instanceof java.lang.String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ transactionChainId_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
private void initFields() {
+ transactionChainId_ = "";
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeBytes(1, getTransactionChainIdBytes());
+ }
getUnknownFields().writeTo(output);
}
if (size != -1) return size;
size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(1, getTransactionChainIdBytes());
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
public Builder clear() {
super.clear();
+ transactionChainId_ = "";
+ bitField0_ = (bitField0_ & ~0x00000001);
return this;
}
public org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CloseTransactionChain buildPartial() {
org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CloseTransactionChain result = new org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CloseTransactionChain(this);
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ result.transactionChainId_ = transactionChainId_;
+ result.bitField0_ = to_bitField0_;
onBuilt();
return result;
}
public Builder mergeFrom(org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CloseTransactionChain other) {
if (other == org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CloseTransactionChain.getDefaultInstance()) return this;
+ if (other.hasTransactionChainId()) {
+ bitField0_ |= 0x00000001;
+ transactionChainId_ = other.transactionChainId_;
+ onChanged();
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
}
return this;
}
+ private int bitField0_;
+
+ // optional string transactionChainId = 1;
+ private java.lang.Object transactionChainId_ = "";
+ /**
+ * <code>optional string transactionChainId = 1;</code>
+ */
+ public boolean hasTransactionChainId() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * <code>optional string transactionChainId = 1;</code>
+ */
+ public java.lang.String getTransactionChainId() {
+ java.lang.Object ref = transactionChainId_;
+ if (!(ref instanceof java.lang.String)) {
+ java.lang.String s = ((com.google.protobuf.ByteString) ref)
+ .toStringUtf8();
+ transactionChainId_ = s;
+ return s;
+ } else {
+ return (java.lang.String) ref;
+ }
+ }
+ /**
+ * <code>optional string transactionChainId = 1;</code>
+ */
+ public com.google.protobuf.ByteString
+ getTransactionChainIdBytes() {
+ java.lang.Object ref = transactionChainId_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ transactionChainId_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+ /**
+ * <code>optional string transactionChainId = 1;</code>
+ */
+ public Builder setTransactionChainId(
+ java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000001;
+ transactionChainId_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional string transactionChainId = 1;</code>
+ */
+ public Builder clearTransactionChainId() {
+ bitField0_ = (bitField0_ & ~0x00000001);
+ transactionChainId_ = getDefaultInstance().getTransactionChainId();
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional string transactionChainId = 1;</code>
+ */
+ public Builder setTransactionChainIdBytes(
+ com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000001;
+ transactionChainId_ = value;
+ onChanged();
+ return this;
+ }
// @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.CloseTransactionChain)
}
static {
java.lang.String[] descriptorData = {
"\n\033ShardTransactionChain.proto\022!org.opend" +
- "aylight.controller.mdsal\"\027\n\025CloseTransac" +
- "tionChain\"\034\n\032CloseTransactionChainReply\"" +
- "\030\n\026CreateTransactionChain\";\n\033CreateTrans" +
- "actionChainReply\022\034\n\024transactionChainPath" +
- "\030\001 \002(\tB[\n:org.opendaylight.controller.pr" +
- "otobuff.messages.transactionB\035ShardTrans" +
- "actionChainMessages"
+ "aylight.controller.mdsal\"3\n\025CloseTransac" +
+ "tionChain\022\032\n\022transactionChainId\030\001 \001(\t\"\034\n" +
+ "\032CloseTransactionChainReply\"\030\n\026CreateTra" +
+ "nsactionChain\";\n\033CreateTransactionChainR" +
+ "eply\022\034\n\024transactionChainPath\030\001 \002(\tB[\n:or" +
+ "g.opendaylight.controller.protobuff.mess" +
+ "ages.transactionB\035ShardTransactionChainM" +
+ "essages"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
internal_static_org_opendaylight_controller_mdsal_CloseTransactionChain_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_org_opendaylight_controller_mdsal_CloseTransactionChain_descriptor,
- new java.lang.String[] { });
+ new java.lang.String[] { "TransactionChainId", });
internal_static_org_opendaylight_controller_mdsal_CloseTransactionChainReply_descriptor =
getDescriptor().getMessageTypes().get(1);
internal_static_org_opendaylight_controller_mdsal_CloseTransactionChainReply_fieldAccessorTable = new
* <code>required int32 transactionType = 2;</code>
*/
int getTransactionType();
+
+ // optional string transactionChainId = 3;
+ /**
+ * <code>optional string transactionChainId = 3;</code>
+ */
+ boolean hasTransactionChainId();
+ /**
+ * <code>optional string transactionChainId = 3;</code>
+ */
+ java.lang.String getTransactionChainId();
+ /**
+ * <code>optional string transactionChainId = 3;</code>
+ */
+ com.google.protobuf.ByteString
+ getTransactionChainIdBytes();
}
/**
* Protobuf type {@code org.opendaylight.controller.mdsal.CreateTransaction}
transactionType_ = input.readInt32();
break;
}
+ case 26: {
+ bitField0_ |= 0x00000004;
+ transactionChainId_ = input.readBytes();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
return transactionType_;
}
+ // optional string transactionChainId = 3;
+ public static final int TRANSACTIONCHAINID_FIELD_NUMBER = 3;
+ private java.lang.Object transactionChainId_;
+ /**
+ * <code>optional string transactionChainId = 3;</code>
+ */
+ public boolean hasTransactionChainId() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ /**
+ * <code>optional string transactionChainId = 3;</code>
+ */
+ public java.lang.String getTransactionChainId() {
+ java.lang.Object ref = transactionChainId_;
+ if (ref instanceof java.lang.String) {
+ return (java.lang.String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ transactionChainId_ = s;
+ }
+ return s;
+ }
+ }
+ /**
+ * <code>optional string transactionChainId = 3;</code>
+ */
+ public com.google.protobuf.ByteString
+ getTransactionChainIdBytes() {
+ java.lang.Object ref = transactionChainId_;
+ if (ref instanceof java.lang.String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ transactionChainId_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
private void initFields() {
transactionId_ = "";
transactionType_ = 0;
+ transactionChainId_ = "";
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeInt32(2, transactionType_);
}
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ output.writeBytes(3, getTransactionChainIdBytes());
+ }
getUnknownFields().writeTo(output);
}
size += com.google.protobuf.CodedOutputStream
.computeInt32Size(2, transactionType_);
}
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(3, getTransactionChainIdBytes());
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
bitField0_ = (bitField0_ & ~0x00000001);
transactionType_ = 0;
bitField0_ = (bitField0_ & ~0x00000002);
+ transactionChainId_ = "";
+ bitField0_ = (bitField0_ & ~0x00000004);
return this;
}
to_bitField0_ |= 0x00000002;
}
result.transactionType_ = transactionType_;
+ if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+ to_bitField0_ |= 0x00000004;
+ }
+ result.transactionChainId_ = transactionChainId_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
if (other.hasTransactionType()) {
setTransactionType(other.getTransactionType());
}
+ if (other.hasTransactionChainId()) {
+ bitField0_ |= 0x00000004;
+ transactionChainId_ = other.transactionChainId_;
+ onChanged();
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
return this;
}
+ // optional string transactionChainId = 3;
+ private java.lang.Object transactionChainId_ = "";
+ /**
+ * <code>optional string transactionChainId = 3;</code>
+ */
+ public boolean hasTransactionChainId() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ /**
+ * <code>optional string transactionChainId = 3;</code>
+ */
+ public java.lang.String getTransactionChainId() {
+ java.lang.Object ref = transactionChainId_;
+ if (!(ref instanceof java.lang.String)) {
+ java.lang.String s = ((com.google.protobuf.ByteString) ref)
+ .toStringUtf8();
+ transactionChainId_ = s;
+ return s;
+ } else {
+ return (java.lang.String) ref;
+ }
+ }
+ /**
+ * <code>optional string transactionChainId = 3;</code>
+ */
+ public com.google.protobuf.ByteString
+ getTransactionChainIdBytes() {
+ java.lang.Object ref = transactionChainId_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ transactionChainId_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+ /**
+ * <code>optional string transactionChainId = 3;</code>
+ */
+ public Builder setTransactionChainId(
+ java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000004;
+ transactionChainId_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional string transactionChainId = 3;</code>
+ */
+ public Builder clearTransactionChainId() {
+ bitField0_ = (bitField0_ & ~0x00000004);
+ transactionChainId_ = getDefaultInstance().getTransactionChainId();
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional string transactionChainId = 3;</code>
+ */
+ public Builder setTransactionChainIdBytes(
+ com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000004;
+ transactionChainId_ = value;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.CreateTransaction)
}
java.lang.String[] descriptorData = {
"\n\026ShardTransaction.proto\022!org.opendaylig" +
"ht.controller.mdsal\032\014Common.proto\"\022\n\020Clo" +
- "seTransaction\"\027\n\025CloseTransactionReply\"C" +
+ "seTransaction\"\027\n\025CloseTransactionReply\"_" +
"\n\021CreateTransaction\022\025\n\rtransactionId\030\001 \002" +
- "(\t\022\027\n\017transactionType\030\002 \002(\005\"M\n\026CreateTra" +
- "nsactionReply\022\034\n\024transactionActorPath\030\001 " +
- "\002(\t\022\025\n\rtransactionId\030\002 \002(\t\"\022\n\020ReadyTrans" +
- "action\"*\n\025ReadyTransactionReply\022\021\n\tactor" +
- "Path\030\001 \002(\t\"l\n\nDeleteData\022^\n\037instanceIden" +
- "tifierPathArguments\030\001 \002(\01325.org.opendayl",
- "ight.controller.mdsal.InstanceIdentifier" +
- "\"\021\n\017DeleteDataReply\"j\n\010ReadData\022^\n\037insta" +
- "nceIdentifierPathArguments\030\001 \002(\01325.org.o" +
- "pendaylight.controller.mdsal.InstanceIde" +
- "ntifier\"P\n\rReadDataReply\022?\n\016normalizedNo" +
- "de\030\001 \001(\0132\'.org.opendaylight.controller.m" +
- "dsal.Node\"\254\001\n\tWriteData\022^\n\037instanceIdent" +
- "ifierPathArguments\030\001 \002(\01325.org.opendayli" +
- "ght.controller.mdsal.InstanceIdentifier\022" +
- "?\n\016normalizedNode\030\002 \002(\0132\'.org.opendaylig",
- "ht.controller.mdsal.Node\"\020\n\016WriteDataRep" +
- "ly\"\254\001\n\tMergeData\022^\n\037instanceIdentifierPa" +
- "thArguments\030\001 \002(\01325.org.opendaylight.con" +
- "troller.mdsal.InstanceIdentifier\022?\n\016norm" +
- "alizedNode\030\002 \002(\0132\'.org.opendaylight.cont" +
- "roller.mdsal.Node\"\020\n\016MergeDataReply\"l\n\nD" +
- "ataExists\022^\n\037instanceIdentifierPathArgum" +
- "ents\030\001 \002(\01325.org.opendaylight.controller" +
- ".mdsal.InstanceIdentifier\"!\n\017DataExistsR" +
- "eply\022\016\n\006exists\030\001 \002(\010BV\n:org.opendaylight",
- ".controller.protobuff.messages.transacti" +
- "onB\030ShardTransactionMessages"
+ "(\t\022\027\n\017transactionType\030\002 \002(\005\022\032\n\022transacti" +
+ "onChainId\030\003 \001(\t\"M\n\026CreateTransactionRepl" +
+ "y\022\034\n\024transactionActorPath\030\001 \002(\t\022\025\n\rtrans" +
+ "actionId\030\002 \002(\t\"\022\n\020ReadyTransaction\"*\n\025Re" +
+ "adyTransactionReply\022\021\n\tactorPath\030\001 \002(\t\"l" +
+ "\n\nDeleteData\022^\n\037instanceIdentifierPathAr",
+ "guments\030\001 \002(\01325.org.opendaylight.control" +
+ "ler.mdsal.InstanceIdentifier\"\021\n\017DeleteDa" +
+ "taReply\"j\n\010ReadData\022^\n\037instanceIdentifie" +
+ "rPathArguments\030\001 \002(\01325.org.opendaylight." +
+ "controller.mdsal.InstanceIdentifier\"P\n\rR" +
+ "eadDataReply\022?\n\016normalizedNode\030\001 \001(\0132\'.o" +
+ "rg.opendaylight.controller.mdsal.Node\"\254\001" +
+ "\n\tWriteData\022^\n\037instanceIdentifierPathArg" +
+ "uments\030\001 \002(\01325.org.opendaylight.controll" +
+ "er.mdsal.InstanceIdentifier\022?\n\016normalize",
+ "dNode\030\002 \002(\0132\'.org.opendaylight.controlle" +
+ "r.mdsal.Node\"\020\n\016WriteDataReply\"\254\001\n\tMerge" +
+ "Data\022^\n\037instanceIdentifierPathArguments\030" +
+ "\001 \002(\01325.org.opendaylight.controller.mdsa" +
+ "l.InstanceIdentifier\022?\n\016normalizedNode\030\002" +
+ " \002(\0132\'.org.opendaylight.controller.mdsal" +
+ ".Node\"\020\n\016MergeDataReply\"l\n\nDataExists\022^\n" +
+ "\037instanceIdentifierPathArguments\030\001 \002(\01325" +
+ ".org.opendaylight.controller.mdsal.Insta" +
+ "nceIdentifier\"!\n\017DataExistsReply\022\016\n\006exis",
+ "ts\030\001 \002(\010BV\n:org.opendaylight.controller." +
+ "protobuff.messages.transactionB\030ShardTra" +
+ "nsactionMessages"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
internal_static_org_opendaylight_controller_mdsal_CreateTransaction_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_org_opendaylight_controller_mdsal_CreateTransaction_descriptor,
- new java.lang.String[] { "TransactionId", "TransactionType", });
+ new java.lang.String[] { "TransactionId", "TransactionType", "TransactionChainId", });
internal_static_org_opendaylight_controller_mdsal_CreateTransactionReply_descriptor =
getDescriptor().getMessageTypes().get(3);
internal_static_org_opendaylight_controller_mdsal_CreateTransactionReply_fieldAccessorTable = new
message CreateTransaction{
required string transactionId = 1;
required int32 transactionType =2;
+ optional string transactionChainId = 3;
}
message CreateTransactionReply{
option java_outer_classname = "ShardTransactionChainMessages";
message CloseTransactionChain {
-
+ optional string transactionChainId = 1;
}
message CloseTransactionChainReply{
-
-}
-
-message CreateTransactionChain {
-
-}
-
-message CreateTransactionChainReply{
-required string transactionChainPath = 1;
-
}
--- /dev/null
+package org.opendaylight.controller.cluster.common.actor;
+
+import org.junit.Test;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CommonConfigTest {
+
+ @Test
+ public void testCommonConfigDefaults(){
+ CommonConfig config = new CommonConfig.Builder<>("testsystem").build();
+
+ assertNotNull(config.getActorSystemName());
+ assertNotNull(config.getMailBoxCapacity());
+ assertNotNull(config.getMailBoxName());
+ assertNotNull(config.getMailBoxPushTimeout());
+ assertNotNull(config.isMetricCaptureEnabled());
+ }
+
+ @Test
+ public void testCommonConfigOverride(){
+
+ int expectedCapacity = 123;
+ String timeoutValue = "1000ms";
+ CommonConfig config = new CommonConfig.Builder<>("testsystem")
+ .mailboxCapacity(expectedCapacity)
+ .mailboxPushTimeout(timeoutValue)
+ .metricCaptureEnabled(true)
+ .build();
+
+ assertEquals(expectedCapacity, config.getMailBoxCapacity().intValue());
+
+ FiniteDuration expectedTimeout = FiniteDuration.create(1000, TimeUnit.MILLISECONDS);
+ assertEquals(expectedTimeout.toMillis(), config.getMailBoxPushTimeout().toMillis());
+
+ assertTrue(config.isMetricCaptureEnabled());
+ }
+}
\ No newline at end of file
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.common.actor;
+package org.opendaylight.controller.cluster.common.actor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
public class MeteredBoundedMailboxTest {
private static ActorSystem actorSystem;
+ private static CommonConfig config;
private final ReentrantLock lock = new ReentrantLock();
@Before
public void setUp() throws Exception {
- actorSystem = ActorSystem.create("testsystem");
+ config = new CommonConfig.Builder<>("testsystem").build();
+ actorSystem = ActorSystem.create("testsystem", config.get());
}
@After
}
@Test
- public void test_WhenQueueIsFull_ShouldSendMsgToDeadLetter() throws InterruptedException {
+ public void shouldSendMsgToDeadLetterWhenQueueIsFull() throws InterruptedException {
final JavaTestKit mockReceiver = new JavaTestKit(actorSystem);
actorSystem.eventStream().subscribe(mockReceiver.getRef(), DeadLetter.class);
final FiniteDuration TWENTY_SEC = new FiniteDuration(20, TimeUnit.SECONDS);
- String boundedMailBox = actorSystem.name() + ".bounded-mailbox";
- ActorRef pingPongActor = actorSystem.actorOf(PingPongActor.props(lock).withMailbox(boundedMailBox),
+ ActorRef pingPongActor = actorSystem.actorOf(PingPongActor.props(lock).withMailbox(config.getMailBoxName()),
"pingpongactor");
actorSystem.mailboxes().settings();
testsystem {
bounded-mailbox {
- mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
mailbox-capacity = 10
mailbox-push-timeout-time = 100ms
}
-testsystem {
-
bounded-mailbox {
- mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
mailbox-capacity = 1000
mailbox-push-timeout-time = 10ms
}
-}
\ No newline at end of file
<type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-broker-osgi-registry</type>
<name>dom-broker</name>
</dom-broker>
+ <enable-metric-capture xmlns="urn:opendaylight:params:xml:ns:yang:controller:config:remote-rpc-connector">true</enable-metric-capture>
+ <actor-system-name xmlns="urn:opendaylight:params:xml:ns:yang:controller:config:remote-rpc-connector">odl-cluster-rpc</actor-system-name>
+ <bounded-mailbox-capacity xmlns="urn:opendaylight:params:xml:ns:yang:controller:config:remote-rpc-connector">1000</bounded-mailbox-capacity>
</module>
</modules>
odl-cluster-data {
bounded-mailbox {
- mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
mailbox-capacity = 1000
mailbox-push-timeout-time = 100ms
- }
+ }
+
+ metric-capture-enabled = true
+
akka {
actor {
provider = "akka.cluster.ClusterActorRefProvider"
odl-cluster-rpc {
bounded-mailbox {
- mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
mailbox-capacity = 1000
mailbox-push-timeout-time = 100ms
}
+
+ metric-capture-enabled = true
+
akka {
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
cluster {
- seed-nodes = ["akka.tcp://opendaylight-cluster-rpc@127.0.0.1:2551"]
+ seed-nodes = ["akka.tcp://odl-cluster-rpc@127.0.0.1:2551"]
auto-down-unreachable-after = 10s
}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.osgi.BundleDelegatingClassLoader;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.osgi.framework.BundleContext;
-
-import java.io.File;
-
-public class ActorSystemFactory {
-
- public static final String AKKA_CONF_PATH = "./configuration/initial/akka.conf";
- public static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-data";
- public static final String CONFIGURATION_NAME = "odl-cluster-data";
-
- private static volatile ActorSystem actorSystem = null;
-
- public static final ActorSystem getInstance(){
- return actorSystem;
- }
-
- /**
- * This method should be called only once during initialization
- *
- * @param bundleContext
- */
- public static final ActorSystem createInstance(final BundleContext bundleContext) {
- if(actorSystem == null) {
- // Create an OSGi bundle classloader for actor system
- BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(),
- Thread.currentThread().getContextClassLoader());
- synchronized (ActorSystemFactory.class) {
- // Double check
-
- if (actorSystem == null) {
- ActorSystem system = ActorSystem.create(ACTOR_SYSTEM_NAME,
- ConfigFactory.load(readAkkaConfiguration()).getConfig(CONFIGURATION_NAME), classLoader);
- system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
- actorSystem = system;
- }
- }
- }
-
- return actorSystem;
- }
-
-
- private static final Config readAkkaConfiguration(){
- File defaultConfigFile = new File(AKKA_CONF_PATH);
- Preconditions.checkState(defaultConfigFile.exists(), "akka.conf is missing");
- return ConfigFactory.parseFile(defaultConfigFile);
- }
-}
import java.util.List;
import java.util.Map;
+import java.util.Set;
public interface Configuration {
* @return
*/
List<String> getMembersFromShardName(String shardName);
+
+ /**
+ *
+ * @return
+ */
+ Set<String> getAllShardNames();
}
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
public class ConfigurationImpl implements Configuration {
return Collections.EMPTY_LIST;
}
+ @Override public Set<String> getAllShardNames() {
+ Set<String> shardNames = new LinkedHashSet<>();
+ for(ModuleShard ms : moduleShards){
+ for(Shard s : ms.getShards()) {
+ shardNames.add(s.getName());
+ }
+ }
+ return shardNames;
+ }
+
private void readModules(Config modulesConfig) {
import akka.japi.Creator;
import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.japi.Creator;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistrationReply;
* </p>
*/
public class DataChangeListenerRegistrationProxy implements ListenerRegistration {
- private final ActorSelection listenerRegistrationActor;
+ private volatile ActorSelection listenerRegistrationActor;
private final AsyncDataChangeListener listener;
private final ActorRef dataChangeListenerActor;
+ private boolean closed = false;
public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
DataChangeListenerRegistrationProxy(
this.dataChangeListenerActor = dataChangeListenerActor;
}
+ public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+ DataChangeListenerRegistrationProxy(
+ L listener, ActorRef dataChangeListenerActor) {
+ this(null, listener, dataChangeListenerActor);
+ }
+
@Override
public Object getInstance() {
return listener;
}
+ public void setListenerRegistrationActor(ActorSelection listenerRegistrationActor) {
+ boolean sendCloseMessage = false;
+ synchronized(this) {
+ if(closed) {
+ sendCloseMessage = true;
+ } else {
+ this.listenerRegistrationActor = listenerRegistrationActor;
+ }
+ }
+ if(sendCloseMessage) {
+ listenerRegistrationActor.tell(new
+ CloseDataChangeListenerRegistration().toSerializable(), null);
+ }
+
+ this.listenerRegistrationActor = listenerRegistrationActor;
+ }
+
+ public ActorSelection getListenerRegistrationActor() {
+ return listenerRegistrationActor;
+ }
+
@Override
public void close() {
- listenerRegistrationActor.tell(new CloseDataChangeListenerRegistration().toSerializable(), null);
+
+ boolean sendCloseMessage;
+ synchronized(this) {
+ sendCloseMessage = !closed && listenerRegistrationActor != null;
+ closed = true;
+ }
+ if(sendCloseMessage) {
+ listenerRegistrationActor.tell(new
+ CloseDataChangeListenerRegistration().toSerializable(), null);
+ }
+
dataChangeListenerActor.tell(PoisonPill.getInstance(), null);
}
}
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
-
+import akka.dispatch.OnComplete;
+import akka.util.Timeout;
import com.google.common.base.Preconditions;
-
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
/**
*
public class DistributedDataStore implements DOMStore, SchemaContextListener, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
+ public static final int REGISTER_DATA_CHANGE_LISTENER_TIMEOUT_FACTOR = 24; // 24 times the usual operation timeout
private final ActorContext actorContext;
@Override
public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
ListenerRegistration<L> registerChangeListener(
- YangInstanceIdentifier path, L listener,
+ final YangInstanceIdentifier path, L listener,
AsyncDataBroker.DataChangeScope scope) {
Preconditions.checkNotNull(path, "path should not be null");
String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
- Object result = actorContext.executeLocalShardOperation(shardName,
- new RegisterChangeListener(path, dataChangeListenerActor.path(), scope));
-
- if (result != null) {
- RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
- return new DataChangeListenerRegistrationProxy(actorContext
- .actorSelection(reply.getListenerRegistrationPath()), listener,
- dataChangeListenerActor);
+ Future future = actorContext.executeLocalShardOperationAsync(shardName,
+ new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
+ new Timeout(actorContext.getOperationDuration().$times(
+ REGISTER_DATA_CHANGE_LISTENER_TIMEOUT_FACTOR)));
+
+ if (future != null) {
+ final DataChangeListenerRegistrationProxy listenerRegistrationProxy =
+ new DataChangeListenerRegistrationProxy(listener, dataChangeListenerActor);
+
+ future.onComplete(new OnComplete(){
+
+ @Override public void onComplete(Throwable failure, Object result)
+ throws Throwable {
+ if(failure != null){
+ LOG.error("Failed to register listener at path " + path.toString(), failure);
+ return;
+ }
+ RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
+ listenerRegistrationProxy.setListenerRegistrationActor(actorContext
+ .actorSelection(reply.getListenerRegistrationPath()));
+ }
+ }, actorContext.getActorSystem().dispatcher());
+ return listenerRegistrationProxy;
}
LOG.debug(
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorSystem;
-
+import akka.actor.Props;
+import akka.osgi.BundleDelegatingClassLoader;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
import org.osgi.framework.BundleContext;
+import java.io.File;
+import java.util.concurrent.atomic.AtomicReference;
+
public class DistributedDataStoreFactory {
+
+ public static final String AKKA_CONF_PATH = "./configuration/initial/akka.conf";
+ public static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-data";
+ public static final String CONFIGURATION_NAME = "odl-cluster-data";
+ private static AtomicReference<ActorSystem> actorSystem = new AtomicReference<>();
+
public static DistributedDataStore createInstance(String name, SchemaService schemaService,
- DatastoreContext datastoreContext, BundleContext bundleContext) {
+ DatastoreContext datastoreContext, BundleContext bundleContext) {
- ActorSystem actorSystem = ActorSystemFactory.createInstance(bundleContext);
+ ActorSystem actorSystem = getOrCreateInstance(bundleContext);
Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
final DistributedDataStore dataStore =
- new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem),
- config, datastoreContext );
+ new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem),
+ config, datastoreContext);
+
ShardStrategyFactory.setConfiguration(config);
schemaService.registerSchemaContextListener(dataStore);
return dataStore;
}
+
+ synchronized private static final ActorSystem getOrCreateInstance(final BundleContext bundleContext) {
+
+ if (actorSystem.get() != null){
+ return actorSystem.get();
+ }
+ // Create an OSGi bundle classloader for actor system
+ BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(),
+ Thread.currentThread().getContextClassLoader());
+
+ ActorSystem system = ActorSystem.create(ACTOR_SYSTEM_NAME,
+ ConfigFactory.load(readAkkaConfiguration()).getConfig(CONFIGURATION_NAME), classLoader);
+ system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
+
+ actorSystem.set(system);
+ return system;
+ }
+
+
+ private static final Config readAkkaConfiguration() {
+ File defaultConfigFile = new File(AKKA_CONF_PATH);
+ Preconditions.checkState(defaultConfigFile.exists(), "akka.conf is missing");
+ return ConfigFactory.parseFile(defaultConfigFile);
+ }
}
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
+import org.opendaylight.controller.cluster.common.actor.CommonConfig;
+import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
+import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
private final DatastoreContext datastoreContext;
-
private SchemaContext schemaContext;
private ActorRef createSnapshotTransaction;
+ private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
+
private Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
DatastoreContext datastoreContext, SchemaContext schemaContext) {
super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams));
shardMBean.setDataStoreExecutor(store.getDomStoreExecutor());
shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
+ if (isMetricsCaptureEnabled()) {
+ getContext().become(new MeteringBehavior(this));
+ }
}
private static Map<String, String> mapPeerAddresses(
LOG.debug("onReceiveCommand: Received message {} from {}", message.getClass().toString(),
getSender());
- if (message.getClass()
- .equals(CreateTransactionChain.SERIALIZABLE_CLASS)) {
- if (isLeader()) {
- createTransactionChain();
- } else if (getLeader() != null) {
- getLeader().forward(message, getContext());
- }
- } else if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
+ if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
// This must be for install snapshot. Don't want to open this up and trigger
// deSerialization
- self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)), self());
+ self()
+ .tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)),
+ self());
// Send a PoisonPill instead of sending close transaction because we do not really need
// a response
getSender().tell(PoisonPill.getInstance(), self());
+ } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)){
+ closeTransactionChain(CloseTransactionChain.fromSerializable(message));
} else if (message instanceof RegisterChangeListener) {
registerChangeListener((RegisterChangeListener) message);
} else if (message instanceof UpdateSchemaContext) {
createTransaction(CreateTransaction.fromSerializable(message));
} else if (getLeader() != null) {
getLeader().forward(message, getContext());
+ } else {
+ getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(
+ "Could not find leader so transaction cannot be created")), getSelf());
}
} else if (message instanceof PeerAddressResolved) {
PeerAddressResolved resolved = (PeerAddressResolved) message;
}
}
+ private void closeTransactionChain(CloseTransactionChain closeTransactionChain) {
+ DOMStoreTransactionChain chain =
+ transactionChains.remove(closeTransactionChain.getTransactionChainId());
+
+ if(chain != null) {
+ chain.close();
+ }
+ }
+
private ActorRef createTypedTransactionActor(
int transactionType,
- ShardTransactionIdentifier transactionId) {
+ ShardTransactionIdentifier transactionId,
+ String transactionChainId ) {
+
+ DOMStoreTransactionFactory factory = store;
+
+ if(!transactionChainId.isEmpty()) {
+ factory = transactionChains.get(transactionChainId);
+ if(factory == null){
+ DOMStoreTransactionChain transactionChain = store.createTransactionChain();
+ transactionChains.put(transactionChainId, transactionChain);
+ factory = transactionChain;
+ }
+ }
if(this.schemaContext == null){
throw new NullPointerException("schemaContext should not be null");
shardMBean.incrementReadOnlyTransactionCount();
return getContext().actorOf(
- ShardTransaction.props(store.newReadOnlyTransaction(), getSelf(),
+ ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(),
schemaContext,datastoreContext, shardMBean), transactionId.toString());
} else if (transactionType
shardMBean.incrementReadWriteTransactionCount();
return getContext().actorOf(
- ShardTransaction.props(store.newReadWriteTransaction(), getSelf(),
+ ShardTransaction.props(factory.newReadWriteTransaction(), getSelf(),
schemaContext, datastoreContext, shardMBean), transactionId.toString());
shardMBean.incrementWriteOnlyTransactionCount();
return getContext().actorOf(
- ShardTransaction.props(store.newWriteOnlyTransaction(), getSelf(),
+ ShardTransaction.props(factory.newWriteOnlyTransaction(), getSelf(),
schemaContext, datastoreContext, shardMBean), transactionId.toString());
} else {
throw new IllegalArgumentException(
private void createTransaction(CreateTransaction createTransaction) {
createTransaction(createTransaction.getTransactionType(),
- createTransaction.getTransactionId());
+ createTransaction.getTransactionId(), createTransaction.getTransactionChainId());
}
- private ActorRef createTransaction(int transactionType, String remoteTransactionId) {
+ private ActorRef createTransaction(int transactionType, String remoteTransactionId, String transactionChainId) {
ShardTransactionIdentifier transactionId =
ShardTransactionIdentifier.builder()
.build();
LOG.debug("Creating transaction : {} ", transactionId);
ActorRef transactionActor =
- createTypedTransactionActor(transactionType, transactionId);
+ createTypedTransactionActor(transactionType, transactionId, transactionChainId);
getSender()
.tell(new CreateTransactionReply(
modification);
DOMStoreWriteTransaction transaction =
store.newWriteOnlyTransaction();
+
+ LOG.debug("Created new transaction {}", transaction.getIdentifier().toString());
+
modification.apply(transaction);
try {
syncCommitTransaction(transaction);
return;
}
+
+ if(sender == null){
+ LOG.error("Commit failed. Sender cannot be null");
+ return;
+ }
+
final ListenableFuture<Void> future = cohort.commit();
final ActorRef self = getSelf();
ActorRef transactionChain = getContext().actorOf(
ShardTransactionChain.props(chain, schemaContext, datastoreContext, shardMBean));
getSender().tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(),
- getSelf());
+ getSelf());
+ }
+
+ private boolean isMetricsCaptureEnabled(){
+ CommonConfig config = new CommonConfig(getContext().system().settings().config());
+ return config.isMetricCaptureEnabled();
}
@Override protected void applyState(ActorRef clientActor, String identifier,
// so that this actor does not get block building the snapshot
createSnapshotTransaction = createTransaction(
TransactionProxy.TransactionType.READ_ONLY.ordinal(),
- "createSnapshot");
+ "createSnapshot", "");
createSnapshotTransaction.tell(
new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
shardMBean.setRaftState(getRaftState().name());
shardMBean.setCurrentTerm(getCurrentTerm());
+
+ // If this actor is no longer the leader close all the transaction chains
+ if(!isLeader()){
+ for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
+ LOG.debug("onStateChanged: Closing transaction chain {} because shard {} is no longer the leader", entry.getKey(), getId());
+ entry.getValue().close();
+ }
+
+ transactionChains.clear();
+ }
}
@Override public String persistenceId() {
import akka.japi.Creator;
import akka.japi.Function;
import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
* <li> Monitor the cluster members and store their addresses
* <ul>
*/
-public class ShardManager extends AbstractUntypedActor {
+public class ShardManager extends AbstractUntypedActorWithMetering {
// Stores a mapping between a member name and the address of the member
// Member names look like "member-1", "member-2" etc and are as specified
import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+
import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.japi.Creator;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
package org.opendaylight.controller.cluster.datastore;
+import akka.actor.ActorPath;
+import akka.dispatch.Futures;
+import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+import java.util.Collections;
+import java.util.List;
/**
* TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard
*/
public class TransactionChainProxy implements DOMStoreTransactionChain{
private final ActorContext actorContext;
+ private final String transactionChainId;
+ private volatile List<Future<ActorPath>> cohortPathFutures = Collections.emptyList();
public TransactionChainProxy(ActorContext actorContext) {
this.actorContext = actorContext;
+ transactionChainId = actorContext.getCurrentMemberName() + "-" + System.currentTimeMillis();
}
@Override
public DOMStoreReadTransaction newReadOnlyTransaction() {
return new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY);
+ TransactionProxy.TransactionType.READ_ONLY, this);
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
return new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_WRITE);
+ TransactionProxy.TransactionType.READ_WRITE, this);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
return new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.WRITE_ONLY);
+ TransactionProxy.TransactionType.WRITE_ONLY, this);
}
@Override
public void close() {
- // FIXME : The problem here is don't know which shard the transaction chain is to be created on ???
- throw new UnsupportedOperationException("close - not sure what to do here?");
+ // Send a close transaction chain request to each and every shard
+ actorContext.broadcast(new CloseTransactionChain(transactionChainId));
+ }
+
+ public String getTransactionChainId() {
+ return transactionChainId;
+ }
+
+ public void onTransactionReady(List<Future<ActorPath>> cohortPathFutures){
+ this.cohortPathFutures = cohortPathFutures;
+ }
+
+ public void waitTillCurrentTransactionReady(){
+ try {
+ Await.result(Futures
+ .sequence(this.cohortPathFutures, actorContext.getActorSystem().dispatcher()),
+ actorContext.getOperationDuration());
+ } catch (Exception e) {
+ throw new IllegalStateException("Failed when waiting for transaction on a chain to become ready", e);
+ }
}
}
* </p>
*/
public class TransactionProxy implements DOMStoreReadWriteTransaction {
+
+ private final TransactionChainProxy transactionChainProxy;
+
+
+
public enum TransactionType {
READ_ONLY,
WRITE_ONLY,
private boolean inReadyState;
public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
+ this(actorContext, transactionType, null);
+ }
+
+ @VisibleForTesting
+ List<Future<Object>> getRecordedOperationFutures() {
+ List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
+ for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
+ recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
+ }
+
+ return recordedOperationFutures;
+ }
+
+ public TransactionProxy(ActorContext actorContext, TransactionType transactionType, TransactionChainProxy transactionChainProxy) {
this.actorContext = Preconditions.checkNotNull(actorContext,
- "actorContext should not be null");
+ "actorContext should not be null");
this.transactionType = Preconditions.checkNotNull(transactionType,
- "transactionType should not be null");
+ "transactionType should not be null");
this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
- "schemaContext should not be null");
+ "schemaContext should not be null");
+ this.transactionChainProxy = transactionChainProxy;
String memberName = actorContext.getCurrentMemberName();
if(memberName == null){
}
this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
- counter.getAndIncrement()).build();
+ counter.getAndIncrement()).build();
if(transactionType == TransactionType.READ_ONLY) {
// Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
remoteTransactionActorsMB = new AtomicBoolean();
TransactionProxyCleanupPhantomReference cleanup =
- new TransactionProxyCleanupPhantomReference(this);
+ new TransactionProxyCleanupPhantomReference(this);
phantomReferenceCache.put(cleanup, cleanup);
}
LOG.debug("Created txn {} of type {}", identifier, transactionType);
}
- @VisibleForTesting
- List<Future<Object>> getRecordedOperationFutures() {
- List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
- for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
- recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
- }
-
- return recordedOperationFutures;
- }
-
@Override
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
final YangInstanceIdentifier path) {
cohortPathFutures.add(transactionContext.readyTransaction());
}
+ if(transactionChainProxy != null){
+ transactionChainProxy.onTransactionReady(cohortPathFutures);
+ }
+
return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures,
identifier.toString());
}
return ShardStrategyFactory.getStrategy(path).findShard(path);
}
- private void createTransactionIfMissing(ActorContext actorContext, YangInstanceIdentifier path) {
+ private void createTransactionIfMissing(ActorContext actorContext,
+ YangInstanceIdentifier path) {
+
+ if(transactionChainProxy != null){
+ transactionChainProxy.waitTillCurrentTransactionReady();
+ }
+
String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
TransactionContext transactionContext =
remoteTransactionPaths.get(shardName);
- if(transactionContext != null){
+ if (transactionContext != null) {
// A transaction already exists with that shard
return;
}
try {
Object response = actorContext.executeShardOperation(shardName,
- new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable());
+ new CreateTransaction(identifier.toString(), this.transactionType.ordinal(),
+ getTransactionChainId()).toSerializable());
if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
CreateTransactionReply reply =
CreateTransactionReply.fromSerializable(response);
ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
- if(transactionType == TransactionType.READ_ONLY) {
+ if (transactionType == TransactionType.READ_ONLY) {
// Add the actor to the remoteTransactionActors list for access by the
// cleanup PhantonReference.
remoteTransactionActors.add(transactionActor);
}
transactionContext = new TransactionContextImpl(shardName, transactionPath,
- transactionActor, identifier, actorContext, schemaContext);
+ transactionActor, identifier, actorContext, schemaContext);
remoteTransactionPaths.put(shardName, transactionContext);
} else {
throw new IllegalArgumentException(String.format(
- "Invalid reply type {} for CreateTransaction", response.getClass()));
+ "Invalid reply type {} for CreateTransaction", response.getClass()));
}
- } catch(Exception e){
+ } catch (Exception e) {
LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
- remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName, e, identifier));
+ remoteTransactionPaths
+ .put(shardName, new NoOpTransactionContext(shardName, e, identifier));
}
}
+ public String getTransactionChainId() {
+ if(transactionChainProxy == null){
+ return "";
+ }
+ return transactionChainProxy.getTransactionChainId();
+ }
+
+
private interface TransactionContext {
String getShardName();
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages;
-public class CloseTransactionChain implements SerializableMessage{
- public static final Class SERIALIZABLE_CLASS = ShardTransactionChainMessages.CloseTransactionChain.class;
- @Override
- public Object toSerializable() {
- return ShardTransactionChainMessages.CloseTransactionChain.newBuilder().build();
- }
+public class CloseTransactionChain implements SerializableMessage {
+ public static final Class SERIALIZABLE_CLASS =
+ ShardTransactionChainMessages.CloseTransactionChain.class;
+ private final String transactionChainId;
+
+ public CloseTransactionChain(String transactionChainId){
+ this.transactionChainId = transactionChainId;
+ }
+
+ @Override
+ public Object toSerializable() {
+ return ShardTransactionChainMessages.CloseTransactionChain.newBuilder()
+ .setTransactionChainId(transactionChainId).build();
+ }
+
+ public static CloseTransactionChain fromSerializable(Object message){
+ ShardTransactionChainMessages.CloseTransactionChain closeTransactionChain
+ = (ShardTransactionChainMessages.CloseTransactionChain) message;
+
+ return new CloseTransactionChain(closeTransactionChain.getTransactionChainId());
+ }
+
+ public String getTransactionChainId() {
+ return transactionChainId;
+ }
}
public class CreateTransaction implements SerializableMessage {
- public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.CreateTransaction.class;
- private final String transactionId;
- private final int transactionType;
-
- public CreateTransaction(String transactionId, int transactionType){
-
- this.transactionId = transactionId;
- this.transactionType = transactionType;
- }
-
- public String getTransactionId() {
- return transactionId;
- }
-
- public int getTransactionType() { return transactionType;}
-
- @Override
- public Object toSerializable() {
- return ShardTransactionMessages.CreateTransaction.newBuilder().setTransactionId(transactionId).setTransactionType(transactionType).build();
- }
-
- public static CreateTransaction fromSerializable(Object message){
- ShardTransactionMessages.CreateTransaction createTransaction = (ShardTransactionMessages.CreateTransaction)message;
- return new CreateTransaction(createTransaction.getTransactionId(),createTransaction.getTransactionType() );
- }
-
+ public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.CreateTransaction.class;
+ private final String transactionId;
+ private final int transactionType;
+ private final String transactionChainId;
+
+ public CreateTransaction(String transactionId, int transactionType) {
+ this(transactionId, transactionType, "");
+ }
+
+ public CreateTransaction(String transactionId, int transactionType, String transactionChainId) {
+
+ this.transactionId = transactionId;
+ this.transactionType = transactionType;
+ this.transactionChainId = transactionChainId;
+
+ }
+
+
+ public String getTransactionId() {
+ return transactionId;
+ }
+
+ public int getTransactionType() {
+ return transactionType;
+ }
+
+ @Override
+ public Object toSerializable() {
+ return ShardTransactionMessages.CreateTransaction.newBuilder()
+ .setTransactionId(transactionId)
+ .setTransactionType(transactionType)
+ .setTransactionChainId(transactionChainId).build();
+ }
+
+ public static CreateTransaction fromSerializable(Object message) {
+ ShardTransactionMessages.CreateTransaction createTransaction =
+ (ShardTransactionMessages.CreateTransaction) message;
+ return new CreateTransaction(createTransaction.getTransactionId(),
+ createTransaction.getTransactionType(), createTransaction.getTransactionChainId());
+ }
+
+ public String getTransactionChainId() {
+ return transactionChainId;
+ }
}
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
+import akka.pattern.Patterns;
import akka.util.Timeout;
-
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
*/
public Object executeRemoteOperation(ActorSelection actor, Object message) {
- LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
+ LOG.debug("Sending remote message {} to {}", message.getClass().toString(),
+ actor.toString());
Future<Object> future = ask(actor, message, operationTimeout);
actor.tell(message, ActorRef.noSender());
}
+ public void sendShardOperationAsync(String shardName, Object message) {
+ ActorSelection primary = findPrimary(shardName);
+
+ primary.tell(message, ActorRef.noSender());
+ }
+
+
/**
* Execute an operation on the primary for a given shard
* <p>
}
+ /**
+ * Execute an operation on the local shard only, asynchronously
+ *
+ * <p>
+ * This method first finds the address of the local shard if any. It then
+ * executes the operation on it.
+ * </p>
+ *
+ * @param shardName the name of the shard on which the operation needs to be executed
+ * @param message the message that needs to be sent to the shard
+ * @param timeout the amount of time that this method should wait for a response before timing out
+ * @return null if the shard could not be located else a future on which the caller can wait
+ *
+ */
+ public Future executeLocalShardOperationAsync(String shardName, Object message, Timeout timeout) {
+ ActorRef local = findLocalShard(shardName);
+ if(local == null){
+ return null;
+ }
+ return Patterns.ask(local, message, timeout);
+ }
+
+
+
public void shutdown() {
shardManager.tell(PoisonPill.getInstance(), null);
actorSystem.shutdown();
return clusterWrapper.getCurrentMemberName();
}
+ /**
+ * Send the message to each and every shard
+ *
+ * @param message the message to be sent to each shard
+ */
+ public void broadcast(Object message){
+ for(String shardName : configuration.getAllShardNames()){
+ try {
+ sendShardOperationAsync(shardName, message);
+ } catch(Exception e){
+ LOG.warn("broadcast failed to send message " + message.getClass().getSimpleName() + " to shard " + shardName, e);
+ }
+ }
+ }
+
+ public FiniteDuration getOperationDuration() {
+ return operationDuration;
+ }
}
odl-cluster-data {
bounded-mailbox {
- mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
mailbox-capacity = 1000
mailbox-push-timeout-time = 100ms
}
+
+ metric-capture-enabled = true
+
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
cluster {
range "1..max";
}
}
-
+
typedef operation-timeout-type {
type uint16 {
range "5..max";
}
}
-
+
grouping data-store-properties {
leaf max-shard-data-change-executor-queue-size {
default 1000;
type non-zero-uint16-type;
description "The maximum queue size for each shard's data store executor.";
}
-
+
leaf shard-transaction-idle-timeout-in-minutes {
default 10;
type non-zero-uint16-type;
description "The maximum amount of time a shard transaction can be idle without receiving any messages before it self-destructs.";
}
-
+
leaf operation-timeout-in-seconds {
default 5;
type operation-timeout-type;
description "The maximum amount of time for akka operations (remote or local) to complete before failing.";
}
+
+ leaf enable-metric-capture {
+ default false;
+ type boolean;
+ description "Enable or disable metric capture.";
+ }
+
+ leaf bounded-mailbox-capacity {
+ default 1000;
+ type non-zero-uint16-type;
+ description "Max queue size that an actor's mailbox can reach";
+ }
}
-
+
// Augments the 'configuration' choice node under modules/module.
augment "/config:modules/config:module/config:configuration" {
case distributed-config-datastore-provider {
import akka.actor.Props;
import akka.event.Logging;
import akka.testkit.JavaTestKit;
-
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
assertEquals(true, result);
- // 1. Create a TransactionChain
- shard.tell(new CreateTransactionChain().toSerializable(), getRef());
-
- final ActorSelection transactionChain =
- new ExpectMsg<ActorSelection>(duration("3 seconds"), "CreateTransactionChainReply") {
- @Override
- protected ActorSelection match(Object in) {
- if (in.getClass().equals(CreateTransactionChainReply.SERIALIZABLE_CLASS)) {
- ActorPath transactionChainPath =
- CreateTransactionChainReply.fromSerializable(getSystem(),in)
- .getTransactionChainPath();
- return getSystem()
- .actorSelection(transactionChainPath);
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- assertNotNull(transactionChain);
-
- System.out.println("Successfully created transaction chain");
-
- // 2. Create a Transaction on the TransactionChain
- transactionChain.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.WRITE_ONLY.ordinal() ).toSerializable(), getRef());
+ // Create a transaction on the shard
+ shard.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.WRITE_ONLY.ordinal() ).toSerializable(), getRef());
final ActorSelection transaction =
new ExpectMsg<ActorSelection>(duration("3 seconds"), "CreateTransactionReply") {
import java.io.File;
import java.util.List;
+import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
File f = new File("./module-shards.conf");
ConfigFactory.parseFile(f);
}
+
+ @Test
+ public void testGetAllShardNames(){
+ Set<String> allShardNames = configuration.getAllShardNames();
+
+ assertEquals(4, allShardNames.size());
+ assertTrue(allShardNames.contains("default"));
+ assertTrue(allShardNames.contains("people-1"));
+ assertTrue(allShardNames.contains("cars-1"));
+ assertTrue(allShardNames.contains("test-1"));
+ }
}
import java.util.List;
+import static junit.framework.TestCase.assertEquals;
+import static junit.framework.TestCase.assertNotNull;
+import static junit.framework.TestCase.assertTrue;
+
public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest{
private ActorRef dataChangeListenerActor = getSystem().actorOf(Props.create(DoNothingActor.class));
Object messages = testContext
.executeLocalOperation(actorRef, "messages");
- Assert.assertNotNull(messages);
+ assertNotNull(messages);
- Assert.assertTrue(messages instanceof List);
+ assertTrue(messages instanceof List);
List<Object> listMessages = (List<Object>) messages;
- Assert.assertEquals(1, listMessages.size());
+ assertEquals(1, listMessages.size());
+
+ assertTrue(listMessages.get(0).getClass()
+ .equals(CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS));
+ }
+
+ @Test
+ public void testCloseWhenRegistrationIsNull() throws Exception {
+ final Props props = Props.create(MessageCollectorActor.class);
+ final ActorRef actorRef = getSystem().actorOf(props);
+
+ DataChangeListenerRegistrationProxy proxy =
+ new DataChangeListenerRegistrationProxy(
+ new MockDataChangeListener(), dataChangeListenerActor);
+
+ proxy.close();
+
+ //Check if it was received by the remote actor
+ ActorContext
+ testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)),new MockClusterWrapper(), new MockConfiguration());
+ Object messages = testContext
+ .executeLocalOperation(actorRef, "messages");
+
+ assertNotNull(messages);
+
+ assertTrue(messages instanceof List);
+
+ List<Object> listMessages = (List<Object>) messages;
- Assert.assertTrue(listMessages.get(0).getClass().equals(CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS));
+ assertEquals(0, listMessages.size());
}
}
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
}
+ @Test
+ public void transactionChainIntegrationTest() throws Exception {
+ final Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
+ ShardStrategyFactory.setConfiguration(configuration);
+
+
+
+ new JavaTestKit(getSystem()) {
+ {
+
+ new Within(duration("10 seconds")) {
+ @Override
+ protected void run() {
+ try {
+ final DistributedDataStore distributedDataStore =
+ new DistributedDataStore(getSystem(), "config",
+ new MockClusterWrapper(), configuration,
+ new DatastoreContext());
+
+ distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
+
+ // Wait for a specific log message to show up
+ final boolean result =
+ new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
+ ) {
+ @Override
+ protected Boolean run() {
+ return true;
+ }
+ }.from("akka://test/user/shardmanager-config/member-1-shard-test-1-config")
+ .message("Switching from state Candidate to Leader")
+ .occurrences(1).exec();
+
+ assertEquals(true, result);
+
+ DOMStoreTransactionChain transactionChain =
+ distributedDataStore.createTransactionChain();
+
+ DOMStoreReadWriteTransaction transaction =
+ transactionChain.newReadWriteTransaction();
+
+ transaction
+ .write(TestModel.TEST_PATH, ImmutableNodes
+ .containerNode(TestModel.TEST_QNAME));
+
+ ListenableFuture<Optional<NormalizedNode<?, ?>>>
+ future =
+ transaction.read(TestModel.TEST_PATH);
+
+ Optional<NormalizedNode<?, ?>> optional =
+ future.get();
+
+ Assert.assertTrue("Node not found", optional.isPresent());
+
+ NormalizedNode<?, ?> normalizedNode =
+ optional.get();
+
+ assertEquals(TestModel.TEST_QNAME,
+ normalizedNode.getNodeType());
+
+ DOMStoreThreePhaseCommitCohort ready =
+ transaction.ready();
+
+ ListenableFuture<Boolean> canCommit =
+ ready.canCommit();
+
+ assertTrue(canCommit.get(5, TimeUnit.SECONDS));
+
+ ListenableFuture<Void> preCommit =
+ ready.preCommit();
+
+ preCommit.get(5, TimeUnit.SECONDS);
+
+ ListenableFuture<Void> commit = ready.commit();
+
+ commit.get(5, TimeUnit.SECONDS);
+
+ transactionChain.close();
+ } catch (ExecutionException | TimeoutException | InterruptedException e){
+ fail(e.getMessage());
+ }
+ }
+ };
+ }
+ };
+
+ }
+
//FIXME : Disabling test because it's flaky
//@Test
package org.opendaylight.controller.cluster.datastore;
+import akka.actor.ActorPath;
import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
-
+import akka.dispatch.ExecutionContexts;
+import akka.dispatch.Futures;
+import akka.util.Timeout;
+import com.google.common.util.concurrent.MoreExecutors;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import scala.concurrent.ExecutionContextExecutor;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+import static junit.framework.TestCase.assertEquals;
+import static junit.framework.TestCase.assertNull;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class DistributedDataStoreTest extends AbstractActorTest{
@Test
public void testRegisterChangeListenerWhenShardIsLocal() throws Exception {
+ ActorContext actorContext = mock(ActorContext.class);
+
+ distributedDataStore = new DistributedDataStore(actorContext);
+ distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
- mockActorContext.setExecuteLocalShardOperationResponse(new RegisterChangeListenerReply(doNothingActorRef.path()));
+ Future future = mock(Future.class);
+ when(actorContext.getOperationDuration()).thenReturn(FiniteDuration.apply(5, TimeUnit.SECONDS));
+ when(actorContext.getActorSystem()).thenReturn(getSystem());
+ when(actorContext
+ .executeLocalShardOperationAsync(anyString(), anyObject(), any(Timeout.class))).thenReturn(future);
ListenerRegistration registration =
- distributedDataStore.registerChangeListener(TestModel.TEST_PATH, new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
- @Override
- public void onDataChanged(AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
- throw new UnsupportedOperationException("onDataChanged");
- }
- }, AsyncDataBroker.DataChangeScope.BASE);
+ distributedDataStore.registerChangeListener(TestModel.TEST_PATH,
+ mock(AsyncDataChangeListener.class),
+ AsyncDataBroker.DataChangeScope.BASE);
- assertTrue(registration instanceof DataChangeListenerRegistrationProxy);
+ assertNotNull(registration);
+
+ assertEquals(DataChangeListenerRegistrationProxy.class, registration.getClass());
+ }
+
+ @Test
+ public void testRegisterChangeListenerWhenSuccessfulReplyReceived() throws Exception {
+ ActorContext actorContext = mock(ActorContext.class);
+
+ distributedDataStore = new DistributedDataStore(actorContext);
+ distributedDataStore.onGlobalContextUpdated(
+ TestModel.createTestContext());
+
+ ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(MoreExecutors.sameThreadExecutor());
+
+ // Make Future successful
+ Future f = Futures.successful(new RegisterChangeListenerReply(doNothingActorRef.path()));
+
+ // Setup the mocks
+ ActorSystem actorSystem = mock(ActorSystem.class);
+ ActorSelection actorSelection = mock(ActorSelection.class);
+
+ when(actorContext.getOperationDuration()).thenReturn(FiniteDuration.apply(5, TimeUnit.SECONDS));
+ when(actorSystem.dispatcher()).thenReturn(executor);
+ when(actorSystem.actorOf(any(Props.class))).thenReturn(doNothingActorRef);
+ when(actorContext.getActorSystem()).thenReturn(actorSystem);
+ when(actorContext
+ .executeLocalShardOperationAsync(anyString(), anyObject(), any(Timeout.class))).thenReturn(f);
+ when(actorContext.actorSelection(any(ActorPath.class))).thenReturn(actorSelection);
+
+ ListenerRegistration registration =
+ distributedDataStore.registerChangeListener(TestModel.TEST_PATH,
+ mock(AsyncDataChangeListener.class),
+ AsyncDataBroker.DataChangeScope.BASE);
assertNotNull(registration);
+
+ assertEquals(DataChangeListenerRegistrationProxy.class, registration.getClass());
+
+ ActorSelection listenerRegistrationActor =
+ ((DataChangeListenerRegistrationProxy) registration).getListenerRegistrationActor();
+
+ assertNotNull(listenerRegistrationActor);
+
+ assertEquals(actorSelection, listenerRegistrationActor);
+ }
+
+ @Test
+ public void testRegisterChangeListenerWhenSuccessfulReplyFailed() throws Exception {
+ ActorContext actorContext = mock(ActorContext.class);
+
+ distributedDataStore = new DistributedDataStore(actorContext);
+ distributedDataStore.onGlobalContextUpdated(
+ TestModel.createTestContext());
+
+ ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(MoreExecutors.sameThreadExecutor());
+
+ // Make Future fail
+ Future f = Futures.failed(new IllegalArgumentException());
+
+ // Setup the mocks
+ ActorSystem actorSystem = mock(ActorSystem.class);
+ ActorSelection actorSelection = mock(ActorSelection.class);
+
+ when(actorContext.getOperationDuration()).thenReturn(FiniteDuration.apply(5, TimeUnit.SECONDS));
+ when(actorSystem.dispatcher()).thenReturn(executor);
+ when(actorSystem.actorOf(any(Props.class))).thenReturn(doNothingActorRef);
+ when(actorContext.getActorSystem()).thenReturn(actorSystem);
+ when(actorContext
+ .executeLocalShardOperationAsync(anyString(), anyObject(), any(Timeout.class))).thenReturn(f);
+ when(actorContext.actorSelection(any(ActorPath.class))).thenReturn(actorSelection);
+
+ ListenerRegistration registration =
+ distributedDataStore.registerChangeListener(TestModel.TEST_PATH,
+ mock(AsyncDataChangeListener.class),
+ AsyncDataBroker.DataChangeScope.BASE);
+
+ assertNotNull(registration);
+
+ assertEquals(DataChangeListenerRegistrationProxy.class, registration.getClass());
+
+ ActorSelection listenerRegistrationActor =
+ ((DataChangeListenerRegistrationProxy) registration).getListenerRegistrationActor();
+
+ assertNull(listenerRegistrationActor);
+
}
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
private static final DatastoreContext DATA_STORE_CONTEXT = new DatastoreContext();
- @Test
- public void testOnReceiveCreateTransactionChain() throws Exception {
- new JavaTestKit(getSystem()) {{
- final ShardIdentifier identifier =
- ShardIdentifier.builder().memberName("member-1")
- .shardName("inventory").type("config").build();
-
- final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
- final ActorRef subject =
- getSystem().actorOf(props, "testCreateTransactionChain");
-
-
- // Wait for a specific log message to show up
- final boolean result =
- new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
- ) {
- @Override
- protected Boolean run() {
- return true;
- }
- }.from(subject.path().toString())
- .message("Switching from state Candidate to Leader")
- .occurrences(1).exec();
-
- Assert.assertEquals(true, result);
-
- new Within(duration("3 seconds")) {
- @Override
- protected void run() {
-
- subject.tell(new CreateTransactionChain().toSerializable(), getRef());
-
- final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- @Override
- protected String match(Object in) {
- if (in.getClass().equals(CreateTransactionChainReply.SERIALIZABLE_CLASS)){
- CreateTransactionChainReply reply =
- CreateTransactionChainReply.fromSerializable(getSystem(),in);
- return reply.getTransactionChainPath()
- .toString();
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- assertEquals("Unexpected transaction path " + out,
- "akka://test/user/testCreateTransactionChain/$a",
- out);
-
- expectNoMsg();
- }
-
-
- };
- }};
- }
-
@Test
public void testOnReceiveRegisterListener() throws Exception {
new JavaTestKit(getSystem()) {{
}};
}
+ @Test
+ public void testCreateTransactionOnChain(){
+ new JavaTestKit(getSystem()) {{
+ final ShardIdentifier identifier =
+ ShardIdentifier.builder().memberName("member-1")
+ .shardName("inventory").type("config").build();
+
+ final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
+ final ActorRef subject =
+ getSystem().actorOf(props, "testCreateTransactionOnChain");
+
+ // Wait for a specific log message to show up
+ final boolean result =
+ new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
+ ) {
+ @Override
+ protected Boolean run() {
+ return true;
+ }
+ }.from(subject.path().toString())
+ .message("Switching from state Candidate to Leader")
+ .occurrences(1).exec();
+
+ Assert.assertEquals(true, result);
+
+ new Within(duration("3 seconds")) {
+ @Override
+ protected void run() {
+
+ subject.tell(
+ new UpdateSchemaContext(TestModel.createTestContext()),
+ getRef());
+
+ subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
+ getRef());
+
+ final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
+ // do not put code outside this method, will run afterwards
+ @Override
+ protected String match(Object in) {
+ if (in instanceof CreateTransactionReply) {
+ CreateTransactionReply reply =
+ (CreateTransactionReply) in;
+ return reply.getTransactionActorPath()
+ .toString();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ assertTrue("Unexpected transaction path " + out,
+ out.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
+ expectNoMsg();
+ }
+ };
+ }};
+ }
+
@Test
public void testPeerAddressResolved(){
new JavaTestKit(getSystem()) {{
+++ /dev/null
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
-
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-
-import static org.junit.Assert.assertEquals;
-
-public class ShardTransactionChainTest extends AbstractActorTest {
-
- private static ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
-
- private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor,
- MoreExecutors.sameThreadExecutor());
-
- private static final SchemaContext testSchemaContext = TestModel.createTestContext();
-
- private static final DatastoreContext DATA_STORE_CONTEXT = new DatastoreContext();
-
- private static final String mockShardName = "mockShardName";
-
- private final ShardStats shardStats = new ShardStats(mockShardName, "DataStore");
-
- @BeforeClass
- public static void staticSetup() {
- store.onGlobalContextUpdated(testSchemaContext);
- }
-
- @Test
- public void testOnReceiveCreateTransaction() throws Exception {
- new JavaTestKit(getSystem()) {{
- final Props props = ShardTransactionChain.props(store.createTransactionChain(),
- testSchemaContext, DATA_STORE_CONTEXT, shardStats);
- final ActorRef subject = getSystem().actorOf(props, "testCreateTransaction");
-
- new Within(duration("1 seconds")) {
- @Override
- protected void run() {
-
- subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
-
- final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- @Override
- protected String match(Object in) {
- if (in.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
- return CreateTransactionReply.fromSerializable(in).getTransactionPath();
- }else{
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- assertEquals("Unexpected transaction path " + out,
- "akka://test/user/testCreateTransaction/shard-txn-1",
- out);
-
- // Will wait for the rest of the 3 seconds
- expectNoMsg();
- }
-
-
- };
- }};
- }
-
- @Test
- public void testOnReceiveCloseTransactionChain() throws Exception {
- new JavaTestKit(getSystem()) {{
- final Props props = ShardTransactionChain.props(store.createTransactionChain(),
- testSchemaContext, DATA_STORE_CONTEXT, shardStats );
- final ActorRef subject = getSystem().actorOf(props, "testCloseTransactionChain");
-
- new Within(duration("1 seconds")) {
- @Override
- protected void run() {
-
- subject.tell(new CloseTransactionChain().toSerializable(), getRef());
-
- final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- @Override
- protected String match(Object in) {
- if (in.getClass().equals(CloseTransactionChainReply.SERIALIZABLE_CLASS)) {
- return "match";
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- assertEquals("match", out);
- // Will wait for the rest of the 3 seconds
- expectNoMsg();
- }
-
-
- };
- }};
- }
-}
package org.opendaylight.controller.cluster.datastore;
-import static org.mockito.Mockito.doReturn;
-
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.Mockito;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
public class TransactionChainProxyTest {
- ActorContext actorContext = Mockito.mock(ActorContext.class);
- SchemaContext schemaContext = Mockito.mock(SchemaContext.class);
+ ActorContext actorContext = mock(ActorContext.class);
+ SchemaContext schemaContext = mock(SchemaContext.class);
@Before
public void setUp() {
}
- @Test(expected=UnsupportedOperationException.class)
+ @Test
public void testClose() throws Exception {
- new TransactionChainProxy(actorContext).close();
+ ActorContext context = mock(ActorContext.class);
+
+ new TransactionChainProxy(context).close();
+
+ verify(context, times(1)).broadcast(anyObject());
}
}
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
public class MockConfiguration implements Configuration{
@Override public List<String> getMemberShardNames(String memberName) {
return Collections.EMPTY_LIST;
}
+
+ @Override public Set<String> getAllShardNames() {
+ return Collections.emptySet();
+ }
}
}
}
bounded-mailbox {
- mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
mailbox-capacity = 1000
mailbox-push-timeout-time = 100ms
}
/**
* Inherited from immediate parent
*/
- private final Iterable<Builder> inheritedOne;
+ private final Collection<Builder> inheritedOne;
private final YangInstanceIdentifier nodeId;
private final Collection<Node> nodes;
- private final Map<DataChangeListenerRegistration<?>, Builder> subBuilders = new HashMap<>();
- private final Map<DataChangeListenerRegistration<?>, Builder> oneBuilders = new HashMap<>();
- private final Map<DataChangeListenerRegistration<?>, Builder> baseBuilders = new HashMap<>();
+ private final Map<DataChangeListenerRegistration<?>, Builder> subBuilders;
+ private final Map<DataChangeListenerRegistration<?>, Builder> oneBuilders;
+ private final Map<DataChangeListenerRegistration<?>, Builder> baseBuilders;
private ResolveDataChangeState(final YangInstanceIdentifier nodeId,
- final Iterable<Builder> inheritedSub, final Iterable<Builder> inheritedOne,
+ final Iterable<Builder> inheritedSub, final Collection<Builder> inheritedOne,
final Collection<Node> nodes) {
this.nodeId = Preconditions.checkNotNull(nodeId);
this.nodes = Preconditions.checkNotNull(nodes);
/*
* Collect the nodes which need to be propagated from us to the child.
*/
+ final Map<DataChangeListenerRegistration<?>, Builder> sub = new HashMap<>();
+ final Map<DataChangeListenerRegistration<?>, Builder> one = new HashMap<>();
+ final Map<DataChangeListenerRegistration<?>, Builder> base = new HashMap<>();
for (Node n : nodes) {
for (DataChangeListenerRegistration<?> l : n.getListeners()) {
final Builder b = DOMImmutableDataChangeEvent.builder(DataChangeScope.BASE);
switch (l.getScope()) {
case BASE:
- baseBuilders.put(l, b);
+ base.put(l, b);
break;
case ONE:
- oneBuilders.put(l, b);
+ one.put(l, b);
break;
case SUBTREE:
- subBuilders.put(l, b);
+ sub.put(l, b);
break;
}
}
}
+
+ baseBuilders = maybeEmpty(base);
+ oneBuilders = maybeEmpty(one);
+ subBuilders = maybeEmpty(sub);
+ }
+
+ private static <K, V> Map<K, V> maybeEmpty(final Map<K, V> map) {
+ if (map.isEmpty()) {
+ return Collections.emptyMap();
+ }
+ return map;
}
/**
* @return State handle
*/
public ResolveDataChangeState child(final PathArgument childId) {
- return new ResolveDataChangeState(nodeId.node(childId),
- Iterables.concat(inheritedSub, subBuilders.values()),
+ /*
+ * We instantiate a concatenation only when needed:
+ *
+ * 1) If our collection is empty, we reuse the parent's. This is typically the case
+ * for intermediate node, which should be the vast majority.
+ * 2) If the parent's iterable is a Collection and it is empty, reuse our collection.
+ * This is the case for the first node which defines a subtree listener in a
+ * particular subtree.
+ * 3) Concatenate the two collections. This happens when we already have some
+ * subtree listeners and we encounter a node which adds a few more.
+ *
+ * This allows us to lower number of objects allocated and also
+ * speeds up Iterables.isEmpty() in needsProcessing().
+ *
+ * Note that the check for Collection in 2) relies on precisely this logic, which
+ * ensures that we simply cannot see an empty concatenation, but rather start off with
+ * an empty collection, then switch to a non-empty collection and finally switch to
+ * a concatenation. This saves us from instantiating iterators, which a trivial
+ * Iterables.isEmpty() would do as soon as we cross case 3).
+ */
+ final Iterable<Builder> sb;
+ if (!subBuilders.isEmpty()) {
+ if (inheritedSub instanceof Collection && ((Collection<?>) inheritedSub).isEmpty()) {
+ sb = subBuilders.values();
+ } else {
+ sb = Iterables.concat(inheritedSub, subBuilders.values());
+ }
+ } else {
+ sb = inheritedSub;
+ }
+
+ return new ResolveDataChangeState(nodeId.node(childId), sb,
oneBuilders.values(), getListenerChildrenWildcarded(nodes, childId));
}
if (!nodes.isEmpty()) {
return true;
}
- // Have SUBTREE listeners
- if (!Iterables.isEmpty(inheritedSub)) {
- return true;
- }
// Have ONE listeners
- if (!Iterables.isEmpty(inheritedOne)) {
+ if (!inheritedOne.isEmpty()) {
return true;
}
- return false;
+ /*
+ * Have SUBTREE listeners
+ *
+ * This is slightly magical replacement for !Iterables.isEmpty(inheritedSub).
+ * It relies on the logic in child(), which gives us the guarantee that when
+ * inheritedSub is not a Collection, it is guaranteed to be non-empty (which
+ * means we need to process). If it is a collection, we still need to check
+ * it for emptiness.
+ *
+ * Unlike Iterables.isEmpty() this code does not instantiate any temporary
+ * objects and is thus more efficient.
+ */
+ if (inheritedSub instanceof Collection) {
+ return !((Collection<?>) inheritedSub).isEmpty();
+ }
+
+ // Non-Collection => non-empty => have to process
+ return true;
}
/**
return fromStrings(session.getServerCapabilities());
}
+ private static final QName cachedQName(String namespace, String revision, String moduleName) {
+ return QName.cachedReference(QName.create(namespace, revision, moduleName));
+ }
+
public static NetconfSessionCapabilities fromStrings(final Collection<String> capabilities) {
final Set<QName> moduleBasedCaps = new HashSet<>();
final Set<String> nonModuleCaps = Sets.newHashSet(capabilities);
String revision = REVISION_PARAM.from(queryParams);
if (revision != null) {
- moduleBasedCaps.add(QName.create(namespace, revision, moduleName));
+ moduleBasedCaps.add(cachedQName(namespace, revision, moduleName));
nonModuleCaps.remove(capability);
continue;
}
}
// FIXME: do we really want to continue here?
- moduleBasedCaps.add(QName.cachedReference(QName.create(namespace, revision, moduleName)));
+ moduleBasedCaps.add(cachedQName(namespace, revision, moduleName));
nonModuleCaps.remove(capability);
}
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_${scala.version}</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-persistence-experimental_${scala.version}</artifactId>
+ </dependency>
<!-- SAL Dependencies -->
<dependency>
package org.opendaylight.controller.config.yang.config.remote_rpc_connector;
+import org.opendaylight.controller.cluster.common.actor.DefaultAkkaConfigurationReader;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderFactory;
import org.opendaylight.controller.sal.core.api.Broker;
import org.osgi.framework.BundleContext;
@Override
public java.lang.AutoCloseable createInstance() {
Broker broker = getDomBrokerDependency();
- return RemoteRpcProviderFactory.createInstance(broker, bundleContext);
+
+ RemoteRpcProviderConfig config = new RemoteRpcProviderConfig.Builder(getActorSystemName())
+ .metricCaptureEnabled(getEnableMetricCapture())
+ .mailboxCapacity(getBoundedMailboxCapacity())
+ .withConfigReader(new DefaultAkkaConfigurationReader())
+ .build();
+
+ return RemoteRpcProviderFactory.createInstance(broker, bundleContext, config);
}
public void setBundleContext(BundleContext bundleContext) {
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.remote.rpc;
-
-import akka.actor.UntypedActor;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import org.opendaylight.controller.remote.rpc.messages.Monitor;
-
-public abstract class AbstractUntypedActor extends UntypedActor {
- protected final LoggingAdapter LOG =
- Logging.getLogger(getContext().system(), this);
-
-
- public AbstractUntypedActor(){
- LOG.debug("Actor created {}", getSelf());
- getContext().
- system().
- actorSelection("user/termination-monitor").
- tell(new Monitor(getSelf()), getSelf());
- }
-
- @Override public void onReceive(Object message) throws Exception {
- LOG.debug("Received message {}", message);
- handleReceive(message);
- LOG.debug("Done handling message {}", message);
- }
-
- protected abstract void handleReceive(Object message) throws Exception;
-}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.remote.rpc;
-
-
-public class ActorConstants {
- public static final String RPC_BROKER = "rpc-broker";
- public static final String RPC_REGISTRY = "rpc-registry";
- public static final String RPC_MANAGER = "rpc";
-
- public static final String RPC_BROKER_PATH= "/user/rpc/rpc-broker";
- public static final String RPC_REGISTRY_PATH = "/user/rpc/rpc-registry";
- public static final String RPC_MANAGER_PATH = "/user/rpc";
-}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.remote.rpc;
-
-import akka.actor.ActorSystem;
-import akka.osgi.BundleDelegatingClassLoader;
-import org.opendaylight.controller.remote.rpc.utils.AkkaConfigurationReader;
-import org.osgi.framework.BundleContext;
-
-
-public class ActorSystemFactory {
-
- public static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-rpc";
- public static final String CONFIGURATION_NAME = "odl-cluster-rpc";
-
- private static volatile ActorSystem actorSystem = null;
-
- public static final ActorSystem getInstance(){
- return actorSystem;
- }
-
- /**
- * This method should be called only once during initialization
- *
- * @param bundleContext
- */
- public static final void createInstance(final BundleContext bundleContext, AkkaConfigurationReader akkaConfigurationReader) {
- if(actorSystem == null) {
- // Create an OSGi bundle classloader for actor system
- BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(),
- Thread.currentThread().getContextClassLoader());
- synchronized (ActorSystemFactory.class) {
- // Double check
- if (actorSystem == null) {
- ActorSystem system = ActorSystem.create(ACTOR_SYSTEM_NAME,
- akkaConfigurationReader.read().getConfig(CONFIGURATION_NAME), classLoader);
- actorSystem = system;
- }
- }
- } else {
- throw new IllegalStateException("Actor system should be created only once. Use getInstance method to access existing actor system");
- }
- }
-
-}
package org.opendaylight.controller.remote.rpc;
-import static akka.pattern.Patterns.ask;
import akka.actor.ActorRef;
import akka.dispatch.OnComplete;
-import akka.util.Timeout;
-
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
-
import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
-import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
-import org.opendaylight.controller.xml.codec.XmlUtils;
import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.controller.xml.codec.XmlUtils;
import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import scala.concurrent.ExecutionContext;
import java.util.Collections;
import java.util.Set;
+import static akka.pattern.Patterns.ask;
+
public class RemoteRpcImplementation implements RpcImplementation, RoutedRpcDefaultImplementation {
private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcImplementation.class);
private final ActorRef rpcBroker;
private final SchemaContext schemaContext;
+ private final RemoteRpcProviderConfig config;
- public RemoteRpcImplementation(ActorRef rpcBroker, SchemaContext schemaContext) {
+ public RemoteRpcImplementation(ActorRef rpcBroker, SchemaContext schemaContext, RemoteRpcProviderConfig config) {
this.rpcBroker = rpcBroker;
this.schemaContext = schemaContext;
+ this.config = config;
}
@Override
final SettableFuture<RpcResult<CompositeNode>> listenableFuture = SettableFuture.create();
- scala.concurrent.Future<Object> future = ask(rpcBroker, rpcMsg,
- new Timeout(ActorUtil.ASK_DURATION));
+ scala.concurrent.Future<Object> future = ask(rpcBroker, rpcMsg, config.getAskDuration());
OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcProvider.class);
- private final ActorSystem actorSystem;
private final RpcProvisionRegistry rpcProvisionRegistry;
+
+ private ActorSystem actorSystem;
private Broker.ProviderSession brokerSession;
private SchemaContext schemaContext;
private ActorRef rpcManager;
+ private RemoteRpcProviderConfig config;
public RemoteRpcProvider(ActorSystem actorSystem, RpcProvisionRegistry rpcProvisionRegistry) {
this.actorSystem = actorSystem;
this.rpcProvisionRegistry = rpcProvisionRegistry;
+ this.config = new RemoteRpcProviderConfig(actorSystem.settings().config());
}
@Override
public void close() throws Exception {
- this.actorSystem.shutdown();
+ if (this.actorSystem != null)
+ this.actorSystem.shutdown();
}
@Override
}
private void start() {
- LOG.info("Starting all rpc listeners and actors.");
- // Create actor to handle and sync routing table in cluster
+ LOG.info("Starting remote rpc service...");
+
SchemaService schemaService = brokerSession.getService(SchemaService.class);
schemaContext = schemaService.getGlobalContext();
- rpcManager = actorSystem.actorOf(RpcManager.props(schemaContext, brokerSession, rpcProvisionRegistry), ActorConstants.RPC_MANAGER);
+ rpcManager = actorSystem.actorOf(RpcManager.props(schemaContext, brokerSession, rpcProvisionRegistry),
+ config.getRpcManagerName());
- LOG.debug("Rpc actors are created.");
+ LOG.debug("rpc manager started");
}
-
@Override
public void onGlobalContextUpdated(SchemaContext schemaContext) {
this.schemaContext = schemaContext;
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import akka.util.Timeout;
+import com.typesafe.config.Config;
+import org.opendaylight.controller.cluster.common.actor.CommonConfig;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ */
+public class RemoteRpcProviderConfig extends CommonConfig {
+
+ protected static final String TAG_RPC_BROKER_NAME = "rpc-broker-name";
+ protected static final String TAG_RPC_REGISTRY_NAME = "registry-name";
+ protected static final String TAG_RPC_MGR_NAME = "rpc-manager-name";
+ protected static final String TAG_RPC_BROKER_PATH = "rpc-broker-path";
+ protected static final String TAG_RPC_REGISTRY_PATH = "rpc-registry-path";
+ protected static final String TAG_RPC_MGR_PATH = "rpc-manager-path";
+ protected static final String TAG_ASK_DURATION = "ask-duration";
+ private static final String TAG_GOSSIP_TICK_INTERVAL = "gossip-tick-interval";
+
+ //locally cached values
+ private Timeout cachedAskDuration;
+ private FiniteDuration cachedGossipTickInterval;
+
+ public RemoteRpcProviderConfig(Config config){
+ super(config);
+ }
+
+ public String getRpcBrokerName(){
+ return get().getString(TAG_RPC_BROKER_NAME);
+ }
+
+ public String getRpcRegistryName(){
+ return get().getString(TAG_RPC_REGISTRY_NAME);
+ }
+
+ public String getRpcManagerName(){
+ return get().getString(TAG_RPC_MGR_NAME);
+ }
+
+ public String getRpcBrokerPath(){
+ return get().getString(TAG_RPC_BROKER_PATH);
+ }
+
+ public String getRpcRegistryPath(){
+ return get().getString(TAG_RPC_REGISTRY_PATH);
+
+ }
+
+ public String getRpcManagerPath(){
+ return get().getString(TAG_RPC_MGR_PATH);
+ }
+
+
+ public Timeout getAskDuration(){
+ if (cachedAskDuration != null){
+ return cachedAskDuration;
+ }
+
+ cachedAskDuration = new Timeout(new FiniteDuration(
+ get().getDuration(TAG_ASK_DURATION, TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS));
+
+ return cachedAskDuration;
+ }
+
+ public FiniteDuration getGossipTickInterval(){
+ if (cachedGossipTickInterval != null) {
+ return cachedGossipTickInterval;
+ }
+
+ cachedGossipTickInterval = new FiniteDuration(
+ get().getDuration(TAG_GOSSIP_TICK_INTERVAL, TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+
+ return cachedGossipTickInterval;
+ }
+
+ public static class Builder extends CommonConfig.Builder<Builder>{
+
+ public Builder(String actorSystemName){
+ super(actorSystemName);
+
+ //Actor names
+ configHolder.put(TAG_RPC_BROKER_NAME, "broker");
+ configHolder.put(TAG_RPC_REGISTRY_NAME, "registry");
+ configHolder.put(TAG_RPC_MGR_NAME, "rpc");
+
+ //Actor paths
+ configHolder.put(TAG_RPC_BROKER_PATH, "/user/rpc/broker");
+ configHolder.put(TAG_RPC_REGISTRY_PATH, "/user/rpc/registry");
+ configHolder.put(TAG_RPC_MGR_PATH, "/user/rpc");
+
+ //durations
+ configHolder.put(TAG_ASK_DURATION, "15s");
+ configHolder.put(TAG_GOSSIP_TICK_INTERVAL, "500ms");
+
+ }
+
+ public RemoteRpcProviderConfig build(){
+ return new RemoteRpcProviderConfig(merge());
+ }
+ }
+
+
+}
package org.opendaylight.controller.remote.rpc;
-
-import org.opendaylight.controller.remote.rpc.utils.DefaultAkkaConfigurationReader;
+import akka.actor.ActorSystem;
+import akka.osgi.BundleDelegatingClassLoader;
+import com.typesafe.config.Config;
import org.opendaylight.controller.sal.core.api.Broker;
import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
import org.osgi.framework.BundleContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class RemoteRpcProviderFactory {
- public static RemoteRpcProvider createInstance(final Broker broker, final BundleContext bundleContext){
+ private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcProviderFactory.class);
+
+ public static RemoteRpcProvider createInstance(
+ final Broker broker, final BundleContext bundleContext, final RemoteRpcProviderConfig config){
- ActorSystemFactory.createInstance(bundleContext, new DefaultAkkaConfigurationReader());
RemoteRpcProvider rpcProvider =
- new RemoteRpcProvider(ActorSystemFactory.getInstance(), (RpcProvisionRegistry) broker);
+ new RemoteRpcProvider(createActorSystem(bundleContext, config), (RpcProvisionRegistry) broker);
+
broker.registerProvider(rpcProvider);
return rpcProvider;
}
+
+ private static ActorSystem createActorSystem(BundleContext bundleContext, RemoteRpcProviderConfig config){
+
+ // Create an OSGi bundle classloader for actor system
+ BundleDelegatingClassLoader classLoader =
+ new BundleDelegatingClassLoader(bundleContext.getBundle(),
+ Thread.currentThread().getContextClassLoader());
+
+ Config actorSystemConfig = config.get();
+ LOG.debug("Actor system configuration\n{}", actorSystemConfig.root().render());
+
+ if (config.isMetricCaptureEnabled()) {
+ LOG.info("Instrumentation is enabled in actor system {}. Metrics can be viewed in JMX console.",
+ config.getActorSystemName());
+ }
+
+ return ActorSystem.create(config.getActorSystemName(), actorSystemConfig, classLoader);
+ }
}
package org.opendaylight.controller.remote.rpc;
-import static akka.pattern.Patterns.ask;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.dispatch.OnComplete;
import akka.japi.Creator;
import akka.japi.Pair;
-import akka.util.Timeout;
-
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
-import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
+import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic;
import org.opendaylight.controller.remote.rpc.utils.RoutingLogic;
-import org.opendaylight.controller.xml.codec.XmlUtils;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.controller.sal.core.api.Broker;
import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.controller.xml.codec.XmlUtils;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
-import com.google.common.util.concurrent.ListenableFuture;
-
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Future;
+import static akka.pattern.Patterns.ask;
+
/**
* Actor to initiate execution of remote RPC on other nodes of the cluster.
*/
private final Broker.ProviderSession brokerSession;
private final ActorRef rpcRegistry;
private final SchemaContext schemaContext;
+ private final RemoteRpcProviderConfig config;
private RpcBroker(Broker.ProviderSession brokerSession, ActorRef rpcRegistry,
SchemaContext schemaContext) {
this.brokerSession = brokerSession;
this.rpcRegistry = rpcRegistry;
this.schemaContext = schemaContext;
+ config = new RemoteRpcProviderConfig(getContext().system().settings().config());
}
public static Props props(Broker.ProviderSession brokerSession, ActorRef rpcRegistry,
null, msg.getRpc(), msg.getIdentifier());
RpcRegistry.Messages.FindRouters findMsg = new RpcRegistry.Messages.FindRouters(routeId);
- scala.concurrent.Future<Object> future = ask(rpcRegistry, findMsg,
- new Timeout(ActorUtil.LOCAL_ASK_DURATION));
+ scala.concurrent.Future<Object> future = ask(rpcRegistry, findMsg, config.getAskDuration());
final ActorRef sender = getSender();
final ActorRef self = self();
ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(),
schemaContext), msg.getRpc());
- scala.concurrent.Future<Object> future = ask(logic.select(), executeMsg,
- new Timeout(ActorUtil.REMOTE_ASK_DURATION));
+ scala.concurrent.Future<Object> future = ask(logic.select(), executeMsg, config.getAskDuration());
OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
import akka.actor.SupervisorStrategy;
import akka.japi.Creator;
import akka.japi.Function;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
import org.opendaylight.controller.sal.core.api.Broker;
import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
import org.opendaylight.yangtools.yang.common.QName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;
+
import java.util.Set;
/**
private ActorRef rpcBroker;
private ActorRef rpcRegistry;
private final Broker.ProviderSession brokerSession;
+ private final RemoteRpcProviderConfig config;
private RpcListener rpcListener;
private RoutedRpcListener routeChangeListener;
private RemoteRpcImplementation rpcImplementation;
private final RpcProvisionRegistry rpcProvisionRegistry;
private RpcManager(SchemaContext schemaContext,
- Broker.ProviderSession brokerSession, RpcProvisionRegistry rpcProvisionRegistry) {
+ Broker.ProviderSession brokerSession,
+ RpcProvisionRegistry rpcProvisionRegistry) {
this.schemaContext = schemaContext;
this.brokerSession = brokerSession;
this.rpcProvisionRegistry = rpcProvisionRegistry;
+ this.config = new RemoteRpcProviderConfig(getContext().system().settings().config());
createRpcActors();
startListeners();
public static Props props(final SchemaContext schemaContext,
- final Broker.ProviderSession brokerSession, final RpcProvisionRegistry rpcProvisionRegistry) {
+ final Broker.ProviderSession brokerSession,
+ final RpcProvisionRegistry rpcProvisionRegistry) {
return Props.create(new Creator<RpcManager>() {
@Override
public RpcManager create() throws Exception {
private void createRpcActors() {
LOG.debug("Create rpc registry and broker actors");
- Config conf = ConfigFactory.load();
-
rpcRegistry =
getContext().actorOf(Props.create(RpcRegistry.class).
- withMailbox(ActorUtil.MAILBOX), ActorConstants.RPC_REGISTRY);
+ withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
rpcBroker =
getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext).
- withMailbox(ActorUtil.MAILBOX),ActorConstants.RPC_BROKER);
+ withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
rpcRegistry.tell(localRouter, self());
rpcListener = new RpcListener(rpcRegistry);
routeChangeListener = new RoutedRpcListener(rpcRegistry);
- rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext);
+ rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext, config);
brokerSession.addRpcRegistrationListener(rpcListener);
rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
-import org.opendaylight.controller.remote.rpc.messages.Monitor;
+import org.opendaylight.controller.cluster.common.actor.Monitor;
public class TerminationMonitor extends UntypedActor{
protected final LoggingAdapter LOG =
import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.Props;
-import akka.actor.UntypedActor;
import akka.dispatch.Mapper;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Pair;
import akka.pattern.Patterns;
import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
-import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import scala.concurrent.Future;
import java.util.Map;
import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
* It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
* cluster wide information.
*/
-public class RpcRegistry extends UntypedActor {
+public class RpcRegistry extends AbstractUntypedActorWithMetering {
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
*/
private ActorRef localRouter;
+ private RemoteRpcProviderConfig config;
+
public RpcRegistry() {
bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
-
+ this.config = new RemoteRpcProviderConfig(getContext().system().settings().config());
log.info("Bucket store path = {}", bucketStore.path().toString());
}
this.bucketStore = bucketStore;
}
- @Override
- public void onReceive(Object message) throws Exception {
-
- log.debug("Received message: message [{}]", message);
+ @Override
+ protected void handleReceive(Object message) throws Exception {
//TODO: if sender is remote, reject message
if (message instanceof SetLocalRouter)
Preconditions.checkState(localRouter != null, "Router must be set first");
- Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), ActorUtil.ASK_DURATION.toMillis());
+ Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration());
futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
}
*/
private void receiveRemoveRoutes(RemoveRoutes msg) {
- Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), ActorUtil.ASK_DURATION.toMillis());
+ Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration());
futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
}
private void receiveGetRouter(FindRouters msg) {
final ActorRef sender = getSender();
- Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), ActorUtil.ASK_DURATION.toMillis());
+ Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), config.getAskDuration());
futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
}
* @param routeId
* @return
*/
- private Messages.FindRoutersReply createReplyWithRouters(Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
+ private Messages.FindRoutersReply createReplyWithRouters(
+ Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
List<Pair<ActorRef, Long>> routers = new ArrayList<>();
Option<Pair<ActorRef, Long>> routerWithUpdateTime = null;
* @param sender client who asked to find the routers.
* @return
*/
- private Mapper<Object, Void> getMapperToGetRouter(final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
+ private Mapper<Object, Void> getMapperToGetRouter(
+ final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
return new Mapper<Object, Void>() {
@Override
public Void apply(Object replyMessage) {
public class BucketImpl<T extends Copier<T>> implements Bucket<T>, Serializable {
- private Long version = System.currentTimeMillis();;
+ private Long version = System.currentTimeMillis();
private T data;
import akka.actor.ActorRefProvider;
import akka.actor.Address;
import akka.actor.Props;
-import akka.actor.UntypedActor;
import akka.cluster.ClusterActorRefProvider;
import akka.event.Logging;
import akka.event.LoggingAdapter;
-import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.utils.ConditionalProbe;
import java.util.HashMap;
* This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
*
*/
-public class BucketStore extends UntypedActor {
+public class BucketStore extends AbstractUntypedActorWithMetering {
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
/**
* Bucket owned by the node
*/
- private BucketImpl localBucket = new BucketImpl();;
+ private BucketImpl localBucket = new BucketImpl();
/**
* Buckets ownded by other known nodes in the cluster
private ConditionalProbe probe;
+ private final RemoteRpcProviderConfig config;
+
+ public BucketStore(){
+ config = new RemoteRpcProviderConfig(getContext().system().settings().config());
+ }
+
@Override
public void preStart(){
ActorRefProvider provider = getContext().provider();
selfAddress = provider.getDefaultAddress();
if ( provider instanceof ClusterActorRefProvider)
- getContext().actorOf(Props.create(Gossiper.class).withMailbox(ActorUtil.MAILBOX), "gossiper");
+ getContext().actorOf(Props.create(Gossiper.class).withMailbox(config.getMailBoxName()), "gossiper");
}
- @Override
- public void onReceive(Object message) throws Exception {
-
- log.debug("Received message: node[{}], message[{}]", selfAddress, message);
+ @Override
+ protected void handleReceive(Object message) throws Exception {
if (probe != null) {
probe.tell(message, getSelf());
}
receiveGetLocalBucket();
} else if (message instanceof GetBucketsByMembers) {
receiveGetBucketsByMembers(
- ((GetBucketsByMembers) message).getMembers());
+ ((GetBucketsByMembers) message).getMembers());
} else if (message instanceof GetBucketVersions) {
receiveGetBucketVersions();
} else if (message instanceof UpdateRemoteBuckets) {
receiveUpdateRemoteBuckets(
- ((UpdateRemoteBuckets) message).getBuckets());
+ ((UpdateRemoteBuckets) message).getBuckets());
} else {
log.debug("Unhandled message [{}]", message);
unhandled(message);
}
-
}
/**
import akka.actor.ActorSelection;
import akka.actor.Address;
import akka.actor.Cancellable;
-import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterActorRefProvider;
import akka.cluster.ClusterEvent;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.pattern.Patterns;
-import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
*
*/
-public class Gossiper extends UntypedActor {
+public class Gossiper extends AbstractUntypedActorWithMetering {
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
private Boolean autoStartGossipTicks = true;
- public Gossiper(){}
+ private RemoteRpcProviderConfig config;
+
+ public Gossiper(){
+ config = new RemoteRpcProviderConfig(getContext().system().settings().config());
+ }
/**
* Helpful for testing
if (autoStartGossipTicks) {
gossipTask = getContext().system().scheduler().schedule(
new FiniteDuration(1, TimeUnit.SECONDS), //initial delay
- ActorUtil.GOSSIP_TICK_INTERVAL, //interval
+ config.getGossipTickInterval(), //interval
getSelf(), //target
new Messages.GossiperMessages.GossipTick(), //message
getContext().dispatcher(), //execution context
}
@Override
- public void onReceive(Object message) throws Exception {
-
- log.debug("Received message: node[{}], message[{}]", selfAddress, message);
-
+ protected void handleReceive(Object message) throws Exception {
//Usually sent by self via gossip task defined above. But its not enforced.
//These ticks can be sent by another actor as well which is esp. useful while testing
if (message instanceof GossipTick)
receiveGossipTick();
- //Message from remote gossiper with its bucket versions
+ //Message from remote gossiper with its bucket versions
else if (message instanceof GossipStatus)
receiveGossipStatus((GossipStatus) message);
- //Message from remote gossiper with buckets. This is usually in response to GossipStatus message
- //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus
- //message with its local versions
+ //Message from remote gossiper with buckets. This is usually in response to GossipStatus message
+ //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus
+ //message with its local versions
else if (message instanceof GossipEnvelope)
receiveGossip((GossipEnvelope) message);
void receiveGossipTick(){
if (clusterMembers.size() == 0) return; //no members to send gossip status to
- Address remoteMemberToGossipTo = null;
+ Address remoteMemberToGossipTo;
if (clusterMembers.size() == 1)
remoteMemberToGossipTo = clusterMembers.get(0);
final ActorRef sender = getSender();
Future<Object> futureReply =
- Patterns.ask(getContext().parent(), new GetBucketVersions(), ActorUtil.ASK_DURATION.toMillis());
+ Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
void sendGossipTo(final ActorRef remote, final Set<Address> addresses){
Future<Object> futureReply =
- Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), ActorUtil.ASK_DURATION.toMillis());
+ Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration());
futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
}
//Get local status from bucket store and send to remote
Future<Object> futureReply =
- Patterns.ask(getContext().parent(), new GetBucketVersions(), ActorUtil.ASK_DURATION.toMillis());
+ Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
//Find gossiper on remote system
ActorSelection remoteRef = getContext().system().actorSelection(
localIsOlder.add(address);
else if (localVersions.get(address) > remoteVersions.get(address))
localIsNewer.add(address);
- else
- continue;
}
if (!localIsOlder.isEmpty())
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- *
- */
-package org.opendaylight.controller.remote.rpc.utils;
-
-import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.concurrent.TimeUnit;
-
-public class ActorUtil {
- public static final FiniteDuration LOCAL_ASK_DURATION = Duration.create(2, TimeUnit.SECONDS);
- public static final FiniteDuration REMOTE_ASK_DURATION = Duration.create(15, TimeUnit.SECONDS);
- public static final FiniteDuration ASK_DURATION = Duration.create(17, TimeUnit.SECONDS);
- public static final FiniteDuration GOSSIP_TICK_INTERVAL = Duration.create(500, TimeUnit.MILLISECONDS);
- public static final String MAILBOX = "bounded-mailbox";
-}
odl-cluster-rpc {
bounded-mailbox {
- mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
mailbox-capacity = 1000
mailbox-push-timeout-time = 100ms
}
import config { prefix config; revision-date 2013-04-05; }
import opendaylight-md-sal-dom {prefix dom;}
-
+
description
"This module contains the base YANG definitions for
the remote routed rpc";
-
+
revision "2014-07-07" {
description
"Initial revision";
augment "/config:modules/config:module/config:configuration" {
case remote-rpc-connector {
when "/config:modules/config:module/config:type = 'remote-rpc-connector'";
-
+
container dom-broker {
uses config:service-ref {
refine type {
}
}
}
+
+ leaf enable-metric-capture {
+ default false;
+ type boolean;
+ description "Enable or disable metric capture.";
+ }
+
+ leaf actor-system-name {
+ default odl-cluster-rpc;
+ type string;
+ description "Name by which actor system is identified. It's also used to find the relevant configuration.";
+ }
+
+ leaf bounded-mailbox-capacity {
+ default 1000;
+ type uint16;
+ description "Max queue size that an actor's mailbox can reach";
+ }
}
}
package org.opendaylight.controller.remote.rpc;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import com.google.common.collect.ImmutableList;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.opendaylight.controller.sal.core.api.Broker;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.opendaylight.yangtools.yang.data.api.Node;
import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
+import java.io.File;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
-import com.google.common.collect.ImmutableList;
-import com.typesafe.config.ConfigFactory;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
/**
* Base class for RPC tests.
@BeforeClass
public static void setup() throws InterruptedException {
- node1 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberA"));
- node2 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberB"));
+ RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build();
+ RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").build();
+ node1 = ActorSystem.create("opendaylight-rpc", config1.get());
+ node2 = ActorSystem.create("opendaylight-rpc", config2.get());
}
@AfterClass
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.remote.rpc;
-
-
-import akka.actor.ActorSystem;
-import com.typesafe.config.ConfigFactory;
-import junit.framework.Assert;
-import org.junit.After;
-import org.junit.Test;
-import org.opendaylight.controller.remote.rpc.utils.AkkaConfigurationReader;
-import org.osgi.framework.Bundle;
-import org.osgi.framework.BundleContext;
-
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class ActorSystemFactoryTest {
- ActorSystem system = null;
-
- @Test
- public void testActorSystemCreation(){
- BundleContext context = mock(BundleContext.class);
- when(context.getBundle()).thenReturn(mock(Bundle.class));
-
- AkkaConfigurationReader reader = mock(AkkaConfigurationReader.class);
- when(reader.read()).thenReturn(ConfigFactory.load());
-
- ActorSystemFactory.createInstance(context, reader);
- system = ActorSystemFactory.getInstance();
- Assert.assertNotNull(system);
- // Check illegal state exception
-
- try {
- ActorSystemFactory.createInstance(context, reader);
- fail("Illegal State exception should be thrown, while creating actor system second time");
- } catch (IllegalStateException e) {
- }
- }
-
- @After
- public void cleanup() throws InterruptedException {
- if(system != null) {
- system.shutdown();
- }
- }
-}
package org.opendaylight.controller.remote.rpc;
-import static org.junit.Assert.assertEquals;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
+import akka.testkit.JavaTestKit;
+import com.google.common.util.concurrent.ListenableFuture;
import org.junit.Test;
import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
import org.opendaylight.controller.xml.codec.XmlUtils;
import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import akka.testkit.JavaTestKit;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
-import com.google.common.util.concurrent.ListenableFuture;
+import static org.junit.Assert.assertEquals;
/***
* Unit tests for RemoteRpcImplementation.
final AtomicReference<AssertionError> assertError = new AtomicReference<>();
try {
RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation(
- probeReg1.getRef(), schemaContext);
+ probeReg1.getRef(), schemaContext, getConfig());
final CompositeNode input = makeRPCInput("foo");
final CompositeNode output = makeRPCOutput("bar");
final AtomicReference<AssertionError> assertError = new AtomicReference<>();
try {
RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation(
- probeReg1.getRef(), schemaContext);
+ probeReg1.getRef(), schemaContext, getConfig());
QName instanceQName = new QName(new URI("ns"), "instance");
YangInstanceIdentifier identifier = YangInstanceIdentifier.of(instanceQName);
final AtomicReference<AssertionError> assertError = new AtomicReference<>();
try {
RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation(
- probeReg1.getRef(), schemaContext);
+ probeReg1.getRef(), schemaContext, getConfig());
final CompositeNode input = makeRPCInput("foo");
final AtomicReference<AssertionError> assertError = new AtomicReference<>();
try {
RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation(
- probeReg1.getRef(), schemaContext);
+ probeReg1.getRef(), schemaContext, getConfig());
final CompositeNode input = makeRPCInput("foo");
return invokeRpcMsg;
}
+
+ private RemoteRpcProviderConfig getConfig(){
+ return new RemoteRpcProviderConfig.Builder("unit-test").build();
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.testkit.TestActorRef;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+public class RemoteRpcProviderConfigTest {
+
+ @Test
+ public void testConfigDefaults() {
+
+ Config c = ConfigFactory.parseFile(new File("application.conf"));
+ RemoteRpcProviderConfig config = new RemoteRpcProviderConfig.Builder("unit-test").build();
+
+ //Assert on configurations from common config
+ Assert.assertFalse(config.isMetricCaptureEnabled()); //should be disabled by default
+ Assert.assertNotNull(config.getMailBoxCapacity());
+ Assert.assertNotNull(config.getMailBoxName());
+ Assert.assertNotNull(config.getMailBoxPushTimeout());
+
+ //rest of the configurations should be set
+ Assert.assertNotNull(config.getActorSystemName());
+ Assert.assertNotNull(config.getRpcBrokerName());
+ Assert.assertNotNull(config.getRpcBrokerPath());
+ Assert.assertNotNull(config.getRpcManagerName());
+ Assert.assertNotNull(config.getRpcManagerPath());
+ Assert.assertNotNull(config.getRpcRegistryName());
+ Assert.assertNotNull(config.getRpcRegistryPath());
+ Assert.assertNotNull(config.getAskDuration());
+ Assert.assertNotNull(config.getGossipTickInterval());
+
+
+
+ }
+
+ @Test
+ public void testConfigCustomizations() {
+
+ AkkaConfigurationReader reader = new TestConfigReader();
+
+ final int expectedCapacity = 100;
+ String timeOutVal = "10ms";
+ FiniteDuration expectedTimeout = FiniteDuration.create(10, TimeUnit.MILLISECONDS);
+
+ RemoteRpcProviderConfig config = new RemoteRpcProviderConfig.Builder("unit-test")
+ .metricCaptureEnabled(true)//enable metric capture
+ .mailboxCapacity(expectedCapacity)
+ .mailboxPushTimeout(timeOutVal)
+ .withConfigReader(reader)
+ .build();
+
+ Assert.assertTrue(config.isMetricCaptureEnabled());
+ Assert.assertEquals(expectedCapacity, config.getMailBoxCapacity().intValue());
+ Assert.assertEquals(expectedTimeout.toMillis(), config.getMailBoxPushTimeout().toMillis());
+
+ //Now check this config inside an actor
+ ActorSystem system = ActorSystem.create("unit-test", config.get());
+ TestActorRef<ConfigTestActor> configTestActorTestActorRef =
+ TestActorRef.create(system, Props.create(ConfigTestActor.class));
+
+ ConfigTestActor actor = configTestActorTestActorRef.underlyingActor();
+ Config actorConfig = actor.getConfig();
+
+ config = new RemoteRpcProviderConfig(actorConfig);
+
+ Assert.assertTrue(config.isMetricCaptureEnabled());
+ Assert.assertEquals(expectedCapacity, config.getMailBoxCapacity().intValue());
+ Assert.assertEquals(expectedTimeout.toMillis(), config.getMailBoxPushTimeout().toMillis());
+ }
+
+ public static class ConfigTestActor extends UntypedActor {
+
+ private Config actorSystemConfig;
+
+ public ConfigTestActor() {
+ this.actorSystemConfig = getContext().system().settings().config();
+ }
+
+ @Override
+ public void onReceive(Object message) throws Exception {
+ }
+
+ /**
+ * Only for testing. NEVER expose actor's internal state like this.
+ *
+ * @return the enclosing actor system's configuration
+ */
+ public Config getConfig() {
+ return actorSystemConfig;
+ }
+ }
+
+ public static class TestConfigReader implements AkkaConfigurationReader {
+
+ @Override
+ public Config read() {
+ return ConfigFactory.parseResources("application.conf");
+
+ }
+ }
+}
\ No newline at end of file
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
-import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.Config;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
public class RemoteRpcProviderTest {
static ActorSystem system;
-
+ static RemoteRpcProviderConfig moduleConfig;
@BeforeClass
public static void setup() throws InterruptedException {
- system = ActorSystem.create("odl-cluster-rpc", ConfigFactory.load().getConfig("odl-cluster-rpc"));
+ moduleConfig = new RemoteRpcProviderConfig.Builder("odl-cluster-rpc").build();
+ Config config = moduleConfig.get();
+ system = ActorSystem.create("odl-cluster-rpc", config);
+
}
@AfterClass
SchemaService schemaService = mock(SchemaService.class);
when(schemaService.getGlobalContext()). thenReturn(mock(SchemaContext.class));
when(session.getService(SchemaService.class)).thenReturn(schemaService);
+
rpcProvider.onSessionInitiated(session);
- ActorRef actorRef = Await.result(system.actorSelection(ActorConstants.RPC_MANAGER_PATH).resolveOne(Duration.create(1, TimeUnit.SECONDS)),
- Duration.create(2, TimeUnit.SECONDS));
- Assert.assertTrue(actorRef.path().toString().contains(ActorConstants.RPC_MANAGER_PATH));
+
+ ActorRef actorRef = Await.result(
+ system.actorSelection(
+ moduleConfig.getRpcManagerPath()).resolveOne(Duration.create(1, TimeUnit.SECONDS)),
+ Duration.create(2, TimeUnit.SECONDS));
+
+ Assert.assertTrue(actorRef.path().toString().contains(moduleConfig.getRpcManagerPath()));
}
}
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import com.google.common.base.Predicate;
-import com.typesafe.config.ConfigFactory;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
@BeforeClass
public static void setup() throws InterruptedException {
- node1 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberA"));
- node2 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberB"));
- node3 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberC"));
+ RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build();
+ RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").build();
+ RemoteRpcProviderConfig config3 = new RemoteRpcProviderConfig.Builder("memberC").build();
+ node1 = ActorSystem.create("opendaylight-rpc", config1.get());
+ node2 = ActorSystem.create("opendaylight-rpc", config2.get());
+ node3 = ActorSystem.create("opendaylight-rpc", config3.get());
}
@AfterClass
new ConditionalProbe(probe.getRef(), new Predicate() {
@Override
public boolean apply(@Nullable Object input) {
- return clazz.equals(input.getClass());
+ if (input != null)
+ return clazz.equals(input.getClass());
+ else
+ return false;
}
});
odl-cluster-rpc{
bounded-mailbox {
- mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
mailbox-capacity = 1000
mailbox-push-timeout-time = 10ms
}
}
unit-test{
akka {
- loglevel = "INFO"
+ loglevel = "DEBUG"
#loggers = ["akka.event.slf4j.Slf4jLogger"]
}
bounded-mailbox {
- mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
+ #mailbox-capacity is specified in config subsystem
mailbox-capacity = 1000
mailbox-push-timeout-time = 10ms
}
memberA{
bounded-mailbox {
- mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
mailbox-capacity = 1000
mailbox-push-timeout-time = 10ms
}
}
memberB{
bounded-mailbox {
- mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
mailbox-capacity = 1000
mailbox-push-timeout-time = 10ms
}
}
memberC{
bounded-mailbox {
- mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
mailbox-capacity = 1000
mailbox-push-timeout-time = 10ms
}
DataContainerChild<? extends PathArgument, ?> child = iterator.next();
nnWriter.write(child);
nnWriter.flush();
- if(iterator.hasNext()) {
- outputWriter.write(",");
- }
}
}
import com.google.common.collect.Lists;
import java.math.BigInteger;
import java.net.URI;
+import java.net.URISyntaxException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
private static final String SCOPE_PARAM_NAME = "scope";
+ private static final String NETCONF_BASE = "urn:ietf:params:xml:ns:netconf:base:1.0";
+
+ private static final String NETCONF_BASE_PAYLOAD_NAME = "data";
+
+ private static final QName NETCONF_BASE_QNAME;
+
static {
try {
EVENT_SUBSCRIPTION_AUGMENT_REVISION = new SimpleDateFormat("yyyy-MM-dd").parse("2014-07-08");
+ NETCONF_BASE_QNAME = QName.create(QNameModule.create(new URI(NETCONF_BASE), null), NETCONF_BASE_PAYLOAD_NAME );
} catch (ParseException e) {
throw new RestconfDocumentedException(
"It wasn't possible to convert revision date of sal-remote-augment to date", ErrorType.APPLICATION,
ErrorTag.OPERATION_FAILED);
+ } catch (URISyntaxException e) {
+ throw new RestconfDocumentedException(
+ "It wasn't possible to create instance of URI class with "+NETCONF_BASE+" URI", ErrorType.APPLICATION,
+ ErrorTag.OPERATION_FAILED);
}
}
validateInput(iiWithData.getSchemaNode(), payload);
DOMMountPoint mountPoint = iiWithData.getMountPoint();
+ validateTopLevelNodeName(payload, iiWithData.getInstanceIdentifier());
final CompositeNode value = this.normalizeNode(payload, iiWithData.getSchemaNode(), mountPoint);
validateListKeysEqualityInPayloadAndUri(iiWithData, value);
final NormalizedNode<?, ?> datastoreNormalizedNode = compositeNodeToDatastoreNormalizedNode(value,
iiWithData.getSchemaNode());
+
YangInstanceIdentifier normalizedII;
if (mountPoint != null) {
normalizedII = new DataNormalizer(mountPoint.getSchemaContext()).toNormalized(
return Response.status(Status.OK).build();
}
+ private void validateTopLevelNodeName(final Node<?> node,
+ final YangInstanceIdentifier identifier) {
+ final String payloadName = getName(node);
+ final Iterator<PathArgument> pathArguments = identifier.getReversePathArguments().iterator();
+
+ //no arguments
+ if (!pathArguments.hasNext()) {
+ //no "data" payload
+ if (!node.getNodeType().equals(NETCONF_BASE_QNAME)) {
+ throw new RestconfDocumentedException("Instance identifier has to contain at least one path argument",
+ ErrorType.PROTOCOL, ErrorTag.MALFORMED_MESSAGE);
+ }
+ //any arguments
+ } else {
+ final String identifierName = pathArguments.next().getNodeType().getLocalName();
+ if (!payloadName.equals(identifierName)) {
+ throw new RestconfDocumentedException("Payload name (" + payloadName
+ + ") is different from identifier name (" + identifierName + ")", ErrorType.PROTOCOL,
+ ErrorTag.MALFORMED_MESSAGE);
+ }
+ }
+ }
+
/**
* Validates whether keys in {@code payload} are equal to values of keys in {@code iiWithData} for list schema node
*
private static final Logger logger = LoggerFactory.getLogger(ConfigPersisterNotificationHandler.class);
private final MBeanServerConnection mBeanServerConnection;
- private final ConfigPersisterNotificationListener listener;
+ private final NotificationListener listener;
- public ConfigPersisterNotificationHandler(MBeanServerConnection mBeanServerConnection,
- Persister persisterAggregator) {
+ public ConfigPersisterNotificationHandler(final MBeanServerConnection mBeanServerConnection, final Persister persisterAggregator) {
+ this(mBeanServerConnection, new ConfigPersisterNotificationListener(persisterAggregator));
+ }
+
+ public ConfigPersisterNotificationHandler(final MBeanServerConnection mBeanServerConnection, final NotificationListener notificationListener) {
this.mBeanServerConnection = mBeanServerConnection;
- listener = new ConfigPersisterNotificationListener(persisterAggregator);
+ this.listener = notificationListener;
registerAsJMXListener(mBeanServerConnection, listener);
-
}
- private static void registerAsJMXListener(MBeanServerConnection mBeanServerConnection, ConfigPersisterNotificationListener listener) {
+ private static void registerAsJMXListener(final MBeanServerConnection mBeanServerConnection, final NotificationListener listener) {
logger.trace("Called registerAsJMXListener");
try {
mBeanServerConnection.addNotificationListener(DefaultCommitOperationMXBean.OBJECT_NAME, listener, null, null);
@Override
public synchronized void close() {
// unregister from JMX
- ObjectName on = DefaultCommitOperationMXBean.OBJECT_NAME;
+ final ObjectName on = DefaultCommitOperationMXBean.OBJECT_NAME;
try {
if (mBeanServerConnection.isRegistered(on)) {
mBeanServerConnection.removeNotificationListener(on, listener);
}
- } catch (Exception e) {
+ } catch (final Exception e) {
logger.warn("Unable to unregister {} as listener for {}", listener, on, e);
}
}
private final Persister persisterAggregator;
- ConfigPersisterNotificationListener(Persister persisterAggregator) {
+ ConfigPersisterNotificationListener(final Persister persisterAggregator) {
this.persisterAggregator = persisterAggregator;
}
@Override
- public void handleNotification(Notification notification, Object handback) {
+ public void handleNotification(final Notification notification, final Object handback) {
if (!(notification instanceof NetconfJMXNotification))
return;
if (notification instanceof CommitJMXNotification) {
try {
handleAfterCommitNotification((CommitJMXNotification) notification);
- } catch (Exception e) {
+ } catch (final Exception e) {
// log exceptions from notification Handler here since
// notificationBroadcastSupport logs only DEBUG level
logger.warn("Failed to handle notification {}", notification, e);
persisterAggregator.persistConfig(new CapabilityStrippingConfigSnapshotHolder(notification.getConfigSnapshot(),
notification.getCapabilities()));
logger.trace("Configuration persisted successfully");
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new RuntimeException("Unable to persist configuration snapshot", e);
}
}
public void persistConfig(ConfigSnapshotHolder holder) throws IOException {
for (PersisterWithConfiguration persisterWithConfiguration: persisterWithConfigurations){
if (!persisterWithConfiguration.readOnly){
- logger.debug("Calling {}.persistConfig",persisterWithConfiguration.storage);
- persisterWithConfiguration.storage.persistConfig(holder);
+ logger.debug("Calling {}.persistConfig", persisterWithConfiguration.getStorage());
+ persisterWithConfiguration.getStorage().persistConfig(holder);
}
}
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.persist.impl;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import javax.management.MBeanServerConnection;
+
+import javax.management.NotificationFilter;
+import javax.management.NotificationListener;
+import javax.management.ObjectName;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.config.persist.api.Persister;
+
+/**
+ * Unit test for ConfigPersisterNotificationHandler: the handler must register
+ * itself as a JMX notification listener in its constructor and unregister on
+ * close(), but only when the commit MXBean is still registered on the server.
+ */
+public class ConfigPersisterNotificationHandlerTest {
+
+ @Mock
+ private MBeanServerConnection mBeanServer;
+ // NOTE(review): despite the field name this mock is a Persister, not a
+ // NotificationListener; the (MBeanServerConnection, Persister) constructor
+ // overload wraps it in a ConfigPersisterNotificationListener internally.
+ @Mock
+ private Persister notificationListener;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ // Stub listener registration so the tested handler's constructor can
+ // register itself without touching a real MBean server.
+ doNothing().when(mBeanServer).addNotificationListener(any(ObjectName.class), any(NotificationListener.class),
+ any(NotificationFilter.class), anyObject());
+ }
+
+ // Happy path: construction registers the listener; close() unregisters it.
+ // removeNotificationListener is stubbed to throw, which also exercises the
+ // handler's catch-and-log-warning path in close() (the exception must not
+ // propagate out of close()).
+ @Test
+ public void testNotificationHandler() throws Exception {
+ doReturn(true).when(mBeanServer).isRegistered(any(ObjectName.class));
+ doThrow(Exception.class).when(mBeanServer).removeNotificationListener(any(ObjectName.class), any(NotificationListener.class));
+
+ final ConfigPersisterNotificationHandler testedHandler = new ConfigPersisterNotificationHandler(mBeanServer, notificationListener);
+ verify(mBeanServer).addNotificationListener(any(ObjectName.class), any(NotificationListener.class),
+ any(NotificationFilter.class), anyObject());
+
+ testedHandler.close();
+ verify(mBeanServer).removeNotificationListener(any(ObjectName.class), any(NotificationListener.class));
+ }
+
+ // When the MXBean is no longer registered, close() must skip the
+ // removeNotificationListener call entirely.
+ @Test
+ public void testNotificationHandlerCloseNotRegistered() throws Exception {
+ doReturn(false).when(mBeanServer).isRegistered(any(ObjectName.class));
+
+ final ConfigPersisterNotificationHandler testedHandler = new ConfigPersisterNotificationHandler(mBeanServer, notificationListener);
+
+ testedHandler.close();
+ verify(mBeanServer, times(0)).removeNotificationListener(any(ObjectName.class), any(NotificationListener.class));
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.persist.impl;
+
+import java.util.Collections;
+
+import javax.management.Notification;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
+import org.opendaylight.controller.config.persist.api.Persister;
+import org.opendaylight.controller.netconf.api.jmx.CommitJMXNotification;
+import org.opendaylight.controller.netconf.api.jmx.NetconfJMXNotification;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Unit test for ConfigPersisterNotificationListener: only CommitJMXNotification
+ * instances may trigger persistence through the aggregator; unrelated and
+ * unknown-subtype notifications must leave the persister untouched.
+ */
+public class ConfigPersisterNotificationListenerTest {
+
+ @Mock
+ private Persister mockPersister;
+ // Real aggregator wrapping the mocked persister (writable, i.e. not read-only).
+ private PersisterAggregator persisterAggregator;
+
+ // A NetconfJMXNotification subtype the listener does not recognize.
+ @Mock
+ private NetconfJMXNotification unknownNetconfNotif;
+ // The commit notification that should be persisted.
+ @Mock
+ private CommitJMXNotification commitNetconfNotif;
+ // A completely unrelated JMX notification; must be ignored silently.
+ @Mock
+ private Notification unknownNotif;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+
+ Mockito.doNothing().when(mockPersister).persistConfig(Matchers.any(ConfigSnapshotHolder.class));
+ Mockito.doReturn("persister").when(mockPersister).toString();
+ final PersisterAggregator.PersisterWithConfiguration withCfg = new PersisterAggregator.PersisterWithConfiguration(mockPersister, false);
+ persisterAggregator = new PersisterAggregator(Lists.newArrayList(withCfg));
+
+ // toString() stubs keep Mockito's logging/verification output readable.
+ Mockito.doReturn("netconfUnknownNotification").when(unknownNetconfNotif).toString();
+ Mockito.doReturn("netconfCommitNotification").when(commitNetconfNotif).toString();
+
+ // Minimal snapshot payload so handling the commit notification can build
+ // a ConfigSnapshotHolder without a real NETCONF session.
+ Mockito.doReturn(XmlUtil.readXmlToElement("<config-snapshot/>")).when(commitNetconfNotif).getConfigSnapshot();
+ Mockito.doReturn(Collections.emptySet()).when(commitNetconfNotif).getCapabilities();
+
+ }
+
+ // Non-NETCONF notifications are filtered out before any persister call.
+ @Test
+ public void testNotificationListenerUnknownNotification() throws Exception {
+ final ConfigPersisterNotificationListener testeListener = new ConfigPersisterNotificationListener(persisterAggregator);
+ testeListener.handleNotification(unknownNotif, null);
+ Mockito.verifyZeroInteractions(mockPersister);
+ }
+
+ // A NetconfJMXNotification of an unknown subtype is expected to be rejected
+ // with IllegalStateException, still without touching the persister.
+ @Test
+ public void testNotificationListenerUnknownNetconfNotification() throws Exception {
+ final ConfigPersisterNotificationListener testeListener = new ConfigPersisterNotificationListener(persisterAggregator);
+ try {
+ testeListener.handleNotification(unknownNetconfNotif, null);
+ Assert.fail("Unknown netconf notification should fail");
+ } catch (final IllegalStateException e) {
+ Mockito.verifyZeroInteractions(mockPersister);
+ }
+ }
+
+ // A commit notification must be forwarded to the persister exactly once.
+ @Test
+ public void testNotificationListenerCommitNetconfNotification() throws Exception {
+ final ConfigPersisterNotificationListener testeListener = new ConfigPersisterNotificationListener(persisterAggregator);
+ testeListener.handleNotification(commitNetconfNotif, null);
+ Mockito.verify(mockPersister).persistConfig(Matchers.any(ConfigSnapshotHolder.class));
+ }
+}
package org.opendaylight.controller.netconf.persist.impl;
+import com.google.common.collect.Lists;
+
import org.junit.Test;
import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
import org.opendaylight.controller.config.persist.api.Persister;
assertEquals(1, DummyAdapter.props);
}
+ @Test
+ public void testNoopAdapter() throws Exception {
+ final NoOpStorageAdapter noOpStorageAdapter = new NoOpStorageAdapter();
+ final PersisterAggregator persisterAggregator =
+ new PersisterAggregator(Lists.newArrayList(new PersisterWithConfiguration(noOpStorageAdapter, false)));
+
+ noOpStorageAdapter.instantiate(null);
+
+ persisterAggregator.persistConfig(null);
+ persisterAggregator.loadLastConfigs();
+ persisterAggregator.persistConfig(null);
+ persisterAggregator.loadLastConfigs();
+
+ noOpStorageAdapter.close();
+ }
+
@Test
public void testLoadFromPropertyFile() throws Exception {
PersisterAggregator persisterAggregator = PersisterAggregator.createFromProperties(loadFile("test2.properties"));
*/
package org.opendaylight.controller.netconf.nettyutil.handler.exi;
-import com.google.common.base.Preconditions;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
import org.openexi.proc.common.AlignmentType;
import org.openexi.proc.common.EXIOptions;
import org.openexi.proc.common.EXIOptionsException;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+import com.google.common.base.Preconditions;
public final class EXIParameters {
private static final String EXI_PARAMETER_ALIGNMENT = "alignment";
- private static final String EXI_PARAMETER_BYTE_ALIGNED = "byte-aligned";
- private static final String EXI_PARAMETER_BIT_PACKED = "bit-packed";
- private static final String EXI_PARAMETER_COMPRESSED = "compressed";
- private static final String EXI_PARAMETER_PRE_COMPRESSION = "pre-compression";
+ static final String EXI_PARAMETER_BYTE_ALIGNED = "byte-aligned";
+ static final String EXI_PARAMETER_BIT_PACKED = "bit-packed";
+ static final String EXI_PARAMETER_COMPRESSED = "compressed";
+ static final String EXI_PARAMETER_PRE_COMPRESSION = "pre-compression";
private static final String EXI_PARAMETER_FIDELITY = "fidelity";
private static final String EXI_FIDELITY_DTD = "dtd";
final EXIOptions options = new EXIOptions();
options.setAlignmentType(AlignmentType.bitPacked);
- if (root.getElementsByTagName(EXI_PARAMETER_ALIGNMENT).getLength() > 0) {
- if (root.getElementsByTagName(EXI_PARAMETER_BIT_PACKED).getLength() > 0) {
- options.setAlignmentType(AlignmentType.bitPacked);
- } else if (root.getElementsByTagName(EXI_PARAMETER_BYTE_ALIGNED).getLength() > 0) {
- options.setAlignmentType(AlignmentType.byteAligned);
- } else if (root.getElementsByTagName(EXI_PARAMETER_COMPRESSED).getLength() > 0) {
- options.setAlignmentType(AlignmentType.compress);
- } else if (root.getElementsByTagName(EXI_PARAMETER_PRE_COMPRESSION).getLength() > 0) {
- options.setAlignmentType(AlignmentType.preCompress);
+
+ final NodeList alignmentElements = root.getElementsByTagName(EXI_PARAMETER_ALIGNMENT);
+ if (alignmentElements.getLength() > 0) {
+ final Element alignmentElement = (Element) alignmentElements.item(0);
+ final String alignmentTextContent = alignmentElement.getTextContent().trim();
+
+ switch (alignmentTextContent) {
+ case EXI_PARAMETER_BIT_PACKED:
+ options.setAlignmentType(AlignmentType.bitPacked);
+ break;
+ case EXI_PARAMETER_BYTE_ALIGNED:
+ options.setAlignmentType(AlignmentType.byteAligned);
+ break;
+ case EXI_PARAMETER_COMPRESSED:
+ options.setAlignmentType(AlignmentType.compress);
+ break;
+ case EXI_PARAMETER_PRE_COMPRESSION:
+ options.setAlignmentType(AlignmentType.preCompress);
+ break;
}
}
package org.opendaylight.controller.netconf.nettyutil.handler.exi;
+import com.google.common.collect.Lists;
import java.util.List;
-
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
-import com.google.common.collect.Lists;
-
/**
* Start-exi netconf message.
*/
public static final String PIS_KEY = "pis";
public static final String PREFIXES_KEY = "prefixes";
- private NetconfStartExiMessage(Document doc) {
+ private NetconfStartExiMessage(final Document doc) {
super(doc);
}
- public static NetconfStartExiMessage create(EXIOptions exiOptions, String messageId) {
- Document doc = XmlUtil.newDocument();
- Element rpcElement = doc.createElementNS(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0,
+ public static NetconfStartExiMessage create(final EXIOptions exiOptions, final String messageId) {
+ final Document doc = XmlUtil.newDocument();
+ final Element rpcElement = doc.createElementNS(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0,
XmlNetconfConstants.RPC_KEY);
rpcElement.setAttributeNS(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0,
XmlNetconfConstants.MESSAGE_ID, messageId);
// TODO draft http://tools.ietf.org/html/draft-varga-netconf-exi-capability-02#section-3.5.1 has no namespace for start-exi element in xml
- Element startExiElement = doc.createElementNS(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_EXI_1_0,
+ final Element startExiElement = doc.createElementNS(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_EXI_1_0,
START_EXI);
addAlignment(exiOptions, doc, startExiElement);
return new NetconfStartExiMessage(doc);
}
- private static void addFidelity(EXIOptions exiOptions, Document doc, Element startExiElement) {
- List<Element> fidelityElements = Lists.newArrayList();
+ private static void addFidelity(final EXIOptions exiOptions, final Document doc, final Element startExiElement) {
+ final List<Element> fidelityElements = Lists.newArrayList();
createFidelityElement(doc, fidelityElements, exiOptions.getPreserveComments(), COMMENTS_KEY);
createFidelityElement(doc, fidelityElements, exiOptions.getPreserveDTD(), DTD_KEY);
createFidelityElement(doc, fidelityElements, exiOptions.getPreserveLexicalValues(), LEXICAL_VALUES_KEY);
createFidelityElement(doc, fidelityElements, exiOptions.getPreserveNS(), PREFIXES_KEY);
if (fidelityElements.isEmpty() == false) {
- Element fidelityElement = doc.createElementNS(
+ final Element fidelityElement = doc.createElementNS(
XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_EXI_1_0, FIDELITY_KEY);
- for (Element element : fidelityElements) {
+ for (final Element element : fidelityElements) {
fidelityElement.appendChild(element);
}
startExiElement.appendChild(fidelityElement);
}
}
- private static void addAlignment(EXIOptions exiOptions, Document doc, Element startExiElement) {
- Element alignmentElement = doc.createElementNS(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_EXI_1_0,
+ private static void addAlignment(final EXIOptions exiOptions, final Document doc, final Element startExiElement) {
+ final Element alignmentElement = doc.createElementNS(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_EXI_1_0,
ALIGNMENT_KEY);
- alignmentElement.setTextContent(exiOptions.getAlignmentType().toString());
+
+ String alignmentString = EXIParameters.EXI_PARAMETER_BIT_PACKED;
+ switch (exiOptions.getAlignmentType()) {
+ case byteAligned: {
+ alignmentString = EXIParameters.EXI_PARAMETER_BYTE_ALIGNED;
+ break;
+ }
+ case bitPacked: {
+ alignmentString = EXIParameters.EXI_PARAMETER_BIT_PACKED;
+ break;
+ }
+ case compress: {
+ alignmentString = EXIParameters.EXI_PARAMETER_COMPRESSED;
+ break;
+ }
+ case preCompress: {
+ alignmentString = EXIParameters.EXI_PARAMETER_PRE_COMPRESSION;
+ break;
+ }
+ }
+
+ alignmentElement.setTextContent(alignmentString);
startExiElement.appendChild(alignmentElement);
}
- private static void createFidelityElement(Document doc, List<Element> fidelityElements, boolean fidelity, String fidelityName) {
+ private static void createFidelityElement(final Document doc, final List<Element> fidelityElements, final boolean fidelity, final String fidelityName) {
if (fidelity) {
fidelityElements.add(doc.createElementNS(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_EXI_1_0,
connectPromise = null;
sshReadAsyncListener = new SshReadAsyncListener(this, ctx, channel.getAsyncOut());
- sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn());
-
- ctx.fireChannelActive();
+ // If the readAsyncListener observed an immediate close, it has already closed this handler, which nulls the channel field — guard before wiring the write handler
+ if(channel != null) {
+ sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn());
+ ctx.fireChannelActive();
+ }
}
private synchronized void handleSshSetupFailure(final ChannelHandlerContext ctx, final Throwable e) {
@Override
public synchronized void operationComplete(final IoReadFuture future) {
if(future.getException() != null) {
-
if(asyncOut.isClosed() || asyncOut.isClosing()) {
-
// Ssh dropped
logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException());
- invokeDisconnect();
- return;
} else {
logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException());
- invokeDisconnect();
}
+ invokeDisconnect();
+ return;
}
if (future.getRead() > 0) {
// Check limit for pending writes
pendingWriteCounter++;
if(pendingWriteCounter > MAX_PENDING_WRITES) {
+ promise.setFailure(e);
handlePendingFailed(ctx, new IllegalStateException("Too much pending writes(" + MAX_PENDING_WRITES + ") on channel: " + ctx.channel() +
", remote window is not getting read or is too small"));
}
logger.debug("Write pending to SSH remote on channel: {}, current pending count: {}", ctx.channel(), pendingWriteCounter);
// In case of pending, re-invoke write after pending is finished
+ Preconditions.checkNotNull(lastWriteFuture, "Write is pending, but there was no previous write attempt", e);
lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
@Override
public void operationComplete(final IoWriteFuture future) {
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelPipeline;
+import io.netty.util.concurrent.Promise;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.netconf.api.NetconfSession;
+
+/**
+ * Unit test for AbstractChannelInitializer: verifies that initialize() wires
+ * the expected number of handlers into the Netty channel pipeline.
+ */
+public class AbstractChannelInitializerTest {
+
+ @Mock
+ private Channel channel;
+ @Mock
+ private ChannelPipeline pipeline;
+ @Mock
+ private Promise<NetconfSession> sessionPromise;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ doReturn(pipeline).when(channel).pipeline();
+ // addLast returns the pipeline itself so chained addLast calls work on the mock.
+ doReturn(pipeline).when(pipeline).addLast(anyString(), any(ChannelHandler.class));
+ }
+
+ @Test
+ public void testInit() throws Exception {
+ final TestingInitializer testingInitializer = new TestingInitializer();
+ testingInitializer.initialize(channel, sessionPromise);
+ // The base initializer is expected to install exactly four named handlers
+ // (the subclass's session negotiator hook below adds none).
+ verify(pipeline, times(4)).addLast(anyString(), any(ChannelHandler.class));
+ }
+
+ // Minimal concrete subclass: leaves session-negotiator installation empty so
+ // only the handlers added by the abstract base class are counted.
+ private static final class TestingInitializer extends AbstractChannelInitializer<NetconfSession> {
+
+ @Override
+ protected void initializeSessionNegotiator(final Channel ch, final Promise<NetconfSession> promise) {
+ }
+ }
+
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
+import com.google.common.base.Optional;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelPipeline;
+import java.util.Collections;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.api.NetconfSession;
+import org.opendaylight.controller.netconf.api.NetconfSessionListener;
+import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
+import org.opendaylight.controller.netconf.nettyutil.handler.NetconfEXICodec;
+import org.opendaylight.controller.netconf.nettyutil.handler.exi.NetconfStartExiMessage;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
+import org.openexi.proc.common.EXIOptions;
+
+/**
+ * Unit test for AbstractNetconfSession: exercises the session lifecycle
+ * (message dispatch, up/down/terminate, close), pipeline handler replacement
+ * (including deferred encoder replacement after the next sent message), and
+ * the start-exi handshake hook, all against a mocked Netty channel.
+ */
+public class AbstractNetconfSessionTest {
+
+ @Mock
+ private NetconfSessionListener<NetconfSession> listener;
+ @Mock
+ private Channel channel;
+ @Mock
+ private ChannelPipeline pipeline;
+ // Reusable client <hello> message built in setUp().
+ private NetconfHelloMessage clientHello;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ // Listener callbacks are no-ops; tests only verify they were invoked.
+ doNothing().when(listener).onMessage(any(NetconfSession.class), any(NetconfMessage.class));
+ doNothing().when(listener).onSessionUp(any(NetconfSession.class));
+ doNothing().when(listener).onSessionDown(any(NetconfSession.class), any(Exception.class));
+ doNothing().when(listener).onSessionTerminated(any(NetconfSession.class), any(NetconfTerminationReason.class));
+
+ // Channel write/close return dummy futures so session code can chain on them.
+ doReturn(mock(ChannelFuture.class)).when(channel).writeAndFlush(any(NetconfMessage.class));
+ doReturn(pipeline).when(channel).pipeline();
+ doReturn(mock(ChannelFuture.class)).when(channel).close();
+
+ doReturn(null).when(pipeline).replace(anyString(), anyString(), any(ChannelHandler.class));
+
+ clientHello = NetconfHelloMessage.createClientHello(Collections.<String>emptySet(), Optional.<NetconfHelloMessageAdditionalHeader>absent());
+ }
+
+ // Incoming messages are forwarded to the session listener.
+ @Test
+ public void testHandleMessage() throws Exception {
+ final TestingNetconfSession testingNetconfSession = new TestingNetconfSession(listener, channel, 1L);
+ testingNetconfSession.handleMessage(clientHello);
+ verify(listener).onMessage(testingNetconfSession, clientHello);
+ }
+
+ // sessionUp() notifies the listener and the session keeps its assigned id.
+ @Test
+ public void testSessionUp() throws Exception {
+ final TestingNetconfSession testingNetconfSession = new TestingNetconfSession(listener, channel, 1L);
+ testingNetconfSession.sessionUp();
+ verify(listener).onSessionUp(testingNetconfSession);
+ assertEquals(1L, testingNetconfSession.getSessionId());
+ }
+
+ // Closing an up session closes the channel and reports termination.
+ @Test
+ public void testClose() throws Exception {
+ final TestingNetconfSession testingNetconfSession = new TestingNetconfSession(listener, channel, 1L);
+ testingNetconfSession.sessionUp();
+ testingNetconfSession.close();
+ verify(channel).close();
+ verify(listener).onSessionTerminated(any(NetconfSession.class), any(NetconfTerminationReason.class));
+ }
+
+ // Decoder/encoder replacement goes through the pipeline under the well-known
+ // handler names; replaceMessageEncoderAfterNextMessage must defer the swap
+ // until the next sendMessage() call.
+ @Test
+ public void testReplaceHandlers() throws Exception {
+ final TestingNetconfSession testingNetconfSession = new TestingNetconfSession(listener, channel, 1L);
+ final ChannelHandler mock = mock(ChannelHandler.class);
+ doReturn("handler").when(mock).toString();
+
+ testingNetconfSession.replaceMessageDecoder(mock);
+ verify(pipeline).replace(AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, mock);
+ testingNetconfSession.replaceMessageEncoder(mock);
+ verify(pipeline).replace(AbstractChannelInitializer.NETCONF_MESSAGE_ENCODER, AbstractChannelInitializer.NETCONF_MESSAGE_ENCODER, mock);
+ testingNetconfSession.replaceMessageEncoderAfterNextMessage(mock);
+ // Deferred replacement: nothing happens until a message is actually sent.
+ verifyNoMoreInteractions(pipeline);
+
+ testingNetconfSession.sendMessage(clientHello);
+ verify(pipeline, times(2)).replace(AbstractChannelInitializer.NETCONF_MESSAGE_ENCODER, AbstractChannelInitializer.NETCONF_MESSAGE_ENCODER, mock);
+ }
+
+ // startExiCommunication must derive a codec from the start-exi message and
+ // hand it to the subclass hook addExiHandlers().
+ @Test
+ public void testStartExi() throws Exception {
+ TestingNetconfSession testingNetconfSession = new TestingNetconfSession(listener, channel, 1L);
+ testingNetconfSession = spy(testingNetconfSession);
+
+ testingNetconfSession.startExiCommunication(NetconfStartExiMessage.create(new EXIOptions(), "4"));
+ verify(testingNetconfSession).addExiHandlers(any(NetconfEXICodec.class));
+ }
+
+ // End-of-input before the session is up is silent; after sessionUp it must
+ // be reported to the listener as a session-down event.
+ @Test
+ public void testEndOfInput() throws Exception {
+ final TestingNetconfSession testingNetconfSession = new TestingNetconfSession(listener, channel, 1L);
+ testingNetconfSession.endOfInput();
+ verifyZeroInteractions(listener);
+ testingNetconfSession.sessionUp();
+ testingNetconfSession.endOfInput();
+ verify(listener).onSessionDown(any(NetconfSession.class), any(Exception.class));
+ }
+
+ // Outgoing messages are written and flushed on the channel.
+ @Test
+ public void testSendMessage() throws Exception {
+ final TestingNetconfSession testingNetconfSession = new TestingNetconfSession(listener, channel, 1L);
+ final NetconfHelloMessage clientHello = NetconfHelloMessage.createClientHello(Collections.<String>emptySet(), Optional.<NetconfHelloMessageAdditionalHeader>absent());
+ testingNetconfSession.sendMessage(clientHello);
+ verify(channel).writeAndFlush(clientHello);
+ }
+
+ // Minimal concrete session: EXI-handler hooks are stubbed out so only the
+ // abstract base class's behavior is under test.
+ private static class TestingNetconfSession extends AbstractNetconfSession<NetconfSession, NetconfSessionListener<NetconfSession>> {
+
+ protected TestingNetconfSession(final NetconfSessionListener<NetconfSession> sessionListener, final Channel channel, final long sessionId) {
+ super(sessionListener, channel, sessionId);
+ }
+
+ @Override
+ protected NetconfSession thisInstance() {
+ return this;
+ }
+
+ @Override
+ protected void addExiHandlers(final NetconfEXICodec exiCodec) {}
+
+ @Override
+ public void stopExiCommunication() {}
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil.handler;
+
+import static org.junit.Assert.*;
+
+import com.google.common.collect.Lists;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.custommonkey.xmlunit.XMLUnit;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.openexi.proc.common.EXIOptions;
+import org.openexi.proc.common.EXIOptionsException;
+import org.openexi.sax.Transmogrifier;
+import org.openexi.sax.TransmogrifierException;
+import org.xml.sax.InputSource;
+
+/**
+ * Round-trip test for the netconf EXI handler pair: the encoder's output is compared
+ * against reference EXI bytes produced directly by the codec, and the decoder must
+ * reconstruct a document equivalent to the original message.
+ */
+public class NetconfEXIHandlersTest {
+
+ private final String msgAsString = "<netconf-message/>";
+ private NetconfMessageToEXIEncoder netconfMessageToEXIEncoder;
+ private NetconfEXIToMessageDecoder netconfEXIToMessageDecoder;
+ private NetconfMessage msg;
+ private byte[] msgAsExi;
+
+ @Before
+ public void setUp() throws Exception {
+ final NetconfEXICodec codec = new NetconfEXICodec(new EXIOptions());
+ netconfMessageToEXIEncoder = new NetconfMessageToEXIEncoder(codec);
+ netconfEXIToMessageDecoder = new NetconfEXIToMessageDecoder(codec);
+
+ msg = new NetconfMessage(XmlUtil.readXmlToDocument(msgAsString));
+ // Reference EXI bytes produced directly by the codec, bypassing the encoder under test
+ this.msgAsExi = msgToExi(msgAsString, codec);
+ }
+
+ // Encodes the XML string via the codec's transmogrifier to obtain the expected EXI byte sequence
+ private static byte[] msgToExi(final String msgAsString, final NetconfEXICodec codec) throws EXIOptionsException, TransmogrifierException, IOException {
+ final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ final Transmogrifier transmogrifier = codec.getTransmogrifier();
+ transmogrifier.setOutputStream(byteArrayOutputStream);
+ transmogrifier.encode(new InputSource(new ByteArrayInputStream(msgAsString.getBytes())));
+ return byteArrayOutputStream.toByteArray();
+ }
+
+ @Test
+ public void testEncodeDecode() throws Exception {
+ final ByteBuf buffer = Unpooled.buffer();
+ netconfMessageToEXIEncoder.encode(null, msg, buffer);
+ final int exiLength = msgAsExi.length;
+ // buffer.array() is circa 256 bytes in length, compare only the subarray holding the encoded message
+ assertArrayEquals(msgAsExi, Arrays.copyOfRange(buffer.array(), 0, exiLength));
+
+ // assert all other bytes in buffer are 0
+ for (int i = exiLength; i < buffer.array().length; i++) {
+ assertEquals((byte)0, buffer.array()[i]);
+ }
+
+ final List<Object> out = Lists.newArrayList();
+ netconfEXIToMessageDecoder.decode(null, buffer, out);
+
+ // Decoding must yield exactly one message whose document is equivalent to the original;
+ // the Diff returned by compareXML must actually be asserted, otherwise a mismatch goes unnoticed
+ assertEquals(1, out.size());
+ assertTrue(XMLUnit.compareXML(msg.getDocument(), ((NetconfMessage) out.get(0)).getDocument()).similar());
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil.handler.exi;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.opendaylight.controller.netconf.util.xml.XmlElement;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.openexi.proc.common.AlignmentType;
+import org.openexi.proc.common.EXIOptions;
+
+/**
+ * Parameterized test verifying that EXIParameters.fromXmlElement parses the
+ * &lt;start-exi&gt; XML payload into the expected EXIOptions, both for default
+ * options (bit-packed alignment only) and for the full fidelity option set.
+ */
+@RunWith(Parameterized.class)
+public class EXIParametersTest {
+
+ @Parameterized.Parameters
+ public static Iterable<Object[]> data() throws Exception {
+ final String noChangeXml =
+ "<start-exi xmlns=\"urn:ietf:params:xml:ns:netconf:exi:1.0\">\n" +
+ "<alignment>bit-packed</alignment>\n" +
+ "</start-exi>\n";
+
+
+ final String fullOptionsXml =
+ "<start-exi xmlns=\"urn:ietf:params:xml:ns:netconf:exi:1.0\">\n" +
+ "<alignment>byte-aligned</alignment>\n" +
+ "<fidelity>\n" +
+ "<comments/>\n" +
+ "<dtd/>\n" +
+ "<lexical-values/>\n" +
+ "<pis/>\n" +
+ "<prefixes/>\n" +
+ "</fidelity>\n" +
+ "</start-exi>\n";
+
+ final EXIOptions fullOptions = new EXIOptions();
+ fullOptions.setAlignmentType(AlignmentType.byteAligned);
+ fullOptions.setPreserveLexicalValues(true);
+ fullOptions.setPreserveDTD(true);
+ fullOptions.setPreserveComments(true);
+ fullOptions.setPreserveNS(true);
+ fullOptions.setPreservePIs(true);
+
+ return Arrays.asList(new Object[][]{
+ {noChangeXml, new EXIOptions()},
+ {fullOptionsXml, fullOptions},
+ });
+ }
+
+ private final String sourceXml;
+ private final EXIOptions exiOptions;
+
+ public EXIParametersTest(final String sourceXml, final EXIOptions exiOptions) {
+ this.sourceXml = sourceXml;
+ this.exiOptions = exiOptions;
+ }
+
+ @Test
+ public void testFromXmlElement() throws Exception {
+ final EXIParameters opts =
+ EXIParameters.fromXmlElement(
+ XmlElement.fromDomElement(
+ XmlUtil.readXmlToElement(sourceXml)));
+
+
+ // assertEquals takes (expected, actual); each fidelity flag set by the fixture is checked
+ // exactly once - including PreservePIs, which the original left unverified
+ assertEquals(exiOptions.getAlignmentType(), opts.getOptions().getAlignmentType());
+ assertEquals(exiOptions.getPreserveComments(), opts.getOptions().getPreserveComments());
+ assertEquals(exiOptions.getPreserveLexicalValues(), opts.getOptions().getPreserveLexicalValues());
+ assertEquals(exiOptions.getPreserveNS(), opts.getOptions().getPreserveNS());
+ assertEquals(exiOptions.getPreserveDTD(), opts.getOptions().getPreserveDTD());
+ assertEquals(exiOptions.getPreservePIs(), opts.getOptions().getPreservePIs());
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil.handler.exi;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import org.custommonkey.xmlunit.Diff;
+import org.custommonkey.xmlunit.XMLUnit;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.openexi.proc.common.AlignmentType;
+import org.openexi.proc.common.EXIOptions;
+
+/**
+ * Parameterized test for NetconfStartExiMessage.create(): the generated
+ * &lt;rpc&gt;&lt;start-exi&gt; document must match the expected control XML
+ * for both default EXI options and the full fidelity option set.
+ */
+@RunWith(Parameterized.class)
+public class NetconfStartExiMessageTest {
+
+ @Parameterized.Parameters
+ public static Iterable<Object[]> data() throws Exception {
+ final String noChangeXml = "<rpc xmlns:ns0=\"urn:ietf:params:xml:ns:netconf:base:1.0\" ns0:message-id=\"id\" xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
+ "<start-exi xmlns=\"urn:ietf:params:xml:ns:netconf:exi:1.0\">\n" +
+ "<alignment>bit-packed</alignment>\n" +
+ "</start-exi>\n" +
+ "</rpc>";
+
+
+ final String fullOptionsXml = "<rpc xmlns:ns0=\"urn:ietf:params:xml:ns:netconf:base:1.0\" ns0:message-id=\"id\" xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
+ "<start-exi xmlns=\"urn:ietf:params:xml:ns:netconf:exi:1.0\">\n" +
+ "<alignment>byte-aligned</alignment>\n" +
+ "<fidelity>\n" +
+ "<comments/>\n" +
+ "<dtd/>\n" +
+ "<lexical-values/>\n" +
+ "<pis/>\n" +
+ "<prefixes/>\n" +
+ "</fidelity>\n" +
+ "</start-exi>\n" +
+ "</rpc>";
+
+ final EXIOptions fullOptions = new EXIOptions();
+ fullOptions.setAlignmentType(AlignmentType.byteAligned);
+ fullOptions.setPreserveLexicalValues(true);
+ fullOptions.setPreserveDTD(true);
+ fullOptions.setPreserveComments(true);
+ fullOptions.setPreserveNS(true);
+ fullOptions.setPreservePIs(true);
+
+ return Arrays.asList(new Object[][]{
+ {noChangeXml, new EXIOptions()},
+ {fullOptionsXml, fullOptions},
+ });
+ }
+
+ private final String controlXml;
+ private final EXIOptions exiOptions;
+
+ public NetconfStartExiMessageTest(final String controlXml, final EXIOptions exiOptions) {
+ this.controlXml = controlXml;
+ this.exiOptions = exiOptions;
+ }
+
+ @Test
+ public void testCreate() throws Exception {
+ final NetconfStartExiMessage startExiMessage = NetconfStartExiMessage.create(exiOptions, "id");
+
+ // whitespace and attribute order are irrelevant to the comparison
+ XMLUnit.setIgnoreWhitespace(true);
+ XMLUnit.setIgnoreAttributeOrder(true);
+ final Diff diff = XMLUnit.compareXML(XMLUnit.buildControlDocument(controlXml), startExiMessage.getDocument());
+ assertTrue(diff.toString(), diff.similar());
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.IOException;
+import java.net.SocketAddress;
+
+import java.nio.channels.WritePendingException;
+import org.apache.sshd.ClientChannel;
+import org.apache.sshd.ClientSession;
+import org.apache.sshd.SshClient;
+import org.apache.sshd.client.channel.ChannelSubsystem;
+import org.apache.sshd.client.future.AuthFuture;
+import org.apache.sshd.client.future.ConnectFuture;
+import org.apache.sshd.client.future.OpenFuture;
+import org.apache.sshd.common.future.CloseFuture;
+import org.apache.sshd.common.future.SshFuture;
+import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.io.IoInputStream;
+import org.apache.sshd.common.io.IoOutputStream;
+import org.apache.sshd.common.io.IoReadFuture;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.util.Buffer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+
+/**
+ * Tests for AsyncSshHandler with a fully mocked SSHD client stack.
+ * Listeners that the handler registers on the mocked SSH futures are captured
+ * via stubAddListener(), so each test can drive the connect -> auth -> channel-open
+ * sequence by invoking the captured listeners by hand.
+ * NOTE: stubbing order is significant - stubs must be in place before the
+ * corresponding listener fires, so restructuring these tests is risky.
+ */
+public class AsyncSshHandlerTest {
+
+ @Mock
+ private SshClient sshClient;
+ @Mock
+ private AuthenticationHandler authHandler;
+ @Mock
+ private ChannelHandlerContext ctx;
+ @Mock
+ private Channel channel;
+ @Mock
+ private SocketAddress remoteAddress;
+ @Mock
+ private SocketAddress localAddress;
+
+ private AsyncSshHandler asyncSshHandler;
+
+ // Listeners captured from the handler's interaction with the mocked futures
+ private SshFutureListener<ConnectFuture> sshConnectListener;
+ private SshFutureListener<AuthFuture> sshAuthListener;
+ private SshFutureListener<OpenFuture> sshChannelOpenListener;
+
+ private ChannelPromise promise;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ stubAuth();
+ stubSshClient();
+ stubChannel();
+ stubCtx();
+ stubRemoteAddress();
+
+ promise = getMockedPromise();
+
+ asyncSshHandler = new AsyncSshHandler(authHandler, sshClient);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ sshConnectListener = null;
+ sshAuthListener = null;
+ sshChannelOpenListener = null;
+ promise = null;
+ // release the handler's SSH resources so state does not leak between tests
+ asyncSshHandler.close(ctx, getMockedPromise());
+ }
+
+ private void stubAuth() throws IOException {
+ doReturn("usr").when(authHandler).getUsername();
+
+ final AuthFuture authFuture = mock(AuthFuture.class);
+ Futures.addCallback(stubAddListener(authFuture), new SuccessFutureListener<AuthFuture>() {
+ @Override
+ public void onSuccess(final SshFutureListener<AuthFuture> result) {
+ sshAuthListener = result;
+ }
+ });
+ doReturn(authFuture).when(authHandler).authenticate(any(ClientSession.class));
+ }
+
+ // Intercepts future.addListener(...) and exposes the registered listener through a
+ // settable future, letting tests fire SSH future completions manually
+ @SuppressWarnings("unchecked")
+ private <T extends SshFuture<T>> ListenableFuture<SshFutureListener<T>> stubAddListener(final T future) {
+ final SettableFuture<SshFutureListener<T>> listenerSettableFuture = SettableFuture.create();
+
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable {
+ listenerSettableFuture.set((SshFutureListener<T>) invocation.getArguments()[0]);
+ return null;
+ }
+ }).when(future).addListener(any(SshFutureListener.class));
+
+ return listenerSettableFuture;
+ }
+
+ private void stubRemoteAddress() {
+ doReturn("remote").when(remoteAddress).toString();
+ }
+
+ private void stubCtx() {
+ doReturn(channel).when(ctx).channel();
+ doReturn(ctx).when(ctx).fireChannelActive();
+ doReturn(ctx).when(ctx).fireChannelInactive();
+ doReturn(ctx).when(ctx).fireChannelRead(anyObject());
+ doReturn(getMockedPromise()).when(ctx).newPromise();
+ }
+
+ private void stubChannel() {
+ doReturn("channel").when(channel).toString();
+ }
+
+ private void stubSshClient() {
+ doNothing().when(sshClient).start();
+ final ConnectFuture connectFuture = mock(ConnectFuture.class);
+ Futures.addCallback(stubAddListener(connectFuture), new SuccessFutureListener<ConnectFuture>() {
+ @Override
+ public void onSuccess(final SshFutureListener<ConnectFuture> result) {
+ sshConnectListener = result;
+ }
+ });
+ doReturn(connectFuture).when(sshClient).connect("usr", remoteAddress);
+ }
+
+ @Test
+ public void testConnectSuccess() throws Exception {
+ asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+ final IoInputStream asyncOut = getMockedIoInputStream();
+ final IoOutputStream asyncIn = getMockedIoOutputStream();
+ final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+ final ClientSession sshSession = getMockedSshSession(subsystemChannel);
+ final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
+
+ // drive the full connect -> auth -> channel-open sequence to completion
+ sshConnectListener.operationComplete(connectFuture);
+ sshAuthListener.operationComplete(getSuccessAuthFuture());
+ sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
+
+ verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
+
+ verify(promise).setSuccess();
+ verifyNoMoreInteractions(promise);
+ verify(ctx).fireChannelActive();
+ }
+
+ @Test
+ public void testRead() throws Exception {
+ asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+ final IoInputStream asyncOut = getMockedIoInputStream();
+ final IoOutputStream asyncIn = getMockedIoOutputStream();
+ final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+ final ClientSession sshSession = getMockedSshSession(subsystemChannel);
+ final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
+
+ sshConnectListener.operationComplete(connectFuture);
+ sshAuthListener.operationComplete(getSuccessAuthFuture());
+ sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
+
+ verify(ctx).fireChannelRead(any(ByteBuf.class));
+ }
+
+ @Test
+ public void testReadClosed() throws Exception {
+ asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+ final IoInputStream asyncOut = getMockedIoInputStream();
+ final IoReadFuture mockedReadFuture = asyncOut.read(null);
+
+ // re-stub the read future to look like a closed stream before completing it
+ Futures.addCallback(stubAddListener(mockedReadFuture), new SuccessFutureListener<IoReadFuture>() {
+ @Override
+ public void onSuccess(final SshFutureListener<IoReadFuture> result) {
+ doReturn(new IllegalStateException()).when(mockedReadFuture).getException();
+ doReturn(mockedReadFuture).when(mockedReadFuture).removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
+ doReturn(true).when(asyncOut).isClosing();
+ doReturn(true).when(asyncOut).isClosed();
+ result.operationComplete(mockedReadFuture);
+ }
+ });
+
+ final IoOutputStream asyncIn = getMockedIoOutputStream();
+ final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+ final ClientSession sshSession = getMockedSshSession(subsystemChannel);
+ final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
+
+ sshConnectListener.operationComplete(connectFuture);
+ sshAuthListener.operationComplete(getSuccessAuthFuture());
+ sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
+
+ verify(ctx).fireChannelInactive();
+ }
+
+ @Test
+ public void testReadFail() throws Exception {
+ asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+ final IoInputStream asyncOut = getMockedIoInputStream();
+ final IoReadFuture mockedReadFuture = asyncOut.read(null);
+
+ // read future completes exceptionally but the stream is not closed
+ Futures.addCallback(stubAddListener(mockedReadFuture), new SuccessFutureListener<IoReadFuture>() {
+ @Override
+ public void onSuccess(final SshFutureListener<IoReadFuture> result) {
+ doReturn(new IllegalStateException()).when(mockedReadFuture).getException();
+ doReturn(mockedReadFuture).when(mockedReadFuture).removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
+ result.operationComplete(mockedReadFuture);
+ }
+ });
+
+ final IoOutputStream asyncIn = getMockedIoOutputStream();
+ final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+ final ClientSession sshSession = getMockedSshSession(subsystemChannel);
+ final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
+
+ sshConnectListener.operationComplete(connectFuture);
+ sshAuthListener.operationComplete(getSuccessAuthFuture());
+ sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
+
+ verify(ctx).fireChannelInactive();
+ }
+
+ @Test
+ public void testWrite() throws Exception {
+ asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+ final IoInputStream asyncOut = getMockedIoInputStream();
+ final IoOutputStream asyncIn = getMockedIoOutputStream();
+ final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+ final ClientSession sshSession = getMockedSshSession(subsystemChannel);
+ final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
+
+ sshConnectListener.operationComplete(connectFuture);
+ sshAuthListener.operationComplete(getSuccessAuthFuture());
+ sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
+
+ final ChannelPromise writePromise = getMockedPromise();
+ asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), writePromise);
+
+ verify(writePromise).setSuccess();
+ }
+
+ @Test
+ public void testWriteClosed() throws Exception {
+ asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+ final IoInputStream asyncOut = getMockedIoInputStream();
+ final IoOutputStream asyncIn = getMockedIoOutputStream();
+
+ final IoWriteFuture ioWriteFuture = asyncIn.write(null);
+
+ // make the write future report failure against a closed output stream
+ Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
+ @Override
+ public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
+ doReturn(false).when(ioWriteFuture).isWritten();
+ doReturn(new IllegalStateException()).when(ioWriteFuture).getException();
+ doReturn(true).when(asyncIn).isClosing();
+ doReturn(true).when(asyncIn).isClosed();
+ result.operationComplete(ioWriteFuture);
+ }
+ });
+
+ final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+ final ClientSession sshSession = getMockedSshSession(subsystemChannel);
+ final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
+
+ sshConnectListener.operationComplete(connectFuture);
+ sshAuthListener.operationComplete(getSuccessAuthFuture());
+ sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
+
+ final ChannelPromise writePromise = getMockedPromise();
+ asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), writePromise);
+
+ verify(writePromise).setFailure(any(Throwable.class));
+ }
+
+ @Test
+ public void testWritePendingOne() throws Exception {
+ asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+ final IoInputStream asyncOut = getMockedIoInputStream();
+ final IoOutputStream asyncIn = getMockedIoOutputStream();
+ final IoWriteFuture ioWriteFuture = asyncIn.write(null);
+
+ final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+ final ClientSession sshSession = getMockedSshSession(subsystemChannel);
+ final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
+
+ sshConnectListener.operationComplete(connectFuture);
+ sshAuthListener.operationComplete(getSuccessAuthFuture());
+ sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
+
+ final ChannelPromise firstWritePromise = getMockedPromise();
+
+ // intercept listener for first write, so we can invoke successful write later thus simulate pending of the first write
+ final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture = stubAddListener(ioWriteFuture);
+ asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
+ final SshFutureListener<IoWriteFuture> firstWriteListener = firstWriteListenerFuture.get();
+ // intercept second listener, this is the listener for pending write for the pending write to know when pending state ended
+ final ListenableFuture<SshFutureListener<IoWriteFuture>> pendingListener = stubAddListener(ioWriteFuture);
+
+ final ChannelPromise secondWritePromise = getMockedPromise();
+ // now make write throw pending exception
+ doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).write(any(Buffer.class));
+ asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
+
+ doReturn(ioWriteFuture).when(asyncIn).write(any(Buffer.class));
+
+ verifyZeroInteractions(firstWritePromise, secondWritePromise);
+
+ // make first write stop pending
+ firstWriteListener.operationComplete(ioWriteFuture);
+ // intercept third listener, this is regular listener for second write to determine success or failure
+ final ListenableFuture<SshFutureListener<IoWriteFuture>> afterPendingListener = stubAddListener(ioWriteFuture);
+
+ // notify listener for second write that pending has ended
+ pendingListener.get().operationComplete(ioWriteFuture);
+ // Notify third listener (regular listener for second write) that second write succeeded
+ afterPendingListener.get().operationComplete(ioWriteFuture);
+
+ // verify both write promises successful
+ verify(firstWritePromise).setSuccess();
+ verify(secondWritePromise).setSuccess();
+ }
+
+ @Test
+ public void testWritePendingMax() throws Exception {
+ asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+ final IoInputStream asyncOut = getMockedIoInputStream();
+ final IoOutputStream asyncIn = getMockedIoOutputStream();
+ final IoWriteFuture ioWriteFuture = asyncIn.write(null);
+
+ final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+ final ClientSession sshSession = getMockedSshSession(subsystemChannel);
+ final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
+
+ sshConnectListener.operationComplete(connectFuture);
+ sshAuthListener.operationComplete(getSuccessAuthFuture());
+ sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
+
+ final ChannelPromise firstWritePromise = getMockedPromise();
+
+ // intercept listener for first write, so we can invoke successful write later thus simulate pending of the first write
+ final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture = stubAddListener(ioWriteFuture);
+ asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
+
+ final ChannelPromise secondWritePromise = getMockedPromise();
+ // now make write throw pending exception
+ doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).write(any(Buffer.class));
+ // flooding the handler with pending writes is expected to exhaust its pending-write
+ // limit and bring the channel down
+ for (int i = 0; i < 1000; i++) {
+ asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
+ }
+
+ verify(ctx).fireChannelInactive();
+ }
+
+ @Test
+ public void testDisconnect() throws Exception {
+ asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+ final IoInputStream asyncOut = getMockedIoInputStream();
+ final IoOutputStream asyncIn = getMockedIoOutputStream();
+ final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+ final ClientSession sshSession = getMockedSshSession(subsystemChannel);
+ final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
+
+ sshConnectListener.operationComplete(connectFuture);
+ sshAuthListener.operationComplete(getSuccessAuthFuture());
+ sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
+
+ final ChannelPromise disconnectPromise = getMockedPromise();
+ asyncSshHandler.disconnect(ctx, disconnectPromise);
+
+ verify(sshSession).close(anyBoolean());
+ verify(disconnectPromise).setSuccess();
+ verify(ctx).fireChannelInactive();
+ }
+
+ // NOTE(review): the local is named failedOpenFuture but this future reports a successful open
+ private OpenFuture getSuccessOpenFuture() {
+ final OpenFuture failedOpenFuture = mock(OpenFuture.class);
+ doReturn(true).when(failedOpenFuture).isOpened();
+ return failedOpenFuture;
+ }
+
+ private AuthFuture getSuccessAuthFuture() {
+ final AuthFuture authFuture = mock(AuthFuture.class);
+ doReturn(true).when(authFuture).isSuccess();
+ return authFuture;
+ }
+
+ private ConnectFuture getSuccessConnectFuture(final ClientSession sshSession) {
+ final ConnectFuture connectFuture = mock(ConnectFuture.class);
+ doReturn(true).when(connectFuture).isConnected();
+
+ doReturn(sshSession).when(connectFuture).getSession();
+ return connectFuture;
+ }
+
+ private ClientSession getMockedSshSession(final ChannelSubsystem subsystemChannel) throws IOException {
+ final ClientSession sshSession = mock(ClientSession.class);
+
+ doReturn("sshSession").when(sshSession).toString();
+ doReturn("serverVersion").when(sshSession).getServerVersion();
+ doReturn(false).when(sshSession).isClosed();
+ doReturn(false).when(sshSession).isClosing();
+ final CloseFuture closeFuture = mock(CloseFuture.class);
+ Futures.addCallback(stubAddListener(closeFuture), new SuccessFutureListener<CloseFuture>() {
+ @Override
+ public void onSuccess(final SshFutureListener<CloseFuture> result) {
+ doReturn(true).when(closeFuture).isClosed();
+ result.operationComplete(closeFuture);
+ }
+ });
+ doReturn(closeFuture).when(sshSession).close(false);
+
+ doReturn(subsystemChannel).when(sshSession).createSubsystemChannel(anyString());
+
+ return sshSession;
+ }
+
+ private ChannelSubsystem getMockedSubsystemChannel(final IoInputStream asyncOut, final IoOutputStream asyncIn) throws IOException {
+ final ChannelSubsystem subsystemChannel = mock(ChannelSubsystem.class);
+ doNothing().when(subsystemChannel).setStreaming(any(ClientChannel.Streaming.class));
+ final OpenFuture openFuture = mock(OpenFuture.class);
+
+ // capture the channel-open listener so tests can complete it manually
+ Futures.addCallback(stubAddListener(openFuture), new SuccessFutureListener<OpenFuture>() {
+ @Override
+ public void onSuccess(final SshFutureListener<OpenFuture> result) {
+ sshChannelOpenListener = result;
+ }
+ });
+
+ doReturn(asyncOut).when(subsystemChannel).getAsyncOut();
+
+ doReturn(openFuture).when(subsystemChannel).open();
+ doReturn(asyncIn).when(subsystemChannel).getAsyncIn();
+ return subsystemChannel;
+ }
+
+ private IoOutputStream getMockedIoOutputStream() {
+ final IoOutputStream mock = mock(IoOutputStream.class);
+ final IoWriteFuture ioWriteFuture = mock(IoWriteFuture.class);
+ doReturn(ioWriteFuture).when(ioWriteFuture).addListener(Matchers.<SshFutureListener<IoWriteFuture>>any());
+ doReturn(true).when(ioWriteFuture).isWritten();
+
+ // every write completes successfully as soon as its listener is registered
+ Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
+ @Override
+ public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
+ result.operationComplete(ioWriteFuture);
+ }
+ });
+
+ doReturn(ioWriteFuture).when(mock).write(any(Buffer.class));
+ doReturn(false).when(mock).isClosed();
+ doReturn(false).when(mock).isClosing();
+ return mock;
+ }
+
+ private IoInputStream getMockedIoInputStream() {
+ final IoInputStream mock = mock(IoInputStream.class);
+ final IoReadFuture ioReadFuture = mock(IoReadFuture.class);
+ doReturn(null).when(ioReadFuture).getException();
+ doReturn(ioReadFuture).when(ioReadFuture).removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
+ doReturn(5).when(ioReadFuture).getRead();
+ doReturn(new Buffer(new byte[]{0, 1, 2, 3, 4})).when(ioReadFuture).getBuffer();
+ doReturn(ioReadFuture).when(ioReadFuture).addListener(Matchers.<SshFutureListener<IoReadFuture>>any());
+
+ // Always success for read
+ Futures.addCallback(stubAddListener(ioReadFuture), new SuccessFutureListener<IoReadFuture>() {
+ @Override
+ public void onSuccess(final SshFutureListener<IoReadFuture> result) {
+ result.operationComplete(ioReadFuture);
+ }
+ });
+
+ doReturn(ioReadFuture).when(mock).read(any(Buffer.class));
+ doReturn(false).when(mock).isClosed();
+ doReturn(false).when(mock).isClosing();
+ return mock;
+ }
+
+ @Test
+ public void testConnectFailOpenChannel() throws Exception {
+ asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+ final IoInputStream asyncOut = getMockedIoInputStream();
+ final IoOutputStream asyncIn = getMockedIoOutputStream();
+ final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+ final ClientSession sshSession = getMockedSshSession(subsystemChannel);
+ final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
+
+ sshConnectListener.operationComplete(connectFuture);
+
+ sshAuthListener.operationComplete(getSuccessAuthFuture());
+
+ verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
+
+ try {
+ sshChannelOpenListener.operationComplete(getFailedOpenFuture());
+ fail("Exception expected");
+ } catch (final Exception e) {
+ verify(promise).setFailure(any(Throwable.class));
+ verifyNoMoreInteractions(promise);
+ // TODO should ctx.channelInactive be called if we throw exception ?
+ }
+ }
+
+ @Test
+ public void testConnectFailAuth() throws Exception {
+ asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+ final ClientSession sshSession = mock(ClientSession.class);
+ doReturn(true).when(sshSession).isClosed();
+ final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
+
+ sshConnectListener.operationComplete(connectFuture);
+
+ final AuthFuture authFuture = getFailedAuthFuture();
+
+ try {
+ sshAuthListener.operationComplete(authFuture);
+ fail("Exception expected");
+ } catch (final Exception e) {
+ verify(promise).setFailure(any(Throwable.class));
+ verifyNoMoreInteractions(promise);
+ // TODO should ctx.channelInactive be called ?
+ }
+ }
+
+ private AuthFuture getFailedAuthFuture() {
+ final AuthFuture authFuture = mock(AuthFuture.class);
+ doReturn(false).when(authFuture).isSuccess();
+ doReturn(new IllegalStateException()).when(authFuture).getException();
+ return authFuture;
+ }
+
+ private OpenFuture getFailedOpenFuture() {
+ final OpenFuture authFuture = mock(OpenFuture.class);
+ doReturn(false).when(authFuture).isOpened();
+ doReturn(new IllegalStateException()).when(authFuture).getException();
+ return authFuture;
+ }
+
+ @Test
+ public void testConnectFail() throws Exception {
+ asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+ final ConnectFuture connectFuture = getFailedConnectFuture();
+ try {
+ sshConnectListener.operationComplete(connectFuture);
+ fail("Exception expected");
+ } catch (final Exception e) {
+ verify(promise).setFailure(any(Throwable.class));
+ verifyNoMoreInteractions(promise);
+ // TODO should ctx.channelInactive be called ?
+ }
+ }
+
+ private ConnectFuture getFailedConnectFuture() {
+ final ConnectFuture connectFuture = mock(ConnectFuture.class);
+ doReturn(false).when(connectFuture).isConnected();
+ doReturn(new IllegalStateException()).when(connectFuture).getException();
+ return connectFuture;
+ }
+
+ // Promise whose setSuccess/setFailure are stubbed for fluent chaining and later verification
+ private ChannelPromise getMockedPromise() {
+ final ChannelPromise promise = mock(ChannelPromise.class);
+ doReturn(promise).when(promise).setSuccess();
+ doReturn(promise).when(promise).setFailure(any(Throwable.class));
+ return promise;
+ }
+
+ // FutureCallback that rethrows stubbing failures instead of silently swallowing them
+ private static abstract class SuccessFutureListener<T extends SshFuture<T>> implements FutureCallback<SshFutureListener<T>> {
+
+ @Override
+ public abstract void onSuccess(final SshFutureListener<T> result);
+
+ @Override
+ public void onFailure(final Throwable t) {
+ throw new RuntimeException(t);
+ }
+ }
+}