/* * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.messagebus.eventsources.netconf; import com.google.common.base.Optional; import com.google.common.util.concurrent.CheckedFuture; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.impl.schema.Builders; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder; import org.opendaylight.yangtools.yang.model.api.SchemaPath; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class StreamNotificationTopicRegistration extends NotificationTopicRegistration { private static final Logger LOG = LoggerFactory.getLogger(StreamNotificationTopicRegistration.class); private static final NodeIdentifier STREAM_QNAME = new NodeIdentifier( QName.create(CreateSubscriptionInput.QNAME, "stream")); private static final SchemaPath CREATE_SUBSCRIPTION = SchemaPath .create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription")); private static final NodeIdentifier START_TIME_SUBSCRIPTION = new NodeIdentifier( QName.create(CreateSubscriptionInput.QNAME, "startTime")); final private DOMMountPoint domMountPoint; final private String nodeId; final private NetconfEventSource netconfEventSource; final private Stream stream; private Date lastEventTime; private ConcurrentHashMap> notificationRegistrationMap = new ConcurrentHashMap<>(); private ConcurrentHashMap> notificationTopicMap = new ConcurrentHashMap<>(); public StreamNotificationTopicRegistration(final Stream stream, final String notificationPrefix, NetconfEventSource netconfEventSource) { super(NotificationSourceType.NetconfDeviceStream, stream.getName().getValue(), notificationPrefix); this.domMountPoint = netconfEventSource.getDOMMountPoint(); this.nodeId = netconfEventSource.getNode().getNodeId().getValue().toString(); this.netconfEventSource = netconfEventSource; this.stream = stream; this.lastEventTime = null; setReplaySupported(this.stream.isReplaySupport()); setActive(false); LOG.info("StreamNotificationTopicRegistration initialized for {}", getStreamName()); } void activateNotificationSource() { if (isActive() == false) { LOG.info("Stream {} is not active on node {}. Will subscribe.", this.getStreamName(), this.nodeId); final ContainerNode input = Builders.containerBuilder() .withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME)) .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName())).build(); CheckedFuture csFuture = domMountPoint.getService(DOMRpcService.class).get() .invokeRpc(CREATE_SUBSCRIPTION, input); try { csFuture.checkedGet(); setActive(true); } catch (DOMRpcException e) { LOG.warn("Can not subscribe stream {} on node {}", this.getSourceName(), this.nodeId); setActive(false); return; } } else { LOG.info("Stream {} is now active on node {}", this.getStreamName(), this.nodeId); } } void reActivateNotificationSource() { if (isActive()) { LOG.info("Stream {} is reactivating on node {}.", this.getStreamName(), this.nodeId); DataContainerNodeAttrBuilder inputBuilder = Builders.containerBuilder() .withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME)) .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName())); if (isReplaySupported() && this.getLastEventTime() != null) { inputBuilder.withChild(ImmutableNodes.leafNode(START_TIME_SUBSCRIPTION, this.getLastEventTime())); } final ContainerNode input = inputBuilder.build(); CheckedFuture csFuture = domMountPoint.getService(DOMRpcService.class).get() .invokeRpc(CREATE_SUBSCRIPTION, input); try { csFuture.checkedGet(); setActive(true); } catch (DOMRpcException e) { LOG.warn("Can not resubscribe stream {} on node {}", this.getSourceName(), this.nodeId); setActive(false); return; } } } @Override void deActivateNotificationSource() { // no operations need } private void closeStream() { if (isActive()) { for (ListenerRegistration reg : notificationRegistrationMap.values()) { reg.close(); } notificationRegistrationMap.clear(); notificationTopicMap.clear(); setActive(false); } } private String getStreamName() { return getSourceName(); } @Override ArrayList getNotificationTopicIds(SchemaPath notificationPath) { return notificationTopicMap.get(notificationPath); } @Override boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId) { if (checkNotificationPath(notificationPath) == false) { LOG.debug("Bad SchemaPath for notification try to register"); return false; } final Optional notifyService = domMountPoint.getService(DOMNotificationService.class); if (notifyService.isPresent() == false) { LOG.debug("DOMNotificationService is not present"); return false; } activateNotificationSource(); if (isActive() == false) { LOG.warn("Stream {} is not active, listener for notification {} is not registered.", getStreamName(), notificationPath.toString()); return false; } ListenerRegistration registration = notifyService.get() .registerNotificationListener(this.netconfEventSource, notificationPath); notificationRegistrationMap.put(notificationPath, registration); ArrayList topicIds = getNotificationTopicIds(notificationPath); if (topicIds == null) { topicIds = new ArrayList<>(); topicIds.add(topicId); } else { if (topicIds.contains(topicId) == false) { topicIds.add(topicId); } } notificationTopicMap.put(notificationPath, topicIds); return true; } @Override synchronized void unRegisterNotificationTopic(TopicId topicId) { List notificationPathToRemove = new ArrayList<>(); for (SchemaPath notifKey : notificationTopicMap.keySet()) { ArrayList topicList = notificationTopicMap.get(notifKey); if (topicList != null) { topicList.remove(topicId); if (topicList.isEmpty()) { notificationPathToRemove.add(notifKey); } } } for (SchemaPath notifKey : notificationPathToRemove) { notificationTopicMap.remove(notifKey); ListenerRegistration reg = notificationRegistrationMap.remove(notifKey); if (reg != null) { reg.close(); } } } Optional getLastEventTime() { return Optional.fromNullable(lastEventTime); } void setLastEventTime(Date lastEventTime) { this.lastEventTime = lastEventTime; } @Override public void close() throws Exception { closeStream(); } }