* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.netconf.messagebus.eventsources.netconf;
import static com.google.common.util.concurrent.Futures.immediateFuture;
+import static java.util.Objects.requireNonNull;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Set;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import javax.xml.stream.XMLStreamException;
import javax.xml.transform.dom.DOMResult;
import javax.xml.transform.dom.DOMSource;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.dom.api.DOMEvent;
-import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
-import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
-import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
import org.opendaylight.controller.messagebus.app.util.TopicDOMNotification;
import org.opendaylight.controller.messagebus.app.util.Util;
import org.opendaylight.controller.messagebus.spi.EventSource;
+import org.opendaylight.mdsal.dom.api.DOMEvent;
+import org.opendaylight.mdsal.dom.api.DOMNotification;
+import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
+import org.opendaylight.mdsal.dom.api.DOMNotificationPublishService;
import org.opendaylight.netconf.api.xml.XmlUtil;
import org.opendaylight.netconf.util.NetconfUtil;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.AnyXmlNode;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.DOMSourceAnyxmlNode;
import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* NetconfEventSource serves as proxy between nodes and messagebus. Subscribers can join topic stream from this source.
* Then they will receive notifications from device that matches pattern specified by topic.
*/
+@Deprecated(forRemoval = true)
public class NetconfEventSource implements EventSource, DOMNotificationListener {
private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSource.class);
final NetconfEventSourceMount mount,
final DOMNotificationPublishService publishService) {
this.mount = mount;
- this.urnPrefixToStreamMap = Preconditions.checkNotNull(streamMap);
- this.domPublish = Preconditions.checkNotNull(publishService);
+ this.urnPrefixToStreamMap = requireNonNull(streamMap);
+ this.domPublish = requireNonNull(publishService);
this.initializeNotificationTopicRegistrationList();
LOG.info("NetconfEventSource [{}] created.", mount.getNodeId());
}
private Map<String, Stream> getAvailableStreams() {
- Map<String, Stream> streamMap = new HashMap<>();
- final List<Stream> availableStreams;
+ final Collection<Stream> availableStreams;
try {
availableStreams = mount.getAvailableStreams();
- streamMap = Maps.uniqueIndex(availableStreams, input -> input.getName().getValue());
- } catch (ReadFailedException e) {
- LOG.warn("Can not read streams for node {}", mount.getNodeId());
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.warn("Can not read streams for node {}", mount.getNodeId(), e);
+ return ImmutableMap.of();
}
- return streamMap;
+
+ return Maps.uniqueIndex(availableStreams, input -> input.getName().getValue());
}
@Override
private synchronized ListenableFuture<RpcResult<JoinTopicOutput>> registerTopic(
final TopicId topicId,
final List<SchemaPath> notificationsToSubscribe) {
- Preconditions.checkNotNull(notificationsToSubscribe);
+ requireNonNull(notificationsToSubscribe);
LOG.debug("Join topic {} - register", topicId);
JoinTopicStatus joinTopicStatus = JoinTopicStatus.Down;
@Override
public void onNotification(final DOMNotification notification) {
- SchemaPath notificationPath = notification.getType();
- Date notificationEventTime = null;
+ Instant notificationEventTime = null;
if (notification instanceof DOMEvent) {
- notificationEventTime = ((DOMEvent) notification).getEventTime();
+ notificationEventTime = ((DOMEvent) notification).getEventInstant();
}
- final String namespace = notification.getType().getLastComponent().getNamespace().toString();
+ final String namespace = notification.getType().lastNodeIdentifier().getNamespace().toString();
for (NotificationTopicRegistration notifReg : notificationTopicRegistrations.get(namespace)) {
notifReg.setLastEventTime(notificationEventTime);
- Set<TopicId> topicIdsForNotification = notifReg.getTopicsForNotification(notificationPath);
- for (TopicId topicId : topicIdsForNotification) {
+ for (TopicId topicId : notifReg.getTopicsForNotification(notification.getType().asSchemaPath())) {
publishNotification(notification, topicId);
LOG.debug("Notification {} has been published for TopicId {}", notification.getType(),
topicId.getValue());
}
}
- private AnyXmlNode encapsulate(final DOMNotification body) {
+ private DOMSourceAnyxmlNode encapsulate(final DOMNotification body) {
// FIXME: Introduce something like YangModeledAnyXmlNode in Yangtools
final Document doc = XmlUtil.newDocument();
final Optional<String> namespace = Optional.of(PAYLOAD_ARG.getNodeType().getNamespace().toString());
final DOMResult result = new DOMResult(element);
- final SchemaContext context = mount.getSchemaContext();
- final SchemaPath schemaPath = body.getType();
try {
- NetconfUtil.writeNormalizedNode(body.getBody(), result, schemaPath, context);
+ NetconfUtil.writeNormalizedNode(body.getBody(), result, body.getType().asSchemaPath(),
+ mount.getSchemaContext());
return Builders.anyXmlBuilder().withNodeIdentifier(PAYLOAD_ARG).withValue(new DOMSource(element)).build();
} catch (IOException | XMLStreamException e) {
LOG.error("Unable to encapsulate notification.", e);
final List<SchemaPath> availNotifList = new ArrayList<>();
// add Event Source Connection status notification
- availNotifList.add(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH);
+ availNotifList.add(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH.asSchemaPath());
- final Set<NotificationDefinition> availableNotifications = mount.getSchemaContext()
- .getNotifications();
// add all known notifications from netconf device
- for (final NotificationDefinition nd : availableNotifications) {
+ for (final NotificationDefinition nd : mount.getSchemaContext().getNotifications()) {
availNotifList.add(nd.getPath());
}
return availNotifList;