X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fmessagebus-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmessagebus%2Fapp%2Fimpl%2FEventSourceTopology.java;h=b22eee233e612679da6b766126792f299e426fba;hp=50b8e4f45962532af5a1c792fb6be8da99f09ad0;hb=5f587c3e2bfabc09fec49463d04a6fbeba414e9c;hpb=634dfac8eead60f443bf75e749c70d1f2bb29198 diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java index 50b8e4f459..b22eee233e 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java @@ -5,26 +5,25 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.messagebus.app.impl; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; +import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.messagebus.app.util.Util; import org.opendaylight.controller.messagebus.spi.EventSource; import org.opendaylight.controller.messagebus.spi.EventSourceRegistration; import org.opendaylight.controller.messagebus.spi.EventSourceRegistry; -import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration; -import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration; -import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; +import org.opendaylight.mdsal.binding.api.DataBroker; +import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry; +import org.opendaylight.mdsal.binding.api.RpcProviderService; +import org.opendaylight.mdsal.binding.api.WriteTransaction; +import org.opendaylight.mdsal.common.api.CommitInfo; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutput; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutputBuilder; @@ -41,7 +40,6 @@ import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.even import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.TopologyTypes1Builder; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSource; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSourceBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId; @@ -50,6 +48,8 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology. import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypes; +import org.opendaylight.yangtools.concepts.ObjectRegistration; +import org.opendaylight.yangtools.concepts.Registration; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; @@ -57,6 +57,7 @@ import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@Deprecated(forRemoval = true) public class EventSourceTopology implements EventAggregatorService, EventSourceRegistry { private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class); @@ -65,29 +66,26 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR private static final LogicalDatastoreType OPERATIONAL = LogicalDatastoreType.OPERATIONAL; static final InstanceIdentifier EVENT_SOURCE_TOPOLOGY_PATH = - InstanceIdentifier.create(NetworkTopology.class) - .child(Topology.class, EVENT_SOURCE_TOPOLOGY_KEY); + InstanceIdentifier.create(NetworkTopology.class).child(Topology.class, EVENT_SOURCE_TOPOLOGY_KEY); - private static final InstanceIdentifier TOPOLOGY_TYPE_PATH = - EVENT_SOURCE_TOPOLOGY_PATH - .child(TopologyTypes.class) - .augmentation(TopologyTypes1.class); + private static final InstanceIdentifier TOPOLOGY_TYPE_PATH = EVENT_SOURCE_TOPOLOGY_PATH + .child(TopologyTypes.class).augmentation(TopologyTypes1.class); - private final Map eventSourceTopicMap = new ConcurrentHashMap<>(); - private final Map> routedRpcRegistrations = - new ConcurrentHashMap<>(); + private final Map eventSourceTopicMap = new ConcurrentHashMap<>(); + private final Map routedRpcRegistrations = new ConcurrentHashMap<>(); private final DataBroker dataBroker; - private final RpcRegistration aggregatorRpcReg; + private final ObjectRegistration aggregatorRpcReg; private final EventSourceService eventSourceService; - private final RpcProviderRegistry rpcRegistry; + private final RpcProviderService rpcRegistry; - public EventSourceTopology(final DataBroker dataBroker, final RpcProviderRegistry rpcRegistry) { + public EventSourceTopology(final DataBroker dataBroker, final RpcProviderService providerService, + final RpcConsumerRegistry rpcService) { this.dataBroker = dataBroker; - this.rpcRegistry = rpcRegistry; - aggregatorRpcReg = rpcRegistry.addRpcImplementation(EventAggregatorService.class, this); - eventSourceService = rpcRegistry.getRpcService(EventSourceService.class); + this.rpcRegistry = providerService; + aggregatorRpcReg = providerService.registerRpcImplementation(EventAggregatorService.class, this); + eventSourceService = rpcService.getRpcService(EventSourceService.class); final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build(); final TopologyTypes1 topologyTypeAugment = @@ -96,15 +94,15 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR LOG.info("EventSourceRegistry has been initialized"); } - private void putData(final LogicalDatastoreType store, + private void putData(final LogicalDatastoreType store, final InstanceIdentifier path, final T data) { final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction(); - tx.put(store, path, data, true); - Futures.addCallback(tx.submit(), new FutureCallback() { + tx.mergeParentStructurePut(store, path, data); + tx.commit().addCallback(new FutureCallback() { @Override - public void onSuccess(final Void result) { + public void onSuccess(final CommitInfo result) { LOG.trace("Data has put into datastore {} {}", store, path); } @@ -119,9 +117,9 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR final InstanceIdentifier path) { final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction(); tx.delete(OPERATIONAL, path); - Futures.addCallback(tx.submit(), new FutureCallback() { + tx.commit().addCallback(new FutureCallback() { @Override - public void onSuccess(final Void result) { + public void onSuccess(final CommitInfo result) { LOG.trace("Data has deleted from datastore {} {}", store, path); } @@ -185,21 +183,18 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR } public void register(final EventSource eventSource) { - final NodeKey nodeKey = eventSource.getSourceNodeKey(); final KeyedInstanceIdentifier sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey); - final RoutedRpcRegistration reg = rpcRegistry.addRoutedRpcImplementation( - EventSourceService.class, eventSource); - reg.registerPath(NodeContext.class, sourcePath); - routedRpcRegistrations.put(nodeKey,reg); + final Registration reg = rpcRegistry.registerRpcImplementation(EventSourceService.class, eventSource, + Collections.singleton(sourcePath)); + routedRpcRegistrations.put(nodeKey, reg); insert(sourcePath); - } public void unRegister(final EventSource eventSource) { final NodeKey nodeKey = eventSource.getSourceNodeKey(); final KeyedInstanceIdentifier sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey); - final RoutedRpcRegistration removeRegistration = routedRpcRegistrations.remove(nodeKey); + final Registration removeRegistration = routedRpcRegistrations.remove(nodeKey); if (removeRegistration != null) { removeRegistration.close(); remove(sourcePath); @@ -222,7 +217,7 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR } @VisibleForTesting - Map> getRoutedRpcRegistrations() { + Map getRoutedRpcRegistrations() { return routedRpcRegistrations; }