<artifactId>clustering.stub</artifactId>
<version>${clustering.stub.version}</version>
</dependency>
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>configuration</artifactId>
- <version>${controller.version}</version>
- </dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>configuration.implementation</artifactId>
<artifactId>netty-timer-config</artifactId>
<version>${config.version}</version>
</dependency>
-
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>configuration</artifactId>
<groupId>org.opendaylight.controller</groupId>
<artifactId>sal-broker-impl</artifactId>
<version>${mdsal.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-remote</artifactId>
+ <version>${mdsal.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-restconf-broker</artifactId>
+ <version>${mdsal.version}</version>
</dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>concepts</artifactId>
<version>${yangtools.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>restconf-client-api</artifactId>
+ <version>${yangtools.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>restconf-client-impl</artifactId>
+ <version>${yangtools.version}</version>
+ </dependency>
<!-- config-->
<dependency>
<artifactId>sal-connector-api</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-binding-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>sal</artifactId>
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-remote</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-binding-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- Supporting Libraries -->
<dependency>
* Returns a session specific instance (implementation) of requested
* YANG module implentation / service provided by consumer.
*
- * @param service
- * Broker service
* @return Session specific implementation of service
*/
<T extends RpcService> T getRpcService(Class<T> module);
org.opendaylight.controller.sal.binding.impl.*,
org.opendaylight.controller.sal.binding.codegen,
org.opendaylight.controller.sal.binding.codegen.*,
- org.opendaylight.controller.sal.binding.dom.*,
+ <!--org.opendaylight.controller.sal.binding.dom.*,-->
org.opendaylight.controller.sal.binding.osgi.*,
</Private-Package>
</instructions>
*/
package org.opendaylight.controller.config.yang.md.sal.binding.impl;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import java.util.Hashtable;
import java.util.Map.Entry;
import java.util.Set;
-
import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
import org.opendaylight.yangtools.concepts.Delegator;
import org.opendaylight.yangtools.sal.binding.generator.impl.RuntimeGeneratedMappingServiceImpl;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceReference;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-
/**
*
*/
@Override
public java.lang.AutoCloseable createInstance() {
-
+
RuntimeGeneratedMappingServiceProxy potential = tryToReuseGlobalInstance();
if(potential != null) {
return potential;
BindingIndependentMappingService, //
Delegator<BindingIndependentMappingService>, //
AutoCloseable {
-
+
private BindingIndependentMappingService delegate;
private ServiceReference<BindingIndependentMappingService> reference;
private BundleContext bundleContext;
this.delegate = Preconditions.checkNotNull(delegate);
}
- @Override
public CodecRegistry getCodecRegistry() {
return delegate.getCodecRegistry();
}
- @Override
public CompositeNode toDataDom(DataObject data) {
return delegate.toDataDom(data);
}
- @Override
public Entry<InstanceIdentifier, CompositeNode> toDataDom(
Entry<org.opendaylight.yangtools.yang.binding.InstanceIdentifier<? extends DataObject>, DataObject> entry) {
return delegate.toDataDom(entry);
}
- @Override
public InstanceIdentifier toDataDom(
org.opendaylight.yangtools.yang.binding.InstanceIdentifier<? extends DataObject> path) {
return delegate.toDataDom(path);
}
- @Override
public DataObject dataObjectFromDataDom(
org.opendaylight.yangtools.yang.binding.InstanceIdentifier<? extends DataObject> path,
CompositeNode result) throws DeserializationException {
return delegate.dataObjectFromDataDom(path, result);
}
- @Override
public org.opendaylight.yangtools.yang.binding.InstanceIdentifier<?> fromDataDom(InstanceIdentifier entry)
throws DeserializationException {
return delegate.fromDataDom(entry);
}
- @Override
public Set<QName> getRpcQNamesFor(Class<? extends RpcService> service) {
return delegate.getRpcQNamesFor(service);
}
- @Override
- public DataContainer dataObjectFromDataDom(Class<? extends DataContainer> inputClass, CompositeNode domInput) {
- return delegate.dataObjectFromDataDom(inputClass, domInput);
- }
-
@Override
public Optional<Class<? extends RpcService>> getRpcServiceClassFor(String namespace, String revision) {
- return delegate.getRpcServiceClassFor(namespace, revision);
+ return delegate.getRpcServiceClassFor(namespace,revision);
}
+ public DataContainer dataObjectFromDataDom(Class<? extends DataContainer> inputClass, CompositeNode domInput) {
+ return delegate.dataObjectFromDataDom(inputClass, domInput);
+ }
+
@Override
public void close() throws Exception {
if(delegate != null) {
import org.slf4j.LoggerFactory\r
import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder\rimport com.google.common.collect.Multimaps\r
import org.opendaylight.yangtools.concepts.util.ListenerRegistry\r
-import org.opendaylight.controller.sal.binding.api.NotificationProviderService.NotificationInterestListener\r
-\r
+import org.opendaylight.controller.sal.binding.api.NotificationProviderService.NotificationInterestListener\rimport java.util.Set
+import com.google.common.collect.ImmutableSet
+import java.util.concurrent.Future
+
class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {\r
\r
val ListenerRegistry<NotificationInterestListener> interestListeners = ListenerRegistry.create;\r
listenerToNotify = listenerToNotify + listeners.get(type as Class<? extends Notification>)\r
}\r
val tasks = listenerToNotify.map[new NotifyTask(it, notification)].toSet;\r
- executor.invokeAll(tasks);\r
+ submitAll(executor,tasks);\r
+ }\r
+ \r
+ def submitAll(ExecutorService service, Set<NotifyTask> tasks) {
+ val ret = ImmutableSet.<Future<Object>>builder();\r
+ for(task : tasks) {\r
+ ret.add(service.submit(task));\r
+ }\r
+ return ret.build();
}\r
\r
override <T extends Notification> registerNotificationListener(Class<T> notificationType,\r
*/
package org.opendaylight.controller.sal.binding.impl;
-import static com.google.common.base.Preconditions.checkState;
-
+import com.google.common.collect.ImmutableClassToInstanceMap;
import org.opendaylight.controller.md.sal.binding.util.AbstractBindingSalProviderInstance;
import org.opendaylight.controller.md.sal.binding.util.BindingContextUtils;
import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
import org.osgi.framework.BundleContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.ImmutableClassToInstanceMap;
+import static com.google.common.base.Preconditions.checkState;
public class RootBindingAwareBroker implements //
Mutable, //
private final DataChange<P, D> dataChange;
private final D originalConfigurationSubtree;
-
-
private final D originalOperationalSubtree;
private final D updatedOperationalSubtree;
private final D updatedConfigurationSubtree;
<build>
<plugins>
+ <!-- TODO - unite yang-maven-plugin configuration in md-sal-->
<plugin>
<groupId>org.opendaylight.yangtools</groupId>
<artifactId>yang-maven-plugin</artifactId>
checkState(schemaSourceProvider != null, "Schema Source Provider must be set.")
checkState(eventExecutor != null, "Event executor must be set.");
- val listener = new NetconfDeviceListener(this, eventExecutor);
+ val listener = new NetconfDeviceListener(this);
val task = startClientTask(dispatcher, listener)
if (mountInstance != null) {
commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this)
*/
package org.opendaylight.controller.sal.connect.netconf;
-import com.google.common.base.Objects;
-
-import io.netty.util.concurrent.EventExecutor;
-import io.netty.util.concurrent.Promise;
-
-import java.util.List;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.eclipse.xtext.xbase.lib.Exceptions;
-import org.eclipse.xtext.xbase.lib.Functions.Function0;
import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.client.AbstractNetconfClientNotifySessionListener;
import org.opendaylight.controller.netconf.client.NetconfClientSession;
-import org.opendaylight.controller.netconf.client.NetconfClientSessionListener;
-import org.opendaylight.controller.netconf.util.xml.XmlElement;
-import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
-import org.opendaylight.controller.sal.connect.netconf.NetconfDevice;
-import org.opendaylight.controller.sal.connect.netconf.NetconfMapping;
import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.Node;
-import org.w3c.dom.Document;
-
-@SuppressWarnings("all")
-class NetconfDeviceListener extends NetconfClientSessionListener {
- private final NetconfDevice device;
- private final EventExecutor eventExecutor;
-
- public NetconfDeviceListener(final NetconfDevice device, final EventExecutor eventExecutor) {
- this.device = device;
- this.eventExecutor = eventExecutor;
- }
- private Promise<NetconfMessage> messagePromise;
- private ConcurrentMap<String, Promise<NetconfMessage>> promisedMessages;
+import com.google.common.base.Preconditions;
- private final ReentrantLock promiseLock = new ReentrantLock();
+class NetconfDeviceListener extends AbstractNetconfClientNotifySessionListener {
+ private final NetconfDevice device;
- public void onMessage(final NetconfClientSession session, final NetconfMessage message) {
- if (isNotification(message)) {
- this.onNotification(session, message);
- } else {
- try {
- this.promiseLock.lock();
- boolean _notEquals = (!Objects.equal(this.messagePromise, null));
- if (_notEquals) {
- this.device.logger.debug("Setting promised reply {} with message {}", this.messagePromise, message);
- this.messagePromise.setSuccess(message);
- this.messagePromise = null;
- }
- } finally {
- this.promiseLock.unlock();
- }
- }
+ public NetconfDeviceListener(final NetconfDevice device) {
+ this.device = Preconditions.checkNotNull(device);
}
/**
* NetconfClientSessionListener#onMessage(NetconfClientSession,
* NetconfMessage)}
*/
+ @Override
public void onNotification(final NetconfClientSession session, final NetconfMessage message) {
this.device.logger.debug("Received NETCONF notification.", message);
CompositeNode domNotification = null;
}
}
}
-
- private static CompositeNode getNotificationBody(final CompositeNode node) {
- List<Node<? extends Object>> _children = node.getChildren();
- for (final Node<? extends Object> child : _children) {
- if ((child instanceof CompositeNode)) {
- return ((CompositeNode) child);
- }
- }
- return null;
- }
-
- public NetconfMessage getLastMessage(final int attempts, final int attemptMsDelay) throws InterruptedException {
- final Promise<NetconfMessage> promise = this.promiseReply();
- this.device.logger.debug("Waiting for reply {}", promise);
- int _plus = (attempts * attemptMsDelay);
- final boolean messageAvailable = promise.await(_plus);
- if (messageAvailable) {
- try {
- try {
- return promise.get();
- } catch (Throwable _e) {
- throw Exceptions.sneakyThrow(_e);
- }
- } catch (final Throwable _t) {
- if (_t instanceof ExecutionException) {
- final ExecutionException e = (ExecutionException) _t;
- IllegalStateException _illegalStateException = new IllegalStateException(e);
- throw _illegalStateException;
- } else {
- throw Exceptions.sneakyThrow(_t);
- }
- }
- }
- String _plus_1 = ("Unsuccessful after " + Integer.valueOf(attempts));
- String _plus_2 = (_plus_1 + " attempts.");
- IllegalStateException _illegalStateException_1 = new IllegalStateException(_plus_2);
- throw _illegalStateException_1;
- }
-
- public synchronized Promise<NetconfMessage> promiseReply() {
- this.device.logger.debug("Promising reply.");
- this.promiseLock.lock();
- try {
- boolean _equals = Objects.equal(this.messagePromise, null);
- if (_equals) {
- Promise<NetconfMessage> _newPromise = this.eventExecutor.<NetconfMessage> newPromise();
- this.messagePromise = _newPromise;
- return this.messagePromise;
- }
- return this.messagePromise;
- } finally {
- this.promiseLock.unlock();
- }
- }
-
- public boolean isNotification(final NetconfMessage message) {
- Document _document = message.getDocument();
- final XmlElement xmle = XmlElement.fromDomDocument(_document);
- String _name = xmle.getName();
- return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(_name);
- }
}
<version>1.1-SNAPSHOT</version>
</parent>
<artifactId>sal-remote</artifactId>
- <packaging>jar</packaging>
+ <packaging>bundle</packaging>
<scm>
<connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
<developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
<url>https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL</url>
<tag>HEAD</tag>
</scm>
+ <dependencies>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-binding-api</artifactId>
+ <version>1.1-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
<build>
<plugins>
<plugin>
</plugin>
</plugins>
</build>
-
- <dependencies>
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>sal-binding-api</artifactId>
- <version>1.1-SNAPSHOT</version>
- </dependency>
- </dependencies>
</project>
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.restconf.service.impl;
-
-import java.util.concurrent.Future;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.BeginTransactionOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateNotificationStreamInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateNotificationStreamOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.SalRemoteService;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-
-public class SalRemoteServiceImpl implements SalRemoteService {
- @Override
- public Future<RpcResult<BeginTransactionOutput>> beginTransaction() {
- return null;
- }
-
- @Override
- public Future<RpcResult<CreateDataChangeEventSubscriptionOutput>> createDataChangeEventSubscription(CreateDataChangeEventSubscriptionInput input) {
- return null;
- }
-
- @Override
- public Future<RpcResult<CreateNotificationStreamOutput>> createNotificationStream(CreateNotificationStreamInput input) {
- return null;
- }
-}
<version>1.1-SNAPSHOT</version>
</parent>
<artifactId>sal-restconf-broker</artifactId>
- <packaging>jar</packaging>
+ <packaging>bundle</packaging>
<scm>
<connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
<developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
<url>https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL</url>
<tag>HEAD</tag>
</scm>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.felix</groupId>
- <artifactId>maven-bundle-plugin</artifactId>
- <extensions>true</extensions>
- </plugin>
- </plugins>
- </build>
-
<dependencies>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>sal-binding-api</artifactId>
- <version>1.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-binding-util</artifactId>
</dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>sal-remote</artifactId>
- <version>1.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-broker-impl</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-binding-config</artifactId>
</dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>sal-core-api</artifactId>
- <version>1.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.opendaylight.yangtools</groupId>
<artifactId>restconf-client-api</artifactId>
<version>${yangtools.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>restconf-client-impl</artifactId>
+ <version>${yangtools.version}</version>
+ </dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <extensions>true</extensions>
+ <configuration>
+ <instructions>
+ <Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
+ <Import-Package>
+ *
+ </Import-Package>
+ </instructions>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.8</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>${project.build.directory}/generated-sources/</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.binding.impl;
-
-import org.opendaylight.controller.sal.binding.api.NotificationListener;
-import org.opendaylight.controller.sal.binding.api.NotificationService;
-import org.opendaylight.yangtools.concepts.Registration;
-import org.opendaylight.yangtools.yang.binding.Notification;
-
-public class NotificationServiceImpl implements NotificationService {
- @Override
- public <T extends Notification> void addNotificationListener(Class<T> notificationType, NotificationListener<T> listener) {
-
- }
-
- @Override
- public void addNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
-
- }
-
- @Override
- public void removeNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
-
- }
-
- @Override
- public <T extends Notification> void removeNotificationListener(Class<T> notificationType, NotificationListener<T> listener) {
-
- }
-
- @Override
- public <T extends Notification> Registration<NotificationListener<T>> registerNotificationListener(Class<T> notificationType, NotificationListener<T> listener) {
- //TODO implementation using sal-remote
- return null;
- }
-
- @Override
- public Registration<org.opendaylight.yangtools.yang.binding.NotificationListener> registerNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
- //TODO implementation using sal-remote
- return null;
- }
-}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.sal.binding.impl;
+package org.opendaylight.controller.sal.restconf.binding.impl;
import java.net.URL;
import java.util.concurrent.Future;
package org.opendaylight.controller.sal.restconf.broker;
-import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sal.core.api.Consumer;
-import org.opendaylight.controller.sal.core.api.Provider;
+import com.google.common.collect.ImmutableClassToInstanceMap;
+import org.opendaylight.controller.md.sal.binding.util.BindingContextUtils;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.binding.api.BindingAwareConsumer;
+import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
+import org.opendaylight.controller.sal.binding.api.BindingAwareService;
+import org.opendaylight.controller.sal.binding.api.NotificationService;
+import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
+import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
+import org.opendaylight.controller.sal.restconf.broker.impl.RemoteServicesFactory;
+import org.opendaylight.yangtools.restconf.client.api.RestconfClientContext;
import org.osgi.framework.BundleContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static com.google.common.base.Preconditions.checkState;
-public class SalRemoteServiceBroker implements Broker,AutoCloseable {
+public class SalRemoteServiceBroker implements BindingAwareBroker,AutoCloseable {
- @Override
- public void close() throws Exception {
+ private static final Logger logger = LoggerFactory.getLogger(SalRemoteServiceBroker.class.toString());
+ private ImmutableClassToInstanceMap<BindingAwareService> supportedConsumerServices;
+
+ private final String identifier;
+
+ private RpcConsumerRegistry rpcBroker;
+ private NotificationService notificationBroker;
+ private DataBrokerService dataBroker;
+ private final RemoteServicesFactory servicesFactory;
+
+ public SalRemoteServiceBroker(String instanceName,RestconfClientContext clientContext){
+ this.identifier = instanceName;
+ this.servicesFactory = new RemoteServicesFactory(clientContext);
}
- @Override
- public ConsumerSession registerConsumer(Consumer cons, BundleContext context) {
- return null;
+ public void start() {
+ logger.info("Starting Binding Aware Broker: {}", identifier);
+
+ supportedConsumerServices = ImmutableClassToInstanceMap.<BindingAwareService> builder()
+ .put(NotificationService.class, servicesFactory.getNotificationService()) //
+ .put(DataBrokerService.class,servicesFactory.getDataBrokerService() ) //
+ .put(RpcConsumerRegistry.class,servicesFactory.getRpcConsumerRegistry() ).build();
}
+ public ProviderContext registerProvider(BindingAwareProvider provider, BundleContext ctx) {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public void close() throws Exception {
+ //TODO decide if serviceFactory should close clientContext or it has to be closed by consumer
+ }
@Override
- public ProviderSession registerProvider(Provider prov, BundleContext context) {
- return null;
+ public ConsumerContext registerConsumer(BindingAwareConsumer consumer, BundleContext ctx) {
+ checkState(supportedConsumerServices != null, "Broker is not initialized.");
+ return BindingContextUtils.createConsumerContextAndInitialize(consumer, supportedConsumerServices);
}
+
}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.restconf.broker.event;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.DataChangedNotification;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+public class RemoteDataChangeEvent implements DataChangeEvent<InstanceIdentifier<? extends DataObject>,DataObject> {
+
+
+ private final DataChangedNotification dataChangedNotification;
+
+
+ public RemoteDataChangeEvent(DataChangedNotification dataChangedNotification){
+
+ this.dataChangedNotification = dataChangedNotification;
+ }
+
+ @Override
+ public DataObject getOriginalConfigurationSubtree() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public DataObject getOriginalOperationalSubtree() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public DataObject getUpdatedConfigurationSubtree() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public DataObject getUpdatedOperationalSubtree() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Map<InstanceIdentifier<?>, DataObject> getCreatedOperationalData() {
+ return new HashMap<InstanceIdentifier<?>, DataObject>(){{
+ for (org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.data.changed.notification.DataChangeEvent d :dataChangedNotification.getDataChangeEvent()){
+ if (d.getOperation().getIntValue() == 0 && d.getStore().getIntValue() == 1){
+ put(d.getPath(),d);
+ }
+ }
+ }};
+ }
+
+ @Override
+ public Map<InstanceIdentifier<?>, DataObject> getCreatedConfigurationData() {
+ return new HashMap<InstanceIdentifier<?>, DataObject>(){{
+ for (org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.data.changed.notification.DataChangeEvent d :dataChangedNotification.getDataChangeEvent()){
+ if (d.getOperation().getIntValue() == 0 && d.getStore().getIntValue() == 0){
+ put(d.getPath(),d);
+ }
+ }
+ }};
+ }
+
+ @Override
+ public Map<InstanceIdentifier<?>, DataObject> getUpdatedOperationalData() {
+ return new HashMap<InstanceIdentifier<?>, DataObject>(){{
+ for (org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.data.changed.notification.DataChangeEvent d :dataChangedNotification.getDataChangeEvent()){
+ if (d.getOperation().getIntValue() == 1 && d.getStore().getIntValue() == 1){
+ put(d.getPath(),d);
+ }
+ }
+ }};
+ }
+
+ @Override
+ public Map<InstanceIdentifier<?>, DataObject> getUpdatedConfigurationData() {
+ return new HashMap<InstanceIdentifier<?>, DataObject>(){{
+ for (org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.data.changed.notification.DataChangeEvent d :dataChangedNotification.getDataChangeEvent()){
+ if (d.getOperation().getIntValue() == 1 && d.getStore().getIntValue() == 0){
+ put(d.getPath(),d);
+ }
+ }
+ }};
+ }
+
+ @Override
+ public Set<InstanceIdentifier<?>> getRemovedConfigurationData() {
+ return new HashSet<InstanceIdentifier<?>>(){{
+ for (org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.data.changed.notification.DataChangeEvent d :dataChangedNotification.getDataChangeEvent()){
+ if (d.getOperation().getIntValue() == 2 && d.getStore().getIntValue() == 0){
+ add(d.getPath());
+ }
+ }
+ }};
+ }
+
+ @Override
+ public Set<InstanceIdentifier<?>> getRemovedOperationalData() {
+ return new HashSet<InstanceIdentifier<?>>(){{
+ for (org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.data.changed.notification.DataChangeEvent d :dataChangedNotification.getDataChangeEvent()){
+ if (d.getOperation().getIntValue() == 2 && d.getStore().getIntValue() == 1){
+ add(d.getPath());
+ }
+ }
+ }};
+ }
+
+ @Override
+ public Map<InstanceIdentifier<?>, DataObject> getOriginalConfigurationData() {
+ return new HashMap<InstanceIdentifier<?>, DataObject>(){{
+ for (org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.data.changed.notification.DataChangeEvent d :dataChangedNotification.getDataChangeEvent()){
+ if (d.getOperation().getIntValue() == 1 && d.getStore().getIntValue() == 0){
+ put(d.getPath(),d);
+ }
+ }
+ }};
+ }
+
+ @Override
+ public Map<InstanceIdentifier<?>, DataObject> getOriginalOperationalData() {
+ return new HashMap<InstanceIdentifier<?>, DataObject>(){{
+ for (org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.data.changed.notification.DataChangeEvent d :dataChangedNotification.getDataChangeEvent()){
+ if (d.getOperation().getIntValue() == 1 && d.getStore().getIntValue() == 1){
+ put(d.getPath(),d);
+ }
+ }
+ }};
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.restconf.broker.impl;
+
+import com.google.common.base.Optional;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
+import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
+import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+import org.opendaylight.controller.sal.common.DataStoreIdentifier;
+import org.opendaylight.controller.sal.restconf.broker.listeners.RemoteDataChangeNotificationListener;
+import org.opendaylight.controller.sal.restconf.broker.tools.RemoteStreamTools;
+import org.opendaylight.controller.sal.restconf.broker.transactions.RemoteDataModificationTransaction;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.BeginTransactionOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.SalRemoteService;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.restconf.client.api.RestconfClientContext;
+import org.opendaylight.yangtools.restconf.client.api.event.EventStreamInfo;
+import org.opendaylight.yangtools.restconf.client.api.event.ListenableEventStreamContext;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.DataRoot;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataBrokerServiceImpl implements DataBrokerService {
+
+ private static final Logger logger = LoggerFactory.getLogger(DataBrokerServiceImpl.class.toString());
+ private RestconfClientContext restconfClientContext;
+ private SalRemoteService salRemoteService;
+
+ public DataBrokerServiceImpl(RestconfClientContext restconfClientContext) {
+ this.restconfClientContext = restconfClientContext;
+ this.salRemoteService = this.restconfClientContext.getRpcServiceContext(SalRemoteService.class).getRpcService();
+ }
+ @Override
+ public <T extends DataRoot> T getData(DataStoreIdentifier store, Class<T> rootType) {
+ throw new UnsupportedOperationException("Deprecated");
+ }
+
+ @Override
+ public <T extends DataRoot> T getData(DataStoreIdentifier store, T filter) {
+ throw new UnsupportedOperationException("Deprecated");
+ }
+
+ @Override
+ public <T extends DataRoot> T getCandidateData(DataStoreIdentifier store, Class<T> rootType) {
+ throw new UnsupportedOperationException("Deprecated");
+ }
+
+ @Override
+ public <T extends DataRoot> T getCandidateData(DataStoreIdentifier store, T filter) {
+ throw new UnsupportedOperationException("Deprecated");
+ }
+
+ @Override
+ public RpcResult<DataRoot> editCandidateData(DataStoreIdentifier store, DataRoot changeSet) {
+ throw new UnsupportedOperationException("Deprecated");
+ }
+
+ @Override
+ public Future<RpcResult<Void>> commit(DataStoreIdentifier store) {
+ throw new UnsupportedOperationException("Deprecated");
+ }
+
+ @Override
+ public DataObject getData(InstanceIdentifier<? extends DataObject> data) {
+ throw new UnsupportedOperationException("Deprecated");
+ }
+
+ @Override
+ public DataObject getConfigurationData(InstanceIdentifier<?> data) {
+ throw new UnsupportedOperationException("Deprecated");
+ }
+
+ @Override
+ public DataModificationTransaction beginTransaction() {
+ Future<RpcResult<BeginTransactionOutput>> rpcResultFuture = this.salRemoteService.beginTransaction();
+ //TODO finish yang model for proper remoteDataModificationTransaction setup
+ RemoteDataModificationTransaction remoteDataModificationTransaction = new RemoteDataModificationTransaction();
+ return remoteDataModificationTransaction;
+ }
+
+ @Override
+ public void registerChangeListener(InstanceIdentifier<? extends DataObject> path, DataChangeListener changeListener) {
+ throw new UnsupportedOperationException("Deprecated");
+ }
+
+ @Override
+ public void unregisterChangeListener(InstanceIdentifier<? extends DataObject> path, DataChangeListener changeListener) {
+ throw new UnsupportedOperationException("Deprecated");
+ }
+
+ @Override
+ public DataObject readConfigurationData(InstanceIdentifier<? extends DataObject> path) {
+ try {
+ Optional<DataObject> optDataObject = (Optional<DataObject>) this.restconfClientContext.getConfigurationDatastore().readData(path).get();
+ if (optDataObject.isPresent()){
+ return optDataObject.get();
+ }
+ } catch (InterruptedException e) {
+ logger.trace("Reading configuration data interrupted {}",e);
+ } catch (ExecutionException e) {
+ logger.trace("Reading configuration execution exception {}",e);
+ }
+ throw new IllegalStateException("No data to return.");
+ }
+
+ @Override
+ public DataObject readOperationalData(InstanceIdentifier<? extends DataObject> path) {
+ try {
+ Optional<DataObject> optDataObject = (Optional<DataObject>) this.restconfClientContext.getOperationalDatastore().readData(path).get();
+ if (optDataObject.isPresent()){
+ return optDataObject.get();
+ }
+ } catch (InterruptedException e) {
+ logger.trace("Reading configuration data interrupted {}",e);
+ } catch (ExecutionException e) {
+ logger.trace("Reading configuration execution exception {}",e);
+ }
+ throw new IllegalStateException("No data to return.");
+ }
+ @Override
+ public ListenerRegistration<DataChangeListener> registerDataChangeListener(InstanceIdentifier<? extends DataObject> path, DataChangeListener listener) {
+ CreateDataChangeEventSubscriptionInputBuilder inputBuilder = new CreateDataChangeEventSubscriptionInputBuilder();
+ Future<RpcResult<CreateDataChangeEventSubscriptionOutput>> rpcResultFuture = salRemoteService.createDataChangeEventSubscription(inputBuilder.setPath(path).build());
+ String streamName = "";
+ try {
+ if (rpcResultFuture.get().isSuccessful()){
+ streamName = rpcResultFuture.get().getResult().getStreamName();
+ }
+ } catch (InterruptedException e) {
+ logger.trace("Interupted while getting rpc result due to {}",e);
+ } catch (ExecutionException e) {
+ logger.trace("Execution exception while getting rpc result due to {}",e);
+ }
+ final Map<String,EventStreamInfo> desiredEventStream = RemoteStreamTools.createEventStream(restconfClientContext,streamName);
+ ListenableEventStreamContext restConfListenableEventStreamContext = restconfClientContext.getEventStreamContext(desiredEventStream.get(streamName));
+ RemoteDataChangeNotificationListener remoteDataChangeNotificationListener = new RemoteDataChangeNotificationListener(listener);
+ restConfListenableEventStreamContext.registerNotificationListener(remoteDataChangeNotificationListener);
+ return new SalRemoteDataListenerRegistration(listener);
+ }
+
+ private class SalRemoteDataListenerRegistration implements ListenerRegistration<DataChangeListener> {
+ private DataChangeListener dataChangeListener;
+ public SalRemoteDataListenerRegistration(DataChangeListener dataChangeListener){
+ this.dataChangeListener = dataChangeListener;
+ }
+ @Override
+ public DataChangeListener getInstance() {
+ return this.dataChangeListener;
+ }
+ @Override
+ public void close() throws Exception {
+ //noop
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.restconf.broker.impl;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.SetMultimap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import org.opendaylight.controller.sal.binding.api.NotificationListener;
+import org.opendaylight.controller.sal.binding.api.NotificationService;
+import org.opendaylight.controller.sal.restconf.broker.listeners.RemoteNotificationListener;
+import org.opendaylight.controller.sal.restconf.broker.tools.RemoteStreamTools;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.QName;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.SalRemoteService;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.restconf.client.api.RestconfClientContext;
+import org.opendaylight.yangtools.restconf.client.api.event.EventStreamInfo;
+import org.opendaylight.yangtools.yang.binding.Notification;
+
+public class NotificationServiceImpl implements NotificationService {
+ private SalRemoteService salRemoteService;
+ private RestconfClientContext restconfClientContext;
+
+ private final Multimap<Class<? extends Notification>,NotificationListener<? extends Object>> listeners;
+ private ExecutorService _executor;
+
+ public NotificationServiceImpl(RestconfClientContext restconfClienetContext){
+ this.restconfClientContext = restconfClienetContext;
+ this.salRemoteService = this.restconfClientContext.getRpcServiceContext(SalRemoteService.class).getRpcService();
+
+ HashMultimap<Class<? extends Notification>,NotificationListener<? extends Object>> _create = HashMultimap.<Class<? extends Notification>, NotificationListener<? extends Object>>create();
+ SetMultimap<Class<? extends Notification>,NotificationListener<? extends Object>> _synchronizedSetMultimap = Multimaps.<Class<? extends Notification>, NotificationListener<? extends Object>>synchronizedSetMultimap(_create);
+ this.listeners = _synchronizedSetMultimap;
+
+ }
+ public ExecutorService getExecutor() {
+ return this._executor;
+ }
+
+ public void setExecutor(final ExecutorService executor) {
+ this._executor = executor;
+ }
+
+ @Override
+ public <T extends Notification> void addNotificationListener(Class<T> notificationType, NotificationListener<T> listener) {
+ this.listeners.put(notificationType, listener);
+ }
+
+ @Override
+ public void addNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
+ UnsupportedOperationException _unsupportedOperationException = new UnsupportedOperationException("Deprecated method. Use registerNotificationListener instead.");
+ throw _unsupportedOperationException;
+ }
+
+ @Override
+ public void removeNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
+ UnsupportedOperationException _unsupportedOperationException = new UnsupportedOperationException(
+ "Deprecated method. Use RegisterNotificationListener returned value to close registration.");
+ throw _unsupportedOperationException;
+ }
+
+ @Override
+ public <T extends Notification> void removeNotificationListener(Class<T> notificationType, NotificationListener<T> listener) {
+ this.listeners.remove(notificationType, listener);
+ }
+
+ @Override
+ public <T extends Notification> Registration<NotificationListener<T>> registerNotificationListener(Class<T> notificationType, NotificationListener<T> listener) {
+ //TODO implementation using sal-remote
+ List<QName> notifications = new ArrayList<QName>();
+ notifications.add(new QName(notificationType.toString()));
+ String notificationStreamName = RemoteStreamTools.createNotificationStream(salRemoteService, notifications);
+ final Map<String,EventStreamInfo> desiredEventStream = RemoteStreamTools.createEventStream(restconfClientContext, notificationStreamName);
+ RemoteNotificationListener remoteNotificationListener = new RemoteNotificationListener(listener);
+ ListenerRegistration listenerRegistration = restconfClientContext.getEventStreamContext(desiredEventStream.get(desiredEventStream.get(notificationStreamName))).registerNotificationListener(remoteNotificationListener);
+ SalNotificationRegistration salNotificationRegistration = new SalNotificationRegistration(listenerRegistration);
+ return salNotificationRegistration;
+ }
+
+ @Override
+ public Registration<org.opendaylight.yangtools.yang.binding.NotificationListener> registerNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
+ //TODO implementation using sal-remote
+ String notificationStreamName = RemoteStreamTools.createNotificationStream(salRemoteService, null);
+ final Map<String,EventStreamInfo> desiredEventStream = RemoteStreamTools.createEventStream(restconfClientContext, notificationStreamName);
+ ListenerRegistration listenerRegistration = restconfClientContext.getEventStreamContext(desiredEventStream.get(desiredEventStream.get(notificationStreamName))).registerNotificationListener(listener);
+ return listenerRegistration;
+ }
+
+ private class SalNotificationRegistration<T extends Notification> implements Registration<NotificationListener<T>>{
+ private Registration registration;
+
+ public SalNotificationRegistration(ListenerRegistration listenerRegistration){
+ this.registration = listenerRegistration;
+ }
+
+ @Override
+ public NotificationListener<T> getInstance() {
+ return this.getInstance();
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.registration.close();
+ }
+ }
+
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.restconf.broker.impl;
+
+import org.opendaylight.controller.sal.binding.api.NotificationService;
+import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
+import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
+import org.opendaylight.yangtools.restconf.client.api.RestconfClientContext;
+
+public class RemoteServicesFactory {
+
+ private final RestconfClientContext restconfClientContext;
+
+ public RemoteServicesFactory(RestconfClientContext restconfClientContext){
+ this.restconfClientContext = restconfClientContext;
+ }
+
+ public DataBrokerService getDataBrokerService(){
+ return new DataBrokerServiceImpl(this.restconfClientContext);
+ }
+
+ public NotificationService getNotificationService(){
+ return new NotificationServiceImpl(this.restconfClientContext);
+ }
+
+ public RpcConsumerRegistry getRpcConsumerRegistry(){
+ return new RpcConsumerRegistryImpl(this.restconfClientContext);
+ }
+
+}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.sal.binding.impl;
+package org.opendaylight.controller.sal.restconf.broker.impl;
import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
+import org.opendaylight.yangtools.restconf.client.api.RestconfClientContext;
import org.opendaylight.yangtools.yang.binding.RpcService;
public class RpcConsumerRegistryImpl implements RpcConsumerRegistry {
+
+ private RestconfClientContext restconfClientContext;
+
+ public RpcConsumerRegistryImpl(RestconfClientContext restconfClientContext){
+ this.restconfClientContext = restconfClientContext;
+ }
@Override
public <T extends RpcService> T getRpcService(Class<T> module) {
- //TODO implementation using restconf-client
- return null;
+ return restconfClientContext.getRpcServiceContext(module).getRpcService();
}
}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.restconf.broker.listeners;
+
+import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
+import org.opendaylight.controller.sal.restconf.broker.event.RemoteDataChangeEvent;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.DataChangedNotification;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.SalRemoteListener;
+
+public class RemoteDataChangeNotificationListener implements SalRemoteListener {
+
+
+ private final DataChangeListener dataChangeListener;
+
+ public RemoteDataChangeNotificationListener(DataChangeListener dataChangeListener){
+ this.dataChangeListener = dataChangeListener;
+ }
+ @Override
+ public void onDataChangedNotification(DataChangedNotification notification) {
+ this.dataChangeListener.onDataChanged(new RemoteDataChangeEvent(notification));
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.restconf.broker.listeners;
+
+import org.opendaylight.controller.sal.binding.api.NotificationListener;
+
+public class RemoteNotificationListener implements org.opendaylight.yangtools.yang.binding.NotificationListener {
+
+ org.opendaylight.controller.sal.binding.api.NotificationListener listener;
+
+ public RemoteNotificationListener(NotificationListener listener){
+ this.listener = listener;
+ }
+ public NotificationListener getListener(){
+ return this.listener;
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.restconf.broker.listeners;
+
+import org.opendaylight.controller.sal.binding.api.NotificationListener;
+import org.opendaylight.yangtools.yang.binding.Notification;
+
+
+public class SalNotificationListener implements NotificationListener {
+ private NotificationListener notificationListener;
+
+ public SalNotificationListener( NotificationListener notificationListener){
+ this.notificationListener = notificationListener;
+ }
+ @Override
+ public void onNotification(Notification notification) {
+ this.notificationListener.onNotification(notification);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.restconf.broker.tools;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateNotificationStreamInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateNotificationStreamOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.QName;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.SalRemoteService;
+import org.opendaylight.yangtools.restconf.client.api.RestconfClientContext;
+import org.opendaylight.yangtools.restconf.client.api.event.EventStreamInfo;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RemoteStreamTools {
+ private static final Logger logger = LoggerFactory.getLogger(RemoteStreamTools.class.toString());
+
+ public static String createNotificationStream(SalRemoteService salRemoteService,List<QName> notifications){
+ CreateNotificationStreamInputBuilder notificationStreamInputBuilder = new CreateNotificationStreamInputBuilder();
+
+ if (null == notifications){
+ notificationStreamInputBuilder.setNotifications(notifications);
+ }
+
+ Future<RpcResult<CreateNotificationStreamOutput>> notificationStream = salRemoteService.createNotificationStream(notificationStreamInputBuilder.build());
+
+ String nofiticationStreamIdentifier = "";
+ try {
+ if (notificationStream.get().isSuccessful()){
+ nofiticationStreamIdentifier = notificationStream.get().getResult().getNotificationStreamIdentifier();
+ }
+ } catch (InterruptedException e) {
+ logger.trace("Interrupted while resolving notification stream identifier due to {}",e);
+ } catch (ExecutionException e) {
+ logger.trace("Execution exception while resolving notification stream identifier due to {}",e);
+ }
+ return nofiticationStreamIdentifier;
+ }
+
+ public static Map<String,EventStreamInfo> createEventStream(RestconfClientContext restconfClientContext, String desiredStreamName){
+ ListenableFuture<Set<EventStreamInfo>> availableEventStreams = restconfClientContext.getAvailableEventStreams();
+ final Map<String,EventStreamInfo> desiredEventStream = new HashMap<String,EventStreamInfo>();
+
+ try {
+ Iterator<EventStreamInfo> it = availableEventStreams.get().iterator();
+ while (it.hasNext()){
+ if (it.next().getIdentifier().equals(desiredStreamName)){
+ desiredEventStream.put(desiredStreamName,it.next());
+ }
+ }
+ } catch (InterruptedException e) {
+ logger.trace("Resolving of event stream interrupted due to {}",e);
+ } catch (ExecutionException e) {
+ logger.trace("Resolving of event stream failed due to {}",e);
+ }
+ return desiredEventStream;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.restconf.broker.transactions;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+public class RemoteDataModificationTransaction implements DataModificationTransaction {
+ //TODO implement this
+
+ @Override
+ public Object getIdentifier() {
+ return null;
+ }
+
+ @Override
+ public TransactionStatus getStatus() {
+ return null;
+ }
+
+ @Override
+ public void putRuntimeData(InstanceIdentifier<? extends DataObject> path, DataObject data) {
+
+ }
+
+ @Override
+ public void putOperationalData(InstanceIdentifier<? extends DataObject> path, DataObject data) {
+
+ }
+
+ @Override
+ public void putConfigurationData(InstanceIdentifier<? extends DataObject> path, DataObject data) {
+
+ }
+
+ @Override
+ public void removeRuntimeData(InstanceIdentifier<? extends DataObject> path) {
+
+ }
+
+ @Override
+ public void removeOperationalData(InstanceIdentifier<? extends DataObject> path) {
+
+ }
+
+ @Override
+ public void removeConfigurationData(InstanceIdentifier<? extends DataObject> path) {
+
+ }
+
+ @Override
+ public Future<RpcResult<TransactionStatus>> commit() {
+ return null;
+ }
+
+ @Override
+ public ListenerRegistration<DataTransactionListener> registerListener(DataTransactionListener listener) {
+ return null;
+ }
+
+ @Override
+ public Map<InstanceIdentifier<? extends DataObject>, DataObject> getCreatedOperationalData() {
+ return null;
+ }
+
+ @Override
+ public Map<InstanceIdentifier<? extends DataObject>, DataObject> getCreatedConfigurationData() {
+ return null;
+ }
+
+ @Override
+ public Map<InstanceIdentifier<? extends DataObject>, DataObject> getUpdatedOperationalData() {
+ return null;
+ }
+
+ @Override
+ public Map<InstanceIdentifier<? extends DataObject>, DataObject> getUpdatedConfigurationData() {
+ return null;
+ }
+
+ @Override
+ public Set<InstanceIdentifier<? extends DataObject>> getRemovedConfigurationData() {
+ return null;
+ }
+
+ @Override
+ public Set<InstanceIdentifier<? extends DataObject>> getRemovedOperationalData() {
+ return null;
+ }
+
+ @Override
+ public Map<InstanceIdentifier<? extends DataObject>, DataObject> getOriginalConfigurationData() {
+ return null;
+ }
+
+ @Override
+ public Map<InstanceIdentifier<? extends DataObject>, DataObject> getOriginalOperationalData() {
+ return null;
+ }
+
+ @Override
+ public DataObject readOperationalData(InstanceIdentifier<? extends DataObject> path) {
+ return null;
+ }
+
+ @Override
+ public DataObject readConfigurationData(InstanceIdentifier<? extends DataObject> path) {
+ return null;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.binding.impl.test;
+
+public class DataBrokerImplTest {
+
+ public static void main(String[] args){
+
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.binding.impl.test;
+
+public class NotificationServiceImplTest {
+
+ public static void main(String[] args){
+
+ }
+}
--- /dev/null
+/*
+ * Copyright IBM Corporation, 2013. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.statistics.manager;
+
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv4Prefix;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.match.Layer3Match;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.match.layer._3.match.Ipv4Match;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Utility class for comparing flows.
+ */
+final class FlowComparator {
+ private final static Logger logger = LoggerFactory.getLogger(FlowComparator.class);
+
+ private FlowComparator() {
+
+ }
+
+ public static boolean flowEquals(Flow statsFlow, Flow storedFlow) {
+ if (statsFlow.getClass() != storedFlow.getClass()) {
+ return false;
+ }
+ if (statsFlow.getContainerName()== null) {
+ if (storedFlow.getContainerName()!= null) {
+ return false;
+ }
+ } else if(!statsFlow.getContainerName().equals(storedFlow.getContainerName())) {
+ return false;
+ }
+ if (statsFlow.getMatch()== null) {
+ if (storedFlow.getMatch() != null) {
+ return false;
+ }
+ } //else if(!statsFlow.getMatch().equals(storedFlow.getMatch())) {
+ else if(!matchEquals(statsFlow.getMatch(), storedFlow.getMatch())) {
+ return false;
+ }
+ if (storedFlow.getPriority() == null) {
+ if (statsFlow.getPriority() != null && statsFlow.getPriority()!= 0x8000) {
+ return false;
+ }
+ } else if(!statsFlow.getPriority().equals(storedFlow.getPriority())) {
+ return false;
+ }
+ if (statsFlow.getTableId() == null) {
+ if (storedFlow.getTableId() != null) {
+ return false;
+ }
+ } else if(!statsFlow.getTableId().equals(storedFlow.getTableId())) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Explicit equals method to compare the 'match' for flows stored in the data-stores and flow fetched from the switch.
+ * Flow installation process has three steps
+ * 1) Store flow in config data store
+ * 2) and send it to plugin for installation
+ * 3) Flow gets installed in switch
+ *
+ * The flow user wants to install and what finally gets installed in switch can be slightly different.
+ * E.g, If user installs flow with src/dst ip=10.0.0.1/24, when it get installed in the switch
+ * src/dst ip will be changes to 10.0.0.0/24 because of netmask of 24. When statistics manager fetch
+ * stats it gets 10.0.0.0/24 rather then 10.0.0.1/24. Custom match takes care of by using masked ip
+ * while comparing two ip addresses.
+ *
+ * Sometimes when user don't provide few values that is required by flow installation request, like
+ * priority,hard timeout, idle timeout, cookies etc, plugin usages default values before sending
+ * request to the switch. So when statistics manager gets flow statistics, it gets the default value.
+ * But the flow stored in config data store don't have those defaults value. I included those checks
+ * in the customer flow/match equal function.
+ *
+ *
+ * @param statsFlow
+ * @param storedFlow
+ * @return
+ */
+ public static boolean matchEquals(Match statsFlow, Match storedFlow) {
+ if (statsFlow == storedFlow) {
+ return true;
+ }
+ if (storedFlow.getClass() != statsFlow.getClass()) {
+ return false;
+ }
+ if (storedFlow.getEthernetMatch() == null) {
+ if (statsFlow.getEthernetMatch() != null) {
+ return false;
+ }
+ } else if(!storedFlow.getEthernetMatch().equals(statsFlow.getEthernetMatch())) {
+ return false;
+ }
+ if (storedFlow.getIcmpv4Match()== null) {
+ if (statsFlow.getIcmpv4Match() != null) {
+ return false;
+ }
+ } else if(!storedFlow.getIcmpv4Match().equals(statsFlow.getIcmpv4Match())) {
+ return false;
+ }
+ if (storedFlow.getIcmpv6Match() == null) {
+ if (statsFlow.getIcmpv6Match() != null) {
+ return false;
+ }
+ } else if(!storedFlow.getIcmpv6Match().equals(statsFlow.getIcmpv6Match())) {
+ return false;
+ }
+ if (storedFlow.getInPhyPort() == null) {
+ if (statsFlow.getInPhyPort() != null) {
+ return false;
+ }
+ } else if(!storedFlow.getInPhyPort().equals(statsFlow.getInPhyPort())) {
+ return false;
+ }
+ if (storedFlow.getInPort()== null) {
+ if (statsFlow.getInPort() != null) {
+ return false;
+ }
+ } else if(!storedFlow.getInPort().equals(statsFlow.getInPort())) {
+ return false;
+ }
+ if (storedFlow.getIpMatch()== null) {
+ if (statsFlow.getIpMatch() != null) {
+ return false;
+ }
+ } else if(!storedFlow.getIpMatch().equals(statsFlow.getIpMatch())) {
+ return false;
+ }
+ if (storedFlow.getLayer3Match()== null) {
+ if (statsFlow.getLayer3Match() != null) {
+ return false;
+ }
+ } else if(!layer3MatchEquals(statsFlow.getLayer3Match(),storedFlow.getLayer3Match())) {
+ return false;
+ }
+ if (storedFlow.getLayer4Match()== null) {
+ if (statsFlow.getLayer4Match() != null) {
+ return false;
+ }
+ } else if(!storedFlow.getLayer4Match().equals(statsFlow.getLayer4Match())) {
+ return false;
+ }
+ if (storedFlow.getMetadata() == null) {
+ if (statsFlow.getMetadata() != null) {
+ return false;
+ }
+ } else if(!storedFlow.getMetadata().equals(statsFlow.getMetadata())) {
+ return false;
+ }
+ if (storedFlow.getProtocolMatchFields() == null) {
+ if (statsFlow.getProtocolMatchFields() != null) {
+ return false;
+ }
+ } else if(!storedFlow.getProtocolMatchFields().equals(statsFlow.getProtocolMatchFields())) {
+ return false;
+ }
+ if (storedFlow.getTunnel()== null) {
+ if (statsFlow.getTunnel() != null) {
+ return false;
+ }
+ } else if(!storedFlow.getTunnel().equals(statsFlow.getTunnel())) {
+ return false;
+ }
+ if (storedFlow.getVlanMatch()== null) {
+ if (statsFlow.getVlanMatch() != null) {
+ return false;
+ }
+ } else if(!storedFlow.getVlanMatch().equals(statsFlow.getVlanMatch())) {
+ return false;
+ }
+ return true;
+ }
+
+ @VisibleForTesting
+ static boolean layer3MatchEquals(Layer3Match statsLayer3Match, Layer3Match storedLayer3Match){
+ boolean verdict = true;
+ if(statsLayer3Match instanceof Ipv4Match && storedLayer3Match instanceof Ipv4Match){
+ Ipv4Match statsIpv4Match = (Ipv4Match)statsLayer3Match;
+ Ipv4Match storedIpv4Match = (Ipv4Match)storedLayer3Match;
+
+ if (verdict) {
+ verdict = compareNullSafe(
+ storedIpv4Match.getIpv4Destination(), statsIpv4Match.getIpv4Destination());
+ }
+ if (verdict) {
+ verdict = compareNullSafe(
+ statsIpv4Match.getIpv4Source(), storedIpv4Match.getIpv4Source());
+ }
+ } else {
+ Boolean nullCheckOut = checkNullValues(storedLayer3Match, statsLayer3Match);
+ if (nullCheckOut != null) {
+ verdict = nullCheckOut;
+ } else {
+ verdict = storedLayer3Match.equals(statsLayer3Match);
+ }
+ }
+
+ return verdict;
+ }
+
+ private static boolean compareNullSafe(Ipv4Prefix statsIpv4, Ipv4Prefix storedIpv4) {
+ boolean verdict = true;
+ Boolean checkDestNullValuesOut = checkNullValues(storedIpv4, statsIpv4);
+ if (checkDestNullValuesOut != null) {
+ verdict = checkDestNullValuesOut;
+ } else if(!IpAddressEquals(statsIpv4, storedIpv4)){
+ verdict = false;
+ }
+
+ return verdict;
+ }
+
+ private static Boolean checkNullValues(Object v1, Object v2) {
+ Boolean verdict = null;
+ if (v1 == null && v2 != null) {
+ verdict = Boolean.FALSE;
+ } else if (v1 != null && v2 == null) {
+ verdict = Boolean.FALSE;
+ } else if (v1 == null && v2 == null) {
+ verdict = Boolean.TRUE;
+ }
+
+ return verdict;
+ }
+
+ /**
+ * TODO: why don't we use the default Ipv4Prefix.equals()?
+ *
+ * @param statsIpAddress
+ * @param storedIpAddress
+ * @return true if IPv4prefixes equals
+ */
+ private static boolean IpAddressEquals(Ipv4Prefix statsIpAddress, Ipv4Prefix storedIpAddress) {
+ IntegerIpAddress statsIpAddressInt = StrIpToIntIp(statsIpAddress.getValue());
+ IntegerIpAddress storedIpAddressInt = StrIpToIntIp(storedIpAddress.getValue());
+
+ if(IpAndMaskBasedMatch(statsIpAddressInt,storedIpAddressInt)){
+ return true;
+ }
+ if(IpBasedMatch(statsIpAddressInt,storedIpAddressInt)){
+ return true;
+ }
+ return false;
+ }
+
+ private static boolean IpAndMaskBasedMatch(IntegerIpAddress statsIpAddressInt,IntegerIpAddress storedIpAddressInt){
+ return ((statsIpAddressInt.getIp() & statsIpAddressInt.getMask()) == (storedIpAddressInt.getIp() & storedIpAddressInt.getMask()));
+ }
+
+ private static boolean IpBasedMatch(IntegerIpAddress statsIpAddressInt,IntegerIpAddress storedIpAddressInt){
+ return (statsIpAddressInt.getIp() == storedIpAddressInt.getIp());
+ }
+
+ /**
+ * Method return integer version of ip address. Converted int will be mask if
+ * mask specified
+ */
+ private static IntegerIpAddress StrIpToIntIp(String ipAddresss){
+
+ String[] parts = ipAddresss.split("/");
+ String ip = parts[0];
+ int prefix;
+
+ if (parts.length < 2) {
+ prefix = 32;
+ } else {
+ prefix = Integer.parseInt(parts[1]);
+ }
+
+ IntegerIpAddress integerIpAddress = null;
+ try {
+ Inet4Address addr = (Inet4Address) InetAddress.getByName(ip);
+ byte[] addrBytes = addr.getAddress();
+ int ipInt = ((addrBytes[0] & 0xFF) << 24) |
+ ((addrBytes[1] & 0xFF) << 16) |
+ ((addrBytes[2] & 0xFF) << 8) |
+ ((addrBytes[3] & 0xFF) << 0);
+
+ int mask = 0xffffffff << 32 - prefix;
+
+ integerIpAddress = new IntegerIpAddress(ipInt, mask);
+ } catch (UnknownHostException e){
+ logger.error("Failed to determine host IP address by name: {}", e.getMessage(), e);
+ }
+
+ return integerIpAddress;
+ }
+
+ private static class IntegerIpAddress{
+ int ip;
+ int mask;
+ public IntegerIpAddress(int ip, int mask) {
+ this.ip = ip;
+ this.mask = mask;
+ }
+ public int getIp() {
+ return ip;
+ }
+ public int getMask() {
+ return mask;
+ }
+ }
+}
+++ /dev/null
-/*
- * Copyright IBM Corporation, 2013. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.statistics.manager;
-
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.QueueKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.queue.rev130925.QueueId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupDescStats;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupStatistics;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.desc.stats.reply.GroupDescStats;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterConfigStats;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterStatistics;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.config.stats.reply.MeterConfigStats;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.FlowCapableNodeConnectorQueueStatisticsData;
-import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
-
-/**
- * Main responsibility of this class to clean up all the stale statistics data
- * associated to Flow,Meter,Group,Queue.
- * @author avishnoi@in.ibm.com
- *
- */
-public class NodeStatisticsAger {
-
- private final int NUMBER_OF_WAIT_CYCLES =2;
-
- private final StatisticsProvider statisticsProvider;
-
- private final NodeKey targetNodeKey;
-
- private final Map<GroupDescStats,Date> groupDescStatsUpdate
- = new ConcurrentHashMap<GroupDescStats,Date>();
-
- private final Map<MeterConfigStats,Date> meterConfigStatsUpdate
- = new ConcurrentHashMap<MeterConfigStats,Date>();
-
- private final Map<FlowEntry,Date> flowStatsUpdate
- = new ConcurrentHashMap<FlowEntry,Date>();
-
- private final Map<QueueEntry,Date> queuesStatsUpdate
- = new ConcurrentHashMap<QueueEntry,Date>();
-
- public NodeStatisticsAger(StatisticsProvider statisticsProvider, NodeKey nodeKey){
- this.targetNodeKey = nodeKey;
- this.statisticsProvider = statisticsProvider;
- }
-
- public class FlowEntry{
- private final Short tableId;
- private final Flow flow;
-
- public FlowEntry(Short tableId, Flow flow){
- this.tableId = tableId;
- this.flow = flow;
- }
-
- public Short getTableId() {
- return tableId;
- }
-
- public Flow getFlow() {
- return flow;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + getOuterType().hashCode();
- result = prime * result + ((flow == null) ? 0 : flow.hashCode());
- result = prime * result + ((tableId == null) ? 0 : tableId.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- FlowEntry other = (FlowEntry) obj;
- if (!getOuterType().equals(other.getOuterType()))
- return false;
- if (flow == null) {
- if (other.flow != null)
- return false;
- } else if (!flow.equals(other.flow))
- return false;
- if (tableId == null) {
- if (other.tableId != null)
- return false;
- } else if (!tableId.equals(other.tableId))
- return false;
- return true;
- }
-
- private NodeStatisticsAger getOuterType() {
- return NodeStatisticsAger.this;
- }
-
- }
-
- public class QueueEntry{
- private final NodeConnectorId nodeConnectorId;
- private final QueueId queueId;
- public QueueEntry(NodeConnectorId ncId, QueueId queueId){
- this.nodeConnectorId = ncId;
- this.queueId = queueId;
- }
- public NodeConnectorId getNodeConnectorId() {
- return nodeConnectorId;
- }
- public QueueId getQueueId() {
- return queueId;
- }
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + getOuterType().hashCode();
- result = prime * result + ((nodeConnectorId == null) ? 0 : nodeConnectorId.hashCode());
- result = prime * result + ((queueId == null) ? 0 : queueId.hashCode());
- return result;
- }
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (!(obj instanceof QueueEntry)) {
- return false;
- }
- QueueEntry other = (QueueEntry) obj;
- if (!getOuterType().equals(other.getOuterType())) {
- return false;
- }
- if (nodeConnectorId == null) {
- if (other.nodeConnectorId != null) {
- return false;
- }
- } else if (!nodeConnectorId.equals(other.nodeConnectorId)) {
- return false;
- }
- if (queueId == null) {
- if (other.queueId != null) {
- return false;
- }
- } else if (!queueId.equals(other.queueId)) {
- return false;
- }
- return true;
- }
- private NodeStatisticsAger getOuterType() {
- return NodeStatisticsAger.this;
- }
- }
-
- public NodeKey getTargetNodeKey() {
- return targetNodeKey;
- }
-
- public Map<GroupDescStats, Date> getGroupDescStatsUpdate() {
- return groupDescStatsUpdate;
- }
-
- public Map<MeterConfigStats, Date> getMeterConfigStatsUpdate() {
- return meterConfigStatsUpdate;
- }
-
- public Map<FlowEntry, Date> getFlowStatsUpdate() {
- return flowStatsUpdate;
- }
-
- public Map<QueueEntry, Date> getQueuesStatsUpdate() {
- return queuesStatsUpdate;
- }
-
- public void updateGroupDescStats(List<GroupDescStats> list){
- Date expiryTime = getExpiryTime();
- for(GroupDescStats groupDescStats : list)
- this.groupDescStatsUpdate.put(groupDescStats, expiryTime);
- }
-
- public void updateMeterConfigStats(List<MeterConfigStats> list){
- Date expiryTime = getExpiryTime();
- for(MeterConfigStats meterConfigStats: list)
- this.meterConfigStatsUpdate.put(meterConfigStats, expiryTime);
- }
-
- public void updateFlowStats(FlowEntry flowEntry){
- this.flowStatsUpdate.put(flowEntry, getExpiryTime());
- }
- public void updateQueueStats(QueueEntry queueEntry){
- this.queuesStatsUpdate.put(queueEntry, getExpiryTime());
- }
-
- private Date getExpiryTime(){
- Date expires = new Date();
- expires.setTime(expires.getTime()+StatisticsProvider.STATS_THREAD_EXECUTION_TIME*NUMBER_OF_WAIT_CYCLES);
- return expires;
- }
-
- public void cleanStaleStatistics(){
- //Clean stale statistics related to group
- for (Iterator<GroupDescStats> it = this.groupDescStatsUpdate.keySet().iterator();it.hasNext();){
- GroupDescStats groupDescStats = it.next();
- Date now = new Date();
- Date expiryTime = this.groupDescStatsUpdate.get(groupDescStats);
- if(now.after(expiryTime)){
- cleanGroupStatsFromDataStore(groupDescStats );
- it.remove();
- }
- }
-
- //Clean stale statistics related to meter
- for (Iterator<MeterConfigStats> it = this.meterConfigStatsUpdate.keySet().iterator();it.hasNext();){
- MeterConfigStats meterConfigStats = it.next();
- Date now = new Date();
- Date expiryTime = this.meterConfigStatsUpdate.get(meterConfigStats);
- if(now.after(expiryTime)){
- cleanMeterStatsFromDataStore(meterConfigStats);
- it.remove();
- }
- }
-
- //Clean stale statistics related to flow
- for (Iterator<FlowEntry> it = this.flowStatsUpdate.keySet().iterator();it.hasNext();){
- FlowEntry flowEntry = it.next();
- Date now = new Date();
- Date expiryTime = this.flowStatsUpdate.get(flowEntry);
- if(now.after(expiryTime)){
- cleanFlowStatsFromDataStore(flowEntry);
- it.remove();
- }
- }
-
- //Clean stale statistics related to queue
- for (Iterator<QueueEntry> it = this.queuesStatsUpdate.keySet().iterator();it.hasNext();){
- QueueEntry queueEntry = it.next();
- Date now = new Date();
- Date expiryTime = this.queuesStatsUpdate.get(queueEntry);
- if(now.after(expiryTime)){
- cleanQueueStatsFromDataStore(queueEntry);
- it.remove();
- }
- }
-
- }
-
- private void cleanQueueStatsFromDataStore(QueueEntry queueEntry) {
- InstanceIdentifier<?> queueRef
- = InstanceIdentifier.builder(Nodes.class)
- .child(Node.class, this.targetNodeKey)
- .child(NodeConnector.class, new NodeConnectorKey(queueEntry.getNodeConnectorId()))
- .augmentation(FlowCapableNodeConnector.class)
- .child(Queue.class, new QueueKey(queueEntry.getQueueId()))
- .augmentation(FlowCapableNodeConnectorQueueStatisticsData.class).toInstance();
- cleanStaleStatisticsFromDataStore(queueRef);
- }
-
- private void cleanFlowStatsFromDataStore(FlowEntry flowEntry) {
- InstanceIdentifier<?> flowRef
- = InstanceIdentifier.builder(Nodes.class).child(Node.class, this.targetNodeKey)
- .augmentation(FlowCapableNode.class)
- .child(Table.class, new TableKey(flowEntry.getTableId()))
- .child(Flow.class,flowEntry.getFlow().getKey())
- .augmentation(FlowStatisticsData.class).toInstance();
-
- cleanStaleStatisticsFromDataStore(flowRef);
-
- }
-
- private void cleanMeterStatsFromDataStore(MeterConfigStats meterConfigStats) {
- InstanceIdentifierBuilder<Meter> meterRef
- = InstanceIdentifier.builder(Nodes.class).child(Node.class,this.targetNodeKey)
- .augmentation(FlowCapableNode.class)
- .child(Meter.class,new MeterKey(meterConfigStats.getMeterId()));
-
- InstanceIdentifier<?> nodeMeterConfigStatsAugmentation = meterRef.augmentation(NodeMeterConfigStats.class).toInstance();
-
- cleanStaleStatisticsFromDataStore(nodeMeterConfigStatsAugmentation);
-
- InstanceIdentifier<?> nodeMeterStatisticsAugmentation = meterRef.augmentation(NodeMeterStatistics.class).toInstance();
-
- cleanStaleStatisticsFromDataStore(nodeMeterStatisticsAugmentation);
-
- }
-
- private void cleanGroupStatsFromDataStore(GroupDescStats groupDescStats) {
- InstanceIdentifierBuilder<Group> groupRef
- = InstanceIdentifier.builder(Nodes.class).child(Node.class,this.targetNodeKey)
- .augmentation(FlowCapableNode.class)
- .child(Group.class,new GroupKey(groupDescStats.getGroupId()));
-
- InstanceIdentifier<?> nodeGroupDescStatsAugmentation = groupRef.augmentation(NodeGroupDescStats.class).toInstance();
-
- cleanStaleStatisticsFromDataStore(nodeGroupDescStatsAugmentation);
-
- InstanceIdentifier<?> nodeGroupStatisticsAugmentation = groupRef.augmentation(NodeGroupStatistics.class).toInstance();
-
- cleanStaleStatisticsFromDataStore(nodeGroupStatisticsAugmentation);
- }
-
- private void cleanStaleStatisticsFromDataStore(InstanceIdentifier<? extends DataObject> ii){
- if(ii != null){
- DataModificationTransaction it = this.statisticsProvider.startChange();
- it.removeOperationalData(ii);
- it.commit();
- }
- }
-}
--- /dev/null
+/*
+ * Copyright IBM Corporation, 2013. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.statistics.manager;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsData;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsDataBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsDataBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.aggregate.flow.statistics.AggregateFlowStatisticsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapListBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.statistics.FlowStatisticsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsData;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsDataBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.and.statistics.map.FlowTableAndStatisticsMap;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.statistics.FlowTableStatistics;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.statistics.FlowTableStatisticsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.QueueBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.QueueKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.queue.rev130925.QueueId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupDescStats;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupDescStatsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeatures;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeaturesBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupStatistics;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupStatisticsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.desc.GroupDescBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.features.GroupFeaturesBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.statistics.GroupStatisticsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupFeatures;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.desc.stats.reply.GroupDescStats;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.statistics.reply.GroupStats;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterConfigStats;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterConfigStatsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterFeatures;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterFeaturesBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterStatistics;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterStatisticsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.MeterFeaturesBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.meter.MeterConfigStatsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.meter.MeterStatisticsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterFeatures;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.config.stats.reply.MeterConfigStats;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.statistics.reply.MeterStats;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.AggregateFlowStatistics;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericStatistics;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.flow.capable.node.connector.statistics.FlowCapableNodeConnectorStatisticsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.FlowCapableNodeConnectorQueueStatisticsData;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.FlowCapableNodeConnectorQueueStatisticsDataBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.flow.capable.node.connector.queue.statistics.FlowCapableNodeConnectorQueueStatisticsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.queue.id.and.statistics.map.QueueIdAndStatisticsMap;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class handles the lifecycle of per-node statistics. It receives data
+ * from StatisticsListener, stores it in the data store and keeps track of
+ * when the data should be removed.
+ *
+ * @author avishnoi@in.ibm.com
+ */
+public class NodeStatisticsHandler {
+ private static final Logger logger = LoggerFactory.getLogger(NodeStatisticsHandler.class);
+ private static final int NUMBER_OF_WAIT_CYCLES = 2;
+
+ private final Map<GroupDescStats,Long> groupDescStatsUpdate = new HashMap<>();
+ private final Map<MeterConfigStats,Long> meterConfigStatsUpdate = new HashMap<>();
+ private final Map<FlowEntry,Long> flowStatsUpdate = new HashMap<>();
+ private final Map<QueueEntry,Long> queuesStatsUpdate = new HashMap<>();
+ private final InstanceIdentifier<Node> targetNodeIdentifier;
+ private final StatisticsProvider statisticsProvider;
+ private final NodeKey targetNodeKey;
+ private int unaccountedFlowsCounter = 1;
+
+ public NodeStatisticsHandler(StatisticsProvider statisticsProvider, NodeKey nodeKey){
+ this.statisticsProvider = Preconditions.checkNotNull(statisticsProvider);
+ this.targetNodeKey = Preconditions.checkNotNull(nodeKey);
+ this.targetNodeIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).build();
+ }
+
+ private static class FlowEntry {
+ private final Short tableId;
+ private final Flow flow;
+
+ public FlowEntry(Short tableId, Flow flow){
+ this.tableId = tableId;
+ this.flow = flow;
+ }
+
+ public Short getTableId() {
+ return tableId;
+ }
+
+ public Flow getFlow() {
+ return flow;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((flow == null) ? 0 : flow.hashCode());
+ result = prime * result + ((tableId == null) ? 0 : tableId.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ FlowEntry other = (FlowEntry) obj;
+ if (flow == null) {
+ if (other.flow != null)
+ return false;
+ } else if (!flow.equals(other.flow))
+ return false;
+ if (tableId == null) {
+ if (other.tableId != null)
+ return false;
+ } else if (!tableId.equals(other.tableId))
+ return false;
+ return true;
+ }
+ }
+
+ private static final class QueueEntry{
+ private final NodeConnectorId nodeConnectorId;
+ private final QueueId queueId;
+ public QueueEntry(NodeConnectorId ncId, QueueId queueId){
+ this.nodeConnectorId = ncId;
+ this.queueId = queueId;
+ }
+ public NodeConnectorId getNodeConnectorId() {
+ return nodeConnectorId;
+ }
+ public QueueId getQueueId() {
+ return queueId;
+ }
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((nodeConnectorId == null) ? 0 : nodeConnectorId.hashCode());
+ result = prime * result + ((queueId == null) ? 0 : queueId.hashCode());
+ return result;
+ }
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof QueueEntry)) {
+ return false;
+ }
+ QueueEntry other = (QueueEntry) obj;
+ if (nodeConnectorId == null) {
+ if (other.nodeConnectorId != null) {
+ return false;
+ }
+ } else if (!nodeConnectorId.equals(other.nodeConnectorId)) {
+ return false;
+ }
+ if (queueId == null) {
+ if (other.queueId != null) {
+ return false;
+ }
+ } else if (!queueId.equals(other.queueId)) {
+ return false;
+ }
+ return true;
+ }
+ }
+
+ public NodeKey getTargetNodeKey() {
+ return targetNodeKey;
+ }
+
+ public synchronized void updateGroupDescStats(List<GroupDescStats> list){
+ final Long expiryTime = getExpiryTime();
+ final DataModificationTransaction trans = statisticsProvider.startChange();
+
+ for (GroupDescStats groupDescStats : list) {
+ GroupBuilder groupBuilder = new GroupBuilder();
+ GroupKey groupKey = new GroupKey(groupDescStats.getGroupId());
+ groupBuilder.setKey(groupKey);
+
+ InstanceIdentifier<Group> groupRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
+ .augmentation(FlowCapableNode.class)
+ .child(Group.class,groupKey).toInstance();
+
+ NodeGroupDescStatsBuilder groupDesc= new NodeGroupDescStatsBuilder();
+ GroupDescBuilder stats = new GroupDescBuilder();
+ stats.fieldsFrom(groupDescStats);
+ groupDesc.setGroupDesc(stats.build());
+
+ //Update augmented data
+ groupBuilder.addAugmentation(NodeGroupDescStats.class, groupDesc.build());
+
+ trans.putOperationalData(groupRef, groupBuilder.build());
+ this.groupDescStatsUpdate.put(groupDescStats, expiryTime);
+ }
+
+ trans.commit();
+ }
+
+
+ public synchronized void updateGroupStats(List<GroupStats> list) {
+ final DataModificationTransaction trans = statisticsProvider.startChange();
+
+ for(GroupStats groupStats : list) {
+ GroupBuilder groupBuilder = new GroupBuilder();
+ GroupKey groupKey = new GroupKey(groupStats.getGroupId());
+ groupBuilder.setKey(groupKey);
+
+ InstanceIdentifier<Group> groupRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
+ .augmentation(FlowCapableNode.class)
+ .child(Group.class,groupKey).toInstance();
+
+ NodeGroupStatisticsBuilder groupStatisticsBuilder= new NodeGroupStatisticsBuilder();
+ GroupStatisticsBuilder stats = new GroupStatisticsBuilder();
+ stats.fieldsFrom(groupStats);
+ groupStatisticsBuilder.setGroupStatistics(stats.build());
+
+ //Update augmented data
+ groupBuilder.addAugmentation(NodeGroupStatistics.class, groupStatisticsBuilder.build());
+ trans.putOperationalData(groupRef, groupBuilder.build());
+
+ // FIXME: should we be tracking this data?
+ }
+
+ trans.commit();
+ }
+
+ public synchronized void updateMeterConfigStats(List<MeterConfigStats> list) {
+ final Long expiryTime = getExpiryTime();
+ final DataModificationTransaction trans = statisticsProvider.startChange();
+
+ for(MeterConfigStats meterConfigStats : list) {
+ MeterBuilder meterBuilder = new MeterBuilder();
+ MeterKey meterKey = new MeterKey(meterConfigStats.getMeterId());
+ meterBuilder.setKey(meterKey);
+
+ InstanceIdentifier<Meter> meterRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
+ .augmentation(FlowCapableNode.class)
+ .child(Meter.class,meterKey).toInstance();
+
+ NodeMeterConfigStatsBuilder meterConfig= new NodeMeterConfigStatsBuilder();
+ MeterConfigStatsBuilder stats = new MeterConfigStatsBuilder();
+ stats.fieldsFrom(meterConfigStats);
+ meterConfig.setMeterConfigStats(stats.build());
+
+ //Update augmented data
+ meterBuilder.addAugmentation(NodeMeterConfigStats.class, meterConfig.build());
+
+ trans.putOperationalData(meterRef, meterBuilder.build());
+ this.meterConfigStatsUpdate.put(meterConfigStats, expiryTime);
+ }
+
+ trans.commit();
+ }
+
+
+ public synchronized void updateMeterStats(List<MeterStats> list) {
+ final DataModificationTransaction trans = statisticsProvider.startChange();
+
+ for(MeterStats meterStats : list) {
+ MeterBuilder meterBuilder = new MeterBuilder();
+ MeterKey meterKey = new MeterKey(meterStats.getMeterId());
+ meterBuilder.setKey(meterKey);
+
+ InstanceIdentifier<Meter> meterRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
+ .augmentation(FlowCapableNode.class)
+ .child(Meter.class,meterKey).toInstance();
+
+ NodeMeterStatisticsBuilder meterStatsBuilder= new NodeMeterStatisticsBuilder();
+ MeterStatisticsBuilder stats = new MeterStatisticsBuilder();
+ stats.fieldsFrom(meterStats);
+ meterStatsBuilder.setMeterStatistics(stats.build());
+
+ //Update augmented data
+ meterBuilder.addAugmentation(NodeMeterStatistics.class, meterStatsBuilder.build());
+ trans.putOperationalData(meterRef, meterBuilder.build());
+
+ // FIXME: should we be tracking this data?
+ }
+
+ trans.commit();
+ }
+
+ public synchronized void updateQueueStats(List<QueueIdAndStatisticsMap> list) {
+ final Long expiryTime = getExpiryTime();
+ final DataModificationTransaction trans = statisticsProvider.startChange();
+
+ for (QueueIdAndStatisticsMap swQueueStats : list) {
+
+ QueueEntry queueEntry = new QueueEntry(swQueueStats.getNodeConnectorId(),swQueueStats.getQueueId());
+
+ FlowCapableNodeConnectorQueueStatisticsDataBuilder queueStatisticsDataBuilder = new FlowCapableNodeConnectorQueueStatisticsDataBuilder();
+
+ FlowCapableNodeConnectorQueueStatisticsBuilder queueStatisticsBuilder = new FlowCapableNodeConnectorQueueStatisticsBuilder();
+
+ queueStatisticsBuilder.fieldsFrom(swQueueStats);
+
+ queueStatisticsDataBuilder.setFlowCapableNodeConnectorQueueStatistics(queueStatisticsBuilder.build());
+
+ InstanceIdentifier<Queue> queueRef
+ = InstanceIdentifier.builder(Nodes.class)
+ .child(Node.class, targetNodeKey)
+ .child(NodeConnector.class, new NodeConnectorKey(swQueueStats.getNodeConnectorId()))
+ .augmentation(FlowCapableNodeConnector.class)
+ .child(Queue.class, new QueueKey(swQueueStats.getQueueId())).toInstance();
+
+ QueueBuilder queueBuilder = new QueueBuilder();
+ FlowCapableNodeConnectorQueueStatisticsData qsd = queueStatisticsDataBuilder.build();
+ queueBuilder.addAugmentation(FlowCapableNodeConnectorQueueStatisticsData.class, qsd);
+ queueBuilder.setKey(new QueueKey(swQueueStats.getQueueId()));
+
+ logger.debug("Augmenting queue statistics {} of queue {} to port {}",
+ qsd,
+ swQueueStats.getQueueId(),
+ swQueueStats.getNodeConnectorId());
+
+ trans.putOperationalData(queueRef, queueBuilder.build());
+ this.queuesStatsUpdate.put(queueEntry, expiryTime);
+ }
+
+ trans.commit();
+ }
+
+ public synchronized void updateFlowTableStats(List<FlowTableAndStatisticsMap> list) {
+ final DataModificationTransaction trans = statisticsProvider.startChange();
+
+ for (FlowTableAndStatisticsMap ftStats : list) {
+
+ InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
+ .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(ftStats.getTableId().getValue())).toInstance();
+
+ FlowTableStatisticsDataBuilder statisticsDataBuilder = new FlowTableStatisticsDataBuilder();
+
+ FlowTableStatisticsBuilder statisticsBuilder = new FlowTableStatisticsBuilder();
+ statisticsBuilder.setActiveFlows(ftStats.getActiveFlows());
+ statisticsBuilder.setPacketsLookedUp(ftStats.getPacketsLookedUp());
+ statisticsBuilder.setPacketsMatched(ftStats.getPacketsMatched());
+
+ final FlowTableStatistics stats = statisticsBuilder.build();
+ statisticsDataBuilder.setFlowTableStatistics(stats);
+
+ logger.debug("Augment flow table statistics: {} for table {} on Node {}",
+ stats,ftStats.getTableId(), targetNodeKey);
+
+ TableBuilder tableBuilder = new TableBuilder();
+ tableBuilder.setKey(new TableKey(ftStats.getTableId().getValue()));
+ tableBuilder.addAugmentation(FlowTableStatisticsData.class, statisticsDataBuilder.build());
+ trans.putOperationalData(tableRef, tableBuilder.build());
+
+ // FIXME: should we be tracking this data?
+ }
+
+ trans.commit();
+ }
+
+ public synchronized void updateNodeConnectorStats(List<NodeConnectorStatisticsAndPortNumberMap> list) {
+ final DataModificationTransaction trans = statisticsProvider.startChange();
+
+ for(NodeConnectorStatisticsAndPortNumberMap portStats : list) {
+
+ FlowCapableNodeConnectorStatisticsBuilder statisticsBuilder
+ = new FlowCapableNodeConnectorStatisticsBuilder();
+ statisticsBuilder.setBytes(portStats.getBytes());
+ statisticsBuilder.setCollisionCount(portStats.getCollisionCount());
+ statisticsBuilder.setDuration(portStats.getDuration());
+ statisticsBuilder.setPackets(portStats.getPackets());
+ statisticsBuilder.setReceiveCrcError(portStats.getReceiveCrcError());
+ statisticsBuilder.setReceiveDrops(portStats.getReceiveDrops());
+ statisticsBuilder.setReceiveErrors(portStats.getReceiveErrors());
+ statisticsBuilder.setReceiveFrameError(portStats.getReceiveFrameError());
+ statisticsBuilder.setReceiveOverRunError(portStats.getReceiveOverRunError());
+ statisticsBuilder.setTransmitDrops(portStats.getTransmitDrops());
+ statisticsBuilder.setTransmitErrors(portStats.getTransmitErrors());
+
+ //Augment data to the node-connector
+ FlowCapableNodeConnectorStatisticsDataBuilder statisticsDataBuilder =
+ new FlowCapableNodeConnectorStatisticsDataBuilder();
+
+ statisticsDataBuilder.setFlowCapableNodeConnectorStatistics(statisticsBuilder.build());
+
+ InstanceIdentifier<NodeConnector> nodeConnectorRef = InstanceIdentifier.builder(Nodes.class)
+ .child(Node.class, targetNodeKey)
+ .child(NodeConnector.class, new NodeConnectorKey(portStats.getNodeConnectorId())).toInstance();
+
+ // FIXME: can we bypass this read?
+ NodeConnector nodeConnector = (NodeConnector)trans.readOperationalData(nodeConnectorRef);
+ if(nodeConnector != null){
+ final FlowCapableNodeConnectorStatisticsData stats = statisticsDataBuilder.build();
+ logger.debug("Augmenting port statistics {} to port {}",stats,nodeConnectorRef.toString());
+ NodeConnectorBuilder nodeConnectorBuilder = new NodeConnectorBuilder();
+ nodeConnectorBuilder.addAugmentation(FlowCapableNodeConnectorStatisticsData.class, stats);
+ trans.putOperationalData(nodeConnectorRef, nodeConnectorBuilder.build());
+ }
+
+ // FIXME: should we be tracking this data?
+ }
+
+ trans.commit();
+ }
+
+ public synchronized void updateAggregateFlowStats(Short tableId, AggregateFlowStatistics flowStats) {
+ if (tableId != null) {
+ final DataModificationTransaction trans = statisticsProvider.startChange();
+
+
+ InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
+ .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
+
+ AggregateFlowStatisticsDataBuilder aggregateFlowStatisticsDataBuilder = new AggregateFlowStatisticsDataBuilder();
+ AggregateFlowStatisticsBuilder aggregateFlowStatisticsBuilder = new AggregateFlowStatisticsBuilder(flowStats);
+
+ aggregateFlowStatisticsDataBuilder.setAggregateFlowStatistics(aggregateFlowStatisticsBuilder.build());
+
+ logger.debug("Augment aggregate statistics: {} for table {} on Node {}",
+ aggregateFlowStatisticsBuilder.build().toString(),tableId,targetNodeKey);
+
+ TableBuilder tableBuilder = new TableBuilder();
+ tableBuilder.setKey(new TableKey(tableId));
+ tableBuilder.addAugmentation(AggregateFlowStatisticsData.class, aggregateFlowStatisticsDataBuilder.build());
+ trans.putOperationalData(tableRef, tableBuilder.build());
+
+ // FIXME: should we be tracking this data?
+ trans.commit();
+ }
+ }
+
+ public synchronized void updateGroupFeatures(GroupFeatures notification) {
+ final DataModificationTransaction trans = statisticsProvider.startChange();
+
+ final NodeBuilder nodeData = new NodeBuilder();
+ nodeData.setKey(targetNodeKey);
+
+ NodeGroupFeaturesBuilder nodeGroupFeatures = new NodeGroupFeaturesBuilder();
+ GroupFeaturesBuilder groupFeatures = new GroupFeaturesBuilder(notification);
+ nodeGroupFeatures.setGroupFeatures(groupFeatures.build());
+
+ //Update augmented data
+ nodeData.addAugmentation(NodeGroupFeatures.class, nodeGroupFeatures.build());
+ trans.putOperationalData(targetNodeIdentifier, nodeData.build());
+
+ // FIXME: should we be tracking this data?
+ trans.commit();
+ }
+
+ public synchronized void updateMeterFeatures(MeterFeatures features) {
+ final DataModificationTransaction trans = statisticsProvider.startChange();
+
+ final NodeBuilder nodeData = new NodeBuilder();
+ nodeData.setKey(targetNodeKey);
+
+ NodeMeterFeaturesBuilder nodeMeterFeatures = new NodeMeterFeaturesBuilder();
+ MeterFeaturesBuilder meterFeature = new MeterFeaturesBuilder(features);
+ nodeMeterFeatures.setMeterFeatures(meterFeature.build());
+
+ //Update augmented data
+ nodeData.addAugmentation(NodeMeterFeatures.class, nodeMeterFeatures.build());
+ trans.putOperationalData(targetNodeIdentifier, nodeData.build());
+
+ // FIXME: should we be tracking this data?
+ trans.commit();
+ }
+
+ public synchronized void updateFlowStats(List<FlowAndStatisticsMapList> list) {
+ final Long expiryTime = getExpiryTime();
+ final DataModificationTransaction trans = statisticsProvider.startChange();
+
+ for(FlowAndStatisticsMapList map : list) {
+ short tableId = map.getTableId();
+ boolean foundOriginalFlow = false;
+
+ FlowBuilder flowBuilder = new FlowBuilder();
+
+ FlowStatisticsDataBuilder flowStatisticsData = new FlowStatisticsDataBuilder();
+
+ FlowBuilder flow = new FlowBuilder();
+ flow.setContainerName(map.getContainerName());
+ flow.setBufferId(map.getBufferId());
+ flow.setCookie(map.getCookie());
+ flow.setCookieMask(map.getCookieMask());
+ flow.setFlags(map.getFlags());
+ flow.setFlowName(map.getFlowName());
+ flow.setHardTimeout(map.getHardTimeout());
+ if(map.getFlowId() != null)
+ flow.setId(new FlowId(map.getFlowId().getValue()));
+ flow.setIdleTimeout(map.getIdleTimeout());
+ flow.setInstallHw(map.isInstallHw());
+ flow.setInstructions(map.getInstructions());
+ if(map.getFlowId()!= null)
+ flow.setKey(new FlowKey(new FlowId(map.getKey().getFlowId().getValue())));
+ flow.setMatch(map.getMatch());
+ flow.setOutGroup(map.getOutGroup());
+ flow.setOutPort(map.getOutPort());
+ flow.setPriority(map.getPriority());
+ flow.setStrict(map.isStrict());
+ flow.setTableId(tableId);
+
+ Flow flowRule = flow.build();
+
+ FlowAndStatisticsMapListBuilder stats = new FlowAndStatisticsMapListBuilder();
+ stats.setByteCount(map.getByteCount());
+ stats.setPacketCount(map.getPacketCount());
+ stats.setDuration(map.getDuration());
+
+ GenericStatistics flowStats = stats.build();
+
+ //Augment the data to the flow node
+
+ FlowStatisticsBuilder flowStatistics = new FlowStatisticsBuilder();
+ flowStatistics.setByteCount(flowStats.getByteCount());
+ flowStatistics.setPacketCount(flowStats.getPacketCount());
+ flowStatistics.setDuration(flowStats.getDuration());
+ flowStatistics.setContainerName(map.getContainerName());
+ flowStatistics.setBufferId(map.getBufferId());
+ flowStatistics.setCookie(map.getCookie());
+ flowStatistics.setCookieMask(map.getCookieMask());
+ flowStatistics.setFlags(map.getFlags());
+ flowStatistics.setFlowName(map.getFlowName());
+ flowStatistics.setHardTimeout(map.getHardTimeout());
+ flowStatistics.setIdleTimeout(map.getIdleTimeout());
+ flowStatistics.setInstallHw(map.isInstallHw());
+ flowStatistics.setInstructions(map.getInstructions());
+ flowStatistics.setMatch(map.getMatch());
+ flowStatistics.setOutGroup(map.getOutGroup());
+ flowStatistics.setOutPort(map.getOutPort());
+ flowStatistics.setPriority(map.getPriority());
+ flowStatistics.setStrict(map.isStrict());
+ flowStatistics.setTableId(tableId);
+
+ flowStatisticsData.setFlowStatistics(flowStatistics.build());
+
+ logger.debug("Flow : {}",flowRule.toString());
+ logger.debug("Statistics to augment : {}",flowStatistics.build().toString());
+
+ InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
+ .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
+
+ Table table= (Table)trans.readConfigurationData(tableRef);
+
+ //TODO: Not a good way to do it, need to figure out better way.
+ //TODO: major issue in any alternate approach is that flow key is incrementally assigned
+ //to the flows stored in data store.
+ // Augment same statistics to all the matching masked flow
+ if(table != null){
+
+ for(Flow existingFlow : table.getFlow()){
+ logger.debug("Existing flow in data store : {}",existingFlow.toString());
+ if(FlowComparator.flowEquals(flowRule,existingFlow)){
+ InstanceIdentifier<Flow> flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
+ .augmentation(FlowCapableNode.class)
+ .child(Table.class, new TableKey(tableId))
+ .child(Flow.class,existingFlow.getKey()).toInstance();
+ flowBuilder.setKey(existingFlow.getKey());
+ flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
+ logger.debug("Found matching flow in the datastore, augmenting statistics");
+ foundOriginalFlow = true;
+ // Update entry with timestamp of latest response
+ flow.setKey(existingFlow.getKey());
+ FlowEntry flowStatsEntry = new FlowEntry(tableId,flow.build());
+ flowStatsUpdate.put(flowStatsEntry, expiryTime);
+
+ trans.putOperationalData(flowRef, flowBuilder.build());
+ }
+ }
+ }
+
+ table = (Table)trans.readOperationalData(tableRef);
+ if(!foundOriginalFlow && table != null){
+
+ for(Flow existingFlow : table.getFlow()){
+ FlowStatisticsData augmentedflowStatisticsData = existingFlow.getAugmentation(FlowStatisticsData.class);
+ if(augmentedflowStatisticsData != null){
+ FlowBuilder existingOperationalFlow = new FlowBuilder();
+ existingOperationalFlow.fieldsFrom(augmentedflowStatisticsData.getFlowStatistics());
+ logger.debug("Existing unaccounted flow in operational data store : {}",existingFlow.toString());
+ if(FlowComparator.flowEquals(flowRule,existingOperationalFlow.build())){
+ InstanceIdentifier<Flow> flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
+ .augmentation(FlowCapableNode.class)
+ .child(Table.class, new TableKey(tableId))
+ .child(Flow.class,existingFlow.getKey()).toInstance();
+ flowBuilder.setKey(existingFlow.getKey());
+ flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
+ logger.debug("Found matching unaccounted flow in the operational datastore, augmenting statistics");
+ foundOriginalFlow = true;
+
+ // Update entry with timestamp of latest response
+ flow.setKey(existingFlow.getKey());
+ FlowEntry flowStatsEntry = new FlowEntry(tableId,flow.build());
+ flowStatsUpdate.put(flowStatsEntry, expiryTime);
+ trans.putOperationalData(flowRef, flowBuilder.build());
+ break;
+ }
+ }
+ }
+ }
+ if(!foundOriginalFlow){
+ String flowKey = "#UF$TABLE*"+Short.toString(tableId)+"*"+Integer.toString(this.unaccountedFlowsCounter);
+ this.unaccountedFlowsCounter++;
+ FlowKey newFlowKey = new FlowKey(new FlowId(flowKey));
+ InstanceIdentifier<Flow> flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
+ .augmentation(FlowCapableNode.class)
+ .child(Table.class, new TableKey(tableId))
+ .child(Flow.class,newFlowKey).toInstance();
+ flowBuilder.setKey(newFlowKey);
+ flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
+ logger.debug("Flow {} is not present in config data store, augmenting statistics as an unaccounted flow",
+ flowBuilder.build());
+
+ // Update entry with timestamp of latest response
+ flow.setKey(newFlowKey);
+ FlowEntry flowStatsEntry = new FlowEntry(tableId,flow.build());
+ flowStatsUpdate.put(flowStatsEntry, expiryTime);
+ trans.putOperationalData(flowRef, flowBuilder.build());
+ }
+ }
+
+ trans.commit();
+ }
+
+ private static Long getExpiryTime(){
+ final long now = System.nanoTime();
+ return now + TimeUnit.MILLISECONDS.toNanos(StatisticsProvider.STATS_THREAD_EXECUTION_TIME * NUMBER_OF_WAIT_CYCLES);
+ }
+
+ public synchronized void cleanStaleStatistics(){
+ final DataModificationTransaction trans = this.statisticsProvider.startChange();
+ final long now = System.nanoTime();
+
+ //Clean stale statistics related to group
+ for (Iterator<Entry<GroupDescStats, Long>> it = this.groupDescStatsUpdate.entrySet().iterator();it.hasNext();){
+ Entry<GroupDescStats, Long> e = it.next();
+ if (now > e.getValue()) {
+ cleanGroupStatsFromDataStore(trans, e.getKey());
+ it.remove();
+ }
+ }
+
+ //Clean stale statistics related to meter
+ for (Iterator<Entry<MeterConfigStats, Long>> it = this.meterConfigStatsUpdate.entrySet().iterator();it.hasNext();){
+ Entry<MeterConfigStats, Long> e = it.next();
+ if (now > e.getValue()) {
+ cleanMeterStatsFromDataStore(trans, e.getKey());
+ it.remove();
+ }
+ }
+
+ //Clean stale statistics related to flow
+ for (Iterator<Entry<FlowEntry, Long>> it = this.flowStatsUpdate.entrySet().iterator();it.hasNext();){
+ Entry<FlowEntry, Long> e = it.next();
+ if (now > e.getValue()) {
+ cleanFlowStatsFromDataStore(trans, e.getKey());
+ it.remove();
+ }
+ }
+
+ //Clean stale statistics related to queue
+ for (Iterator<Entry<QueueEntry, Long>> it = this.queuesStatsUpdate.entrySet().iterator();it.hasNext();){
+ Entry<QueueEntry, Long> e = it.next();
+ if (now > e.getValue()) {
+ cleanQueueStatsFromDataStore(trans, e.getKey());
+ it.remove();
+ }
+ }
+
+ trans.commit();
+ }
+
+ private void cleanQueueStatsFromDataStore(DataModificationTransaction trans, QueueEntry queueEntry) {
+ InstanceIdentifier<?> queueRef
+ = InstanceIdentifier.builder(Nodes.class)
+ .child(Node.class, this.targetNodeKey)
+ .child(NodeConnector.class, new NodeConnectorKey(queueEntry.getNodeConnectorId()))
+ .augmentation(FlowCapableNodeConnector.class)
+ .child(Queue.class, new QueueKey(queueEntry.getQueueId()))
+ .augmentation(FlowCapableNodeConnectorQueueStatisticsData.class).toInstance();
+ trans.removeOperationalData(queueRef);
+ }
+
+ private void cleanFlowStatsFromDataStore(DataModificationTransaction trans, FlowEntry flowEntry) {
+ InstanceIdentifier<?> flowRef
+ = InstanceIdentifier.builder(Nodes.class).child(Node.class, this.targetNodeKey)
+ .augmentation(FlowCapableNode.class)
+ .child(Table.class, new TableKey(flowEntry.getTableId()))
+ .child(Flow.class,flowEntry.getFlow().getKey())
+ .augmentation(FlowStatisticsData.class).toInstance();
+ trans.removeOperationalData(flowRef);
+ }
+
+ private void cleanMeterStatsFromDataStore(DataModificationTransaction trans, MeterConfigStats meterConfigStats) {
+ InstanceIdentifierBuilder<Meter> meterRef
+ = InstanceIdentifier.builder(Nodes.class).child(Node.class,this.targetNodeKey)
+ .augmentation(FlowCapableNode.class)
+ .child(Meter.class,new MeterKey(meterConfigStats.getMeterId()));
+
+ InstanceIdentifier<?> nodeMeterConfigStatsAugmentation = meterRef.augmentation(NodeMeterConfigStats.class).toInstance();
+ trans.removeOperationalData(nodeMeterConfigStatsAugmentation);
+
+ InstanceIdentifier<?> nodeMeterStatisticsAugmentation = meterRef.augmentation(NodeMeterStatistics.class).toInstance();
+ trans.removeOperationalData(nodeMeterStatisticsAugmentation);
+ }
+
+ private void cleanGroupStatsFromDataStore(DataModificationTransaction trans, GroupDescStats groupDescStats) {
+ InstanceIdentifierBuilder<Group> groupRef
+ = InstanceIdentifier.builder(Nodes.class).child(Node.class,this.targetNodeKey)
+ .augmentation(FlowCapableNode.class)
+ .child(Group.class,new GroupKey(groupDescStats.getGroupId()));
+
+ InstanceIdentifier<?> nodeGroupDescStatsAugmentation = groupRef.augmentation(NodeGroupDescStats.class).toInstance();
+ trans.removeOperationalData(nodeGroupDescStatsAugmentation);
+
+ InstanceIdentifier<?> nodeGroupStatisticsAugmentation = groupRef.augmentation(NodeGroupStatistics.class).toInstance();
+ trans.removeOperationalData(nodeGroupStatisticsAugmentation);
+ }
+}
--- /dev/null
+/*
+ * Copyright IBM Corporation, 2013. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.statistics.manager;
+
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsUpdate;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowsStatisticsUpdate;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsUpdate;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupDescStatsUpdated;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupFeaturesUpdated;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupStatisticsUpdated;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterConfigStatsUpdated;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterFeaturesUpdated;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterStatisticsUpdated;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.NodeConnectorStatisticsUpdate;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.QueueStatisticsUpdate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is responsible for listening for statistics update notifications and
+ * routing them to the appropriate NodeStatisticsHandler.
+
+ * TODO: Need to add error message listener and clean-up the associated tx id
+ * if it exists in the tx-id cache.
+ * @author vishnoianil
+ */
+public class StatisticsListener implements OpendaylightGroupStatisticsListener,
+ OpendaylightMeterStatisticsListener,
+ OpendaylightFlowStatisticsListener,
+ OpendaylightPortStatisticsListener,
+ OpendaylightFlowTableStatisticsListener,
+ OpendaylightQueueStatisticsListener{
+
+ private final static Logger sucLogger = LoggerFactory.getLogger(StatisticsListener.class);
+ private final StatisticsProvider statisticsManager;
+ private final MultipartMessageManager messageManager;
+
+ /**
+ * default ctor
+ * @param manager
+ */
+ public StatisticsListener(final StatisticsProvider manager){
+ this.statisticsManager = manager;
+ this.messageManager = this.statisticsManager.getMultipartMessageManager();
+ }
+
+ @Override
+ public void onMeterConfigStatsUpdated(final MeterConfigStatsUpdated notification) {
+ //Check if response is for the request statistics-manager sent.
+ if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
+ return;
+
+ //Add statistics to local cache
+ final NodeStatisticsHandler handler = this.statisticsManager.getStatisticsHandler(notification.getId());
+ if (handler != null) {
+ handler.updateMeterConfigStats(notification.getMeterConfigStats());
+ }
+ }
+
+ @Override
+ public void onMeterStatisticsUpdated(MeterStatisticsUpdated notification) {
+ //Check if response is for the request statistics-manager sent.
+ if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
+ return;
+
+ //Add statistics to local cache
+ final NodeStatisticsHandler handler = this.statisticsManager.getStatisticsHandler(notification.getId());
+ if (handler != null) {
+ handler.updateMeterStats(notification.getMeterStats());
+ }
+ }
+
+ @Override
+ public void onGroupDescStatsUpdated(GroupDescStatsUpdated notification) {
+ //Check if response is for the request statistics-manager sent.
+ if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
+ return;
+
+ final NodeStatisticsHandler handler = statisticsManager.getStatisticsHandler(notification.getId());
+ if (handler != null) {
+ handler.updateGroupDescStats(notification.getGroupDescStats());
+ }
+ }
+
+ @Override
+ public void onGroupStatisticsUpdated(GroupStatisticsUpdated notification) {
+ //Check if response is for the request statistics-manager sent.
+ if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
+ return;
+
+ final NodeStatisticsHandler handler = statisticsManager.getStatisticsHandler(notification.getId());
+ if (handler != null) {
+ handler.updateGroupStats(notification.getGroupStats());
+ }
+ }
+
+ @Override
+ public void onMeterFeaturesUpdated(MeterFeaturesUpdated notification) {
+ final NodeStatisticsHandler sna = this.statisticsManager.getStatisticsHandler(notification.getId());
+ if (sna != null) {
+ sna.updateMeterFeatures(notification);
+ }
+ }
+
+ @Override
+ public void onGroupFeaturesUpdated(GroupFeaturesUpdated notification) {
+ final NodeStatisticsHandler sna = this.statisticsManager.getStatisticsHandler(notification.getId());
+ if (sna != null) {
+ sna.updateGroupFeatures(notification);
+ }
+ }
+
+ @Override
+ public void onFlowsStatisticsUpdate(final FlowsStatisticsUpdate notification) {
+ //Check if response is for the request statistics-manager sent.
+ if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
+ return;
+
+ sucLogger.debug("Received flow stats update : {}",notification.toString());
+ final NodeStatisticsHandler sna = this.statisticsManager.getStatisticsHandler(notification.getId());
+ if (sna != null) {
+ sna.updateFlowStats(notification.getFlowAndStatisticsMapList());
+ }
+ }
+
+ @Override
+ public void onAggregateFlowStatisticsUpdate(AggregateFlowStatisticsUpdate notification) {
+ //Check if response is for the request statistics-manager sent.
+ if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
+ return;
+
+ final NodeStatisticsHandler handler = this.statisticsManager.getStatisticsHandler(notification.getId());
+ if (handler != null) {
+ final Short tableId = messageManager.getTableIdForTxId(notification.getId(),notification.getTransactionId());
+ handler.updateAggregateFlowStats(tableId, notification);
+ }
+ }
+
+ @Override
+ public void onNodeConnectorStatisticsUpdate(NodeConnectorStatisticsUpdate notification) {
+ //Check if response is for the request statistics-manager sent.
+ if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
+ return;
+
+ final NodeStatisticsHandler handler = this.statisticsManager.getStatisticsHandler(notification.getId());
+ if (handler != null) {
+ handler.updateNodeConnectorStats(notification.getNodeConnectorStatisticsAndPortNumberMap());
+ }
+ }
+
+ @Override
+ public void onFlowTableStatisticsUpdate(FlowTableStatisticsUpdate notification) {
+ //Check if response is for the request statistics-manager sent.
+ if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
+ return;
+
+ final NodeStatisticsHandler handler = this.statisticsManager.getStatisticsHandler(notification.getId());
+ if (handler != null) {
+ handler.updateFlowTableStats(notification.getFlowTableAndStatisticsMap());
+ }
+ }
+
+ @Override
+ public void onQueueStatisticsUpdate(QueueStatisticsUpdate notification) {
+ //Check if response is for the request statistics-manager sent.
+ if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
+ return;
+
+ //Add statistics to local cache
+ final NodeStatisticsHandler handler = this.statisticsManager.getStatisticsHandler(notification.getId());
+ if (handler != null) {
+ handler.updateQueueStats(notification.getQueueIdAndStatisticsMap());
+ }
+ }
+}
+
import org.osgi.framework.BundleContext;
public class StatisticsManagerActivator extends AbstractBindingAwareProvider {
+ private StatisticsProvider statsProvider;
- private static ProviderContext pSession;
-
- private static StatisticsProvider statsProvider = new StatisticsProvider();
-
@Override
public void onSessionInitiated(ProviderContext session) {
-
- pSession = session;
- DataProviderService dps = session.<DataProviderService>getSALService(DataProviderService.class);
- StatisticsManagerActivator.statsProvider.setDataService(dps);
- DataBrokerService dbs = session.<DataBrokerService>getSALService(DataBrokerService.class);
- StatisticsManagerActivator.statsProvider.setDataBrokerService(dbs);
- NotificationProviderService nps = session.<NotificationProviderService>getSALService(NotificationProviderService.class);
- StatisticsManagerActivator.statsProvider.setNotificationService(nps);
- StatisticsManagerActivator.statsProvider.start();
+ final DataBrokerService dbs = session.getSALService(DataBrokerService.class);
+ final DataProviderService dps = session.getSALService(DataProviderService.class);
+ final NotificationProviderService nps = session.getSALService(NotificationProviderService.class);
+ statsProvider = new StatisticsProvider(dps);
+ statsProvider.start(dbs, nps, session);
}
-
+
@Override
protected void stopImpl(BundleContext context) {
- StatisticsManagerActivator.statsProvider.close();
- }
-
- public static ProviderContext getProviderContext(){
- return pSession;
+ if (statsProvider != null) {
+ statsProvider.close();
+ statsProvider = null;
+ }
}
-
}
import org.eclipse.xtext.xbase.lib.Exceptions;
import org.opendaylight.controller.md.statistics.manager.MultipartMessageManager.StatsRequestType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
+import com.google.common.base.Preconditions;
+
+/**
* Following are main responsibilities of the class:
- * 1) Invoke statistics request thread to send periodic statistics request to all the
- * flow capable switch connected to the controller. It sends statistics request for
- * Group,Meter,Table,Flow,Queue,Aggregate stats.
- *
- * 2) Invoke statistics ager thread, to clean up all the stale statistics data from
+ * 1) Invoke statistics request thread to send periodic statistics request to all the
+ * flow capable switch connected to the controller. It sends statistics request for
+ * Group,Meter,Table,Flow,Queue,Aggregate stats.
+ *
+ * 2) Invoke statistics ager thread, to clean up all the stale statistics data from
* operational data store.
- *
+ *
* @author avishnoi@in.ibm.com
*
*/
public class StatisticsProvider implements AutoCloseable {
+ public static final int STATS_THREAD_EXECUTION_TIME= 15000;
- public final static Logger spLogger = LoggerFactory.getLogger(StatisticsProvider.class);
-
- private DataProviderService dps;
-
- private DataBrokerService dbs;
+ private static final Logger spLogger = LoggerFactory.getLogger(StatisticsProvider.class);
+
+ private final MultipartMessageManager multipartMessageManager = new MultipartMessageManager();
+ private final InstanceIdentifier<Nodes> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).toInstance();
+ private final DataProviderService dps;
+
+ //Local caching of stats
+ private final ConcurrentMap<NodeId,NodeStatisticsHandler> statisticsCache = new ConcurrentHashMap<>();
- private NotificationProviderService nps;
-
private OpendaylightGroupStatisticsService groupStatsService;
-
+
private OpendaylightMeterStatisticsService meterStatsService;
-
+
private OpendaylightFlowStatisticsService flowStatsService;
-
+
private OpendaylightPortStatisticsService portStatsService;
private OpendaylightFlowTableStatisticsService flowTableStatsService;
private OpendaylightQueueStatisticsService queueStatsService;
- private final MultipartMessageManager multipartMessageManager = new MultipartMessageManager();
-
private StatisticsUpdateHandler statsUpdateHandler;
-
+
private Thread statisticsRequesterThread;
-
+
private Thread statisticsAgerThread;
- private final InstanceIdentifier<Nodes> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).toInstance();
-
- public static final int STATS_THREAD_EXECUTION_TIME= 15000;
- //Local caching of stats
-
- private final ConcurrentMap<NodeId,NodeStatisticsAger> statisticsCache =
- new ConcurrentHashMap<NodeId,NodeStatisticsAger>();
-
- public DataProviderService getDataService() {
- return this.dps;
- }
-
- public void setDataService(final DataProviderService dataService) {
- this.dps = dataService;
- }
-
- public DataBrokerService getDataBrokerService() {
- return this.dbs;
- }
-
- public void setDataBrokerService(final DataBrokerService dataBrokerService) {
- this.dbs = dataBrokerService;
- }
- public NotificationProviderService getNotificationService() {
- return this.nps;
- }
-
- public void setNotificationService(final NotificationProviderService notificationService) {
- this.nps = notificationService;
+ public StatisticsProvider(final DataProviderService dataService) {
+ this.dps = Preconditions.checkNotNull(dataService);
}
public MultipartMessageManager getMultipartMessageManager() {
return multipartMessageManager;
}
- private final StatisticsUpdateCommiter updateCommiter = new StatisticsUpdateCommiter(StatisticsProvider.this);
-
+ private final StatisticsListener updateCommiter = new StatisticsListener(StatisticsProvider.this);
+
private Registration<NotificationListener> listenerRegistration;
-
- public void start() {
-
- NotificationProviderService nps = this.getNotificationService();
- Registration<NotificationListener> registerNotificationListener = nps.registerNotificationListener(this.updateCommiter);
- this.listenerRegistration = registerNotificationListener;
-
+
+ public void start(final DataBrokerService dbs, final NotificationProviderService nps, final RpcConsumerRegistry rpcRegistry) {
+
+ this.listenerRegistration = nps.registerNotificationListener(this.updateCommiter);
+
statsUpdateHandler = new StatisticsUpdateHandler(StatisticsProvider.this);
-
- registerDataStoreUpdateListener(this.getDataBrokerService());
-
+ registerDataStoreUpdateListener(dbs);
+
// Get Group/Meter statistics service instance
- groupStatsService = StatisticsManagerActivator.getProviderContext().
- getRpcService(OpendaylightGroupStatisticsService.class);
-
- meterStatsService = StatisticsManagerActivator.getProviderContext().
- getRpcService(OpendaylightMeterStatisticsService.class);
-
- flowStatsService = StatisticsManagerActivator.getProviderContext().
- getRpcService(OpendaylightFlowStatisticsService.class);
-
- portStatsService = StatisticsManagerActivator.getProviderContext().
- getRpcService(OpendaylightPortStatisticsService.class);
-
- flowTableStatsService = StatisticsManagerActivator.getProviderContext().
- getRpcService(OpendaylightFlowTableStatisticsService.class);
-
- queueStatsService = StatisticsManagerActivator.getProviderContext().
- getRpcService(OpendaylightQueueStatisticsService.class);
-
+ groupStatsService = rpcRegistry.getRpcService(OpendaylightGroupStatisticsService.class);
+ meterStatsService = rpcRegistry.getRpcService(OpendaylightMeterStatisticsService.class);
+ flowStatsService = rpcRegistry.getRpcService(OpendaylightFlowStatisticsService.class);
+ portStatsService = rpcRegistry.getRpcService(OpendaylightPortStatisticsService.class);
+ flowTableStatsService = rpcRegistry.getRpcService(OpendaylightFlowTableStatisticsService.class);
+ queueStatsService = rpcRegistry.getRpcService(OpendaylightQueueStatisticsService.class);
+
statisticsRequesterThread = new Thread( new Runnable(){
@Override
while(true){
try {
statsRequestSender();
-
+
Thread.sleep(STATS_THREAD_EXECUTION_TIME);
}catch (Exception e){
spLogger.error("Exception occurred while sending stats request : {}",e);
}
}
});
-
+
spLogger.debug("Statistics requester thread started with timer interval : {}",STATS_THREAD_EXECUTION_TIME);
-
+
statisticsRequesterThread.start();
-
+
statisticsAgerThread = new Thread( new Runnable(){
@Override
public void run() {
while(true){
try {
- for(NodeStatisticsAger nodeStatisticsAger : statisticsCache.values()){
+ for(NodeStatisticsHandler nodeStatisticsAger : statisticsCache.values()){
nodeStatisticsAger.cleanStaleStatistics();
}
multipartMessageManager.cleanStaleTransactionIds();
-
+
Thread.sleep(STATS_THREAD_EXECUTION_TIME);
}catch (Exception e){
spLogger.error("Exception occurred while sending stats request : {}",e);
}
}
});
-
+
spLogger.debug("Statistics ager thread started with timer interval : {}",STATS_THREAD_EXECUTION_TIME);
statisticsAgerThread.start();
-
+
spLogger.info("Statistics Provider started.");
}
-
+
private void registerDataStoreUpdateListener(DataBrokerService dbs) {
//Register for Node updates
InstanceIdentifier<? extends DataObject> pathNode = InstanceIdentifier.builder(Nodes.class)
.child(Table.class)
.child(Flow.class).toInstance();
dbs.registerDataChangeListener(pathFlow, statsUpdateHandler);
-
+
//Register for meter updates
InstanceIdentifier<? extends DataObject> pathMeter = InstanceIdentifier.builder(Nodes.class).child(Node.class)
.augmentation(FlowCapableNode.class)
.child(Meter.class).toInstance();
dbs.registerDataChangeListener(pathMeter, statsUpdateHandler);
-
- //Register for group updates
+
+ //Register for group updates
InstanceIdentifier<? extends DataObject> pathGroup = InstanceIdentifier.builder(Nodes.class).child(Node.class)
.augmentation(FlowCapableNode.class)
.child(Group.class).toInstance();
}
protected DataModificationTransaction startChange() {
-
- DataProviderService dps = this.getDataService();
return dps.beginTransaction();
}
-
+
private void statsRequestSender(){
-
+
List<Node> targetNodes = getAllConnectedNodes();
-
+
if(targetNodes == null)
return;
-
+
for (Node targetNode : targetNodes){
-
+
if(targetNode.getAugmentation(FlowCapableNode.class) != null){
sendStatisticsRequestsToNode(targetNode);
}
}
}
-
+
public void sendStatisticsRequestsToNode(Node targetNode){
-
+
spLogger.debug("Send requests for statistics collection to node : {})",targetNode.getId());
-
+
InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode.getKey()).toInstance();
-
+
NodeRef targetNodeRef = new NodeRef(targetInstanceId);
-
+
try{
if(flowStatsService != null){
sendAggregateFlowsStatsFromAllTablesRequest(targetNode.getKey());
spLogger.error("Exception occured while sending statistics requests : {}", e);
}
}
-
+
public void sendAllFlowTablesStatisticsRequest(NodeRef targetNodeRef) throws InterruptedException, ExecutionException {
- final GetFlowTablesStatisticsInputBuilder input =
+ final GetFlowTablesStatisticsInputBuilder input =
new GetFlowTablesStatisticsInputBuilder();
-
+
input.setNode(targetNodeRef);
- Future<RpcResult<GetFlowTablesStatisticsOutput>> response =
+ Future<RpcResult<GetFlowTablesStatisticsOutput>> response =
flowTableStatsService.getFlowTablesStatistics(input.build());
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNodeRef),response.get().getResult().getTransactionId()
public void sendAllFlowsStatsFromAllTablesRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input =
new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
-
+
input.setNode(targetNode);
-
- Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> response =
+
+ Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> response =
flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build());
-
+
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_FLOW);
-
+
}
-
+
public void sendFlowStatsFromTableRequest(NodeRef targetNode,Flow flow) throws InterruptedException, ExecutionException{
final GetFlowStatisticsFromFlowTableInputBuilder input =
new GetFlowStatisticsFromFlowTableInputBuilder();
-
+
input.setNode(targetNode);
input.fieldsFrom(flow);
-
- Future<RpcResult<GetFlowStatisticsFromFlowTableOutput>> response =
+
+ Future<RpcResult<GetFlowStatisticsFromFlowTableOutput>> response =
flowStatsService.getFlowStatisticsFromFlowTable(input.build());
-
+
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_FLOW);
-
+
}
public void sendAggregateFlowsStatsFromAllTablesRequest(NodeKey targetNodeKey) throws InterruptedException, ExecutionException{
-
+
List<Short> tablesId = getTablesFromNode(targetNodeKey);
-
+
if(tablesId.size() != 0){
for(Short id : tablesId){
-
+
sendAggregateFlowsStatsFromTableRequest(targetNodeKey,id);
}
}else{
spLogger.debug("No details found in data store for flow tables associated with Node {}",targetNodeKey);
}
}
-
+
public void sendAggregateFlowsStatsFromTableRequest(NodeKey targetNodeKey,Short tableId) throws InterruptedException, ExecutionException{
-
+
spLogger.debug("Send aggregate stats request for flow table {} to node {}",tableId,targetNodeKey);
- GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input =
+ GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input =
new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
-
+
input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
input.setTableId(new org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId(tableId));
- Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response =
+ Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response =
flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());
-
+
multipartMessageManager.setTxIdAndTableIdMapEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId(), tableId);
this.multipartMessageManager.addTxIdToRequestTypeEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId()
, StatsRequestType.AGGR_FLOW);
}
public void sendAllNodeConnectorsStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
-
+
final GetAllNodeConnectorsStatisticsInputBuilder input = new GetAllNodeConnectorsStatisticsInputBuilder();
-
+
input.setNode(targetNode);
- Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> response =
+ Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> response =
portStatsService.getAllNodeConnectorsStatistics(input.build());
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_PORT);
}
public void sendAllGroupStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
-
+
final GetAllGroupStatisticsInputBuilder input = new GetAllGroupStatisticsInputBuilder();
-
+
input.setNode(targetNode);
- Future<RpcResult<GetAllGroupStatisticsOutput>> response =
+ Future<RpcResult<GetAllGroupStatisticsOutput>> response =
groupStatsService.getAllGroupStatistics(input.build());
-
+
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_GROUP);
}
-
+
public void sendGroupDescriptionRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
final GetGroupDescriptionInputBuilder input = new GetGroupDescriptionInputBuilder();
-
+
input.setNode(targetNode);
- Future<RpcResult<GetGroupDescriptionOutput>> response =
+ Future<RpcResult<GetGroupDescriptionOutput>> response =
groupStatsService.getGroupDescription(input.build());
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.GROUP_DESC);
}
-
+
public void sendAllMeterStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
-
+
GetAllMeterStatisticsInputBuilder input = new GetAllMeterStatisticsInputBuilder();
-
+
input.setNode(targetNode);
- Future<RpcResult<GetAllMeterStatisticsOutput>> response =
+ Future<RpcResult<GetAllMeterStatisticsOutput>> response =
meterStatsService.getAllMeterStatistics(input.build());
-
+
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_METER);;
}
-
+
public void sendMeterConfigStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
-
+
GetAllMeterConfigStatisticsInputBuilder input = new GetAllMeterConfigStatisticsInputBuilder();
-
+
input.setNode(targetNode);
- Future<RpcResult<GetAllMeterConfigStatisticsOutput>> response =
+ Future<RpcResult<GetAllMeterConfigStatisticsOutput>> response =
meterStatsService.getAllMeterConfigStatistics(input.build());
-
+
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.METER_CONFIG);;
}
-
+
public void sendAllQueueStatsFromAllNodeConnector(NodeRef targetNode) throws InterruptedException, ExecutionException {
GetAllQueuesStatisticsFromAllPortsInputBuilder input = new GetAllQueuesStatisticsFromAllPortsInputBuilder();
-
+
input.setNode(targetNode);
-
- Future<RpcResult<GetAllQueuesStatisticsFromAllPortsOutput>> response =
+
+ Future<RpcResult<GetAllQueuesStatisticsFromAllPortsOutput>> response =
queueStatsService.getAllQueuesStatisticsFromAllPorts(input.build());
-
+
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_QUEUE_STATS);;
public void sendQueueStatsFromGivenNodeConnector(NodeRef targetNode,NodeConnectorId nodeConnectorId, QueueId queueId) throws InterruptedException, ExecutionException {
GetQueueStatisticsFromGivenPortInputBuilder input = new GetQueueStatisticsFromGivenPortInputBuilder();
-
+
input.setNode(targetNode);
input.setNodeConnectorId(nodeConnectorId);
input.setQueueId(queueId);
- Future<RpcResult<GetQueueStatisticsFromGivenPortOutput>> response =
+ Future<RpcResult<GetQueueStatisticsFromGivenPortOutput>> response =
queueStatsService.getQueueStatisticsFromGivenPort(input.build());
-
+
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_QUEUE_STATS);;
}
- public ConcurrentMap<NodeId, NodeStatisticsAger> getStatisticsCache() {
- return statisticsCache;
+ /**
+ * Get the handler for a particular node.
+ *
+ * @param nodeId source node
+ * @return Node statistics handler for that node. Null if the statistics should
+ * not handled.
+ */
+ public final NodeStatisticsHandler getStatisticsHandler(final NodeId nodeId) {
+ Preconditions.checkNotNull(nodeId);
+ NodeStatisticsHandler ager = statisticsCache.get(nodeId);
+ if (ager == null) {
+ ager = new NodeStatisticsHandler(this, new NodeKey(nodeId));
+ statisticsCache.put(nodeId, ager);
+ }
+
+ return ager;
}
-
+
private List<Node> getAllConnectedNodes(){
-
Nodes nodes = (Nodes) dps.readOperationalData(nodesIdentifier);
if(nodes == null)
return null;
-
+
spLogger.debug("Number of connected nodes : {}",nodes.getNode().size());
return nodes.getNode();
}
-
+
private List<Short> getTablesFromNode(NodeKey nodeKey){
InstanceIdentifier<FlowCapableNode> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class,nodeKey).augmentation(FlowCapableNode.class).toInstance();
-
+
FlowCapableNode node = (FlowCapableNode)dps.readOperationalData(nodesIdentifier);
List<Short> tablesId = new ArrayList<Short>();
if(node != null && node.getTable()!=null){
NodeKey nodeKey = InstanceIdentifier.keyOf(nodeII);
return nodeKey.getId();
}
-
+
@SuppressWarnings("deprecation")
@Override
public void close(){
-
+
try {
spLogger.info("Statistics Provider stopped.");
if (this.listenerRegistration != null) {
-
+
this.listenerRegistration.close();
-
+
this.statisticsRequesterThread.destroy();
-
+
this.statisticsAgerThread.destroy();
-
+
}
} catch (Throwable e) {
throw Exceptions.sneakyThrow(e);
+++ /dev/null
-/*
- * Copyright IBM Corporation, 2013. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.statistics.manager;
-
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.List;
-import java.util.concurrent.ConcurrentMap;
-
-import org.opendaylight.controller.md.statistics.manager.NodeStatisticsAger.FlowEntry;
-import org.opendaylight.controller.md.statistics.manager.NodeStatisticsAger.QueueEntry;
-import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv4Prefix;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsData;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsDataBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsUpdate;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsDataBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowsStatisticsUpdate;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.aggregate.flow.statistics.AggregateFlowStatisticsBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapListBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.statistics.FlowStatisticsBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsData;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsDataBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsUpdate;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.and.statistics.map.FlowTableAndStatisticsMap;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.statistics.FlowTableStatisticsBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.QueueBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.QueueKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupDescStatsUpdated;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupFeaturesUpdated;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupStatisticsUpdated;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupDescStats;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupDescStatsBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeatures;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeaturesBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupStatistics;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupStatisticsBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.desc.GroupDescBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.features.GroupFeaturesBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.statistics.GroupStatisticsBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.desc.stats.reply.GroupDescStats;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.statistics.reply.GroupStats;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterConfigStatsUpdated;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterFeaturesUpdated;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterStatisticsUpdated;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterConfigStats;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterConfigStatsBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterFeatures;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterFeaturesBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterStatistics;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterStatisticsBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsListener;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.MeterFeaturesBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.meter.MeterConfigStatsBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.meter.MeterStatisticsBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.config.stats.reply.MeterConfigStats;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.statistics.reply.MeterStats;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.match.Layer3Match;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.match.layer._3.match.Ipv4Match;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericStatistics;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.NodeConnectorStatisticsUpdate;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.flow.capable.node.connector.statistics.FlowCapableNodeConnectorStatisticsBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.FlowCapableNodeConnectorQueueStatisticsData;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.FlowCapableNodeConnectorQueueStatisticsDataBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsListener;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.QueueStatisticsUpdate;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.flow.capable.node.connector.queue.statistics.FlowCapableNodeConnectorQueueStatisticsBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.queue.id.and.statistics.map.QueueIdAndStatisticsMap;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Class implement statistics manager related listener interface and augment all the
- * received statistics data to data stores.
- * TODO: Need to add error message listener and clean-up the associated tx id
- * if it exists in the tx-id cache.
- * @author vishnoianil
- *
- */
-public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsListener,
- OpendaylightMeterStatisticsListener,
- OpendaylightFlowStatisticsListener,
- OpendaylightPortStatisticsListener,
- OpendaylightFlowTableStatisticsListener,
- OpendaylightQueueStatisticsListener{
-
- private final static Logger sucLogger = LoggerFactory.getLogger(StatisticsUpdateCommiter.class);
-
- private final StatisticsProvider statisticsManager;
- private final MultipartMessageManager messageManager;
-
- private int unaccountedFlowsCounter = 1;
-
- /**
- * default ctor
- * @param manager
- */
- public StatisticsUpdateCommiter(final StatisticsProvider manager){
-
- this.statisticsManager = manager;
- this.messageManager = this.statisticsManager.getMultipartMessageManager();
- }
-
- public StatisticsProvider getStatisticsManager(){
- return statisticsManager;
- }
-
- @Override
- public void onMeterConfigStatsUpdated(MeterConfigStatsUpdated notification) {
- //Check if response is for the request statistics-manager sent.
- if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
- return;
-
- NodeKey key = new NodeKey(notification.getId());
-
- //Add statistics to local cache
- ConcurrentMap<NodeId, NodeStatisticsAger> cache = this.statisticsManager.getStatisticsCache();
- if(!cache.containsKey(notification.getId())){
- cache.put(notification.getId(), new NodeStatisticsAger(statisticsManager,key));
- }
- cache.get(notification.getId()).updateMeterConfigStats(notification.getMeterConfigStats());
-
- //Publish data to configuration data store
- List<MeterConfigStats> meterConfigStatsList = notification.getMeterConfigStats();
-
- for(MeterConfigStats meterConfigStats : meterConfigStatsList){
- DataModificationTransaction it = this.statisticsManager.startChange();
- MeterBuilder meterBuilder = new MeterBuilder();
- MeterKey meterKey = new MeterKey(meterConfigStats.getMeterId());
- meterBuilder.setKey(meterKey);
-
- InstanceIdentifier<Meter> meterRef = InstanceIdentifier.builder(Nodes.class).child(Node.class,key)
- .augmentation(FlowCapableNode.class)
- .child(Meter.class,meterKey).toInstance();
-
- NodeMeterConfigStatsBuilder meterConfig= new NodeMeterConfigStatsBuilder();
- MeterConfigStatsBuilder stats = new MeterConfigStatsBuilder();
- stats.fieldsFrom(meterConfigStats);
- meterConfig.setMeterConfigStats(stats.build());
-
- //Update augmented data
- meterBuilder.addAugmentation(NodeMeterConfigStats.class, meterConfig.build());
- it.putOperationalData(meterRef, meterBuilder.build());
- it.commit();
-
- }
- }
-
- @Override
- public void onMeterStatisticsUpdated(MeterStatisticsUpdated notification) {
-
- //Check if response is for the request statistics-manager sent.
- if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
- return;
-
- NodeKey key = new NodeKey(notification.getId());
-
- List<MeterStats> meterStatsList = notification.getMeterStats();
-
- for(MeterStats meterStats : meterStatsList){
-
- //Publish data to configuration data store
- DataModificationTransaction it = this.statisticsManager.startChange();
- MeterBuilder meterBuilder = new MeterBuilder();
- MeterKey meterKey = new MeterKey(meterStats.getMeterId());
- meterBuilder.setKey(meterKey);
-
- InstanceIdentifier<Meter> meterRef = InstanceIdentifier.builder(Nodes.class).child(Node.class,key)
- .augmentation(FlowCapableNode.class)
- .child(Meter.class,meterKey).toInstance();
-
- NodeMeterStatisticsBuilder meterStatsBuilder= new NodeMeterStatisticsBuilder();
- MeterStatisticsBuilder stats = new MeterStatisticsBuilder();
- stats.fieldsFrom(meterStats);
- meterStatsBuilder.setMeterStatistics(stats.build());
-
- //Update augmented data
- meterBuilder.addAugmentation(NodeMeterStatistics.class, meterStatsBuilder.build());
- it.putOperationalData(meterRef, meterBuilder.build());
- it.commit();
- }
- }
-
- @Override
- public void onGroupDescStatsUpdated(GroupDescStatsUpdated notification) {
-
- //Check if response is for the request statistics-manager sent.
- if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
- return;
-
- NodeKey key = new NodeKey(notification.getId());
-
- //Add statistics to local cache
- ConcurrentMap<NodeId, NodeStatisticsAger> cache = this.statisticsManager.getStatisticsCache();
- if(!cache.containsKey(notification.getId())){
- cache.put(notification.getId(), new NodeStatisticsAger(statisticsManager,key));
- }
- cache.get(notification.getId()).updateGroupDescStats(notification.getGroupDescStats());
-
- //Publish data to configuration data store
- List<GroupDescStats> groupDescStatsList = notification.getGroupDescStats();
-
- for(GroupDescStats groupDescStats : groupDescStatsList){
- DataModificationTransaction it = this.statisticsManager.startChange();
-
- GroupBuilder groupBuilder = new GroupBuilder();
- GroupKey groupKey = new GroupKey(groupDescStats.getGroupId());
- groupBuilder.setKey(groupKey);
-
- InstanceIdentifier<Group> groupRef = InstanceIdentifier.builder(Nodes.class).child(Node.class,key)
- .augmentation(FlowCapableNode.class)
- .child(Group.class,groupKey).toInstance();
-
- NodeGroupDescStatsBuilder groupDesc= new NodeGroupDescStatsBuilder();
- GroupDescBuilder stats = new GroupDescBuilder();
- stats.fieldsFrom(groupDescStats);
- groupDesc.setGroupDesc(stats.build());
-
- //Update augmented data
- groupBuilder.addAugmentation(NodeGroupDescStats.class, groupDesc.build());
-
- it.putOperationalData(groupRef, groupBuilder.build());
- it.commit();
- }
- }
-
- @Override
- public void onGroupStatisticsUpdated(GroupStatisticsUpdated notification) {
-
- //Check if response is for the request statistics-manager sent.
- if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
- return;
-
- //Publish data to configuration data store
- NodeKey key = new NodeKey(notification.getId());
- List<GroupStats> groupStatsList = notification.getGroupStats();
-
- for(GroupStats groupStats : groupStatsList){
- DataModificationTransaction it = this.statisticsManager.startChange();
-
- GroupBuilder groupBuilder = new GroupBuilder();
- GroupKey groupKey = new GroupKey(groupStats.getGroupId());
- groupBuilder.setKey(groupKey);
-
- InstanceIdentifier<Group> groupRef = InstanceIdentifier.builder(Nodes.class).child(Node.class,key)
- .augmentation(FlowCapableNode.class)
- .child(Group.class,groupKey).toInstance();
-
- NodeGroupStatisticsBuilder groupStatisticsBuilder= new NodeGroupStatisticsBuilder();
- GroupStatisticsBuilder stats = new GroupStatisticsBuilder();
- stats.fieldsFrom(groupStats);
- groupStatisticsBuilder.setGroupStatistics(stats.build());
-
- //Update augmented data
- groupBuilder.addAugmentation(NodeGroupStatistics.class, groupStatisticsBuilder.build());
- it.putOperationalData(groupRef, groupBuilder.build());
- it.commit();
- }
- }
-
- @Override
- public void onMeterFeaturesUpdated(MeterFeaturesUpdated notification) {
-
- MeterFeaturesBuilder meterFeature = new MeterFeaturesBuilder();
- meterFeature.setMeterBandSupported(notification.getMeterBandSupported());
- meterFeature.setMeterCapabilitiesSupported(notification.getMeterCapabilitiesSupported());
- meterFeature.setMaxBands(notification.getMaxBands());
- meterFeature.setMaxColor(notification.getMaxColor());
- meterFeature.setMaxMeter(notification.getMaxMeter());
-
- //Publish data to configuration data store
- DataModificationTransaction it = this.statisticsManager.startChange();
- NodeKey key = new NodeKey(notification.getId());
- NodeRef ref = getNodeRef(key);
-
- final NodeBuilder nodeData = new NodeBuilder();
- nodeData.setKey(key);
-
- NodeMeterFeaturesBuilder nodeMeterFeatures= new NodeMeterFeaturesBuilder();
- nodeMeterFeatures.setMeterFeatures(meterFeature.build());
-
- //Update augmented data
- nodeData.addAugmentation(NodeMeterFeatures.class, nodeMeterFeatures.build());
-
- InstanceIdentifier<? extends Object> refValue = ref.getValue();
- it.putOperationalData(refValue, nodeData.build());
- it.commit();
- }
-
- @Override
- public void onGroupFeaturesUpdated(GroupFeaturesUpdated notification) {
-
- GroupFeaturesBuilder groupFeatures = new GroupFeaturesBuilder();
- groupFeatures.setActions(notification.getActions());
- groupFeatures.setGroupCapabilitiesSupported(notification.getGroupCapabilitiesSupported());
- groupFeatures.setGroupTypesSupported(notification.getGroupTypesSupported());
- groupFeatures.setMaxGroups(notification.getMaxGroups());
-
- //Publish data to configuration data store
- DataModificationTransaction it = this.statisticsManager.startChange();
- NodeKey key = new NodeKey(notification.getId());
- NodeRef ref = getNodeRef(key);
-
- final NodeBuilder nodeData = new NodeBuilder();
- nodeData.setKey(key);
-
- NodeGroupFeaturesBuilder nodeGroupFeatures= new NodeGroupFeaturesBuilder();
- nodeGroupFeatures.setGroupFeatures(groupFeatures.build());
-
- //Update augmented data
- nodeData.addAugmentation(NodeGroupFeatures.class, nodeGroupFeatures.build());
-
- InstanceIdentifier<? extends Object> refValue = ref.getValue();
- it.putOperationalData(refValue, nodeData.build());
- it.commit();
- }
-
- @Override
- public void onFlowsStatisticsUpdate(FlowsStatisticsUpdate notification) {
-
- //Check if response is for the request statistics-manager sent.
- if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
- return;
-
- NodeKey key = new NodeKey(notification.getId());
- sucLogger.debug("Received flow stats update : {}",notification.toString());
-
- for(FlowAndStatisticsMapList map: notification.getFlowAndStatisticsMapList()){
- short tableId = map.getTableId();
-
- DataModificationTransaction it = this.statisticsManager.startChange();
-
- boolean foundOriginalFlow = false;
-
- FlowBuilder flowBuilder = new FlowBuilder();
-
- FlowStatisticsDataBuilder flowStatisticsData = new FlowStatisticsDataBuilder();
-
- FlowBuilder flow = new FlowBuilder();
- flow.setContainerName(map.getContainerName());
- flow.setBufferId(map.getBufferId());
- flow.setCookie(map.getCookie());
- flow.setCookieMask(map.getCookieMask());
- flow.setFlags(map.getFlags());
- flow.setFlowName(map.getFlowName());
- flow.setHardTimeout(map.getHardTimeout());
- if(map.getFlowId() != null)
- flow.setId(new FlowId(map.getFlowId().getValue()));
- flow.setIdleTimeout(map.getIdleTimeout());
- flow.setInstallHw(map.isInstallHw());
- flow.setInstructions(map.getInstructions());
- if(map.getFlowId()!= null)
- flow.setKey(new FlowKey(new FlowId(map.getKey().getFlowId().getValue())));
- flow.setMatch(map.getMatch());
- flow.setOutGroup(map.getOutGroup());
- flow.setOutPort(map.getOutPort());
- flow.setPriority(map.getPriority());
- flow.setStrict(map.isStrict());
- flow.setTableId(tableId);
-
- Flow flowRule = flow.build();
-
- FlowAndStatisticsMapListBuilder stats = new FlowAndStatisticsMapListBuilder();
- stats.setByteCount(map.getByteCount());
- stats.setPacketCount(map.getPacketCount());
- stats.setDuration(map.getDuration());
-
- GenericStatistics flowStats = stats.build();
-
- //Add statistics to local cache
- ConcurrentMap<NodeId, NodeStatisticsAger> cache = this.statisticsManager.getStatisticsCache();
- if(!cache.containsKey(notification.getId())){
- cache.put(notification.getId(), new NodeStatisticsAger(statisticsManager,key));
- }
- NodeStatisticsAger nsa = cache.get(notification.getId());
-
- //Augment the data to the flow node
-
- FlowStatisticsBuilder flowStatistics = new FlowStatisticsBuilder();
- flowStatistics.setByteCount(flowStats.getByteCount());
- flowStatistics.setPacketCount(flowStats.getPacketCount());
- flowStatistics.setDuration(flowStats.getDuration());
- flowStatistics.setContainerName(map.getContainerName());
- flowStatistics.setBufferId(map.getBufferId());
- flowStatistics.setCookie(map.getCookie());
- flowStatistics.setCookieMask(map.getCookieMask());
- flowStatistics.setFlags(map.getFlags());
- flowStatistics.setFlowName(map.getFlowName());
- flowStatistics.setHardTimeout(map.getHardTimeout());
- flowStatistics.setIdleTimeout(map.getIdleTimeout());
- flowStatistics.setInstallHw(map.isInstallHw());
- flowStatistics.setInstructions(map.getInstructions());
- flowStatistics.setMatch(map.getMatch());
- flowStatistics.setOutGroup(map.getOutGroup());
- flowStatistics.setOutPort(map.getOutPort());
- flowStatistics.setPriority(map.getPriority());
- flowStatistics.setStrict(map.isStrict());
- flowStatistics.setTableId(tableId);
-
- flowStatisticsData.setFlowStatistics(flowStatistics.build());
-
- sucLogger.debug("Flow : {}",flowRule.toString());
- sucLogger.debug("Statistics to augment : {}",flowStatistics.build().toString());
-
- InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
- .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
-
- Table table= (Table)it.readConfigurationData(tableRef);
-
- //TODO: Not a good way to do it, need to figure out better way.
- //TODO: major issue in any alternate approach is that flow key is incrementally assigned
- //to the flows stored in data store.
- // Augment same statistics to all the matching masked flow
- if(table != null){
-
- for(Flow existingFlow : table.getFlow()){
- sucLogger.debug("Existing flow in data store : {}",existingFlow.toString());
- if(flowEquals(flowRule,existingFlow)){
- it = this.statisticsManager.startChange();
- InstanceIdentifier<Flow> flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
- .augmentation(FlowCapableNode.class)
- .child(Table.class, new TableKey(tableId))
- .child(Flow.class,existingFlow.getKey()).toInstance();
- flowBuilder.setKey(existingFlow.getKey());
- flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
- sucLogger.debug("Found matching flow in the datastore, augmenting statistics");
- foundOriginalFlow = true;
- // Update entry with timestamp of latest response
- flow.setKey(existingFlow.getKey());
- FlowEntry flowStatsEntry = nsa.new FlowEntry(tableId,flow.build());
- cache.get(notification.getId()).updateFlowStats(flowStatsEntry);
-
- it.putOperationalData(flowRef, flowBuilder.build());
- it.commit();
- }
- }
- }
-
- table= (Table)it.readOperationalData(tableRef);
- if(!foundOriginalFlow && table != null){
-
- for(Flow existingFlow : table.getFlow()){
- FlowStatisticsData augmentedflowStatisticsData = existingFlow.getAugmentation(FlowStatisticsData.class);
- if(augmentedflowStatisticsData != null){
- FlowBuilder existingOperationalFlow = new FlowBuilder();
- existingOperationalFlow.fieldsFrom(augmentedflowStatisticsData.getFlowStatistics());
- sucLogger.debug("Existing unaccounted flow in operational data store : {}",existingFlow.toString());
- if(flowEquals(flowRule,existingOperationalFlow.build())){
- InstanceIdentifier<Flow> flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
- .augmentation(FlowCapableNode.class)
- .child(Table.class, new TableKey(tableId))
- .child(Flow.class,existingFlow.getKey()).toInstance();
- flowBuilder.setKey(existingFlow.getKey());
- flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
- sucLogger.debug("Found matching unaccounted flow in the operational datastore, augmenting statistics");
- foundOriginalFlow = true;
-
- // Update entry with timestamp of latest response
- flow.setKey(existingFlow.getKey());
- FlowEntry flowStatsEntry = nsa.new FlowEntry(tableId,flow.build());
- cache.get(notification.getId()).updateFlowStats(flowStatsEntry);
-
- it.putOperationalData(flowRef, flowBuilder.build());
- it.commit();
- break;
- }
- }
- }
- }
- if(!foundOriginalFlow){
- String flowKey = "#UF$TABLE*"+Short.toString(tableId)+"*"+Integer.toString(this.unaccountedFlowsCounter);
- this.unaccountedFlowsCounter++;
- FlowKey newFlowKey = new FlowKey(new FlowId(flowKey));
- InstanceIdentifier<Flow> flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
- .augmentation(FlowCapableNode.class)
- .child(Table.class, new TableKey(tableId))
- .child(Flow.class,newFlowKey).toInstance();
- flowBuilder.setKey(newFlowKey);
- flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
- sucLogger.debug("Flow {} is not present in config data store, augmenting statistics as an unaccounted flow",flowBuilder.build());
-
- // Update entry with timestamp of latest response
- flow.setKey(newFlowKey);
- FlowEntry flowStatsEntry = nsa.new FlowEntry(tableId,flow.build());
- cache.get(notification.getId()).updateFlowStats(flowStatsEntry);
-
- it.putOperationalData(flowRef, flowBuilder.build());
- it.commit();
- }
- }
- }
-
- @Override
- public void onAggregateFlowStatisticsUpdate(AggregateFlowStatisticsUpdate notification) {
- //Check if response is for the request statistics-manager sent.
- if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
- return;
-
- NodeKey key = new NodeKey(notification.getId());
-
- Short tableId = messageManager.getTableIdForTxId(notification.getId(),notification.getTransactionId());
- if(tableId != null){
-
- DataModificationTransaction it = this.statisticsManager.startChange();
-
- InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
- .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
-
- AggregateFlowStatisticsDataBuilder aggregateFlowStatisticsDataBuilder = new AggregateFlowStatisticsDataBuilder();
- AggregateFlowStatisticsBuilder aggregateFlowStatisticsBuilder = new AggregateFlowStatisticsBuilder();
- aggregateFlowStatisticsBuilder.setByteCount(notification.getByteCount());
- aggregateFlowStatisticsBuilder.setFlowCount(notification.getFlowCount());
- aggregateFlowStatisticsBuilder.setPacketCount(notification.getPacketCount());
- aggregateFlowStatisticsDataBuilder.setAggregateFlowStatistics(aggregateFlowStatisticsBuilder.build());
-
- sucLogger.debug("Augment aggregate statistics: {} for table {} on Node {}",aggregateFlowStatisticsBuilder.build().toString(),tableId,key);
-
- TableBuilder tableBuilder = new TableBuilder();
- tableBuilder.setKey(new TableKey(tableId));
- tableBuilder.addAugmentation(AggregateFlowStatisticsData.class, aggregateFlowStatisticsDataBuilder.build());
- it.putOperationalData(tableRef, tableBuilder.build());
- it.commit();
-
- }
- }
-
- @Override
- public void onNodeConnectorStatisticsUpdate(NodeConnectorStatisticsUpdate notification) {
- //Check if response is for the request statistics-manager sent.
- if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
- return;
-
- NodeKey key = new NodeKey(notification.getId());
-
- List<NodeConnectorStatisticsAndPortNumberMap> portsStats = notification.getNodeConnectorStatisticsAndPortNumberMap();
- for(NodeConnectorStatisticsAndPortNumberMap portStats : portsStats){
-
- DataModificationTransaction it = this.statisticsManager.startChange();
-
- FlowCapableNodeConnectorStatisticsBuilder statisticsBuilder
- = new FlowCapableNodeConnectorStatisticsBuilder();
- statisticsBuilder.setBytes(portStats.getBytes());
- statisticsBuilder.setCollisionCount(portStats.getCollisionCount());
- statisticsBuilder.setDuration(portStats.getDuration());
- statisticsBuilder.setPackets(portStats.getPackets());
- statisticsBuilder.setReceiveCrcError(portStats.getReceiveCrcError());
- statisticsBuilder.setReceiveDrops(portStats.getReceiveDrops());
- statisticsBuilder.setReceiveErrors(portStats.getReceiveErrors());
- statisticsBuilder.setReceiveFrameError(portStats.getReceiveFrameError());
- statisticsBuilder.setReceiveOverRunError(portStats.getReceiveOverRunError());
- statisticsBuilder.setTransmitDrops(portStats.getTransmitDrops());
- statisticsBuilder.setTransmitErrors(portStats.getTransmitErrors());
-
- //Augment data to the node-connector
- FlowCapableNodeConnectorStatisticsDataBuilder statisticsDataBuilder =
- new FlowCapableNodeConnectorStatisticsDataBuilder();
-
- statisticsDataBuilder.setFlowCapableNodeConnectorStatistics(statisticsBuilder.build());
-
- InstanceIdentifier<NodeConnector> nodeConnectorRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key).child(NodeConnector.class, new NodeConnectorKey(portStats.getNodeConnectorId())).toInstance();
-
- NodeConnector nodeConnector = (NodeConnector)it.readOperationalData(nodeConnectorRef);
-
- if(nodeConnector != null){
- sucLogger.debug("Augmenting port statistics {} to port {}",statisticsDataBuilder.build().toString(),nodeConnectorRef.toString());
- NodeConnectorBuilder nodeConnectorBuilder = new NodeConnectorBuilder();
- nodeConnectorBuilder.addAugmentation(FlowCapableNodeConnectorStatisticsData.class, statisticsDataBuilder.build());
- it.putOperationalData(nodeConnectorRef, nodeConnectorBuilder.build());
- it.commit();
- }
- }
- }
-
- @Override
- public void onFlowTableStatisticsUpdate(FlowTableStatisticsUpdate notification) {
- //Check if response is for the request statistics-manager sent.
- if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
- return;
-
- NodeKey key = new NodeKey(notification.getId());
-
- List<FlowTableAndStatisticsMap> flowTablesStatsList = notification.getFlowTableAndStatisticsMap();
- for (FlowTableAndStatisticsMap ftStats : flowTablesStatsList){
-
- DataModificationTransaction it = this.statisticsManager.startChange();
-
- InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
- .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(ftStats.getTableId().getValue())).toInstance();
-
- FlowTableStatisticsDataBuilder statisticsDataBuilder = new FlowTableStatisticsDataBuilder();
-
- FlowTableStatisticsBuilder statisticsBuilder = new FlowTableStatisticsBuilder();
- statisticsBuilder.setActiveFlows(ftStats.getActiveFlows());
- statisticsBuilder.setPacketsLookedUp(ftStats.getPacketsLookedUp());
- statisticsBuilder.setPacketsMatched(ftStats.getPacketsMatched());
-
- statisticsDataBuilder.setFlowTableStatistics(statisticsBuilder.build());
-
- sucLogger.debug("Augment flow table statistics: {} for table {} on Node {}",statisticsBuilder.build().toString(),ftStats.getTableId(),key);
-
- TableBuilder tableBuilder = new TableBuilder();
- tableBuilder.setKey(new TableKey(ftStats.getTableId().getValue()));
- tableBuilder.addAugmentation(FlowTableStatisticsData.class, statisticsDataBuilder.build());
- it.putOperationalData(tableRef, tableBuilder.build());
- it.commit();
- }
- }
-
- @Override
- public void onQueueStatisticsUpdate(QueueStatisticsUpdate notification) {
-
- //Check if response is for the request statistics-manager sent.
- if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
- return;
-
- NodeKey key = new NodeKey(notification.getId());
-
- //Add statistics to local cache
- ConcurrentMap<NodeId, NodeStatisticsAger> cache = this.statisticsManager.getStatisticsCache();
- if(!cache.containsKey(notification.getId())){
- cache.put(notification.getId(), new NodeStatisticsAger(statisticsManager,key));
- }
-
- NodeStatisticsAger nsa = cache.get(notification.getId());
-
- List<QueueIdAndStatisticsMap> queuesStats = notification.getQueueIdAndStatisticsMap();
- for(QueueIdAndStatisticsMap swQueueStats : queuesStats){
-
- QueueEntry queueEntry = nsa.new QueueEntry(swQueueStats.getNodeConnectorId(),swQueueStats.getQueueId());
- nsa.updateQueueStats(queueEntry);
-
- FlowCapableNodeConnectorQueueStatisticsDataBuilder queueStatisticsDataBuilder = new FlowCapableNodeConnectorQueueStatisticsDataBuilder();
-
- FlowCapableNodeConnectorQueueStatisticsBuilder queueStatisticsBuilder = new FlowCapableNodeConnectorQueueStatisticsBuilder();
-
- queueStatisticsBuilder.fieldsFrom(swQueueStats);
-
- queueStatisticsDataBuilder.setFlowCapableNodeConnectorQueueStatistics(queueStatisticsBuilder.build());
-
- DataModificationTransaction it = this.statisticsManager.startChange();
-
- InstanceIdentifier<Queue> queueRef
- = InstanceIdentifier.builder(Nodes.class)
- .child(Node.class, key)
- .child(NodeConnector.class, new NodeConnectorKey(swQueueStats.getNodeConnectorId()))
- .augmentation(FlowCapableNodeConnector.class)
- .child(Queue.class, new QueueKey(swQueueStats.getQueueId())).toInstance();
-
- QueueBuilder queueBuilder = new QueueBuilder();
- queueBuilder.addAugmentation(FlowCapableNodeConnectorQueueStatisticsData.class, queueStatisticsDataBuilder.build());
- queueBuilder.setKey(new QueueKey(swQueueStats.getQueueId()));
-
- sucLogger.debug("Augmenting queue statistics {} of queue {} to port {}"
- ,queueStatisticsDataBuilder.build().toString(),
- swQueueStats.getQueueId(),
- swQueueStats.getNodeConnectorId());
-
- it.putOperationalData(queueRef, queueBuilder.build());
- it.commit();
-
- }
-
- }
-
- private static NodeRef getNodeRef(NodeKey nodeKey){
- InstanceIdentifierBuilder<?> builder = InstanceIdentifier.builder(Nodes.class).child(Node.class, nodeKey);
- return new NodeRef(builder.toInstance());
- }
-
- public boolean flowEquals(Flow statsFlow, Flow storedFlow) {
- if (statsFlow.getClass() != storedFlow.getClass()) {
- return false;
- }
- if (statsFlow.getContainerName()== null) {
- if (storedFlow.getContainerName()!= null) {
- return false;
- }
- } else if(!statsFlow.getContainerName().equals(storedFlow.getContainerName())) {
- return false;
- }
- if (statsFlow.getMatch()== null) {
- if (storedFlow.getMatch() != null) {
- return false;
- }
- } //else if(!statsFlow.getMatch().equals(storedFlow.getMatch())) {
- else if(!matchEquals(statsFlow.getMatch(), storedFlow.getMatch())) {
- return false;
- }
- if (storedFlow.getPriority() == null) {
- if (statsFlow.getPriority() != null && statsFlow.getPriority()!= 0x8000) {
- return false;
- }
- } else if(!statsFlow.getPriority().equals(storedFlow.getPriority())) {
- return false;
- }
- if (statsFlow.getTableId() == null) {
- if (storedFlow.getTableId() != null) {
- return false;
- }
- } else if(!statsFlow.getTableId().equals(storedFlow.getTableId())) {
- return false;
- }
- return true;
- }
-
- /**
- * Explicit equals method to compare the 'match' for flows stored in the data-stores and flow fetched from the switch.
- * Flow installation process has three steps
- * 1) Store flow in config data store
- * 2) and send it to plugin for installation
- * 3) Flow gets installed in switch
- *
- * The flow user wants to install and what finally gets installed in switch can be slightly different.
- * E.g, If user installs flow with src/dst ip=10.0.0.1/24, when it get installed in the switch
- * src/dst ip will be changes to 10.0.0.0/24 because of netmask of 24. When statistics manager fetch
- * stats it gets 10.0.0.0/24 rather then 10.0.0.1/24. Custom match takes care of by using masked ip
- * while comparing two ip addresses.
- *
- * Sometimes when user don't provide few values that is required by flow installation request, like
- * priority,hard timeout, idle timeout, cookies etc, plugin usages default values before sending
- * request to the switch. So when statistics manager gets flow statistics, it gets the default value.
- * But the flow stored in config data store don't have those defaults value. I included those checks
- * in the customer flow/match equal function.
- *
- *
- * @param statsFlow
- * @param storedFlow
- * @return
- */
-
- public boolean matchEquals(Match statsFlow, Match storedFlow) {
- if (statsFlow == storedFlow) {
- return true;
- }
- if (storedFlow.getClass() != statsFlow.getClass()) {
- return false;
- }
- if (storedFlow.getEthernetMatch() == null) {
- if (statsFlow.getEthernetMatch() != null) {
- return false;
- }
- } else if(!storedFlow.getEthernetMatch().equals(statsFlow.getEthernetMatch())) {
- return false;
- }
- if (storedFlow.getIcmpv4Match()== null) {
- if (statsFlow.getIcmpv4Match() != null) {
- return false;
- }
- } else if(!storedFlow.getIcmpv4Match().equals(statsFlow.getIcmpv4Match())) {
- return false;
- }
- if (storedFlow.getIcmpv6Match() == null) {
- if (statsFlow.getIcmpv6Match() != null) {
- return false;
- }
- } else if(!storedFlow.getIcmpv6Match().equals(statsFlow.getIcmpv6Match())) {
- return false;
- }
- if (storedFlow.getInPhyPort() == null) {
- if (statsFlow.getInPhyPort() != null) {
- return false;
- }
- } else if(!storedFlow.getInPhyPort().equals(statsFlow.getInPhyPort())) {
- return false;
- }
- if (storedFlow.getInPort()== null) {
- if (statsFlow.getInPort() != null) {
- return false;
- }
- } else if(!storedFlow.getInPort().equals(statsFlow.getInPort())) {
- return false;
- }
- if (storedFlow.getIpMatch()== null) {
- if (statsFlow.getIpMatch() != null) {
- return false;
- }
- } else if(!storedFlow.getIpMatch().equals(statsFlow.getIpMatch())) {
- return false;
- }
- if (storedFlow.getLayer3Match()== null) {
- if (statsFlow.getLayer3Match() != null) {
- return false;
- }
- } else if(!layer3MatchEquals(statsFlow.getLayer3Match(),storedFlow.getLayer3Match())) {
- return false;
- }
- if (storedFlow.getLayer4Match()== null) {
- if (statsFlow.getLayer4Match() != null) {
- return false;
- }
- } else if(!storedFlow.getLayer4Match().equals(statsFlow.getLayer4Match())) {
- return false;
- }
- if (storedFlow.getMetadata() == null) {
- if (statsFlow.getMetadata() != null) {
- return false;
- }
- } else if(!storedFlow.getMetadata().equals(statsFlow.getMetadata())) {
- return false;
- }
- if (storedFlow.getProtocolMatchFields() == null) {
- if (statsFlow.getProtocolMatchFields() != null) {
- return false;
- }
- } else if(!storedFlow.getProtocolMatchFields().equals(statsFlow.getProtocolMatchFields())) {
- return false;
- }
- if (storedFlow.getTunnel()== null) {
- if (statsFlow.getTunnel() != null) {
- return false;
- }
- } else if(!storedFlow.getTunnel().equals(statsFlow.getTunnel())) {
- return false;
- }
- if (storedFlow.getVlanMatch()== null) {
- if (statsFlow.getVlanMatch() != null) {
- return false;
- }
- } else if(!storedFlow.getVlanMatch().equals(statsFlow.getVlanMatch())) {
- return false;
- }
- return true;
- }
-
- protected static boolean layer3MatchEquals(Layer3Match statsLayer3Match, Layer3Match storedLayer3Match){
- boolean verdict = true;
- if(statsLayer3Match instanceof Ipv4Match && storedLayer3Match instanceof Ipv4Match){
- Ipv4Match statsIpv4Match = (Ipv4Match)statsLayer3Match;
- Ipv4Match storedIpv4Match = (Ipv4Match)storedLayer3Match;
-
- if (verdict) {
- verdict = compareNullSafe(
- storedIpv4Match.getIpv4Destination(), statsIpv4Match.getIpv4Destination());
- }
- if (verdict) {
- verdict = compareNullSafe(
- statsIpv4Match.getIpv4Source(), storedIpv4Match.getIpv4Source());
- }
- } else {
- Boolean nullCheckOut = checkNullValues(storedLayer3Match, statsLayer3Match);
- if (nullCheckOut != null) {
- verdict = nullCheckOut;
- } else {
- verdict = storedLayer3Match.equals(statsLayer3Match);
- }
- }
-
- return verdict;
- }
-
- private static boolean compareNullSafe(Ipv4Prefix statsIpv4, Ipv4Prefix storedIpv4) {
- boolean verdict = true;
- Boolean checkDestNullValuesOut = checkNullValues(storedIpv4, statsIpv4);
- if (checkDestNullValuesOut != null) {
- verdict = checkDestNullValuesOut;
- } else if(!IpAddressEquals(statsIpv4, storedIpv4)){
- verdict = false;
- }
-
- return verdict;
- }
-
- private static Boolean checkNullValues(Object v1, Object v2) {
- Boolean verdict = null;
- if (v1 == null && v2 != null) {
- verdict = Boolean.FALSE;
- } else if (v1 != null && v2 == null) {
- verdict = Boolean.FALSE;
- } else if (v1 == null && v2 == null) {
- verdict = Boolean.TRUE;
- }
-
- return verdict;
- }
-
- /**
- * TODO: why don't we use the default Ipv4Prefix.equals()?
- *
- * @param statsIpAddress
- * @param storedIpAddress
- * @return true if IPv4prefixes equals
- */
- private static boolean IpAddressEquals(Ipv4Prefix statsIpAddress, Ipv4Prefix storedIpAddress) {
- IntegerIpAddress statsIpAddressInt = StrIpToIntIp(statsIpAddress.getValue());
- IntegerIpAddress storedIpAddressInt = StrIpToIntIp(storedIpAddress.getValue());
-
- if(IpAndMaskBasedMatch(statsIpAddressInt,storedIpAddressInt)){
- return true;
- }
- if(IpBasedMatch(statsIpAddressInt,storedIpAddressInt)){
- return true;
- }
- return false;
- }
-
- private static boolean IpAndMaskBasedMatch(IntegerIpAddress statsIpAddressInt,IntegerIpAddress storedIpAddressInt){
- return ((statsIpAddressInt.getIp() & statsIpAddressInt.getMask()) == (storedIpAddressInt.getIp() & storedIpAddressInt.getMask()));
- }
-
- private static boolean IpBasedMatch(IntegerIpAddress statsIpAddressInt,IntegerIpAddress storedIpAddressInt){
- return (statsIpAddressInt.getIp() == storedIpAddressInt.getIp());
- }
-
- /**
- * Method return integer version of ip address. Converted int will be mask if
- * mask specified
- */
- private static IntegerIpAddress StrIpToIntIp(String ipAddresss){
-
- String[] parts = ipAddresss.split("/");
- String ip = parts[0];
- int prefix;
-
- if (parts.length < 2) {
- prefix = 32;
- } else {
- prefix = Integer.parseInt(parts[1]);
- }
-
- IntegerIpAddress integerIpAddress = null;
- try {
- Inet4Address addr = (Inet4Address) InetAddress.getByName(ip);
- byte[] addrBytes = addr.getAddress();
- int ipInt = ((addrBytes[0] & 0xFF) << 24) |
- ((addrBytes[1] & 0xFF) << 16) |
- ((addrBytes[2] & 0xFF) << 8) |
- ((addrBytes[3] & 0xFF) << 0);
-
- int mask = 0xffffffff << 32 - prefix;
-
- integerIpAddress = new IntegerIpAddress(ipInt, mask);
- } catch (UnknownHostException e){
- sucLogger.error("Failed to determine host IP address by name: {}", e.getMessage(), e);
- }
-
- return integerIpAddress;
- }
-
- static class IntegerIpAddress{
- int ip;
- int mask;
- public IntegerIpAddress(int ip, int mask) {
- this.ip = ip;
- this.mask = mask;
- }
- public int getIp() {
- return ip;
- }
- public int getMask() {
- return mask;
- }
- }
-}
-
/**
* Following are two main responsibilities of the class
- * 1) Listen for the create changes in config data store for tree nodes (Flow,Group,Meter,Queue)
+ * 1) Listen for the create changes in config data store for tree nodes (Flow,Group,Meter,Queue)
* and send statistics request to the switch to fetch the statistics
- *
+ *
* 2)Listen for the remove changes in config data store for tree nodes (Flow,Group,Meter,Queue)
* and remove the relative statistics data from operational data store.
- *
+ *
* @author avishnoi@in.ibm.com
*
*/
public class StatisticsUpdateHandler implements DataChangeListener {
- public final static Logger suhLogger = LoggerFactory.getLogger(StatisticsUpdateHandler.class);
-
+ private static final Logger suhLogger = LoggerFactory.getLogger(StatisticsUpdateHandler.class);
private final StatisticsProvider statisticsManager;
-
- public StatisticsUpdateHandler(final StatisticsProvider manager){
+ public StatisticsUpdateHandler(final StatisticsProvider manager){
this.statisticsManager = manager;
}
-
- public StatisticsProvider getStatisticsManager(){
- return statisticsManager;
- }
@SuppressWarnings("unchecked")
@Override
public void onDataChanged(DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
-
+
Map<InstanceIdentifier<?>, DataObject> nodeAdditions = change.getCreatedOperationalData();
for (InstanceIdentifier<? extends DataObject> dataObjectInstance : nodeAdditions.keySet()) {
DataObject dataObject = nodeAdditions.get(dataObjectInstance);
if(dataObject instanceof Node){
-
+
Node node = (Node) dataObject;
if(node.getAugmentation(FlowCapableNode.class) != null){
this.statisticsManager.sendStatisticsRequestsToNode(node);
}
}
}
-
+
+ DataModificationTransaction it = this.statisticsManager.startChange();
Set<InstanceIdentifier<? extends DataObject>> removals = change.getRemovedConfigurationData();
for (InstanceIdentifier<? extends DataObject> dataObjectInstance : removals) {
DataObject dataObject = change.getOriginalConfigurationData().get(dataObjectInstance);
-
+
if(dataObject instanceof Flow){
InstanceIdentifier<Flow> flowII = (InstanceIdentifier<Flow>)dataObjectInstance;
- InstanceIdentifier<?> flowAugmentation =
+ InstanceIdentifier<?> flowAugmentation =
InstanceIdentifier.builder(flowII).augmentation(FlowStatisticsData.class).toInstance();
- removeAugmentedOperationalData(flowAugmentation);
+ it.removeOperationalData(flowAugmentation);
}
if(dataObject instanceof Meter){
InstanceIdentifier<Meter> meterII = (InstanceIdentifier<Meter>)dataObjectInstance;
-
- InstanceIdentifier<?> nodeMeterConfigStatsAugmentation =
+
+ InstanceIdentifier<?> nodeMeterConfigStatsAugmentation =
InstanceIdentifier.builder(meterII).augmentation(NodeMeterConfigStats.class).toInstance();
- removeAugmentedOperationalData(nodeMeterConfigStatsAugmentation);
+ it.removeOperationalData(nodeMeterConfigStatsAugmentation);
- InstanceIdentifier<?> nodeMeterStatisticsAugmentation =
+ InstanceIdentifier<?> nodeMeterStatisticsAugmentation =
InstanceIdentifier.builder(meterII).augmentation(NodeMeterStatistics.class).toInstance();
- removeAugmentedOperationalData(nodeMeterStatisticsAugmentation);
+ it.removeOperationalData(nodeMeterStatisticsAugmentation);
}
-
+
if(dataObject instanceof Group){
InstanceIdentifier<Group> groupII = (InstanceIdentifier<Group>)dataObjectInstance;
-
- InstanceIdentifier<?> nodeGroupDescStatsAugmentation =
+
+ InstanceIdentifier<?> nodeGroupDescStatsAugmentation =
InstanceIdentifier.builder(groupII).augmentation(NodeGroupDescStats.class).toInstance();
- removeAugmentedOperationalData(nodeGroupDescStatsAugmentation);
+ it.removeOperationalData(nodeGroupDescStatsAugmentation);
- InstanceIdentifier<?> nodeGroupStatisticsAugmentation =
+ InstanceIdentifier<?> nodeGroupStatisticsAugmentation =
InstanceIdentifier.builder(groupII).augmentation(NodeGroupStatistics.class).toInstance();
- removeAugmentedOperationalData(nodeGroupStatisticsAugmentation);
+ it.removeOperationalData(nodeGroupStatisticsAugmentation);
}
-
+
if(dataObject instanceof Queue){
InstanceIdentifier<Queue> queueII = (InstanceIdentifier<Queue>)dataObjectInstance;
-
- InstanceIdentifier<?> nodeConnectorQueueStatisticsDataAugmentation =
+
+ InstanceIdentifier<?> nodeConnectorQueueStatisticsDataAugmentation =
InstanceIdentifier.builder(queueII).augmentation(FlowCapableNodeConnectorQueueStatisticsData.class).toInstance();
- removeAugmentedOperationalData(nodeConnectorQueueStatisticsDataAugmentation);
+ it.removeOperationalData(nodeConnectorQueueStatisticsDataAugmentation);
}
}
- }
-
- private void removeAugmentedOperationalData(InstanceIdentifier<? extends DataObject> dataObjectInstance ){
- if(dataObjectInstance != null){
- DataModificationTransaction it = this.statisticsManager.startChange();
- it.removeOperationalData(dataObjectInstance);
- it.commit();
- }
+ it.commit();
}
}
import org.slf4j.LoggerFactory;
/**
- *
+ *
*/
public class StatisticsUpdateCommiterTest {
-
+
private static final Logger LOG = LoggerFactory
.getLogger(StatisticsUpdateCommiterTest.class);
/**
- * Test method for {@link org.opendaylight.controller.md.statistics.manager.StatisticsUpdateCommiter#layer3MatchEquals(org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.match.Layer3Match, org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.match.Layer3Match)}.
+ * Test method for {@link org.opendaylight.controller.md.statistics.manager.StatisticsListener#layer3MatchEquals(org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.match.Layer3Match, org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.match.Layer3Match)}.
*/
@Test
public void testLayer3MatchEquals() {
{{"10.1.2.0/24", "10.1.2.0/24"}, {"10.1.2.0/24", "10.1.1.0/24"}},
{{"10.1.1.0/24", "10.1.2.0/24"}, {"10.1.2.0/24", "10.1.2.0/24"}},
{{"10.1.1.0/24", "10.1.1.0/24"}, {"10.1.2.0/24", "10.1.2.0/24"}},
-
+
{{"10.1.1.0/24", null}, {"10.1.1.0/24", "10.1.2.0/24"}},
{{"10.1.1.0/24", null}, {"10.1.2.0/24", "10.1.2.0/24"}},
{{"10.1.1.0/24", null}, {"10.1.2.0/24", null}},
{{"10.1.1.0/24", null}, {"10.1.1.0/24", null}},
-
+
{{null, "10.1.1.0/24"}, {"10.1.2.0/24", "10.1.1.0/24"}},
{{null, "10.1.1.0/24"}, {"10.1.2.0/24", "10.1.2.0/24"}},
{{null, "10.1.1.0/24"}, {null, "10.1.2.0/24"}},
{{null, "10.1.1.0/24"}, {null, "10.1.1.0/24"}},
-
+
{{null, null}, {null, "10.1.1.0/24"}},
{{null, null}, {null, null}},
};
-
+
boolean[] matches = new boolean[] {
- true,
+ true,
false,
false,
false,
-
+
false,
false,
false,
true,
-
+
false,
false,
false,
true,
-
+
false,
true
};
-
+
for (int i = 0; i < matches.length; i++) {
checkComparisonOfL3Match(
- matchSeeds[i][0][0], matchSeeds[i][0][1],
- matchSeeds[i][1][0], matchSeeds[i][1][1],
+ matchSeeds[i][0][0], matchSeeds[i][0][1],
+ matchSeeds[i][1][0], matchSeeds[i][1][1],
matches[i]);
}
}
* @param m2Source match2 - src
* @param msDestination match2 - dest
* @param matches expected match output
- *
+ *
*/
- private static void checkComparisonOfL3Match(String m1Source, String m1Destination,
+ private static void checkComparisonOfL3Match(String m1Source, String m1Destination,
String m2Source, String msDestination, boolean matches) {
Ipv4Match m1Layer3 = prepareIPv4Match(m1Source, m1Destination);
Ipv4Match m2Layer3 = prepareIPv4Match(m2Source, msDestination);
boolean comparisonResult;
try {
- comparisonResult = StatisticsUpdateCommiter.layer3MatchEquals(m1Layer3, m2Layer3);
- Assert.assertEquals("failed to compare: "+m1Layer3+" vs. "+m2Layer3,
+ comparisonResult = FlowComparator.layer3MatchEquals(m1Layer3, m2Layer3);
+ Assert.assertEquals("failed to compare: "+m1Layer3+" vs. "+m2Layer3,
matches, comparisonResult);
} catch (Exception e) {
LOG.error("failed to compare: {} vs. {}", m1Layer3, m2Layer3, e);
Assert.fail(e.getMessage());
}
}
-
+
private static Ipv4Match prepareIPv4Match(String source, String destination) {
Ipv4MatchBuilder ipv4MatchBuilder = new Ipv4MatchBuilder();
if (source != null) {
if (destination != null) {
ipv4MatchBuilder.setIpv4Destination(new Ipv4Prefix(destination));
}
-
+
return ipv4MatchBuilder.build();
}
package org.opendaylight.controller.netconf.persist.impl;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import io.netty.channel.EventLoopGroup;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.annotation.concurrent.Immutable;
+
import org.opendaylight.controller.config.api.ConflictingVersionException;
import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.client.NetconfClient;
import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
import org.opendaylight.controller.netconf.util.NetconfUtil;
-import org.opendaylight.controller.netconf.util.messages.NetconfMessageAdditionalHeader;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.w3c.dom.Element;
import org.xml.sax.SAXException;
-import javax.annotation.concurrent.Immutable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
+import com.google.common.base.Preconditions;
+import io.netty.channel.EventLoopGroup;
@Immutable
public class ConfigPusher {
- private static final Logger logger = LoggerFactory.getLogger(ConfigPersisterNotificationHandler.class);
+ private static final Logger logger = LoggerFactory.getLogger(ConfigPusher.class);
private static final int NETCONF_SEND_ATTEMPT_MS_DELAY = 1000;
private static final int NETCONF_SEND_ATTEMPTS = 20;
}
public ConfigPusher(InetSocketAddress address, EventLoopGroup nettyThreadGroup,
- long maxWaitForCapabilitiesMillis, long connectionTimeoutMillis) {
+ long maxWaitForCapabilitiesMillis, long connectionTimeoutMillis) {
this.address = address;
this.nettyThreadGroup = nettyThreadGroup;
this.maxWaitForCapabilitiesMillis = maxWaitForCapabilitiesMillis;
final long deadlineNanos = pollingStartNanos + TimeUnit.MILLISECONDS.toNanos(maxWaitForCapabilitiesMillis);
int attempt = 0;
- String additionalHeader = NetconfMessageAdditionalHeader.toString("unknown", address.getAddress().getHostAddress(),
- Integer.toString(address.getPort()), "tcp", Optional.of("persister"));
+ NetconfHelloMessageAdditionalHeader additionalHeader = new NetconfHelloMessageAdditionalHeader("unknown", address.getAddress().getHostAddress(),
+ Integer.toString(address.getPort()), "tcp", "persister");
Set<String> latestCapabilities = null;
while (System.nanoTime() < deadlineNanos) {
NetconfMessage netconfMessage = netconfClient.sendMessage(request, NETCONF_SEND_ATTEMPTS, NETCONF_SEND_ATTEMPT_MS_DELAY);
NetconfUtil.checkIsMessageOk(netconfMessage);
return netconfMessage;
- } catch (RuntimeException e) { // TODO: change NetconfClient#sendMessage to throw checked exceptions
+ } catch (RuntimeException | ExecutionException | InterruptedException | TimeoutException e) {
logger.debug("Error while executing netconf transaction {} to {}", request, netconfClient, e);
throw new IOException("Failed to execute netconf transaction", e);
}
}
-
// load editConfig.xml template, populate /rpc/edit-config/config with parameter
private static NetconfMessage createEditConfigMessage(Element dataElement) {
String editConfigResourcePath = "/netconfOp/editConfig.xml";
'}';
}
}
-}
\ No newline at end of file
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.netconf.api;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+
+import java.io.IOException;
+
+import org.opendaylight.protocol.framework.AbstractProtocolSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractNetconfSession<S extends NetconfSession, L extends NetconfSessionListener<S>> extends AbstractProtocolSession<NetconfMessage> implements NetconfSession {
+ private static final Logger logger = LoggerFactory.getLogger(AbstractNetconfSession.class);
+ private final L sessionListener;
+ private final long sessionId;
+ private boolean up = false;
+
+ protected final Channel channel;
+
+ protected AbstractNetconfSession(L sessionListener, Channel channel, long sessionId) {
+ this.sessionListener = sessionListener;
+ this.channel = channel;
+ this.sessionId = sessionId;
+ logger.debug("Session {} created", toString());
+ }
+
+ protected abstract S thisInstance();
+
+ @Override
+ public void close() {
+ channel.close();
+ up = false;
+ sessionListener.onSessionTerminated(thisInstance(), new NetconfTerminationReason("Session closed"));
+ }
+
+ @Override
+ protected void handleMessage(NetconfMessage netconfMessage) {
+ logger.debug("handling incoming message");
+ sessionListener.onMessage(thisInstance(), netconfMessage);
+ }
+
+ @Override
+ public ChannelFuture sendMessage(NetconfMessage netconfMessage) {
+ return channel.writeAndFlush(netconfMessage);
+ }
+
+ @Override
+ protected void endOfInput() {
+ logger.debug("Session {} end of input detected while session was in state {}", toString(), isUp() ? "up"
+ : "initialized");
+ if (isUp()) {
+ this.sessionListener.onSessionDown(thisInstance(), new IOException("End of input detected. Close the session."));
+ }
+ }
+
+ @Override
+ protected void sessionUp() {
+ logger.debug("Session {} up", toString());
+ sessionListener.onSessionUp(thisInstance());
+ this.up = true;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuffer sb = new StringBuffer("ServerNetconfSession{");
+ sb.append("sessionId=").append(sessionId);
+ sb.append('}');
+ return sb.toString();
+ }
+
+ public final boolean isUp() {
+ return up;
+ }
+
+ public final long getSessionId() {
+ return sessionId;
+ }
+}
+
import org.w3c.dom.Document;
-import com.google.common.base.Optional;
-
/**
* NetconfMessage represents a wrapper around org.w3c.dom.Document. Needed for
* implementing ProtocolMessage interface.
*/
-public final class NetconfMessage {
- private final String additionalHeader;
+public class NetconfMessage {
private final Document doc;
public NetconfMessage(final Document doc) {
- this(doc, null);
- }
-
- public NetconfMessage(Document doc, String additionalHeader) {
this.doc = doc;
- this.additionalHeader = additionalHeader;
}
public Document getDocument() {
return this.doc;
}
-
- public Optional<String> getAdditionalHeader() {
- return additionalHeader== null ? Optional.<String>absent() : Optional.of(additionalHeader);
- }
}
*/
package org.opendaylight.controller.netconf.api;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
-import java.io.IOException;
+import org.opendaylight.protocol.framework.ProtocolSession;
-import org.opendaylight.protocol.framework.AbstractProtocolSession;
-import org.opendaylight.protocol.framework.SessionListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class NetconfSession extends AbstractProtocolSession<NetconfMessage> {
- private static final Logger logger = LoggerFactory.getLogger(NetconfSession.class);
- private final SessionListener<NetconfMessage, NetconfSession, NetconfTerminationReason> sessionListener;
- private final long sessionId;
- private boolean up = false;
-
- protected final Channel channel;
-
- protected NetconfSession(SessionListener<NetconfMessage, NetconfSession, NetconfTerminationReason> sessionListener, Channel channel, long sessionId) {
- this.sessionListener = sessionListener;
- this.channel = channel;
- this.sessionId = sessionId;
- logger.debug("Session {} created", toString());
- }
-
- @Override
- public void close() {
- channel.close();
- up = false;
- sessionListener.onSessionTerminated(this, new NetconfTerminationReason("Session closed"));
- }
-
- @Override
- protected void handleMessage(NetconfMessage netconfMessage) {
- logger.debug("handling incoming message");
- sessionListener.onMessage(this, netconfMessage);
- }
-
- public ChannelFuture sendMessage(NetconfMessage netconfMessage) {
- return channel.writeAndFlush(netconfMessage);
- }
-
- @Override
- protected void endOfInput() {
- logger.debug("Session {} end of input detected while session was in state {}", toString(), isUp() ? "up"
- : "initialized");
- if (isUp()) {
- this.sessionListener.onSessionDown(this, new IOException("End of input detected. Close the session."));
- }
- }
-
- @Override
- protected void sessionUp() {
- logger.debug("Session {} up", toString());
- sessionListener.onSessionUp(this);
- this.up = true;
- }
-
- @Override
- public String toString() {
- final StringBuffer sb = new StringBuffer("ServerNetconfSession{");
- sb.append("sessionId=").append(sessionId);
- sb.append('}');
- return sb.toString();
- }
-
- public final boolean isUp() {
- return up;
- }
-
- public final long getSessionId() {
- return sessionId;
- }
+public interface NetconfSession extends ProtocolSession<NetconfMessage> {
+ ChannelFuture sendMessage(NetconfMessage message);
}
-
import org.opendaylight.protocol.framework.SessionListener;
-public interface NetconfSessionListener extends
- SessionListener<NetconfMessage, NetconfSession, NetconfTerminationReason> {
+public interface NetconfSessionListener<S extends NetconfSession> extends SessionListener<NetconfMessage, S, NetconfTerminationReason> {
}
/**
* Class extending {@link NetconfClientSessionListener} to provide notification capability.
*/
-public abstract class AbstractNetconfClientNotifySessionListener extends NetconfClientSessionListener {
+public abstract class AbstractNetconfClientNotifySessionListener extends SimpleNetconfClientSessionListener {
/*
* Maybe some capabilities could be expressed as internal NetconfClientSessionListener handlers.
* It would enable NetconfClient functionality to be extended by using namespace handlers.
* @param message {@see NetconfClientSessionListener#onMessage(NetconfClientSession, NetconfMessage)}
*/
@Override
- public final synchronized void onMessage(NetconfClientSession session, NetconfMessage message) {
+ public final void onMessage(NetconfClientSession session, NetconfMessage message) {
if (isNotification(message)) {
onNotification(session, message);
} else {
package org.opendaylight.controller.netconf.client;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.Sets;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
-import org.opendaylight.controller.netconf.api.NetconfMessage;
-import org.opendaylight.protocol.framework.NeverReconnectStrategy;
-import org.opendaylight.protocol.framework.ReconnectStrategy;
-import org.opendaylight.protocol.framework.TimedReconnectStrategy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.protocol.framework.NeverReconnectStrategy;
+import org.opendaylight.protocol.framework.ReconnectStrategy;
+import org.opendaylight.protocol.framework.TimedReconnectStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+/**
+ * @deprecated Use {@link NetconfClientDispatcher.createClient()} or {@link NetconfClientDispatcher.createReconnectingClient()} instead.
+ */
+@Deprecated
public class NetconfClient implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(NetconfClient.class);
private NetconfClient(String clientLabelForLogging, InetSocketAddress address, ReconnectStrategy strat, NetconfClientDispatcher netconfClientDispatcher) throws InterruptedException {
this.label = clientLabelForLogging;
dispatch = netconfClientDispatcher;
- sessionListener = new NetconfClientSessionListener();
+ sessionListener = new SimpleNetconfClientSessionListener();
Future<NetconfClientSession> clientFuture = dispatch.createClient(address, sessionListener, strat);
this.address = address;
clientSession = get(clientFuture);
return new NetconfClient(clientLabelForLogging,address,strategy,netconfClientDispatcher);
}
- public static NetconfClient clientFor(String clientLabelForLogging, InetSocketAddress address, ReconnectStrategy strategy, NetconfClientDispatcher netconfClientDispatcher,NetconfClientSessionListener listener) throws InterruptedException {
+ public static NetconfClient clientFor(String clientLabelForLogging, InetSocketAddress address,
+ ReconnectStrategy strategy, NetconfClientDispatcher netconfClientDispatcher, NetconfClientSessionListener listener) throws InterruptedException {
return new NetconfClient(clientLabelForLogging,address,strategy,netconfClientDispatcher,listener);
}
this.sessionId = clientSession.getSessionId();
}
- public NetconfMessage sendMessage(NetconfMessage message) {
+ public Future<NetconfMessage> sendRequest(NetconfMessage message) {
+ return ((SimpleNetconfClientSessionListener)sessionListener).sendRequest(message);
+ }
+
+ /**
+ * @deprecated Use {@link sendRequest} instead
+ */
+ @Deprecated
+ public NetconfMessage sendMessage(NetconfMessage message) throws ExecutionException, InterruptedException, TimeoutException {
return sendMessage(message, 5, 1000);
}
- public NetconfMessage sendMessage(NetconfMessage message, int attempts, int attemptMsDelay) {
- Stopwatch stopwatch = new Stopwatch().start();
- Preconditions.checkState(clientSession.isUp(), "Session was not up yet");
+ /**
+ * @deprecated Use {@link sendRequest} instead
+ */
+ @Deprecated
+ public NetconfMessage sendMessage(NetconfMessage message, int attempts, int attemptMsDelay) throws ExecutionException, InterruptedException, TimeoutException {
//logger.debug("Sending message: {}",XmlUtil.toString(message.getDocument()));
- clientSession.sendMessage(message);
+ final Stopwatch stopwatch = new Stopwatch().start();
+
try {
- return sessionListener.getLastMessage(attempts, attemptMsDelay);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(this + " Cannot read message from " + address, e);
- } catch (IllegalStateException e) {
- throw new IllegalStateException(this + " Cannot read message from " + address, e);
+ return sendRequest(message).get(attempts * attemptMsDelay, TimeUnit.MILLISECONDS);
} finally {
stopwatch.stop();
- logger.debug("Total time spent waiting for response {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
+ logger.debug("Total time spent waiting for response from {}: {} ms", address, stopwatch.elapsed(TimeUnit.MILLISECONDS));
}
}
package org.opendaylight.controller.netconf.client;
-import com.google.common.base.Optional;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
-import org.opendaylight.controller.netconf.api.NetconfMessage;
-import org.opendaylight.controller.netconf.api.NetconfSession;
-import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
+
+import java.io.Closeable;
+import java.net.InetSocketAddress;
+
import org.opendaylight.controller.netconf.util.AbstractChannelInitializer;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
import org.opendaylight.protocol.framework.AbstractDispatcher;
import org.opendaylight.protocol.framework.ReconnectStrategy;
-import org.opendaylight.protocol.framework.SessionListener;
+import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
import org.opendaylight.protocol.framework.SessionListenerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Closeable;
-import java.net.InetSocketAddress;
+import com.google.common.base.Optional;
public class NetconfClientDispatcher extends AbstractDispatcher<NetconfClientSession, NetconfClientSessionListener> implements Closeable {
- private static final Logger logger = LoggerFactory.getLogger(NetconfClient.class);
+ private static final Logger logger = LoggerFactory.getLogger(NetconfClientDispatcher.class);
- private final NetconfClientSessionNegotiatorFactory negotatorFactory;
+ private final NetconfClientSessionNegotiatorFactory negotiatorFactory;
private final HashedWheelTimer timer;
- public NetconfClientDispatcher(EventLoopGroup bossGroup, EventLoopGroup workerGroup, long clientConnectionTimeoutMillis) {
+ public NetconfClientDispatcher(EventLoopGroup bossGroup, EventLoopGroup workerGroup,
+ long clientConnectionTimeoutMillis) {
super(bossGroup, workerGroup);
timer = new HashedWheelTimer();
- this.negotatorFactory = new NetconfClientSessionNegotiatorFactory(timer, Optional.<String>absent(), clientConnectionTimeoutMillis);
+ this.negotiatorFactory = new NetconfClientSessionNegotiatorFactory(timer,
+ Optional.<NetconfHelloMessageAdditionalHeader> absent(), clientConnectionTimeoutMillis);
}
- public NetconfClientDispatcher(EventLoopGroup bossGroup, EventLoopGroup workerGroup, String additionalHeader, long connectionTimeoutMillis) {
+ public NetconfClientDispatcher(EventLoopGroup bossGroup, EventLoopGroup workerGroup,
+ NetconfHelloMessageAdditionalHeader additionalHeader, long connectionTimeoutMillis) {
super(bossGroup, workerGroup);
timer = new HashedWheelTimer();
- this.negotatorFactory = new NetconfClientSessionNegotiatorFactory(timer, Optional.of(additionalHeader), connectionTimeoutMillis);
+ this.negotiatorFactory = new NetconfClientSessionNegotiatorFactory(timer, Optional.of(additionalHeader),
+ connectionTimeoutMillis);
}
public Future<NetconfClientSession> createClient(InetSocketAddress address,
}
private void initialize(SocketChannel ch, Promise<NetconfClientSession> promise) {
- new ClientChannelInitializer( negotatorFactory, sessionListener).initialize(ch, promise);
+ new ClientChannelInitializer(negotiatorFactory, sessionListener).initialize(ch, promise);
}
});
}
- private static class ClientChannelInitializer extends AbstractChannelInitializer {
+ public Future<Void> createReconnectingClient(final InetSocketAddress address,
+ final NetconfClientSessionListener listener,
+ final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy) {
+ final ClientChannelInitializer init = new ClientChannelInitializer(negotiatorFactory, listener);
+
+ return super.createReconnectingClient(address, connectStrategyFactory, reestablishStrategy,
+ new PipelineInitializer<NetconfClientSession>() {
+ @Override
+ public void initializeChannel(final SocketChannel ch, final Promise<NetconfClientSession> promise) {
+ init.initialize(ch, promise);
+ }
+ });
+ }
+
+ private static class ClientChannelInitializer extends AbstractChannelInitializer<NetconfClientSession> {
private final NetconfClientSessionNegotiatorFactory negotiatorFactory;
private final NetconfClientSessionListener sessionListener;
private ClientChannelInitializer(NetconfClientSessionNegotiatorFactory negotiatorFactory,
- NetconfClientSessionListener sessionListener) {
+ NetconfClientSessionListener sessionListener) {
this.negotiatorFactory = negotiatorFactory;
this.sessionListener = sessionListener;
}
@Override
- public void initialize(SocketChannel ch, Promise<? extends NetconfSession> promise) {
+ public void initialize(SocketChannel ch, Promise<NetconfClientSession> promise) {
super.initialize(ch,promise);
}
@Override
- protected void initializeAfterDecoder(SocketChannel ch, Promise<? extends NetconfSession> promise) {
- ch.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator(new SessionListenerFactory() {
- @Override
- public SessionListener<NetconfMessage, NetconfClientSession, NetconfTerminationReason> getSessionListener() {
- return sessionListener;
- }
- }, ch, promise));
+ protected void initializeSessionNegotiator(SocketChannel ch, Promise<NetconfClientSession> promise) {
+ ch.pipeline().addAfter(NETCONF_MESSAGE_DECODER, AbstractChannelInitializer.NETCONF_SESSION_NEGOTIATOR,
+ negotiatorFactory.getSessionNegotiator(
+ new SessionListenerFactory<NetconfClientSessionListener>() {
+ @Override
+ public NetconfClientSessionListener getSessionListener() {
+ return sessionListener;
+ }
+ }, ch, promise));
}
-
}
+
@Override
public void close() {
try {
import java.util.Collection;
-import org.opendaylight.controller.netconf.api.NetconfMessage;
-import org.opendaylight.controller.netconf.api.NetconfSession;
-import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
-import org.opendaylight.protocol.framework.SessionListener;
+import org.opendaylight.controller.netconf.api.AbstractNetconfSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class NetconfClientSession extends NetconfSession {
+public final class NetconfClientSession extends AbstractNetconfSession<NetconfClientSession, NetconfClientSessionListener> {
private static final Logger logger = LoggerFactory.getLogger(NetconfClientSession.class);
private final Collection<String> capabilities;
- public NetconfClientSession(SessionListener<NetconfMessage, NetconfSession, NetconfTerminationReason> sessionListener, Channel channel, long sessionId,
+ public NetconfClientSession(NetconfClientSessionListener sessionListener, Channel channel, long sessionId,
Collection<String> capabilities) {
super(sessionListener,channel,sessionId);
this.capabilities = capabilities;
return capabilities;
}
+ @Override
+ protected NetconfClientSession thisInstance() {
+ return this;
+ }
}
/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.netconf.client;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.opendaylight.controller.netconf.api.NetconfMessage;
-import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
-import org.opendaylight.protocol.framework.SessionListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class NetconfClientSessionListener implements
- SessionListener<NetconfMessage, NetconfClientSession, NetconfTerminationReason> {
-
- private static final Logger logger = LoggerFactory.getLogger(NetconfClientSessionListener.class);
- private AtomicBoolean up = new AtomicBoolean(false);
-
- @Override
- public void onSessionUp(NetconfClientSession clientSession) {
- up.set(true);
- }
-
- @Override
- public void onSessionDown(NetconfClientSession clientSession, Exception e) {
- logger.debug("Client Session {} down, reason: {}", clientSession, e.getMessage());
- up.set(false);
- }
-
- @Override
- public void onSessionTerminated(NetconfClientSession clientSession,
- NetconfTerminationReason netconfTerminationReason) {
- logger.debug("Client Session {} terminated, reason: {}", clientSession,
- netconfTerminationReason.getErrorMessage());
- up.set(false);
- }
-
- @Override
- public synchronized void onMessage(NetconfClientSession session, NetconfMessage message) {
- synchronized (messages) {
- this.messages.add(message);
- }
- }
-
- private int lastReadMessage = -1;
- private List<NetconfMessage> messages = Lists.newArrayList();
-
- public NetconfMessage getLastMessage(int attempts, int attemptMsDelay) throws InterruptedException {
- Preconditions.checkState(up.get(), "Session was not up yet");
-
- for (int i = 0; i < attempts; i++) {
- synchronized (messages) {
- if (messages.size() - 1 > lastReadMessage) {
- lastReadMessage++;
- return messages.get(lastReadMessage);
- }
- }
+import org.opendaylight.controller.netconf.api.NetconfSessionListener;
- if (up.get() == false)
- throw new IllegalStateException("Session ended while trying to read message");
- Thread.sleep(attemptMsDelay);
- }
+public interface NetconfClientSessionListener extends NetconfSessionListener<NetconfClientSession> {
- throw new IllegalStateException("No netconf message to read");
- }
}
package org.opendaylight.controller.netconf.client;
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import io.netty.channel.Channel;
-import io.netty.util.Timer;
-import io.netty.util.concurrent.Promise;
-import org.opendaylight.controller.netconf.api.NetconfMessage;
+import java.util.Collection;
+import java.util.List;
+
+import javax.annotation.Nullable;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpression;
+
import org.opendaylight.controller.netconf.api.NetconfSessionPreferences;
import org.opendaylight.controller.netconf.util.AbstractNetconfSessionNegotiator;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage;
import org.opendaylight.controller.netconf.util.xml.XMLNetconfUtil;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
-import org.opendaylight.protocol.framework.SessionListener;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
-import javax.annotation.Nullable;
-import javax.xml.xpath.XPathConstants;
-import javax.xml.xpath.XPathExpression;
-import java.util.Collection;
-import java.util.List;
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+
+import io.netty.channel.Channel;
+import io.netty.util.Timer;
+import io.netty.util.concurrent.Promise;
public class NetconfClientSessionNegotiator extends
- AbstractNetconfSessionNegotiator<NetconfSessionPreferences, NetconfClientSession> {
+ AbstractNetconfSessionNegotiator<NetconfSessionPreferences, NetconfClientSession, NetconfClientSessionListener> {
protected NetconfClientSessionNegotiator(NetconfSessionPreferences sessionPreferences,
- Promise<NetconfClientSession> promise, Channel channel, Timer timer, SessionListener sessionListener,
+ Promise<NetconfClientSession> promise, Channel channel, Timer timer, NetconfClientSessionListener sessionListener,
long connectionTimeoutMillis) {
super(sessionPreferences, promise, channel, timer, sessionListener, connectionTimeoutMillis);
}
}
@Override
- protected NetconfClientSession getSession(SessionListener sessionListener, Channel channel, NetconfMessage message) {
+ protected NetconfClientSession getSession(NetconfClientSessionListener sessionListener, Channel channel, NetconfHelloMessage message) {
return new NetconfClientSession(sessionListener, channel, extractSessionId(message.getDocument()),
getCapabilities(message.getDocument()));
}
package org.opendaylight.controller.netconf.client;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
import io.netty.util.Timer;
import io.netty.util.concurrent.Promise;
+
+import java.io.IOException;
+import java.io.InputStream;
+
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.api.NetconfSessionPreferences;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.opendaylight.protocol.framework.SessionListenerFactory;
import org.opendaylight.protocol.framework.SessionNegotiator;
import org.opendaylight.protocol.framework.SessionNegotiatorFactory;
import org.xml.sax.SAXException;
-import java.io.IOException;
-import java.io.InputStream;
-
-public class NetconfClientSessionNegotiatorFactory implements SessionNegotiatorFactory {
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
- private final Timer timer;
+public class NetconfClientSessionNegotiatorFactory implements SessionNegotiatorFactory<NetconfMessage, NetconfClientSession, NetconfClientSessionListener> {
- private final Optional<String> additionalHeader;
+ private final Optional<NetconfHelloMessageAdditionalHeader> additionalHeader;
private final long connectionTimeoutMillis;
+ private final Timer timer;
- public NetconfClientSessionNegotiatorFactory(Timer timer, Optional<String> additionalHeader, long connectionTimeoutMillis) {
- this.timer = timer;
+ public NetconfClientSessionNegotiatorFactory(Timer timer, Optional<NetconfHelloMessageAdditionalHeader> additionalHeader, long connectionTimeoutMillis) {
+ this.timer = Preconditions.checkNotNull(timer);
this.additionalHeader = additionalHeader;
this.connectionTimeoutMillis = connectionTimeoutMillis;
}
}
@Override
- public SessionNegotiator getSessionNegotiator(SessionListenerFactory sessionListenerFactory, Channel channel,
- Promise promise) {
+ public SessionNegotiator<NetconfClientSession> getSessionNegotiator(SessionListenerFactory<NetconfClientSessionListener> sessionListenerFactory, Channel channel,
+ Promise<NetconfClientSession> promise) {
// Hello message needs to be recreated every time
NetconfMessage helloMessage = loadHelloMessageTemplate();
+
if(this.additionalHeader.isPresent()) {
- helloMessage = new NetconfMessage(helloMessage.getDocument(), additionalHeader.get());
- }
+ helloMessage = new NetconfHelloMessage(helloMessage.getDocument(), additionalHeader.get());
+ } else
+ helloMessage = new NetconfHelloMessage(helloMessage.getDocument());
+
NetconfSessionPreferences proposal = new NetconfSessionPreferences(helloMessage);
return new NetconfClientSessionNegotiator(proposal, promise, channel, timer,
sessionListenerFactory.getSessionListener(), connectionTimeoutMillis);
}
-
}
import java.io.IOException;
import java.net.InetSocketAddress;
-import org.opendaylight.controller.netconf.api.NetconfMessage;
-import org.opendaylight.controller.netconf.api.NetconfSession;
-import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
import org.opendaylight.controller.netconf.util.AbstractChannelInitializer;
import org.opendaylight.controller.netconf.util.handler.ssh.SshHandler;
import org.opendaylight.controller.netconf.util.handler.ssh.authentication.AuthenticationHandler;
import org.opendaylight.controller.netconf.util.handler.ssh.client.Invoker;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
import org.opendaylight.protocol.framework.ReconnectStrategy;
-import org.opendaylight.protocol.framework.SessionListener;
+import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
import org.opendaylight.protocol.framework.SessionListenerFactory;
import com.google.common.base.Optional;
private final AuthenticationHandler authHandler;
private final HashedWheelTimer timer;
- private final NetconfClientSessionNegotiatorFactory negotatorFactory;
+ private final NetconfClientSessionNegotiatorFactory negotiatorFactory;
public NetconfSshClientDispatcher(AuthenticationHandler authHandler, EventLoopGroup bossGroup,
EventLoopGroup workerGroup, long connectionTimeoutMillis) {
super(bossGroup, workerGroup, connectionTimeoutMillis);
this.authHandler = authHandler;
this.timer = new HashedWheelTimer();
- this.negotatorFactory = new NetconfClientSessionNegotiatorFactory(timer, Optional.<String>absent(), connectionTimeoutMillis);
+ this.negotiatorFactory = new NetconfClientSessionNegotiatorFactory(timer,
+ Optional.<NetconfHelloMessageAdditionalHeader> absent(), connectionTimeoutMillis);
}
public NetconfSshClientDispatcher(AuthenticationHandler authHandler, EventLoopGroup bossGroup,
- EventLoopGroup workerGroup, String additionalHeader, long socketTimeoutMillis) {
+ EventLoopGroup workerGroup, NetconfHelloMessageAdditionalHeader additionalHeader, long socketTimeoutMillis) {
super(bossGroup, workerGroup, additionalHeader, socketTimeoutMillis);
this.authHandler = authHandler;
this.timer = new HashedWheelTimer();
- this.negotatorFactory = new NetconfClientSessionNegotiatorFactory(timer, Optional.of(additionalHeader), socketTimeoutMillis);
+ this.negotiatorFactory = new NetconfClientSessionNegotiatorFactory(timer, Optional.of(additionalHeader),
+ socketTimeoutMillis);
}
@Override
@Override
public void initializeChannel(SocketChannel arg0, Promise<NetconfClientSession> arg1) {
- new NetconfSshClientInitializer(authHandler, negotatorFactory, sessionListener).initialize(arg0, arg1);
+ new NetconfSshClientInitializer(authHandler, negotiatorFactory, sessionListener).initialize(arg0, arg1);
}
});
}
- private static final class NetconfSshClientInitializer extends AbstractChannelInitializer {
+ @Override
+ public Future<Void> createReconnectingClient(final InetSocketAddress address,
+ final NetconfClientSessionListener listener,
+ final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy) {
+ final NetconfSshClientInitializer init = new NetconfSshClientInitializer(authHandler, negotiatorFactory, listener);
+
+ return super.createReconnectingClient(address, connectStrategyFactory, reestablishStrategy,
+ new PipelineInitializer<NetconfClientSession>() {
+ @Override
+ public void initializeChannel(final SocketChannel ch, final Promise<NetconfClientSession> promise) {
+ init.initialize(ch, promise);
+ }
+ });
+ }
+
+ private static final class NetconfSshClientInitializer extends AbstractChannelInitializer<NetconfClientSession> {
private final AuthenticationHandler authenticationHandler;
private final NetconfClientSessionNegotiatorFactory negotiatorFactory;
}
@Override
- public void initialize(SocketChannel ch, Promise<? extends NetconfSession> promise) {
+ public void initialize(SocketChannel ch, Promise<NetconfClientSession> promise) {
try {
Invoker invoker = Invoker.subsystem("netconf");
ch.pipeline().addFirst(new SshHandler(authenticationHandler, invoker));
}
@Override
- protected void initializeAfterDecoder(SocketChannel ch, Promise<? extends NetconfSession> promise) {
- ch.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator(new SessionListenerFactory() {
+ protected void initializeSessionNegotiator(SocketChannel ch,
+ Promise<NetconfClientSession> promise) {
+ ch.pipeline().addAfter(NETCONF_MESSAGE_DECODER, AbstractChannelInitializer.NETCONF_SESSION_NEGOTIATOR,
+ negotiatorFactory.getSessionNegotiator(new SessionListenerFactory<NetconfClientSessionListener>() {
@Override
- public SessionListener<NetconfMessage, NetconfClientSession, NetconfTerminationReason> getSessionListener() {
+ public NetconfClientSessionListener getSessionListener() {
return sessionListener;
}
}, ch, promise));
-
}
}
}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.client;
+
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import io.netty.util.concurrent.Promise;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class SimpleNetconfClientSessionListener implements NetconfClientSessionListener {
+ private static final class RequestEntry {
+ final Promise<NetconfMessage> promise;
+ final NetconfMessage request;
+
+ public RequestEntry(Promise<NetconfMessage> future, NetconfMessage request) {
+ this.promise = Preconditions.checkNotNull(future);
+ this.request = Preconditions.checkNotNull(request);
+ }
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(SimpleNetconfClientSessionListener.class);
+
+ @GuardedBy("this")
+ private final Queue<RequestEntry> requests = new ArrayDeque<>();
+
+ @GuardedBy("this")
+ private NetconfClientSession clientSession;
+
+ @GuardedBy("this")
+ private void dispatchRequest() {
+ while (!requests.isEmpty()) {
+ final RequestEntry e = requests.peek();
+ if (e.promise.setUncancellable()) {
+ logger.debug("Sending message {}", e.request);
+ clientSession.sendMessage(e.request);
+ break;
+ }
+
+ logger.debug("Message {} has been cancelled, skipping it", e.request);
+ requests.poll();
+ }
+ }
+
+ @Override
+ public final synchronized void onSessionUp(NetconfClientSession clientSession) {
+ this.clientSession = Preconditions.checkNotNull(clientSession);
+ logger.debug("Client session {} went up", clientSession);
+ dispatchRequest();
+ }
+
+ private synchronized void tearDown(final Exception cause) {
+ final RequestEntry e = requests.poll();
+ if (e != null) {
+ e.promise.setFailure(cause);
+ }
+
+ this.clientSession = null;
+ }
+
+ @Override
+ public final void onSessionDown(NetconfClientSession clientSession, Exception e) {
+ logger.debug("Client Session {} went down unexpectedly", clientSession, e);
+ tearDown(e);
+ }
+
+ @Override
+ public final void onSessionTerminated(NetconfClientSession clientSession,
+ NetconfTerminationReason netconfTerminationReason) {
+ logger.debug("Client Session {} terminated, reason: {}", clientSession,
+ netconfTerminationReason.getErrorMessage());
+ tearDown(new RuntimeException(netconfTerminationReason.getErrorMessage()));
+ }
+
+ @Override
+ public synchronized void onMessage(NetconfClientSession session, NetconfMessage message) {
+ logger.debug("New message arrived: {}", message);
+
+ final RequestEntry e = requests.poll();
+ if (e != null) {
+ e.promise.setSuccess(message);
+ dispatchRequest();
+ } else {
+ logger.info("Ignoring unsolicited message {}", message);
+ }
+ }
+
+ final synchronized Future<NetconfMessage> sendRequest(NetconfMessage message) {
+ final RequestEntry req = new RequestEntry(GlobalEventExecutor.INSTANCE.<NetconfMessage>newPromise(), message);
+
+ requests.add(req);
+ if (clientSession != null) {
+ dispatchRequest();
+ }
+
+ return req.promise;
+ }
+}
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.Promise;
-import org.opendaylight.controller.netconf.api.NetconfSession;
+
+import java.net.InetSocketAddress;
+
import org.opendaylight.controller.netconf.impl.util.DeserializerExceptionHandler;
import org.opendaylight.controller.netconf.util.AbstractChannelInitializer;
import org.opendaylight.protocol.framework.AbstractDispatcher;
-import java.net.InetSocketAddress;
-
-public class NetconfServerDispatcher extends AbstractDispatcher<NetconfSession, NetconfServerSessionListener> {
+public class NetconfServerDispatcher extends AbstractDispatcher<NetconfServerSession, NetconfServerSessionListener> {
private final ServerChannelInitializer initializer;
public ChannelFuture createServer(InetSocketAddress address) {
- return super.createServer(address, new PipelineInitializer<NetconfSession>() {
+ return super.createServer(address, new PipelineInitializer<NetconfServerSession>() {
@Override
- public void initializeChannel(final SocketChannel ch, final Promise<NetconfSession> promise) {
+ public void initializeChannel(final SocketChannel ch, final Promise<NetconfServerSession> promise) {
initializer.initialize(ch, promise);
}
});
}
- public static class ServerChannelInitializer extends AbstractChannelInitializer {
+ public static class ServerChannelInitializer extends AbstractChannelInitializer<NetconfServerSession> {
+
+ public static final String DESERIALIZER_EX_HANDLER_KEY = "deserializerExHandler";
private final NetconfServerSessionNegotiatorFactory negotiatorFactory;
private final NetconfServerSessionListenerFactory listenerFactory;
}
@Override
- protected void initializeAfterDecoder(SocketChannel ch, Promise<? extends NetconfSession> promise) {
- ch.pipeline().addLast("deserializerExHandler", new DeserializerExceptionHandler());
- ch.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator(listenerFactory, ch, promise));
+ protected void initializeMessageDecoder(SocketChannel ch) {
+ super.initializeMessageDecoder(ch);
+ ch.pipeline().addLast(DESERIALIZER_EX_HANDLER_KEY, new DeserializerExceptionHandler());
}
+ @Override
+ protected void initializeSessionNegotiator(SocketChannel ch, Promise<NetconfServerSession> promise) {
+ ch.pipeline().addAfter(DESERIALIZER_EX_HANDLER_KEY, AbstractChannelInitializer.NETCONF_SESSION_NEGOTIATOR, negotiatorFactory.getSessionNegotiator(listenerFactory, ch, promise));
+ }
}
}
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import org.opendaylight.controller.netconf.api.NetconfMessage;
-import org.opendaylight.controller.netconf.api.NetconfSession;
-import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
+import org.opendaylight.controller.netconf.api.AbstractNetconfSession;
import org.opendaylight.controller.netconf.api.monitoring.NetconfManagementSession;
-import org.opendaylight.protocol.framework.SessionListener;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.DomainName;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.extension.rev131210.NetconfTcp;
import com.google.common.base.Preconditions;
-public class NetconfServerSession extends NetconfSession implements NetconfManagementSession {
+public final class NetconfServerSession extends AbstractNetconfSession<NetconfServerSession, NetconfServerSessionListener> implements NetconfManagementSession {
private static final Logger logger = LoggerFactory.getLogger(NetconfServerSession.class);
- private final NetconfServerSessionNegotiator.AdditionalHeader header;
+ private final NetconfHelloMessageAdditionalHeader header;
private Date loginTime;
private long inRpcSuccess, inRpcFail, outRpcError;
- public NetconfServerSession(SessionListener<NetconfMessage, NetconfSession, NetconfTerminationReason> sessionListener, Channel channel, long sessionId,
- NetconfServerSessionNegotiator.AdditionalHeader header) {
+ public NetconfServerSession(NetconfServerSessionListener sessionListener, Channel channel, long sessionId,
+ NetconfHelloMessageAdditionalHeader header) {
super(sessionListener, channel, sessionId);
this.header = header;
logger.debug("Session {} created", toString());
public static final String ISO_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
+ private static final String dateTimePatternString = DateAndTime.PATTERN_CONSTANTS.get(0);
+ private static final Pattern dateTimePattern = Pattern.compile(dateTimePatternString);
+
@Override
public Session toManagementSession() {
SessionBuilder builder = new SessionBuilder();
Preconditions.checkState(DateAndTime.PATTERN_CONSTANTS.size() == 1);
String formattedDateTime = formatDateTime(loginTime);
- String pattern = DateAndTime.PATTERN_CONSTANTS.get(0);
- Matcher matcher = Pattern.compile(pattern).matcher(formattedDateTime);
- Preconditions.checkState(matcher.matches(), "Formatted datetime %s does not match pattern %s", formattedDateTime, pattern);
+
+ Matcher matcher = dateTimePattern.matcher(formattedDateTime);
+ Preconditions.checkState(matcher.matches(), "Formatted datetime %s does not match pattern %s", formattedDateTime, dateTimePattern);
builder.setLoginTime(new DateAndTime(formattedDateTime));
builder.setInBadRpcs(new ZeroBasedCounter32(inRpcFail));
builder.setInRpcs(new ZeroBasedCounter32(inRpcSuccess));
builder.setOutRpcErrors(new ZeroBasedCounter32(outRpcError));
- builder.setUsername(header.getUsername());
+ builder.setUsername(header.getUserName());
builder.setTransport(getTransportForString(header.getTransport()));
builder.setOutNotifications(new ZeroBasedCounter32(0L));
builder.setKey(new SessionKey(getSessionId()));
Session1Builder builder1 = new Session1Builder();
- builder1.setSessionIdentifier(header.getSessionType());
+ builder1.setSessionIdentifier(header.getSessionIdentifier());
builder.addAugmentation(Session1.class, builder1.build());
return builder.build();
private Class<? extends Transport> getTransportForString(String transport) {
switch(transport) {
- case "ssh" : return NetconfSsh.class;
- case "tcp" : return NetconfTcp.class;
- default: throw new IllegalArgumentException("Unknown transport type " + transport);
+ case "ssh" : return NetconfSsh.class;
+ case "tcp" : return NetconfTcp.class;
+ default: throw new IllegalArgumentException("Unknown transport type " + transport);
}
}
return dateFormat.format(loginTime);
}
+ @Override
+ protected NetconfServerSession thisInstance() {
+ return this;
+ }
}
package org.opendaylight.controller.netconf.impl;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
+import static com.google.common.base.Preconditions.checkState;
+
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.api.NetconfSessionListener;
import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationRouterImpl;
import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
-import org.opendaylight.protocol.framework.SessionListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
-import static com.google.common.base.Preconditions.checkState;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
-public class NetconfServerSessionListener implements
- SessionListener<NetconfMessage, NetconfServerSession, NetconfTerminationReason> {
+public class NetconfServerSessionListener implements NetconfSessionListener<NetconfServerSession> {
+ public static final String MESSAGE_ID = "message-id";
static final Logger logger = LoggerFactory.getLogger(NetconfServerSessionListener.class);
- public static final String MESSAGE_ID = "message-id";
private final SessionMonitoringService monitoringService;
+ private final NetconfOperationRouterImpl operationRouter;
- private NetconfOperationRouterImpl operationRouter;
-
- public NetconfServerSessionListener(NetconfOperationRouterImpl operationRouter,
- SessionMonitoringService monitoringService) {
+ public NetconfServerSessionListener(NetconfOperationRouterImpl operationRouter, SessionMonitoringService monitoringService) {
this.operationRouter = operationRouter;
this.monitoringService = monitoringService;
}
package org.opendaylight.controller.netconf.impl;
-import com.google.common.base.Optional;
-import io.netty.channel.Channel;
-import io.netty.util.Timer;
-import io.netty.util.concurrent.Promise;
-import org.opendaylight.controller.netconf.api.NetconfMessage;
+import java.net.InetSocketAddress;
+
import org.opendaylight.controller.netconf.api.NetconfServerSessionPreferences;
-import org.opendaylight.controller.netconf.impl.util.AdditionalHeaderUtil;
import org.opendaylight.controller.netconf.util.AbstractNetconfSessionNegotiator;
-import org.opendaylight.protocol.framework.SessionListener;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.InetSocketAddress;
+import com.google.common.base.Optional;
+
+import io.netty.channel.Channel;
+import io.netty.util.Timer;
+import io.netty.util.concurrent.Promise;
public class NetconfServerSessionNegotiator extends
- AbstractNetconfSessionNegotiator<NetconfServerSessionPreferences, NetconfServerSession> {
+ AbstractNetconfSessionNegotiator<NetconfServerSessionPreferences, NetconfServerSession, NetconfServerSessionListener> {
static final Logger logger = LoggerFactory.getLogger(NetconfServerSessionNegotiator.class);
protected NetconfServerSessionNegotiator(NetconfServerSessionPreferences sessionPreferences,
- Promise<NetconfServerSession> promise, Channel channel, Timer timer, SessionListener sessionListener,
+ Promise<NetconfServerSession> promise, Channel channel, Timer timer, NetconfServerSessionListener sessionListener,
long connectionTimeoutMillis) {
super(sessionPreferences, promise, channel, timer, sessionListener, connectionTimeoutMillis);
}
@Override
- protected NetconfServerSession getSession(SessionListener sessionListener, Channel channel, NetconfMessage message) {
- Optional<String> additionalHeader = message.getAdditionalHeader();
+ protected NetconfServerSession getSession(NetconfServerSessionListener sessionListener, Channel channel, NetconfHelloMessage message) {
+ Optional<NetconfHelloMessageAdditionalHeader> additionalHeader = message.getAdditionalHeader();
- AdditionalHeader parsedHeader;
+ NetconfHelloMessageAdditionalHeader parsedHeader;
if (additionalHeader.isPresent()) {
- parsedHeader = AdditionalHeaderUtil.fromString(additionalHeader.get());
+ parsedHeader = additionalHeader.get();
} else {
- parsedHeader = new AdditionalHeader("unknown", ((InetSocketAddress)channel.localAddress()).getHostString(),
+ InetSocketAddress inetSocketAddress = (InetSocketAddress) channel.localAddress();
+ parsedHeader = new NetconfHelloMessageAdditionalHeader("unknown", inetSocketAddress.getHostString(), Integer.toString(inetSocketAddress.getPort()),
"tcp", "client");
}
+
logger.debug("Additional header from hello parsed as {} from {}", parsedHeader, additionalHeader);
return new NetconfServerSession(sessionListener, channel, sessionPreferences.getSessionId(), parsedHeader);
}
- public static class AdditionalHeader {
-
- private final String username;
- private final String address;
- private final String transport;
- private final String sessionIdentifier;
-
- public AdditionalHeader(String userName, String hostAddress, String transport, String sessionIdentifier) {
- this.address = hostAddress;
- this.username = userName;
- this.transport = transport;
- this.sessionIdentifier = sessionIdentifier;
- }
-
- String getUsername() {
- return username;
- }
-
- String getAddress() {
- return address;
- }
-
- String getTransport() {
- return transport;
- }
-
- String getSessionType() {
- return sessionIdentifier;
- }
-
- @Override
- public String toString() {
- final StringBuffer sb = new StringBuffer("AdditionalHeader{");
- sb.append("username='").append(username).append('\'');
- sb.append(", address='").append(address).append('\'');
- sb.append(", transport='").append(transport).append('\'');
- sb.append('}');
- return sb.toString();
- }
- }
-
-}
+ }
import io.netty.channel.Channel;
import io.netty.util.Timer;
import io.netty.util.concurrent.Promise;
-import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.api.NetconfServerSessionPreferences;
import org.opendaylight.controller.netconf.impl.mapping.CapabilityProvider;
import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListener;
import org.opendaylight.controller.netconf.util.NetconfUtil;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage;
import org.opendaylight.controller.netconf.util.xml.XMLNetconfUtil;
import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import javax.xml.xpath.XPathExpression;
import java.io.InputStream;
-public class NetconfServerSessionNegotiatorFactory implements SessionNegotiatorFactory {
+public class NetconfServerSessionNegotiatorFactory implements SessionNegotiatorFactory<NetconfHelloMessage, NetconfServerSession, NetconfServerSessionListener> {
public static final String SERVER_HELLO_XML_LOCATION = "/server_hello.xml";
}
@Override
- public SessionNegotiator getSessionNegotiator(SessionListenerFactory sessionListenerFactory, Channel channel,
- Promise promise) {
+ public SessionNegotiator<NetconfServerSession> getSessionNegotiator(SessionListenerFactory<NetconfServerSessionListener> sessionListenerFactory, Channel channel,
+ Promise<NetconfServerSession> promise) {
long sessionId = idProvider.getNextSessionId();
NetconfServerSessionPreferences proposal = new NetconfServerSessionPreferences(createHelloMessage(sessionId),
private static final XPathExpression capabilitiesXPath = XMLNetconfUtil
.compileXPath("/netconf:hello/netconf:capabilities");
- private NetconfMessage createHelloMessage(long sessionId) {
+ private NetconfHelloMessage createHelloMessage(long sessionId) {
Document helloMessageTemplate = getHelloTemplateClone();
// change session ID
capabilityElement.setTextContent(capability);
capabilitiesElement.appendChild(capabilityElement);
}
- return new NetconfMessage(helloMessageTemplate);
+ return new NetconfHelloMessage(helloMessageTemplate);
}
private synchronized Document getHelloTemplateClone() {
- return (Document) this.helloMessageTemplate.cloneNode(true);
+ return (Document) helloMessageTemplate.cloneNode(true);
}
}
package org.opendaylight.controller.netconf.impl.mapping.operations;
-import org.opendaylight.controller.netconf.api.NetconfSession;
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.api.NetconfOperationRouter;
-import org.opendaylight.controller.netconf.mapping.api.DefaultNetconfOperation;
import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
import org.opendaylight.controller.netconf.util.mapping.AbstractNetconfOperation;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
-public class DefaultCloseSession extends AbstractNetconfOperation implements DefaultNetconfOperation {
+public class DefaultCloseSession extends AbstractNetconfOperation {
public static final String CLOSE_SESSION = "close-session";
- private NetconfSession netconfSession;
public DefaultCloseSession(String netconfSessionIdForReporting) {
super(netconfSessionIdForReporting);
opRouter.close();
return document.createElement(XmlNetconfConstants.OK);
}
-
- @Override
- public void setNetconfSession(NetconfSession s) {
- this.netconfSession = s;
- }
-
- public NetconfSession getNetconfSession() {
- return netconfSession;
- }
}
import java.util.HashMap;
import java.util.Map;
-import org.opendaylight.controller.netconf.api.NetconfSession;
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.api.NetconfOperationRouter;
import org.opendaylight.controller.netconf.impl.mapping.CapabilityProvider;
-import org.opendaylight.controller.netconf.mapping.api.DefaultNetconfOperation;
import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
import org.opendaylight.controller.netconf.util.mapping.AbstractNetconfOperation;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
-public final class DefaultGetSchema extends AbstractNetconfOperation implements DefaultNetconfOperation {
-
- private final CapabilityProvider cap;
- private NetconfSession netconfSession;
+public final class DefaultGetSchema extends AbstractNetconfOperation {
+ public static final String GET_SCHEMA = "get-schema";
+ public static final String IDENTIFIER = "identifier";
+ public static final String VERSION = "version";
private static final Logger logger = LoggerFactory.getLogger(DefaultGetSchema.class);
+ private final CapabilityProvider cap;
public DefaultGetSchema(CapabilityProvider cap, String netconfSessionIdForReporting) {
super(netconfSessionIdForReporting);
this.cap = cap;
}
- public static final String GET_SCHEMA = "get-schema";
- public static final String IDENTIFIER = "identifier";
- public static final String VERSION = "version";
-
@Override
protected HandlingPriority canHandle(String netconfOperationName, String namespace) {
if (netconfOperationName.equals("get-schema") == false)
} else {
version = Optional.absent();
}
-
}
}
-
- public void setNetconfSession(NetconfSession s) {
- this.netconfSession = s;
- }
-
- public NetconfSession getNetconfSession() {
- return netconfSession;
- }
}
*/
package org.opendaylight.controller.netconf.impl.osgi;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.opendaylight.controller.netconf.api.NetconfSession;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.api.NetconfOperationRouter;
+import org.opendaylight.controller.netconf.api.NetconfSession;
import org.opendaylight.controller.netconf.impl.DefaultCommitNotificationProducer;
import org.opendaylight.controller.netconf.impl.mapping.CapabilityProvider;
import org.opendaylight.controller.netconf.impl.mapping.operations.DefaultCloseSession;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
public class NetconfOperationRouterImpl implements NetconfOperationRouter {
public NetconfOperationRouterImpl(NetconfOperationServiceSnapshot netconfOperationServiceSnapshot,
CapabilityProvider capabilityProvider, DefaultCommitNotificationProducer commitNotifier) {
- this.netconfOperationServiceSnapshot = netconfOperationServiceSnapshot;
+ this.netconfOperationServiceSnapshot = Preconditions.checkNotNull(netconfOperationServiceSnapshot);
+ this.capabilityProvider = Preconditions.checkNotNull(capabilityProvider);
- this.capabilityProvider = capabilityProvider;
-
- Set<NetconfOperation> defaultNetconfOperations = Sets.newHashSet();
- defaultNetconfOperations.add(new DefaultGetSchema(capabilityProvider, netconfOperationServiceSnapshot
- .getNetconfSessionIdForReporting()));
- defaultNetconfOperations.add(new DefaultCloseSession(netconfOperationServiceSnapshot
- .getNetconfSessionIdForReporting()));
- defaultNetconfOperations.add(new DefaultStartExi(netconfOperationServiceSnapshot
- .getNetconfSessionIdForReporting()));
- defaultNetconfOperations.add(new DefaultStopExi(netconfOperationServiceSnapshot
- .getNetconfSessionIdForReporting()));
+ final String sessionId = netconfOperationServiceSnapshot.getNetconfSessionIdForReporting();
+ final Set<NetconfOperation> defaultNetconfOperations = Sets.newHashSet();
+ defaultNetconfOperations.add(new DefaultGetSchema(capabilityProvider, sessionId));
+ defaultNetconfOperations.add(new DefaultCloseSession(sessionId));
+ defaultNetconfOperations.add(new DefaultStartExi(sessionId));
+ defaultNetconfOperations.add(new DefaultStopExi(sessionId));
allNetconfOperations = getAllNetconfOperations(defaultNetconfOperations, netconfOperationServiceSnapshot);
- DefaultCommit defaultCommit = new DefaultCommit(commitNotifier, capabilityProvider,
- netconfOperationServiceSnapshot.getNetconfSessionIdForReporting());
+ DefaultCommit defaultCommit = new DefaultCommit(commitNotifier, capabilityProvider, sessionId);
Set<NetconfOperationFilter> defaultFilters = Sets.<NetconfOperationFilter> newHashSet(defaultCommit);
allSortedFilters = getAllNetconfFilters(defaultFilters, netconfOperationServiceSnapshot);
}
private class NetconfOperationExecution implements NetconfOperationFilterChain {
private final NetconfOperation operationWithHighestPriority;
- private NetconfOperationExecution(NetconfOperation operationWithHighestPriority) {
- this.operationWithHighestPriority = operationWithHighestPriority;
- }
-
public NetconfOperationExecution(TreeMap<HandlingPriority, Set<NetconfOperation>> sortedPriority,
HandlingPriority highestFoundPriority) {
operationWithHighestPriority = sortedPriority.get(highestFoundPriority).iterator().next();
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.netconf.impl.util;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.opendaylight.controller.netconf.impl.NetconfServerSessionNegotiator.AdditionalHeader;
-
-import com.google.common.base.Preconditions;
-
-public class AdditionalHeaderUtil {
-
- private static final Pattern pattern = Pattern
- .compile("\\[(?<username>[^;]+);(?<address>[0-9\\.]+)[:/](?<port>[0-9]+);(?<transport>[a-z]+)[^\\]]+\\]");
- private static final Pattern customHeaderPattern = Pattern
- .compile("\\[(?<username>[^;]+);(?<address>[0-9\\.]+)[:/](?<port>[0-9]+);(?<transport>[a-z]+);(?<sessionIdentifier>[a-z]+)[^\\]]+\\]");
-
- public static AdditionalHeader fromString(String additionalHeader) {
- additionalHeader = additionalHeader.trim();
- Matcher matcher = pattern.matcher(additionalHeader);
- Matcher matcher2 = customHeaderPattern.matcher(additionalHeader);
- Preconditions.checkArgument(matcher.matches(), "Additional header in wrong format %s, expected %s",
- additionalHeader, pattern);
- String username = matcher.group("username");
- String address = matcher.group("address");
- String transport = matcher.group("transport");
- String sessionIdentifier = "client";
- if (matcher2.matches()) {
- sessionIdentifier = matcher2.group("sessionIdentifier");
- }
- return new AdditionalHeader(username, address, transport, sessionIdentifier);
- }
-
-}
import junit.framework.Assert;
import org.junit.Test;
-import org.opendaylight.controller.netconf.impl.util.AdditionalHeaderUtil;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
public class AdditionalHeaderParserTest {
@Test
public void testParsing() throws Exception {
String s = "[netconf;10.12.0.102:48528;ssh;;;;;;]";
- NetconfServerSessionNegotiator.AdditionalHeader header = AdditionalHeaderUtil.fromString(s);
- Assert.assertEquals("netconf", header.getUsername());
+ NetconfHelloMessageAdditionalHeader header = NetconfHelloMessageAdditionalHeader.fromString(s);
+ Assert.assertEquals("netconf", header.getUserName());
Assert.assertEquals("10.12.0.102", header.getAddress());
Assert.assertEquals("ssh", header.getTransport());
}
@Test
public void testParsing2() throws Exception {
String s = "[tomas;10.0.0.0/10000;tcp;1000;1000;;/home/tomas;;]";
- NetconfServerSessionNegotiator.AdditionalHeader header = AdditionalHeaderUtil.fromString(s);
- Assert.assertEquals("tomas", header.getUsername());
+ NetconfHelloMessageAdditionalHeader header = NetconfHelloMessageAdditionalHeader.fromString(s);
+ Assert.assertEquals("tomas", header.getUserName());
Assert.assertEquals("10.0.0.0", header.getAddress());
Assert.assertEquals("tcp", header.getTransport());
}
@Test(expected = IllegalArgumentException.class)
public void testParsingNoUsername() throws Exception {
String s = "[10.12.0.102:48528;ssh;;;;;;]";
- AdditionalHeaderUtil.fromString(s);
+ NetconfHelloMessageAdditionalHeader.fromString(s);
}
}
import org.opendaylight.controller.netconf.mapping.api.NetconfOperationFilter;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.slf4j.Logger;
}
nettyGroup = new NioEventLoopGroup();
- netconfClientDispatcher = new NetconfClientDispatcher( nettyGroup, nettyGroup, 5000);
+ NetconfHelloMessageAdditionalHeader additionalHeader = new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp", "client");
+ netconfClientDispatcher = new NetconfClientDispatcher( nettyGroup, nettyGroup, additionalHeader, 5000);
NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
factoriesListener.onAddNetconfOperationServiceFactory(mockOpF());
package org.opendaylight.controller.netconf.it;
-import ch.ethz.ssh2.Connection;
-import ch.ethz.ssh2.Session;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import static java.util.Collections.emptyList;
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
import io.netty.channel.ChannelFuture;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import javax.management.ObjectName;
+import javax.xml.parsers.ParserConfigurationException;
+
import junit.framework.Assert;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.w3c.dom.Node;
import org.xml.sax.SAXException;
-import javax.management.ObjectName;
-import javax.xml.parsers.ParserConfigurationException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.management.ManagementFactory;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
+import ch.ethz.ssh2.Connection;
+import ch.ethz.ssh2.Session;
-import static java.util.Collections.emptyList;
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertNotNull;
-import static junit.framework.Assert.assertTrue;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
public class NetconfITTest extends AbstractNetconfConfigTest {
private static final String PASSWORD = "netconf";
private NetconfMessage getConfig, getConfigCandidate, editConfig,
- closeSession, startExi, stopExi;
+ closeSession, startExi, stopExi;
private DefaultCommitNotificationProducer commitNot;
private NetconfServerDispatcher dispatch;
}
}
- */
+ */
@Test
public void testCloseSession() throws Exception {
assertEquals("ok", XmlElement.fromDomDocument(rpcReply).getOnlyChildElement().getName());
}
- private Document assertGetConfigWorks(final NetconfClient netconfClient) throws InterruptedException {
+ private Document assertGetConfigWorks(final NetconfClient netconfClient) throws InterruptedException, ExecutionException, TimeoutException {
return assertGetConfigWorks(netconfClient, this.getConfig);
}
private Document assertGetConfigWorks(final NetconfClient netconfClient, final NetconfMessage getConfigMessage)
- throws InterruptedException {
+ throws InterruptedException, ExecutionException, TimeoutException {
final NetconfMessage rpcReply = netconfClient.sendMessage(getConfigMessage);
assertNotNull(rpcReply);
assertEquals("data", XmlElement.fromDomDocument(rpcReply.getDocument()).getOnlyChildElement().getName());
sess.getStdin().write(XmlUtil.toString(this.getConfig.getDocument()).getBytes());
new Thread(){
- public void run(){
- while (true){
- byte[] bytes = new byte[1024];
- int c = 0;
- try {
- c = sess.getStdout().read(bytes);
- } catch (IOException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- logger.info("got data:"+bytes);
- if (c == 0) break;
- }
- }
+ @Override
+ public void run(){
+ while (true){
+ byte[] bytes = new byte[1024];
+ int c = 0;
+ try {
+ c = sess.getStdout().read(bytes);
+ } catch (IOException e) {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ logger.info("got data:"+bytes);
+ if (c == 0) break;
+ }
+ }
}.join();
}
import org.opendaylight.controller.netconf.api.NetconfSession;
import org.opendaylight.controller.netconf.util.handler.FramingMechanismHandlerFactory;
+import org.opendaylight.controller.netconf.util.handler.NetconfHelloMessageToXMLEncoder;
import org.opendaylight.controller.netconf.util.handler.NetconfMessageAggregator;
-import org.opendaylight.controller.netconf.util.handler.NetconfMessageToXMLEncoder;
-import org.opendaylight.controller.netconf.util.handler.NetconfXMLToMessageDecoder;
+import org.opendaylight.controller.netconf.util.handler.NetconfXMLToHelloMessageDecoder;
import org.opendaylight.controller.netconf.util.messages.FramingMechanism;
-public abstract class AbstractChannelInitializer {
+public abstract class AbstractChannelInitializer<S extends NetconfSession> {
- public void initialize(SocketChannel ch, Promise<? extends NetconfSession> promise){
- ch.pipeline().addLast("aggregator", new NetconfMessageAggregator(FramingMechanism.EOM));
- ch.pipeline().addLast(new NetconfXMLToMessageDecoder());
- initializeAfterDecoder(ch, promise);
- ch.pipeline().addLast("frameEncoder", FramingMechanismHandlerFactory.createHandler(FramingMechanism.EOM));
- ch.pipeline().addLast(new NetconfMessageToXMLEncoder());
+ public static final String NETCONF_MESSAGE_DECODER = "netconfMessageDecoder";
+ public static final String NETCONF_MESSAGE_AGGREGATOR = "aggregator";
+ public static final String NETCONF_MESSAGE_ENCODER = "netconfMessageEncoder";
+ public static final String NETCONF_MESSAGE_FRAME_ENCODER = "frameEncoder";
+ public static final String NETCONF_SESSION_NEGOTIATOR = "negotiator";
+
+ public void initialize(SocketChannel ch, Promise<S> promise) {
+ ch.pipeline().addLast(NETCONF_MESSAGE_AGGREGATOR, new NetconfMessageAggregator(FramingMechanism.EOM));
+ initializeMessageDecoder(ch);
+ ch.pipeline().addLast(NETCONF_MESSAGE_FRAME_ENCODER, FramingMechanismHandlerFactory.createHandler(FramingMechanism.EOM));
+ initializeMessageEncoder(ch);
+
+ initializeSessionNegotiator(ch, promise);
+ }
+
+ protected void initializeMessageEncoder(SocketChannel ch) {
+ // Special encoding handler for hello message to include additional header if available,
+ // it is thrown away after successful negotiation
+ ch.pipeline().addLast(NETCONF_MESSAGE_ENCODER, new NetconfHelloMessageToXMLEncoder());
+ }
+
+ protected void initializeMessageDecoder(SocketChannel ch) {
+ // Special decoding handler for hello message to parse additional header if available,
+ // it is thrown away after successful negotiation
+ ch.pipeline().addLast(NETCONF_MESSAGE_DECODER, new NetconfXMLToHelloMessageDecoder());
}
- protected abstract void initializeAfterDecoder(SocketChannel ch, Promise<? extends NetconfSession> promise);
+ /**
+ * Insert session negotiator into the pipeline. It must be inserted after message decoder
+ * identified by {@link AbstractChannelInitializer#NETCONF_MESSAGE_DECODER}, (or any other custom decoder processor)
+ */
+ protected abstract void initializeSessionNegotiator(SocketChannel ch, Promise<S> promise);
}
package org.opendaylight.controller.netconf.util;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.util.Timeout;
-import io.netty.util.Timer;
-import io.netty.util.TimerTask;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
-import io.netty.util.concurrent.Promise;
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.controller.netconf.api.AbstractNetconfSession;
import org.opendaylight.controller.netconf.api.NetconfMessage;
-import org.opendaylight.controller.netconf.api.NetconfSession;
+import org.opendaylight.controller.netconf.api.NetconfSessionListener;
import org.opendaylight.controller.netconf.api.NetconfSessionPreferences;
import org.opendaylight.controller.netconf.util.handler.FramingMechanismHandlerFactory;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage;
import org.opendaylight.controller.netconf.util.handler.NetconfMessageAggregator;
import org.opendaylight.controller.netconf.util.handler.NetconfMessageChunkDecoder;
+import org.opendaylight.controller.netconf.util.handler.NetconfMessageToXMLEncoder;
+import org.opendaylight.controller.netconf.util.handler.NetconfXMLToMessageDecoder;
import org.opendaylight.controller.netconf.util.messages.FramingMechanism;
-import org.opendaylight.controller.netconf.util.xml.XmlElement;
-import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.opendaylight.protocol.framework.AbstractSessionNegotiator;
-import org.opendaylight.protocol.framework.SessionListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
-import java.util.concurrent.TimeUnit;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import io.netty.util.TimerTask;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.util.concurrent.Promise;
-public abstract class AbstractNetconfSessionNegotiator<P extends NetconfSessionPreferences, S extends NetconfSession>
- extends AbstractSessionNegotiator<NetconfMessage, S> {
+public abstract class AbstractNetconfSessionNegotiator<P extends NetconfSessionPreferences, S extends AbstractNetconfSession<S, L>, L extends NetconfSessionListener<S>>
+extends AbstractSessionNegotiator<NetconfHelloMessage, S> {
private static final Logger logger = LoggerFactory.getLogger(AbstractNetconfSessionNegotiator.class);
public static final String NAME_OF_EXCEPTION_HANDLER = "lastExceptionHandler";
+ public static final String CHUNK_DECODER_CHANNEL_HANDLER_KEY = "chunkDecoder";
protected final P sessionPreferences;
- private final SessionListener sessionListener;
+ private final L sessionListener;
private Timeout timeout;
/**
private final long connectionTimeoutMillis;
protected AbstractNetconfSessionNegotiator(P sessionPreferences, Promise<S> promise, Channel channel, Timer timer,
- SessionListener sessionListener, long connectionTimeoutMillis) {
+ L sessionListener, long connectionTimeoutMillis) {
super(promise, channel);
this.sessionPreferences = sessionPreferences;
this.timer = timer;
}
@Override
- protected void handleMessage(NetconfMessage netconfMessage) {
+ protected void handleMessage(NetconfHelloMessage netconfMessage) {
final Document doc = netconfMessage.getDocument();
- if (isHelloMessage(doc)) {
- if (containsBase11Capability(doc)
- && containsBase11Capability(sessionPreferences.getHelloMessage().getDocument())) {
- channel.pipeline().replace("frameEncoder", "frameEncoder",
- FramingMechanismHandlerFactory.createHandler(FramingMechanism.CHUNK));
- channel.pipeline().replace("aggregator", "aggregator",
- new NetconfMessageAggregator(FramingMechanism.CHUNK));
- channel.pipeline().addAfter("aggregator", "chunkDecoder", new NetconfMessageChunkDecoder());
+ // Only Hello message should arrive during negotiation
+ if (netconfMessage instanceof NetconfHelloMessage) {
+
+ replaceHelloMessageHandlers();
+
+ if (shouldUseChunkFraming(doc)) {
+ insertChunkFramingToPipeline();
}
+
changeState(State.ESTABLISHED);
- S session = getSession(sessionListener, channel, netconfMessage);
+ S session = getSession(sessionListener, channel, (NetconfHelloMessage)netconfMessage);
+
negotiationSuccessful(session);
} else {
final IllegalStateException cause = new IllegalStateException(
}
}
- protected abstract S getSession(SessionListener sessionListener, Channel channel, NetconfMessage message);
+ /**
+ * Insert chunk framing handlers into the pipeline
+ */
+ private void insertChunkFramingToPipeline() {
+ replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_FRAME_ENCODER,
+ FramingMechanismHandlerFactory.createHandler(FramingMechanism.CHUNK));
+ replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_AGGREGATOR,
+ new NetconfMessageAggregator(FramingMechanism.CHUNK));
+ channel.pipeline().addAfter(AbstractChannelInitializer.NETCONF_MESSAGE_AGGREGATOR,
+ CHUNK_DECODER_CHANNEL_HANDLER_KEY, new NetconfMessageChunkDecoder());
+ }
- private boolean isHelloMessage(Document doc) {
- try {
- XmlElement.fromDomElementWithExpected(doc.getDocumentElement(), "hello",
- XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0);
+ private boolean shouldUseChunkFraming(Document doc) {
+ return containsBase11Capability(doc)
+ && containsBase11Capability(sessionPreferences.getHelloMessage().getDocument());
+ }
- } catch (IllegalArgumentException | IllegalStateException e) {
- return false;
- }
- return true;
+ /**
+ * Remove special handlers for hello message. Insert regular netconf xml message (en|de)coders.
+ */
+ private void replaceHelloMessageHandlers() {
+ replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, new NetconfXMLToMessageDecoder());
+ replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_ENCODER, new NetconfMessageToXMLEncoder());
}
+ private static ChannelHandler replaceChannelHandler(Channel channel, String handlerKey, ChannelHandler decoder) {
+ return channel.pipeline().replace(handlerKey, handlerKey, decoder);
+ }
+
+ protected abstract S getSession(L sessionListener, Channel channel, NetconfHelloMessage message);
+
private synchronized void changeState(final State newState) {
logger.debug("Changing state from : {} to : {}", state, newState);
Preconditions.checkState(isStateChangePermitted(state, newState), "Cannot change state from %s to %s", state,
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.netconf.util.handler;
+
+import java.nio.ByteBuffer;
+
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+/**
+ * Customized NetconfMessageToXMLEncoder that serializes additional header with
+ * session metadata along with
+ * {@link org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage}
+ * . Used by netconf clients to send information about the user, ip address,
+ * protocol etc.
+ * <p/>
+ * Hello message with header example:
+ * <p/>
+ *
+ * <pre>
+ * {@code
+ * [tomas;10.0.0.0/10000;tcp;1000;1000;;/home/tomas;;]
+ * <hello xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
+ * <capabilities>
+ * <capability>urn:ietf:params:netconf:base:1.0</capability>
+ * </capabilities>
+ * </hello>
+ * }
+ * </pre>
+ */
+public final class NetconfHelloMessageToXMLEncoder extends NetconfMessageToXMLEncoder {
+
+ @Override
+ protected ByteBuffer encodeMessage(NetconfMessage msg) {
+ Preconditions.checkState(msg instanceof NetconfHelloMessage, "Netconf message of type %s expected, was %s",
+ NetconfHelloMessage.class, msg.getClass());
+ Optional<NetconfHelloMessageAdditionalHeader> headerOptional = ((NetconfHelloMessage) msg)
+ .getAdditionalHeader();
+
+ // If additional header present, serialize it along with netconf hello
+ // message
+ if (headerOptional.isPresent()) {
+ byte[] bytesFromHeader = headerOptional.get().toFormattedString().getBytes(Charsets.UTF_8);
+ byte[] bytesFromMessage = xmlToString(msg.getDocument()).getBytes(Charsets.UTF_8);
+
+ ByteBuffer byteBuffer = ByteBuffer.allocate(bytesFromHeader.length + bytesFromMessage.length)
+ .put(bytesFromHeader).put(bytesFromMessage);
+ byteBuffer.flip();
+ return byteBuffer;
+ }
+
+ return super.encodeMessage(msg);
+ }
+}
import com.google.common.base.Charsets;
import com.google.common.base.Optional;
-public final class NetconfMessageToXMLEncoder extends MessageToByteEncoder<NetconfMessage> {
+public class NetconfMessageToXMLEncoder extends MessageToByteEncoder<NetconfMessage> {
private static final Logger LOG = LoggerFactory.getLogger(NetconfMessageToXMLEncoder.class);
private final Optional<String> clientId;
msg.getDocument().appendChild(comment);
}
- final ByteBuffer msgBytes;
- if(msg.getAdditionalHeader().isPresent()) {
- final String header = msg.getAdditionalHeader().get();
- LOG.trace("Header of netconf message parsed \n{}", header);
- // FIXME: this can be written in pieces
- msgBytes = Charsets.UTF_8.encode(header + xmlToString(msg.getDocument()));
- } else {
- msgBytes = Charsets.UTF_8.encode(xmlToString(msg.getDocument()));
- }
+ final ByteBuffer msgBytes = encodeMessage(msg);
LOG.trace("Putting message \n{}", xmlToString(msg.getDocument()));
out.writeBytes(msgBytes);
}
- private String xmlToString(Document doc) {
+ protected ByteBuffer encodeMessage(NetconfMessage msg) {
+ return Charsets.UTF_8.encode(xmlToString(msg.getDocument()));
+ }
+
+ protected String xmlToString(Document doc) {
return XmlUtil.toString(doc, false);
}
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.netconf.util.handler;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
+import org.w3c.dom.Document;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Customized NetconfXMLToMessageDecoder that reads additional header with
+ * session metadata from
+ * {@link org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage}
+ * . Used by netconf server to retrieve information about session metadata.
+ */
+public class NetconfXMLToHelloMessageDecoder extends NetconfXMLToMessageDecoder {
+
+ private static final List<byte[]> POSSIBLE_ENDS = ImmutableList.of(
+ new byte[] { ']', '\n' },
+ new byte[] { ']', '\r', '\n' });
+ private static final List<byte[]> POSSIBLE_STARTS = ImmutableList.of(
+ new byte[] { '[' },
+ new byte[] { '\r', '\n', '[' },
+ new byte[] { '\n', '[' });
+
+ private String additionalHeaderCache;
+
+ @Override
+ protected byte[] preprocessMessageBytes(byte[] bytes) {
+ // Extract bytes containing header with additional metadata
+
+ if (startsWithAdditionalHeader(bytes)) {
+ // Auth information containing username, ip address... extracted for monitoring
+ int endOfAuthHeader = getAdditionalHeaderEndIndex(bytes);
+ if (endOfAuthHeader > -1) {
+ byte[] additionalHeaderBytes = Arrays.copyOfRange(bytes, 0, endOfAuthHeader + 2);
+ additionalHeaderCache = additionalHeaderToString(additionalHeaderBytes);
+ bytes = Arrays.copyOfRange(bytes, endOfAuthHeader + 2, bytes.length);
+ }
+ }
+
+ return bytes;
+ }
+
+ @Override
+ protected void cleanUpAfterDecode() {
+ additionalHeaderCache = null;
+ }
+
+ @Override
+ protected NetconfMessage buildNetconfMessage(Document doc) {
+ return new NetconfHelloMessage(doc, additionalHeaderCache == null ? null
+ : NetconfHelloMessageAdditionalHeader.fromString(additionalHeaderCache));
+ }
+
+ private int getAdditionalHeaderEndIndex(byte[] bytes) {
+ for (byte[] possibleEnd : POSSIBLE_ENDS) {
+ int idx = findByteSequence(bytes, possibleEnd);
+
+ if (idx != -1) {
+ return idx;
+ }
+ }
+
+ return -1;
+ }
+
+ private static int findByteSequence(final byte[] bytes, final byte[] sequence) {
+ if (bytes.length < sequence.length) {
+ throw new IllegalArgumentException("Sequence to be found is longer than the given byte array.");
+ }
+ if (bytes.length == sequence.length) {
+ if (Arrays.equals(bytes, sequence)) {
+ return 0;
+ } else {
+ return -1;
+ }
+ }
+ int j = 0;
+ for (int i = 0; i < bytes.length; i++) {
+ if (bytes[i] == sequence[j]) {
+ j++;
+ if (j == sequence.length) {
+ return i - j + 1;
+ }
+ } else {
+ j = 0;
+ }
+ }
+ return -1;
+ }
+
+ private boolean startsWithAdditionalHeader(byte[] bytes) {
+ for (byte[] possibleStart : POSSIBLE_STARTS) {
+ int i = 0;
+ for (byte b : possibleStart) {
+ if(bytes[i++] != b)
+ break;
+
+ if(i == possibleStart.length)
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private String additionalHeaderToString(byte[] bytes) {
+ return Charsets.UTF_8.decode(ByteBuffer.wrap(bytes)).toString();
+ }
+
+}
*/
package org.opendaylight.controller.netconf.util.handler;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufUtil;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ByteToMessageDecoder;
-
import java.io.ByteArrayInputStream;
-import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Arrays;
import java.util.List;
import org.opendaylight.controller.netconf.api.NetconfDeserializerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
-import org.xml.sax.SAXException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableList;
-public final class NetconfXMLToMessageDecoder extends ByteToMessageDecoder {
- private static final Logger LOG = LoggerFactory.getLogger(NetconfXMLToMessageDecoder.class);
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
- private static final List<byte[]> POSSIBLE_ENDS = ImmutableList.of(
- new byte[] { ']', '\n' },
- new byte[] { ']', '\r', '\n' });
- private static final List<byte[]> POSSIBLE_STARTS = ImmutableList.of(
- new byte[] { '[' },
- new byte[] { '\r', '\n', '[' },
- new byte[] { '\n', '[' });
+public class NetconfXMLToMessageDecoder extends ByteToMessageDecoder {
+ private static final Logger LOG = LoggerFactory.getLogger(NetconfXMLToMessageDecoder.class);
@Override
@VisibleForTesting
logMessage(bytes);
- String additionalHeader = null;
-
- // FIXME: this has to be moved into the negotiator and explained as to what the heck
- // is going on. This is definitely not specified in NETCONF and has no place here. It
- // requires reading all data and incurs inefficiency by being unable to pipe the ByteBuf
- // directly into the XML decoder.
- if (startsWithAdditionalHeader(bytes)) {
- // Auth information containing username, ip address... extracted for monitoring
- int endOfAuthHeader = getAdditionalHeaderEndIndex(bytes);
- if (endOfAuthHeader > -1) {
- byte[] additionalHeaderBytes = Arrays.copyOfRange(bytes, 0, endOfAuthHeader + 2);
- additionalHeader = additionalHeaderToString(additionalHeaderBytes);
- bytes = Arrays.copyOfRange(bytes, endOfAuthHeader + 2, bytes.length);
- }
- }
+ bytes = preprocessMessageBytes(bytes);
NetconfMessage message;
try {
Document doc = XmlUtil.readXmlToDocument(new ByteArrayInputStream(bytes));
- message = new NetconfMessage(doc, additionalHeader);
- } catch (final SAXException | IOException | IllegalStateException e) {
+ message = buildNetconfMessage(doc);
+ } catch (Exception e) {
throw new NetconfDeserializerException("Could not parse message from " + new String(bytes), e);
}
out.add(message);
} finally {
in.discardReadBytes();
+ cleanUpAfterDecode();
}
}
- private int getAdditionalHeaderEndIndex(byte[] bytes) {
- for (byte[] possibleEnd : POSSIBLE_ENDS) {
- int idx = findByteSequence(bytes, possibleEnd);
-
- if (idx != -1) {
- return idx;
- }
- }
+ protected void cleanUpAfterDecode() {}
- return -1;
+ protected NetconfMessage buildNetconfMessage(Document doc) {
+ return new NetconfMessage(doc);
}
- private static int findByteSequence(final byte[] bytes, final byte[] sequence) {
- if (bytes.length < sequence.length) {
- throw new IllegalArgumentException("Sequence to be found is longer than the given byte array.");
- }
- if (bytes.length == sequence.length) {
- if (Arrays.equals(bytes, sequence)) {
- return 0;
- } else {
- return -1;
- }
- }
- int j = 0;
- for (int i = 0; i < bytes.length; i++) {
- if (bytes[i] == sequence[j]) {
- j++;
- if (j == sequence.length) {
- return i - j + 1;
- }
- } else {
- j = 0;
- }
- }
- return -1;
+ protected byte[] preprocessMessageBytes(byte[] bytes) {
+ return bytes;
}
- private boolean startsWithAdditionalHeader(byte[] bytes) {
- for (byte[] possibleStart : POSSIBLE_STARTS) {
- int i = 0;
- for (byte b : possibleStart) {
- if(bytes[i] != b)
- break;
-
- return true;
- }
- }
-
- return false;
- };
-
private void logMessage(byte[] bytes) {
String s = Charsets.UTF_8.decode(ByteBuffer.wrap(bytes)).toString();
LOG.debug("Parsing message \n{}", s);
}
- private String additionalHeaderToString(byte[] bytes) {
- return Charsets.UTF_8.decode(ByteBuffer.wrap(bytes)).toString();
- }
-
}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.util.messages;
+
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.xml.XmlElement;
+import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.w3c.dom.Document;
+
+import com.google.common.base.Optional;
+
+/**
+ * NetconfMessage that can carry additional header with session metadata. See {@link org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader}
+ */
+public final class NetconfHelloMessage extends NetconfMessage {
+
+ public static final String HELLO_TAG = "hello";
+
+ private final NetconfHelloMessageAdditionalHeader additionalHeader;
+
+ public NetconfHelloMessage(Document doc, NetconfHelloMessageAdditionalHeader additionalHeader) {
+ super(doc);
+ checkHelloMessage(doc);
+ this.additionalHeader = additionalHeader;
+ }
+
+ public NetconfHelloMessage(Document doc) {
+ this(doc, null);
+ }
+
+ public Optional<NetconfHelloMessageAdditionalHeader> getAdditionalHeader() {
+ return additionalHeader== null ? Optional.<NetconfHelloMessageAdditionalHeader>absent() : Optional.of(additionalHeader);
+ }
+
+ private static void checkHelloMessage(Document doc) {
+ try {
+ XmlElement.fromDomElementWithExpected(doc.getDocumentElement(), HELLO_TAG,
+ XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0);
+
+ } catch (IllegalArgumentException | IllegalStateException e) {
+ throw new IllegalArgumentException(String.format(
+ "Hello message invalid format, should contain %s tag from namespace %s, but is: %s", HELLO_TAG,
+ XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0, XmlUtil.toString(doc)), e);
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.util.messages;
+
+import com.google.common.base.Preconditions;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Additional header can be used with hello message to carry information about
+ * session's connection. Provided information can be reported via netconf
+ * monitoring.
+ * <pre>
+ * It has pattern "[username; host-address:port; transport; session-identifier;]"
+ * username - name of account on a remote
+ * host-address - client's IP address
+ * port - port number
+ * transport - tcp, ssh
+ * session-identifier - persister, client
+ * Session-identifier is optional, others mandatory.
+ * </pre>
+ * This header is inserted in front of a netconf hello message followed by a newline.
+ */
+public class NetconfHelloMessageAdditionalHeader {
+
+ private static final String SC = ";";
+
+ private final String userName;
+ private final String hostAddress;
+ private final String port;
+ private final String transport;
+ private final String sessionIdentifier;
+
+ public NetconfHelloMessageAdditionalHeader(String userName, String hostAddress, String port, String transport, String sessionIdentifier) {
+ this.userName = userName;
+ this.hostAddress = hostAddress;
+ this.port = port;
+ this.transport = transport;
+ this.sessionIdentifier = sessionIdentifier;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public String getAddress() {
+ return hostAddress;
+ }
+
+ public String getPort() {
+ return port;
+ }
+
+ public String getTransport() {
+ return transport;
+ }
+
+ public String getSessionIdentifier() {
+ return sessionIdentifier;
+ }
+
+ /**
+ * Format additional header into a string suitable as a prefix for netconf hello message
+ */
+ public String toFormattedString() {
+ Preconditions.checkNotNull(userName);
+ Preconditions.checkNotNull(hostAddress);
+ Preconditions.checkNotNull(port);
+ Preconditions.checkNotNull(transport);
+ Preconditions.checkNotNull(sessionIdentifier);
+ return "[" + userName + SC + hostAddress + ":" + port + SC + transport + SC + sessionIdentifier + SC + "]"
+ + System.lineSeparator();
+ }
+
+ @Override
+ public String toString() {
+ final StringBuffer sb = new StringBuffer("NetconfHelloMessageAdditionalHeader{");
+ sb.append("userName='").append(userName).append('\'');
+ sb.append(", hostAddress='").append(hostAddress).append('\'');
+ sb.append(", port='").append(port).append('\'');
+ sb.append(", transport='").append(transport).append('\'');
+ sb.append(", sessionIdentifier='").append(sessionIdentifier).append('\'');
+ sb.append('}');
+ return sb.toString();
+ }
+
+ // TODO IPv6
+ private static final Pattern pattern = Pattern
+ .compile("\\[(?<username>[^;]+);(?<address>[0-9\\.]+)[:/](?<port>[0-9]+);(?<transport>[a-z]+)[^\\]]+\\]");
+ private static final Pattern customHeaderPattern = Pattern
+ .compile("\\[(?<username>[^;]+);(?<address>[0-9\\.]+)[:/](?<port>[0-9]+);(?<transport>[a-z]+);(?<sessionIdentifier>[a-z]+)[^\\]]+\\]");
+
+ /**
+ * Parse additional header from a formatted string
+ */
+ public static NetconfHelloMessageAdditionalHeader fromString(String additionalHeader) {
+ additionalHeader = additionalHeader.trim();
+ Matcher matcher = pattern.matcher(additionalHeader);
+ Matcher matcher2 = customHeaderPattern.matcher(additionalHeader);
+ Preconditions.checkArgument(matcher.matches(), "Additional header in wrong format %s, expected %s",
+ additionalHeader, pattern);
+
+ String username = matcher.group("username");
+ String address = matcher.group("address");
+ String port = matcher.group("port");
+ String transport = matcher.group("transport");
+ String sessionIdentifier = "client";
+ if (matcher2.matches()) {
+ sessionIdentifier = matcher2.group("sessionIdentifier");
+ }
+ return new NetconfHelloMessageAdditionalHeader(username, address, port, transport, sessionIdentifier);
+ }
+
+}
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.netconf.util.messages;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-
-/**
- * Additional header can be used with hello message to carry information about
- * session's connection. Provided information can be reported via netconf
- * monitoring.
- * <pre>
- * It has pattern "[username; host-address:port; transport; session-identifier;]"
- * username - name of account on a remote
- * host-address - client's IP address
- * port - port number
- * transport - tcp, ssh
- * session-identifier - persister, client
- * Session-identifier is optional, others mandatory.
- * </pre>
- */
-public class NetconfMessageAdditionalHeader {
-
- private static final String SC = ";";
-
- public static String toString(String userName, String hostAddress, String port, String transport,
- Optional<String> sessionIdentifier) {
- Preconditions.checkNotNull(userName);
- Preconditions.checkNotNull(hostAddress);
- Preconditions.checkNotNull(port);
- Preconditions.checkNotNull(transport);
- String identifier = sessionIdentifier.isPresent() ? sessionIdentifier.get() : "";
- return "[" + userName + SC + hostAddress + ":" + port + SC + transport + SC + identifier + SC + "]"
- + System.lineSeparator();
- }
-}
import java.util.List;
import org.junit.Test;
+import org.opendaylight.controller.netconf.util.handler.NetconfXMLToHelloMessageDecoder;
import org.opendaylight.controller.netconf.util.handler.NetconfXMLToMessageDecoder;
import com.google.common.io.Files;
public class NetconfMessageFactoryTest {
@Test
public void testAuth() throws Exception {
- NetconfXMLToMessageDecoder parser = new NetconfXMLToMessageDecoder();
+ NetconfXMLToMessageDecoder parser = new NetconfXMLToHelloMessageDecoder();
File authHelloFile = new File(getClass().getResource("/netconfMessages/client_hello_with_auth.xml").getFile());
final List<Object> out = new ArrayList<>();
this.myAppData = ByteBuffer
.allocate(session.getApplicationBufferSize());
this.peerAppData = ByteBuffer.allocate(session
- .getApplicationBufferSize() * 2);
+ .getApplicationBufferSize() * 20);
this.myNetData = ByteBuffer.allocate(session.getPacketBufferSize());
- this.peerNetData = ByteBuffer.allocate(session.getPacketBufferSize() * 2);
+ this.peerNetData = ByteBuffer.allocate(session.getPacketBufferSize() * 20);
}
@Override
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
import java.util.List;
}
public List<String> getUserRoles() {
- return userRoles;
+ return userRoles == null ? Collections.<String> emptyList() : new ArrayList<String>(userRoles);
}
public void addUserRole(String string) {
package org.opendaylight.controller.usermanager;
+import java.util.Arrays;
+import java.util.List;
+
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.sal.authorization.UserLevel;
import org.springframework.security.core.GrantedAuthority;
-import java.util.Arrays;
-import java.util.List;
-
public class AuthenticatedUserTest {
static String[] roleArray;
user = new AuthenticatedUser("auser");
Assert.assertFalse(user.getAccessDate().isEmpty());
- Assert.assertNull(user.getUserRoles());
+ Assert.assertNotNull(user.getUserRoles());
}
@Test
var editButton = one.lib.dashlet.button.single("Edit Flow", one.f.flows.id.dashlet.edit, "btn-primary", "btn-mini");
var $editButton = one.lib.dashlet.button.button(editButton);
$editButton.click(function() {
- var $modal = one.f.flows.modal.initialize(true);
+ var install = flow['flow']['installInHw'];
+ var $modal = one.f.flows.modal.initialize(true,install);
$modal.modal().on('shown',function(){
var $port = $('#'+one.f.flows.id.modal.form.port);
$('#'+one.f.flows.id.modal.form.nodes).trigger("change");
return $p;
}
},
- initialize : function(edit) {
+ initialize : function(edit,install) {
var h3;
if(edit) {
h3 = "Edit Flow Entry";
if (edit) {
// bind edit flow button
$('#'+one.f.flows.id.modal.edit, $modal).click(function() {
- one.f.flows.modal.save($modal, 'true', true);
+ one.f.flows.modal.save($modal, install, true);
});
} else {
// bind add flow button
import org.opendaylight.controller.sal.match.MatchType;
import org.opendaylight.controller.sal.reader.FlowOnNode;
import org.opendaylight.controller.sal.reader.NodeConnectorStatistics;
+import org.opendaylight.controller.sal.reader.NodeDescription;
import org.opendaylight.controller.sal.utils.EtherTypes;
import org.opendaylight.controller.sal.utils.GlobalConstants;
import org.opendaylight.controller.sal.utils.HexEncode;
return userLevel.ordinal() <= AUTH_LEVEL.ordinal();
}
+ @RequestMapping(value = "/nodeInfo", method = RequestMethod.GET)
+ @ResponseBody
+ public NodeDescription getNodeInfo(HttpServletRequest request, @RequestParam(required = false) String container,
+ @RequestParam(required = true) String nodeId) {
+ List<Map<String, String>> lines = new ArrayList<Map<String, String>>();
+ String containerName = (container == null) ? GlobalConstants.DEFAULT.toString() : container;
+
+ // Derive the privilege this user has on the current container
+ String userName = request.getUserPrincipal().getName();
+ Privilege privilege = DaylightWebUtil.getContainerPrivilege(userName, containerName, this);
+
+ if (privilege != Privilege.NONE) {
+ IStatisticsManager statisticsManager = (IStatisticsManager) ServiceHelper
+ .getInstance(IStatisticsManager.class, containerName, this);
+ if(statisticsManager != null){
+ Node node = Node.fromString(nodeId);
+ NodeDescription nodeDesc = statisticsManager.getNodeDescription(node);
+ return nodeDesc;
+ }
+ }
+
+ return new NodeDescription();
+ }
+
@RequestMapping(value = "/existingNodes", method = RequestMethod.GET)
@ResponseBody
public TroubleshootingJsonBean getExistingNodes(HttpServletRequest request, @RequestParam(required = false) String container) {
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*
*/
/**Troubleshoot modules*/
one.f.troubleshooting = {
rootUrl: "/controller/web/troubleshoot",
- rightBottomDashlet: {
+ rightBottomDashlet: {
get: function() {
var $rightBottomDashlet = $("#right-bottom").find(".dashlet");
return $rightBottomDashlet;
},
setDashletHeader: function(label) {
- $("#right-bottom li a")[0].innerHTML = label;
+ $("#right-bottom li a")[0].innerHTML = label;
}
},
createTable: function(columnNames, body) {
portsDataGrid: "one_f_troubleshooting_existingNodes_id_portsDataGrid",
flowsDataGrid: "one_f_troubleshooting_existingNodes_id_flowsDataGrid",
refreshFlowsButton:"one_f_troubleshooting_existingNodes_id_refreshFlowsButton",
- refreshPortsButton:"one_f_troubleshooting_existingNodes_id_refreshPortsButton"
-
+ refreshPortsButton:"one_f_troubleshooting_existingNodes_id_refreshPortsButton",
+ modal : {
+ nodeInfo : "one_f_troubleshooting_existingNodes_id_modal_nodeInfo",
+ cancelButton : "one_f_troubleshooting_existingNodes_id_modal_cancelButton",
+ }
},
load: {
main: function($dashlet) {
$("#" + one.f.troubleshooting.existingNodes.id.portsDataGrid).datagrid({dataSource: dataSource});
});
} catch(e) {}
- }
+ }
},
ajax : function(url, callback) {
$.getJSON(url, function(data) {
data: data.nodeData,
formatter: function(items) {
$.each(items, function(index, item) {
- item["statistics"] = "<a href=\"javascript:one.f.troubleshooting.existingNodes.load.flows('" + item["nodeId"] + "');\">Flows</a>" +
+ item.nodeName = "<a href=\"javascript:one.f.troubleshooting.existingNodes.data.nodeInfo('"
+ + item.nodeId + "');\">" + item.nodeName + "</a>"
+ item["statistics"] = "<a href=\"javascript:one.f.troubleshooting.existingNodes.load.flows('" + item["nodeId"] + "');\">Flows</a>" +
" <a href=\"javascript:one.f.troubleshooting.existingNodes.load.ports('" + item["nodeId"] + "');\">Ports</a>";
});
},
result.push(tr);
});
return result;
+ },
+ nodeInfo : function(nodeId){
+ $.getJSON(one.main.constants.address.prefix + "/troubleshoot/nodeInfo?nodeId=" + nodeId, function(content) {
+ var h3 = 'Node Information'
+
+ var headers = [ 'Description','Specification'];
+
+ var attributes = ['table-striped', 'table-bordered', 'table-condensed'];
+ var $table = one.lib.dashlet.table.table(attributes);
+ var $thead = one.lib.dashlet.table.header(headers);
+ $table.append($thead);
+
+ var footer = [];
+
+ var cancelButton = one.lib.dashlet.button.single("Cancel",
+ one.f.troubleshooting.existingNodes.id.modal.nodeInfo, "", "");
+ var $cancelButton = one.lib.dashlet.button.button(cancelButton);
+ footer.push($cancelButton);
+
+ var body = []
+ $.each(content, function(key, value) {
+ var tr = {};
+ var entry = [];
+
+ entry.push(key);
+ entry.push(value);
+
+ tr.entry = entry;
+ body.push(tr);
+ });
+ var $tbody = one.lib.dashlet.table.body(body);
+ $table.append($tbody);
+
+ var $modal = one.lib.modal.spawn(one.f.troubleshooting.existingNodes.id.modal.nodeInfo, h3, $table , footer);
+ $modal.modal();
+
+ $('#'+one.f.troubleshooting.existingNodes.id.modal.nodeInfo, $modal).click(function() {
+ $modal.modal('hide');
+ });
+ });
}
}
};
$("#" + one.f.troubleshooting.uptime.id.datagrid).datagrid({dataSource: dataSource});
});
},
-
+
ajax : {
main : function(url, requestData, callback) {
$.getJSON(url, requestData, function(data) {
});
}
},
-
+
data: {
uptimeDataGrid: function(data) {
var source = new StaticDataSource({
var $p = $(document.createElement('p'));
$p.text('Please select a Flow or Ports statistics');
$p.addClass('text-center').addClass('text-info');
-
+
$dashlet.append($none)
.append($p);
}