import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class EventSourceTopology implements EventAggregatorService, EventSourceRegistry {
private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class);
eventSourceService = rpcRegistry.getRpcService(EventSourceService.class);
final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build();
- final TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build();
+ final TopologyTypes1 topologyTypeAugment =
+ new TopologyTypes1Builder().setTopologyEventSource(topologySource).build();
putData(OPERATIONAL, TOPOLOGY_TYPE_PATH, topologyTypeAugment);
LOG.info("EventSourceRegistry has been initialized");
}
private <T extends DataObject> void putData(final LogicalDatastoreType store,
final InstanceIdentifier<T> path,
- final T data){
+ final T data) {
final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction();
tx.put(store, path, data, true);
- Futures.addCallback( tx.submit(), new FutureCallback<Void>(){
-
+ Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
@Override
public void onSuccess(final Void result) {
LOG.trace("Data has put into datastore {} {}", store, path);
}
@Override
- public void onFailure(final Throwable t) {
- LOG.error("Can not put data into datastore [store: {}] [path: {}] [exception: {}]",store,path, t);
+ public void onFailure(final Throwable ex) {
+ LOG.error("Can not put data into datastore [store: {}] [path: {}] [exception: {}]",store,path, ex);
}
- });
-
+ }, MoreExecutors.directExecutor());
}
- private <T extends DataObject> void deleteData(final LogicalDatastoreType store, final InstanceIdentifier<T> path){
+ private <T extends DataObject> void deleteData(final LogicalDatastoreType store,
+ final InstanceIdentifier<T> path) {
final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction();
tx.delete(OPERATIONAL, path);
- Futures.addCallback( tx.submit(), new FutureCallback<Void>(){
-
+ Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
@Override
public void onSuccess(final Void result) {
LOG.trace("Data has deleted from datastore {} {}", store, path);
}
@Override
- public void onFailure(final Throwable t) {
- LOG.error("Can not delete data from datastore [store: {}] [path: {}] [exception: {}]",store,path, t);
+ public void onFailure(final Throwable ex) {
+ LOG.error("Can not delete data from datastore [store: {}] [path: {}] [exception: {}]",store,path, ex);
}
-
- });
+ }, MoreExecutors.directExecutor());
}
private void insert(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath) {
final NodeKey nodeKey = sourcePath.getKey();
final InstanceIdentifier<Node1> augmentPath = sourcePath.augmentation(Node1.class);
- final Node1 nodeAgument = new Node1Builder().setEventSourceNode(new NodeId(nodeKey.getNodeId().getValue())).build();
+ final Node1 nodeAgument = new Node1Builder().setEventSourceNode(
+ new NodeId(nodeKey.getNodeId().getValue())).build();
putData(OPERATIONAL, augmentPath, nodeAgument);
}
- private void remove(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath){
+ private void remove(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath) {
final InstanceIdentifier<Node1> augmentPath = sourcePath.augmentation(Node1.class);
deleteData(OPERATIONAL, augmentPath);
}
@Override
public Future<RpcResult<Void>> destroyTopic(final DestroyTopicInput input) {
final EventSourceTopic topicToDestroy = eventSourceTopicMap.remove(input.getTopicId());
- if(topicToDestroy != null){
+ if (topicToDestroy != null) {
topicToDestroy.close();
}
return Util.resultRpcSuccessFor((Void) null);
@Override
public void close() {
aggregatorRpcReg.close();
- for(final EventSourceTopic est : eventSourceTopicMap.values()){
- est.close();
- }
+ eventSourceTopicMap.values().forEach(EventSourceTopic::close);
}
- public void register(final EventSource eventSource){
+ public void register(final EventSource eventSource) {
final NodeKey nodeKey = eventSource.getSourceNodeKey();
final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
- final RoutedRpcRegistration<EventSourceService> reg = rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, eventSource);
+ final RoutedRpcRegistration<EventSourceService> reg = rpcRegistry.addRoutedRpcImplementation(
+ EventSourceService.class, eventSource);
reg.registerPath(NodeContext.class, sourcePath);
routedRpcRegistrations.put(nodeKey,reg);
insert(sourcePath);
}
- public void unRegister(final EventSource eventSource){
+ public void unRegister(final EventSource eventSource) {
final NodeKey nodeKey = eventSource.getSourceNodeKey();
final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
final RoutedRpcRegistration<EventSourceService> removeRegistration = routedRpcRegistrations.remove(nodeKey);
- if(removeRegistration != null){
+ if (removeRegistration != null) {
removeRegistration.close();
- remove(sourcePath);
+ remove(sourcePath);
}
}