* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.messagebus.app.impl;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
+import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.messagebus.app.util.Util;
import org.opendaylight.controller.messagebus.spi.EventSource;
import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry;
+import org.opendaylight.mdsal.binding.api.RpcProviderService;
+import org.opendaylight.mdsal.binding.api.WriteTransaction;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutput;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.TopologyTypes1Builder;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSource;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSourceBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypes;
+import org.opendaylight.yangtools.concepts.ObjectRegistration;
+import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@Deprecated(forRemoval = true)
public class EventSourceTopology implements EventAggregatorService, EventSourceRegistry {
private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class);
private static final LogicalDatastoreType OPERATIONAL = LogicalDatastoreType.OPERATIONAL;
static final InstanceIdentifier<Topology> EVENT_SOURCE_TOPOLOGY_PATH =
- InstanceIdentifier.create(NetworkTopology.class)
- .child(Topology.class, EVENT_SOURCE_TOPOLOGY_KEY);
+ InstanceIdentifier.create(NetworkTopology.class).child(Topology.class, EVENT_SOURCE_TOPOLOGY_KEY);
- private static final InstanceIdentifier<TopologyTypes1> TOPOLOGY_TYPE_PATH =
- EVENT_SOURCE_TOPOLOGY_PATH
- .child(TopologyTypes.class)
- .augmentation(TopologyTypes1.class);
+ private static final InstanceIdentifier<TopologyTypes1> TOPOLOGY_TYPE_PATH = EVENT_SOURCE_TOPOLOGY_PATH
+ .child(TopologyTypes.class).augmentation(TopologyTypes1.class);
- private final Map<TopicId,EventSourceTopic> eventSourceTopicMap = new ConcurrentHashMap<>();
- private final Map<NodeKey, RoutedRpcRegistration<EventSourceService>> routedRpcRegistrations =
- new ConcurrentHashMap<>();
+ private final Map<TopicId, EventSourceTopic> eventSourceTopicMap = new ConcurrentHashMap<>();
+ private final Map<NodeKey, Registration> routedRpcRegistrations = new ConcurrentHashMap<>();
private final DataBroker dataBroker;
- private final RpcRegistration<EventAggregatorService> aggregatorRpcReg;
+ private final ObjectRegistration<EventSourceTopology> aggregatorRpcReg;
private final EventSourceService eventSourceService;
- private final RpcProviderRegistry rpcRegistry;
+ private final RpcProviderService rpcRegistry;
- public EventSourceTopology(final DataBroker dataBroker, final RpcProviderRegistry rpcRegistry) {
+ public EventSourceTopology(final DataBroker dataBroker, final RpcProviderService providerService,
+ final RpcConsumerRegistry rpcService) {
this.dataBroker = dataBroker;
- this.rpcRegistry = rpcRegistry;
- aggregatorRpcReg = rpcRegistry.addRpcImplementation(EventAggregatorService.class, this);
- eventSourceService = rpcRegistry.getRpcService(EventSourceService.class);
+ this.rpcRegistry = providerService;
+ aggregatorRpcReg = providerService.registerRpcImplementation(EventAggregatorService.class, this);
+ eventSourceService = rpcService.getRpcService(EventSourceService.class);
final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build();
final TopologyTypes1 topologyTypeAugment =
LOG.info("EventSourceRegistry has been initialized");
}
- private <T extends DataObject> void putData(final LogicalDatastoreType store,
+ private <T extends DataObject> void putData(final LogicalDatastoreType store,
final InstanceIdentifier<T> path,
final T data) {
final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction();
- tx.put(store, path, data, true);
- Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
+ tx.mergeParentStructurePut(store, path, data);
+ tx.commit().addCallback(new FutureCallback<CommitInfo>() {
@Override
- public void onSuccess(final Void result) {
+ public void onSuccess(final CommitInfo result) {
LOG.trace("Data has put into datastore {} {}", store, path);
}
@Override
public void onFailure(final Throwable ex) {
- LOG.error("Can not put data into datastore [store: {}] [path: {}] [exception: {}]",store,path, ex);
+ LOG.error("Can not put data into datastore [store: {}] [path: {}]", store, path, ex);
}
}, MoreExecutors.directExecutor());
}
final InstanceIdentifier<T> path) {
final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction();
tx.delete(OPERATIONAL, path);
- Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
+ tx.commit().addCallback(new FutureCallback<CommitInfo>() {
@Override
- public void onSuccess(final Void result) {
+ public void onSuccess(final CommitInfo result) {
LOG.trace("Data has deleted from datastore {} {}", store, path);
}
@Override
public void onFailure(final Throwable ex) {
- LOG.error("Can not delete data from datastore [store: {}] [path: {}] [exception: {}]",store,path, ex);
+ LOG.error("Can not delete data from datastore [store: {}] [path: {}]", store, path, ex);
}
}, MoreExecutors.directExecutor());
}
}
public void register(final EventSource eventSource) {
-
final NodeKey nodeKey = eventSource.getSourceNodeKey();
final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
- final RoutedRpcRegistration<EventSourceService> reg = rpcRegistry.addRoutedRpcImplementation(
- EventSourceService.class, eventSource);
- reg.registerPath(NodeContext.class, sourcePath);
- routedRpcRegistrations.put(nodeKey,reg);
+ final Registration reg = rpcRegistry.registerRpcImplementation(EventSourceService.class, eventSource,
+ Collections.singleton(sourcePath));
+ routedRpcRegistrations.put(nodeKey, reg);
insert(sourcePath);
-
}
public void unRegister(final EventSource eventSource) {
final NodeKey nodeKey = eventSource.getSourceNodeKey();
final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
- final RoutedRpcRegistration<EventSourceService> removeRegistration = routedRpcRegistrations.remove(nodeKey);
+ final Registration removeRegistration = routedRpcRegistrations.remove(nodeKey);
if (removeRegistration != null) {
removeRegistration.close();
remove(sourcePath);
}
@VisibleForTesting
- Map<NodeKey, RoutedRpcRegistration<EventSourceService>> getRoutedRpcRegistrations() {
+ Map<NodeKey, Registration> getRoutedRpcRegistrations() {
return routedRpcRegistrations;
}