X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fmessagebus-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmessagebus%2Fapp%2Fimpl%2FEventSourceTopology.java;h=934056dcab81f2eeb81f387a6d6986c31ec5b629;hp=3aa470b10ad9940729043fd918a9727ffbf5ab84;hb=f9814cf027886294b74fb6c8748f4a3e0a545e86;hpb=cce450550bec259d4f925389bafd007676f2186f diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java index 3aa470b10a..934056dcab 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java @@ -8,14 +8,17 @@ package org.opendaylight.controller.messagebus.app.impl; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Future; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.messagebus.app.util.Util; import org.opendaylight.controller.messagebus.spi.EventSource; import org.opendaylight.controller.messagebus.spi.EventSourceRegistration; import org.opendaylight.controller.messagebus.spi.EventSourceRegistry; @@ -26,6 +29,8 @@ import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.even import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutput; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutputBuilder; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicOutput; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicOutputBuilder; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; @@ -52,7 +57,6 @@ import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class EventSourceTopology implements EventAggregatorService, EventSourceRegistry { private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class); @@ -86,64 +90,63 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR eventSourceService = rpcRegistry.getRpcService(EventSourceService.class); final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build(); - final TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build(); + final TopologyTypes1 topologyTypeAugment = + new TopologyTypes1Builder().setTopologyEventSource(topologySource).build(); putData(OPERATIONAL, TOPOLOGY_TYPE_PATH, topologyTypeAugment); LOG.info("EventSourceRegistry has been initialized"); } private void putData(final LogicalDatastoreType store, final InstanceIdentifier path, - final T data){ + final T data) { final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction(); tx.put(store, path, data, true); - Futures.addCallback( tx.submit(), new FutureCallback(){ - + Futures.addCallback(tx.submit(), new FutureCallback() { @Override public void onSuccess(final Void result) { LOG.trace("Data has put into datastore {} {}", store, path); } @Override - public void onFailure(final Throwable t) { - LOG.error("Can not put data into datastore [store: {}] [path: {}] [exception: {}]",store,path, t); + public void onFailure(final Throwable ex) { + LOG.error("Can not put data into datastore [store: {}] [path: {}] [exception: {}]",store,path, ex); } - }); - + }, MoreExecutors.directExecutor()); } - private void deleteData(final LogicalDatastoreType store, final InstanceIdentifier path){ + private void deleteData(final LogicalDatastoreType store, + final InstanceIdentifier path) { final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction(); tx.delete(OPERATIONAL, path); - Futures.addCallback( tx.submit(), new FutureCallback(){ - + Futures.addCallback(tx.submit(), new FutureCallback() { @Override public void onSuccess(final Void result) { LOG.trace("Data has deleted from datastore {} {}", store, path); } @Override - public void onFailure(final Throwable t) { - LOG.error("Can not delete data from datastore [store: {}] [path: {}] [exception: {}]",store,path, t); + public void onFailure(final Throwable ex) { + LOG.error("Can not delete data from datastore [store: {}] [path: {}] [exception: {}]",store,path, ex); } - - }); + }, MoreExecutors.directExecutor()); } private void insert(final KeyedInstanceIdentifier sourcePath) { final NodeKey nodeKey = sourcePath.getKey(); final InstanceIdentifier augmentPath = sourcePath.augmentation(Node1.class); - final Node1 nodeAgument = new Node1Builder().setEventSourceNode(new NodeId(nodeKey.getNodeId().getValue())).build(); + final Node1 nodeAgument = new Node1Builder().setEventSourceNode( + new NodeId(nodeKey.getNodeId().getValue())).build(); putData(OPERATIONAL, augmentPath, nodeAgument); } - private void remove(final KeyedInstanceIdentifier sourcePath){ + private void remove(final KeyedInstanceIdentifier sourcePath) { final InstanceIdentifier augmentPath = sourcePath.augmentation(Node1.class); deleteData(OPERATIONAL, augmentPath); } @Override - public Future> createTopic(final CreateTopicInput input) { + public ListenableFuture> createTopic(final CreateTopicInput input) { LOG.debug("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}", input.getNotificationPattern(), input.getNodeIdPattern()); @@ -167,40 +170,39 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR } @Override - public Future> destroyTopic(final DestroyTopicInput input) { + public ListenableFuture> destroyTopic(final DestroyTopicInput input) { final EventSourceTopic topicToDestroy = eventSourceTopicMap.remove(input.getTopicId()); - if(topicToDestroy != null){ + if (topicToDestroy != null) { topicToDestroy.close(); } - return Util.resultRpcSuccessFor((Void) null); + return Util.resultRpcSuccessFor(new DestroyTopicOutputBuilder().build()); } @Override public void close() { aggregatorRpcReg.close(); - for(final EventSourceTopic est : eventSourceTopicMap.values()){ - est.close(); - } + eventSourceTopicMap.values().forEach(EventSourceTopic::close); } - public void register(final EventSource eventSource){ + public void register(final EventSource eventSource) { final NodeKey nodeKey = eventSource.getSourceNodeKey(); final KeyedInstanceIdentifier sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey); - final RoutedRpcRegistration reg = rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, eventSource); + final RoutedRpcRegistration reg = rpcRegistry.addRoutedRpcImplementation( + EventSourceService.class, eventSource); reg.registerPath(NodeContext.class, sourcePath); routedRpcRegistrations.put(nodeKey,reg); insert(sourcePath); } - public void unRegister(final EventSource eventSource){ + public void unRegister(final EventSource eventSource) { final NodeKey nodeKey = eventSource.getSourceNodeKey(); final KeyedInstanceIdentifier sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey); final RoutedRpcRegistration removeRegistration = routedRpcRegistrations.remove(nodeKey); - if(removeRegistration != null){ + if (removeRegistration != null) { removeRegistration.close(); - remove(sourcePath); + remove(sourcePath); } } @@ -218,5 +220,14 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR EventSourceService getEventSourceService() { return eventSourceService; } -} + @VisibleForTesting + Map> getRoutedRpcRegistrations() { + return routedRpcRegistrations; + } + + @VisibleForTesting + Map getEventSourceTopicMap() { + return eventSourceTopicMap; + } +}