X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fmessagebus-netconf%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Fmessagebus%2Feventsources%2Fnetconf%2FStreamNotificationTopicRegistration.java;h=f2edf047aff1eb64712f6cfdf665955f542841d0;hb=3b5e41839346a83c1d8d54b3d869e19d9e417063;hp=01966c207262489f2ad7761586685ea73b522e58;hpb=df1a4dbb37e0fb187c6d50d3bab1f9d88b888928;p=netconf.git diff --git a/netconf/messagebus-netconf/src/main/java/org/opendaylight/netconf/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java b/netconf/messagebus-netconf/src/main/java/org/opendaylight/netconf/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java index 01966c2072..f2edf047af 100644 --- a/netconf/messagebus-netconf/src/main/java/org/opendaylight/netconf/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java +++ b/netconf/messagebus-netconf/src/main/java/org/opendaylight/netconf/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java @@ -7,14 +7,14 @@ */ package org.opendaylight.netconf.messagebus.eventsources.netconf; -import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.ListenableFuture; import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; +import java.util.concurrent.ExecutionException; +import org.opendaylight.mdsal.dom.api.DOMNotificationListener; +import org.opendaylight.mdsal.dom.api.DOMRpcResult; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream; import org.opendaylight.yangtools.concepts.ListenerRegistration; @@ -44,13 +44,13 @@ class StreamNotificationTopicRegistration extends NotificationTopicRegistration * @param netconfEventSource event source */ StreamNotificationTopicRegistration(final Stream stream, final String notificationPrefix, - NetconfEventSource netconfEventSource) { + final NetconfEventSource netconfEventSource) { super(NotificationSourceType.NetconfDeviceStream, stream.getName().getValue(), notificationPrefix); this.netconfEventSource = netconfEventSource; this.mountPoint = netconfEventSource.getMount(); this.nodeId = mountPoint.getNode().getNodeId().getValue(); this.stream = stream; - setReplaySupported(stream.isReplaySupport()); + setReplaySupported(stream.getReplaySupport()); setActive(false); LOG.info("StreamNotificationTopicRegistration initialized for {}", getStreamName()); } @@ -58,15 +58,16 @@ class StreamNotificationTopicRegistration extends NotificationTopicRegistration /** * Subscribes to notification stream associated with this registration. */ + @Override void activateNotificationSource() { if (!isActive()) { LOG.info("Stream {} is not active on node {}. Will subscribe.", this.getStreamName(), this.nodeId); - final CheckedFuture result = mountPoint.invokeCreateSubscription(stream); + final ListenableFuture result = mountPoint.invokeCreateSubscription(stream); try { - result.checkedGet(); + result.get(); setActive(true); - } catch (DOMRpcException e) { - LOG.warn("Can not subscribe stream {} on node {}", this.getSourceName(), this.nodeId); + } catch (InterruptedException | ExecutionException e) { + LOG.warn("Can not subscribe stream {} on node {}", this.getSourceName(), this.nodeId, e); setActive(false); } } else { @@ -79,16 +80,17 @@ class StreamNotificationTopicRegistration extends NotificationTopicRegistration * from last * received event time will be requested. */ + @Override void reActivateNotificationSource() { if (isActive()) { LOG.info("Stream {} is reactivating on node {}.", this.getStreamName(), this.nodeId); - final CheckedFuture result; - result = mountPoint.invokeCreateSubscription(stream, getLastEventTime()); + final ListenableFuture result = mountPoint.invokeCreateSubscription(stream, + getLastEventTime()); try { - result.checkedGet(); + result.get(); setActive(true); - } catch (DOMRpcException e) { - LOG.warn("Can not resubscribe stream {} on node {}", this.getSourceName(), this.nodeId); + } catch (InterruptedException | ExecutionException e) { + LOG.warn("Can not resubscribe stream {} on node {}", this.getSourceName(), this.nodeId, e); setActive(false); } } @@ -115,7 +117,7 @@ class StreamNotificationTopicRegistration extends NotificationTopicRegistration } @Override - boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId) { + boolean registerNotificationTopic(final SchemaPath notificationPath, final TopicId topicId) { if (!checkNotificationPath(notificationPath)) { LOG.debug("Bad SchemaPath for notification try to register"); return false; @@ -124,7 +126,7 @@ class StreamNotificationTopicRegistration extends NotificationTopicRegistration activateNotificationSource(); if (!isActive()) { LOG.warn("Stream {} is not active, listener for notification {} is not registered.", getStreamName(), - notificationPath.toString()); + notificationPath); return false; } @@ -139,7 +141,7 @@ class StreamNotificationTopicRegistration extends NotificationTopicRegistration } @Override - synchronized void unRegisterNotificationTopic(TopicId topicId) { + synchronized void unRegisterNotificationTopic(final TopicId topicId) { List notificationPathToRemove = new ArrayList<>(); for (SchemaPath notifKey : notificationTopicMap.keySet()) { Set topicList = notificationTopicMap.get(notifKey);