*/
package org.opendaylight.netconf.messagebus.eventsources.netconf;
-import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FluentFuture;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
+import java.util.concurrent.ExecutionException;
+import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
+import org.opendaylight.mdsal.dom.api.DOMRpcResult;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
* @param netconfEventSource event source
*/
StreamNotificationTopicRegistration(final Stream stream, final String notificationPrefix,
- NetconfEventSource netconfEventSource) {
+ final NetconfEventSource netconfEventSource) {
super(NotificationSourceType.NetconfDeviceStream, stream.getName().getValue(), notificationPrefix);
this.netconfEventSource = netconfEventSource;
this.mountPoint = netconfEventSource.getMount();
/**
* Subscribes to notification stream associated with this registration.
*/
+ @Override
void activateNotificationSource() {
if (!isActive()) {
LOG.info("Stream {} is not active on node {}. Will subscribe.", this.getStreamName(), this.nodeId);
- final CheckedFuture<DOMRpcResult, DOMRpcException> result = mountPoint.invokeCreateSubscription(stream);
+ final FluentFuture<DOMRpcResult> result = mountPoint.invokeCreateSubscription(stream);
try {
- result.checkedGet();
+ result.get();
setActive(true);
- } catch (DOMRpcException e) {
- LOG.warn("Can not subscribe stream {} on node {}", this.getSourceName(), this.nodeId);
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.warn("Can not subscribe stream {} on node {}", this.getSourceName(), this.nodeId, e);
setActive(false);
}
} else {
* from last
* received event time will be requested.
*/
+ @Override
void reActivateNotificationSource() {
if (isActive()) {
LOG.info("Stream {} is reactivating on node {}.", this.getStreamName(), this.nodeId);
- final CheckedFuture<DOMRpcResult, DOMRpcException> result;
- result = mountPoint.invokeCreateSubscription(stream, getLastEventTime());
+ final FluentFuture<DOMRpcResult> result = mountPoint.invokeCreateSubscription(stream, getLastEventTime());
try {
- result.checkedGet();
+ result.get();
setActive(true);
- } catch (DOMRpcException e) {
- LOG.warn("Can not resubscribe stream {} on node {}", this.getSourceName(), this.nodeId);
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.warn("Can not resubscribe stream {} on node {}", this.getSourceName(), this.nodeId, e);
setActive(false);
}
}
}
@Override
- boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId) {
+ boolean registerNotificationTopic(final SchemaPath notificationPath, final TopicId topicId) {
if (!checkNotificationPath(notificationPath)) {
LOG.debug("Bad SchemaPath for notification try to register");
return false;
}
@Override
- synchronized void unRegisterNotificationTopic(TopicId topicId) {
+ synchronized void unRegisterNotificationTopic(final TopicId topicId) {
List<SchemaPath> notificationPathToRemove = new ArrayList<>();
for (SchemaPath notifKey : notificationTopicMap.keySet()) {
Set<TopicId> topicList = notificationTopicMap.get(notifKey);