/* * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.messagebus.eventsources.netconf; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.impl.schema.Builders; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder; import org.opendaylight.yangtools.yang.model.api.SchemaPath; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Optional; import com.google.common.util.concurrent.CheckedFuture; public class StreamNotificationTopicRegistration extends NotificationTopicRegistration { private static final Logger LOG = LoggerFactory.getLogger(StreamNotificationTopicRegistration.class); private static final NodeIdentifier STREAM_QNAME = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"stream")); private static final SchemaPath CREATE_SUBSCRIPTION = SchemaPath.create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription")); private static final NodeIdentifier START_TIME_SUBSCRIPTION = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"startTime")); final private DOMMountPoint domMountPoint; final private String nodeId; final private NetconfEventSource netconfEventSource; final private Stream stream; private Date lastEventTime; private ConcurrentHashMap> notificationRegistrationMap = new ConcurrentHashMap<>(); private ConcurrentHashMap> notificationTopicMap = new ConcurrentHashMap<>(); public StreamNotificationTopicRegistration(final Stream stream, final String notificationPrefix, NetconfEventSource netconfEventSource) { super(NotificationSourceType.NetconfDeviceStream, stream.getName().getValue(), notificationPrefix); this.domMountPoint = netconfEventSource.getDOMMountPoint(); this.nodeId = netconfEventSource.getNode().getNodeId().getValue().toString(); this.netconfEventSource = netconfEventSource; this.stream = stream; this.lastEventTime= null; setReplaySupported(this.stream.isReplaySupport()); setActive(false); LOG.info("StreamNotificationTopicRegistration initialized for {}", getStreamName()); } void activateNotificationSource() { if(isActive() == false){ LOG.info("Stream {} is not active on node {}. Will subscribe.", this.getStreamName(), this.nodeId); final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME)) .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName())) .build(); CheckedFuture csFuture = domMountPoint.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input); try { csFuture.checkedGet(); setActive(true); } catch (DOMRpcException e) { LOG.warn("Can not subscribe stream {} on node {}", this.getSourceName(), this.nodeId); setActive(false); return; } } else { LOG.info("Stream {} is now active on node {}", this.getStreamName(), this.nodeId); } } void reActivateNotificationSource(){ if(isActive()){ LOG.info("Stream {} is reactivating on node {}.", this.getStreamName(), this.nodeId); DataContainerNodeAttrBuilder inputBuilder = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME)) .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName())); if(isReplaySupported() && this.getLastEventTime() != null){ inputBuilder.withChild(ImmutableNodes.leafNode(START_TIME_SUBSCRIPTION, this.getLastEventTime())); } final ContainerNode input = inputBuilder.build(); CheckedFuture csFuture = domMountPoint.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input); try { csFuture.checkedGet(); setActive(true); } catch (DOMRpcException e) { LOG.warn("Can not resubscribe stream {} on node {}", this.getSourceName(), this.nodeId); setActive(false); return; } } } @Override void deActivateNotificationSource() { // no operations need } private void closeStream() { if(isActive()){ for(ListenerRegistration reg : notificationRegistrationMap.values()){ reg.close(); } notificationRegistrationMap.clear(); notificationTopicMap.clear(); setActive(false); } } private String getStreamName() { return getSourceName(); } @Override ArrayList getNotificationTopicIds(SchemaPath notificationPath){ return notificationTopicMap.get(notificationPath); } @Override boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId){ if(checkNotificationPath(notificationPath) == false){ LOG.debug("Bad SchemaPath for notification try to register"); return false; } final Optional notifyService = domMountPoint.getService(DOMNotificationService.class); if(notifyService.isPresent() == false){ LOG.debug("DOMNotificationService is not present"); return false; } activateNotificationSource(); if(isActive() == false){ LOG.warn("Stream {} is not active, listener for notification {} is not registered.", getStreamName(), notificationPath.toString()); return false; } ListenerRegistration registration = notifyService.get().registerNotificationListener(this.netconfEventSource,notificationPath); notificationRegistrationMap.put(notificationPath, registration); ArrayList topicIds = getNotificationTopicIds(notificationPath); if(topicIds == null){ topicIds = new ArrayList<>(); topicIds.add(topicId); } else { if(topicIds.contains(topicId) == false){ topicIds.add(topicId); } } notificationTopicMap.put(notificationPath, topicIds); return true; } @Override synchronized void unRegisterNotificationTopic(TopicId topicId) { List notificationPathToRemove = new ArrayList<>(); for(SchemaPath notifKey : notificationTopicMap.keySet()){ ArrayList topicList = notificationTopicMap.get(notifKey); if(topicList != null){ topicList.remove(topicId); if(topicList.isEmpty()){ notificationPathToRemove.add(notifKey); } } } for(SchemaPath notifKey : notificationPathToRemove){ notificationTopicMap.remove(notifKey); ListenerRegistration reg = notificationRegistrationMap.remove(notifKey); if(reg != null){ reg.close(); } } } Optional getLastEventTime() { return Optional.fromNullable(lastEventTime); } void setLastEventTime(Date lastEventTime) { this.lastEventTime = lastEventTime; } @Override public void close() throws Exception { closeStream(); } }