--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>commons.opendaylight</artifactId>
+ <version>1.5.0-SNAPSHOT</version>
+ <relativePath>../../opendaylight/commons/opendaylight</relativePath>
+ </parent>
+ <artifactId>features-extras</artifactId>
+ <groupId>org.opendaylight.controller</groupId>
+ <packaging>jar</packaging>
+ <properties>
+ <features.file>features.xml</features.file>
+ <!-- Optional TODO: Move these properties to your parent pom and possibly
+ DependencyManagement section of your parent pom -->
+ <branding.version>1.1.0-SNAPSHOT</branding.version>
+ <karaf.resources.version>1.5.0-SNAPSHOT</karaf.resources.version>
+ <karaf.version>3.0.1</karaf.version>
+ <feature.test.version>0.7.0-SNAPSHOT</feature.test.version>
+ <karaf.empty.version>1.5.0-SNAPSHOT</karaf.empty.version>
+ <surefire.version>2.16</surefire.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.jolokia</groupId>
+ <artifactId>jolokia-osgi</artifactId>
+ <version>${jolokia.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>features-test</artifactId>
+ <version>${feature.test.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <!-- dependency for opendaylight-karaf-empty for use by testing -->
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>opendaylight-karaf-empty</artifactId>
+ <version>${karaf.empty.version}</version>
+ <type>zip</type>
+ </dependency>
+ </dependencies>
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ <filtering>true</filtering>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-resources-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>filter</id>
+ <phase>generate-resources</phase>
+ <goals>
+ <goal>resources</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>attach-artifacts</id>
+ <phase>package</phase>
+ <goals>
+ <goal>attach-artifact</goal>
+ </goals>
+ <configuration>
+ <artifacts>
+ <artifact>
+ <file>${project.build.directory}/classes/${features.file}</file>
+ <type>xml</type>
+ <classifier>features</classifier>
+ </artifact>
+ </artifacts>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>${surefire.version}</version>
+ <configuration>
+ <systemPropertyVariables>
+ <karaf.distro.groupId>org.opendaylight.controller</karaf.distro.groupId>
+ <karaf.distro.artifactId>opendaylight-karaf-empty</karaf.distro.artifactId>
+ <karaf.distro.version>${karaf.empty.version}</karaf.distro.version>
+ </systemPropertyVariables>
+ <dependenciesToScan>
+ <dependency>org.opendaylight.yangtools:features-test</dependency>
+ </dependenciesToScan>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <scm>
+ <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
+ <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
+ <tag>HEAD</tag>
+ <url>https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=summary</url>
+ </scm>
+</project>
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- vi: set et smarttab sw=4 tabstop=4: -->
+<!--
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+
+<!--
+
+ This feature file is intended to contain only third party features that cannot be accommodated in any
+ other feature file. This is a good place to add features like jolokia which no other feature depends on
+ but which provides a utility.
+-->
+<features name="odl-extras-${project.version}" xmlns="http://karaf.apache.org/xmlns/features/v1.2.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://karaf.apache.org/xmlns/features/v1.2.0 http://karaf.apache.org/xmlns/features/v1.2.0">
+ <feature name='odl-extras-all' version='${project.version}' description='OpenDaylight :: Extras :: All'>
+ <feature version="${project.version}">odl-jolokia</feature>
+ </feature>
+ <feature name="odl-jolokia" version="${project.version}" description="Jolokia JMX/HTTP bridge">
+ <feature>http</feature>
+ <bundle>mvn:org.jolokia/jolokia-osgi/${jolokia.version}</bundle>
+ </feature>
+</features>
<classifier>features</classifier>
<type>xml</type>
</dependency>
+
+ <dependency>
+ <groupId>org.opendaylight.controller.md</groupId>
+ <artifactId>statistics-manager-config</artifactId>
+ <version>${mdsal.version}</version>
+ <type>xml</type>
+ <classifier>config</classifier>
+ </dependency>
+
<dependency>
<groupId>org.opendaylight.controller.model</groupId>
<artifactId>model-flow-base</artifactId>
<bundle>mvn:org.opendaylight.controller.md/inventory-manager/${project.version}</bundle>
<bundle>mvn:org.opendaylight.controller.md/forwardingrules-manager/${project.version}</bundle>
<bundle>mvn:org.opendaylight.controller/liblldp/${sal.version}</bundle>
+ <configfile finalname="${config.configfile.directory}/${config.statistics.manager.configfile}">mvn:org.opendaylight.controller.md/statistics-manager-config/${mdsal.version}/xml/config</configfile>
</feature>
</features>
<module>akka</module>
<module>netconf-connector</module>
<module>restconf</module>
+ <module>extras</module>
</modules>
-</project>
\ No newline at end of file
+</project>
<controllermanager.northbound.version>0.1.0-SNAPSHOT</controllermanager.northbound.version>
<devices.web.version>0.5.0-SNAPSHOT</devices.web.version>
<dummy-console.version>1.2.0-SNAPSHOT</dummy-console.version>
+ <config.statistics.manager.configfile>30-statistics-manager.xml</config.statistics.manager.configfile>
<eclipse.persistence.version>2.5.0</eclipse.persistence.version>
<eclipse.jdt.core.compiler.batch.version>3.8.0.I20120518-2145</eclipse.jdt.core.compiler.batch.version>
<!-- enforcer version -->
createInstance();
final ConfigTransactionJMXClient transaction = configRegistryClient.createTransaction();
assertBeanCount(1, FACTORY_NAME);
- final NeverReconnectStrategyFactoryModuleMXBean mxBean = transaction.newMBeanProxy(
+ final NeverReconnectStrategyFactoryModuleMXBean mxBean = transaction.newMXBeanProxy(
transaction.lookupConfigBean(FACTORY_NAME, INSTANCE_NAME), NeverReconnectStrategyFactoryModuleMXBean.class);
mxBean.setTimeout(200);
final CommitStatus status = transaction.commit();
private static ObjectName createInstance(final ConfigTransactionJMXClient transaction, final Integer timeout)
throws InstanceAlreadyExistsException {
final ObjectName nameCreated = transaction.createModule(FACTORY_NAME, INSTANCE_NAME);
- final NeverReconnectStrategyFactoryModuleMXBean mxBean = transaction.newMBeanProxy(nameCreated,
+ final NeverReconnectStrategyFactoryModuleMXBean mxBean = transaction.newMXBeanProxy(nameCreated,
NeverReconnectStrategyFactoryModuleMXBean.class);
mxBean.setTimeout(timeout);
mxBean.setExecutor(GlobalEventExecutorUtil.create(transaction));
createInstance();
final ConfigTransactionJMXClient transaction = configRegistryClient.createTransaction();
assertBeanCount(1, FACTORY_NAME);
- final ReconnectImmediatelyStrategyFactoryModuleMXBean mxBean = transaction.newMBeanProxy(
+ final ReconnectImmediatelyStrategyFactoryModuleMXBean mxBean = transaction.newMXBeanProxy(
transaction.lookupConfigBean(FACTORY_NAME, INSTANCE_NAME),
ReconnectImmediatelyStrategyFactoryModuleMXBean.class);
mxBean.setReconnectTimeout(200);
private static ObjectName createInstance(final ConfigTransactionJMXClient transaction, final Integer timeout)
throws InstanceAlreadyExistsException {
final ObjectName nameCreated = transaction.createModule(FACTORY_NAME, INSTANCE_NAME);
- final ReconnectImmediatelyStrategyFactoryModuleMXBean mxBean = transaction.newMBeanProxy(nameCreated,
+ final ReconnectImmediatelyStrategyFactoryModuleMXBean mxBean = transaction.newMXBeanProxy(nameCreated,
ReconnectImmediatelyStrategyFactoryModuleMXBean.class);
mxBean.setReconnectTimeout(timeout);
mxBean.setReconnectExecutor(GlobalEventExecutorUtil.create(transaction));
createInstance();
final ConfigTransactionJMXClient transaction = configRegistryClient.createTransaction();
assertBeanCount(1, FACTORY_NAME);
- final TimedReconnectStrategyFactoryModuleMXBean mxBean = transaction.newMBeanProxy(
+ final TimedReconnectStrategyFactoryModuleMXBean mxBean = transaction.newMXBeanProxy(
transaction.lookupConfigBean(FACTORY_NAME, INSTANCE_NAME), TimedReconnectStrategyFactoryModuleMXBean.class);
assertEquals(mxBean.getMinSleep(), new Long(100));
mxBean.setMinSleep(200L);
final Integer connectTime, final Long minSleep, final BigDecimal sleepFactor, final Long maxSleep,
final Long maxAttempts, final Long deadline) throws InstanceAlreadyExistsException {
final ObjectName nameCreated = transaction.createModule(FACTORY_NAME, instanceName);
- final TimedReconnectStrategyFactoryModuleMXBean mxBean = transaction.newMBeanProxy(nameCreated,
+ final TimedReconnectStrategyFactoryModuleMXBean mxBean = transaction.newMXBeanProxy(nameCreated,
TimedReconnectStrategyFactoryModuleMXBean.class);
mxBean.setConnectTime(connectTime);
mxBean.setDeadline(deadline);
// test reported apsp number of threads
TestingParallelAPSPConfigMXBean parallelAPSPRuntimeProxy = configRegistryClient
- .newMBeanProxy(apspON, TestingParallelAPSPConfigMXBean.class);
+ .newMXBeanProxy(apspON, TestingParallelAPSPConfigMXBean.class);
assertEquals(
(Integer) TestingParallelAPSPImpl.MINIMAL_NUMBER_OF_THREADS,
parallelAPSPRuntimeProxy.getMaxNumberOfThreads());
import static org.junit.Assert.fail;
import javax.annotation.Nullable;
-import javax.management.DynamicMBean;
import javax.management.InstanceAlreadyExistsException;
import javax.management.InstanceNotFoundException;
+import javax.management.IntrospectionException;
import javax.management.ObjectName;
+import javax.management.ReflectionException;
import org.junit.Test;
import org.opendaylight.controller.config.api.ValidationException;
import org.opendaylight.controller.config.api.jmx.ObjectNameUtil;
private void assertExists(@Nullable final ConfigTransactionJMXClient transaction,
final String moduleName, final String instanceName)
- throws InstanceNotFoundException {
+ throws InstanceNotFoundException, IntrospectionException, ReflectionException {
if (transaction != null) {
transaction.lookupConfigBean(moduleName, instanceName);
// make a dummy call
- configRegistryClient.newMBeanProxy(
- ObjectNameUtil.createTransactionModuleON(
- transaction.getTransactionName(), moduleName,
- instanceName), DynamicMBean.class).getMBeanInfo();
+ platformMBeanServer.getMBeanInfo(ObjectNameUtil.createTransactionModuleON(
+ transaction.getTransactionName(), moduleName, instanceName));
} else {
configRegistryClient.lookupConfigBean(moduleName, instanceName);
// make a dummy call
- configRegistryClient.newMBeanProxy(
- ObjectNameUtil.createReadOnlyModuleON(moduleName,
- instanceName), DynamicMBean.class).getMBeanInfo();
+ platformMBeanServer.getMBeanInfo(ObjectNameUtil.createReadOnlyModuleON(moduleName,
+ instanceName));
}
}
ObjectName apspName = transaction.createModule(
TestingParallelAPSPModuleFactory.NAME, "apsp1");
- TestingParallelAPSPConfigMXBean apspProxy = transaction.newMBeanProxy(
+ TestingParallelAPSPConfigMXBean apspProxy = transaction.newMXBeanProxy(
apspName, TestingParallelAPSPConfigMXBean.class);
apspProxy.setThreadPool(scheduledName);
apspProxy.setSomeParam("someParam");
ObjectName apspName = transaction.createModule(
TestingParallelAPSPModuleFactory.NAME, "apsp1");
- TestingParallelAPSPConfigMXBean apspProxy = transaction.newMBeanProxy(
+ TestingParallelAPSPConfigMXBean apspProxy = transaction.newMXBeanProxy(
apspName, TestingParallelAPSPConfigMXBean.class);
apspProxy.setThreadPool(ObjectNameUtil.createReadOnlyModuleON(
TestingScheduledThreadPoolModuleFactory.NAME, scheduled1));
ObjectName fixed1name = firstCommit();
// 2, check that configuration was copied to platform
- DynamicMBean dynamicMBean = configRegistryClient.newMBeanProxy(
- ObjectNameUtil.withoutTransactionName(fixed1name),
- DynamicMBean.class);
- dynamicMBean.getMBeanInfo();
- assertEquals(numberOfThreads, dynamicMBean.getAttribute("ThreadCount"));
+ ObjectName on = ObjectNameUtil.withoutTransactionName(fixed1name);
+ platformMBeanServer.getMBeanInfo(on);
+ assertEquals(numberOfThreads, platformMBeanServer.getAttribute(on, "ThreadCount"));
// 3, shutdown fixed1 in new transaction
assertFalse(TestingFixedThreadPool.allExecutors.get(0).isShutdown());
// dynamic config should be removed from platform
try {
- dynamicMBean.getMBeanInfo();
+ platformMBeanServer.getMBeanInfo(on);
fail();
} catch (Exception e) {
- assertTrue(e.getCause() instanceof InstanceNotFoundException);
+ assertTrue(e instanceof InstanceNotFoundException);
}
}
private ObjectName createInstance(ConfigTransactionJMXClient transaction, String instanceName)
throws InstanceAlreadyExistsException {
ObjectName nameCreated = transaction.createModule(factory.getImplementationName(), instanceName);
- transaction.newMBeanProxy(nameCreated, GlobalEventExecutorModuleMXBean.class);
+ transaction.newMXBeanProxy(nameCreated, GlobalEventExecutorModuleMXBean.class);
return nameCreated;
}
private ObjectName createInstance(ConfigTransactionJMXClient transaction, String instanceName)
throws InstanceAlreadyExistsException {
ObjectName nameCreated = transaction.createModule(factory.getImplementationName(), instanceName);
- transaction.newMBeanProxy(nameCreated, ImmediateEventExecutorModuleMXBean.class);
+ transaction.newMXBeanProxy(nameCreated, ImmediateEventExecutorModuleMXBean.class);
return nameCreated;
}
<type>xml</type>
<scope>runtime</scope>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>features-extras</artifactId>
+ <version>${project.version}</version>
+ <classifier>features</classifier>
+ <type>xml</type>
+ <scope>runtime</scope>
+ </dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>features-flow</artifactId>
<module>inventory-manager</module>
<module>statistics-manager</module>
+ <module>statistics-manager-config</module>
<module>topology-manager</module>
<module>forwardingrules-manager</module>
<module>topology-lldp-discovery</module>
import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
import org.opendaylight.controller.cluster.example.messages.PrintRole;
import org.opendaylight.controller.cluster.example.messages.PrintState;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.raft.ConfigParams;
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.RaftState;
*/
public class ExampleActor extends RaftActor {
-    private final Map<String, String> state = new HashMap<>();
+    // Keep the diamond operator — do not regress to a raw HashMap type.
+    private final Map<String, String> state = new HashMap<>();
private final DataPersistenceProvider dataPersistenceProvider;
private long persistIdentifier = 1;
+ private Optional<ActorRef> roleChangeNotifier;
- public ExampleActor(final String id, final Map<String, String> peerAddresses,
- final Optional<ConfigParams> configParams) {
+ public ExampleActor(String id, Map<String, String> peerAddresses,
+ Optional<ConfigParams> configParams) {
super(id, peerAddresses, configParams);
this.dataPersistenceProvider = new PersistentDataProvider();
+ roleChangeNotifier = createRoleChangeNotifier(id);
}
public static Props props(final String id, final Map<String, String> peerAddresses,
final Optional<ConfigParams> configParams){
return Props.create(new Creator<ExampleActor>(){
- private static final long serialVersionUID = 1L;
@Override public ExampleActor create() throws Exception {
return new ExampleActor(id, peerAddresses, configParams);
});
}
- @Override public void onReceiveCommand(final Object message) throws Exception{
+ @Override public void onReceiveCommand(Object message) throws Exception{
if(message instanceof KeyValue){
if(isLeader()) {
String persistId = Long.toString(persistIdentifier++);
String followers = "";
if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
- LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(), getPeers(), followers);
+ LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
+ getRaftActorContext().getPeerAddresses().keySet(), followers);
} else {
- LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), getPeers());
+ LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),
+ getRaftActorContext().getPeerAddresses().keySet());
}
}
}
+ protected String getReplicatedLogState() {
+ return "snapshotIndex=" + getRaftActorContext().getReplicatedLog().getSnapshotIndex()
+ + ", snapshotTerm=" + getRaftActorContext().getReplicatedLog().getSnapshotTerm()
+ + ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size();
+ }
+
+ public Optional<ActorRef> createRoleChangeNotifier(String actorId) {
+ ActorRef exampleRoleChangeNotifier = this.getContext().actorOf(
+ RoleChangeNotifier.getProps(actorId), actorId + "-notifier");
+ return Optional.<ActorRef>of(exampleRoleChangeNotifier);
+ }
+
+ @Override
+ protected Optional<ActorRef> getRoleChangeNotifier() {
+ return roleChangeNotifier;
+ }
+
@Override protected void applyState(final ActorRef clientActor, final String identifier,
final Object data) {
if(data instanceof KeyValue){
getSelf().tell(new CaptureSnapshotReply(bs), null);
}
-    @Override protected void applySnapshot(final ByteString snapshot) {
+    @Override protected void applySnapshot(ByteString snapshot) {
         state.clear();
         try {
-            state.putAll((Map<String, String>) toObject(snapshot));
+            // Keep the parameterized cast rather than a raw (HashMap) cast: it avoids raw
+            // types and does not assume the concrete Map implementation of the snapshot.
+            state.putAll((Map<String, String>) toObject(snapshot));
         } catch (Exception e) {
             LOG.error(e, "Exception in applying snapshot");
         }
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Snapshot applied to state : {}", ((Map<?, ?>) state).size());
+            // state is already declared as a Map; no cast is needed to call size().
+            LOG.debug("Snapshot applied to state : {}", state.size());
         }
     }
- private ByteString fromObject(final Object snapshot) throws Exception {
+ private ByteString fromObject(Object snapshot) throws Exception {
ByteArrayOutputStream b = null;
ObjectOutputStream o = null;
try {
}
}
- private Object toObject(final ByteString bs) throws ClassNotFoundException, IOException {
+ private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
Object obj = null;
ByteArrayInputStream bis = null;
ObjectInputStream ois = null;
return dataPersistenceProvider;
}
- @Override public void onReceiveRecover(final Object message)throws Exception {
+ @Override public void onReceiveRecover(Object message)throws Exception {
super.onReceiveRecover(message);
}
}
@Override
- protected void startLogRecoveryBatch(final int maxBatchSize) {
+ protected void startLogRecoveryBatch(int maxBatchSize) {
}
@Override
- protected void appendRecoveredLogEntry(final Payload data) {
+ protected void appendRecoveredLogEntry(Payload data) {
}
@Override
}
@Override
- protected void applyRecoverySnapshot(final ByteString snapshot) {
+ protected void applyRecoverySnapshot(ByteString snapshot) {
}
}
--- /dev/null
+package org.opendaylight.controller.cluster.example;
+
+import akka.actor.Actor;
+import akka.actor.ActorRef;
+import akka.actor.Cancellable;
+import akka.actor.Props;
+import akka.japi.Creator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.example.messages.RegisterListener;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
+import scala.concurrent.Await;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * This is a sample implementation of a Role Change Listener which is an actor, which registers itself to the ClusterRoleChangeNotifier
+ * <p/>
+ * The Role Change listener receives a SetNotifiers message with the notifiers to register itself with.
+ * <p/>
+ * It kicks off a scheduler which sends registration messages to the notifiers, until it gets a RegisterRoleChangeListenerReply
+ * <p/>
+ * If all the notifiers have been registered with, then it cancels the scheduler.
+ * It starts the scheduler again when it receives a new registration
+ *
+ */
+public class ExampleRoleChangeListener extends AbstractUntypedActor implements AutoCloseable{
+    // the akka url should be set to the notifiers actor-system and domain.
+    private static final String NOTIFIER_AKKA_URL = "akka.tcp://raft-test@127.0.0.1:2550/user/";
+
+    // notifier actor path -> whether a RegisterRoleChangeListenerReply was received from it
+    private Map<String, Boolean> notifierRegistrationStatus = new HashMap<>();
+    // periodic self-message task; cancelled once all notifiers have acknowledged registration
+    private Cancellable registrationSchedule = null;
+    // timeout used when resolving a notifier's ActorRef
+    private static final FiniteDuration duration = new FiniteDuration(100, TimeUnit.MILLISECONDS);
+    // interval between registration attempts
+    private static final FiniteDuration schedulerDuration = new FiniteDuration(1, TimeUnit.SECONDS);
+    // NOTE(review): memberName is stored but never read after construction — confirm it is needed
+    private final String memberName;
+    private static final String[] shardsToMonitor = new String[] {"example"};
+
+    public ExampleRoleChangeListener(String memberName) {
+        super();
+        scheduleRegistrationListener(schedulerDuration);
+        this.memberName = memberName;
+        populateRegistry(memberName);
+    }
+
+    public static Props getProps(final String memberName) {
+        return Props.create(new Creator<Actor>() {
+            @Override
+            public Actor create() throws Exception {
+                return new ExampleRoleChangeListener(memberName);
+            }
+        });
+    }
+
+    @Override
+    protected void handleReceive(Object message) throws Exception {
+        if (message instanceof RegisterListener) {
+            // called by the scheduler at intervals to register any unregistered notifiers
+            sendRegistrationRequests();
+
+        } else if (message instanceof RegisterRoleChangeListenerReply) {
+            // called by the Notifier
+            handleRegisterRoleChangeListenerReply(getSender().path().toString());
+
+        } else if (message instanceof RoleChangeNotification) {
+            // called by the Notifier
+            RoleChangeNotification notification = (RoleChangeNotification) message;
+
+            LOG.info("Role Change Notification received for member:{}, old role:{}, new role:{}",
+                notification.getMemberId(), notification.getOldRole(), notification.getNewRole());
+
+            // the apps dependent on such notifications can be called here
+            //TODO: add implementation here
+
+        }
+    }
+
+    // (Re)starts the periodic self-message that triggers sendRegistrationRequests().
+    private void scheduleRegistrationListener(FiniteDuration interval) {
+        LOG.debug("--->scheduleRegistrationListener called.");
+        registrationSchedule = getContext().system().scheduler().schedule(
+            interval, interval, getSelf(), new RegisterListener(),
+            getContext().system().dispatcher(), getSelf());
+
+    }
+
+    // Seeds notifierRegistrationStatus with one notifier actor path per monitored shard.
+    private void populateRegistry(String memberName) {
+
+        for (String shard: shardsToMonitor) {
+            // NOTE(review): the loop variable 'shard' is never used — the notifier path is built
+            // from memberName only, so every iteration produces the same path. Confirm whether
+            // the shard name should be part of the notifier path.
+            String notifier =(new StringBuilder()).append(NOTIFIER_AKKA_URL).append(memberName)
+                .append("/").append(memberName).append("-notifier").toString();
+
+            if (!notifierRegistrationStatus.containsKey(notifier)) {
+                notifierRegistrationStatus.put(notifier, false);
+            }
+        }
+
+        // NOTE(review): this restarts the scheduler while it is still active (!isCancelled());
+        // possibly the condition was meant to be isCancelled() — confirm intended behavior.
+        if (!registrationSchedule.isCancelled()) {
+            scheduleRegistrationListener(schedulerDuration);
+        }
+    }
+
+    // Sends a RegisterRoleChangeListener to every notifier that has not yet replied.
+    private void sendRegistrationRequests() {
+        for (Map.Entry<String, Boolean> entry : notifierRegistrationStatus.entrySet()) {
+            if (!entry.getValue()) {
+                try {
+                    LOG.debug("{} registering with {}", getSelf().path().toString(), entry.getKey());
+                    ActorRef notifier = Await.result(
+                        getContext().actorSelection(entry.getKey()).resolveOne(duration), duration);
+
+                    notifier.tell(new RegisterRoleChangeListener(), getSelf());
+
+                } catch (Exception e) {
+                    // best-effort: the scheduler will retry on its next tick
+                    LOG.error("ERROR!! Unable to send registration request to notifier {}", entry.getKey());
+                }
+            }
+        }
+    }
+
+    // Marks the sender as registered; cancels the scheduler once every notifier has replied.
+    private void handleRegisterRoleChangeListenerReply(String senderId) {
+        if (notifierRegistrationStatus.containsKey(senderId)) {
+            notifierRegistrationStatus.put(senderId, true);
+
+            //cancel the schedule when listener is registered with all notifiers
+            if (!registrationSchedule.isCancelled()) {
+                boolean cancelScheduler = true;
+                for (Boolean value : notifierRegistrationStatus.values()) {
+                    cancelScheduler = cancelScheduler & value;
+                }
+                if (cancelScheduler) {
+                    registrationSchedule.cancel();
+                }
+            }
+        } else {
+            LOG.info("Unexpected, RegisterRoleChangeListenerReply received from notifier which is not known to Listener:{}",
+                senderId);
+        }
+    }
+
+
+    @Override
+    public void close() throws Exception {
+        registrationSchedule.cancel();
+    }
+}
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import com.google.common.base.Optional;
-import org.opendaylight.controller.cluster.example.messages.PrintRole;
-import org.opendaylight.controller.cluster.example.messages.PrintState;
-import org.opendaylight.controller.cluster.raft.ConfigParams;
-
+import com.google.common.collect.Lists;
+import com.typesafe.config.ConfigFactory;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.opendaylight.controller.cluster.example.messages.PrintRole;
+import org.opendaylight.controller.cluster.example.messages.PrintState;
+import org.opendaylight.controller.cluster.raft.ConfigParams;
/**
* This is a test driver for testing akka-raft implementation
*/
public class TestDriver {
- private static final ActorSystem actorSystem = ActorSystem.create();
+
private static Map<String, String> allPeers = new HashMap<>();
private static Map<String, ActorRef> clientActorRefs = new HashMap<String, ActorRef>();
private static Map<String, ActorRef> actorRefs = new HashMap<String, ActorRef>();
private int nameCounter = 0;
private static ConfigParams configParams = new ExampleConfigParamsImpl();
+ private static ActorSystem actorSystem;
+ private static ActorSystem listenerActorSystem;
+
/**
* Create nodes, add clients and start logging.
* Commands
* @throws Exception
*/
public static void main(String[] args) throws Exception {
+
+ actorSystem = ActorSystem.create("raft-test", ConfigFactory
+ .load().getConfig("raft-test"));
+
+ listenerActorSystem = ActorSystem.create("raft-test-listener", ConfigFactory
+ .load().getConfig("raft-test-listener"));
+
TestDriver td = new TestDriver();
System.out.println("Enter command (type bye to exit):");
}
}
+ // create the listener using a separate actor system for each example actor
+ private void createClusterRoleChangeListener(List<String> memberIds) {
+ System.out.println("memberIds="+memberIds);
+ for (String memberId : memberIds) {
+ ActorRef listenerActor = listenerActorSystem.actorOf(
+ ExampleRoleChangeListener.getProps(memberId), memberId + "-role-change-listener");
+ System.out.println("Role Change Listener created:" + listenerActor.path().toString());
+ }
+ }
+
public static ActorRef createExampleActor(String name) {
return actorSystem.actorOf(ExampleActor.props(name, withoutPeer(name),
Optional.of(configParams)), name);
public void createNodes(int num) {
for (int i=0; i < num; i++) {
nameCounter = nameCounter + 1;
- allPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter);
+ allPeers.put("example-"+nameCounter, "akka://raft-test/user/example-"+nameCounter);
}
for (String s : allPeers.keySet()) {
System.out.println("Created node:"+s);
}
+
+ createClusterRoleChangeListener(Lists.newArrayList(allPeers.keySet()));
}
// add num clients to all nodes in the system
package org.opendaylight.controller.cluster.example.messages;
import com.google.protobuf.GeneratedMessage;
-import org.opendaylight.controller.protobuff.messages.cluster.example.KeyValueMessages;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.protobuff.messages.cluster.example.KeyValueMessages;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
public class KeyValue extends Payload implements Serializable {
private static final long serialVersionUID = 1L;
return this;
}
+ @Override
+ public int size() {
+ return this.value.length() + this.key.length();
+ }
+
}
--- /dev/null
+package org.opendaylight.controller.cluster.example.messages;
+
+/**
+ * Message sent by the Example Role Change Listener to itself to trigger registration of itself
+ * with the notifiers.
+ *
+ * This message is sent periodically by the scheduler.
+ */
+public class RegisterListener {
+}
--- /dev/null
+package org.opendaylight.controller.cluster.example.messages;
+
+import java.util.List;
+
+/**
+ * Message carrying the actor paths of the notifiers a role-change listener should register
+ * itself with.
+ */
+public class SetNotifiers {
+    // actor paths of the notifiers to register with
+    private List<String> notifierList;
+
+    public SetNotifiers(List<String> notifierList) {
+        this.notifierList = notifierList;
+    }
+
+    public List<String> getNotifierList() {
+        return notifierList;
+    }
+}
protected ArrayList<ReplicatedLogEntry> snapshottedJournal;
protected long previousSnapshotIndex = -1;
protected long previousSnapshotTerm = -1;
+ protected int dataSize = 0;
public AbstractReplicatedLogImpl(long snapshotIndex,
long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
snapshottedJournal = null;
previousSnapshotIndex = -1;
previousSnapshotTerm = -1;
+ dataSize = 0;
}
@Override
*/
long getSnapshotBatchCount();
+ /**
+ * The percentage of total memory in the in-memory Raft log before a snapshot
+ * is to be taken
+ *
+ * @return int
+ */
+ int getSnapshotDataThresholdPercentage();
+
/**
* The interval at which a heart beat message will be sent to the remote
* RaftActor
*/
package org.opendaylight.controller.cluster.raft;
-import scala.concurrent.duration.FiniteDuration;
-
import java.util.concurrent.TimeUnit;
+import scala.concurrent.duration.FiniteDuration;
/**
* Default implementation of the ConfigParams
private FiniteDuration isolatedLeaderCheckInterval =
new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000, HEART_BEAT_INTERVAL.unit());
+ // 12 is just an arbitrary percentage. This is the amount of the total memory that a raft actor's
+ // in-memory journal can use before it needs to snapshot
+ private int snapshotDataThresholdPercentage = 12;
+
public void setHeartBeatInterval(FiniteDuration heartBeatInterval) {
this.heartBeatInterval = heartBeatInterval;
}
this.snapshotBatchCount = snapshotBatchCount;
}
+ public void setSnapshotDataThresholdPercentage(int snapshotDataThresholdPercentage){
+ this.snapshotDataThresholdPercentage = snapshotDataThresholdPercentage;
+ }
+
public void setJournalRecoveryLogBatchSize(int journalRecoveryLogBatchSize) {
this.journalRecoveryLogBatchSize = journalRecoveryLogBatchSize;
}
return snapshotBatchCount;
}
+ @Override
+ public int getSnapshotDataThresholdPercentage() {
+ return snapshotDataThresholdPercentage;
+ }
+
+
@Override
public FiniteDuration getHeartBeatInterval() {
return heartBeatInterval;
import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
import com.google.protobuf.ByteString;
+import java.io.Serializable;
+import java.util.Map;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
+import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-import java.io.Serializable;
-import java.util.Map;
-
/**
* RaftActor encapsulates a state machine that needs to be kept synchronized
* in a cluster. It implements the RAFT algorithm as described in the paper
deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue()));
onRecoveryComplete();
+
+ RaftActorBehavior oldBehavior = currentBehavior;
currentBehavior = new Follower(context);
- onStateChanged();
+ handleBehaviorChange(oldBehavior, currentBehavior);
}
}
}
replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
replicatedLog.snapshotTerm, replicatedLog.size());
+ RaftActorBehavior oldBehavior = currentBehavior;
currentBehavior = new Follower(context);
- onStateChanged();
+ handleBehaviorChange(oldBehavior, currentBehavior);
}
@Override public void handleCommand(Object message) {
RaftActorBehavior oldBehavior = currentBehavior;
currentBehavior = currentBehavior.handleMessage(getSender(), message);
- if(oldBehavior != currentBehavior){
- onStateChanged();
- }
-
- onLeaderChanged(oldBehavior.getLeaderId(), currentBehavior.getLeaderId());
+ handleBehaviorChange(oldBehavior, currentBehavior);
}
}
- public java.util.Set<String> getPeers() {
-
- return context.getPeerAddresses().keySet();
- }
+ private void handleBehaviorChange(RaftActorBehavior oldBehavior, RaftActorBehavior currentBehavior) {
+ if (oldBehavior != currentBehavior){
+ onStateChanged();
+ }
+ if (oldBehavior != null) {
+ // it can happen that the state has not changed but the leader has changed.
+ onLeaderChanged(oldBehavior.getLeaderId(), currentBehavior.getLeaderId());
- protected String getReplicatedLogState() {
- return "snapshotIndex=" + context.getReplicatedLog().getSnapshotIndex()
- + ", snapshotTerm=" + context.getReplicatedLog().getSnapshotTerm()
- + ", im-mem journal size=" + context.getReplicatedLog().size();
+ if (getRoleChangeNotifier().isPresent() && oldBehavior.state() != currentBehavior.state()) {
+ // we do not want to notify when the behavior/role is set for the first time (i.e follower)
+ getRoleChangeNotifier().get().tell(new RoleChanged(getId(), oldBehavior.state().name(),
+ currentBehavior.state().name()), getSelf());
+ }
+ }
}
-
/**
* When a derived RaftActor needs to persist something it must call
* persistData.
protected abstract DataPersistenceProvider persistence();
+ /**
+ * Notifier Actor for this RaftActor to notify when a role change happens
+ * @return ActorRef - ActorRef of the notifier or Optional.absent if none.
+ */
+ protected abstract Optional<ActorRef> getRoleChangeNotifier();
+
protected void onLeaderChanged(String oldLeader, String newLeader){};
private void trimPersistentData(long sequenceNumber) {
@Override public void apply(DeleteEntries param)
throws Exception {
//FIXME : Doing nothing for now
+ dataSize = 0;
+ for(ReplicatedLogEntry entry : journal){
+ dataSize += entry.size();
+ }
}
});
}
appendAndPersist(null, null, replicatedLogEntry);
}
+ @Override
+ public int dataSize() {
+ return dataSize;
+ }
+
public void appendAndPersist(final ActorRef clientActor,
final String identifier,
final ReplicatedLogEntry replicatedLogEntry) {
new Procedure<ReplicatedLogEntry>() {
@Override
public void apply(ReplicatedLogEntry evt) throws Exception {
+ dataSize += replicatedLogEntry.size();
+
+ long dataThreshold = Runtime.getRuntime().totalMemory() *
+ getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
+
// when a snapshot is being taken, captureSnapshot != null
if (hasSnapshotCaptureInitiated == false &&
- journal.size() % context.getConfigParams().getSnapshotBatchCount() == 0) {
+ ( journal.size() % context.getConfigParams().getSnapshotBatchCount() == 0 ||
+ dataSize > dataThreshold)) {
LOG.info("Initiating Snapshot Capture..");
long lastAppliedIndex = -1;
protected RaftActorBehavior getCurrentBehavior() {
return currentBehavior;
}
+
}
* Restores the replicated log to a state in the event of a save snapshot failure
*/
public void snapshotRollback();
+
+ /**
+ * Size of the data in the log (in bytes)
+ */
+ public int dataSize();
}
* @return
*/
long getIndex();
+
+ /**
+ * The size of the entry in bytes.
+ *
+ * An approximate number is acceptable.
+ *
+ * @return the approximate size of the entry in bytes
+ */
+ int size();
}
package org.opendaylight.controller.cluster.raft;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-
import java.io.Serializable;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
public class ReplicatedLogImplEntry implements ReplicatedLogEntry,
Serializable {
return index;
}
+ @Override
+ public int size() {
+ return getData().size();
+ }
+
@Override public String toString() {
return "Entry{" +
"index=" + index +
* set commitIndex = N (§5.3, §5.4).
*/
public abstract class AbstractLeader extends AbstractRaftActorBehavior {
+
+ // The index of the first chunk that is sent when installing a snapshot
+ public static final int FIRST_CHUNK_INDEX = 1;
+
+ // The index that the follower should respond with if it needs the install snapshot to be reset
+ public static final int INVALID_CHUNK_INDEX = -1;
+
+ // This would be passed as the hash code of the last chunk when sending the first chunk
+ public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
+
protected final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
"sending snapshot chunk failed, Will retry, Chunk:{}",
reply.getChunkIndex()
);
+
followerToSnapshot.markSendStatus(false);
}
" or Chunk Index in InstallSnapshotReply not matching {} != {}",
followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
);
+
+ if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
+ // Since the Follower did not find this index to be valid we should reset the follower snapshot
+ // so that Installing the snapshot can resume from the beginning
+ followerToSnapshot.reset();
+ }
}
}
context.getReplicatedLog().getSnapshotTerm(),
getNextSnapshotChunk(followerId,snapshot.get()),
mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
- mapFollowerToSnapshot.get(followerId).getTotalChunks()
+ mapFollowerToSnapshot.get(followerId).getTotalChunks(),
+ Optional.of(mapFollowerToSnapshot.get(followerId).getLastChunkHashCode())
).toSerializable(),
actor()
);
private boolean replyStatus = false;
private int chunkIndex;
private int totalChunks;
+ private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
+ private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
public FollowerToSnapshot(ByteString snapshotBytes) {
this.snapshotBytes = snapshotBytes;
- replyReceivedForOffset = -1;
- chunkIndex = 1;
int size = snapshotBytes.size();
totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
LOG.debug("Snapshot {} bytes, total chunks to send:{}",
size, totalChunks);
}
+ replyReceivedForOffset = -1;
+ chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
}
public ByteString getSnapshotBytes() {
// if the chunk sent was successful
replyReceivedForOffset = offset;
replyStatus = true;
+ lastChunkHashCode = nextChunkHashCode;
} else {
// if the chunk sent was failure
replyReceivedForOffset = offset;
LOG.debug("length={}, offset={},size={}",
snapshotLength, start, size);
}
- return getSnapshotBytes().substring(start, start + size);
+ ByteString substring = getSnapshotBytes().substring(start, start + size);
+ nextChunkHashCode = substring.hashCode();
+ return substring;
+ }
+
+ /**
+ * Resets this tracker; call when the Follower needs to be sent the snapshot again from the beginning
+ */
+ public void reset(){
+ offset = 0;
+ replyStatus = false;
+ replyReceivedForOffset = offset;
+ chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
+ lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
+ }
+ public int getLastChunkHashCode() {
+ return lastChunkHashCode;
}
}
package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
+import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
+import java.util.ArrayList;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
-import java.util.ArrayList;
-
/**
* The behavior of a RaftActor in the Follower state
* <p/>
* </ul>
*/
public class Follower extends AbstractRaftActorBehavior {
- private ByteString snapshotChunksCollected = ByteString.EMPTY;
+
+ private SnapshotTracker snapshotTracker = null;
public Follower(RaftActorContext context) {
super(context);
);
}
- try {
- if (installSnapshot.getChunkIndex() == installSnapshot.getTotalChunks()) {
- // this is the last chunk, create a snapshot object and apply
-
- snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
- if(LOG.isDebugEnabled()) {
- LOG.debug("Last chunk received: snapshotChunksCollected.size:{}",
- snapshotChunksCollected.size());
- }
+ if(snapshotTracker == null){
+ snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks());
+ }
- Snapshot snapshot = Snapshot.create(snapshotChunksCollected.toByteArray(),
- new ArrayList<ReplicatedLogEntry>(),
- installSnapshot.getLastIncludedIndex(),
- installSnapshot.getLastIncludedTerm(),
- installSnapshot.getLastIncludedIndex(),
- installSnapshot.getLastIncludedTerm());
+ try {
+ if(snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(),
+ installSnapshot.getLastChunkHashCode())){
+ Snapshot snapshot = Snapshot.create(snapshotTracker.getSnapshot(),
+ new ArrayList<ReplicatedLogEntry>(),
+ installSnapshot.getLastIncludedIndex(),
+ installSnapshot.getLastIncludedTerm(),
+ installSnapshot.getLastIncludedIndex(),
+ installSnapshot.getLastIncludedTerm());
actor().tell(new ApplySnapshot(snapshot), actor());
- } else {
- // we have more to go
- snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
+ snapshotTracker = null;
- if(LOG.isDebugEnabled()) {
- LOG.debug("Chunk={},snapshotChunksCollected.size:{}",
- installSnapshot.getChunkIndex(), snapshotChunksCollected.size());
- }
}
sender.tell(new InstallSnapshotReply(
- currentTerm(), context.getId(), installSnapshot.getChunkIndex(),
- true), actor());
+ currentTerm(), context.getId(), installSnapshot.getChunkIndex(),
+ true), actor());
+
+ } catch (SnapshotTracker.InvalidChunkException e) {
+
+ sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
+ -1, false), actor());
+ snapshotTracker = null;
+
+ } catch (Exception e){
- } catch (Exception e) {
LOG.error(e, "Exception in InstallSnapshot of follower:");
//send reply with success as false. The chunk will be sent again on failure
sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
- installSnapshot.getChunkIndex(), false), actor());
+ installSnapshot.getChunkIndex(), false), actor());
+
}
}
@Override public void close() throws Exception {
stopElection();
}
+
+ @VisibleForTesting
+ ByteString getSnapshotChunksCollected(){
+ return snapshotTracker != null ? snapshotTracker.getCollectedChunks() : ByteString.EMPTY;
+ }
+
+
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.event.LoggingAdapter;
+import com.google.common.base.Optional;
+import com.google.protobuf.ByteString;
+
+/**
+ * SnapshotTracker does house keeping for a snapshot that is being installed in chunks on the Follower
+ */
+public class SnapshotTracker {
+ private final LoggingAdapter LOG;
+ private final int totalChunks;
+ private ByteString collectedChunks = ByteString.EMPTY;
+ private int lastChunkIndex = AbstractLeader.FIRST_CHUNK_INDEX - 1;
+ private boolean sealed = false;
+ private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
+
+ SnapshotTracker(LoggingAdapter LOG, int totalChunks){
+ this.LOG = LOG;
+ this.totalChunks = totalChunks;
+ }
+
+ /**
+ * Adds a chunk to the tracker
+ *
+ * @param chunkIndex
+ * @param chunk
+ * @return true when the lastChunk is received
+ * @throws InvalidChunkException
+ */
+ boolean addChunk(int chunkIndex, ByteString chunk, Optional<Integer> lastChunkHashCode) throws InvalidChunkException{
+ if(sealed){
+ throw new InvalidChunkException("Invalid chunk received with chunkIndex " + chunkIndex + " all chunks already received");
+ }
+
+ if(lastChunkIndex + 1 != chunkIndex){
+ throw new InvalidChunkException("Expected chunkIndex " + (lastChunkIndex + 1) + " got " + chunkIndex);
+ }
+
+ if(lastChunkHashCode.isPresent()){
+ if(lastChunkHashCode.get() != this.lastChunkHashCode){
+ throw new InvalidChunkException("The hash code of the recorded last chunk does not match " +
+ "the senders hash code expected " + lastChunkHashCode + " was " + lastChunkHashCode.get());
+ }
+ }
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Chunk={},collectedChunks.size:{}",
+ chunkIndex, collectedChunks.size());
+ }
+
+ sealed = (chunkIndex == totalChunks);
+ lastChunkIndex = chunkIndex;
+ collectedChunks = collectedChunks.concat(chunk);
+ this.lastChunkHashCode = chunk.hashCode();
+ return sealed;
+ }
+
+ byte[] getSnapshot(){
+ if(!sealed) {
+ throw new IllegalStateException("lastChunk not received yet");
+ }
+
+ return collectedChunks.toByteArray();
+ }
+
+ ByteString getCollectedChunks(){
+ return collectedChunks;
+ }
+
+ public static class InvalidChunkException extends Exception {
+ InvalidChunkException(String message){
+ super(message);
+ }
+ }
+
+}
package org.opendaylight.controller.cluster.raft.messages;
+import com.google.common.base.Optional;
import com.google.protobuf.ByteString;
import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
private final ByteString data;
private final int chunkIndex;
private final int totalChunks;
+ private final Optional<Integer> lastChunkHashCode;
public InstallSnapshot(long term, String leaderId, long lastIncludedIndex,
- long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks) {
+ long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks, Optional<Integer> lastChunkHashCode) {
super(term);
this.leaderId = leaderId;
this.lastIncludedIndex = lastIncludedIndex;
this.data = data;
this.chunkIndex = chunkIndex;
this.totalChunks = totalChunks;
+ this.lastChunkHashCode = lastChunkHashCode;
}
+ public InstallSnapshot(long term, String leaderId, long lastIncludedIndex,
+ long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks) {
+ this(term, leaderId, lastIncludedIndex, lastIncludedTerm, data, chunkIndex, totalChunks, Optional.<Integer>absent());
+ }
+
+
public String getLeaderId() {
return leaderId;
}
return totalChunks;
}
- public <T extends Object> Object toSerializable(){
- return InstallSnapshotMessages.InstallSnapshot.newBuilder()
- .setLeaderId(this.getLeaderId())
- .setChunkIndex(this.getChunkIndex())
- .setData(this.getData())
- .setLastIncludedIndex(this.getLastIncludedIndex())
- .setLastIncludedTerm(this.getLastIncludedTerm())
- .setTotalChunks(this.getTotalChunks()).build();
+ public Optional<Integer> getLastChunkHashCode() {
+ return lastChunkHashCode;
+ }
+ public <T extends Object> Object toSerializable(){
+ InstallSnapshotMessages.InstallSnapshot.Builder builder = InstallSnapshotMessages.InstallSnapshot.newBuilder()
+ .setLeaderId(this.getLeaderId())
+ .setChunkIndex(this.getChunkIndex())
+ .setData(this.getData())
+ .setLastIncludedIndex(this.getLastIncludedIndex())
+ .setLastIncludedTerm(this.getLastIncludedTerm())
+ .setTotalChunks(this.getTotalChunks());
+
+ if(lastChunkHashCode.isPresent()){
+ builder.setLastChunkHashCode(lastChunkHashCode.get());
+ }
+ return builder.build();
}
public static InstallSnapshot fromSerializable (Object o) {
InstallSnapshotMessages.InstallSnapshot from =
(InstallSnapshotMessages.InstallSnapshot) o;
+ Optional<Integer> lastChunkHashCode = Optional.absent();
+ if(from.hasLastChunkHashCode()){
+ lastChunkHashCode = Optional.of(from.getLastChunkHashCode());
+ }
+
InstallSnapshot installSnapshot = new InstallSnapshot(from.getTerm(),
from.getLeaderId(), from.getLastIncludedIndex(),
from.getLastIncludedTerm(), from.getData(),
- from.getChunkIndex(), from.getTotalChunks());
+ from.getChunkIndex(), from.getTotalChunks(), lastChunkHashCode);
return installSnapshot;
}
serialization-bindings {
"org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry" = java
+ "org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener" = java
"com.google.protobuf.Message" = proto
"com.google.protobuf.GeneratedMessage" = proto
}
}
}
+
+raft-test {
+ akka {
+
+ loglevel = "DEBUG"
+
+ actor {
+ # enable to test serialization only.
+ # serialize-messages = on
+
+ provider = "akka.remote.RemoteActorRefProvider"
+
+ serializers {
+ java = "akka.serialization.JavaSerializer"
+ proto = "akka.remote.serialization.ProtobufSerializer"
+ }
+
+ serialization-bindings {
+ "org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry" = java
+ "org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener" = java
+ "com.google.protobuf.Message" = proto
+ "com.google.protobuf.GeneratedMessage" = proto
+ }
+ }
+
+ remote {
+ log-remote-lifecycle-events = off
+ netty.tcp {
+ hostname = "127.0.0.1"
+ port = 2550
+ }
+ }
+ }
+}
+
+raft-test-listener {
+
+ akka {
+ loglevel = "DEBUG"
+
+ actor {
+ provider = "akka.remote.RemoteActorRefProvider"
+ }
+
+ remote {
+ log-remote-lifecycle-events = off
+ netty.tcp {
+ hostname = "127.0.0.1"
+ port = 2554
+ }
+ }
+
+ member-id = "member-1"
+ }
+}
+
+
+
this.snapshotTerm = snapshotTerm;
}
+ @Override
+ public int dataSize() {
+ return -1;
+ }
+
public List<ReplicatedLogEntry> getEntriesTill(final int index) {
return journal.subList(0, index);
}
import akka.event.LoggingAdapter;
import com.google.common.base.Preconditions;
import com.google.protobuf.GeneratedMessage;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.test.MockPayloadMessages;
-
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.test.MockPayloadMessages;
public class MockRaftActorContext implements RaftActorContext {
append(replicatedLogEntry);
}
+ @Override
+ public int dataSize() {
+ return -1;
+ }
+
@Override public void removeFromAndPersist(long index) {
removeFrom(index);
}
return this;
}
+ @Override
+ public int size() {
+ return value.length();
+ }
+
@Override public String getClientPayloadClassName() {
return MockPayload.class.getName();
}
@Override public long getIndex() {
return index;
}
+
+ @Override
+ public int size() {
+ return getData().size();
+ }
}
public static class MockReplicatedLogBuilder {
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
+import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
private final DataPersistenceProvider dataPersistenceProvider;
private final RaftActor delegate;
+ private final CountDownLatch recoveryComplete = new CountDownLatch(1);
+ private final List<Object> state;
+ private ActorRef roleChangeNotifier;
public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
private static final long serialVersionUID = 1L;
private final String id;
private final Optional<ConfigParams> config;
private final DataPersistenceProvider dataPersistenceProvider;
+ private final ActorRef roleChangeNotifier;
private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
- Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
+ Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
+ ActorRef roleChangeNotifier) {
this.peerAddresses = peerAddresses;
this.id = id;
this.config = config;
this.dataPersistenceProvider = dataPersistenceProvider;
+ this.roleChangeNotifier = roleChangeNotifier;
}
@Override
public MockRaftActor create() throws Exception {
- return new MockRaftActor(id, peerAddresses, config, dataPersistenceProvider);
+ MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
+ dataPersistenceProvider);
+ mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
+ return mockRaftActor;
}
}
- private final CountDownLatch recoveryComplete = new CountDownLatch(1);
-
- private final List<Object> state;
-
public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
super(id, peerAddresses, config);
state = new ArrayList<>();
public static Props props(final String id, final Map<String, String> peerAddresses,
Optional<ConfigParams> config){
- return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null));
+ return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
}
public static Props props(final String id, final Map<String, String> peerAddresses,
Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
- return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider));
+ return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
}
+ public static Props props(final String id, final Map<String, String> peerAddresses,
+ Optional<ConfigParams> config, ActorRef roleChangeNotifier){
+ return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
+ }
@Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
delegate.applyState(clientActor, identifier, data);
LOG.info("applyState called");
}
-
-
-
@Override
protected void startLogRecoveryBatch(int maxBatchSize) {
}
return this.dataPersistenceProvider;
}
+ @Override
+ protected Optional<ActorRef> getRoleChangeNotifier() {
+ return Optional.fromNullable(roleChangeNotifier);
+ }
+
@Override public String persistenceId() {
return this.getId();
}
};
}
+ @Test
+ public void testRaftRoleChangeNotifier() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ ActorRef notifierActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ String id = "testRaftRoleChangeNotifier";
+
+ TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(id,
+ Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), id);
+
+ MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+ mockRaftActor.setCurrentBehavior(new Follower(mockRaftActor.getRaftActorContext()));
+
+ // sleep for a minimum of 2 seconds; if it takes longer, that's fine.
+ Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+
+ List<Object> matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
+ assertNotNull(matches);
+ assertEquals(2, matches.size());
+
+ // check if the notifier got a role change from Follower to Candidate
+ RoleChanged raftRoleChanged = (RoleChanged) matches.get(0);
+ assertEquals(id, raftRoleChanged.getMemberId());
+ assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
+ assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
+
+ // check if the notifier got a role change from Candidate to Leader
+ raftRoleChanged = (RoleChanged) matches.get(1);
+ assertEquals(id, raftRoleChanged.getMemberId());
+ assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
+ assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
+ }};
+ }
+
private ByteString fromObject(Object snapshot) throws Exception {
ByteArrayOutputStream b = null;
ObjectOutputStream o = null;
package org.opendaylight.controller.cluster.raft.behaviors;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import com.google.protobuf.ByteString;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
public class FollowerTest extends AbstractRaftActorBehaviorTest {
private final ActorRef followerActor = getSystem().actorOf(Props.create(
int offset = 0;
int snapshotLength = bsSnapshot.size();
int i = 1;
+ int chunkIndex = 1;
do {
chunkData = getNextChunk(bsSnapshot, offset);
final InstallSnapshot installSnapshot =
new InstallSnapshot(1, "leader-1", i, 1,
- chunkData, i, 3);
+ chunkData, chunkIndex, 3);
follower.handleMessage(leaderActor, installSnapshot);
offset = offset + 50;
i++;
+ chunkIndex++;
} while ((offset+50) < snapshotLength);
- final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, 3, 3);
+ final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, chunkIndex, 3);
follower.handleMessage(leaderActor, installSnapshot3);
String[] matches = new ReceiveWhile<String>(String.class, duration("2 seconds")) {
}
}.get();
+ // Verify that after a snapshot is successfully applied the collected snapshot chunks is reset to empty
+ assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected());
+
String applySnapshotMatch = "";
for (String reply: matches) {
if (reply.startsWith("applySnapshot")) {
}};
}
+ @Test
+ public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
+ JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {
+ {
+
+ ActorRef leaderActor = getSystem().actorOf(Props.create(
+ MessageCollectorActor.class));
+
+ MockRaftActorContext context = (MockRaftActorContext)
+ createActorContext(getRef());
+
+ Follower follower = (Follower) createBehavior(context);
+
+ HashMap<String, String> followerSnapshot = new HashMap<>();
+ followerSnapshot.put("1", "A");
+ followerSnapshot.put("2", "B");
+ followerSnapshot.put("3", "C");
+
+ ByteString bsSnapshot = toByteString(followerSnapshot);
+
+ final InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader-1", 3, 1, getNextChunk(bsSnapshot, 10), 3, 3);
+ follower.handleMessage(leaderActor, installSnapshot);
+
+ Object messages = executeLocalOperation(leaderActor, "get-all-messages");
+
+ assertNotNull(messages);
+ assertTrue(messages instanceof List);
+ List<Object> listMessages = (List<Object>) messages;
+
+ int installSnapshotReplyReceivedCount = 0;
+ for (Object message: listMessages) {
+ if (message instanceof InstallSnapshotReply) {
+ ++installSnapshotReplyReceivedCount;
+ }
+ }
+
+ assertEquals(1, installSnapshotReplyReceivedCount);
+ InstallSnapshotReply reply = (InstallSnapshotReply) listMessages.get(0);
+ assertEquals(false, reply.isSuccess());
+ assertEquals(-1, reply.getChunkIndex());
+ assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected());
+
+
+ }};
+ }
+
public Object executeLocalOperation(ActorRef actor, Object message) throws Exception {
return MessageCollectorActor.getAllMessages(actor);
}
package org.opendaylight.controller.cluster.raft.behaviors;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
import scala.concurrent.duration.FiniteDuration;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
public class LeaderTest extends AbstractRaftActorBehaviorTest {
Map<String, String> peerAddresses = new HashMap<>();
peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
+ followerActor.path().toString());
actorContext.setPeerAddresses(peerAddresses);
}};
}
+    @Test
+    public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            // Follower is a message-collecting TestActorRef so the chunks the
+            // leader sends to it can be inspected in arrival order.
+            // NOTE(review): the fixed actor name "follower" is also used by the
+            // sibling test in this class — confirm the actor is fully stopped
+            // between tests, otherwise the second create() can fail on a name clash.
+            TestActorRef<MessageCollectorActor> followerActor =
+                TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put(followerActor.path().toString(),
+                followerActor.path().toString());
+
+            final int followersLastIndex = 2;
+            final int snapshotIndex = 3;
+            final int snapshotTerm = 1;
+            final int currentTerm = 2;
+
+            MockRaftActorContext actorContext =
+                (MockRaftActorContext) createActorContext();
+
+            // A 50-byte chunk size splits the serialized test snapshot into
+            // 3 chunks (asserted below via getTotalChunks()).
+            actorContext.setConfigParams(new DefaultConfigParamsImpl(){
+                @Override
+                public int getSnapshotChunkSize() {
+                    return 50;
+                }
+            });
+            actorContext.setPeerAddresses(peerAddresses);
+            actorContext.setCommitIndex(followersLastIndex);
+
+            MockLeader leader = new MockLeader(actorContext);
+
+            Map<String, String> leadersSnapshot = new HashMap<>();
+            leadersSnapshot.put("1", "A");
+            leadersSnapshot.put("2", "B");
+            leadersSnapshot.put("3", "C");
+
+            // set the snapshot variables in replicatedlog
+            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+            actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+            ByteString bs = toByteString(leadersSnapshot);
+            leader.setSnapshot(Optional.of(bs));
+
+            // Kick off snapshot installation; the leader should send chunk 1 of 3.
+            leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+
+            Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
+
+            assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+            InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+            assertEquals(1, installSnapshot.getChunkIndex());
+            assertEquals(3, installSnapshot.getTotalChunks());
+
+
+            // Reply with chunk index -1 (failure): on the next heartbeat the
+            // leader must restart the transfer from the first chunk.
+            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), followerActor.path().toString(), -1, false));
+
+            leader.handleMessage(leaderActor, new SendHeartBeat());
+
+            o = MessageCollectorActor.getAllMessages(followerActor).get(1);
+
+            assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+            installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+            // Chunk index is 1 again — the transfer restarted instead of advancing to 2.
+            assertEquals(1, installSnapshot.getChunkIndex());
+            assertEquals(3, installSnapshot.getTotalChunks());
+
+            followerActor.tell(PoisonPill.getInstance(), getRef());
+        }};
+    }
+
+    @Test
+    public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+
+                // Follower collects every message so the chunk sequence can be inspected.
+                TestActorRef<MessageCollectorActor> followerActor =
+                    TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
+
+                Map<String, String> peerAddresses = new HashMap<>();
+                peerAddresses.put(followerActor.path().toString(),
+                    followerActor.path().toString());
+
+                final int followersLastIndex = 2;
+                final int snapshotIndex = 3;
+                final int snapshotTerm = 1;
+                final int currentTerm = 2;
+
+                MockRaftActorContext actorContext =
+                    (MockRaftActorContext) createActorContext();
+
+                // A 50-byte chunk size splits the serialized test snapshot into
+                // 3 chunks (asserted below via getTotalChunks()).
+                actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+                    @Override
+                    public int getSnapshotChunkSize() {
+                        return 50;
+                    }
+                });
+                actorContext.setPeerAddresses(peerAddresses);
+                actorContext.setCommitIndex(followersLastIndex);
+
+                MockLeader leader = new MockLeader(actorContext);
+
+                Map<String, String> leadersSnapshot = new HashMap<>();
+                leadersSnapshot.put("1", "A");
+                leadersSnapshot.put("2", "B");
+                leadersSnapshot.put("3", "C");
+
+                // set the snapshot variables in replicatedlog
+                actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+                actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+                actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+                ByteString bs = toByteString(leadersSnapshot);
+                leader.setSnapshot(Optional.of(bs));
+
+                leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+
+                Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
+
+                assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+                InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+                assertEquals(1, installSnapshot.getChunkIndex());
+                assertEquals(3, installSnapshot.getTotalChunks());
+                // The very first chunk carries the initial sentinel hash code.
+                assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
+
+                // Remember chunk 1's data hash; chunk 2 must echo it back.
+                int hashCode = installSnapshot.getData().hashCode();
+
+                leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
+
+                leader.handleMessage(leaderActor, new SendHeartBeat());
+
+                // NOTE(review): the sibling test above reads the second message
+                // without sleeping — confirm whether this delay is actually
+                // required with a TestActorRef, or can be removed.
+                Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+
+                o = MessageCollectorActor.getAllMessages(followerActor).get(1);
+
+                assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+                installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+                assertEquals(2, installSnapshot.getChunkIndex());
+                assertEquals(3, installSnapshot.getTotalChunks());
+                // Chunk 2 carries the hash of chunk 1's data so the follower can
+                // verify the previously appended chunk.
+                assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
+
+                followerActor.tell(PoisonPill.getInstance(), getRef());
+            }};
+    }
+
@Test
public void testFollowerToSnapshotLogic() {
--- /dev/null
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import akka.event.LoggingAdapter;
+import com.google.common.base.Optional;
+import com.google.protobuf.ByteString;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SnapshotTrackerTest {
+
+ Map<String, String> data;
+ ByteString byteString;
+ ByteString chunk1;
+ ByteString chunk2;
+ ByteString chunk3;
+
+ @Before
+ public void setup(){
+ data = new HashMap<>();
+ data.put("key1", "value1");
+ data.put("key2", "value2");
+ data.put("key3", "value3");
+
+ byteString = toByteString(data);
+ chunk1 = getNextChunk(byteString, 0, 10);
+ chunk2 = getNextChunk(byteString, 10, 10);
+ chunk3 = getNextChunk(byteString, 20, byteString.size());
+ }
+
+ @Test
+ public void testAddChunk() throws SnapshotTracker.InvalidChunkException {
+ SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5);
+
+ tracker1.addChunk(1, chunk1, Optional.<Integer>absent());
+ tracker1.addChunk(2, chunk2, Optional.<Integer>absent());
+ tracker1.addChunk(3, chunk3, Optional.<Integer>absent());
+
+ // Verify that an InvalidChunkException is thrown when we try to add a chunk to a sealed tracker
+ SnapshotTracker tracker2 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+
+ tracker2.addChunk(1, chunk1, Optional.<Integer>absent());
+ tracker2.addChunk(2, chunk2, Optional.<Integer>absent());
+
+ try {
+ tracker2.addChunk(3, chunk3, Optional.<Integer>absent());
+ Assert.fail();
+ } catch(SnapshotTracker.InvalidChunkException e){
+ e.getMessage().startsWith("Invalid chunk");
+ }
+
+ // The first chunk's index must at least be FIRST_CHUNK_INDEX
+ SnapshotTracker tracker3 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+
+ try {
+ tracker3.addChunk(AbstractLeader.FIRST_CHUNK_INDEX - 1, chunk1, Optional.<Integer>absent());
+ Assert.fail();
+ } catch(SnapshotTracker.InvalidChunkException e){
+
+ }
+
+ // Out of sequence chunk indexes won't work
+ SnapshotTracker tracker4 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+
+ tracker4.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.<Integer>absent());
+
+ try {
+ tracker4.addChunk(AbstractLeader.FIRST_CHUNK_INDEX+2, chunk2, Optional.<Integer>absent());
+ Assert.fail();
+ } catch(SnapshotTracker.InvalidChunkException e){
+
+ }
+
+ // No exceptions will be thrown when invalid chunk is added with the right sequence
+ // If the lastChunkHashCode is missing
+ SnapshotTracker tracker5 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+
+ tracker5.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.<Integer>absent());
+ // Look I can add the same chunk again
+ tracker5.addChunk(AbstractLeader.FIRST_CHUNK_INDEX + 1, chunk1, Optional.<Integer>absent());
+
+ // An exception will be thrown when an invalid chunk is addedd with the right sequence
+ // when the lastChunkHashCode is present
+ SnapshotTracker tracker6 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+
+ tracker6.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.of(-1));
+
+ try {
+ // Here we add a second chunk and tell addChunk that the previous chunk had a hash code 777
+ tracker6.addChunk(AbstractLeader.FIRST_CHUNK_INDEX + 1, chunk2, Optional.of(777));
+ Assert.fail();
+ }catch(SnapshotTracker.InvalidChunkException e){
+
+ }
+
+ }
+
+ @Test
+ public void testGetSnapShot() throws SnapshotTracker.InvalidChunkException {
+
+ // Trying to get a snapshot before all chunks have been received will throw an exception
+ SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5);
+
+ tracker1.addChunk(1, chunk1, Optional.<Integer>absent());
+ try {
+ tracker1.getSnapshot();
+ Assert.fail();
+ } catch(IllegalStateException e){
+
+ }
+
+ SnapshotTracker tracker2 = new SnapshotTracker(mock(LoggingAdapter.class), 3);
+
+ tracker2.addChunk(1, chunk1, Optional.<Integer>absent());
+ tracker2.addChunk(2, chunk2, Optional.<Integer>absent());
+ tracker2.addChunk(3, chunk3, Optional.<Integer>absent());
+
+ byte[] snapshot = tracker2.getSnapshot();
+
+ assertEquals(byteString, ByteString.copyFrom(snapshot));
+ }
+
+ @Test
+ public void testGetCollectedChunks() throws SnapshotTracker.InvalidChunkException {
+ SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5);
+
+ ByteString chunks = chunk1.concat(chunk2);
+
+ tracker1.addChunk(1, chunk1, Optional.<Integer>absent());
+ tracker1.addChunk(2, chunk2, Optional.<Integer>absent());
+
+ assertEquals(chunks, tracker1.getCollectedChunks());
+ }
+
+ public ByteString getNextChunk (ByteString bs, int offset, int size){
+ int snapshotLength = bs.size();
+ int start = offset;
+ if (size > snapshotLength) {
+ size = snapshotLength;
+ } else {
+ if ((start + size) > snapshotLength) {
+ size = snapshotLength - start;
+ }
+ }
+ return bs.substring(start, start + size);
+ }
+
+ private ByteString toByteString(Map<String, String> state) {
+ ByteArrayOutputStream b = null;
+ ObjectOutputStream o = null;
+ try {
+ try {
+ b = new ByteArrayOutputStream();
+ o = new ObjectOutputStream(b);
+ o.writeObject(state);
+ byte[] snapshotBytes = b.toByteArray();
+ return ByteString.copyFrom(snapshotBytes);
+ } finally {
+ if (o != null) {
+ o.flush();
+ o.close();
+ }
+ if (b != null) {
+ b.close();
+ }
+ }
+ } catch (IOException e) {
+ org.junit.Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
+ }
+ return null;
+ }
+
+
+}
\ No newline at end of file
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
public class MessageCollectorActor extends UntypedActor {
private List<Object> messages = new ArrayList<>();
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.notifications;
+
+import java.io.Serializable;
+
+/**
+ * Message sent from the listener of Role Change messages to register itself to the Role Change Notifier
+ *
+ * The Listener could be in a separate ActorSystem and hence this message needs to be Serializable
+ */
+public class RegisterRoleChangeListener implements Serializable {
+    // Explicit serialVersionUID: per the class javadoc this message may cross
+    // ActorSystem boundaries, so pin the serialized form instead of relying on
+    // the compiler-generated default, which changes with unrelated edits.
+    private static final long serialVersionUID = 1L;
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.notifications;
+
+import java.io.Serializable;
+
+/**
+ * Reply message sent from a RoleChangeNotifier to the Role Change Listener
+ *
+ * Can be sent to a separate actor system and hence should be made serializable.
+ */
+public class RegisterRoleChangeListenerReply implements Serializable {
+    // Explicit serialVersionUID: per the class javadoc this message may cross
+    // ActorSystem boundaries, so pin the serialized form instead of relying on
+    // the compiler-generated default, which changes with unrelated edits.
+    private static final long serialVersionUID = 1L;
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.notifications;
+
+import java.io.Serializable;
+
+/**
+ * Notification message representing a Role change of a cluster member
+ *
+ * Roles generally are Leader, Follower and Candidate. But can be based on the consensus strategy/implementation
+ *
+ * The Listener could be in a separate ActorSystem and hence this message needs to be Serializable
+ */
+public class RoleChangeNotification implements Serializable {
+    // Pinned because this message may cross ActorSystem boundaries (see class javadoc).
+    private static final long serialVersionUID = 1L;
+
+    // All state is assigned once in the constructor; final fields make the
+    // message immutable and safe to share between actors.
+    private final String memberId;
+    private final String oldRole;
+    private final String newRole;
+
+    /**
+     * @param memberId id of the cluster member whose role changed
+     * @param oldRole role before the change
+     * @param newRole role after the change
+     */
+    public RoleChangeNotification(String memberId, String oldRole, String newRole) {
+        this.memberId = memberId;
+        this.oldRole = oldRole;
+        this.newRole = newRole;
+    }
+
+    public String getMemberId() {
+        return memberId;
+    }
+
+    public String getOldRole() {
+        return oldRole;
+    }
+
+    public String getNewRole() {
+        return newRole;
+    }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.notifications;
+
+import akka.actor.Actor;
+import akka.actor.ActorPath;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.japi.Creator;
+import akka.serialization.Serialization;
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+
+/**
+ * The RoleChangeNotifier is responsible for receiving Raft role change messages and notifying
+ * the listeners (within the same node), which are registered with it.
+ * <p/>
+ * The RoleChangeNotifier is instantiated by the Shard and injected into the RaftActor.
+ */
+public class RoleChangeNotifier extends AbstractUntypedActor implements AutoCloseable {
+
+    // Id of the cluster member (shard) whose role changes this notifier relays.
+    private String memberId;
+    // Listeners registered via RegisterRoleChangeListener, keyed by actor path.
+    private Map<ActorPath, ActorRef> registeredListeners = Maps.newHashMap();
+    // Most recent notification; replayed to listeners that register later.
+    private RoleChangeNotification latestRoleChangeNotification = null;
+
+    public RoleChangeNotifier(String memberId) {
+        this.memberId = memberId;
+    }
+
+    /** Props factory used by the owning actor (e.g. a Shard) to create this notifier. */
+    public static Props getProps(final String memberId) {
+        return Props.create(new Creator<Actor>() {
+            @Override
+            public Actor create() throws Exception {
+                return new RoleChangeNotifier(memberId);
+            }
+        });
+    }
+
+    @Override
+    public void preStart() throws Exception {
+        super.preStart();
+        LOG.info("RoleChangeNotifier:{} created and ready for shard:{}",
+            Serialization.serializedActorPath(getSelf()), memberId);
+    }
+
+    @Override
+    protected void handleReceive(Object message) throws Exception {
+        if (message instanceof RegisterRoleChangeListener) {
+            // register listeners for this shard
+
+            ActorRef curRef = registeredListeners.get(getSender().path());
+            if (curRef != null) {
+                // ActorPaths compare equal even when the actors' unique ids differ.
+                // If a listener is re-registering after reincarnation, remove the
+                // existing entry first: a plain put() would keep the old key object,
+                // so the remove ensures the path with the correct unique id is stored.
+                registeredListeners.remove(getSender().path());
+            }
+            registeredListeners.put(getSender().path(), getSender());
+
+            LOG.info("RoleChangeNotifier for {} , registered listener {}", memberId,
+                getSender().path().toString());
+
+            // Acknowledge the registration to the listener.
+            getSender().tell(new RegisterRoleChangeListenerReply(), getSelf());
+
+            // Replay the last known role change so a late-registering listener
+            // immediately learns the current role.
+            if (latestRoleChangeNotification != null) {
+                getSender().tell(latestRoleChangeNotification, getSelf());
+            }
+
+
+        } else if (message instanceof RoleChanged) {
+            // this message is sent by RaftActor. Notify registered listeners when this message is received.
+            RoleChanged roleChanged = (RoleChanged) message;
+
+            LOG.info("RoleChangeNotifier for {} , received role change from {} to {}", memberId,
+                roleChanged.getOldRole(), roleChanged.getNewRole());
+
+            // Convert the local RoleChanged into the serializable notification and
+            // remember it for listeners that register after this point.
+            latestRoleChangeNotification =
+                new RoleChangeNotification(roleChanged.getMemberId(),
+                    roleChanged.getOldRole(), roleChanged.getNewRole());
+
+            // Fan the notification out to every registered listener.
+            for (ActorRef listener: registeredListeners.values()) {
+                listener.tell(latestRoleChangeNotification, getSelf());
+            }
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        // Drop all listener registrations.
+        registeredListeners.clear();
+    }
+}
+
--- /dev/null
+package org.opendaylight.controller.cluster.notifications;
+
+/**
+ * Role Change message initiated internally from the Raft Actor when the behavior/role changes.
+ *
+ * Since it is internal, it need not be serialized.
+ *
+ */
+public class RoleChanged {
+    // All state is assigned once in the constructor; final fields make this
+    // intra-JVM message immutable and safe to hand between actors.
+    private final String memberId;
+    private final String oldRole;
+    private final String newRole;
+
+    /**
+     * @param memberId id of the cluster member whose role changed
+     * @param oldRole role before the change
+     * @param newRole role after the change
+     */
+    public RoleChanged(String memberId, String oldRole, String newRole) {
+        this.memberId = memberId;
+        this.oldRole = oldRole;
+        this.newRole = newRole;
+    }
+
+    public String getMemberId() {
+        return memberId;
+    }
+
+    public String getOldRole() {
+        return oldRole;
+    }
+
+    public String getNewRole() {
+        return newRole;
+    }
+}
import com.google.protobuf.GeneratedMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.UnknownFieldSet;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
-
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
public class CompositeModificationPayload extends Payload implements
Serializable {
public Object getModification(){
return this.modification;
}
+
+    /**
+     * Returns the serialized size, in bytes, of the underlying protobuf
+     * modification message (delegates to GeneratedMessage.getSerializedSize()).
+     */
+    public int size(){
+        return this.modification.getSerializedSize();
+    }
}
import com.google.protobuf.GeneratedMessage;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-
import java.util.Map;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
/**
* An instance of a Payload class is meant to be used as the Payload for
public abstract Payload decode(
AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload payload);
+ public abstract int size();
+
}
* <code>optional int32 totalChunks = 7;</code>
*/
int getTotalChunks();
+
+ // optional int32 lastChunkHashCode = 8;
+ /**
+ * <code>optional int32 lastChunkHashCode = 8;</code>
+ */
+ boolean hasLastChunkHashCode();
+ /**
+ * <code>optional int32 lastChunkHashCode = 8;</code>
+ */
+ int getLastChunkHashCode();
}
/**
* Protobuf type {@code org.opendaylight.controller.cluster.raft.InstallSnapshot}
totalChunks_ = input.readInt32();
break;
}
+ case 64: {
+ bitField0_ |= 0x00000080;
+ lastChunkHashCode_ = input.readInt32();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
return totalChunks_;
}
+ // optional int32 lastChunkHashCode = 8;
+ public static final int LASTCHUNKHASHCODE_FIELD_NUMBER = 8;
+ private int lastChunkHashCode_;
+ /**
+ * <code>optional int32 lastChunkHashCode = 8;</code>
+ */
+ public boolean hasLastChunkHashCode() {
+ return ((bitField0_ & 0x00000080) == 0x00000080);
+ }
+ /**
+ * <code>optional int32 lastChunkHashCode = 8;</code>
+ */
+ public int getLastChunkHashCode() {
+ return lastChunkHashCode_;
+ }
+
private void initFields() {
term_ = 0L;
leaderId_ = "";
data_ = com.google.protobuf.ByteString.EMPTY;
chunkIndex_ = 0;
totalChunks_ = 0;
+ lastChunkHashCode_ = 0;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
if (((bitField0_ & 0x00000040) == 0x00000040)) {
output.writeInt32(7, totalChunks_);
}
+ if (((bitField0_ & 0x00000080) == 0x00000080)) {
+ output.writeInt32(8, lastChunkHashCode_);
+ }
getUnknownFields().writeTo(output);
}
size += com.google.protobuf.CodedOutputStream
.computeInt32Size(7, totalChunks_);
}
+ if (((bitField0_ & 0x00000080) == 0x00000080)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt32Size(8, lastChunkHashCode_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
bitField0_ = (bitField0_ & ~0x00000020);
totalChunks_ = 0;
bitField0_ = (bitField0_ & ~0x00000040);
+ lastChunkHashCode_ = 0;
+ bitField0_ = (bitField0_ & ~0x00000080);
return this;
}
to_bitField0_ |= 0x00000040;
}
result.totalChunks_ = totalChunks_;
+ if (((from_bitField0_ & 0x00000080) == 0x00000080)) {
+ to_bitField0_ |= 0x00000080;
+ }
+ result.lastChunkHashCode_ = lastChunkHashCode_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
if (other.hasTotalChunks()) {
setTotalChunks(other.getTotalChunks());
}
+ if (other.hasLastChunkHashCode()) {
+ setLastChunkHashCode(other.getLastChunkHashCode());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
return this;
}
+ // optional int32 lastChunkHashCode = 8;
+ private int lastChunkHashCode_ ;
+ /**
+ * <code>optional int32 lastChunkHashCode = 8;</code>
+ */
+ public boolean hasLastChunkHashCode() {
+ return ((bitField0_ & 0x00000080) == 0x00000080);
+ }
+ /**
+ * <code>optional int32 lastChunkHashCode = 8;</code>
+ */
+ public int getLastChunkHashCode() {
+ return lastChunkHashCode_;
+ }
+ /**
+ * <code>optional int32 lastChunkHashCode = 8;</code>
+ */
+ public Builder setLastChunkHashCode(int value) {
+ bitField0_ |= 0x00000080;
+ lastChunkHashCode_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional int32 lastChunkHashCode = 8;</code>
+ */
+ public Builder clearLastChunkHashCode() {
+ bitField0_ = (bitField0_ & ~0x00000080);
+ lastChunkHashCode_ = 0;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:org.opendaylight.controller.cluster.raft.InstallSnapshot)
}
static {
java.lang.String[] descriptorData = {
"\n\025InstallSnapshot.proto\022(org.opendayligh" +
- "t.controller.cluster.raft\"\235\001\n\017InstallSna" +
+ "t.controller.cluster.raft\"\270\001\n\017InstallSna" +
"pshot\022\014\n\004term\030\001 \001(\003\022\020\n\010leaderId\030\002 \001(\t\022\031\n" +
"\021lastIncludedIndex\030\003 \001(\003\022\030\n\020lastIncluded" +
"Term\030\004 \001(\003\022\014\n\004data\030\005 \001(\014\022\022\n\nchunkIndex\030\006" +
- " \001(\005\022\023\n\013totalChunks\030\007 \001(\005BX\n;org.openday" +
- "light.controller.protobuff.messages.clus" +
- "ter.raftB\027InstallSnapshotMessagesH\001"
+ " \001(\005\022\023\n\013totalChunks\030\007 \001(\005\022\031\n\021lastChunkHa" +
+ "shCode\030\010 \001(\005BX\n;org.opendaylight.control" +
+ "ler.protobuff.messages.cluster.raftB\027Ins" +
+ "tallSnapshotMessagesH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor,
- new java.lang.String[] { "Term", "LeaderId", "LastIncludedIndex", "LastIncludedTerm", "Data", "ChunkIndex", "TotalChunks", });
+ new java.lang.String[] { "Term", "LeaderId", "LastIncludedIndex", "LastIncludedTerm", "Data", "ChunkIndex", "TotalChunks", "LastChunkHashCode", });
return null;
}
};
optional bytes data = 5;
optional int32 chunkIndex = 6;
optional int32 totalChunks = 7;
+ optional int32 lastChunkHashCode = 8;
}
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.UnkeyedListEntryNode;
import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
import java.io.InputStream;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import static org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes.mapEntry;
private static final String TWO_ONE_NAME = "one";
private static final String TWO_TWO_NAME = "two";
private static final String DESC = "Hello there";
-
+ private static final Long LONG_ID = 1L;
+ private static final Boolean ENABLED = false;
+ private static final Short SHORT_ID = 1;
+ private static final Byte BYTE_ID = 1;
// Family specific constants
public static final QName FAMILY_QNAME =
QName
private static final String FIRST_GRAND_CHILD_NAME = "first grand child";
private static final String SECOND_GRAND_CHILD_NAME = "second grand child";
+
// first child
private static final YangInstanceIdentifier CHILDREN_1_PATH =
YangInstanceIdentifier.builder(CHILDREN_PATH)
QName.create(TEST_QNAME, "my-bits"))).withValue(
ImmutableSet.of("foo", "bar"));
+ // Create unkey list entry
+ UnkeyedListEntryNode binaryDataKey =
+ Builders.unkeyedListEntryBuilder().withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME)).
+ withChild(ImmutableNodes.leafNode(SOME_BINARY_DATE_QNAME, DESC)).build();
+
+ Map<QName, Object> keyValues = new HashMap<>();
+ keyValues.put(CHILDREN_QNAME, FIRST_CHILD_NAME);
+
// Create the document
return ImmutableContainerNodeBuilder
.create()
.withNodeIdentifier(
- new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME))
+ new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME))
.withChild(myBits.build())
.withChild(ImmutableNodes.leafNode(DESC_QNAME, DESC))
- .withChild(ImmutableNodes.leafNode(POINTER_QNAME, "pointer"))
+ .withChild(ImmutableNodes.leafNode(POINTER_QNAME, ENABLED))
+ .withChild(ImmutableNodes.leafNode(POINTER_QNAME, SHORT_ID))
+ .withChild(ImmutableNodes.leafNode(POINTER_QNAME, BYTE_ID))
.withChild(
- ImmutableNodes.leafNode(SOME_REF_QNAME, YangInstanceIdentifier
- .builder().build()))
+ ImmutableNodes.leafNode(SOME_REF_QNAME, GRAND_CHILD_1_PATH))
.withChild(ImmutableNodes.leafNode(MYIDENTITY_QNAME, DESC_QNAME))
-
+ .withChild(Builders.unkeyedListBuilder().withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(OUTER_LIST_QNAME))
+ .withChild(binaryDataKey).build())
+ .withChild(Builders.orderedMapBuilder()
+ .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME)).withChild(mapEntry).build())
+ .withChild(Builders.choiceBuilder().withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME))
+ .withChild(ImmutableNodes.leafNode(DESC_QNAME, LONG_ID)).build())
// .withChild(augmentationNode)
.withChild(shoes)
.withChild(numbers)
.withChild(switchFeatures)
.withChild(
- mapNodeBuilder(AUGMENTED_LIST_QNAME).withChild(mapEntry).build())
+ mapNodeBuilder(AUGMENTED_LIST_QNAME).withChild(mapEntry).build())
.withChild(
- mapNodeBuilder(OUTER_LIST_QNAME)
- .withChild(mapEntry(OUTER_LIST_QNAME, ID_QNAME, ONE_ID))
- .withChild(BAR_NODE).build());
+ mapNodeBuilder(OUTER_LIST_QNAME)
+ .withChild(mapEntry(OUTER_LIST_QNAME, ID_QNAME, ONE_ID))
+ .withChild(BAR_NODE).build()
+ );
}
public static ContainerNode createTestContainer() {
package org.opendaylight.controller.cluster.datastore;
+import akka.util.Timeout;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.config.ConfigurationReader;
import org.opendaylight.controller.cluster.datastore.config.FileConfigurationReader;
import org.opendaylight.controller.cluster.raft.ConfigParams;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
-import akka.util.Timeout;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
-import java.util.concurrent.TimeUnit;
-
/**
* Contains contextual data for a data store.
*
private boolean persistent = true;
private ConfigurationReader configurationReader = new FileConfigurationReader();
private int shardIsolatedLeaderCheckIntervalInMillis = shardHeartbeatIntervalInMillis * 10;
+ private int shardSnapshotDataThresholdPercentage = 12;
public Builder shardTransactionIdleTimeout(Duration shardTransactionIdleTimeout) {
this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
return this;
}
+        /**
+         * Sets the shard snapshot data threshold percentage that build() passes
+         * to the Raft config via setSnapshotDataThresholdPercentage (default 12).
+         */
+        public Builder shardSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
+            this.shardSnapshotDataThresholdPercentage = shardSnapshotDataThresholdPercentage;
+            return this;
+        }
+
+
public Builder shardHeartbeatIntervalInMillis(int shardHeartbeatIntervalInMillis) {
this.shardHeartbeatIntervalInMillis = shardHeartbeatIntervalInMillis;
return this;
return this;
}
+
public DatastoreContext build() {
DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
TimeUnit.MILLISECONDS));
raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
+ raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
raftConfig.setIsolatedLeaderCheckInterval(
new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
private Cancellable txCommitTimeoutCheckSchedule;
+ private Optional<ActorRef> roleChangeNotifier;
+
/**
* Coordinates persistence recovery on startup.
*/
transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
+
+ // create a notifier actor for each cluster member
+ roleChangeNotifier = createRoleChangeNotifier(name.toString());
}
private static Map<String, String> mapPeerAddresses(
return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext));
}
+ private Optional<ActorRef> createRoleChangeNotifier(String shardId) {
+ ActorRef shardRoleChangeNotifier = this.getContext().actorOf(
+ RoleChangeNotifier.getProps(shardId), shardId + "-notifier");
+ return Optional.<ActorRef>of(shardRoleChangeNotifier);
+ }
+
@Override
public void postStop() {
super.postStop();
}
}
+ @Override
+ protected Optional<ActorRef> getRoleChangeNotifier() {
+ return roleChangeNotifier;
+ }
+
private void handleTransactionCommitTimeoutCheck() {
CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
if(cohortEntry != null) {
shardMBean.setCommitIndex(getCommitIndex());
shardMBean.setLastApplied(getLastApplied());
+ shardMBean.setDataSize(getRaftActorContext().getReplicatedLog().dataSize());
}
@Override
private QueuedNotificationManagerMXBeanImpl notificationManagerStatsBean;
+ private volatile long dataSize = 0;
+
private final SimpleDateFormat sdf =
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
this.lastCommittedTransactionTime = lastCommittedTransactionTime;
}
+ public void setDataSize(long dataSize){
+ this.dataSize = dataSize;
+ }
+
+ @Override
+ public long getDataSize(){
+ return dataSize;
+ }
+
@Override
public ThreadExecutorStats getDataStoreExecutorStats() {
// FIXME: this particular thing does not work, as it really is DS-specific
int getMaxNotificationMgrListenerQueueSize();
void resetTransactionCounters();
+
+ long getDataSize();
}
}
}
+ typedef percentage {
+ type uint8 {
+ range "0..100";
+ }
+ }
+
grouping data-store-properties {
leaf max-shard-data-change-executor-queue-size {
default 1000;
leaf shard-snapshot-batch-count {
default 20000;
type non-zero-uint32-type;
- description "The minimum number of entries to be present in the in-memory journal log before a snapshot to be taken.";
+ description "The minimum number of entries to be present in the in-memory journal log before a snapshot is to be taken.";
}
+ leaf shard-snapshot-data-threshold-percentage {
+ default 12;
+ type percentage;
+ description "The percentage of Runtime.totalMemory() used by the in-memory journal log before a snapshot is to be taken.";
+ }
+
+
leaf shard-hearbeat-interval-in-millis {
default 500;
type heartbeat-interval-type;
package org.opendaylight.controller.cluster.datastore;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
public class CompositeModificationPayloadTest {
@Override public long getIndex() {
return 1;
}
+
+ @Override
+ public int size() {
+ return getData().size();
+ }
});
AppendEntries appendEntries =
--- /dev/null
+package org.opendaylight.controller.cluster.datastore;
+
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
+import org.opendaylight.controller.cluster.notifications.RoleChanged;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class RoleChangeNotifierTest extends AbstractActorTest {
+
+ @Test
+ public void testHandleRegisterRoleChangeListener() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ String memberId = "testHandleRegisterRoleChangeListener";
+ ActorRef listenerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ TestActorRef<RoleChangeNotifier> notifierTestActorRef = TestActorRef.create(
+ getSystem(), RoleChangeNotifier.getProps(memberId), memberId);
+
+ notifierTestActorRef.tell(new RegisterRoleChangeListener(), listenerActor);
+
+ RegisterRoleChangeListenerReply reply = (RegisterRoleChangeListenerReply)
+ MessageCollectorActor.getFirstMatching(listenerActor, RegisterRoleChangeListenerReply.class);
+ assertNotNull(reply);
+
+ RoleChangeNotification notification = (RoleChangeNotification)
+ MessageCollectorActor.getFirstMatching(listenerActor, RoleChangeNotification.class);
+ assertNull(notification);
+ }};
+
+ }
+
+ @Test
+ public void testHandleRaftRoleChanged() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ String memberId = "testHandleRegisterRoleChangeListenerWithNotificationSet";
+ ActorRef listenerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+ ActorRef shardActor = getTestActor();
+
+ TestActorRef<RoleChangeNotifier> notifierTestActorRef = TestActorRef.create(
+ getSystem(), RoleChangeNotifier.getProps(memberId), memberId);
+
+ RoleChangeNotifier roleChangeNotifier = notifierTestActorRef.underlyingActor();
+
+ notifierTestActorRef.tell(new RoleChanged(memberId, RaftState.Candidate.name(), RaftState.Leader.name()), shardActor);
+
+ // no notification should be sent as listener has not yet registered
+ assertNull(MessageCollectorActor.getFirstMatching(listenerActor, RoleChangeNotification.class));
+
+ // listener registers after role has been changed, ensure we sent the latest role change after a reply
+ notifierTestActorRef.tell(new RegisterRoleChangeListener(), listenerActor);
+
+ RegisterRoleChangeListenerReply reply = (RegisterRoleChangeListenerReply)
+ MessageCollectorActor.getFirstMatching(listenerActor, RegisterRoleChangeListenerReply.class);
+ assertNotNull(reply);
+
+ RoleChangeNotification notification = (RoleChangeNotification)
+ MessageCollectorActor.getFirstMatching(listenerActor, RoleChangeNotification.class);
+ assertNotNull(notification);
+ assertEquals(RaftState.Candidate.name(), notification.getOldRole());
+ assertEquals(RaftState.Leader.name(), notification.getNewRole());
+
+ }};
+
+ }
+}
+
+
package org.opendaylight.controller.cluster.datastore.utils;
+import akka.actor.ActorRef;
import akka.actor.UntypedActor;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
/**
* MessageCollectorActor collects messages as it receives them. It can send
@Override public void onReceive(Object message) throws Exception {
if(message instanceof String){
if("messages".equals(message)){
- getSender().tell(messages, getSelf());
+ getSender().tell(new ArrayList<>(messages), getSelf());
}
} else {
messages.add(message);
}
}
+
+ public static List<Object> getAllMessages(ActorRef actor) throws Exception {
+ FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS);
+ Timeout operationTimeout = new Timeout(operationDuration);
+ Future<Object> future = Patterns.ask(actor, "messages", operationTimeout);
+
+ return (List<Object>) Await.result(future, operationDuration);
+ }
+
+ /**
+ * Returns the first collected message whose class exactly matches the given class.
+ * @param actor the MessageCollectorActor instance to query
+ * @param clazz the message class to match against
+ * @return the first matching message, or null if none has been collected
+ */
+ public static Object getFirstMatching(ActorRef actor, Class<?> clazz) throws Exception {
+ List<Object> allMessages = getAllMessages(actor);
+
+ for(Object message : allMessages){
+ if(message.getClass().equals(clazz)){
+ return message;
+ }
+ }
+
+ return null;
+ }
+
+ public static List<Object> getAllMatching(ActorRef actor, Class<?> clazz) throws Exception {
+ List<Object> allMessages = getAllMessages(actor);
+
+ List<Object> output = Lists.newArrayList();
+
+ for(Object message : allMessages){
+ if(message.getClass().equals(clazz)){
+ output.add(message);
+ }
+ }
+
+ return output;
+ }
+
}
import akka.actor.Props;
import akka.actor.UntypedActor;
import com.typesafe.config.ConfigFactory;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import java.util.ArrayList;
+import java.util.List;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.example.messages.KeyValue;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import java.util.ArrayList;
-import java.util.List;
-
public class Client {
private static ActorSystem actorSystem;
@Override public long getIndex() {
return 1;
}
+
+ @Override
+ public int size() {
+ return getData().size();
+ }
});
return new AppendEntries(1, "member-1", 0, 100, modification, 1);
@Override public long getIndex() {
return 1;
}
+
+ @Override
+ public int size() {
+ return getData().size();
+ }
});
return new AppendEntries(1, "member-1", 0, 100, modification, 1);
import akka.cluster.ClusterActorRefProvider;
import akka.event.Logging;
import akka.event.LoggingAdapter;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
-import org.opendaylight.controller.utils.ConditionalProbe;
-
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
+import org.opendaylight.controller.utils.ConditionalProbe;
/**
* A store that syncs its data across nodes in the cluster.
ActorRefProvider provider = getContext().provider();
selfAddress = provider.getDefaultAddress();
- if ( provider instanceof ClusterActorRefProvider)
+ if ( provider instanceof ClusterActorRefProvider) {
getContext().actorOf(Props.create(Gossiper.class).withMailbox(config.getMailBoxName()), "gossiper");
+ }
}
}
if (message instanceof ConditionalProbe) {
+ // The ConditionalProbe is only used for unit tests.
log.info("Received probe {} {}", getSelf(), message);
probe = (ConditionalProbe) message;
+ // Send back any message to tell the caller we got the probe.
+ getSender().tell("Got it", getSelf());
} else if (message instanceof UpdateBucket) {
receiveUpdateBucket(((UpdateBucket) message).getBucket());
} else if (message instanceof GetAllBuckets) {
Map<Address, Bucket> buckets = new HashMap<>();
//first add the local bucket if asked
- if (members.contains(selfAddress))
+ if (members.contains(selfAddress)) {
buckets.put(selfAddress, localBucket);
+ }
//then get buckets for requested remote nodes
for (Address address : members){
- if (remoteBuckets.containsKey(address))
+ if (remoteBuckets.containsKey(address)) {
buckets.put(address, remoteBuckets.get(address));
+ }
}
return buckets;
void receiveUpdateRemoteBuckets(Map<Address, Bucket> receivedBuckets){
if (receivedBuckets == null || receivedBuckets.isEmpty())
+ {
return; //nothing to do
+ }
//Remote cant update self's bucket
receivedBuckets.remove(selfAddress);
for (Map.Entry<Address, Bucket> entry : receivedBuckets.entrySet()){
Long localVersion = versions.get(entry.getKey());
- if (localVersion == null) localVersion = -1L;
+ if (localVersion == null) {
+ localVersion = -1L;
+ }
Bucket receivedBucket = entry.getValue();
- if (receivedBucket == null)
+ if (receivedBucket == null) {
continue;
+ }
Long remoteVersion = receivedBucket.getVersion();
- if (remoteVersion == null) remoteVersion = -1L;
+ if (remoteVersion == null) {
+ remoteVersion = -1L;
+ }
//update only if remote version is newer
if ( remoteVersion.longValue() > localVersion.longValue() ) {
package org.opendaylight.controller.remote.rpc.registry;
-
import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.ChildActorPath;
import akka.actor.Props;
+import akka.pattern.Patterns;
import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
import com.google.common.base.Predicate;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.controller.utils.ConditionalProbe;
import org.opendaylight.yangtools.yang.common.QName;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
-import javax.annotation.Nullable;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
-
public class RpcRegistryTest {
- private static ActorSystem node1;
- private static ActorSystem node2;
- private static ActorSystem node3;
-
- private ActorRef registry1;
- private ActorRef registry2;
- private ActorRef registry3;
-
- @BeforeClass
- public static void setup() throws InterruptedException {
- RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build();
- RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").build();
- RemoteRpcProviderConfig config3 = new RemoteRpcProviderConfig.Builder("memberC").build();
- node1 = ActorSystem.create("opendaylight-rpc", config1.get());
- node2 = ActorSystem.create("opendaylight-rpc", config2.get());
- node3 = ActorSystem.create("opendaylight-rpc", config3.get());
- }
-
- @AfterClass
- public static void teardown() {
- JavaTestKit.shutdownActorSystem(node1);
- JavaTestKit.shutdownActorSystem(node2);
- JavaTestKit.shutdownActorSystem(node3);
- if (node1 != null)
- node1.shutdown();
- if (node2 != null)
- node2.shutdown();
- if (node3 != null)
- node3.shutdown();
-
- }
-
- @Before
- public void createRpcRegistry() throws InterruptedException {
- registry1 = node1.actorOf(Props.create(RpcRegistry.class));
- registry2 = node2.actorOf(Props.create(RpcRegistry.class));
- registry3 = node3.actorOf(Props.create(RpcRegistry.class));
- }
-
- @After
- public void stopRpcRegistry() throws InterruptedException {
- if (registry1 != null)
- node1.stop(registry1);
- if (registry2 != null)
- node2.stop(registry2);
- if (registry3 != null)
- node3.stop(registry3);
- }
-
- /**
- * One node cluster.
- * 1. Register rpc, ensure router can be found
- * 2. Then remove rpc, ensure its deleted
- *
- * @throws URISyntaxException
- * @throws InterruptedException
- */
- @Test
- public void testAddRemoveRpcOnSameNode() throws URISyntaxException, InterruptedException {
-
- final JavaTestKit mockBroker = new JavaTestKit(node1);
-
- final ActorPath bucketStorePath = new ChildActorPath(registry1.path(), "store");
-
- //install probe
- final JavaTestKit probe1 = createProbeForMessage(
- node1, bucketStorePath, Messages.BucketStoreMessages.UpdateBucket.class);
-
- //Add rpc on node 1
- registry1.tell(new SetLocalRouter(mockBroker.getRef()), mockBroker.getRef());
- registry1.tell(getAddRouteMessage(), mockBroker.getRef());
-
- //Bucket store should get an update bucket message. Updated bucket contains added rpc.
- probe1.expectMsgClass(
- FiniteDuration.apply(10, TimeUnit.SECONDS),
- Messages.BucketStoreMessages.UpdateBucket.class);
-
- //Now remove rpc
- registry1.tell(getRemoveRouteMessage(), mockBroker.getRef());
-
- //Bucket store should get an update bucket message. Rpc is removed in the updated bucket
- probe1.expectMsgClass(
- FiniteDuration.apply(10, TimeUnit.SECONDS),
- Messages.BucketStoreMessages.UpdateBucket.class);
-
-
- }
-
-
- /**
- * Three node cluster.
- * 1. Register rpc on 1 node, ensure 2nd node gets updated
- * 2. Remove rpc on 1 node, ensure 2nd node gets updated
- *
- * @throws URISyntaxException
- * @throws InterruptedException
- */
- @Test
- public void testRpcAddRemoveInCluster() throws URISyntaxException, InterruptedException {
-
- final JavaTestKit mockBroker1 = new JavaTestKit(node1);
-
- //install probe on node2's bucket store
- final ActorPath bucketStorePath = new ChildActorPath(registry2.path(), "store");
- final JavaTestKit probe2 = createProbeForMessage(
- node2, bucketStorePath, Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
-
- //Add rpc on node 1
- registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
- registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
-
- //Bucket store on node2 should get a message to update its local copy of remote buckets
- probe2.expectMsgClass(
- FiniteDuration.apply(10, TimeUnit.SECONDS),
- Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
-
- //Now remove
- registry1.tell(getRemoveRouteMessage(), mockBroker1.getRef());
-
- //Bucket store on node2 should get a message to update its local copy of remote buckets
- probe2.expectMsgClass(
- FiniteDuration.apply(10, TimeUnit.SECONDS),
- Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
-
- }
-
- /**
- * Three node cluster.
- * Register rpc on 2 nodes. Ensure 3rd gets updated.
- *
- * @throws Exception
- */
- @Test
- public void testRpcAddedOnMultiNodes() throws Exception {
-
- final JavaTestKit mockBroker1 = new JavaTestKit(node1);
- final JavaTestKit mockBroker2 = new JavaTestKit(node2);
- final JavaTestKit mockBroker3 = new JavaTestKit(node3);
-
- registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef());
-
- //install probe on node 3
- final ActorPath bucketStorePath = new ChildActorPath(registry3.path(), "store");
- final JavaTestKit probe3 = createProbeForMessage(
- node3, bucketStorePath, Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
-
-
- //Add rpc on node 1
- registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
- registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
-
- probe3.expectMsgClass(
- FiniteDuration.apply(10, TimeUnit.SECONDS),
- Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
-
+ private static ActorSystem node1;
+ private static ActorSystem node2;
+ private static ActorSystem node3;
+
+ private ActorRef registry1;
+ private ActorRef registry2;
+ private ActorRef registry3;
+
+ @BeforeClass
+ public static void staticSetup() throws InterruptedException {
+ RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build();
+ RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").build();
+ RemoteRpcProviderConfig config3 = new RemoteRpcProviderConfig.Builder("memberC").build();
+ node1 = ActorSystem.create("opendaylight-rpc", config1.get());
+ node2 = ActorSystem.create("opendaylight-rpc", config2.get());
+ node3 = ActorSystem.create("opendaylight-rpc", config3.get());
+ }
+
+ @AfterClass
+ public static void staticTeardown() {
+ JavaTestKit.shutdownActorSystem(node1);
+ JavaTestKit.shutdownActorSystem(node2);
+ JavaTestKit.shutdownActorSystem(node3);
+ }
+
+ @Before
+ public void setup() {
+ registry1 = node1.actorOf(Props.create(RpcRegistry.class));
+ registry2 = node2.actorOf(Props.create(RpcRegistry.class));
+ registry3 = node3.actorOf(Props.create(RpcRegistry.class));
+ }
+
+ @After
+ public void teardown() {
+ if (registry1 != null) {
+ node1.stop(registry1);
+ }
+ if (registry2 != null) {
+ node2.stop(registry2);
+ }
+ if (registry3 != null) {
+ node3.stop(registry3);
+ }
+ }
+
+ /**
+ * One node cluster. 1. Register rpc, ensure router can be found 2. Then remove rpc, ensure its
+ * deleted
+ *
+ * @throws URISyntaxException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testAddRemoveRpcOnSameNode() throws Exception {
+
+ System.out.println("testAddRemoveRpcOnSameNode starting");
+
+ final JavaTestKit mockBroker = new JavaTestKit(node1);
+
+ final ActorPath bucketStorePath = new ChildActorPath(registry1.path(), "store");
+
+ // Add rpc on node 1
+ registry1.tell(new SetLocalRouter(mockBroker.getRef()), mockBroker.getRef());
+
+ // install probe
+ final JavaTestKit probe1 = createProbeForMessage(node1, bucketStorePath,
+ Messages.BucketStoreMessages.UpdateBucket.class);
+
+ registry1.tell(getAddRouteMessage(), mockBroker.getRef());
+
+ // Bucket store should get an update bucket message. Updated bucket contains added rpc.
+ probe1.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
+ Messages.BucketStoreMessages.UpdateBucket.class);
+
+ // Now remove rpc
+ registry1.tell(getRemoveRouteMessage(), mockBroker.getRef());
+
+ // Bucket store should get an update bucket message. Rpc is removed in the updated bucket
+ probe1.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
+ Messages.BucketStoreMessages.UpdateBucket.class);
+
+ System.out.println("testAddRemoveRpcOnSameNode ending");
+
+ }
+
+ /**
+ * Three node cluster. 1. Register rpc on 1 node, ensure 2nd node gets updated 2. Remove rpc on
+ * 1 node, ensure 2nd node gets updated
+ *
+ * @throws URISyntaxException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testRpcAddRemoveInCluster() throws Exception {
+
+ System.out.println("testRpcAddRemoveInCluster starting");
+
+ final JavaTestKit mockBroker1 = new JavaTestKit(node1);
+
+ // install probe on node2's bucket store
+ final ActorPath bucketStorePath = new ChildActorPath(registry2.path(), "store");
+ final JavaTestKit probe2 = createProbeForMessage(node2, bucketStorePath,
+ Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+
+ // Add rpc on node 1
+ registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
+ registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
+
+ // Bucket store on node2 should get a message to update its local copy of remote buckets
+ probe2.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
+ Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+
+ // Now remove
+ registry1.tell(getRemoveRouteMessage(), mockBroker1.getRef());
+
+ // Bucket store on node2 should get a message to update its local copy of remote buckets
+ probe2.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
+ Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+
+ System.out.println("testRpcAddRemoveInCluster ending");
+ }
+
+ /**
+ * Three node cluster. Register rpc on 2 nodes. Ensure 3rd gets updated.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testRpcAddedOnMultiNodes() throws Exception {
+
+ final JavaTestKit mockBroker1 = new JavaTestKit(node1);
+ final JavaTestKit mockBroker2 = new JavaTestKit(node2);
+ final JavaTestKit mockBroker3 = new JavaTestKit(node3);
+
+ registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef());
+
+ // install probe on node 3
+ final ActorPath bucketStorePath = new ChildActorPath(registry3.path(), "store");
+ final JavaTestKit probe3 = createProbeForMessage(node3, bucketStorePath,
+ Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+
+ // Add rpc on node 1
+ registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
+ registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
- //Add same rpc on node 2
- registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef());
- registry2.tell(getAddRouteMessage(), mockBroker2.getRef());
+ probe3.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
+ Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
- probe3.expectMsgClass(
- FiniteDuration.apply(10, TimeUnit.SECONDS),
- Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
- }
-
- private JavaTestKit createProbeForMessage(ActorSystem node, ActorPath subjectPath, final Class<?> clazz) {
- final JavaTestKit probe = new JavaTestKit(node);
+ // Add same rpc on node 2
+ registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef());
+ registry2.tell(getAddRouteMessage(), mockBroker2.getRef());
- ConditionalProbe conditionalProbe =
- new ConditionalProbe(probe.getRef(), new Predicate<Object>() {
- @Override
- public boolean apply(@Nullable Object input) {
- if (input != null)
- return clazz.equals(input.getClass());
- else
- return false;
- }
- });
-
- ActorSelection subject = node.actorSelection(subjectPath);
- subject.tell(conditionalProbe, ActorRef.noSender());
-
- return probe;
+ probe3.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
+ Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+ }
- }
+ private JavaTestKit createProbeForMessage(ActorSystem node, ActorPath subjectPath, final Class<?> clazz)
+ throws Exception {
+ final JavaTestKit probe = new JavaTestKit(node);
- private AddOrUpdateRoutes getAddRouteMessage() throws URISyntaxException {
- return new AddOrUpdateRoutes(createRouteIds());
- }
-
- private RemoveRoutes getRemoveRouteMessage() throws URISyntaxException {
- return new RemoveRoutes(createRouteIds());
- }
+ ConditionalProbe conditionalProbe = new ConditionalProbe(probe.getRef(), new Predicate<Object>() {
+ @Override
+ public boolean apply(@Nullable Object input) {
+ if (input != null) {
+ return clazz.equals(input.getClass());
+ } else {
+ return false;
+ }
+ }
+ });
- private List<RpcRouter.RouteIdentifier<?, ?, ?>> createRouteIds() throws URISyntaxException {
- QName type = new QName(new URI("/mockrpc"), "mockrpc");
- List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new ArrayList<>();
- routeIds.add(new RouteIdentifierImpl(null, type, null));
- return routeIds;
- }
+ FiniteDuration duration = Duration.create(3, TimeUnit.SECONDS);
+ Timeout timeout = new Timeout(duration);
+ int maxTries = 30;
+ int i = 0;
+ while(true) {
+ ActorSelection subject = node.actorSelection(subjectPath);
+ Future<Object> future = Patterns.ask(subject, conditionalProbe, timeout);
+
+ try {
+ Await.ready(future, duration);
+ break;
+ } catch (TimeoutException | InterruptedException e) {
+ if(++i > maxTries) {
+ throw e;
+ }
+ }
+ }
+
+ return probe;
+
+ }
+
+ private AddOrUpdateRoutes getAddRouteMessage() throws URISyntaxException {
+ return new AddOrUpdateRoutes(createRouteIds());
+ }
+
+ private RemoveRoutes getRemoveRouteMessage() throws URISyntaxException {
+ return new RemoveRoutes(createRouteIds());
+ }
+
+ private List<RpcRouter.RouteIdentifier<?, ?, ?>> createRouteIds() throws URISyntaxException {
+ QName type = new QName(new URI("/mockrpc"), "mockrpc");
+ List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new ArrayList<>();
+ routeIds.add(new RouteIdentifierImpl(null, type, null));
+ return routeIds;
+ }
}
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-parent</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.opendaylight.controller.md</groupId>
+ <artifactId>statistics-manager-config</artifactId>
+ <description>Configuration files for statistics manager</description>
+ <packaging>jar</packaging>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>attach-artifacts</id>
+ <goals>
+ <goal>attach-artifact</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <artifacts>
+ <artifact>
+ <file>${project.build.directory}/classes/initial/30-statistics-manager.xml</file>
+ <type>xml</type>
+ <classifier>config</classifier>
+ </artifact>
+ </artifacts>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- vi: set et smarttab sw=4 tabstop=4: -->
+<!--
+ Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<snapshot>
+ <configuration>
+ <data xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
+ <modules xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
+ <module>
+ <type xmlns:statsmanager="urn:opendaylight:params:xml:ns:yang:controller:md:sal:statistics-manager">
+ statsmanager:statistics-manager
+ </type>
+ <name>statistics-manager</name>
+
+ <rpc-registry>
+ <type xmlns:binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">binding:binding-rpc-registry</type>
+ <name>binding-rpc-broker</name>
+ </rpc-registry>
+
+ <data-broker>
+ <type xmlns:binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">binding:binding-async-data-broker</type>
+ <name>binding-data-broker</name>
+ </data-broker>
+
+ <notification-service>
+ <type xmlns:binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">
+ binding:binding-notification-service
+ </type>
+ <name>binding-notification-broker</name>
+ </notification-service>
+
+ <statistics-manager-settings>
+ <min-request-net-monitor-interval>3000</min-request-net-monitor-interval>
+ <max-nodes-for-collector>16</max-nodes-for-collector>
+ </statistics-manager-settings>
+
+ </module>
+ </modules>
+ </data>
+ </configuration>
+
+ <required-capabilities>
+ <capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:statistics-manager?module=statistics-manager&amp;revision=2014-09-25</capability>
+ </required-capabilities>
+
+</snapshot>
+
<artifactId>org.osgi.core</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-binding-config</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>config-api</artifactId>
+ </dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<build>
<plugins>
- <plugin>
- <groupId>org.apache.felix</groupId>
- <artifactId>maven-bundle-plugin</artifactId>
- <configuration>
- <instructions>
- <Bundle-Activator>org.opendaylight.controller.md.statistics.manager.StatisticsManagerActivator</Bundle-Activator>
- </instructions>
- </configuration>
- </plugin>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <configuration>
+ <instructions>
+ <Import-Package>*</Import-Package>
+ </instructions>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>config</id>
+ <goals>
+ <goal>generate-sources</goal>
+ </goals>
+ <configuration>
+ <codeGenerators>
+ <generator>
+ <codeGeneratorClass>org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator</codeGeneratorClass>
+ <outputBaseDir>${jmxGeneratorPath}</outputBaseDir>
+ <additionalConfiguration>
+ <namespaceToPackage1>urn:opendaylight:params:xml:ns:yang:controller==org.opendaylight.controller.config.yang</namespaceToPackage1>
+ </additionalConfiguration>
+ </generator>
+ <generator>
+ <codeGeneratorClass>org.opendaylight.yangtools.maven.sal.api.gen.plugin.CodeGeneratorImpl</codeGeneratorClass>
+ <outputBaseDir>${salGeneratorPath}</outputBaseDir>
+ </generator>
+ </codeGenerators>
+ <inspectDependencies>true</inspectDependencies>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
<scm>
--- /dev/null
+package org.opendaylight.controller.config.yang.md.sal.statistics_manager;
+
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
+import org.opendaylight.controller.md.statistics.manager.impl.StatisticsManagerConfig;
+import org.opendaylight.controller.md.statistics.manager.impl.StatisticsManagerImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StatisticsManagerModule extends org.opendaylight.controller.config.yang.md.sal.statistics_manager.AbstractStatisticsManagerModule {
+ private final static Logger LOG = LoggerFactory.getLogger(StatisticsManagerModule.class);
+
+ private final static int MAX_NODES_FOR_COLLECTOR_DEFAULT = 16;
+ private final static int MIN_REQUEST_NET_MONITOR_INTERVAL_DEFAULT = 3000;
+
+ private StatisticsManager statisticsManagerProvider;
+
+ public StatisticsManagerModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier, final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
+ super(identifier, dependencyResolver);
+ }
+
+ public StatisticsManagerModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier, final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, final StatisticsManagerModule oldModule, final java.lang.AutoCloseable oldInstance) {
+ super(identifier, dependencyResolver, oldModule, oldInstance);
+ }
+
+ @Override
+ public void customValidation() {
+ // add custom validation form module attributes here.
+ }
+
+ @Override
+ public java.lang.AutoCloseable createInstance() {
+ LOG.info("StatisticsManager module initialization.");
+ final StatisticsManagerConfig config = createConfig();
+ statisticsManagerProvider = new StatisticsManagerImpl(getDataBrokerDependency(), config);
+ statisticsManagerProvider.start(getNotificationServiceDependency(), getRpcRegistryDependency());
+ LOG.info("StatisticsManager started successfully.");
+ return new AutoCloseable() {
+ @Override
+ public void close() throws Exception {
+ try {
+ statisticsManagerProvider.close();
+ }
+ catch (final Exception e) {
+ LOG.error("Unexpected error by stopping StatisticsManager module", e);
+ }
+ LOG.info("StatisticsManager module stopped.");
+ }
+ };
+ }
+
+ public StatisticsManagerConfig createConfig() {
+ final StatisticsManagerConfig.StatisticsManagerConfigBuilder builder = StatisticsManagerConfig.builder();
+ if (getStatisticsManagerSettings() != null && getStatisticsManagerSettings().getMaxNodesForCollector() != null) {
+ builder.setMaxNodesForCollector(getStatisticsManagerSettings().getMaxNodesForCollector());
+ } else {
+ LOG.warn("Load the xml ConfigSubsystem input value fail! MaxNodesForCollector value is set to {} ",
+ MAX_NODES_FOR_COLLECTOR_DEFAULT);
+ builder.setMaxNodesForCollector(MAX_NODES_FOR_COLLECTOR_DEFAULT);
+ }
+ if (getStatisticsManagerSettings() != null &&
+ getStatisticsManagerSettings().getMinRequestNetMonitorInterval() != null) {
+ builder.setMinRequestNetMonitorInterval(getStatisticsManagerSettings().getMinRequestNetMonitorInterval());
+ } else {
+ LOG.warn("Load the xml CofnigSubsystem input value fail! MinRequestNetMonitorInterval value is set to {} ",
+ MIN_REQUEST_NET_MONITOR_INTERVAL_DEFAULT);
+ builder.setMinRequestNetMonitorInterval(MIN_REQUEST_NET_MONITOR_INTERVAL_DEFAULT);
+ }
+ return builder.build();
+ }
+
+}
--- /dev/null
+/*
+* Generated file
+*
+* Generated from: yang module name: statistics-manager yang module local name: statistics-manager
+* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator
+* Generated at: Tue Oct 07 14:09:47 CEST 2014
+*
+* Do not modify this file unless it is present under src/main directory
+*/
+package org.opendaylight.controller.config.yang.md.sal.statistics_manager;
+/**
+* Generated factory that instantiates {@link StatisticsManagerModule};
+* no custom behavior is required beyond the generated base class.
+*/
+public class StatisticsManagerModuleFactory extends org.opendaylight.controller.config.yang.md.sal.statistics_manager.AbstractStatisticsManagerModuleFactory {
+
+}
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
import org.opendaylight.controller.md.statistics.manager.StatPermCollector.StatCapabTypes;
+import org.opendaylight.controller.md.statistics.manager.impl.StatisticsManagerConfig;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
* @param minReqNetMonitInt
*/
void start(final NotificationProviderService notifService,
- final RpcConsumerRegistry rpcRegistry, final long minReqNetMonitInt);
+ final RpcConsumerRegistry rpcRegistry);
/**
* Method provides read/write DataStore functionality cross applyOperation
*/
StatNotifyCommiter<OpendaylightPortStatisticsListener> getPortNotifyCommit();
+ StatisticsManagerConfig getConfiguration();
+
}
+++ /dev/null
-/*
- * Copyright IBM Corporation, 2013. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.md.statistics.manager;
-
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.statistics.manager.impl.StatisticsManagerImpl;
-import org.opendaylight.controller.sal.binding.api.AbstractBindingAwareProvider;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
-import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
-import org.osgi.framework.BundleContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * Statistics Manager Activator
- *
- * OSGi bundle activator
- *
- */
-public class StatisticsManagerActivator extends AbstractBindingAwareProvider {
-
- private final static Logger LOG = LoggerFactory.getLogger(StatisticsManagerActivator.class);
-
- /* TODO move it to ConfigSubsystem */
- private static final long DEFAULT_MIN_REQUEST_NET_MONITOR_INTERVAL = 3000L;
- private static final int MAX_NODES_FOR_COLLECTOR = 16;
-
- private StatisticsManager statsProvider;
-
- @Override
- public void onSessionInitiated(final ProviderContext session) {
- LOG.info("StatisticsManagerActivator initialization.");
- try {
- final DataBroker dataBroker = session.getSALService(DataBroker.class);
- final NotificationProviderService notifService =
- session.getSALService(NotificationProviderService.class);
- statsProvider = new StatisticsManagerImpl(dataBroker, MAX_NODES_FOR_COLLECTOR);
- statsProvider.start(notifService, session, DEFAULT_MIN_REQUEST_NET_MONITOR_INTERVAL);
- LOG.info("StatisticsManagerActivator started successfully.");
- }
- catch (final Exception e) {
- LOG.error("Unexpected error by initialization of StatisticsManagerActivator", e);
- stopImpl(null);
- }
- }
-
- @VisibleForTesting
- StatisticsManager getStatisticManager() {
- return statsProvider;
- }
-
- @Override
- protected void stopImpl(final BundleContext context) {
- if (statsProvider != null) {
- try {
- statsProvider.close();
- }
- catch (final Exception e) {
- LOG.error("Unexpected error by stopping StatisticsManagerActivator", e);
- }
- statsProvider = null;
- }
- LOG.info("StatisticsManagerActivator stoped.");
- }
-}
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.QueueBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.QueueKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
new FlowCapableNodeConnectorQueueStatisticsDataBuilder();
statBuild.setFlowCapableNodeConnectorQueueStatistics(statChild);
final QueueKey qKey = new QueueKey(queueStat.getQueueId());
- final InstanceIdentifier<FlowCapableNodeConnectorQueueStatisticsData> queueStatIdent = nodeIdent
+ final InstanceIdentifier<Queue> queueIdent = nodeIdent
.child(NodeConnector.class, new NodeConnectorKey(queueStat.getNodeConnectorId()))
.augmentation(FlowCapableNodeConnector.class)
- .child(Queue.class, qKey).augmentation(FlowCapableNodeConnectorQueueStatisticsData.class);
+ .child(Queue.class, qKey);
+ final InstanceIdentifier<FlowCapableNodeConnectorQueueStatisticsData> queueStatIdent = queueIdent.augmentation(FlowCapableNodeConnectorQueueStatisticsData.class);
existQueueKeys.remove(qKey);
+ tx.merge(LogicalDatastoreType.OPERATIONAL, queueIdent, new QueueBuilder().setKey(qKey).build());
tx.put(LogicalDatastoreType.OPERATIONAL, queueStatIdent, statBuild.build());
}
}
--- /dev/null
+package org.opendaylight.controller.md.statistics.manager.impl;
+
+/**
+ * Immutable runtime configuration for the StatisticsManager.
+ *
+ * <p>Instances are created through the nested
+ * {@link StatisticsManagerConfigBuilder} obtained via {@link #builder()}.
+ */
+public class StatisticsManagerConfig {
+    private final int maxNodesForCollector;
+    private final int minRequestNetMonitorInterval;
+
+    private StatisticsManagerConfig(final StatisticsManagerConfigBuilder builder) {
+        this.maxNodesForCollector = builder.getMaxNodesForCollector();
+        this.minRequestNetMonitorInterval = builder.getMinRequestNetMonitorInterval();
+    }
+
+    /** @return maximum number of nodes handled by a single statistics collector */
+    public int getMaxNodesForCollector() {
+        return maxNodesForCollector;
+    }
+
+    /** @return minimal interval between network-monitoring requests (presumably milliseconds — confirm with collector usage) */
+    public int getMinRequestNetMonitorInterval() {
+        return minRequestNetMonitorInterval;
+    }
+
+    /** @return a fresh, mutable builder for this configuration */
+    public static StatisticsManagerConfigBuilder builder() {
+        return new StatisticsManagerConfigBuilder();
+    }
+
+    /**
+     * Mutable builder; setters return {@code this} so calls can be chained
+     * (plain statement-style invocation continues to work unchanged).
+     */
+    public static class StatisticsManagerConfigBuilder {
+        private int maxNodesForCollector;
+        private int minRequestNetMonitorInterval;
+
+        public int getMaxNodesForCollector() {
+            return maxNodesForCollector;
+        }
+
+        public StatisticsManagerConfigBuilder setMaxNodesForCollector(final int maxNodesForCollector) {
+            this.maxNodesForCollector = maxNodesForCollector;
+            return this;
+        }
+
+        public int getMinRequestNetMonitorInterval() {
+            return minRequestNetMonitorInterval;
+        }
+
+        public StatisticsManagerConfigBuilder setMinRequestNetMonitorInterval(final int minRequestNetMonitorInterval) {
+            this.minRequestNetMonitorInterval = minRequestNetMonitorInterval;
+            return this;
+        }
+
+        /** @return an immutable snapshot of the builder's current values */
+        public StatisticsManagerConfig build() {
+            return new StatisticsManagerConfig(this);
+        }
+    }
+}
package org.opendaylight.controller.md.statistics.manager.impl;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.ThreadFactory;
-
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadFactory;
/**
* statistics-manager
private final BlockingQueue<StatDataStoreOperation> dataStoreOperQueue = new LinkedBlockingDeque<>(QUEUE_DEPTH);
private final DataBroker dataBroker;
- private final int maxNodesForCollectors;
- private long minReqNetMonitInt;
private final ExecutorService statRpcMsgManagerExecutor;
private final ExecutorService statDataStoreOperationServ;
private StatRpcMsgManager rpcMsgManager;
private StatNotifyCommiter<OpendaylightFlowTableStatisticsListener> tableNotifCommiter;
private StatNotifyCommiter<OpendaylightPortStatisticsListener> portNotifyCommiter;
- public StatisticsManagerImpl (final DataBroker dataBroker, final int maxNodesForCollector) {
+ private final StatisticsManagerConfig statManagerConfig;
+
+ public StatisticsManagerImpl (final DataBroker dataBroker, StatisticsManagerConfig statManagerconfig) {
+ this.statManagerConfig = Preconditions.checkNotNull(statManagerconfig);
this.dataBroker = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!");
ThreadFactory threadFact;
threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-rpc-oper-thread-%d").build();
statRpcMsgManagerExecutor = Executors.newSingleThreadExecutor(threadFact);
threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-ds-oper-thread-%d").build();
statDataStoreOperationServ = Executors.newSingleThreadExecutor(threadFact);
- maxNodesForCollectors = maxNodesForCollector;
txChain = dataBroker.createTransactionChain(this);
}
@Override
public void start(final NotificationProviderService notifService,
- final RpcConsumerRegistry rpcRegistry, final long minReqNetMonitInt) {
+ final RpcConsumerRegistry rpcRegistry) {
Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
- this.minReqNetMonitInt = minReqNetMonitInt;
- rpcMsgManager = new StatRpcMsgManagerImpl(this, rpcRegistry, minReqNetMonitInt);
+ rpcMsgManager = new StatRpcMsgManagerImpl(this, rpcRegistry, statManagerConfig.getMinRequestNetMonitorInterval());
statCollectors = Collections.emptyList();
nodeRegistrator = new StatNodeRegistrationImpl(this, dataBroker, notifService);
flowListeningCommiter = new StatListenCommitFlow(this, dataBroker, notifService);
}
}
final StatPermCollectorImpl newCollector = new StatPermCollectorImpl(this,
- minReqNetMonitInt, statCollectors.size() + 1, maxNodesForCollectors);
+ statManagerConfig.getMinRequestNetMonitorInterval(), statCollectors.size() + 1,
+ statManagerConfig.getMaxNodesForCollector());
final List<StatPermCollector> statCollectorsNew = new ArrayList<>(statCollectors);
newCollector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables);
statCollectorsNew.add(newCollector);
public StatNotifyCommiter<OpendaylightPortStatisticsListener> getPortNotifyCommit() {
return portNotifyCommiter;
}
+
+ @Override
+ public StatisticsManagerConfig getConfiguration() {
+ return statManagerConfig;
+ }
}
--- /dev/null
+module statistics-manager {
+
+ yang-version 1;
+ namespace "urn:opendaylight:params:xml:ns:yang:controller:md:sal:statistics-manager";
+ prefix "statistics-manager";
+
+ import config { prefix config; revision-date 2013-04-05; }
+ import opendaylight-md-sal-binding { prefix mdsal; revision-date 2013-10-28; }
+
+ description
+ "This module contains the base YANG definitions for
+ statistics-manager implementation.";
+
+ revision "2014-09-25" {
+ description
+ "Initial revision.";
+ }
+
+ identity statistics-manager {
+ base config:module-type;
+ config:java-name-prefix StatisticsManager;
+ }
+
+ augment "/config:modules/config:module/config:configuration" {
+ case statistics-manager {
+ when "/config:modules/config:module/config:type = 'statistics-manager'";
+
+ container rpc-registry {
+ uses config:service-ref {
+ refine type {
+ mandatory true;
+ config:required-identity mdsal:binding-rpc-registry;
+ }
+ }
+ }
+
+ container notification-service {
+ uses config:service-ref {
+ refine type {
+ mandatory true;
+ config:required-identity mdsal:binding-notification-service;
+ }
+ }
+ }
+
+ container data-broker {
+ uses config:service-ref {
+ refine type {
+ mandatory false;
+ config:required-identity mdsal:binding-async-data-broker;
+ }
+ }
+ }
+
+ container statistics-manager-settings {
+ leaf min-request-net-monitor-interval {
+ type int32;
+ }
+ leaf max-nodes-for-collector {
+ type int32;
+ }
+ }
+ }
+ }
+
+}
+++ /dev/null
-/**
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.md.statistics.manager;
-
-/**
- * statistics-manager
- * org.opendaylight.controller.md.statistics.manager
- *
- *
- *
- * @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
- *
- * Created: Sep 6, 2014
- */
-public class StatisticsManagerProvider {
-
- private final StatisticsManagerActivator activator;
-
- public StatisticsManagerProvider(final StatisticsManagerActivator activator) {
- this.activator = activator;
- }
-
- /**
- * Method provides Initialized {@link StatisticsManager}
- * from {@link StatisticsManagerActivator} for all tests
- * suites;
- *
- * @return
- */
- public StatisticsManager getStatisticsManager() {
- return activator.getStatisticManager();
- }
-}
+++ /dev/null
-package test.mock;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Collections;
-import java.util.concurrent.ExecutionException;
-
-import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.statistics.manager.StatisticsManagerActivator;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowFeatureCapabilityFlowStats;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-
-import test.mock.util.StatisticsManagerTest;
-
-import com.google.common.base.Optional;
-
-public class FlowStatisticsTest extends StatisticsManagerTest {
- private final Object waitObject = new Object();
-
-// @Test(timeout = 5000)
- public void addedFlowOnDemandStatsTest() throws ExecutionException, InterruptedException, ReadFailedException {
- final StatisticsManagerActivator activator = new StatisticsManagerActivator();
- activator.onSessionInitiated(providerContext);
-
- addFlowCapableNode(s1Key);
-
- final Flow flow = getFlow();
-
- final InstanceIdentifier<Flow> flowII = InstanceIdentifier.create(Nodes.class).child(Node.class, s1Key)
- .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(flow.getTableId()))
- .child(Flow.class, flow.getKey());
- final InstanceIdentifier<Table> tableII = InstanceIdentifier.create(Nodes.class).child(Node.class, s1Key)
- .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(flow.getTableId()));
- final Table table = new TableBuilder().setKey(new TableKey(flow.getTableId())).setFlow(Collections.<Flow>emptyList()).build();
-
- final WriteTransaction writeTx = getDataBroker().newWriteOnlyTransaction();
- writeTx.put(LogicalDatastoreType.CONFIGURATION, tableII, table);
- writeTx.put(LogicalDatastoreType.CONFIGURATION, flowII, flow);
- writeTx.put(LogicalDatastoreType.OPERATIONAL, tableII, table);
- writeTx.put(LogicalDatastoreType.OPERATIONAL, flowII, flow);
- assertCommit(writeTx.submit());
-
- getDataBroker().registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
- flowII.augmentation(FlowStatisticsData.class), new ChangeListener(), AsyncDataBroker.DataChangeScope.BASE);
-
- synchronized (waitObject) {
- waitObject.wait();
- }
-
- final ReadOnlyTransaction readTx = getDataBroker().newReadOnlyTransaction();
- final Optional<FlowStatisticsData> flowStatDataOptional = readTx.read(LogicalDatastoreType.OPERATIONAL, flowII.augmentation(FlowStatisticsData.class))
- .checkedGet();
- assertTrue(flowStatDataOptional.isPresent());
- assertEquals(COUNTER_64_TEST_VALUE, flowStatDataOptional.get().getFlowStatistics().getByteCount());
-
- }
-
-// @Test(timeout = 5000)
- public void deletedFlowStatsRemovalTest() throws ExecutionException, InterruptedException, ReadFailedException {
- final StatisticsManagerActivator activator = new StatisticsManagerActivator();
- activator.onSessionInitiated(providerContext);
-
- addFlowCapableNode(s1Key);
-
- final Flow flow = getFlow();
-
- final InstanceIdentifier<Flow> flowII = InstanceIdentifier.create(Nodes.class).child(Node.class, s1Key)
- .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(flow.getTableId()))
- .child(Flow.class, flow.getKey());
- final InstanceIdentifier<Table> tableII = InstanceIdentifier.create(Nodes.class).child(Node.class, s1Key)
- .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(flow.getTableId()));
- final Table table = new TableBuilder().setKey(new TableKey(flow.getTableId())).setFlow(Collections.<Flow>emptyList()).build();
-
- WriteTransaction writeTx = getDataBroker().newWriteOnlyTransaction();
- writeTx.put(LogicalDatastoreType.CONFIGURATION, tableII, table);
- writeTx.put(LogicalDatastoreType.CONFIGURATION, flowII, flow);
- writeTx.put(LogicalDatastoreType.OPERATIONAL, tableII, table);
- writeTx.put(LogicalDatastoreType.OPERATIONAL, flowII, flow);
-
- getDataBroker().registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
- flowII.augmentation(FlowStatisticsData.class), new ChangeListener(), AsyncDataBroker.DataChangeScope.BASE);
-
- assertCommit(writeTx.submit());
-
- synchronized (waitObject) {
- waitObject.wait();
- }
-
- ReadOnlyTransaction readTx = getDataBroker().newReadOnlyTransaction();
- Optional<Flow> flowStatDataOptional = readTx.read(LogicalDatastoreType.OPERATIONAL, flowII).checkedGet();
- assertTrue(flowStatDataOptional.isPresent());
-// assertEquals(COUNTER_64_TEST_VALUE, flowStatDataOptional.get().getFlowStatistics().getByteCount());
-
- writeTx = getDataBroker().newWriteOnlyTransaction();
- writeTx.delete(LogicalDatastoreType.CONFIGURATION, flowII);
- assertCommit(writeTx.submit());
-
- readTx = getDataBroker().newReadOnlyTransaction();
- flowStatDataOptional = readTx.read(LogicalDatastoreType.OPERATIONAL, flowII).checkedGet();
- assertFalse(flowStatDataOptional.isPresent());
- }
-
-// @Test(timeout = 23000)
- public void getAllStatsWhenNodeIsConnectedTest() throws ExecutionException, InterruptedException, ReadFailedException {
- final StatisticsManagerActivator activator = new StatisticsManagerActivator();
- activator.onSessionInitiated(providerContext);
-
- addFlowCapableNodeWithFeatures(s1Key, false, FlowFeatureCapabilityFlowStats.class);
-
- final Flow flow = getFlow();
-
- final InstanceIdentifier<Table> tableII = InstanceIdentifier.create(Nodes.class).child(Node.class, s1Key)
- .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(flow.getTableId()));
-
- getDataBroker().registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
- tableII.child(Flow.class), new ChangeListener(), AsyncDataBroker.DataChangeScope.BASE);
-
- synchronized (waitObject) {
- waitObject.wait();
- }
-
- final ReadOnlyTransaction readTx = getDataBroker().newReadOnlyTransaction();
- final Optional<Table> tableOptional = readTx.read(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class)
- .child(Node.class, s1Key).augmentation(FlowCapableNode.class)
- .child(Table.class, new TableKey(flow.getTableId()))).checkedGet();
- assertTrue(tableOptional.isPresent());
- final FlowStatisticsData flowStats = tableOptional.get().getFlow().get(0).getAugmentation(FlowStatisticsData.class);
- assertTrue(flowStats != null);
- assertEquals(COUNTER_64_TEST_VALUE, flowStats.getFlowStatistics().getByteCount());
- }
-
- public class ChangeListener implements DataChangeListener {
-
- @Override
- public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
- synchronized (waitObject) {
- waitObject.notify();
- }
- }
- }
-}
-
+++ /dev/null
-package test.mock;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.concurrent.ExecutionException;
-
-import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.statistics.manager.StatisticsManagerActivator;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowFeatureCapabilityGroupStats;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupDescStats;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeatures;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupStatistics;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.features.GroupFeatures;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-
-import test.mock.util.StatisticsManagerTest;
-
-import com.google.common.base.Optional;
-
-public class GroupStatisticsTest extends StatisticsManagerTest {
- private final Object waitObject = new Object();
-
-// @Test(timeout = 5000)
- public void addedGroupOnDemandStatsTest() throws ExecutionException, InterruptedException, ReadFailedException {
- final StatisticsManagerActivator activator = new StatisticsManagerActivator();
- activator.onSessionInitiated(providerContext);
-
- addFlowCapableNode(s1Key);
-
- final Group group = getGroup();
-
- final InstanceIdentifier<Group> groupII = InstanceIdentifier.create(Nodes.class).child(Node.class, s1Key)
- .augmentation(FlowCapableNode.class).child(Group.class, group.getKey());
-
- final WriteTransaction writeTx = getDataBroker().newWriteOnlyTransaction();
- writeTx.put(LogicalDatastoreType.CONFIGURATION, groupII, group);
- writeTx.put(LogicalDatastoreType.OPERATIONAL, groupII, group);
- assertCommit(writeTx.submit());
-
- getDataBroker().registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
- groupII.augmentation(NodeGroupStatistics.class), new ChangeListener(), AsyncDataBroker.DataChangeScope.BASE);
-
- synchronized (waitObject) {
- waitObject.wait();
- }
-
- final ReadOnlyTransaction readTx = getDataBroker().newReadOnlyTransaction();
- final Optional<NodeGroupStatistics> groupOptional = readTx.read(LogicalDatastoreType.OPERATIONAL, groupII.augmentation(NodeGroupStatistics.class)).checkedGet();
- assertTrue(groupOptional.isPresent());
- assertEquals(COUNTER_64_TEST_VALUE, groupOptional.get().getGroupStatistics().getByteCount());
- }
-
-// @Test(timeout = 5000)
- public void deletedGroupStasRemovalTest() throws ExecutionException, InterruptedException, ReadFailedException {
- final StatisticsManagerActivator activator = new StatisticsManagerActivator();
- activator.onSessionInitiated(providerContext);
-
- addFlowCapableNode(s1Key);
-
- final Group group = getGroup();
- final InstanceIdentifier<Group> groupII = InstanceIdentifier.create(Nodes.class).child(Node.class, s1Key)
- .augmentation(FlowCapableNode.class).child(Group.class, group.getKey());
-
- WriteTransaction writeTx = getDataBroker().newWriteOnlyTransaction();
- writeTx.put(LogicalDatastoreType.CONFIGURATION, groupII, group);
- writeTx.put(LogicalDatastoreType.OPERATIONAL, groupII, group);
- assertCommit(writeTx.submit());
-
- getDataBroker().registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
- groupII.augmentation(NodeGroupStatistics.class), new ChangeListener(), AsyncDataBroker.DataChangeScope.BASE);
-
- synchronized (waitObject) {
- waitObject.wait();
- }
-
- ReadOnlyTransaction readTx = getDataBroker().newReadOnlyTransaction();
- Optional<NodeGroupStatistics> groupOptional = readTx.read(LogicalDatastoreType.OPERATIONAL,
- groupII.augmentation(NodeGroupStatistics.class)).checkedGet();
- assertTrue(groupOptional.isPresent());
- assertEquals(COUNTER_64_TEST_VALUE, groupOptional.get().getGroupStatistics().getByteCount());
-
- writeTx = getDataBroker().newWriteOnlyTransaction();
- writeTx.delete(LogicalDatastoreType.CONFIGURATION, groupII);
- assertCommit(writeTx.submit());
-
- readTx = getDataBroker().newReadOnlyTransaction();
- groupOptional = readTx.read(LogicalDatastoreType.OPERATIONAL,
- groupII.augmentation(NodeGroupStatistics.class)).checkedGet();
- assertFalse(groupOptional.isPresent());
-
- }
-
-// @Test(timeout = 23000)
- public void getAllStatsFromConnectedNodeTest() throws ExecutionException, InterruptedException {
- final StatisticsManagerActivator activator = new StatisticsManagerActivator();
- activator.onSessionInitiated(providerContext);
-
- addFlowCapableNodeWithFeatures(s1Key, false, FlowFeatureCapabilityGroupStats.class);
-
- final InstanceIdentifier<Group> groupII = InstanceIdentifier.create(Nodes.class).child(Node.class, s1Key)
- .augmentation(FlowCapableNode.class).child(Group.class, getGroup().getKey());
- getDataBroker().registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
- groupII.augmentation(NodeGroupStatistics.class), new ChangeListener(), AsyncDataBroker.DataChangeScope.BASE);
-
- synchronized (waitObject) {
- waitObject.wait();
- }
-
- ReadOnlyTransaction readTx = getDataBroker().newReadOnlyTransaction();
- final Optional<Group> optionalGroup = readTx.read(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class)
- .child(Node.class, s1Key).augmentation(FlowCapableNode.class)
- .child(Group.class, getGroup().getKey())).get();
-
- assertTrue(optionalGroup.isPresent());
- assertTrue(optionalGroup.get().getAugmentation(NodeGroupDescStats.class) != null);
- final NodeGroupStatistics groupStats = optionalGroup.get().getAugmentation(NodeGroupStatistics.class);
- assertTrue(groupStats != null);
- assertEquals(COUNTER_64_TEST_VALUE, groupStats.getGroupStatistics().getByteCount());
-
- readTx = getDataBroker().newReadOnlyTransaction();
- final Optional<GroupFeatures> optionalGroupFeatures = readTx.read(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class)
- .child(Node.class, s1Key).augmentation(NodeGroupFeatures.class).child(GroupFeatures.class)).get();
- assertTrue(optionalGroupFeatures.isPresent());
- assertEquals(1, optionalGroupFeatures.get().getMaxGroups().size());
- assertEquals(MAX_GROUPS_TEST_VALUE, optionalGroupFeatures.get().getMaxGroups().get(0));
- }
-
- private class ChangeListener implements DataChangeListener {
-
- @Override
- public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
- synchronized (waitObject) {
- waitObject.notify();
- }
- }
- }
-}
-
+++ /dev/null
-package test.mock;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.concurrent.ExecutionException;
-
-import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.statistics.manager.StatisticsManagerActivator;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterConfigStats;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterFeatures;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterStatistics;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.MeterFeatures;
-import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-
-import test.mock.util.StatisticsManagerTest;
-
-import com.google.common.base.Optional;
-
-public class MeterStatisticsTest extends StatisticsManagerTest {
- private final Object waitObject = new Object();
-
-// @Test(timeout = 5000)
- public void addedMeterOnDemandStatsTest() throws ExecutionException, InterruptedException, ReadFailedException {
- final StatisticsManagerActivator activator = new StatisticsManagerActivator();
- activator.onSessionInitiated(providerContext);
-
- addFlowCapableNode(s1Key);
-
- final Meter meter = getMeter();
- final InstanceIdentifier<Meter> meterII = InstanceIdentifier.create(Nodes.class).child(Node.class, s1Key)
- .augmentation(FlowCapableNode.class).child(Meter.class, meter.getKey());
-
- final WriteTransaction writeTx = getDataBroker().newWriteOnlyTransaction();
- writeTx.put(LogicalDatastoreType.CONFIGURATION, meterII, meter);
- writeTx.put(LogicalDatastoreType.OPERATIONAL, meterII, meter);
- assertCommit(writeTx.submit());
-
- getDataBroker().registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
- meterII.augmentation(NodeMeterStatistics.class), new ChangeListener(), AsyncDataBroker.DataChangeScope.BASE);
-
- synchronized (waitObject) {
- waitObject.wait();
- }
-
- final ReadOnlyTransaction readTx = getDataBroker().newReadOnlyTransaction();
- final Optional<NodeMeterStatistics> meterStatsOptional = readTx.read(LogicalDatastoreType.OPERATIONAL,
- meterII.augmentation(NodeMeterStatistics.class)).checkedGet();
- assertTrue(meterStatsOptional.isPresent());
- assertEquals(COUNTER_64_TEST_VALUE, meterStatsOptional.get().getMeterStatistics().getByteInCount());
- assertEquals(COUNTER_64_TEST_VALUE, meterStatsOptional.get().getMeterStatistics().getPacketInCount());
- }
-
-// @Test(timeout = 5000)
- public void deletedMeterStatsRemovalTest() throws ExecutionException, InterruptedException, ReadFailedException {
- final StatisticsManagerActivator activator = new StatisticsManagerActivator();
- activator.onSessionInitiated(providerContext);
-
- addFlowCapableNode(s1Key);
-
- final Meter meter = getMeter();
- final InstanceIdentifier<Meter> meterII = InstanceIdentifier.create(Nodes.class).child(Node.class, s1Key)
- .augmentation(FlowCapableNode.class).child(Meter.class, meter.getKey());
-
- WriteTransaction writeTx = getDataBroker().newWriteOnlyTransaction();
- writeTx.put(LogicalDatastoreType.CONFIGURATION, meterII, meter);
- writeTx.put(LogicalDatastoreType.OPERATIONAL, meterII, meter);
- assertCommit(writeTx.submit());
-
- getDataBroker().registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
- meterII.augmentation(NodeMeterStatistics.class), new ChangeListener(), AsyncDataBroker.DataChangeScope.BASE);
-
- synchronized (waitObject) {
- waitObject.wait();
- }
-
- ReadOnlyTransaction readTx = getDataBroker().newReadOnlyTransaction();
- final Optional<NodeMeterStatistics> meterStatsOptional = readTx.read(LogicalDatastoreType.OPERATIONAL,
- meterII.augmentation(NodeMeterStatistics.class)).checkedGet();
- assertTrue(meterStatsOptional.isPresent());
- assertEquals(COUNTER_64_TEST_VALUE, meterStatsOptional.get().getMeterStatistics().getByteInCount());
- assertEquals(COUNTER_64_TEST_VALUE, meterStatsOptional.get().getMeterStatistics().getPacketInCount());
-
- writeTx = getDataBroker().newWriteOnlyTransaction();
- writeTx.delete(LogicalDatastoreType.CONFIGURATION, meterII);
- assertCommit(writeTx.submit());
-
- readTx = getDataBroker().newReadOnlyTransaction();
- final Optional<Meter> meterOptional = readTx.read(LogicalDatastoreType.OPERATIONAL, meterII).checkedGet();
- assertFalse(meterOptional.isPresent());
- }
-
-// @Test(timeout = 23000)
- public void getAllStatsFromConnectedNodeTest() throws ExecutionException, InterruptedException {
- final StatisticsManagerActivator activator = new StatisticsManagerActivator();
- activator.onSessionInitiated(providerContext);
-
- addFlowCapableNodeWithFeatures(s1Key, true);
-
- final InstanceIdentifier<Meter> meterII = InstanceIdentifier.create(Nodes.class).child(Node.class, s1Key)
- .augmentation(FlowCapableNode.class).child(Meter.class, getMeter().getKey());
- getDataBroker().registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
- meterII.augmentation(NodeMeterStatistics.class), new ChangeListener(), AsyncDataBroker.DataChangeScope.BASE);
-
- synchronized (waitObject) {
- waitObject.wait();
- }
-
- ReadOnlyTransaction readTx = getDataBroker().newReadOnlyTransaction();
- final Optional<Meter> optionalMeter = readTx.read(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class)
- .child(Node.class, s1Key).augmentation(FlowCapableNode.class)
- .child(Meter.class, getMeter().getKey())).get();
-
- assertTrue(optionalMeter.isPresent());
- assertTrue(optionalMeter.get().getAugmentation(NodeMeterConfigStats.class) != null);
- final NodeMeterStatistics meterStats = optionalMeter.get().getAugmentation(NodeMeterStatistics.class);
- assertTrue(meterStats != null);
- assertEquals(COUNTER_64_TEST_VALUE, meterStats.getMeterStatistics().getByteInCount());
- assertEquals(COUNTER_64_TEST_VALUE, meterStats.getMeterStatistics().getPacketInCount());
-
- readTx = getDataBroker().newReadOnlyTransaction();
- final Optional<MeterFeatures> optionalMeterFeautures = readTx.read(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class)
- .child(Node.class, s1Key).augmentation(NodeMeterFeatures.class).child(MeterFeatures.class)).get();
- assertTrue(optionalMeterFeautures.isPresent());
- assertEquals(COUNTER_32_TEST_VALUE, optionalMeterFeautures.get().getMaxMeter());
- }
-
- private class ChangeListener implements DataChangeListener {
-
- @Override
- public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
- synchronized (waitObject) {
- waitObject.notify();
- }
- }
- }
-}
-
package test.mock;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.concurrent.ExecutionException;
-
-import org.opendaylight.controller.md.statistics.manager.StatisticsManagerActivator;
-import org.opendaylight.controller.md.statistics.manager.StatisticsManagerProvider;
+import org.junit.Test;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-
import test.mock.util.StatisticsManagerTest;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
public class NodeRegistrationTest extends StatisticsManagerTest {
-// @Test
+ @Test
public void nodeRegistrationTest() throws ExecutionException, InterruptedException {
- final StatisticsManagerActivator activator = new StatisticsManagerActivator();
- final StatisticsManagerProvider statisticsManagerProvider = new StatisticsManagerProvider(activator);
- activator.onSessionInitiated(providerContext);
+ StatisticsManager statisticsManager = setupStatisticsManager();
addFlowCapableNode(s1Key);
- Thread.sleep(1000);
+ Thread.sleep(2000);
final InstanceIdentifier<Node> nodeII = InstanceIdentifier.create(Nodes.class).child(Node.class, s1Key);
- assertTrue(statisticsManagerProvider.getStatisticsManager().isProvidedFlowNodeActive(nodeII));
+ assertTrue(statisticsManager.isProvidedFlowNodeActive(nodeII));
}
-// @Test
+ @Test
public void nodeUnregistrationTest() throws ExecutionException, InterruptedException {
- final StatisticsManagerActivator activator = new StatisticsManagerActivator();
- final StatisticsManagerProvider statisticsManagerProvider = new StatisticsManagerProvider(activator);
- activator.onSessionInitiated(providerContext);
+ StatisticsManager statisticsManager = setupStatisticsManager();
addFlowCapableNode(s1Key);
- Thread.sleep(1000);
+ Thread.sleep(2000);
final InstanceIdentifier<Node> nodeII = InstanceIdentifier.create(Nodes.class).child(Node.class, s1Key);
- assertTrue(statisticsManagerProvider.getStatisticsManager().isProvidedFlowNodeActive(nodeII));
+ assertTrue(statisticsManager.isProvidedFlowNodeActive(nodeII));
removeNode(s1Key);
- Thread.sleep(1000);
- assertFalse(statisticsManagerProvider.getStatisticsManager().isProvidedFlowNodeActive(nodeII));
+ Thread.sleep(2000);
+ assertFalse(statisticsManager.isProvidedFlowNodeActive(nodeII));
}
}
+++ /dev/null
-package test.mock;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.concurrent.ExecutionException;
-
-import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.statistics.manager.StatisticsManagerActivator;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowFeatureCapabilityPortStats;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
-import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-
-import test.mock.util.StatisticsManagerTest;
-
-import com.google.common.base.Optional;
-
-public class PortStatisticsTest extends StatisticsManagerTest {
- private final Object waitObject = new Object();
-
-// @Test(timeout = 23000)
- public void getPortStatisticsTest() throws ExecutionException, InterruptedException, ReadFailedException {
- final StatisticsManagerActivator activator = new StatisticsManagerActivator();
- activator.onSessionInitiated(providerContext);
-
- addFlowCapableNodeWithFeatures(s1Key, false, FlowFeatureCapabilityPortStats.class);
-
- final InstanceIdentifier<NodeConnector> nodeConnectorII = InstanceIdentifier.create(Nodes.class)
- .child(Node.class, s1Key).child(NodeConnector.class, new NodeConnectorKey(getNodeConnectorId()));
-
-
- getDataBroker().registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
- nodeConnectorII.augmentation(FlowCapableNodeConnectorStatisticsData.class),
- new ChangeListener(), AsyncDataBroker.DataChangeScope.BASE);
-
- synchronized (waitObject) {
- waitObject.wait();
- }
-
- final ReadOnlyTransaction readTx = getDataBroker().newReadOnlyTransaction();
- final Optional<FlowCapableNodeConnectorStatisticsData> flowCapableNodeConnectorStatisticsDataOptional =
- readTx.read(LogicalDatastoreType.OPERATIONAL,
- nodeConnectorII.augmentation(FlowCapableNodeConnectorStatisticsData.class)).checkedGet();
- assertTrue(flowCapableNodeConnectorStatisticsDataOptional.isPresent());
- assertEquals(BIG_INTEGER_TEST_VALUE,
- flowCapableNodeConnectorStatisticsDataOptional.get().getFlowCapableNodeConnectorStatistics()
- .getReceiveDrops());
- assertEquals(BIG_INTEGER_TEST_VALUE,
- flowCapableNodeConnectorStatisticsDataOptional.get().getFlowCapableNodeConnectorStatistics()
- .getCollisionCount());
- }
-
- private class ChangeListener implements DataChangeListener {
-
- @Override
- public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
- synchronized (waitObject) {
- waitObject.notify();
- }
- }
- }
-}
-
+++ /dev/null
-package test.mock;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Collections;
-import java.util.concurrent.ExecutionException;
-
-import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.statistics.manager.StatisticsManagerActivator;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnectorBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowFeatureCapabilityQueueStats;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.port.mod.port.Port;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.QueueBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.FlowCapableNodeConnectorQueueStatisticsData;
-import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-
-import test.mock.util.StatisticsManagerTest;
-
-import com.google.common.base.Optional;
-
-
-public class QueueStatisticsTest extends StatisticsManagerTest {
- private final Object waitObject = new Object();
-
-// @Test(timeout = 5000)
- public void addedQueueOnDemandStatsTest() throws ExecutionException, InterruptedException, ReadFailedException {
- final StatisticsManagerActivator activator = new StatisticsManagerActivator();
- activator.onSessionInitiated(providerContext);
-
- addFlowCapableNode(s1Key);
-
- final Port port = getPort();
-
- final NodeConnectorBuilder ncBuilder = new NodeConnectorBuilder();
- final FlowCapableNodeConnectorBuilder fcncBuilder = new FlowCapableNodeConnectorBuilder();
- fcncBuilder.setConfiguration(port.getConfiguration());
- fcncBuilder.setPortNumber(port.getPortNumber());
- fcncBuilder.setQueue(Collections.<Queue>emptyList());
- ncBuilder.setKey(new NodeConnectorKey(new NodeConnectorId("connector.1")));
- ncBuilder.addAugmentation(FlowCapableNodeConnector.class, fcncBuilder.build());
-
-
- final Queue queue = getQueue();
- final InstanceIdentifier<Queue> queueII = InstanceIdentifier.create(Nodes.class).child(Node.class, s1Key)
- .child(NodeConnector.class, ncBuilder.getKey()).augmentation(FlowCapableNodeConnector.class)
- .child(Queue.class, queue.getKey());
- final InstanceIdentifier<NodeConnector> nodeConnectorII = InstanceIdentifier.create(Nodes.class)
- .child(Node.class, s1Key).child(NodeConnector.class, ncBuilder.getKey());
-
- final WriteTransaction writeTx = getDataBroker().newWriteOnlyTransaction();
- writeTx.put(LogicalDatastoreType.CONFIGURATION, nodeConnectorII, ncBuilder.build());
- writeTx.put(LogicalDatastoreType.CONFIGURATION, queueII, queue);
- writeTx.put(LogicalDatastoreType.OPERATIONAL, nodeConnectorII, ncBuilder.build());
- writeTx.put(LogicalDatastoreType.OPERATIONAL, queueII, queue);
- assertCommit(writeTx.submit());
-
- getDataBroker().registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
- queueII.augmentation(FlowCapableNodeConnectorQueueStatisticsData.class), new ChangeListener(), AsyncDataBroker.DataChangeScope.BASE);
-
- synchronized (waitObject) {
- waitObject.wait();
- }
-
- final ReadOnlyTransaction readTx = getDataBroker().newReadOnlyTransaction();
- final Optional<FlowCapableNodeConnectorQueueStatisticsData> queueStatsOptional = readTx.read(LogicalDatastoreType.OPERATIONAL,
- queueII.augmentation(FlowCapableNodeConnectorQueueStatisticsData.class)).checkedGet();
- assertTrue(queueStatsOptional.isPresent());
- assertEquals(COUNTER_64_TEST_VALUE,
- queueStatsOptional.get().getFlowCapableNodeConnectorQueueStatistics().getTransmittedBytes());
- }
-
-// @Test(timeout = 5000)
- public void deletedQueueStatsRemovalTest() throws ExecutionException, InterruptedException, ReadFailedException {
- final StatisticsManagerActivator activator = new StatisticsManagerActivator();
- activator.onSessionInitiated(providerContext);
-
- addFlowCapableNode(s1Key);
-
- final Port port = getPort();
-
- final NodeConnectorBuilder ncBuilder = new NodeConnectorBuilder();
- final FlowCapableNodeConnectorBuilder fcncBuilder = new FlowCapableNodeConnectorBuilder();
- fcncBuilder.setConfiguration(port.getConfiguration());
- fcncBuilder.setPortNumber(port.getPortNumber());
- fcncBuilder.setQueue(Collections.<Queue>emptyList());
- ncBuilder.setKey(new NodeConnectorKey(new NodeConnectorId("connector.1")));
- ncBuilder.addAugmentation(FlowCapableNodeConnector.class, fcncBuilder.build());
-
-
- final Queue queue = getQueue();
- final InstanceIdentifier<Queue> queueII = InstanceIdentifier.create(Nodes.class).child(Node.class, s1Key)
- .child(NodeConnector.class, ncBuilder.getKey()).augmentation(FlowCapableNodeConnector.class)
- .child(Queue.class, queue.getKey());
- final InstanceIdentifier<NodeConnector> nodeConnectorII = InstanceIdentifier.create(Nodes.class)
- .child(Node.class, s1Key).child(NodeConnector.class, ncBuilder.getKey());
-
- WriteTransaction writeTx = getDataBroker().newWriteOnlyTransaction();
- writeTx.put(LogicalDatastoreType.CONFIGURATION, nodeConnectorII, ncBuilder.build());
- writeTx.put(LogicalDatastoreType.CONFIGURATION, queueII, queue);
- writeTx.put(LogicalDatastoreType.OPERATIONAL, nodeConnectorII, ncBuilder.build());
- writeTx.put(LogicalDatastoreType.OPERATIONAL, queueII, queue);
- assertCommit(writeTx.submit());
-
- getDataBroker().registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
- queueII.augmentation(FlowCapableNodeConnectorQueueStatisticsData.class),
- new ChangeListener(), AsyncDataBroker.DataChangeScope.BASE);
-
- synchronized (waitObject) {
- waitObject.wait();
- }
-
- ReadOnlyTransaction readTx = getDataBroker().newReadOnlyTransaction();
- Optional<FlowCapableNodeConnectorQueueStatisticsData> queueStatsOptional = readTx.read(LogicalDatastoreType.OPERATIONAL,
- queueII.augmentation(FlowCapableNodeConnectorQueueStatisticsData.class)).checkedGet();
- assertTrue(queueStatsOptional.isPresent());
- assertEquals(COUNTER_64_TEST_VALUE,
- queueStatsOptional.get().getFlowCapableNodeConnectorQueueStatistics().getTransmittedBytes());
-
- writeTx = getDataBroker().newWriteOnlyTransaction();
- writeTx.delete(LogicalDatastoreType.CONFIGURATION, queueII);
- assertCommit(writeTx.submit());
-
- readTx = getDataBroker().newReadOnlyTransaction();
- queueStatsOptional = readTx.read(LogicalDatastoreType.OPERATIONAL,
- queueII.augmentation(FlowCapableNodeConnectorQueueStatisticsData.class)).checkedGet();
- assertFalse(queueStatsOptional.isPresent());
- }
-
-// @Test(timeout = 23000)
- public void getAllStatsFromConnectedNodeTest() throws ExecutionException, InterruptedException, ReadFailedException {
- final StatisticsManagerActivator activator = new StatisticsManagerActivator();
- activator.onSessionInitiated(providerContext);
-
- addFlowCapableNodeWithFeatures(s1Key, false, FlowFeatureCapabilityQueueStats.class);
-
- final NodeConnectorBuilder ncBuilder = new NodeConnectorBuilder();
- final FlowCapableNodeConnectorBuilder fcncBuilder = new FlowCapableNodeConnectorBuilder();
- ncBuilder.setKey(new NodeConnectorKey(getNodeConnectorId()));
- ncBuilder.addAugmentation(FlowCapableNodeConnector.class, fcncBuilder.build());
-
- final InstanceIdentifier<NodeConnector> nodeConnectorII = InstanceIdentifier.create(Nodes.class)
- .child(Node.class, s1Key)
- .child(NodeConnector.class, ncBuilder.getKey());
-
- final WriteTransaction writeTx = getDataBroker().newWriteOnlyTransaction();
- writeTx.put(LogicalDatastoreType.OPERATIONAL, nodeConnectorII, ncBuilder.build());
- final InstanceIdentifier<Queue> queueII = nodeConnectorII.augmentation(FlowCapableNodeConnector.class)
- .child(Queue.class, getQueue().getKey());
- final QueueBuilder qBuilder = new QueueBuilder(getQueue());
- writeTx.put(LogicalDatastoreType.OPERATIONAL, queueII, qBuilder.build());
- assertCommit(writeTx.submit());
-
- getDataBroker().registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
- queueII.augmentation(FlowCapableNodeConnectorQueueStatisticsData.class), new ChangeListener(), AsyncDataBroker.DataChangeScope.BASE);
-
- synchronized (waitObject) {
- waitObject.wait();
- }
-
- final ReadOnlyTransaction readTx = getDataBroker().newReadOnlyTransaction();
- final Optional<Queue> queueOptional = readTx.read(LogicalDatastoreType.OPERATIONAL, queueII).checkedGet();
- assertTrue(queueOptional.isPresent());
- final FlowCapableNodeConnectorQueueStatisticsData queueStats =
- queueOptional.get().getAugmentation(FlowCapableNodeConnectorQueueStatisticsData.class);
- assertTrue(queueStats != null);
- assertEquals(COUNTER_64_TEST_VALUE,
- queueStats.getFlowCapableNodeConnectorQueueStatistics().getTransmittedBytes());
- }
-
- private class ChangeListener implements DataChangeListener {
-
- @Override
- public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
- synchronized (waitObject) {
- waitObject.notify();
- }
- }
- }
-}
-
--- /dev/null
+package test.mock;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.base.Optional;
+import java.util.concurrent.ExecutionException;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnectorBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowFeatureCapabilityFlowStats;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowFeatureCapabilityGroupStats;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowFeatureCapabilityPortStats;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowFeatureCapabilityQueueStats;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowFeatureCapabilityTableStats;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsData;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.QueueBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupDescStats;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeatures;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupStatistics;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.features.GroupFeatures;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterConfigStats;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterFeatures;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterStatistics;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.MeterFeatures;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.FlowCapableNodeConnectorQueueStatisticsData;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import test.mock.util.StatisticsManagerTest;
+
+/**
+ * End-to-end tests for the MD-SAL statistics manager: each test starts the
+ * manager against a mocked switch, registers a data-change listener on the
+ * expected statistics location in the OPERATIONAL datastore, blocks until the
+ * first statistics write arrives, and then reads the store back and compares
+ * the values against the mock test constants (COUNTER_32_TEST_VALUE,
+ * COUNTER_64_TEST_VALUE, etc.) inherited from StatisticsManagerTest.
+ */
+public class StatCollectorTest extends StatisticsManagerTest {
+ // Monitor each test blocks on until its ChangeListener observes a write.
+ // NOTE(review): the wait() calls below use no guard flag, so a notify()
+ // delivered before the test thread reaches wait() is lost (the test then
+ // hangs until the @Test timeout), and a spurious wakeup releases the test
+ // early. A CountDownLatch per test would be more robust — TODO confirm.
+ private final Object waitObject = new Object();
+
+ @Test(timeout = 200000)
+ public void getAllFlowStatsTest() throws ExecutionException, InterruptedException, ReadFailedException {
+ setupStatisticsManager();
+
+ // Node advertises only the flow-statistics capability.
+ addFlowCapableNodeWithFeatures(s1Key, false, FlowFeatureCapabilityFlowStats.class);
+
+ final Flow flow = getFlow();
+
+ final InstanceIdentifier<Table> tableII = InstanceIdentifier.create(Nodes.class).child(Node.class, s1Key)
+ .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(flow.getTableId()));
+
+ // Wake up once a flow appears under the table in the OPERATIONAL store.
+ getDataBroker().registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
+ tableII.child(Flow.class), new ChangeListener(), AsyncDataBroker.DataChangeScope.BASE);
+
+ synchronized (waitObject) {
+ waitObject.wait();
+ }
+
+ // Collected statistics are stored as an augmentation on the flow entry.
+ final ReadOnlyTransaction readTx = getDataBroker().newReadOnlyTransaction();
+ final Optional<Table> tableOptional = readTx.read(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class)
+ .child(Node.class, s1Key).augmentation(FlowCapableNode.class)
+ .child(Table.class, new TableKey(flow.getTableId()))).checkedGet();
+ assertTrue(tableOptional.isPresent());
+ final FlowStatisticsData flowStats = tableOptional.get().getFlow().get(0).getAugmentation(FlowStatisticsData.class);
+ assertTrue(flowStats != null);
+ assertEquals(COUNTER_64_TEST_VALUE, flowStats.getFlowStatistics().getByteCount());
+ }
+
+ @Test(timeout = 200000)
+ public void getAllGroupStatsFeatureNotAdvertisedTest() throws ExecutionException, InterruptedException {
+ setupStatisticsManager();
+
+ // Second argument true — per the test name, presumably "group-stats
+ // capability not advertised"; statistics must still be collected.
+ // TODO confirm the flag's meaning against StatisticsManagerTest.
+ addFlowCapableNodeWithFeatures(s1Key, true);
+
+ final InstanceIdentifier<Group> groupII = InstanceIdentifier.create(Nodes.class).child(Node.class, s1Key)
+ .augmentation(FlowCapableNode.class).child(Group.class, getGroup().getKey());
+ getDataBroker().registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
+ groupII.augmentation(NodeGroupStatistics.class), new ChangeListener(), AsyncDataBroker.DataChangeScope.BASE);
+
+ synchronized (waitObject) {
+ waitObject.wait();
+ }
+
+ ReadOnlyTransaction readTx = getDataBroker().newReadOnlyTransaction();
+ final Optional<Group> optionalGroup = readTx.read(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class)
+ .child(Node.class, s1Key).augmentation(FlowCapableNode.class)
+ .child(Group.class, getGroup().getKey())).get();
+
+ assertTrue(optionalGroup.isPresent());
+ // Both the group-description and group-statistics augmentations must exist.
+ assertTrue(optionalGroup.get().getAugmentation(NodeGroupDescStats.class) != null);
+ final NodeGroupStatistics groupStats = optionalGroup.get().getAugmentation(NodeGroupStatistics.class);
+ assertTrue(groupStats != null);
+ assertEquals(COUNTER_64_TEST_VALUE, groupStats.getGroupStatistics().getByteCount());
+
+ // Group features land in a separate augmentation directly on the node.
+ readTx = getDataBroker().newReadOnlyTransaction();
+ final Optional<GroupFeatures> optionalGroupFeatures = readTx.read(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class)
+ .child(Node.class, s1Key).augmentation(NodeGroupFeatures.class).child(GroupFeatures.class)).get();
+ assertTrue(optionalGroupFeatures.isPresent());
+ assertEquals(1, optionalGroupFeatures.get().getMaxGroups().size());
+ assertEquals(MAX_GROUPS_TEST_VALUE, optionalGroupFeatures.get().getMaxGroups().get(0));
+ }
+
+ @Test(timeout = 200000)
+ public void getAllGroupStatsFeatureAdvertisedTest() throws ExecutionException, InterruptedException {
+ setupStatisticsManager();
+
+ // Same scenario as above, but the node explicitly advertises group-stats.
+ // Expected results are identical in both cases.
+ addFlowCapableNodeWithFeatures(s1Key, false, FlowFeatureCapabilityGroupStats.class);
+
+ final InstanceIdentifier<Group> groupII = InstanceIdentifier.create(Nodes.class).child(Node.class, s1Key)
+ .augmentation(FlowCapableNode.class).child(Group.class, getGroup().getKey());
+ getDataBroker().registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
+ groupII.augmentation(NodeGroupStatistics.class), new ChangeListener(), AsyncDataBroker.DataChangeScope.BASE);
+
+ synchronized (waitObject) {
+ waitObject.wait();
+ }
+
+ ReadOnlyTransaction readTx = getDataBroker().newReadOnlyTransaction();
+ final Optional<Group> optionalGroup = readTx.read(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class)
+ .child(Node.class, s1Key).augmentation(FlowCapableNode.class)
+ .child(Group.class, getGroup().getKey())).get();
+
+ assertTrue(optionalGroup.isPresent());
+ assertTrue(optionalGroup.get().getAugmentation(NodeGroupDescStats.class) != null);
+ final NodeGroupStatistics groupStats = optionalGroup.get().getAugmentation(NodeGroupStatistics.class);
+ assertTrue(groupStats != null);
+ assertEquals(COUNTER_64_TEST_VALUE, groupStats.getGroupStatistics().getByteCount());
+
+ readTx = getDataBroker().newReadOnlyTransaction();
+ final Optional<GroupFeatures> optionalGroupFeatures = readTx.read(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class)
+ .child(Node.class, s1Key).augmentation(NodeGroupFeatures.class).child(GroupFeatures.class)).get();
+ assertTrue(optionalGroupFeatures.isPresent());
+ assertEquals(1, optionalGroupFeatures.get().getMaxGroups().size());
+ assertEquals(MAX_GROUPS_TEST_VALUE, optionalGroupFeatures.get().getMaxGroups().get(0));
+ }
+
+ @Test(timeout = 200000)
+ public void getAllMeterStatsTest() throws ExecutionException, InterruptedException {
+ setupStatisticsManager();
+
+ addFlowCapableNodeWithFeatures(s1Key, true);
+
+ final InstanceIdentifier<Meter> meterII = InstanceIdentifier.create(Nodes.class).child(Node.class, s1Key)
+ .augmentation(FlowCapableNode.class).child(Meter.class, getMeter().getKey());
+ getDataBroker().registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
+ meterII.augmentation(NodeMeterStatistics.class), new ChangeListener(), AsyncDataBroker.DataChangeScope.BASE);
+
+ synchronized (waitObject) {
+ waitObject.wait();
+ }
+
+ ReadOnlyTransaction readTx = getDataBroker().newReadOnlyTransaction();
+ final Optional<Meter> optionalMeter = readTx.read(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class)
+ .child(Node.class, s1Key).augmentation(FlowCapableNode.class)
+ .child(Meter.class, getMeter().getKey())).get();
+
+ assertTrue(optionalMeter.isPresent());
+ // Meter config stats and meter statistics are separate augmentations.
+ assertTrue(optionalMeter.get().getAugmentation(NodeMeterConfigStats.class) != null);
+ final NodeMeterStatistics meterStats = optionalMeter.get().getAugmentation(NodeMeterStatistics.class);
+ assertTrue(meterStats != null);
+ assertEquals(COUNTER_64_TEST_VALUE, meterStats.getMeterStatistics().getByteInCount());
+ assertEquals(COUNTER_64_TEST_VALUE, meterStats.getMeterStatistics().getPacketInCount());
+
+ // Meter features are attached to the node, not to the meter entry.
+ readTx = getDataBroker().newReadOnlyTransaction();
+ final Optional<MeterFeatures> optionalMeterFeautures = readTx.read(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class)
+ .child(Node.class, s1Key).augmentation(NodeMeterFeatures.class).child(MeterFeatures.class)).get();
+ assertTrue(optionalMeterFeautures.isPresent());
+ assertEquals(COUNTER_32_TEST_VALUE, optionalMeterFeautures.get().getMaxMeter());
+ }
+
+ @Test(timeout = 200000)
+ public void getAllQueueStatsTest() throws ExecutionException, InterruptedException, ReadFailedException {
+ setupStatisticsManager();
+
+ addFlowCapableNodeWithFeatures(s1Key, false, FlowFeatureCapabilityQueueStats.class);
+
+ // Queues hang off a node connector, so one must be pre-created in the
+ // OPERATIONAL store before queue statistics can be attached to it.
+ final NodeConnectorBuilder ncBuilder = new NodeConnectorBuilder();
+ final FlowCapableNodeConnectorBuilder fcncBuilder = new FlowCapableNodeConnectorBuilder();
+ ncBuilder.setKey(new NodeConnectorKey(getNodeConnectorId()));
+ ncBuilder.addAugmentation(FlowCapableNodeConnector.class, fcncBuilder.build());
+
+ final InstanceIdentifier<NodeConnector> nodeConnectorII = InstanceIdentifier.create(Nodes.class)
+ .child(Node.class, s1Key)
+ .child(NodeConnector.class, ncBuilder.getKey());
+
+ final WriteTransaction writeTx = getDataBroker().newWriteOnlyTransaction();
+ writeTx.put(LogicalDatastoreType.OPERATIONAL, nodeConnectorII, ncBuilder.build());
+ final InstanceIdentifier<Queue> queueII = nodeConnectorII.augmentation(FlowCapableNodeConnector.class)
+ .child(Queue.class, getQueue().getKey());
+ final QueueBuilder qBuilder = new QueueBuilder(getQueue());
+ writeTx.put(LogicalDatastoreType.OPERATIONAL, queueII, qBuilder.build());
+ assertCommit(writeTx.submit());
+
+ getDataBroker().registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
+ queueII.augmentation(FlowCapableNodeConnectorQueueStatisticsData.class), new ChangeListener(), AsyncDataBroker.DataChangeScope.BASE);
+
+ synchronized (waitObject) {
+ waitObject.wait();
+ }
+
+ final ReadOnlyTransaction readTx = getDataBroker().newReadOnlyTransaction();
+ final Optional<Queue> queueOptional = readTx.read(LogicalDatastoreType.OPERATIONAL, queueII).checkedGet();
+ assertTrue(queueOptional.isPresent());
+ final FlowCapableNodeConnectorQueueStatisticsData queueStats =
+ queueOptional.get().getAugmentation(FlowCapableNodeConnectorQueueStatisticsData.class);
+ assertTrue(queueStats != null);
+ assertEquals(COUNTER_64_TEST_VALUE,
+ queueStats.getFlowCapableNodeConnectorQueueStatistics().getTransmittedBytes());
+ }
+
+ @Test(timeout = 200000)
+ public void getAllPortStatsTest() throws ExecutionException, InterruptedException, ReadFailedException {
+ setupStatisticsManager();
+
+ addFlowCapableNodeWithFeatures(s1Key, false, FlowFeatureCapabilityPortStats.class);
+
+ final InstanceIdentifier<NodeConnector> nodeConnectorII = InstanceIdentifier.create(Nodes.class)
+ .child(Node.class, s1Key).child(NodeConnector.class, new NodeConnectorKey(getNodeConnectorId()));
+
+ // Pre-create the node connector the port statistics will be attached to.
+ NodeConnectorBuilder ncBuilder = new NodeConnectorBuilder();
+ ncBuilder.setKey(new NodeConnectorKey(getNodeConnectorId()));
+ WriteTransaction writeTx = getDataBroker().newWriteOnlyTransaction();
+ writeTx.put(LogicalDatastoreType.OPERATIONAL, nodeConnectorII, ncBuilder.build());
+ assertCommit(writeTx.submit());
+
+ getDataBroker().registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
+ nodeConnectorII.augmentation(FlowCapableNodeConnectorStatisticsData.class),
+ new ChangeListener(), AsyncDataBroker.DataChangeScope.BASE);
+
+ synchronized (waitObject) {
+ waitObject.wait();
+ }
+
+ final ReadOnlyTransaction readTx = getDataBroker().newReadOnlyTransaction();
+ final Optional<FlowCapableNodeConnectorStatisticsData> flowCapableNodeConnectorStatisticsDataOptional =
+ readTx.read(LogicalDatastoreType.OPERATIONAL,
+ nodeConnectorII.augmentation(FlowCapableNodeConnectorStatisticsData.class)).checkedGet();
+ assertTrue(flowCapableNodeConnectorStatisticsDataOptional.isPresent());
+ assertEquals(BIG_INTEGER_TEST_VALUE,
+ flowCapableNodeConnectorStatisticsDataOptional.get().getFlowCapableNodeConnectorStatistics()
+ .getReceiveDrops());
+ assertEquals(BIG_INTEGER_TEST_VALUE,
+ flowCapableNodeConnectorStatisticsDataOptional.get().getFlowCapableNodeConnectorStatistics()
+ .getCollisionCount());
+ }
+
+ @Test(timeout = 200000)
+ public void getAllTableStatsTest() throws ExecutionException, InterruptedException, ReadFailedException {
+ setupStatisticsManager();
+
+ addFlowCapableNodeWithFeatures(s1Key, false, FlowFeatureCapabilityTableStats.class);
+
+ final TableId tableId = getTableId();
+ final InstanceIdentifier<Table> tableII = InstanceIdentifier.create(Nodes.class).child(Node.class, s1Key)
+ .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId.getValue()));
+
+ getDataBroker().registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
+ tableII.augmentation(FlowTableStatisticsData.class), new ChangeListener(), AsyncDataBroker.DataChangeScope.BASE);
+
+ synchronized (waitObject) {
+ waitObject.wait();
+ }
+
+ final ReadOnlyTransaction readTx = getDataBroker().newReadOnlyTransaction();
+ final Optional<FlowTableStatisticsData> flowTableStatisticsDataOptional = readTx.read(
+ LogicalDatastoreType.OPERATIONAL, tableII.augmentation(FlowTableStatisticsData.class)).checkedGet();
+ assertTrue(flowTableStatisticsDataOptional.isPresent());
+ assertEquals(COUNTER_32_TEST_VALUE,
+ flowTableStatisticsDataOptional.get().getFlowTableStatistics().getActiveFlows());
+ assertEquals(COUNTER_64_TEST_VALUE,
+ flowTableStatisticsDataOptional.get().getFlowTableStatistics().getPacketsLookedUp());
+ }
+
+ /**
+ * Data-change listener that releases the test thread (via {@code waitObject})
+ * as soon as any matching change is observed in the datastore.
+ */
+ public class ChangeListener implements DataChangeListener {
+
+ @Override
+ public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
+ synchronized (waitObject) {
+ waitObject.notify();
+ }
+ }
+ }
+}
+++ /dev/null
-package test.mock;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.concurrent.ExecutionException;
-
-import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.statistics.manager.StatisticsManagerActivator;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowFeatureCapabilityTableStats;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsData;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
-import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-
-import test.mock.util.StatisticsManagerTest;
-
-import com.google.common.base.Optional;
-
-public class TableStatisticsTest extends StatisticsManagerTest {
- private final Object waitObject = new Object();
-
-// @Test(timeout = 23000)
- public void getTableStatisticsTest() throws ExecutionException, InterruptedException, ReadFailedException {
- final StatisticsManagerActivator activator = new StatisticsManagerActivator();
- activator.onSessionInitiated(providerContext);
-
- addFlowCapableNodeWithFeatures(s1Key, false, FlowFeatureCapabilityTableStats.class);
-
- final TableId tableId = getTableId();
- final InstanceIdentifier<Table> tableII = InstanceIdentifier.create(Nodes.class).child(Node.class, s1Key)
- .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId.getValue()));
-
- getDataBroker().registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
- tableII.augmentation(FlowTableStatisticsData.class), new ChangeListener(), AsyncDataBroker.DataChangeScope.BASE);
-
- synchronized (waitObject) {
- waitObject.wait();
- }
-
- final ReadOnlyTransaction readTx = getDataBroker().newReadOnlyTransaction();
- final Optional<FlowTableStatisticsData> flowTableStatisticsDataOptional = readTx.read(
- LogicalDatastoreType.OPERATIONAL, tableII.augmentation(FlowTableStatisticsData.class)).checkedGet();
- assertTrue(flowTableStatisticsDataOptional.isPresent());
- assertEquals(COUNTER_32_TEST_VALUE,
- flowTableStatisticsDataOptional.get().getFlowTableStatistics().getActiveFlows());
- assertEquals(COUNTER_64_TEST_VALUE,
- flowTableStatisticsDataOptional.get().getFlowTableStatistics().getPacketsLookedUp());
- }
-
- private class ChangeListener implements DataChangeListener {
-
- @Override
- public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
- synchronized (waitObject) {
- waitObject.notify();
- }
- }
- }
-}
-
package test.mock.util;
import com.google.common.util.concurrent.Futures;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsUpdateBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowsStatisticsUpdateBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInput;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
-
public class OpendaylightFlowStatisticsServiceMock implements OpendaylightFlowStatisticsService {
NotificationProviderServiceHelper notifService;
- AtomicLong transNum = new AtomicLong();
public OpendaylightFlowStatisticsServiceMock(NotificationProviderServiceHelper notifService) {
this.notifService = notifService;
@Override
public Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> getAggregateFlowStatisticsFromFlowTableForAllFlows(GetAggregateFlowStatisticsFromFlowTableForAllFlowsInput input) {
GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutputBuilder builder = new GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutputBuilder();
- TransactionId transId = new TransactionId(BigInteger.valueOf(transNum.incrementAndGet()));
+ TransactionId transId = new TransactionId(BigInteger.valueOf(TestUtils.getNewTransactionId()));
builder.setTransactionId(transId);
return Futures.immediateFuture(RpcResultBuilder.success(builder.build()).build());
}
@Override
public Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForGivenMatchOutput>> getAggregateFlowStatisticsFromFlowTableForGivenMatch(GetAggregateFlowStatisticsFromFlowTableForGivenMatchInput input) {
GetAggregateFlowStatisticsFromFlowTableForGivenMatchOutputBuilder builder = new GetAggregateFlowStatisticsFromFlowTableForGivenMatchOutputBuilder();
- TransactionId transId = new TransactionId(BigInteger.valueOf(transNum.incrementAndGet()));
+ TransactionId transId = new TransactionId(BigInteger.valueOf(TestUtils.getNewTransactionId()));
builder.setTransactionId(transId);
AggregateFlowStatisticsUpdateBuilder afsuBuilder = new AggregateFlowStatisticsUpdateBuilder();
afsuBuilder.setMoreReplies(false);
@Override
public Future<RpcResult<GetAllFlowStatisticsFromFlowTableOutput>> getAllFlowStatisticsFromFlowTable(GetAllFlowStatisticsFromFlowTableInput input) {
GetAllFlowStatisticsFromFlowTableOutputBuilder builder = new GetAllFlowStatisticsFromFlowTableOutputBuilder();
- TransactionId transId = new TransactionId(BigInteger.valueOf(transNum.incrementAndGet()));
+ TransactionId transId = new TransactionId(BigInteger.valueOf(TestUtils.getNewTransactionId()));
builder.setTransactionId(transId);
return Futures.immediateFuture(RpcResultBuilder.success(builder.build()).build());
}
@Override
public Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> getAllFlowsStatisticsFromAllFlowTables(GetAllFlowsStatisticsFromAllFlowTablesInput input) {
GetAllFlowsStatisticsFromAllFlowTablesOutputBuilder builder = new GetAllFlowsStatisticsFromAllFlowTablesOutputBuilder();
- TransactionId transId = new TransactionId(BigInteger.valueOf(transNum.incrementAndGet()));
+ TransactionId transId = new TransactionId(BigInteger.valueOf(TestUtils.getNewTransactionId()));
builder.setTransactionId(transId);
List<FlowAndStatisticsMapList> flowAndStatisticsMapLists = new ArrayList<>();
FlowsStatisticsUpdateBuilder flowsStatisticsUpdateBuilder = new FlowsStatisticsUpdateBuilder();
@Override
public Future<RpcResult<GetFlowStatisticsFromFlowTableOutput>> getFlowStatisticsFromFlowTable(GetFlowStatisticsFromFlowTableInput input) {
GetFlowStatisticsFromFlowTableOutputBuilder builder = new GetFlowStatisticsFromFlowTableOutputBuilder();
- TransactionId transId = new TransactionId(BigInteger.valueOf(transNum.incrementAndGet()));
+ TransactionId transId = new TransactionId(BigInteger.valueOf(TestUtils.getNewTransactionId()));
builder.setTransactionId(transId);
List<FlowAndStatisticsMapList> flowAndStatisticsMapLists = new ArrayList<>();
FlowsStatisticsUpdateBuilder flowsStatisticsUpdateBuilder = new FlowsStatisticsUpdateBuilder();
package test.mock.util;
import com.google.common.util.concurrent.Futures;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsUpdateBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsOutput;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
-
public class OpendaylightFlowTableStatisticsServiceMock implements OpendaylightFlowTableStatisticsService {
NotificationProviderServiceHelper notifService;
- AtomicLong transNum = new AtomicLong();
public OpendaylightFlowTableStatisticsServiceMock(NotificationProviderServiceHelper notifService) {
this.notifService = notifService;
@Override
public Future<RpcResult<GetFlowTablesStatisticsOutput>> getFlowTablesStatistics(GetFlowTablesStatisticsInput input) {
GetFlowTablesStatisticsOutputBuilder builder = new GetFlowTablesStatisticsOutputBuilder();
- TransactionId transId = new TransactionId(BigInteger.valueOf(transNum.incrementAndGet()));
+ TransactionId transId = new TransactionId(BigInteger.valueOf(TestUtils.getNewTransactionId()));
builder.setTransactionId(transId);
FlowTableStatisticsUpdateBuilder ftsBuilder = new FlowTableStatisticsUpdateBuilder();
FlowTableAndStatisticsMapBuilder ftasmBuilder = new FlowTableAndStatisticsMapBuilder();
package test.mock.util;
import com.google.common.util.concurrent.Futures;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsOutput;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
-
public class OpendaylightGroupStatisticsServiceMock implements OpendaylightGroupStatisticsService {
NotificationProviderServiceHelper notifService;
- AtomicLong transNum = new AtomicLong();
public OpendaylightGroupStatisticsServiceMock(NotificationProviderServiceHelper notifService) {
this.notifService = notifService;
@Override
public Future<RpcResult<GetAllGroupStatisticsOutput>> getAllGroupStatistics(GetAllGroupStatisticsInput input) {
GetAllGroupStatisticsOutputBuilder builder = new GetAllGroupStatisticsOutputBuilder();
- TransactionId transId = new TransactionId(BigInteger.valueOf(transNum.incrementAndGet()));
+ TransactionId transId = new TransactionId(BigInteger.valueOf(TestUtils.getNewTransactionId()));
builder.setTransactionId(transId);
List<GroupStats> groupStats = new ArrayList<>();
GroupStatsBuilder gsBuilder = new GroupStatsBuilder();
@Override
public Future<RpcResult<GetGroupDescriptionOutput>> getGroupDescription(GetGroupDescriptionInput input) {
GetGroupDescriptionOutputBuilder builder = new GetGroupDescriptionOutputBuilder();
- TransactionId transId = new TransactionId(BigInteger.valueOf(transNum.incrementAndGet()));
+ TransactionId transId = new TransactionId(BigInteger.valueOf(TestUtils.getNewTransactionId()));
builder.setTransactionId(transId);
List<GroupDescStats> groupDescStats = new ArrayList<>();
GroupDescStatsUpdatedBuilder gdsuBuilder = new GroupDescStatsUpdatedBuilder();
@Override
public Future<RpcResult<GetGroupFeaturesOutput>> getGroupFeatures(GetGroupFeaturesInput input) {
GetGroupFeaturesOutputBuilder builder = new GetGroupFeaturesOutputBuilder();
- TransactionId transId = new TransactionId(BigInteger.valueOf(transNum.incrementAndGet()));
+ TransactionId transId = new TransactionId(BigInteger.valueOf(TestUtils.getNewTransactionId()));
builder.setTransactionId(transId);
GroupFeaturesUpdatedBuilder gfuBuilder = new GroupFeaturesUpdatedBuilder();
gfuBuilder.setTransactionId(transId);
@Override
public Future<RpcResult<GetGroupStatisticsOutput>> getGroupStatistics(GetGroupStatisticsInput input) {
GetGroupStatisticsOutputBuilder builder = new GetGroupStatisticsOutputBuilder();
- TransactionId transId = new TransactionId(BigInteger.valueOf(transNum.incrementAndGet()));
+ TransactionId transId = new TransactionId(BigInteger.valueOf(TestUtils.getNewTransactionId()));
builder.setTransactionId(transId);
GroupStatsBuilder gsBuilder = new GroupStatsBuilder();
List<GroupStats> groupStats = new ArrayList<>();
package test.mock.util;
import com.google.common.util.concurrent.Futures;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
-
public class OpendaylightMeterStatisticsServiceMock implements OpendaylightMeterStatisticsService {
NotificationProviderServiceHelper notifService;
- AtomicLong transNum = new AtomicLong();
public OpendaylightMeterStatisticsServiceMock(NotificationProviderServiceHelper notifService) {
this.notifService = notifService;
@Override
public Future<RpcResult<GetAllMeterConfigStatisticsOutput>> getAllMeterConfigStatistics(GetAllMeterConfigStatisticsInput input) {
GetAllMeterConfigStatisticsOutputBuilder builder = new GetAllMeterConfigStatisticsOutputBuilder();
- TransactionId transId = new TransactionId(BigInteger.valueOf(transNum.incrementAndGet()));
+ TransactionId transId = new TransactionId(BigInteger.valueOf(TestUtils.getNewTransactionId()));
builder.setTransactionId(transId);
List<MeterConfigStats> meterConfigStats = new ArrayList<>();
MeterConfigStatsBuilder mcsBuilder = new MeterConfigStatsBuilder();
@Override
public Future<RpcResult<GetAllMeterStatisticsOutput>> getAllMeterStatistics(GetAllMeterStatisticsInput input) {
GetAllMeterStatisticsOutputBuilder builder = new GetAllMeterStatisticsOutputBuilder();
- TransactionId transId = new TransactionId(BigInteger.valueOf(transNum.incrementAndGet()));
+ TransactionId transId = new TransactionId(BigInteger.valueOf(TestUtils.getNewTransactionId()));
builder.setTransactionId(transId);
MeterStatsBuilder msBuilder = new MeterStatsBuilder();
msBuilder.setByteInCount(StatisticsManagerTest.COUNTER_64_TEST_VALUE);
@Override
public Future<RpcResult<GetMeterFeaturesOutput>> getMeterFeatures(GetMeterFeaturesInput input) {
GetMeterFeaturesOutputBuilder builder = new GetMeterFeaturesOutputBuilder();
- TransactionId transId = new TransactionId(BigInteger.valueOf(transNum.incrementAndGet()));
+ TransactionId transId = new TransactionId(BigInteger.valueOf(TestUtils.getNewTransactionId()));
builder.setTransactionId(transId);
MeterFeaturesUpdatedBuilder mfuBuilder = new MeterFeaturesUpdatedBuilder();
mfuBuilder.setTransactionId(transId);
@Override
public Future<RpcResult<GetMeterStatisticsOutput>> getMeterStatistics(GetMeterStatisticsInput input) {
GetMeterStatisticsOutputBuilder builder = new GetMeterStatisticsOutputBuilder();
- TransactionId transId = new TransactionId(BigInteger.valueOf(transNum.incrementAndGet()));
+ TransactionId transId = new TransactionId(BigInteger.valueOf(TestUtils.getNewTransactionId()));
builder.setTransactionId(transId);
MeterStatsBuilder msBuilder = new MeterStatsBuilder();
msBuilder.setKey(new MeterStatsKey(input.getMeterId()));
package test.mock.util;
import com.google.common.util.concurrent.Futures;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
-
public class OpendaylightPortStatisticsServiceMock implements OpendaylightPortStatisticsService {
NotificationProviderServiceHelper notifService;
- AtomicLong transNum = new AtomicLong();
public OpendaylightPortStatisticsServiceMock(NotificationProviderServiceHelper notifService) {
this.notifService = notifService;
@Override
public Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> getAllNodeConnectorsStatistics(GetAllNodeConnectorsStatisticsInput input) {
GetAllNodeConnectorsStatisticsOutputBuilder builder = new GetAllNodeConnectorsStatisticsOutputBuilder();
- TransactionId transId = new TransactionId(BigInteger.valueOf(transNum.incrementAndGet()));
+ TransactionId transId = new TransactionId(BigInteger.valueOf(TestUtils.getNewTransactionId()));
builder.setTransactionId(transId);
NodeConnectorStatisticsUpdateBuilder ncsuBuilder = new NodeConnectorStatisticsUpdateBuilder();
NodeConnectorStatisticsAndPortNumberMapBuilder ncsapnmBuilder = new NodeConnectorStatisticsAndPortNumberMapBuilder();
ncsapnmBuilder.setKey(new NodeConnectorStatisticsAndPortNumberMapKey(StatisticsManagerTest.getNodeConnectorId()));
ncsapnmBuilder.setReceiveDrops(StatisticsManagerTest.BIG_INTEGER_TEST_VALUE);
nodeConnectorStatisticsAndPortNumberMaps.add(ncsapnmBuilder.build());
- ncsuBuilder.setTransactionId(new TransactionId(BigInteger.valueOf(1)));
+ ncsuBuilder.setTransactionId(transId);
ncsuBuilder.setId(input.getNode().getValue().firstKeyOf(Node.class, NodeKey.class).getId());
ncsuBuilder.setNodeConnectorStatisticsAndPortNumberMap(nodeConnectorStatisticsAndPortNumberMaps);
ncsuBuilder.setMoreReplies(true);
@Override
public Future<RpcResult<GetNodeConnectorStatisticsOutput>> getNodeConnectorStatistics(GetNodeConnectorStatisticsInput input) {
GetNodeConnectorStatisticsOutputBuilder builder = new GetNodeConnectorStatisticsOutputBuilder();
- TransactionId transId = new TransactionId(BigInteger.valueOf(transNum.incrementAndGet()));
+ TransactionId transId = new TransactionId(BigInteger.valueOf(TestUtils.getNewTransactionId()));
builder.setTransactionId(transId);
return Futures.immediateFuture(RpcResultBuilder.success(builder.build()).build());
}
@Override
public Future<RpcResult<GetAllQueuesStatisticsFromAllPortsOutput>> getAllQueuesStatisticsFromAllPorts(GetAllQueuesStatisticsFromAllPortsInput input) {
GetAllQueuesStatisticsFromAllPortsOutputBuilder builder = new GetAllQueuesStatisticsFromAllPortsOutputBuilder();
- TransactionId transId = new TransactionId(BigInteger.valueOf(transNum.incrementAndGet()));
+ TransactionId transId = new TransactionId(BigInteger.valueOf(TestUtils.getNewTransactionId()));
builder.setTransactionId(transId);
QueueStatisticsUpdateBuilder qsuBuilder = new QueueStatisticsUpdateBuilder();
QueueIdAndStatisticsMapBuilder qiasmBuilder = new QueueIdAndStatisticsMapBuilder();
@Override
public Future<RpcResult<GetAllQueuesStatisticsFromGivenPortOutput>> getAllQueuesStatisticsFromGivenPort(GetAllQueuesStatisticsFromGivenPortInput input) {
GetAllQueuesStatisticsFromGivenPortOutputBuilder builder = new GetAllQueuesStatisticsFromGivenPortOutputBuilder();
- TransactionId transId = new TransactionId(BigInteger.valueOf(transNum.incrementAndGet()));
+ TransactionId transId = new TransactionId(BigInteger.valueOf(TestUtils.getNewTransactionId()));
builder.setTransactionId(transId);
return Futures.immediateFuture(RpcResultBuilder.success(builder.build()).build());
}
@Override
public Future<RpcResult<GetQueueStatisticsFromGivenPortOutput>> getQueueStatisticsFromGivenPort(GetQueueStatisticsFromGivenPortInput input) {
GetQueueStatisticsFromGivenPortOutputBuilder builder = new GetQueueStatisticsFromGivenPortOutputBuilder();
- TransactionId transId = new TransactionId(BigInteger.valueOf(transNum.incrementAndGet()));
+ TransactionId transId = new TransactionId(BigInteger.valueOf(TestUtils.getNewTransactionId()));
builder.setTransactionId(transId);
QueueIdAndStatisticsMapBuilder qiasmBuilder = new QueueIdAndStatisticsMapBuilder();
List<QueueIdAndStatisticsMap> queueIdAndStatisticsMaps = new ArrayList<>();
+++ /dev/null
-package test.mock.util;
-
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
-import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
-import org.opendaylight.controller.sal.binding.api.BindingAwareService;
-import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
-import org.opendaylight.controller.sal.binding.api.rpc.RpcContextIdentifier;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.binding.RpcService;
-
-public class ProviderContextMock implements BindingAwareBroker.ProviderContext {
-
- RpcProviderRegistry rpcProviderMock;
- NotificationProviderService notificationProviderService;
- DataBroker dataBroker;
-
- public ProviderContextMock(RpcProviderRegistry rpcProviderMock, DataBroker dataBroker,
- NotificationProviderService notificationProviderServiceMock) {
- this.rpcProviderMock = rpcProviderMock;
- this.dataBroker = dataBroker;
- this.notificationProviderService = notificationProviderServiceMock;
- }
-
- @Override
- public void registerFunctionality(BindingAwareProvider.ProviderFunctionality functionality) {
-
- }
-
- @Override
- public void unregisterFunctionality(BindingAwareProvider.ProviderFunctionality functionality) {
-
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public <T extends BindingAwareService> T getSALService(Class<T> service) {
- if (service.equals(DataBroker.class)) {
- return (T) dataBroker;
- }
- else if (service.equals(NotificationProviderService.class)) {
- return (T) notificationProviderService;
- }
- return null;
- }
-
- @Override
- public <T extends RpcService> BindingAwareBroker.RpcRegistration<T> addRpcImplementation(Class<T> serviceInterface, T implementation) throws IllegalStateException {
- return null;
- }
-
- @Override
- public <T extends RpcService> BindingAwareBroker.RoutedRpcRegistration<T> addRoutedRpcImplementation(Class<T> serviceInterface, T implementation) throws IllegalStateException {
- return null;
- }
-
- @Override
- public <L extends RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>> ListenerRegistration<L> registerRouteChangeListener(L listener) {
- return null;
- }
-
- @Override
- public <T extends RpcService> T getRpcService(Class<T> serviceInterface) {
- return rpcProviderMock.getRpcService(serviceInterface);
- }
-}
package test.mock.util;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
import org.junit.Before;
import org.junit.BeforeClass;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
+import org.opendaylight.controller.md.statistics.manager.impl.StatisticsManagerConfig;
+import org.opendaylight.controller.md.statistics.manager.impl.StatisticsManagerImpl;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev100924.Counter32;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev100924.Counter64;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FeatureCapability;
import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-
public abstract class StatisticsManagerTest extends AbstractDataBrokerTest {
public static final Counter64 COUNTER_64_TEST_VALUE = new Counter64(BigInteger.valueOf(128));
public static final Long MAX_GROUPS_TEST_VALUE = 2000L;
public static final BigInteger BIG_INTEGER_TEST_VALUE = BigInteger.valueOf(1000);
+ private static final int DEFAULT_MIN_REQUEST_NET_MONITOR_INTERVAL = 5000;
+ private static final int MAX_NODES_FOR_COLLECTOR = 16;
+
private static Flow flow;
private static Group group;
private static Meter meter;
private final NotificationProviderServiceHelper notificationMock = new NotificationProviderServiceHelper();
protected final NodeKey s1Key = new NodeKey(new NodeId("S1"));
protected RpcProviderRegistryMock rpcRegistry;
- protected ProviderContextMock providerContext;
@BeforeClass
public static void setupTests() {
@Before
public void init() {
rpcRegistry = new RpcProviderRegistryMock(notificationMock);
- providerContext = new ProviderContextMock(rpcRegistry, getDataBroker(), notificationMock.getNotifBroker());
}
// node with statistics capabilities will enable cyclic statistics collection
capabilitiyList.add(capability);
}
sfBuilder.setCapabilities(capabilitiyList);
- sfBuilder.setMaxTables((short) 2);
+ sfBuilder.setMaxTables((short) 255);
final NodeBuilder nodeBuilder = new NodeBuilder();
nodeBuilder.setKey(nodeKey);
fcnBuilder.setSwitchFeatures(sfBuilder.build());
final FlowCapableNodeBuilder fcnBuilder = new FlowCapableNodeBuilder();
final NodeBuilder nodeBuilder = new NodeBuilder();
nodeBuilder.setKey(nodeKey);
+ final SwitchFeaturesBuilder sfBuilder = new SwitchFeaturesBuilder();
+ sfBuilder.setMaxTables((short) 255);
+ fcnBuilder.setSwitchFeatures(sfBuilder.build());
final FlowCapableNode flowCapableNode = fcnBuilder.build();
nodeBuilder.addAugmentation(FlowCapableNode.class, flowCapableNode);
final Node node = nodeBuilder.build();
notificationMock.pushNotification(nrBuilder.build());
}
+ public StatisticsManager setupStatisticsManager() {
+ StatisticsManagerConfig.StatisticsManagerConfigBuilder confBuilder = StatisticsManagerConfig.builder();
+ confBuilder.setMaxNodesForCollector(MAX_NODES_FOR_COLLECTOR);
+ confBuilder.setMinRequestNetMonitorInterval(DEFAULT_MIN_REQUEST_NET_MONITOR_INTERVAL);
+ StatisticsManager statsProvider = new StatisticsManagerImpl(getDataBroker(), confBuilder.build());
+ statsProvider.start(notificationMock.getNotifBroker(), rpcRegistry);
+ return statsProvider;
+ }
+
public static Flow getFlow() {
return flow;
}
package test.mock.util;
import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
public class TestUtils {
+
+ private static AtomicLong transId = new AtomicLong();
+
private static Random rnd = new Random();
public static long nextLong(long RangeBottom, long rangeTop) {
return RangeBottom + ((long)(rnd.nextDouble()*(rangeTop - RangeBottom)));
}
+
+ public static long getNewTransactionId() {
+ return transId.incrementAndGet();
+ }
}
import org.slf4j.LoggerFactory;
public class ProxyServerHandler extends ChannelInboundHandlerAdapter {
- private static final Logger logger = LoggerFactory.getLogger(ProxyServerHandler.class.getName());
+ private static final Logger LOG = LoggerFactory.getLogger(ProxyServerHandler.class);
private final Bootstrap clientBootstrap;
private final LocalAddress localAddress;
@Override
public void channelInactive(ChannelHandlerContext ctx) {
- logger.trace("channelInactive - closing client channel");
+ LOG.trace("channelInactive - closing client channel");
clientChannel.close();
}
@Override
public void channelRead(ChannelHandlerContext ctx, final Object msg) {
- logger.trace("Writing to client channel");
+ LOG.trace("Writing to client channel");
clientChannel.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
- logger.trace("Flushing client channel");
+ LOG.trace("Flushing client channel");
clientChannel.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
- logger.warn("Unexpected exception from downstream.", cause);
+ LOG.warn("Unexpected exception from downstream.", cause);
ctx.close();
}
}
class ProxyClientHandler extends ChannelInboundHandlerAdapter {
- private static final Logger logger = LoggerFactory.getLogger(ProxyClientHandler.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ProxyClientHandler.class);
private final ChannelHandlerContext remoteCtx;
private ChannelHandlerContext localCtx;
@Override
public void channelActive(ChannelHandlerContext ctx) {
checkState(this.localCtx == null);
- logger.trace("Client channel active");
+ LOG.trace("Client channel active");
this.localCtx = ctx;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
- logger.trace("Forwarding message");
+ LOG.trace("Forwarding message");
remoteCtx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
- logger.trace("Flushing remote ctx");
+ LOG.trace("Flushing remote ctx");
remoteCtx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
- logger.warn("Unexpected exception from downstream", cause);
+ LOG.warn("Unexpected exception from downstream", cause);
checkState(this.localCtx.equals(ctx));
ctx.close();
}
// called both when local or remote connection dies
@Override
public void channelInactive(ChannelHandlerContext ctx) {
- logger.trace("channelInactive() called, closing remote client ctx");
+ LOG.trace("channelInactive() called, closing remote client ctx");
remoteCtx.close();
}
* Opens TCP port specified in config.ini, creates bridge between this port and local netconf server.
*/
public class NetconfTCPActivator implements BundleActivator {
- private static final Logger logger = LoggerFactory.getLogger(NetconfTCPActivator.class);
+ private static final Logger LOG = LoggerFactory.getLogger(NetconfTCPActivator.class);
private ProxyServer proxyServer;
@Override
public void start(BundleContext context) {
final Optional<InetSocketAddress> maybeAddress = NetconfConfigUtil.extractNetconfServerAddress(context, InfixProp.tcp);
if (maybeAddress.isPresent() == false) {
- logger.debug("Netconf tcp server is not configured to start");
+ LOG.debug("Netconf tcp server is not configured to start");
return;
}
InetSocketAddress address = maybeAddress.get();
if (address.getAddress().isAnyLocalAddress()) {
- logger.warn("Unprotected netconf TCP address is configured to ANY local address. This is a security risk. " +
- "Consider changing {} to 127.0.0.1", NetconfConfigUtil.getNetconfServerAddressKey(InfixProp.tcp));
+ LOG.warn("Unprotected netconf TCP address is configured to ANY local address. This is a security risk. Consider changing {} to 127.0.0.1",
+ NetconfConfigUtil.getNetconfServerAddressKey(InfixProp.tcp));
}
- logger.info("Starting TCP netconf server at {}", address);
+ LOG.info("Starting TCP netconf server at {}", address);
proxyServer = new ProxyServer(address, NetconfConfigUtil.getNetconfLocalAddress());
}
void validate() {
checkArgument(deviceCount > 0, "Device count has to be > 0");
- checkArgument(startingPort > 1024, "Starting port has to be > 1024");
+ checkArgument(startingPort > 1023, "Starting port has to be > 1023");
if(schemasDir != null) {
checkArgument(schemasDir.exists(), "Schemas dir has to exist");
final NetconfDeviceSimulator netconfDeviceSimulator = new NetconfDeviceSimulator();
try {
final List<Integer> openDevices = netconfDeviceSimulator.start(params);
+ if (openDevices.size() == 0) {
+ LOG.error("Failed to start any simulated devices, exiting...");
+ System.exit(1);
+ }
if(params.distroFolder != null) {
final ConfigGenerator configGenerator = new ConfigGenerator(params.distroFolder, openDevices);
final List<File> generated = configGenerator.generate(params.ssh, params.generateConfigBatchSize, params.generateConfigsTimeout, params.generateConfigsAddress);
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.management.ManagementFactory;
+import java.net.BindException;
import java.net.Inet4Address;
import java.net.InetSocketAddress;
import java.net.URI;
final PEMGeneratorHostKeyProvider keyPairProvider = getPemGeneratorHostKeyProvider();
for (int i = 0; i < params.deviceCount; i++) {
+ if (currentPort > 65535) {
+ LOG.warn("Port cannot be greater than 65535, stopping further attempts.");
+ break;
+ }
final InetSocketAddress address = getAddress(currentPort);
final ChannelFuture server;
final SshProxyServer sshServer = new SshProxyServer(minaTimerExecutor, nettyThreadgroup, nioExecutor);
sshServer.bind(getSshConfiguration(bindingAddress, tcpLocalAddress));
sshWrappers.add(sshServer);
- } catch (final Exception e) {
- LOG.warn("Cannot start simulated device on {}, skipping", address, e);
+ } catch (final BindException e) {
+ LOG.warn("Cannot start simulated device on {}, port already in use. Skipping.", address);
// Close local server and continue
server.cancel(true);
if(server.isDone()) {
server.channel().close();
}
continue;
+ } catch (final IOException e) {
+ LOG.warn("Cannot start simulated device on {} due to IOException.", address, e);
+ break;
} finally {
currentPort++;
}
if(openDevices.size() == params.deviceCount) {
LOG.info("All simulated devices started successfully from port {} to {}", params.startingPort, currentPort - 1);
+ } else if (openDevices.size() == 0) {
+ LOG.warn("No simulated devices started.");
} else {
LOG.warn("Not all simulated devices started successfully. Started devices ar on ports {}", openDevices);
}
*/
package org.opendaylight.controller.netconf.auth.usermanager;
+import com.google.common.annotations.VisibleForTesting;
import org.opendaylight.controller.netconf.auth.AuthProvider;
import org.opendaylight.controller.sal.authorization.AuthResultEnum;
import org.opendaylight.controller.usermanager.IUserManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
-
/**
* AuthProvider implementation delegating to AD-SAL UserManager instance.
*/
public class AuthProviderImpl implements AuthProvider {
- private static final Logger logger = LoggerFactory.getLogger(AuthProviderImpl.class);
+ private static final Logger LOG = LoggerFactory.getLogger(AuthProviderImpl.class);
private IUserManager nullableUserManager;
final ServiceTrackerCustomizer<IUserManager, IUserManager> customizer = new ServiceTrackerCustomizer<IUserManager, IUserManager>() {
@Override
public IUserManager addingService(final ServiceReference<IUserManager> reference) {
- logger.trace("UerManager {} added", reference);
+ LOG.trace("UserManager {} added", reference);
nullableUserManager = bundleContext.getService(reference);
return nullableUserManager;
}
@Override
public void modifiedService(final ServiceReference<IUserManager> reference, final IUserManager service) {
- logger.trace("Replacing modified UerManager {}", reference);
+ LOG.trace("Replacing modified UserManager {}", reference);
nullableUserManager = service;
}
@Override
public void removedService(final ServiceReference<IUserManager> reference, final IUserManager service) {
- logger.trace("Removing UerManager {}. This AuthProvider will fail to authenticate every time", reference);
+ LOG.trace("Removing UserManager {}. This AuthProvider will fail to authenticate every time", reference);
synchronized (AuthProviderImpl.this) {
nullableUserManager = null;
}
@Override
public synchronized boolean authenticated(final String username, final String password) {
if (nullableUserManager == null) {
- logger.warn("Cannot authenticate user '{}', user manager service is missing", username);
+ LOG.warn("Cannot authenticate user '{}', user manager service is missing", username);
throw new IllegalStateException("User manager service is not available");
}
final AuthResultEnum authResult = nullableUserManager.authenticate(username, password);
- logger.debug("Authentication result for user '{}' : {}", username, authResult);
+ LOG.debug("Authentication result for user '{}' : {}", username, authResult);
return authResult.equals(AuthResultEnum.AUTH_ACCEPT) || authResult.equals(AuthResultEnum.AUTH_ACCEPT_LOC);
}
package org.opendaylight.controller.netconf.util;
import com.google.common.base.Preconditions;
-
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
public final class NetconfUtil {
- private static final Logger logger = LoggerFactory.getLogger(NetconfUtil.class);
+ private static final Logger LOG = LoggerFactory.getLogger(NetconfUtil.class);
private NetconfUtil() {}
if (element.getName().equals(XmlNetconfConstants.OK)) {
return response;
}
- logger.warn("Can not load last configuration. Operation failed.");
+ LOG.warn("Can not load last configuration. Operation failed.");
throw new IllegalStateException("Can not load last configuration. Operation failed: "
+ XmlUtil.toString(response));
}
import java.util.Collections;
import java.util.Map;
-
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
public class MissingNameSpaceException extends NetconfDocumentedException {
import java.util.Collections;
import java.util.Map;
-
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
public class UnexpectedElementException extends NetconfDocumentedException {
import java.util.Collections;
import java.util.Map;
-
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
public class UnexpectedNamespaceException extends NetconfDocumentedException {
package org.opendaylight.controller.netconf.util.messages;
-import org.opendaylight.controller.netconf.api.NetconfMessage;
-
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
import java.util.Set;
-
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
import org.opendaylight.controller.netconf.util.exception.MissingNameSpaceException;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-
/**
* NetconfMessage that can carry additional header with session metadata. See {@link org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader}
*/
package org.opendaylight.controller.netconf.util.messages;
import com.google.common.base.Preconditions;
-
import java.util.regex.Matcher;
import java.util.regex.Pattern;
package org.opendaylight.controller.netconf.util.messages;
-import java.nio.ByteBuffer;
-
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
+import java.nio.ByteBuffer;
/**
* Netconf message header is used only when chunked framing mechanism is
public final class NetconfMessageUtil {
- private static final Logger logger = LoggerFactory.getLogger(NetconfMessageUtil.class);
+ private static final Logger LOG = LoggerFactory.getLogger(NetconfMessageUtil.class);
private NetconfMessageUtil() {}
try {
return input.getTextContent().trim();
} catch (NetconfDocumentedException e) {
- logger.trace("Error fetching inpit text content becauese {}",e);
+ LOG.trace("Error fetching input text content", e);
return null;
}
}
package org.opendaylight.controller.netconf.util.messages;
import com.google.common.base.Preconditions;
-
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
-
-import org.opendaylight.controller.netconf.api.NetconfSession;
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.api.NetconfSession;
import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.slf4j.Logger;
import org.w3c.dom.NamedNodeMap;
public final class SendErrorExceptionUtil {
- private static final Logger logger = LoggerFactory.getLogger(SendErrorExceptionUtil.class);
+ private static final Logger LOG = LoggerFactory.getLogger(SendErrorExceptionUtil.class);
private SendErrorExceptionUtil() {}
public static void sendErrorMessage(final NetconfSession session,
final NetconfDocumentedException sendErrorException) {
- logger.trace("Sending error {}", sendErrorException.getMessage(), sendErrorException);
+ LOG.trace("Sending error {}", sendErrorException.getMessage(), sendErrorException);
final Document errorDocument = createDocument(sendErrorException);
ChannelFuture f = session.sendMessage(new NetconfMessage(errorDocument));
f.addListener(new SendErrorVerifyingListener(sendErrorException));
}
public static void sendErrorMessage(Channel channel, NetconfDocumentedException sendErrorException) {
- logger.trace("Sending error {}", sendErrorException.getMessage(), sendErrorException);
+ LOG.trace("Sending error {}", sendErrorException.getMessage(), sendErrorException);
final Document errorDocument = createDocument(sendErrorException);
ChannelFuture f = channel.writeAndFlush(new NetconfMessage(errorDocument));
f.addListener(new SendErrorVerifyingListener(sendErrorException));
public static void sendErrorMessage(NetconfSession session, NetconfDocumentedException sendErrorException,
NetconfMessage incommingMessage) {
final Document errorDocument = createDocument(sendErrorException);
- logger.trace("Sending error {}", XmlUtil.toString(errorDocument));
+ LOG.trace("Sending error {}", XmlUtil.toString(errorDocument));
tryToCopyAttributes(incommingMessage.getDocument(), errorDocument, sendErrorException);
ChannelFuture f = session.sendMessage(new NetconfMessage(errorDocument));
f.addListener(new SendErrorVerifyingListener(sendErrorException));
rpcReply.setAttributeNode((Attr) errorDocument.importNode(attr, true));
}
} catch (final Exception e) {
- logger.warn("Unable to copy incomming attributes to {}, returned rpc-error might be invalid for client",
+ LOG.warn("Unable to copy incoming attributes to {}, returned rpc-error might be invalid for client",
 sendErrorException, e);
}
}
import org.slf4j.LoggerFactory;
public final class NetconfConfigUtil {
- private static final Logger logger = LoggerFactory.getLogger(NetconfConfigUtil.class);
+ private static final Logger LOG = LoggerFactory.getLogger(NetconfConfigUtil.class);
private static final String PREFIX_PROP = "netconf.";
try {
return Long.parseLong(timeoutString);
} catch (final NumberFormatException e) {
- logger.warn("Cannot parse {} property: {}, using defaults", key, timeoutString, e);
+ LOG.warn("Cannot parse {} property: {}, using defaults", key, timeoutString, e);
return DEFAULT_TIMEOUT_MILLIS;
}
}
try {
return Optional.of(parseAddress(address, port));
} catch (final RuntimeException e) {
- logger.warn("Unable to parse {} netconf address from {}:{}, fallback to default",
+ LOG.warn("Unable to parse {} netconf address from {}:{}, fallback to default",
infixProp, address, port, e);
}
}
package org.opendaylight.controller.netconf.util.xml;
+import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
-
import javax.xml.namespace.NamespaceContext;
-import com.google.common.collect.ImmutableMap;
-
// http://www.ibm.com/developerworks/library/x-nmspccontext/
public class HardcodedNamespaceResolver implements NamespaceContext {
private final Map<String/* prefix */, String/* namespace */> prefixesToNamespaces;
import javax.xml.xpath.XPathExpression;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
-
import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
public final class XMLNetconfUtil {
public static final String DEFAULT_NAMESPACE_PREFIX = "";
private final Element element;
- private static final Logger logger = LoggerFactory.getLogger(XmlElement.class);
+ private static final Logger LOG = LoggerFactory.getLogger(XmlElement.class);
private XmlElement(Element element) {
this.element = element;
public void checkNamespace(String expectedNamespace) throws UnexpectedNamespaceException, MissingNameSpaceException {
if (!getNamespace().equals(expectedNamespace))
- {
+ {
throw new UnexpectedNamespaceException(String.format("Unexpected namespace %s should be %s",
getNamespace(),
expectedNamespace),
try {
sb.append(", namespace='").append(getNamespace()).append('\'');
} catch (MissingNameSpaceException e) {
- logger.trace("Missing namespace for element.");
+ LOG.trace("Missing namespace for element.");
}
}
sb.append('}');
package org.opendaylight.controller.netconf.util.xml;
+import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.InputStream;
-
import javax.xml.transform.Source;
import javax.xml.transform.dom.DOMSource;
import javax.xml.validation.Schema;
import javax.xml.validation.Validator;
-
import org.w3c.dom.Document;
import org.xml.sax.SAXException;
-import com.google.common.base.Preconditions;
-
public final class XmlNetconfValidator {
private static final Schema SCHEMA;
import com.google.common.base.Charsets;
import com.google.common.base.Optional;
-
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
-
import javax.xml.XMLConstants;
import javax.xml.namespace.QName;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.validation.SchemaFactory;
import javax.xml.xpath.XPathExpression;
import javax.xml.xpath.XPathExpressionException;
-
import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
public static Element createTextElementWithNamespacedContent(Document document, String qName, String prefix,
String namespace, String contentWithoutPrefix) {
- return createTextElementWithNamespacedContent(document, qName, prefix, namespace, contentWithoutPrefix, Optional.<String>absent());
+ return createTextElementWithNamespacedContent(document, qName, prefix, namespace, contentWithoutPrefix, Optional.<String>absent());
}
public static Element createTextElementWithNamespacedContent(Document document, String qName, String prefix,
*/
package org.opendaylight.controller.netconf.util;
-import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
+import static org.hamcrest.CoreMatchers.containsString;
+
import org.junit.Test;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.w3c.dom.Document;
package org.opendaylight.controller.netconf.util.mapping;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-
public class AbstractLastNetconfOperationTest {
class LastNetconfOperationImplTest extends AbstractLastNetconfOperation {
package org.opendaylight.controller.netconf.util.mapping;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
import java.io.IOException;
import org.junit.Before;
import org.junit.Test;
import org.w3c.dom.Element;
import org.xml.sax.SAXException;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
public class AbstractNetconfOperationTest {
class NetconfOperationImpl extends AbstractNetconfOperation {
package org.opendaylight.controller.netconf.util.mapping;
+import static org.junit.Assert.assertEquals;
+
import org.junit.Test;
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
-import static org.junit.Assert.assertEquals;
-
public class AbstractSingletonNetconfOperationTest {
class SingletonNCOperationImpl extends AbstractSingletonNetconfOperation {
package org.opendaylight.controller.netconf.util.messages;
+import static org.junit.Assert.assertEquals;
+
import org.junit.Before;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-
public class NetconfHelloMessageAdditionalHeaderTest {
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+
import com.google.common.base.Optional;
import java.util.Set;
import org.junit.Before;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+
import com.google.common.base.Charsets;
import org.junit.Test;
package org.opendaylight.controller.netconf.util.messages;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
import java.util.Collection;
import org.junit.Test;
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
import org.w3c.dom.Document;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
public class NetconfMessageUtilTest {
@Test
public void testNetconfMessageUtil() throws Exception {
package org.opendaylight.controller.netconf.util.messages;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.GenericFutureListener;
import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
import org.w3c.dom.Document;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
-
public class SendErrorExceptionUtilTest {
NetconfSession netconfSession;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+
import com.google.common.base.Optional;
import io.netty.channel.local.LocalAddress;
import java.net.InetSocketAddress;
package org.opendaylight.controller.netconf.util.test;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.io.CharStreams;
+import com.google.common.io.InputSupplier;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
-
import javax.xml.parsers.ParserConfigurationException;
-
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.xml.sax.SAXException;
-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
-import com.google.common.io.CharStreams;
-import com.google.common.io.InputSupplier;
-
public class XmlFileLoader {
public static NetconfMessage xmlFileToNetconfMessage(final String fileName) throws IOException, SAXException,
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import com.google.common.base.Optional;
import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
-import com.google.common.base.Optional;
-
public class XmlElementTest {
private final String elementAsString = "<top xmlns=\"namespace\" xmlns:a=\"attrNamespace\" a:attr1=\"value1\" attr2=\"value2\">" +
<lexical-values/>
</fidelity>
</start-exi>
-</rpc>
+</rpc>
\ No newline at end of file
<rpc message-id="a" a="64" xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<stop-exi xmlns="urn:ietf:params:xml:ns:netconf:exi:1.0"/>
-</rpc>
+</rpc>
\ No newline at end of file