// If the combined connection attempt failed, set the node to connection failed
LOG.debug("Futures aggregation failed");
naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, node));
- // FIXME disconnect those which succeeded
- // just issue a delete on delegateTopologyHandler that gets handled on lower level
}
}, TypedActor.context().dispatcher());
@Override
public void onFailure(final Throwable t) {
- // FIXME unable to disconnect all the connections, what do we do now ?
+
}
});
// If the combined connection attempt failed, set the node to connection failed
LOG.debug("Futures aggregation failed");
naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, null));
- // FIXME disconnect those which succeeded
- // just issue a delete on delegateTopologyHandler that gets handled on lower level
}
});
return;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-//TODO make a global TransactionProvider for all Netconf sessions instead of each session having one.
public class TransactionProvider implements AutoCloseable{
private static final Logger LOG = LoggerFactory.getLogger(TransactionProvider.class);
Preconditions.checkState(dataBroker != null);
final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
tx.put(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(NetconfState.class), state);
- // FIXME first attempt (right after we register to binding broker) always fails
- // Is it due to the fact that we are writing from the onSessionInitiated callback ?
try {
tx.submit().checkedGet();
LOG.debug("Netconf state updated successfully");
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Set;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
/**
* Topic registration on event-source-status-notification.
*/
-public class ConnectionNotificationTopicRegistration extends NotificationTopicRegistration {
+class ConnectionNotificationTopicRegistration extends NotificationTopicRegistration {
private static final Logger LOG = LoggerFactory.getLogger(ConnectionNotificationTopicRegistration.class);
private static final String XMLNS_URI = "http://www.w3.org/2000/xmlns/";
private final DOMNotificationListener domNotificationListener;
- private ConcurrentHashMap<SchemaPath, ArrayList<TopicId>> notificationTopicMap = new ConcurrentHashMap<>();
public ConnectionNotificationTopicRegistration(String SourceName, DOMNotificationListener domNotificationListener) {
super(NotificationSourceType.ConnectionStatusChange, SourceName,
}
@Override boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId) {
- if (checkNotificationPath(notificationPath) == false) {
+ if (!checkNotificationPath(notificationPath)) {
LOG.debug("Bad SchemaPath for notification try to register");
return false;
}
- ArrayList<TopicId> topicIds = getNotificationTopicIds(notificationPath);
- if (topicIds == null) {
- topicIds = new ArrayList<>();
- topicIds.add(topicId);
- } else {
- if (topicIds.contains(topicId) == false) {
- topicIds.add(topicId);
- }
- }
+ Set<TopicId> topicIds = getTopicsForNotification(notificationPath);
+ topicIds.add(topicId);
notificationTopicMap.put(notificationPath, topicIds);
return true;
}
- @Override ArrayList<TopicId> getNotificationTopicIds(SchemaPath notificationPath) {
- return notificationTopicMap.get(notificationPath);
- }
-
@Override synchronized void unRegisterNotificationTopic(TopicId topicId) {
List<SchemaPath> notificationPathToRemove = new ArrayList<>();
for (SchemaPath notifKey : notificationTopicMap.keySet()) {
- ArrayList<TopicId> topicList = notificationTopicMap.get(notifKey);
+ Set<TopicId> topicList = notificationTopicMap.get(notifKey);
if (topicList != null) {
topicList.remove(topicId);
if (topicList.isEmpty()) {
import static com.google.common.util.concurrent.Futures.immediateFuture;
+import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.regex.Pattern;
+import javax.annotation.Nullable;
import javax.xml.stream.XMLStreamException;
import javax.xml.transform.dom.DOMResult;
import javax.xml.transform.dom.DOMSource;
import org.opendaylight.controller.config.util.xml.XmlUtil;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.MountPoint;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMEvent;
-import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
-import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
import org.opendaylight.controller.messagebus.app.util.TopicDOMNotification;
import org.opendaylight.controller.messagebus.app.util.Util;
import org.opendaylight.controller.messagebus.spi.EventSource;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicStatus;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
/**
* NetconfEventSource serves as proxy between nodes and messagebus. Subscribers can join topic stream from this source.
- * Then they will receive notifications that matches pattern specified by topic.
+ * Then they will receive notifications from device that matches pattern specified by topic.
*/
public class NetconfEventSource implements EventSource, DOMNotificationListener {
QName.create(TopicNotification.QNAME, "payload"));
private static final String ConnectionNotificationSourceName = "ConnectionNotificationSource";
- private final String nodeId;
- private final Node node;
-
- private final DOMMountPoint netconfMount;
- private final MountPoint mountPoint;
private final DOMNotificationPublishService domPublish;
private final Map<String, String> urnPrefixToStreamMap; // key = urnPrefix, value = StreamName
- private final List<NotificationTopicRegistration> notificationTopicRegistrationList = new ArrayList<>();
+
+ /**
+ * Map notification uri -> registrations
+ */
+ private final Multimap<String, NotificationTopicRegistration>
+ notificationTopicRegistrations = Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
+ private final NetconfEventSourceMount mount;
/**
* Creates new NetconfEventSource for node. Topic notifications will be published via provided {@link DOMNotificationPublishService}
- * @param node node
* @param streamMap netconf streams from device
- * @param netconfMount
- * @param mountPoint
* @param publishService publish service
*/
- public NetconfEventSource(final Node node, final Map<String, String> streamMap, final DOMMountPoint netconfMount,
- final MountPoint mountPoint, final DOMNotificationPublishService publishService) {
- this.netconfMount = Preconditions.checkNotNull(netconfMount);
- this.mountPoint = Preconditions.checkNotNull(mountPoint);
- this.node = Preconditions.checkNotNull(node);
+ public NetconfEventSource(final Map<String, String> streamMap, NetconfEventSourceMount mount, final DOMNotificationPublishService publishService) {
+ this.mount = mount;
this.urnPrefixToStreamMap = Preconditions.checkNotNull(streamMap);
this.domPublish = Preconditions.checkNotNull(publishService);
- this.nodeId = node.getNodeId().getValue();
this.initializeNotificationTopicRegistrationList();
- LOG.info("NetconfEventSource [{}] created.", this.nodeId);
+ LOG.info("NetconfEventSource [{}] created.", mount.getNodeId());
}
+ /**
+ * Creates {@link ConnectionNotificationTopicRegistration} for connection. Also creates
+ * {@link StreamNotificationTopicRegistration} for every prefix and available stream as defined in config file.
+ */
private void initializeNotificationTopicRegistrationList() {
- notificationTopicRegistrationList
- .add(new ConnectionNotificationTopicRegistration(ConnectionNotificationSourceName, this));
- Optional<Map<String, Stream>> streamMap = getAvailableStreams();
- if (streamMap.isPresent()) {
- LOG.debug("Stream configuration compare...");
- for (String urnPrefix : this.urnPrefixToStreamMap.keySet()) {
- final String streamName = this.urnPrefixToStreamMap.get(urnPrefix);
- LOG.debug("urnPrefix: {} streamName: {}", urnPrefix, streamName);
- if (streamMap.get().containsKey(streamName)) {
- LOG.debug("Stream containig on device");
- notificationTopicRegistrationList
- .add(new StreamNotificationTopicRegistration(streamMap.get().get(streamName), urnPrefix, this));
- }
+ final ConnectionNotificationTopicRegistration cntr = new ConnectionNotificationTopicRegistration(ConnectionNotificationSourceName, this);
+ notificationTopicRegistrations
+ .put(cntr.getNotificationUrnPrefix(), cntr);
+ Map<String, Stream> availableStreams = getAvailableStreams();
+ LOG.debug("Stream configuration compare...");
+ for (String urnPrefix : this.urnPrefixToStreamMap.keySet()) {
+ final String streamName = this.urnPrefixToStreamMap.get(urnPrefix);
+ LOG.debug("urnPrefix: {} streamName: {}", urnPrefix, streamName);
+ if (availableStreams.containsKey(streamName)) {
+ LOG.debug("Stream containig on device");
+ notificationTopicRegistrations
+ .put(urnPrefix, new StreamNotificationTopicRegistration(availableStreams.get(streamName), urnPrefix, this));
}
}
}
- private Optional<Map<String, Stream>> getAvailableStreams() {
-
- Map<String, Stream> streamMap = null;
- InstanceIdentifier<Streams> pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build();
- Optional<DataBroker> dataBroker = this.mountPoint.getService(DataBroker.class);
-
- if (dataBroker.isPresent()) {
- LOG.debug("GET Available streams ...");
- ReadOnlyTransaction tx = dataBroker.get().newReadOnlyTransaction();
- CheckedFuture<Optional<Streams>, ReadFailedException> checkFeature = tx
- .read(LogicalDatastoreType.OPERATIONAL, pathStream);
-
- try {
- Optional<Streams> streams = checkFeature.checkedGet();
- if (streams.isPresent()) {
- streamMap = new HashMap<>();
- for (Stream stream : streams.get().getStream()) {
- LOG.debug("*** find stream {}", stream.getName().getValue());
- streamMap.put(stream.getName().getValue(), stream);
- }
+ private Map<String, Stream> getAvailableStreams() {
+ Map<String, Stream> streamMap = new HashMap<>();
+ final List<Stream> availableStreams;
+ try {
+ availableStreams = mount.getAvailableStreams();
+ streamMap = Maps.uniqueIndex(availableStreams, new Function<Stream, String>() {
+ @Nullable
+ @Override
+ public String apply(@Nullable Stream input) {
+ return input.getName().getValue();
}
- } catch (ReadFailedException e) {
- LOG.warn("Can not read streams for node {}", this.nodeId);
- }
-
- } else {
- LOG.warn("No databroker on node {}", this.nodeId);
+ });
+ } catch (ReadFailedException e) {
+ LOG.warn("Can not read streams for node {}", mount.getNodeId());
}
-
- return Optional.fromNullable(streamMap);
+ return streamMap;
}
@Override public Future<RpcResult<JoinTopicOutput>> joinTopic(final JoinTopicInput input) {
- LOG.debug("Join topic {} on {}", input.getTopicId().getValue(), this.nodeId);
+ LOG.debug("Join topic {} on {}", input.getTopicId().getValue(), mount.getNodeId());
final NotificationPattern notificationPattern = input.getNotificationPattern();
final List<SchemaPath> matchingNotifications = getMatchingNotifications(notificationPattern);
return registerTopic(input.getTopicId(), matchingNotifications);
}
@Override public Future<RpcResult<Void>> disJoinTopic(DisJoinTopicInput input) {
- for (NotificationTopicRegistration reg : notificationTopicRegistrationList) {
+ for (NotificationTopicRegistration reg : notificationTopicRegistrations.values()) {
reg.unRegisterNotificationTopic(input.getTopicId());
}
return Util.resultRpcSuccessFor((Void) null);
private synchronized Future<RpcResult<JoinTopicOutput>> registerTopic(final TopicId topicId,
final List<SchemaPath> notificationsToSubscribe) {
+ Preconditions.checkNotNull(notificationsToSubscribe);
LOG.debug("Join topic {} - register", topicId);
JoinTopicStatus joinTopicStatus = JoinTopicStatus.Down;
- if (notificationsToSubscribe != null && notificationsToSubscribe.isEmpty() == false) {
- LOG.debug("Notifications to subscribe has found - count {}", notificationsToSubscribe.size());
- final Optional<DOMNotificationService> notifyService = getDOMMountPoint()
- .getService(DOMNotificationService.class);
- if (notifyService.isPresent()) {
- int registeredNotificationCount = 0;
- for (SchemaPath schemaNotification : notificationsToSubscribe) {
- for (NotificationTopicRegistration reg : notificationTopicRegistrationList) {
- LOG.debug("Try notification registratio {} on SchemaPathNotification {}", reg.getSourceName(),
- schemaNotification.getLastComponent().getLocalName());
- if (reg.checkNotificationPath(schemaNotification)) {
- LOG.info("Source of notification {} is activating, TopicId {}", reg.getSourceName(),
- topicId.getValue());
- boolean regSuccess = reg.registerNotificationTopic(schemaNotification, topicId);
- if (regSuccess) {
- registeredNotificationCount = registeredNotificationCount + 1;
- }
- }
- }
- }
- if (registeredNotificationCount > 0) {
- joinTopicStatus = JoinTopicStatus.Up;
+
+ LOG.debug("Notifications to subscribe has found - count {}", notificationsToSubscribe.size());
+ int registeredNotificationCount = 0;
+ for (SchemaPath schemaPath : notificationsToSubscribe) {
+ final Collection<NotificationTopicRegistration> topicRegistrations =
+ notificationTopicRegistrations.get(schemaPath.getLastComponent().getNamespace().toString());
+ for (NotificationTopicRegistration reg : topicRegistrations) {
+ LOG.info("Source of notification {} is activating, TopicId {}", reg.getSourceName(),
+ topicId.getValue());
+ boolean regSuccess = reg.registerNotificationTopic(schemaPath, topicId);
+ if (regSuccess) {
+ registeredNotificationCount = registeredNotificationCount + 1;
}
- } else {
- LOG.warn("NO DOMNotification service on node {}", this.nodeId);
}
- } else {
- LOG.debug("Notifications to subscribe has NOT found");
}
-
+ if (registeredNotificationCount > 0) {
+ joinTopicStatus = JoinTopicStatus.Up;
+ }
final JoinTopicOutput output = new JoinTopicOutputBuilder().setStatus(joinTopicStatus).build();
return immediateFuture(RpcResultBuilder.success(output).build());
}
public void reActivateStreams() {
- for (NotificationTopicRegistration reg : notificationTopicRegistrationList) {
- LOG.info("Source of notification {} is reactivating on node {}", reg.getSourceName(), this.nodeId);
+ for (NotificationTopicRegistration reg : notificationTopicRegistrations.values()) {
+ LOG.info("Source of notification {} is reactivating on node {}", reg.getSourceName(), mount.getNodeId());
reg.reActivateNotificationSource();
}
}
public void deActivateStreams() {
- for (NotificationTopicRegistration reg : notificationTopicRegistrationList) {
- LOG.info("Source of notification {} is deactivating on node {}", reg.getSourceName(), this.nodeId);
+ for (NotificationTopicRegistration reg : notificationTopicRegistrations.values()) {
+ LOG.info("Source of notification {} is deactivating on node {}", reg.getSourceName(), mount.getNodeId());
reg.deActivateNotificationSource();
}
}
if (notification instanceof DOMEvent) {
notificationEventTime = ((DOMEvent) notification).getEventTime();
}
- for (NotificationTopicRegistration notifReg : notificationTopicRegistrationList) {
- ArrayList<TopicId> topicIdsForNotification = notifReg.getNotificationTopicIds(notificationPath);
- if (topicIdsForNotification != null && topicIdsForNotification.isEmpty() == false) {
-
- if (notifReg instanceof StreamNotificationTopicRegistration) {
- StreamNotificationTopicRegistration streamReg = (StreamNotificationTopicRegistration) notifReg;
- streamReg.setLastEventTime(notificationEventTime);
- }
-
- for (TopicId topicId : topicIdsForNotification) {
- publishNotification(notification, topicId);
- LOG.debug("Notification {} has been published for TopicId {}", notification.getType(),
- topicId.getValue());
- }
-
+ final String namespace = notification.getType().getLastComponent().getNamespace().toString();
+ for (NotificationTopicRegistration notifReg : notificationTopicRegistrations.get(namespace)) {
+ notifReg.setLastEventTime(notificationEventTime);
+ Set<TopicId> topicIdsForNotification = notifReg.getTopicsForNotification(notificationPath);
+ for (TopicId topicId : topicIdsForNotification) {
+ publishNotification(notification, topicId);
+ LOG.debug("Notification {} has been published for TopicId {}", notification.getType(),
+ topicId.getValue());
}
}
}
private void publishNotification(final DOMNotification notification, TopicId topicId) {
final ContainerNode topicNotification = Builders.containerBuilder().withNodeIdentifier(TOPIC_NOTIFICATION_ARG)
.withChild(ImmutableNodes.leafNode(TOPIC_ID_ARG, topicId))
- .withChild(ImmutableNodes.leafNode(EVENT_SOURCE_ARG, this.nodeId)).withChild(encapsulate(notification))
+ .withChild(ImmutableNodes.leafNode(EVENT_SOURCE_ARG, mount.getNodeId())).withChild(encapsulate(notification))
.build();
try {
domPublish.putNotification(new TopicDOMNotification(topicNotification));
final DOMResult result = new DOMResult(element);
- final SchemaContext context = getDOMMountPoint().getSchemaContext();
+ final SchemaContext context = mount.getSchemaContext();
final SchemaPath schemaPath = body.getType();
try {
NetconfUtil.writeNormalizedNode(body.getBody(), result, schemaPath, context);
final Pattern pattern = Pattern.compile(regex);
List<SchemaPath> availableNotifications = getAvailableNotifications();
- if (availableNotifications == null || availableNotifications.isEmpty()) {
- return null;
- }
return Util.expandQname(availableNotifications, pattern);
}
@Override public void close() throws Exception {
- for (NotificationTopicRegistration streamReg : notificationTopicRegistrationList) {
+ for (NotificationTopicRegistration streamReg : notificationTopicRegistrations.values()) {
streamReg.close();
}
}
@Override public NodeKey getSourceNodeKey() {
- return getNode().getKey();
+ return mount.getNode().getKey();
}
@Override public List<SchemaPath> getAvailableNotifications() {
// add Event Source Connection status notification
availNotifList.add(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH);
- // FIXME: use SchemaContextListener to get changes asynchronously
- final Set<NotificationDefinition> availableNotifications = getDOMMountPoint().getSchemaContext()
+ final Set<NotificationDefinition> availableNotifications = mount.getSchemaContext()
.getNotifications();
// add all known notifications from netconf device
for (final NotificationDefinition nd : availableNotifications) {
return availNotifList;
}
- public Node getNode() {
- return node;
- }
-
- DOMMountPoint getDOMMountPoint() {
- return netconfMount;
- }
-
- MountPoint getMountPoint() {
- return mountPoint;
- }
-
- NetconfNode getNetconfNode() {
- return node.getAugmentation(NetconfNode.class);
+ NetconfEventSourceMount getMount() {
+ return mount;
}
}
private final ConcurrentHashMap<InstanceIdentifier<?>, NetconfEventSourceRegistration> registrationMap = new ConcurrentHashMap<>();
private final DOMNotificationPublishService publishService;
private final DOMMountPointService domMounts;
- private final MountPointService mountPointService;
private ListenerRegistration<DataChangeListener> listenerRegistration;
private final EventSourceRegistry eventSourceRegistry;
Preconditions.checkNotNull(domPublish);
Preconditions.checkNotNull(domMount);
- Preconditions.checkNotNull(bindingMount);
Preconditions.checkNotNull(eventSourceRegistry);
Preconditions.checkNotNull(namespaceMapping);
this.streamMap = namespaceToStreamMapping(namespaceMapping);
this.domMounts = domMount;
- this.mountPointService = bindingMount;
this.publishService = domPublish;
this.eventSourceRegistry = eventSourceRegistry;
}
private void nodeCreated(final InstanceIdentifier<?> key, final Node node) {
Preconditions.checkNotNull(key);
- if (validateNode(node) == false) {
+ if (!validateNode(node)) {
LOG.warn("NodeCreated event : Node [{}] is null or not valid.", key.toString());
return;
}
private void nodeUpdated(final InstanceIdentifier<?> key, final Node node) {
Preconditions.checkNotNull(key);
- if (validateNode(node) == false) {
+ if (!validateNode(node)) {
LOG.warn("NodeUpdated event : Node [{}] is null or not valid.", key.toString());
return;
}
return eventSourceRegistry;
}
- MountPointService getMountPointService() {
- return mountPointService;
- }
-
private boolean isNetconfNode(final Node node) {
return node.getAugmentation(NetconfNode.class) != null;
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.messagebus.eventsources.netconf;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import javassist.ClassPool;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.md.sal.dom.api.DOMService;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev100924.DateAndTime;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.binding.data.codec.gen.impl.StreamWriterGenerator;
+import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext;
+import org.opendaylight.yangtools.sal.binding.generator.util.BindingRuntimeContext;
+import org.opendaylight.yangtools.sal.binding.generator.util.JavassistUtils;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+/**
+ * Facade of mounted netconf device
+ */
+class NetconfEventSourceMount {
+
+ private static final BindingNormalizedNodeCodecRegistry CODEC_REGISTRY;
+ private static final YangInstanceIdentifier STREAMS_PATH = YangInstanceIdentifier.builder().node(Netconf.QNAME).node(Streams.QNAME).build();
+ private static final SchemaPath CREATE_SUBSCRIPTION = SchemaPath
+ .create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription"));
+
+ static{
+ final ModuleInfoBackedContext moduleInfoBackedContext = ModuleInfoBackedContext.create();
+ moduleInfoBackedContext.addModuleInfos(Collections.singletonList(org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.$YangModuleInfoImpl.getInstance()));
+ final Optional<SchemaContext> schemaContextOptional = moduleInfoBackedContext.tryToCreateSchemaContext();
+ Preconditions.checkState(schemaContextOptional.isPresent());
+ SchemaContext NOTIFICATIONS_SCHEMA_CTX = schemaContextOptional.get();
+
+ final JavassistUtils javassist = JavassistUtils.forClassPool(ClassPool.getDefault());
+ CODEC_REGISTRY = new BindingNormalizedNodeCodecRegistry(StreamWriterGenerator.create(javassist));
+ CODEC_REGISTRY.onBindingRuntimeContextUpdated(BindingRuntimeContext.create(moduleInfoBackedContext, NOTIFICATIONS_SCHEMA_CTX));
+ }
+
+ private final DOMMountPoint mountPoint;
+ private final DOMRpcService rpcService;
+ private final DOMNotificationService notificationService;
+ private final DOMDataBroker dataBroker;
+ private final Node node;
+ private final String nodeId;
+
+ public NetconfEventSourceMount(final Node node, final DOMMountPoint mountPoint) {
+ this.mountPoint = mountPoint;
+ this.node = node;
+ this.nodeId = node.getNodeId().getValue();
+ this.rpcService = getService(mountPoint, DOMRpcService.class);
+ this.notificationService = getService(mountPoint, DOMNotificationService.class);
+ this.dataBroker = getService(mountPoint, DOMDataBroker.class);
+ }
+
+ private static <T extends DOMService> T getService(DOMMountPoint mountPoint, Class<T> service) {
+ final Optional<T> optional = mountPoint.getService(service);
+ Preconditions.checkState(optional.isPresent(), "Service not present on mount point: %s", service.getName());
+ return optional.get();
+ }
+
+ Node getNode() {
+ return node;
+ }
+
+ String getNodeId() {
+ return nodeId;
+ }
+
+ /**
+ * Invokes create-subscription rpc on mounted device stream. If lastEventTime is provided and stream supports replay,
+ * rpc will be invoked with start time parameter.
+ * @param stream stream
+ * @param lastEventTime last event time
+ * @return rpc result
+ */
+ CheckedFuture<DOMRpcResult, DOMRpcException> invokeCreateSubscription(final Stream stream, final Optional<Date> lastEventTime) {
+ final CreateSubscriptionInputBuilder inputBuilder = new CreateSubscriptionInputBuilder()
+ .setStream(stream.getName());
+ if(lastEventTime.isPresent() && stream.isReplaySupport()) {
+ final ZonedDateTime dateTime = lastEventTime.get().toInstant().atZone(ZoneId.systemDefault());
+ final String formattedDate = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(dateTime);
+ inputBuilder.setStartTime(new DateAndTime(formattedDate));
+ }
+ final CreateSubscriptionInput input = inputBuilder.build();
+ final ContainerNode nnInput = CODEC_REGISTRY.toNormalizedNodeRpcData(input);
+ return rpcService.invokeRpc(CREATE_SUBSCRIPTION, nnInput);
+ }
+
+ /**
+ * Invokes create-subscription rpc on mounted device stream.
+ * @param stream stream
+ * @return rpc result
+ */
+ CheckedFuture<DOMRpcResult, DOMRpcException> invokeCreateSubscription(final Stream stream) {
+ return invokeCreateSubscription(stream, Optional.absent());
+ }
+
+ /**
+ * Returns list of streams avaliable on device
+ * @return list of streams
+ * @throws ReadFailedException if data read fails
+ */
+ List<Stream> getAvailableStreams() throws ReadFailedException {
+ DOMDataReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();
+ CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> checkFeature = tx
+ .read(LogicalDatastoreType.OPERATIONAL, STREAMS_PATH);
+ Optional<NormalizedNode<?, ?>> streams = checkFeature.checkedGet();
+ if (streams.isPresent()) {
+ Streams s = (Streams) CODEC_REGISTRY.fromNormalizedNode(STREAMS_PATH, streams.get()).getValue();
+ return s.getStream();
+ }
+ return Collections.emptyList();
+ }
+
+ SchemaContext getSchemaContext() {
+ return mountPoint.getSchemaContext();
+ }
+
+ /**
+ * Registers notification listener to receive a set of notifications.
+ * @see DOMNotificationService#registerNotificationListener(DOMNotificationListener, SchemaPath...)
+ * @param listener listener
+ * @param notificationPath notification path
+ * @return
+ */
+ ListenerRegistration<DOMNotificationListener> registerNotificationListener(DOMNotificationListener listener, SchemaPath notificationPath) {
+ return notificationService.registerNotificationListener(listener, notificationPath);
+ }
+
+}
private static final String NotificationCapabilityPrefix = "(urn:ietf:params:xml:ns:netconf:notification";
private final Node node;
- private final InstanceIdentifier<?> instanceIdent;
private final NetconfEventSourceManager netconfEventSourceManager;
private ConnectionStatus currentNetconfConnStatus;
private EventSourceRegistration<NetconfEventSource> eventSourceRegistration;
Preconditions.checkNotNull(instanceIdent);
Preconditions.checkNotNull(node);
Preconditions.checkNotNull(netconfEventSourceManager);
- if (isEventSource(node) == false) {
+ if (!isEventSource(node)) {
return null;
}
- NetconfEventSourceRegistration nesr = new NetconfEventSourceRegistration(instanceIdent, node,
- netconfEventSourceManager);
+ NetconfEventSourceRegistration nesr = new NetconfEventSourceRegistration(node, netconfEventSourceManager);
nesr.updateStatus();
LOG.debug("NetconfEventSourceRegistration for node {} has been initialized...", node.getNodeId().getValue());
return nesr;
return false;
}
- private NetconfEventSourceRegistration(final InstanceIdentifier<?> instanceIdent, final Node node,
- final NetconfEventSourceManager netconfEventSourceManager) {
- this.instanceIdent = instanceIdent;
+ private NetconfEventSourceRegistration(final Node node, final NetconfEventSourceManager netconfEventSourceManager) {
this.node = node;
this.netconfEventSourceManager = netconfEventSourceManager;
this.eventSourceRegistration = null;
- }
-
- public Node getNode() {
- return node;
+ this.currentNetconfConnStatus = ConnectionStatus.Connecting;
}
Optional<EventSourceRegistration<NetconfEventSource>> getEventSourceRegistration() {
}
private boolean checkConnectionStatusType(ConnectionStatus status) {
- if (status == ConnectionStatus.Connected || status == ConnectionStatus.Connecting
- || status == ConnectionStatus.UnableToConnect) {
- return true;
- }
- return false;
+ return status == ConnectionStatus.Connected || status == ConnectionStatus.Connecting
+ || status == ConnectionStatus.UnableToConnect;
}
private void changeStatus(ConnectionStatus newStatus) {
Preconditions.checkNotNull(newStatus);
- if (checkConnectionStatusType(newStatus) == false) {
+ Preconditions.checkState(this.currentNetconfConnStatus != null);
+ if (!checkConnectionStatusType(newStatus)) {
throw new IllegalStateException("Unknown new Netconf Connection Status");
}
- if (this.currentNetconfConnStatus == null) {
- if (newStatus == ConnectionStatus.Connected) {
- registrationEventSource();
- }
- } else if (this.currentNetconfConnStatus == ConnectionStatus.Connecting) {
- if (newStatus == ConnectionStatus.Connected) {
- if (this.eventSourceRegistration == null) {
- registrationEventSource();
- } else {
- // reactivate stream on registered event source (invoke publish notification about connection)
- this.eventSourceRegistration.getInstance().reActivateStreams();
+ switch (this.currentNetconfConnStatus) {
+ case Connecting:
+ case UnableToConnect:
+ if (newStatus == ConnectionStatus.Connected) {
+ if (this.eventSourceRegistration == null) {
+ registrationEventSource();
+ } else {
+ // reactivate stream on registered event source (invoke publish notification about connection)
+ this.eventSourceRegistration.getInstance().reActivateStreams();
+ }
}
- }
- } else if (this.currentNetconfConnStatus == ConnectionStatus.Connected) {
-
- if (newStatus == ConnectionStatus.Connecting || newStatus == ConnectionStatus.UnableToConnect) {
- // deactivate streams on registered event source (invoke publish notification about connection)
- this.eventSourceRegistration.getInstance().deActivateStreams();
- }
- } else if (this.currentNetconfConnStatus == ConnectionStatus.UnableToConnect) {
- if (newStatus == ConnectionStatus.Connected) {
- if (this.eventSourceRegistration == null) {
- registrationEventSource();
- } else {
- // reactivate stream on registered event source (invoke publish notification about connection)
- this.eventSourceRegistration.getInstance().reActivateStreams();
+ break;
+ case Connected:
+ if (newStatus == ConnectionStatus.Connecting || newStatus == ConnectionStatus.UnableToConnect) {
+ // deactivate streams on registered event source (invoke publish notification about connection)
+ this.eventSourceRegistration.getInstance().deActivateStreams();
}
- }
- } else {
- throw new IllegalStateException("Unknown current Netconf Connection Status");
+ break;
+ default:
+ throw new IllegalStateException("Unknown current Netconf Connection Status");
}
this.currentNetconfConnStatus = newStatus;
}
private void registrationEventSource() {
- final Optional<MountPoint> mountPoint = netconfEventSourceManager.getMountPointService()
- .getMountPoint(instanceIdent);
final Optional<DOMMountPoint> domMountPoint = netconfEventSourceManager.getDomMounts()
.getMountPoint(domMountPath(node.getNodeId()));
EventSourceRegistration<NetconfEventSource> registration = null;
- if (domMountPoint.isPresent() && mountPoint.isPresent()) {
- final NetconfEventSource netconfEventSource = new NetconfEventSource(node,
- netconfEventSourceManager.getStreamMap(), domMountPoint.get(), mountPoint.get(),
+ if (domMountPoint.isPresent()/* && mountPoint.isPresent()*/) {
+ NetconfEventSourceMount mount = new NetconfEventSourceMount(node, domMountPoint.get());
+ final NetconfEventSource netconfEventSource = new NetconfEventSource(
+ netconfEventSourceManager.getStreamMap(),
+ mount,
netconfEventSourceManager.getPublishService());
registration = netconfEventSourceManager.getEventSourceRegistry().registerEventSource(netconfEventSource);
LOG.info("Event source {} has been registered", node.getNodeId().getValue());
*/
package org.opendaylight.netconf.messagebus.eventsources.netconf;
-import java.util.ArrayList;
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import java.util.Date;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
import org.slf4j.Logger;
/**
* Notification topic registration.
*/
-public abstract class NotificationTopicRegistration implements AutoCloseable {
+abstract class NotificationTopicRegistration implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(NotificationTopicRegistration.class);
public enum NotificationSourceType {
NetconfDeviceStream,
- ConnectionStatusChange;
+ ConnectionStatusChange
}
private boolean active;
private final String sourceName;
private final String notificationUrnPrefix;
private boolean replaySupported;
+ private Date lastEventTime;
+ protected final ConcurrentHashMap<SchemaPath, Set<TopicId>> notificationTopicMap = new ConcurrentHashMap<>();
protected NotificationTopicRegistration(NotificationSourceType notificationSourceType, String sourceName,
String notificationUrnPrefix) {
return notificationUrnPrefix;
}
+ /**
+ * Returns registered topics for given notification path.
+ * @param notificationPath path
+ * @return topicIds
+ */
+ Set<TopicId> getTopicsForNotification(SchemaPath notificationPath) {
+ final Set<TopicId> topicIds = notificationTopicMap.get(notificationPath);
+ return topicIds != null ? topicIds : Sets.newHashSet();
+ }
+
/**
* Checks, if notification is from namespace belonging to this registration.
* @param notificationPath path
* @return true, if notification belongs to registration namespace
*/
- public boolean checkNotificationPath(SchemaPath notificationPath) {
+ boolean checkNotificationPath(SchemaPath notificationPath) {
if (notificationPath == null) {
return false;
}
return nameSpace.startsWith(getNotificationUrnPrefix());
}
+ Optional<Date> getLastEventTime() {
+ return Optional.fromNullable(lastEventTime);
+ }
+
+ void setLastEventTime(Date lastEventTime) {
+ this.lastEventTime = lastEventTime;
+ }
+
abstract void activateNotificationSource();
abstract void deActivateNotificationSource();
*/
abstract void unRegisterNotificationTopic(TopicId topicId);
- /**
- * Returns registered topics for given path.
- * @param notificationPath path
- * @return topicIds
- */
- abstract ArrayList<TopicId> getNotificationTopicIds(SchemaPath notificationPath);
-
public boolean isReplaySupported() {
return replaySupported;
}
*/
package org.opendaylight.netconf.messagebus.eventsources.netconf;
-import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
import java.util.ArrayList;
-import java.util.Date;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
-import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Topic registration for notification stream.
+ * Topic registration for notification with specified namespace from stream.
*/
-public class StreamNotificationTopicRegistration extends NotificationTopicRegistration {
+class StreamNotificationTopicRegistration extends NotificationTopicRegistration {
private static final Logger LOG = LoggerFactory.getLogger(StreamNotificationTopicRegistration.class);
- private static final NodeIdentifier STREAM_QNAME = NodeIdentifier.create(
- QName.create(CreateSubscriptionInput.QNAME, "stream"));
- private static final SchemaPath CREATE_SUBSCRIPTION = SchemaPath
- .create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription"));
- private static final NodeIdentifier START_TIME_SUBSCRIPTION = NodeIdentifier.create(
- QName.create(CreateSubscriptionInput.QNAME, "startTime"));
- private static final NodeIdentifier CREATE_SUBSCRIPTION_INPUT = NodeIdentifier.create(
- CreateSubscriptionInput.QNAME);
-
- final private DOMMountPoint domMountPoint;
- final private String nodeId;
- final private NetconfEventSource netconfEventSource;
- final private Stream stream;
- private Date lastEventTime;
-
- private ConcurrentHashMap<SchemaPath, ListenerRegistration<NetconfEventSource>> notificationRegistrationMap = new ConcurrentHashMap<>();
- private ConcurrentHashMap<SchemaPath, ArrayList<TopicId>> notificationTopicMap = new ConcurrentHashMap<>();
+
+ private final String nodeId;
+ private final NetconfEventSource netconfEventSource;
+ private final NetconfEventSourceMount mountPoint;
+ private final ConcurrentHashMap<SchemaPath, ListenerRegistration<DOMNotificationListener>> notificationRegistrationMap = new ConcurrentHashMap<>();
+ private final Stream stream;
/**
* Creates registration to notification stream.
public StreamNotificationTopicRegistration(final Stream stream, final String notificationPrefix,
NetconfEventSource netconfEventSource) {
super(NotificationSourceType.NetconfDeviceStream, stream.getName().getValue(), notificationPrefix);
- this.domMountPoint = netconfEventSource.getDOMMountPoint();
- this.nodeId = netconfEventSource.getNode().getNodeId().getValue().toString();
this.netconfEventSource = netconfEventSource;
+ this.mountPoint = netconfEventSource.getMount();
+ this.nodeId = mountPoint.getNode().getNodeId().getValue();
this.stream = stream;
- this.lastEventTime = null;
- setReplaySupported(this.stream.isReplaySupport());
+ setReplaySupported(stream.isReplaySupport());
setActive(false);
LOG.info("StreamNotificationTopicRegistration initialized for {}", getStreamName());
}
* Subscribes to notification stream associated with this registration.
*/
void activateNotificationSource() {
- if (isActive() == false) {
+ if (!isActive()) {
LOG.info("Stream {} is not active on node {}. Will subscribe.", this.getStreamName(), this.nodeId);
- final ContainerNode input = Builders.containerBuilder()
- .withNodeIdentifier(CREATE_SUBSCRIPTION_INPUT)
- .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName())).build();
- CheckedFuture<DOMRpcResult, DOMRpcException> csFuture = domMountPoint.getService(DOMRpcService.class).get()
- .invokeRpc(CREATE_SUBSCRIPTION, input);
+ final CheckedFuture<DOMRpcResult, DOMRpcException> result = mountPoint.invokeCreateSubscription(stream);
try {
- csFuture.checkedGet();
+ result.checkedGet();
setActive(true);
} catch (DOMRpcException e) {
LOG.warn("Can not subscribe stream {} on node {}", this.getSourceName(), this.nodeId);
setActive(false);
- return;
}
} else {
LOG.info("Stream {} is now active on node {}", this.getStreamName(), this.nodeId);
void reActivateNotificationSource() {
if (isActive()) {
LOG.info("Stream {} is reactivating on node {}.", this.getStreamName(), this.nodeId);
- DataContainerNodeAttrBuilder<NodeIdentifier, ContainerNode> inputBuilder = Builders.containerBuilder()
- .withNodeIdentifier(CREATE_SUBSCRIPTION_INPUT)
- .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName()));
- if (isReplaySupported() && this.getLastEventTime() != null) {
- inputBuilder.withChild(ImmutableNodes.leafNode(START_TIME_SUBSCRIPTION, this.getLastEventTime()));
- }
- final ContainerNode input = inputBuilder.build();
- CheckedFuture<DOMRpcResult, DOMRpcException> csFuture = domMountPoint.getService(DOMRpcService.class).get()
- .invokeRpc(CREATE_SUBSCRIPTION, input);
+ final CheckedFuture<DOMRpcResult, DOMRpcException> result;
+ result = mountPoint.invokeCreateSubscription(stream, getLastEventTime());
try {
- csFuture.checkedGet();
+ result.checkedGet();
setActive(true);
} catch (DOMRpcException e) {
LOG.warn("Can not resubscribe stream {} on node {}", this.getSourceName(), this.nodeId);
setActive(false);
- return;
}
}
}
private void closeStream() {
if (isActive()) {
- for (ListenerRegistration<NetconfEventSource> reg : notificationRegistrationMap.values()) {
+ for (ListenerRegistration<DOMNotificationListener> reg : notificationRegistrationMap.values()) {
reg.close();
}
notificationRegistrationMap.clear();
return getSourceName();
}
- @Override ArrayList<TopicId> getNotificationTopicIds(SchemaPath notificationPath) {
- return notificationTopicMap.get(notificationPath);
- }
-
@Override boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId) {
-
- if (checkNotificationPath(notificationPath) == false) {
+ if (!checkNotificationPath(notificationPath)) {
LOG.debug("Bad SchemaPath for notification try to register");
return false;
}
- final Optional<DOMNotificationService> notifyService = domMountPoint.getService(DOMNotificationService.class);
- if (notifyService.isPresent() == false) {
- LOG.debug("DOMNotificationService is not present");
- return false;
- }
-
activateNotificationSource();
- if (isActive() == false) {
+ if (!isActive()) {
LOG.warn("Stream {} is not active, listener for notification {} is not registered.", getStreamName(),
notificationPath.toString());
return false;
}
- ListenerRegistration<NetconfEventSource> registration = notifyService.get()
- .registerNotificationListener(this.netconfEventSource, notificationPath);
+ ListenerRegistration<DOMNotificationListener> registration = mountPoint.registerNotificationListener(netconfEventSource, notificationPath);
notificationRegistrationMap.put(notificationPath, registration);
- ArrayList<TopicId> topicIds = getNotificationTopicIds(notificationPath);
- if (topicIds == null) {
- topicIds = new ArrayList<>();
- topicIds.add(topicId);
- } else {
- if (topicIds.contains(topicId) == false) {
- topicIds.add(topicId);
- }
- }
+ Set<TopicId> topicIds = getTopicsForNotification(notificationPath);
+ topicIds.add(topicId);
notificationTopicMap.put(notificationPath, topicIds);
return true;
@Override synchronized void unRegisterNotificationTopic(TopicId topicId) {
List<SchemaPath> notificationPathToRemove = new ArrayList<>();
for (SchemaPath notifKey : notificationTopicMap.keySet()) {
- ArrayList<TopicId> topicList = notificationTopicMap.get(notifKey);
+ Set<TopicId> topicList = notificationTopicMap.get(notifKey);
if (topicList != null) {
topicList.remove(topicId);
if (topicList.isEmpty()) {
}
for (SchemaPath notifKey : notificationPathToRemove) {
notificationTopicMap.remove(notifKey);
- ListenerRegistration<NetconfEventSource> reg = notificationRegistrationMap.remove(notifKey);
+ ListenerRegistration<DOMNotificationListener> reg = notificationRegistrationMap.remove(notifKey);
if (reg != null) {
reg.close();
}
}
}
- Optional<Date> getLastEventTime() {
- return Optional.fromNullable(lastEventTime);
- }
-
- void setLastEventTime(Date lastEventTime) {
- this.lastEventTime = lastEventTime;
- }
-
@Override public void close() throws Exception {
closeStream();
}
import static org.hamcrest.CoreMatchers.hasItems;
import static org.mockito.Mockito.verify;
-import java.util.ArrayList;
import java.util.Collection;
+import java.util.Set;
import javax.xml.transform.dom.DOMSource;
import org.junit.Assert;
import org.junit.Before;
final TopicId topic1 = registerTopic("topic1");
final TopicId topic2 = registerTopic("topic2");
final TopicId topic3 = registerTopic("topic3");
- final ArrayList<TopicId> notificationTopicIds = registration.getNotificationTopicIds(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH);
+ final Set<TopicId> notificationTopicIds = registration.getTopicsForNotification(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH);
Assert.assertNotNull(notificationTopicIds);
Assert.assertThat(notificationTopicIds, hasItems(topic1, topic2, topic3));
registration.unRegisterNotificationTopic(topic3);
- final ArrayList<TopicId> afterUnregister = registration.getNotificationTopicIds(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH);
+ final Set<TopicId> afterUnregister = registration.getTopicsForNotification(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH);
Assert.assertNotNull(afterUnregister);
Assert.assertThat(afterUnregister, hasItems(topic1, topic2));
Assert.assertFalse(afterUnregister.contains(topic3));
import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.junit.Test;
import org.opendaylight.controller.config.yang.messagebus.netconf.NamespaceToStream;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.MountPoint;
import org.opendaylight.controller.md.sal.binding.api.MountPointService;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.controller.messagebus.spi.EventSource;
-import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
public class NetconfEventSourceManagerTest {
DataBroker dataBrokerMock = mock(DataBroker.class);
DOMNotificationPublishService domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class);
domMountPointServiceMock = mock(DOMMountPointService.class);
- mountPointServiceMock = mock(MountPointService.class);
eventSourceTopologyMock = mock(EventSourceRegistry.class);
rpcProviderRegistryMock = mock(RpcProviderRegistry.class);
eventSourceRegistry = mock(EventSourceRegistry.class);
doReturn(listenerRegistrationMock).when(dataBrokerMock).registerDataChangeListener(eq(LogicalDatastoreType.OPERATIONAL), any(InstanceIdentifier.class), any(NetconfEventSourceManager.class), eq(
AsyncDataBroker.DataChangeScope.SUBTREE));
- Optional<DOMMountPoint> optionalDomMountServiceMock = (Optional<DOMMountPoint>) mock(Optional.class);
- doReturn(true).when(optionalDomMountServiceMock).isPresent();
- doReturn(optionalDomMountServiceMock).when(domMountPointServiceMock).getMountPoint((YangInstanceIdentifier)notNull());
-
DOMMountPoint domMountPointMock = mock(DOMMountPoint.class);
- doReturn(domMountPointMock).when(optionalDomMountServiceMock).get();
-
-
- Optional optionalBindingMountMock = mock(Optional.class);
- doReturn(true).when(optionalBindingMountMock).isPresent();
-
- MountPoint mountPointMock = mock(MountPoint.class);
- doReturn(optionalBindingMountMock).when(mountPointServiceMock).getMountPoint(any(InstanceIdentifier.class));
- doReturn(mountPointMock).when(optionalBindingMountMock).get();
-
- Optional optionalMpDataBroker = mock(Optional.class);
- DataBroker mpDataBroker = mock(DataBroker.class);
- doReturn(optionalMpDataBroker).when(mountPointMock).getService(DataBroker.class);
- doReturn(true).when(optionalMpDataBroker).isPresent();
- doReturn(mpDataBroker).when(optionalMpDataBroker).get();
+ Optional<DOMMountPoint> optionalDomMountServiceMock = Optional.of(domMountPointMock);
+ doReturn(optionalDomMountServiceMock).when(domMountPointServiceMock).getMountPoint((YangInstanceIdentifier)notNull());
+ DOMDataBroker mpDataBroker = mock(DOMDataBroker.class);
+ doReturn(Optional.of(mpDataBroker)).when(domMountPointMock).getService(DOMDataBroker.class);
+ doReturn(Optional.of(mock(DOMRpcService.class))).when(domMountPointMock).getService(DOMRpcService.class);
+ doReturn(Optional.of(mock(DOMNotificationService.class))).when(domMountPointMock).getService(DOMNotificationService.class);
- ReadOnlyTransaction rtx = mock(ReadOnlyTransaction.class);
+ DOMDataReadOnlyTransaction rtx = mock(DOMDataReadOnlyTransaction.class);
doReturn(rtx).when(mpDataBroker).newReadOnlyTransaction();
- CheckedFuture<Optional<Streams>, ReadFailedException> checkFeature = (CheckedFuture<Optional<Streams>, ReadFailedException>)mock(CheckedFuture.class);
- InstanceIdentifier<Streams> pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build();
- doReturn(checkFeature).when(rtx).read(LogicalDatastoreType.OPERATIONAL, pathStream);
- Optional<Streams> avStreams = NetconfTestUtils.getAvailableStream("stream01", true);
- doReturn(avStreams).when(checkFeature).checkedGet();
+ CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> checkFeature = Futures.immediateCheckedFuture(Optional.of(NetconfTestUtils.getStreamsNode("stream-1")));
- EventSourceRegistration esrMock = mock(EventSourceRegistration.class);
+ YangInstanceIdentifier pathStream = YangInstanceIdentifier.builder().node(Netconf.QNAME).node(Streams.QNAME).build();
+ doReturn(checkFeature).when(rtx).read(LogicalDatastoreType.OPERATIONAL, pathStream);
netconfEventSourceManager =
NetconfEventSourceManager
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.messagebus.eventsources.netconf;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Date;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamBuilder;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+public class NetconfEventSourceMountTest {
+
+ public static final String STREAM_1 = "stream-1";
+ public static final String STREAM_2 = "stream-2";
+ @Mock
+ private DOMMountPoint domMountPoint;
+ @Mock
+ DOMDataBroker dataBroker;
+ @Mock
+ DOMRpcService rpcService;
+ @Mock
+ private DOMDataReadOnlyTransaction tx;
+ private NetconfEventSourceMount mount;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ doReturn(Optional.of(dataBroker)).when(domMountPoint).getService(DOMDataBroker.class);
+ doReturn(Optional.of(rpcService)).when(domMountPoint).getService(DOMRpcService.class);
+ doReturn(Optional.of(mock(DOMNotificationService.class))).when(domMountPoint).getService(DOMNotificationService.class);
+ doReturn(tx).when(dataBroker).newReadOnlyTransaction();
+ final YangInstanceIdentifier path = YangInstanceIdentifier.builder().node(Netconf.QNAME).node(Streams.QNAME).build();
+ final NormalizedNode<?, ?> streamsNode = NetconfTestUtils.getStreamsNode(STREAM_1, STREAM_2);
+ doReturn(Futures.immediateCheckedFuture(Optional.of(streamsNode))).when(tx).read(LogicalDatastoreType.OPERATIONAL, path);
+ mount = new NetconfEventSourceMount(NetconfTestUtils.getNode("node-1"), domMountPoint);
+ }
+
+ @Test
+ public void testInvokeCreateSubscription() throws Exception {
+ Stream stream = new StreamBuilder()
+ .setName(new StreamNameType(STREAM_1))
+ .build();
+ mount.invokeCreateSubscription(stream, Optional.absent());
+ final SchemaPath type = SchemaPath.create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription"));
+ ArgumentCaptor<ContainerNode> captor = ArgumentCaptor.forClass(ContainerNode.class);
+ verify(rpcService).invokeRpc(eq(type), captor.capture());
+ Assert.assertEquals(STREAM_1, getStreamName(captor.getValue()));
+ }
+
+ @Test
+ public void testInvokeCreateSubscription1() throws Exception {
+ Stream stream = new StreamBuilder()
+ .setName(new StreamNameType(STREAM_1))
+ .setReplaySupport(true)
+ .build();
+ final Date date = new Date();
+ mount.invokeCreateSubscription(stream, Optional.of(date));
+ final SchemaPath type = SchemaPath.create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription"));
+ ArgumentCaptor<ContainerNode> captor = ArgumentCaptor.forClass(ContainerNode.class);
+ verify(rpcService).invokeRpc(eq(type), captor.capture());
+ Assert.assertEquals(STREAM_1, getStreamName(captor.getValue()));
+ final String expDate = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(date.toInstant().atZone(ZoneId.systemDefault()));
+ final Optional<LeafNode> actual = (Optional<LeafNode>) getDate(captor.getValue());
+ Assert.assertTrue(actual.isPresent());
+ String actualDate = (String) actual.get().getValue();
+ Assert.assertEquals(expDate, actualDate);
+ }
+
+ @Test
+ public void testInvokeCreateSubscription2() throws Exception {
+ Stream stream = new StreamBuilder()
+ .setName(new StreamNameType(STREAM_1))
+ .setReplaySupport(true)
+ .build();
+ mount.invokeCreateSubscription(stream, Optional.absent());
+ final SchemaPath type = SchemaPath.create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription"));
+ ArgumentCaptor<ContainerNode> captor = ArgumentCaptor.forClass(ContainerNode.class);
+ verify(rpcService).invokeRpc(eq(type), captor.capture());
+ Assert.assertEquals(STREAM_1, getStreamName(captor.getValue()));
+ final Optional<LeafNode> date = (Optional<LeafNode>) getDate(captor.getValue());
+ Assert.assertFalse(date.isPresent());
+
+ }
+
+ @Test
+ public void testGetAvailableStreams() throws Exception {
+ final List<Stream> availableStreams = mount.getAvailableStreams();
+ Assert.assertEquals(2, availableStreams.size());
+ final List<String> streamNames = Lists.transform(availableStreams, new Function<Stream, String>() {
+ @Nullable
+ @Override
+ public String apply(@Nullable Stream input) {
+ return input.getName().getValue();
+ }
+ });
+ streamNames.contains(STREAM_1);
+ streamNames.contains(STREAM_2);
+ }
+
+ private String getStreamName(ContainerNode value) {
+ YangInstanceIdentifier.NodeIdentifier STREAM = new YangInstanceIdentifier.NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME, "stream"));
+ return (String) value.getChild(STREAM).get().getValue();
+ }
+
+ private Optional<?> getDate(ContainerNode value) {
+ YangInstanceIdentifier.NodeIdentifier START_TIME = new YangInstanceIdentifier.NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME, "startTime"));
+ return value.getChild(START_TIME);
+ }
+}
\ No newline at end of file
*/
package org.opendaylight.netconf.messagebus.eventsources.netconf;
-import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.only;
import static org.mockito.Mockito.verify;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import java.net.URI;
-import java.util.Collections;
+import com.google.common.util.concurrent.Futures;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
-import org.opendaylight.controller.md.sal.binding.api.BindingService;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.MountPoint;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
-import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
-import org.opendaylight.controller.md.sal.dom.api.DOMService;
import org.opendaylight.controller.messagebus.app.util.TopicDOMNotification;
-import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicNotification;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInputBuilder;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder;
import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
public class NetconfEventSourceTest {
+
+ private static final SchemaPath notification1Path = SchemaPath.create(true, QName.create("ns1", "1970-01-15", "not1"));
+ private static final SchemaPath notification2Path = SchemaPath.create(true, QName.create("ns2", "1980-02-18", "not2"));
+
NetconfEventSource netconfEventSource;
- DOMMountPoint domMountPointMock;
- MountPoint mountPointMock;
- JoinTopicInput joinTopicInputMock;
+
+ @Mock
DOMNotificationPublishService domNotificationPublishServiceMock;
- DOMNotification notification;
+ @Mock
+ DOMNotification matchnigNotification;
+ @Mock
+ DOMNotification nonMachtingNotification;
+ @Mock
+ NetconfEventSourceMount mount;
@Before
public void setUp() throws Exception {
- Map<String, String> streamMap = new HashMap<>();
- streamMap.put("uriStr1", "string2");
- domMountPointMock = mock(DOMMountPoint.class);
- mountPointMock = mock(MountPoint.class);
- domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class);
- RpcConsumerRegistry rpcConsumerRegistryMock = mock(RpcConsumerRegistry.class);
- Optional<BindingService> onlyOptionalMock = (Optional<BindingService>) mock(Optional.class);
- NotificationsService notificationsServiceMock = mock(NotificationsService.class);
- doReturn(notificationsServiceMock).when(rpcConsumerRegistryMock).getRpcService(NotificationsService.class);
-
- final NotificationDefinition notificationDefinitionMock = getNotificationDefinitionMock("urn:cisco:params:xml:ns:yang:messagebus:eventsource", "2014-12-02", "event-source-status");
- Set<NotificationDefinition> notifications = Collections.singleton(notificationDefinitionMock);
- ContainerNode node = Builders.containerBuilder()
- .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(QName.create("notification-namespace", "2016-02-17", "name")))
- .build();
- notification = mock(DOMNotification.class);
- doReturn(node).when(notification).getBody();
- doReturn(notificationDefinitionMock.getPath()).when(notification).getType();
- SchemaContext schema = mock(SchemaContext.class);
- doReturn(notifications).when(schema).getNotifications();
- doReturn(schema).when(domMountPointMock).getSchemaContext();
- doReturn(Optional.of(mock(DOMNotificationService.class))).when(domMountPointMock).getService(DOMNotificationService.class);
-
- Optional<DataBroker> optionalMpDataBroker = (Optional<DataBroker>) mock(Optional.class);
- DataBroker mpDataBroker = mock(DataBroker.class);
- doReturn(optionalMpDataBroker).when(mountPointMock).getService(DataBroker.class);
- doReturn(true).when(optionalMpDataBroker).isPresent();
- doReturn(mpDataBroker).when(optionalMpDataBroker).get();
-
- ReadOnlyTransaction rtx = mock(ReadOnlyTransaction.class);
- doReturn(rtx).when(mpDataBroker).newReadOnlyTransaction();
- CheckedFuture<Optional<Streams>, ReadFailedException> checkFeature = (CheckedFuture<Optional<Streams>, ReadFailedException>)mock(CheckedFuture.class);
- InstanceIdentifier<Streams> pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build();
- doReturn(checkFeature).when(rtx).read(LogicalDatastoreType.OPERATIONAL, pathStream);
- Optional<Streams> avStreams = NetconfTestUtils.getAvailableStream("stream01", true);
- doReturn(avStreams).when(checkFeature).checkedGet();
+ MockitoAnnotations.initMocks(this);
+ //init notification mocks
+ doReturn(notification1Path).when(matchnigNotification).getType();
+ doReturn(notification2Path).when(nonMachtingNotification).getType();
+ DataContainerNodeAttrBuilder<YangInstanceIdentifier.NodeIdentifier, ContainerNode> body = Builders.containerBuilder().withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("ns1", "1970-01-15", "not1data")));
+ doReturn(body.build()).when(matchnigNotification).getBody();
+ //init schema context mock
+ Set<NotificationDefinition> notifications = new HashSet<>();
+ notifications.add(getNotificationDefinitionMock(notification1Path.getLastComponent()));
+ notifications.add(getNotificationDefinitionMock(notification2Path.getLastComponent()));
+ SchemaContext schemaContext = mock(SchemaContext.class);
+ doReturn(notifications).when(schemaContext).getNotifications();
+ //init mount point mock
+ List<Stream> streams = new ArrayList<>();
+ streams.add(createStream("stream-1"));
+ streams.add(createStream("stream-2"));
+ doReturn(streams).when(mount).getAvailableStreams();
+ doReturn(schemaContext).when(mount).getSchemaContext();
+ doReturn(Futures.immediateCheckedFuture(null)).when(mount).invokeCreateSubscription(any(), any());
+ doReturn(Futures.immediateCheckedFuture(null)).when(mount).invokeCreateSubscription(any());
+ doReturn(mock(ListenerRegistration.class)).when(mount).registerNotificationListener(any(), any());
+ final Node nodeId1 = NetconfTestUtils.getNetconfNode("NodeId1", "node.test.local", ConnectionStatus.Connected, NetconfTestUtils.notification_capability_prefix);
+ doReturn(nodeId1).when(mount).getNode();
+ Map<String, String> streamMap = new HashMap<>();
+ streamMap.put(notification1Path.getLastComponent().getNamespace().toString(), "stream-1");
netconfEventSource = new NetconfEventSource(
- NetconfTestUtils.getNetconfNode("NodeId1", "node.test.local", ConnectionStatus.Connected,
- NetconfTestUtils.notification_capability_prefix),
streamMap,
- domMountPointMock,
- mountPointMock ,
+ mount,
domNotificationPublishServiceMock);
}
@Test
- public void joinTopicTest() throws Exception{
- joinTopicTestHelper();
- assertNotNull("JoinTopic return value has not been created correctly.", netconfEventSource.joinTopic(joinTopicInputMock));
- }
-
- @Test
- public void testOnNotification() throws Exception {
+ public void testJoinTopicOnNotification() throws Exception {
final JoinTopicInput topic1 = new JoinTopicInputBuilder()
.setTopicId(TopicId.getDefaultInstance("topic1"))
- .setNotificationPattern(NotificationPattern.getDefaultInstance(".*"))
+ .setNotificationPattern(NotificationPattern.getDefaultInstance(".*ns1"))
.build();
netconfEventSource.joinTopic(topic1);
-
ArgumentCaptor<DOMNotification> captor = ArgumentCaptor.forClass(DOMNotification.class);
- netconfEventSource.onNotification(notification);
+ //handle notification matching topic namespace
+ netconfEventSource.onNotification(matchnigNotification);
+ //handle notification that does not match topic namespace
+ netconfEventSource.onNotification(nonMachtingNotification);
+ //only matching notification should be published
verify(domNotificationPublishServiceMock).putNotification(captor.capture());
final TopicDOMNotification value = (TopicDOMNotification) captor.getValue();
- final Object actualTopicId = value.getBody().getChild(new YangInstanceIdentifier.NodeIdentifier(QName.create("urn:cisco:params:xml:ns:yang:messagebus:eventaggregator", "2014-12-02", "topic-id"))).get().getValue();
+ final QName qname = TopicNotification.QNAME;
+ final YangInstanceIdentifier.NodeIdentifier topicIdNode =
+ new YangInstanceIdentifier.NodeIdentifier(QName.create(qname.getNamespace().toString(), qname.getFormattedRevision(), "topic-id"));
+ final Object actualTopicId = value.getBody().getChild(topicIdNode).get().getValue();
Assert.assertEquals(topic1.getTopicId(), actualTopicId);
}
- private void joinTopicTestHelper() throws Exception{
- joinTopicInputMock = mock(JoinTopicInput.class);
- TopicId topicId = new TopicId("topicID007");
- doReturn(topicId).when(joinTopicInputMock).getTopicId();
- NotificationPattern notificationPatternMock = mock(NotificationPattern.class);
- doReturn(notificationPatternMock).when(joinTopicInputMock).getNotificationPattern();
- doReturn("uriStr1").when(notificationPatternMock).getValue();
-
- SchemaContext schemaContextMock = mock(SchemaContext.class);
- doReturn(schemaContextMock).when(domMountPointMock).getSchemaContext();
- Set<NotificationDefinition> notificationDefinitionSet = new HashSet<>();
- NotificationDefinition notificationDefinitionMock = mock(NotificationDefinition.class);
- notificationDefinitionSet.add(notificationDefinitionMock);
-
- URI uri = new URI("uriStr1");
- QName qName = new QName(uri, "localName1");
- org.opendaylight.yangtools.yang.model.api.SchemaPath schemaPath = SchemaPath.create(true, qName);
- doReturn(notificationDefinitionSet).when(schemaContextMock).getNotifications();
- doReturn(schemaPath).when(notificationDefinitionMock).getPath();
-
- Optional<DOMNotificationService> domNotificationServiceOptionalMock = (Optional<DOMNotificationService>) mock(Optional.class);
- doReturn(domNotificationServiceOptionalMock).when(domMountPointMock).getService(DOMNotificationService.class);
- doReturn(true).when(domNotificationServiceOptionalMock).isPresent();
-
- DOMNotificationService domNotificationServiceMock = mock(DOMNotificationService.class);
- doReturn(domNotificationServiceMock).when(domNotificationServiceOptionalMock).get();
- ListenerRegistration<NetconfEventSource> listenerRegistrationMock = (ListenerRegistration<NetconfEventSource>)mock(ListenerRegistration.class);
- doReturn(listenerRegistrationMock).when(domNotificationServiceMock).registerNotificationListener(any(NetconfEventSource.class), any(SchemaPath.class));
-
- Optional<DOMService> optionalMock = (Optional<DOMService>) mock(Optional.class);
- doReturn(optionalMock).when(domMountPointMock).getService(DOMRpcService.class);
- doReturn(true).when(optionalMock).isPresent();
- DOMRpcService domRpcServiceMock = mock(DOMRpcService.class);
- doReturn(domRpcServiceMock).when(optionalMock).get();
- CheckedFuture checkedFutureMock = mock(CheckedFuture.class);
- doReturn(checkedFutureMock).when(domRpcServiceMock).invokeRpc(any(SchemaPath.class), any(ContainerNode.class));
+ @Test
+ public void testDisjoinTopicOnNotification() throws Exception {
+ final TopicId topicId = TopicId.getDefaultInstance("topic1");
+ final JoinTopicInput topic1 = new JoinTopicInputBuilder()
+ .setTopicId(topicId)
+ .setNotificationPattern(NotificationPattern.getDefaultInstance(".*ns1"))
+ .build();
+ netconfEventSource.joinTopic(topic1);
+
+ //handle notification matching topic namespace
+ netconfEventSource.onNotification(matchnigNotification);
+ //disjoin topic
+ DisJoinTopicInput disjoinTopic = new DisJoinTopicInputBuilder().setTopicId(topicId).build();
+ netconfEventSource.disJoinTopic(disjoinTopic);
+ netconfEventSource.onNotification(matchnigNotification);
+ //topic notification published only once before disjoin
+ verify(domNotificationPublishServiceMock, only()).putNotification(any());
+ }
+ private Stream createStream(String name) {
+ return new StreamBuilder()
+ .setName(new StreamNameType(name))
+ .setReplaySupport(true)
+ .build();
}
- private NotificationDefinition getNotificationDefinitionMock(String namespace, String revision, String name) {
+ private NotificationDefinition getNotificationDefinitionMock(QName qName) {
NotificationDefinition notification = mock(NotificationDefinition.class);
- final QName qName = QName.create(namespace, revision, name);
doReturn(qName).when(notification).getQName();
doReturn(SchemaPath.create(true, qName)).when(notification).getPath();
return notification;
import com.google.common.base.Optional;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.StreamsBuilder;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
public final class NetconfTestUtils {
return Optional.of(streams);
}
+ public static NormalizedNode<?, ?> getStreamsNode(String... streamName) {
+ QName nameNode = QName.create(Stream.QNAME, "name");
+ Set<MapEntryNode> streamSet = new HashSet<>();
+ for (String s : streamName) {
+ MapEntryNode stream = Builders.mapEntryBuilder()
+ .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifierWithPredicates(Stream.QNAME, nameNode, s))
+ .withChild(Builders.leafBuilder()
+ .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(nameNode))
+ .withValue(s)
+ .build())
+ .build();
+ streamSet.add(stream);
+ }
+
+ CollectionNodeBuilder<MapEntryNode, MapNode> streams = Builders.mapBuilder().withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(Stream.QNAME));
+ for (MapEntryNode mapEntryNode : streamSet) {
+ streams.withChild(mapEntryNode);
+ }
+ return Builders.containerBuilder()
+ .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(Streams.QNAME))
+ .withChild(streams.build())
+ .build();
+ }
+
}
import static org.hamcrest.CoreMatchers.hasItems;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.Futures;
-import java.util.ArrayList;
import java.util.Date;
+import java.util.Set;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
-import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamBuilder;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaPath;
public class StreamNotificationTopicRegistrationTest {
- private static final String NS = CreateSubscriptionInput.QNAME.getNamespace().toString();
- private static final String REV = CreateSubscriptionInput.QNAME.getFormattedRevision();
- private static final YangInstanceIdentifier.NodeIdentifier STREAM = new YangInstanceIdentifier.NodeIdentifier(QName.create(NS, REV, "stream"));
- private static final YangInstanceIdentifier.NodeIdentifier START_TIME = new YangInstanceIdentifier.NodeIdentifier(QName.create(NS, REV, "startTime"));
private static final String STREAM_NAME = "stream-1";
- private static final SchemaPath createSubscription = SchemaPath.create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription"));
private static final String PREFIX = ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH.getLastComponent().getNamespace().toString();
@Mock
private NetconfEventSource source;
@Mock
- private DOMMountPoint mountPoint;
- @Mock
- private DOMRpcService service;
+ private NetconfEventSourceMount mount;
@Mock
private DOMNotificationService reference;
@Mock
private ListenerRegistration<DOMNotificationListener> listenerRegistration;
private StreamNotificationTopicRegistration registration;
+ private Stream stream;
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
Node node = new NodeBuilder().setNodeId(NodeId.getDefaultInstance("node-id")).build();
- when(source.getNode()).thenReturn(node);
- when(source.getDOMMountPoint()).thenReturn(mountPoint);
-
- when(mountPoint.getService(DOMRpcService.class)).thenReturn(Optional.of(service));
- when(mountPoint.getService(DOMNotificationService.class)).thenReturn(Optional.of(reference));
- when(reference.registerNotificationListener(any(), eq(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH)))
+ when(mount.getNode()).thenReturn(node);
+ when(mount.registerNotificationListener(source, ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH))
.thenReturn(listenerRegistration);
- when(service.invokeRpc(eq(createSubscription), any())).thenReturn(Futures.immediateCheckedFuture(null));
+ when(mount.invokeCreateSubscription(any(), any())).thenReturn(Futures.immediateCheckedFuture(null));
+ when(mount.invokeCreateSubscription(any())).thenReturn(Futures.immediateCheckedFuture(null));
- Stream stream = new StreamBuilder().setName(StreamNameType.getDefaultInstance(STREAM_NAME)).setReplaySupport(true).build();
+ when(source.getMount()).thenReturn(mount);
+ stream = new StreamBuilder().setName(StreamNameType.getDefaultInstance(STREAM_NAME)).setReplaySupport(true).build();
registration = new StreamNotificationTopicRegistration(stream, PREFIX, source);
}
@Test
public void testActivateNotificationSource() throws Exception {
registration.activateNotificationSource();
-
- ArgumentCaptor<ContainerNode> captor = ArgumentCaptor.forClass(ContainerNode.class);
Assert.assertTrue(registration.isActive());
- verify(service).invokeRpc(eq(createSubscription), captor.capture());
- checkStreamName(captor.getValue());
+ verify(mount).invokeCreateSubscription(stream);
+
}
@Test
registration.setActive(true);
registration.reActivateNotificationSource();
- ArgumentCaptor<ContainerNode> captor = ArgumentCaptor.forClass(ContainerNode.class);
Assert.assertTrue(registration.isActive());
- verify(service).invokeRpc(eq(createSubscription), captor.capture());
- checkStreamName(captor.getValue());
- checkDate(captor.getValue(), Optional.absent());
+ verify(mount).invokeCreateSubscription(stream, Optional.absent());
}
@Test
registration.setLastEventTime(lastEventTime);
registration.reActivateNotificationSource();
- ArgumentCaptor<ContainerNode> captor = ArgumentCaptor.forClass(ContainerNode.class);
Assert.assertTrue(registration.isActive());
- verify(service).invokeRpc(eq(createSubscription), captor.capture());
- checkStreamName(captor.getValue());
- checkDate(captor.getValue(), Optional.of(lastEventTime));
+ verify(mount).invokeCreateSubscription(stream, Optional.of(lastEventTime));
}
@Test
final TopicId topic1 = registerTopic("topic1");
final TopicId topic2 = registerTopic("topic2");
final TopicId topic3 = registerTopic("topic3");
- final ArrayList<TopicId> notificationTopicIds = registration.getNotificationTopicIds(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH);
+ final Set<TopicId> notificationTopicIds = registration.getTopicsForNotification(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH);
Assert.assertNotNull(notificationTopicIds);
Assert.assertThat(notificationTopicIds, hasItems(topic1, topic2, topic3));
registration.unRegisterNotificationTopic(topic3);
- final ArrayList<TopicId> afterUnregister = registration.getNotificationTopicIds(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH);
+ final Set<TopicId> afterUnregister = registration.getTopicsForNotification(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH);
Assert.assertNotNull(afterUnregister);
Assert.assertThat(afterUnregister, hasItems(topic1, topic2));
Assert.assertFalse(afterUnregister.contains(topic3));
return topic;
}
- private void checkStreamName(ContainerNode value) {
- final String streamName = (String) value.getChild(STREAM).get().getValue();
- Assert.assertEquals(STREAM_NAME, streamName);
- }
-
- private void checkDate(ContainerNode value, Optional<Date> lastEventTime) {
- final Optional<Date> startTime = (Optional<Date>) value.getChild(START_TIME).get().getValue();
- Assert.assertEquals(lastEventTime, startTime);
- }
}
\ No newline at end of file
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
-import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.LoginPassword;
import org.opendaylight.netconf.api.NetconfMessage;
import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
import org.opendaylight.netconf.client.conf.NetconfClientConfiguration.NetconfClientProtocol;
import org.opendaylight.netconf.client.conf.NetconfClientConfigurationBuilder;
+import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
+import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.LoginPassword;
import org.opendaylight.protocol.framework.NeverReconnectStrategy;
private final long sessionId;
public TestingNetconfClient(String clientLabel,
- NetconfClientDispatcher netconfClientDispatcher, final NetconfClientConfiguration config) throws InterruptedException {
+ NetconfClientDispatcher netconfClientDispatcher, final NetconfClientConfiguration config) throws InterruptedException {
this.label = clientLabel;
sessionListener = config.getSessionListener();
Future<NetconfClientSession> clientFuture = netconfClientDispatcher.createClient(config);
- clientSession = get(clientFuture);//TODO: make static
+ clientSession = get(clientFuture);
this.sessionId = clientSession.getSessionId();
}
- private NetconfClientSession get(Future<NetconfClientSession> clientFuture) throws InterruptedException {
+ private static NetconfClientSession get(Future<NetconfClientSession> clientFuture) throws InterruptedException {
try {
return clientFuture.get();
} catch (CancellationException e) {
- throw new RuntimeException("Cancelling " + this, e);
+ throw new RuntimeException("Cancelling " + TestingNetconfClient.class.getSimpleName(), e);
} catch (ExecutionException e) {
- throw new IllegalStateException("Unable to create " + this, e);
+ throw new IllegalStateException("Unable to create " + TestingNetconfClient.class.getSimpleName(), e);
}
}
import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
-import java.text.SimpleDateFormat;
-import java.util.Date;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.opendaylight.netconf.api.NetconfMessage;
+import org.opendaylight.netconf.api.messages.NetconfHelloMessageAdditionalHeader;
+import org.opendaylight.netconf.api.monitoring.NetconfManagementSession;
import org.opendaylight.netconf.nettyutil.AbstractNetconfSession;
import org.opendaylight.netconf.nettyutil.handler.NetconfMessageToXMLEncoder;
import org.opendaylight.netconf.nettyutil.handler.NetconfXMLToMessageDecoder;
-import org.opendaylight.netconf.api.messages.NetconfHelloMessageAdditionalHeader;
-import org.opendaylight.netconf.api.NetconfMessage;
-import org.opendaylight.netconf.api.monitoring.NetconfManagementSession;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.DomainName;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.extension.rev131210.NetconfTcp;
public final class NetconfServerSession extends AbstractNetconfSession<NetconfServerSession, NetconfServerSessionListener> implements NetconfManagementSession {
private static final Logger LOG = LoggerFactory.getLogger(NetconfServerSession.class);
+ private static final DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_OFFSET_DATE_TIME;
private final NetconfHelloMessageAdditionalHeader header;
- private Date loginTime;
+ private ZonedDateTime loginTime;
private long inRpcSuccess, inRpcFail, outRpcError;
private volatile boolean delayedClose;
@Override
protected void sessionUp() {
Preconditions.checkState(loginTime == null, "Session is already up");
- this.loginTime = new Date();
+ this.loginTime = Instant.now().atZone(ZoneId.systemDefault());
super.sessionUp();
}
outRpcError++;
}
- public static final String ISO_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
-
private static final String dateTimePatternString = DateAndTime.PATTERN_CONSTANTS.get(0);
private static final Pattern dateTimePattern = Pattern.compile(dateTimePatternString);
builder.setSourceHost(new Host(new DomainName(header.getAddress())));
Preconditions.checkState(DateAndTime.PATTERN_CONSTANTS.size() == 1);
- String formattedDateTime = formatDateTime(loginTime);
+ String formattedDateTime = dateFormatter.format(loginTime);
Matcher matcher = dateTimePattern.matcher(formattedDateTime);
Preconditions.checkState(matcher.matches(), "Formatted datetime %s does not match pattern %s", formattedDateTime, dateTimePattern);
}
}
- private static String formatDateTime(final Date loginTime) {
- // FIXME: thread-local cache?
- SimpleDateFormat dateFormat = new SimpleDateFormat(ISO_DATE_FORMAT);
- return dateFormat.format(loginTime);
- }
-
@Override
protected NetconfServerSession thisInstance() {
return this;
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.impl;
-
-public class NetconfMonitoringServiceImplTest {
-
- // TODO redo test
-}
import com.google.common.base.Optional;
import java.net.URI;
import java.util.ArrayList;
+import java.util.Calendar;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
private static final String TEST_MODULE_REV = "1970-01-01";
private static final Uri TEST_MODULE_NAMESPACE = new Uri("testModuleNamespace");
private static final String TEST_MODULE_NAME = "testModule";
- private static final Date TEST_MODULE_DATE = new Date(0);
+ private static final Date TEST_MODULE_DATE;
+
+ static {
+ Calendar calendar = Calendar.getInstance();
+ calendar.set(1970, Calendar.JANUARY, 1);
+ TEST_MODULE_DATE = calendar.getTime();
+ }
private final Set<Capability> CAPABILITIES = new HashSet<>();
public AsyncSshHandler(final AuthenticationHandler authenticationHandler, final SshClient sshClient) throws IOException {
this.authenticationHandler = Preconditions.checkNotNull(authenticationHandler);
this.sshClient = Preconditions.checkNotNull(sshClient);
- // Start just in case
- sshClient.start();
}
public static AsyncSshHandler createForNetconfSubsystem(final AuthenticationHandler authenticationHandler) throws IOException {
import config { prefix config; revision-date 2013-04-05; }
- description "TODO";
+ description
+ "This module contains the base YANG definitions for
+ netconf northbound notifications API";
revision "2015-08-06" {
description "Initial revision.";
operationElement.checkName(CREATE_SUBSCRIPTION);
operationElement.checkNamespace(CreateSubscriptionInput.QNAME.getNamespace().toString());
// FIXME reimplement using CODEC_REGISTRY and parse everything into generated class instance
- // Waiting ofr https://git.opendaylight.org/gerrit/#/c/13763/
+ // Binding doesn't support anyxml nodes yet, so filter could not be retrieved
+ // xml -> normalized node -> CreateSubscriptionInput conversion could be slower than current approach
// FIXME filter could be supported same way as netconf server filters get and get-config results
final Optional<XmlElement> filter = operationElement.getOnlyChildElementWithSameNamespaceOptionally("filter");
import config { prefix config; revision-date 2013-04-05; }
import netconf-northbound-notification { prefix nnn; revision-date 2015-08-06; }
- description "TODO";
+ description
+ "This module contains the base YANG definitions for
+ netconf northbound notifications implementation";
revision "2015-08-07"{
description "Initial revision.";
protected NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId,
final NetconfNode node) {
- //setup default values since default value is not supported yet in mdsal
- // TODO remove this when mdsal starts supporting default values
+ //setup default values since default value is not supported in mdsal
final Long defaultRequestTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null ? DEFAULT_REQUEST_TIMEOUT_MILLIS : node.getDefaultRequestTimeoutMillis();
final Long keepaliveDelay = node.getKeepaliveDelay() == null ? DEFAULT_KEEPALIVE_DELAY : node.getKeepaliveDelay();
final Boolean reconnectOnChangedSchema = node.isReconnectOnChangedSchema() == null ? DEFAULT_RECONNECT_ON_CHANGED_SCHEMA : node.isReconnectOnChangedSchema();
public NetconfReconnectingClientConfiguration getClientConfig(final NetconfClientSessionListener listener, NetconfNode node) {
- //setup default values since default value is not supported yet in mdsal
- // TODO remove this when mdsal starts supporting default values
+ //setup default values since default value is not supported in mdsal
final long clientConnectionTimeoutMillis = node.getConnectionTimeoutMillis() == null ? DEFAULT_CONNECTION_TIMEOUT_MILLIS : node.getConnectionTimeoutMillis();
final long maxConnectionAttempts = node.getMaxConnectionAttempts() == null ? DEFAULT_MAX_CONNECTION_ATTEMPTS : node.getMaxConnectionAttempts();
final int betweenAttemptsTimeoutMillis = node.getBetweenAttemptsTimeoutMillis() == null ? DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS : node.getBetweenAttemptsTimeoutMillis();
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
-// TODO maybe rename to NetconfTopologyDispatcher?
public interface NetconfTopology {
String getTopologyId();
@Override
protected NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId,
final NetconfNode node) {
- //setup default values since default value is not supported yet in mdsal
- // TODO remove this when mdsal starts supporting default values
+ //setup default values since default value is not supported in mdsal
final Long defaultRequestTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null ? DEFAULT_REQUEST_TIMEOUT_MILLIS : node.getDefaultRequestTimeoutMillis();
final Long keepaliveDelay = node.getKeepaliveDelay() == null ? DEFAULT_KEEPALIVE_DELAY : node.getKeepaliveDelay();
final Boolean reconnectOnChangedSchema = node.isReconnectOnChangedSchema() == null ? DEFAULT_RECONNECT_ON_CHANGED_SCHEMA : node.isReconnectOnChangedSchema();
}
private void setUpSchema(final DeviceSources result) {
- processingExecutor.submit(new RecursiveSchemaSetup(result, remoteSessionCapabilities, listener));
+ processingExecutor.submit(new SchemaSetup(result, remoteSessionCapabilities, listener));
}
@Override
/**
* Schema builder that tries to build schema context from provided sources or biggest subset of it.
*/
- private final class RecursiveSchemaSetup implements Runnable {
+ private final class SchemaSetup implements Runnable {
private final DeviceSources deviceSources;
private final NetconfSessionPreferences remoteSessionCapabilities;
private final RemoteDeviceCommunicator<NetconfMessage> listener;
private final NetconfDeviceCapabilities capabilities;
- public RecursiveSchemaSetup(final DeviceSources deviceSources, final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceCommunicator<NetconfMessage> listener) {
+ public SchemaSetup(final DeviceSources deviceSources, final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceCommunicator<NetconfMessage> listener) {
this.deviceSources = deviceSources;
this.remoteSessionCapabilities = remoteSessionCapabilities;
this.listener = listener;
}
/**
- * Recursively build schema context, in case of success or final failure notify device
+ * Build schema context, in case of success or final failure notify device
*/
- // FIXME reimplement without recursion
- private void setUpSchema(final Collection<SourceIdentifier> requiredSources) {
- LOG.trace("{}: Trying to build schema context from {}", id, requiredSources);
-
- // If no more sources, fail
- if(requiredSources.isEmpty()) {
- final IllegalStateException cause = new IllegalStateException(id + ": No more sources for schema context");
- handleSalInitializationFailure(cause, listener);
- salFacade.onDeviceFailed(cause);
- return;
- }
-
- final CheckedFuture<SchemaContext, SchemaResolutionException> schemaBuilderFuture = schemaContextFactory.createSchemaContext(requiredSources);
-
- final FutureCallback<SchemaContext> RecursiveSchemaBuilderCallback = new FutureCallback<SchemaContext>() {
-
- @Override
- public void onSuccess(final SchemaContext result) {
+ private void setUpSchema(Collection<SourceIdentifier> requiredSources) {
+ while (!requiredSources.isEmpty()) {
+ LOG.trace("{}: Trying to build schema context from {}", id, requiredSources);
+ try {
+ final CheckedFuture<SchemaContext, SchemaResolutionException> schemaBuilderFuture = schemaContextFactory.createSchemaContext(requiredSources);
+ final SchemaContext result = schemaBuilderFuture.checkedGet();
LOG.debug("{}: Schema context built successfully from {}", id, requiredSources);
final Collection<QName> filteredQNames = Sets.difference(deviceSources.getRequiredSourcesQName(), capabilities.getUnresolvedCapabilites().keySet());
capabilities.addCapabilities(filteredQNames);
capabilities.addNonModuleBasedCapabilities(remoteSessionCapabilities.getNonModuleCaps());
handleSalInitializationSuccess(result, remoteSessionCapabilities, getDeviceSpecificRpc(result));
- }
-
- @Override
- public void onFailure(final Throwable t) {
- // In case source missing, try without it
- if (t instanceof MissingSchemaSourceException) {
+ return;
+ } catch (Throwable t) {
+ if (t instanceof MissingSchemaSourceException){
+ // In case source missing, try without it
final SourceIdentifier missingSource = ((MissingSchemaSourceException) t).getSourceId();
LOG.warn("{}: Unable to build schema context, missing source {}, will reattempt without it", id, missingSource);
LOG.debug("{}: Unable to build schema context, missing source {}, will reattempt without it", t);
if (!qNameOfMissingSource.isEmpty()) {
capabilities.addUnresolvedCapabilities(qNameOfMissingSource, UnavailableCapability.FailureReason.MissingSource);
}
- setUpSchema(stripMissingSource(requiredSources, missingSource));
-
- // In case resolution error, try only with resolved sources
+ requiredSources = stripMissingSource(requiredSources, missingSource);
} else if (t instanceof SchemaResolutionException) {
- // TODO check for infinite loop
- final SchemaResolutionException resolutionException = (SchemaResolutionException) t;
+ // In case resolution error, try only with resolved sources
+ SchemaResolutionException resolutionException = (SchemaResolutionException) t;
final Set<SourceIdentifier> unresolvedSources = resolutionException.getUnsatisfiedImports().keySet();
capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(unresolvedSources), UnavailableCapability.FailureReason.UnableToResolve);
LOG.warn("{}: Unable to build schema context, unsatisfied imports {}, will reattempt with resolved only", id, resolutionException.getUnsatisfiedImports());
LOG.debug("{}: Unable to build schema context, unsatisfied imports {}, will reattempt with resolved only", resolutionException);
- setUpSchema(resolutionException.getResolvedSources());
- // unknown error, fail
+ requiredSources = resolutionException.getResolvedSources();
} else {
+ // unknown error, fail
handleSalInitializationFailure(t, listener);
+ return;
}
}
- };
-
- Futures.addCallback(schemaBuilderFuture, RecursiveSchemaBuilderCallback);
+ }
+ // No more sources, fail
+ final IllegalStateException cause = new IllegalStateException(id + ": No more sources for schema context");
+ handleSalInitializationFailure(cause, listener);
+ salFacade.onDeviceFailed(cause);
}
+
protected NetconfDeviceRpc getDeviceSpecificRpc(final SchemaContext result) {
return new NetconfDeviceRpc(result, listener, new NetconfMessageTransformer(result, true));
}
* reconnecting strategy runs out of reconnection attempts
*/
public ListenableFuture<NetconfDeviceCapabilities> initializeRemoteConnection(final NetconfClientDispatcher dispatcher, final NetconfClientConfiguration config) {
- // TODO 2313 extract listener from configuration
if(config instanceof NetconfReconnectingClientConfiguration) {
initFuture = dispatcher.createReconnectingClient((NetconfReconnectingClientConfiguration) config);
} else {
@Override
public void onFailure(@Nonnull final Throwable t) {
- // User/Application RPC failed (The RPC did not reach the remote device or .. TODO what other reasons could cause this ?)
+ // User/Application RPC failed (The RPC did not reach the remote device.
// There is no point in keeping this session. Reconnect.
LOG.warn("{}: Rpc failure detected. Reconnecting netconf session", id, t);
reconnect();
private static final Function<RpcDefinition, DOMRpcIdentifier> RPC_TO_RPC_IDENTIFIER = new Function<RpcDefinition, DOMRpcIdentifier>() {
@Override
public DOMRpcIdentifier apply(final RpcDefinition input) {
- // TODO add support for routed rpcs ... is it necessary in this case ?
return DOMRpcIdentifier.create(input.getPath());
}
};
final ListenableFuture<DOMRpcResult> future;
if (isFilterPresent(filterPath)) {
- // FIXME the source node has to be wrapped in a choice
final DataContainerChild<?, ?> node = toFilterStructure(filterPath.get(), schemaContext);
future = rpc.invokeRpc(toPath(NETCONF_GET_CONFIG_QNAME),
NetconfMessageTransformUtil.wrap(NETCONF_GET_CONFIG_QNAME, getSourceNode(datastore), node));
</excludes>
</filter>
</filters>
- <artifactSet>
- <excludes>
- <exclude>com.ning</exclude>
- </excludes>
- </artifactSet>
- <transformers>
- <transformer
- implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <mainClass>org.opendaylight.netconf.test.tool.Main</mainClass>
- </transformer>
- </transformers>
- <shadedArtifactAttached>true</shadedArtifactAttached>
- <shadedClassifierName>executable</shadedClassifierName>
- </configuration>
- </execution>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.opendaylight.netconf.test.tool.Main</mainClass>
+ </transformer>
+ </transformers>
+ <shadedArtifactAttached>true</shadedArtifactAttached>
+ <shadedClassifierName>executable</shadedClassifierName>
+ </configuration>
+ </execution>
- <execution>
- <id>stress-client</id>
- <goals>
- <goal>shade</goal>
- </goals>
- <phase>package</phase>
- <configuration>
- <shadedArtifactId>stress-client</shadedArtifactId>
- <filters>
- <filter>
- <artifact>*:*</artifact>
- <excludes>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.RSA</exclude>
- <exclude>org.opendaylight.netconf.test.tool.client.http</exclude>
- <exclude>org.opendaylight.netconf.test.tool.rpc</exclude>
- <exclude>AcceptingAuthProvider</exclude>
- <exclude>org.opendaylight.netconf.test.tool.DummyMonitoringService</exclude>
- <exclude>org.opendaylight.netconf.test.tool.FakeCapability</exclude>
- <exclude>org.opendaylight.netconf.test.tool.Main</exclude>
- <exclude>org.opendaylight.netconf.test.tool.NetconfDeviceSimulator</exclude>
- </excludes>
- </filter>
- </filters>
- <artifactSet>
- <excludes>
- <exclude>org.bouncycastle:*</exclude>
- </excludes>
- </artifactSet>
- <transformers>
- <transformer
- implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <manifestEntries>
- <Main-Class>org.opendaylight.netconf.test.tool.client.stress.StressClient</Main-Class>
- <Class-Path>. lib lib/bcprov-jdk15on.jar lib/bcpkix-jdk15on.jar</Class-Path>
- </manifestEntries>
- </transformer>
- </transformers>
- <shadedArtifactAttached>true</shadedArtifactAttached>
- <shadedClassifierName>stress-client</shadedClassifierName>
- </configuration>
- </execution>
+ <execution>
+ <id>stress-client</id>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <shadedArtifactId>stress-client</shadedArtifactId>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ <exclude>org.opendaylight.netconf.test.tool.client.http</exclude>
+ <exclude>org.opendaylight.netconf.test.tool.rpc</exclude>
+ <exclude>AcceptingAuthProvider</exclude>
+ <exclude>org.opendaylight.netconf.test.tool.DummyMonitoringService</exclude>
+ <exclude>org.opendaylight.netconf.test.tool.FakeCapability</exclude>
+ <exclude>org.opendaylight.netconf.test.tool.Main</exclude>
+ <exclude>org.opendaylight.netconf.test.tool.NetconfDeviceSimulator</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <artifactSet>
+ <excludes>
+ <exclude>org.bouncycastle:*</exclude>
+ </excludes>
+ </artifactSet>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <manifestEntries>
+ <Main-Class>org.opendaylight.netconf.test.tool.client.stress.StressClient</Main-Class>
+ <Class-Path>. lib lib/bcprov-jdk15on.jar lib/bcpkix-jdk15on.jar</Class-Path>
+ </manifestEntries>
+ </transformer>
+ </transformers>
+ <shadedArtifactAttached>true</shadedArtifactAttached>
+ <shadedClassifierName>stress-client</shadedClassifierName>
+ </configuration>
+ </execution>
- <execution>
- <id>restconf-perf-client</id>
- <goals>
- <goal>shade</goal>
- </goals>
- <phase>package</phase>
- <configuration>
- <shadedArtifactId>rest-perf-client</shadedArtifactId>
- <filters>
- <filter>
- <artifact>*:*</artifact>
- <excludes>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.RSA</exclude>
- <exclude>org.opendaylight.netconf.test.tool.rpc</exclude>
- <exclude>AcceptingAuthProvider</exclude>
- <exclude>org.opendaylight.netconf.test.tool.DummyMonitoringService</exclude>
- <exclude>org.opendaylight.netconf.test.tool.FakeCapability</exclude>
- <exclude>org.opendaylight.netconf.test.tool.Main</exclude>
- <exclude>org.opendaylight.netconf.test.tool.NetconfDeviceSimulator</exclude>
- </excludes>
- </filter>
- </filters>
- <artifactSet>
- <excludes>
- <exclude>org.bouncycastle:*</exclude>
- <exclude>com.google:*</exclude>
- <exclude>org.opendaylight.yangtools</exclude>
- <exclude>org.opendaylight.yang</exclude>
- </excludes>
- </artifactSet>
- <transformers>
- <transformer
- implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <mainClass>org.opendaylight.netconf.test.tool.client.http.perf.RestPerfClient</mainClass>
- </transformer>
- </transformers>
- <shadedArtifactAttached>true</shadedArtifactAttached>
- <shadedClassifierName>rest-perf-client</shadedClassifierName>
- </configuration>
- </execution>
+ <execution>
+ <id>restconf-perf-client</id>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <shadedArtifactId>rest-perf-client</shadedArtifactId>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ <exclude>org.opendaylight.netconf.test.tool.rpc</exclude>
+ <exclude>AcceptingAuthProvider</exclude>
+ <exclude>org.opendaylight.netconf.test.tool.DummyMonitoringService</exclude>
+ <exclude>org.opendaylight.netconf.test.tool.FakeCapability</exclude>
+ <exclude>org.opendaylight.netconf.test.tool.Main</exclude>
+ <exclude>org.opendaylight.netconf.test.tool.NetconfDeviceSimulator</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <artifactSet>
+ <excludes>
+ <exclude>org.bouncycastle:*</exclude>
+ <exclude>com.google:*</exclude>
+ <exclude>org.opendaylight.yangtools</exclude>
+ <exclude>org.opendaylight.yang</exclude>
+ </excludes>
+ </artifactSet>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.opendaylight.netconf.test.tool.client.http.perf.RestPerfClient</mainClass>
+ </transformer>
+ </transformers>
+ <shadedArtifactAttached>true</shadedArtifactAttached>
+ <shadedClassifierName>rest-perf-client</shadedClassifierName>
+ </configuration>
+ </execution>
<execution>
<id>scale-util</id>
<shadedClassifierName>scale-util</shadedClassifierName>
</configuration>
</execution>
- </executions>
- </plugin>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <configuration>
- <descriptors>
- <descriptor>src/main/assembly/stress-client.xml</descriptor>
- </descriptors>
- <finalName>stress-client-${project.version}-package</finalName>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assembly/stress-client.xml</descriptor>
+ </descriptors>
+ <finalName>stress-client-${project.version}-package</finalName>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
-</project>
+ </project>
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+
+package org.opendaylight.netconf.test.tool;
+
+import com.ning.http.client.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
+
+public class Execution implements Callable<Void> {
+
+ private final ArrayList<Request> payloads;
+ private final AsyncHttpClient asyncHttpClient;
+ private static final Logger LOG = LoggerFactory.getLogger(Execution.class);
+ private final boolean invokeAsync;
+ private final Semaphore semaphore;
+ private final int throttle;
+
+ static final class DestToPayload {
+
+ private final String destination;
+ private final String payload;
+
+ public DestToPayload(String destination, String payload) {
+ this.destination = destination;
+ this.payload = payload;
+ }
+
+ public String getDestination() {
+ return destination;
+ }
+
+ public String getPayload() {
+ return payload;
+ }
+ }
+
+ public Execution(TesttoolParameters params, ArrayList<DestToPayload> payloads) {
+ this.invokeAsync = params.async;
+ this.throttle = params.throttle / params.threadAmount;
+
+ if (params.async && params.threadAmount > 1) {
+ LOG.info("Throttling per thread: {}", this.throttle);
+ }
+ this.semaphore = new Semaphore(this.throttle);
+
+ this.asyncHttpClient = new AsyncHttpClient(new AsyncHttpClientConfig.Builder()
+ .setConnectTimeout(Integer.MAX_VALUE)
+ .setRequestTimeout(Integer.MAX_VALUE)
+ .setAllowPoolingConnections(true)
+ .build());
+
+ this.payloads = new ArrayList<>();
+ for (DestToPayload payload : payloads) {
+ AsyncHttpClient.BoundRequestBuilder requestBuilder = asyncHttpClient.preparePut(payload.getDestination())
+ .addHeader("Content-Type", "application/xml")
+ .addHeader("Accept", "application/xml")
+ .setBody(payload.getPayload())
+ .setRequestTimeout(Integer.MAX_VALUE);
+
+ if (params.auth != null) {
+ requestBuilder.setRealm(new Realm.RealmBuilder()
+ .setScheme(Realm.AuthScheme.BASIC)
+ .setPrincipal(params.auth.get(0))
+ .setPassword(params.auth.get(1))
+ .setMethodName("PUT")
+ .setUsePreemptiveAuth(true)
+ .build());
+ }
+ this.payloads.add(requestBuilder.build());
+ }
+ }
+
+ private void invokeSync() {
+ LOG.info("Begin sending sync requests");
+ for (Request request : payloads) {
+ try {
+ Response response = asyncHttpClient.executeRequest(request).get();
+ if (response.getStatusCode() != 200 && response.getStatusCode() != 204) {
+ LOG.warn("Status code: {}", response.getStatusCode());
+ LOG.warn("url: {}", request.getUrl());
+ LOG.warn(response.getResponseBody());
+ }
+ } catch (InterruptedException | ExecutionException | IOException e) {
+ LOG.warn(e.toString());
+ }
+ }
+ LOG.info("End sending sync requests");
+ }
+
+ private void invokeAsync() {
+ final ArrayList<ListenableFuture<Response>> futures = new ArrayList<>();
+ LOG.info("Begin sending async requests");
+
+ for (final Request request : payloads) {
+ try {
+ semaphore.acquire();
+ } catch (InterruptedException e) {
+ LOG.warn("Semaphore acquire interrupted");
+ }
+ futures.add(asyncHttpClient.executeRequest(request, new AsyncCompletionHandler<Response>() {
+ @Override
+ public STATE onStatusReceived(HttpResponseStatus status) throws Exception {
+ super.onStatusReceived(status);
+ if (status.getStatusCode() != 200 && status.getStatusCode() != 204) {
+ LOG.warn("Request failed, status code: {}", status.getStatusCode() + status.getStatusText());
+ LOG.warn("request: {}", request.toString());
+ }
+ return STATE.CONTINUE;
+ }
+
+ @Override
+ public Response onCompleted(Response response) throws Exception {
+ semaphore.release();
+ return response;
+ }
+ }));
+ }
+ LOG.info("Requests sent, waiting for responses");
+
+ try {
+ semaphore.acquire(this.throttle);
+ } catch (InterruptedException e) {
+ LOG.warn("Semaphore acquire interrupted");
+ }
+
+ LOG.info("Responses received, ending...");
+ }
+
+ @Override
+ public Void call() throws Exception {
+ if (invokeAsync) {
+ this.invokeAsync();
+ } else {
+ this.invokeSync();
+ }
+ return null;
+ }
+}
package org.opendaylight.netconf.test.tool;
-import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import ch.qos.logback.classic.Level;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.io.ByteStreams;
import com.google.common.io.CharStreams;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import net.sourceforge.argparse4j.ArgumentParsers;
-import net.sourceforge.argparse4j.annotation.Arg;
-import net.sourceforge.argparse4j.inf.ArgumentParser;
-import net.sourceforge.argparse4j.inf.ArgumentParserException;
import org.opendaylight.controller.config.util.xml.XmlElement;
import org.opendaylight.controller.config.util.xml.XmlUtil;
import org.slf4j.Logger;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;
+
public final class Main {
private static final Logger LOG = LoggerFactory.getLogger(Main.class);
- public static class Params {
-
- @Arg(dest = "schemas-dir")
- public File schemasDir;
-
- @Arg(dest = "devices-count")
- public int deviceCount;
-
- @Arg(dest = "devices-per-port")
- public int devicesPerPort;
-
- @Arg(dest = "starting-port")
- public int startingPort;
-
- @Arg(dest = "generate-config-connection-timeout")
- public int generateConfigsTimeout;
-
- @Arg(dest = "generate-config-address")
- public String generateConfigsAddress;
-
- @Arg(dest = "distro-folder")
- public File distroFolder;
-
- @Arg(dest = "generate-configs-batch-size")
- public int generateConfigBatchSize;
-
- @Arg(dest = "ssh")
- public boolean ssh;
-
- @Arg(dest = "exi")
- public boolean exi;
-
- @Arg(dest = "debug")
- public boolean debug;
-
- @Arg(dest = "notification-file")
- public File notificationFile;
-
- @Arg(dest = "md-sal")
- public boolean mdSal;
-
- @Arg(dest = "initial-config-xml-file")
- public File initialConfigXMLFile;
-
- static ArgumentParser getParser() {
- final ArgumentParser parser = ArgumentParsers.newArgumentParser("netconf testool");
-
- parser.description("Netconf device simulator. Detailed info can be found at https://wiki.opendaylight.org/view/OpenDaylight_Controller:Netconf:Testtool#Building_testtool");
-
- parser.addArgument("--device-count")
- .type(Integer.class)
- .setDefault(1)
- .help("Number of simulated netconf devices to spin. This is the number of actual ports open for the devices.")
- .dest("devices-count");
-
- parser.addArgument("--devices-per-port")
- .type(Integer.class)
- .setDefault(1)
- .help("Amount of config files generated per port to spoof more devices then are actually running")
- .dest("devices-per-port");
-
- parser.addArgument("--schemas-dir")
- .type(File.class)
- .help("Directory containing yang schemas to describe simulated devices. Some schemas e.g. netconf monitoring and inet types are included by default")
- .dest("schemas-dir");
-
- parser.addArgument("--notification-file")
- .type(File.class)
- .help("Xml file containing notifications that should be sent to clients after create subscription is called")
- .dest("notification-file");
-
- parser.addArgument("--initial-config-xml-file")
- .type(File.class)
- .help("Xml file containing initial simulatted configuration to be returned via get-config rpc")
- .dest("initial-config-xml-file");
-
- parser.addArgument("--starting-port")
- .type(Integer.class)
- .setDefault(17830)
- .help("First port for simulated device. Each other device will have previous+1 port number")
- .dest("starting-port");
-
- parser.addArgument("--generate-config-connection-timeout")
- .type(Integer.class)
- .setDefault((int)TimeUnit.MINUTES.toMillis(30))
- .help("Timeout to be generated in initial config files")
- .dest("generate-config-connection-timeout");
-
- parser.addArgument("--generate-config-address")
- .type(String.class)
- .setDefault("127.0.0.1")
- .help("Address to be placed in generated configs")
- .dest("generate-config-address");
-
- parser.addArgument("--generate-configs-batch-size")
- .type(Integer.class)
- .setDefault(4000)
- .help("Number of connector configs per generated file")
- .dest("generate-configs-batch-size");
-
- parser.addArgument("--distribution-folder")
- .type(File.class)
- .help("Directory where the karaf distribution for controller is located")
- .dest("distro-folder");
-
- parser.addArgument("--ssh")
- .type(Boolean.class)
- .setDefault(true)
- .help("Whether to use ssh for transport or just pure tcp")
- .dest("ssh");
-
- parser.addArgument("--exi")
- .type(Boolean.class)
- .setDefault(true)
- .help("Whether to use exi to transport xml content")
- .dest("exi");
-
- parser.addArgument("--debug")
- .type(Boolean.class)
- .setDefault(false)
- .help("Whether to use debug log level instead of INFO")
- .dest("debug");
-
- parser.addArgument("--md-sal")
- .type(Boolean.class)
- .setDefault(false)
- .help("Whether to use md-sal datastore instead of default simulated datastore.")
- .dest("md-sal");
-
- return parser;
- }
-
- void validate() {
- checkArgument(deviceCount > 0, "Device count has to be > 0");
- checkArgument(startingPort > 1023, "Starting port has to be > 1023");
- checkArgument(devicesPerPort > 0, "Atleast one device per port needed");
-
- if(schemasDir != null) {
- checkArgument(schemasDir.exists(), "Schemas dir has to exist");
- checkArgument(schemasDir.isDirectory(), "Schemas dir has to be a directory");
- checkArgument(schemasDir.canRead(), "Schemas dir has to be readable");
- }
- }
- }
-
public static void main(final String[] args) {
- final Params params = parseArgs(args, Params.getParser());
+ final TesttoolParameters params = TesttoolParameters.parseArgs(args, TesttoolParameters.getParser());
params.validate();
-
final ch.qos.logback.classic.Logger root = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
root.setLevel(params.debug ? Level.DEBUG : Level.INFO);
LOG.error("Failed to start any simulated devices, exiting...");
System.exit(1);
}
- if(params.distroFolder != null) {
+ if (params.controllerDestination != null) {
+ final ArrayList<ArrayList<Execution.DestToPayload>> allThreadsPayloads = params.getThreadsPayloads(openDevices);
+ final ArrayList<Execution> executions = new ArrayList<>();
+ for (ArrayList<Execution.DestToPayload> payloads : allThreadsPayloads) {
+ executions.add(new Execution(params, payloads));
+ }
+ final ExecutorService executorService = Executors.newFixedThreadPool(params.threadAmount);
+ final Stopwatch time = Stopwatch.createStarted();
+ List<Future<Void>> futures = executorService.invokeAll(executions, params.timeOut, TimeUnit.SECONDS);
+ int threadNum = 0;
+ for(Future<Void> future : futures){
+ threadNum++;
+ if (future.isCancelled()) {
+ LOG.info("{}. thread timed out.",threadNum);
+ } else {
+ try {
+ future.get();
+ } catch (final ExecutionException e) {
+ LOG.info("{}. thread failed.", threadNum, e);
+ }
+ }
+ }
+ time.stop();
+ LOG.info("Time spent with configuration of devices: {}.",time);
+ }
+
+ if (params.distroFolder != null) {
final ConfigGenerator configGenerator = new ConfigGenerator(params.distroFolder, openDevices);
final List<File> generated = configGenerator.generate(
params.ssh, params.generateConfigBatchSize,
}
}
- private static Params parseArgs(final String[] args, final ArgumentParser parser) {
- final Params opt = new Params();
- try {
- parser.parseArgs(args, opt);
- return opt;
- } catch (final ArgumentParserException e) {
- parser.handleError(e);
- }
-
- System.exit(1);
- return null;
- }
-
static class ConfigGenerator {
public static final String NETCONF_CONNECTOR_XML = "/99-netconf-connector-simulated.xml";
public static final String SIM_DEVICE_SUFFIX = "-sim-device";
public List<File> generate(final boolean useSsh, final int batchSize,
final int generateConfigsTimeout, final String address,
final int devicesPerPort) {
- if(configDir.exists() == false) {
+ if (configDir.exists() == false) {
Preconditions.checkState(configDir.mkdirs(), "Unable to create directory " + configDir);
}
Preconditions.checkState(file.delete(), "Unable to clean previous generated file %s", file);
}
- try(InputStream stream = Main.class.getResourceAsStream(NETCONF_CONNECTOR_XML)) {
+ try (InputStream stream = Main.class.getResourceAsStream(NETCONF_CONNECTOR_XML)) {
checkNotNull(stream, "Cannot load %s", NETCONF_CONNECTOR_XML);
String configBlueprint = CharStreams.toString(new InputStreamReader(stream, Charsets.UTF_8));
final List<File> generatedConfigs = Lists.newArrayList();
for (final Integer openDevice : openDevices) {
- if(batchStart == null) {
+ if (batchStart == null) {
batchStart = openDevice;
}
b.append(configContent);
connectorCount++;
- if(connectorCount == batchSize) {
+ if (connectorCount == batchSize) {
b.append(after);
final File to = new File(configDir, String.format(SIM_DEVICE_CFG_PREFIX + "%d-%d.xml", batchStart, openDevice));
generatedConfigs.add(to);
}
// Write remaining
- if(connectorCount != 0) {
+ if (connectorCount != 0) {
b.append(after);
final File to = new File(configDir, String.format(SIM_DEVICE_CFG_PREFIX + "%d-%d.xml", batchStart, openDevices.get(openDevices.size() - 1)));
generatedConfigs.add(to);
}
}
- Files.write(XmlUtil.toString(document), featureFile,Charsets.UTF_8);
+ Files.write(XmlUtil.toString(document), featureFile, Charsets.UTF_8);
LOG.info("Feature file {} updated", featureFile);
}
} catch (final IOException e) {
public void changeLoadOrder() {
try {
- Files.write(ByteStreams.toByteArray(getClass().getResourceAsStream("/" +ORG_OPS4J_PAX_URL_MVN_CFG)), loadOrderCfgFile);
+ Files.write(ByteStreams.toByteArray(getClass().getResourceAsStream("/" + ORG_OPS4J_PAX_URL_MVN_CFG)), loadOrderCfgFile);
LOG.info("Load order changed to prefer local bundles/features by rewriting file {}", loadOrderCfgFile);
} catch (IOException e) {
throw new RuntimeException("Unable to rewrite features file " + loadOrderCfgFile, e);
return new NetconfServerDispatcherImpl(serverChannelInitializer, nettyThreadgroup, nettyThreadgroup);
}
- public List<Integer> start(final Main.Params params) {
+ public List<Integer> start(final TesttoolParameters params) {
LOG.info("Starting {}, {} simulated devices starting on port {}", params.deviceCount, params.ssh ? "SSH" : "TCP", params.startingPort);
final SharedSchemaRepository schemaRepo = new SharedSchemaRepository("netconf-simulator");
}
}
- private Set<Capability> parseSchemasToModuleCapabilities(final Main.Params params, final SharedSchemaRepository consumer) {
+ private Set<Capability> parseSchemasToModuleCapabilities(final TesttoolParameters params, final SharedSchemaRepository consumer) {
final Set<SourceIdentifier> loadedSources = Sets.newHashSet();
consumer.registerSchemaSourceListener(TextToASTTransformer.create(consumer, consumer));
package org.opendaylight.netconf.test.tool;
import ch.qos.logback.classic.Level;
-import com.google.common.base.Charsets;
import com.google.common.base.Stopwatch;
import com.google.common.io.CharStreams;
import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.AsyncHttpClientConfig.Builder;
import com.ning.http.client.Request;
import com.ning.http.client.Response;
-import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.util.regex.Pattern;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
-import org.opendaylight.netconf.test.tool.Main.Params;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Semaphore semaphore = new Semaphore(0);
public static void main(final String[] args) {
- final Params params = parseArgs(args, Params.getParser());
+ final TesttoolParameters params = TesttoolParameters.parseArgs(args, TesttoolParameters.getParser());
root = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
root.setLevel(params.debug ? Level.DEBUG : Level.INFO);
}
}
- private static void cleanup(final Runtime runtime, final Params params) {
+ private static void cleanup(final Runtime runtime, final TesttoolParameters params) {
try {
stopKaraf(runtime, params);
deleteFolder(new File(params.distroFolder.getAbsoluteFile() + "/data"));
}
}
- private static void stopKaraf(final Runtime runtime, final Params params) throws IOException, InterruptedException {
+ private static void stopKaraf(final Runtime runtime, final TesttoolParameters params) throws IOException, InterruptedException {
root.info("Stopping karaf and sleeping for 10 sec..");
String controllerPid = "";
do {
folder.delete();
}
- private static Params parseArgs(final String[] args, final ArgumentParser parser) {
- final Params parameters = new Params();
+ private static TesttoolParameters parseArgs(final String[] args, final ArgumentParser parser) {
+ final TesttoolParameters parameters = new TesttoolParameters();
try {
parser.parseArgs(args, parameters);
return parameters;
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.test.tool;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.io.CharStreams;
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.annotation.Arg;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+
+public class TesttoolParameters {
+
+ private static final String HOST_KEY = "{HOST}";
+ private static final String PORT_KEY = "{PORT}";
+ private static final String SSH = "{SSH}";
+ private static final String ADDRESS_PORT = "{ADDRESS:PORT}";
+ private static final String dest = "http://{ADDRESS:PORT}/restconf/config/network-topology:network-topology/topology/topology-netconf/node/{PORT}-sim-device";
+
+ private static final String RESOURCE = "/config-template.xml";
+ private InputStream stream;
+
+ @Arg(dest = "edit-content")
+ public File editContent;
+
+ @Arg(dest = "async")
+ public boolean async;
+
+ @Arg(dest = "thread-amount")
+ public int threadAmount;
+
+ @Arg(dest = "throttle")
+ public int throttle;
+
+ @Arg(dest = "auth")
+ public ArrayList<String> auth;
+
+ @Arg(dest = "controller-destination")
+ public String controllerDestination;
+
+ @Arg(dest = "schemas-dir")
+ public File schemasDir;
+
+ @Arg(dest = "devices-count")
+ public int deviceCount;
+
+ @Arg(dest = "devices-per-port")
+ public int devicesPerPort;
+
+ @Arg(dest = "starting-port")
+ public int startingPort;
+
+ @Arg(dest = "generate-config-connection-timeout")
+ public int generateConfigsTimeout;
+
+ @Arg(dest = "generate-config-address")
+ public String generateConfigsAddress;
+
+ @Arg(dest = "distro-folder")
+ public File distroFolder;
+
+ @Arg(dest = "generate-configs-batch-size")
+ public int generateConfigBatchSize;
+
+ @Arg(dest = "ssh")
+ public boolean ssh;
+
+ @Arg(dest = "exi")
+ public boolean exi;
+
+ @Arg(dest = "debug")
+ public boolean debug;
+
+ @Arg(dest = "notification-file")
+ public File notificationFile;
+
+ @Arg(dest = "md-sal")
+ public boolean mdSal;
+
+ @Arg(dest = "initial-config-xml-file")
+ public File initialConfigXMLFile;
+
+ @Arg(dest = "time-out")
+ public long timeOut;
+
+ static ArgumentParser getParser() {
+ final ArgumentParser parser = ArgumentParsers.newArgumentParser("netconf testtool");
+
+ parser.description("netconf testtool");
+
+ parser.addArgument("--edit-content")
+ .type(String.class)
+ .dest("edit-content");
+
+ parser.addArgument("--async-requests")
+ .type(Boolean.class)
+ .setDefault(false)
+ .dest("async");
+
+ parser.addArgument("--thread-amount")
+ .type(Integer.class)
+ .setDefault(1)
+ .dest("thread-amount");
+
+ parser.addArgument("--throttle")
+ .type(Integer.class)
+ .setDefault(5000)
+ .help("Maximum amount of async requests that can be open at a time, " +
+ "with mutltiple threads this gets divided among all threads")
+ .dest("throttle");
+
+ parser.addArgument("--auth")
+ .nargs(2)
+ .help("Username and password for HTTP basic authentication in order username password.")
+ .dest("auth");
+
+ parser.addArgument("--controller-destination")
+ .type(String.class)
+ .help("Ip address and port of controller. Must be in following format <ip>:<port> "+
+ "if available it will be used for spawning netconf connectors via topology configuration as "+
+ "a part of URI. Example (http://<controller destination>/restconf/config/network-topology:network-topology/topology/topology-netconf/node/<node-id>)"+
+ "otherwise it will just start simulated devices and skip the execution of PUT requests")
+ .dest("controller-destination");
+
+ parser.addArgument("--device-count")
+ .type(Integer.class)
+ .setDefault(1)
+ .help("Number of simulated netconf devices to spin. This is the number of actual ports open for the devices.")
+ .dest("devices-count");
+
+ parser.addArgument("--devices-per-port")
+ .type(Integer.class)
+ .setDefault(1)
+ .help("Amount of config files generated per port to spoof more devices then are actually running")
+ .dest("devices-per-port");
+
+ parser.addArgument("--schemas-dir")
+ .type(File.class)
+ .help("Directory containing yang schemas to describe simulated devices. Some schemas e.g. netconf monitoring and inet types are included by default")
+ .dest("schemas-dir");
+
+ parser.addArgument("--notification-file")
+ .type(File.class)
+ .help("Xml file containing notifications that should be sent to clients after create subscription is called")
+ .dest("notification-file");
+
+ parser.addArgument("--initial-config-xml-file")
+ .type(File.class)
+ .help("Xml file containing initial simulatted configuration to be returned via get-config rpc")
+ .dest("initial-config-xml-file");
+
+ parser.addArgument("--starting-port")
+ .type(Integer.class)
+ .setDefault(17830)
+ .help("First port for simulated device. Each other device will have previous+1 port number")
+ .dest("starting-port");
+
+ parser.addArgument("--generate-config-connection-timeout")
+ .type(Integer.class)
+ .setDefault((int) TimeUnit.MINUTES.toMillis(30))
+ .help("Timeout to be generated in initial config files")
+ .dest("generate-config-connection-timeout");
+
+ parser.addArgument("--generate-config-address")
+ .type(String.class)
+ .setDefault("127.0.0.1")
+ .help("Address to be placed in generated configs")
+ .dest("generate-config-address");
+
+ parser.addArgument("--generate-configs-batch-size")
+ .type(Integer.class)
+ .setDefault(4000)
+ .help("Number of connector configs per generated file")
+ .dest("generate-configs-batch-size");
+
+ parser.addArgument("--distribution-folder")
+ .type(File.class)
+ .help("Directory where the karaf distribution for controller is located")
+ .dest("distro-folder");
+
+ parser.addArgument("--ssh")
+ .type(Boolean.class)
+ .setDefault(true)
+ .help("Whether to use ssh for transport or just pure tcp")
+ .dest("ssh");
+
+ parser.addArgument("--exi")
+ .type(Boolean.class)
+ .setDefault(true)
+ .help("Whether to use exi to transport xml content")
+ .dest("exi");
+
+ parser.addArgument("--debug")
+ .type(Boolean.class)
+ .setDefault(false)
+ .help("Whether to use debug log level instead of INFO")
+ .dest("debug");
+
+ parser.addArgument("--md-sal")
+ .type(Boolean.class)
+ .setDefault(false)
+ .help("Whether to use md-sal datastore instead of default simulated datastore.")
+ .dest("md-sal");
+
+ parser.addArgument("--time-out")
+ .type(long.class)
+ .setDefault(20)
+ .help("the maximum time in seconds for executing each PUT request")
+ .dest("time-out");
+
+ return parser;
+ }
+
+ public static TesttoolParameters parseArgs(final String[] args, final ArgumentParser parser) {
+ final TesttoolParameters opt = new TesttoolParameters();
+ try {
+ parser.parseArgs(args, opt);
+ return opt;
+ } catch (final ArgumentParserException e) {
+ parser.handleError(e);
+ }
+
+ System.exit(1);
+ return null;
+ }
+
+ void validate() {
+ if (editContent == null) {
+ stream = TesttoolParameters.class.getResourceAsStream(RESOURCE);
+ } else {
+ Preconditions.checkArgument(!editContent.isDirectory(), "Edit content file is a dir");
+ Preconditions.checkArgument(editContent.canRead(), "Edit content file is unreadable");
+ }
+
+ if (controllerDestination != null) {
+ Preconditions.checkArgument(controllerDestination.contains(":"), "Controller Destination needs to be in a following format <ip>:<port>");
+ String[] parts = controllerDestination.split(Pattern.quote(":"));
+ Preconditions.checkArgument(Integer.parseInt(parts[1]) > 0, "Port =< 0");
+ }
+
+ checkArgument(deviceCount > 0, "Device count has to be > 0");
+ checkArgument(startingPort > 1023, "Starting port has to be > 1023");
+ checkArgument(devicesPerPort > 0, "Atleast one device per port needed");
+
+ if (schemasDir != null) {
+ checkArgument(schemasDir.exists(), "Schemas dir has to exist");
+ checkArgument(schemasDir.isDirectory(), "Schemas dir has to be a directory");
+ checkArgument(schemasDir.canRead(), "Schemas dir has to be readable");
+ }
+ }
+
+ public ArrayList<ArrayList<Execution.DestToPayload>> getThreadsPayloads(List<Integer> openDevices) {
+ final String editContentString;
+ try {
+ if(stream == null)
+ {
+ editContentString = Files.toString(editContent, Charsets.UTF_8);
+ } else {
+ editContentString = CharStreams.toString(new InputStreamReader(stream, Charsets.UTF_8));
+ }
+ } catch (final IOException e) {
+ throw new IllegalArgumentException("Cannot read content of " + editContent);
+ }
+
+ final ArrayList<ArrayList<Execution.DestToPayload>> allThreadsPayloads = new ArrayList<>();
+ for (int i = 0; i < threadAmount; i++) {
+ final ArrayList<Execution.DestToPayload> payloads = new ArrayList<>();
+ for (int j = 0; j < openDevices.size(); j++) {
+ final StringBuilder destBuilder = new StringBuilder(dest);
+ destBuilder.replace(destBuilder.indexOf(ADDRESS_PORT), destBuilder.indexOf(ADDRESS_PORT) + ADDRESS_PORT.length(), controllerDestination)
+ .replace(destBuilder.indexOf(PORT_KEY), destBuilder.indexOf(PORT_KEY) + PORT_KEY.length(), Integer.toString(openDevices.get(j)));
+ payloads.add(new Execution.DestToPayload(destBuilder.toString(), prepareMessage(openDevices.get(j), editContentString)));
+ }
+ allThreadsPayloads.add(payloads);
+ }
+
+ return allThreadsPayloads;
+ }
+
+ private String prepareMessage(final int openDevice, final String editContentString) {
+ StringBuilder messageBuilder = new StringBuilder(editContentString);
+
+ if (editContentString.contains(HOST_KEY)) {
+ messageBuilder.replace(messageBuilder.indexOf(HOST_KEY), messageBuilder.indexOf(HOST_KEY) + HOST_KEY.length(), generateConfigsAddress);
+ }
+ if (editContentString.contains(PORT_KEY)) {
+ while (messageBuilder.indexOf(PORT_KEY) != -1)
+ messageBuilder.replace(messageBuilder.indexOf(PORT_KEY), messageBuilder.indexOf(PORT_KEY) + PORT_KEY.length(), Integer.toString(openDevice));
+ }
+ if (editContentString.contains(SSH)) {
+ messageBuilder.replace(messageBuilder.indexOf(SSH), messageBuilder.indexOf(SSH) + SSH.length(), Boolean.toString(ssh));
+ }
+ return messageBuilder.toString();
+ }
+}
--- /dev/null
+<node xmlns="urn:TBD:params:xml:ns:yang:network-topology">
+ <node-id>{PORT}-sim-device</node-id>
+ <host xmlns="urn:opendaylight:netconf-node-topology">{HOST}</host>
+ <port xmlns="urn:opendaylight:netconf-node-topology">{PORT}</port>
+ <username xmlns="urn:opendaylight:netconf-node-topology">admin</username>
+ <password xmlns="urn:opendaylight:netconf-node-topology">admin</password>
+ <tcp-only xmlns="urn:opendaylight:netconf-node-topology">{SSH}</tcp-only>
+ <keepalive-delay xmlns="urn:opendaylight:netconf-node-topology">0</keepalive-delay>
+</node>
\ No newline at end of file