Merge "Bug 2194: Modify FindPrimary to check for leader"
authorMoiz Raja <moraja@cisco.com>
Wed, 25 Mar 2015 16:15:58 +0000 (16:15 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 25 Mar 2015 16:15:59 +0000 (16:15 +0000)
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java

index 603f34bac994d9ff44c8e37ed9f99bca11b17c47..076d1b2fc7e9c1d42fee6e0ece1d7c9adc4a3228 100644 (file)
@@ -8,16 +8,11 @@
 
 package org.opendaylight.controller.messagebus.app.impl;
 
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.Futures;
-
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.regex.Pattern;
 
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
@@ -57,7 +52,10 @@ import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.regex.Pattern;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 
 public class EventSourceTopology implements EventAggregatorService, AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class);
@@ -82,11 +80,10 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl
     private final RpcRegistration<EventAggregatorService> aggregatorRpcReg;
     private final EventSourceService eventSourceService;
     private final RpcProviderRegistry rpcRegistry;
-    private final ExecutorService executorService;
 
     public EventSourceTopology(final DataBroker dataBroker, final RpcProviderRegistry rpcRegistry) {
+
         this.dataBroker = dataBroker;
-        this.executorService = Executors.newCachedThreadPool();
         this.rpcRegistry = rpcRegistry;
         aggregatorRpcReg = rpcRegistry.addRpcImplementation(EventAggregatorService.class, this);
         eventSourceService = rpcRegistry.getRpcService(EventSourceService.class);
@@ -94,14 +91,17 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl
         final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build();
         final TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build();
         putData(OPERATIONAL, TOPOLOGY_TYPE_PATH, topologyTypeAugment);
+
     }
 
     private <T extends DataObject>  void putData(final LogicalDatastoreType store,
-            final InstanceIdentifier<T> path, final T data) {
+                                                 final InstanceIdentifier<T> path,
+                                                 final T data){
 
         final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
         tx.put(store, path, data, true);
         tx.submit();
+
     }
 
     private void insert(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath, final Node node) {
@@ -112,7 +112,34 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl
     }
 
     private void notifyExistingNodes(final Pattern nodeIdPatternRegex, final EventSourceTopic eventSourceTopic){
-        executorService.execute(new NotifyAllNodeExecutor(dataBroker, nodeIdPatternRegex, eventSourceTopic));
+
+        final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();
+
+        final CheckedFuture<Optional<Topology>, ReadFailedException> future = tx.read(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH);
+
+        Futures.addCallback(future, new FutureCallback<Optional<Topology>>(){
+
+            @Override
+            public void onSuccess(Optional<Topology> data) {
+                if(data.isPresent()) {
+                     final List<Node> nodes = data.get().getNode();
+                     for (final Node node : nodes) {
+                         if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) {
+                             eventSourceTopic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
+                         }
+                     }
+                }
+                tx.close();
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                LOG.error("Can not notify existing nodes {}", t);
+                tx.close();
+            }
+
+        });
+
     }
 
     @Override
@@ -164,45 +191,4 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl
         // FIXME: Return registration object.
     }
 
-    private class NotifyAllNodeExecutor implements Runnable {
-
-        private final EventSourceTopic topic;
-        private final DataBroker dataBroker;
-        private final Pattern nodeIdPatternRegex;
-
-        public NotifyAllNodeExecutor(final DataBroker dataBroker, final Pattern nodeIdPatternRegex, final EventSourceTopic topic) {
-            this.topic = topic;
-            this.dataBroker = dataBroker;
-            this.nodeIdPatternRegex = nodeIdPatternRegex;
-        }
-
-        @Override
-        public void run() {
-            //# Code reader note: Context of Node type is NetworkTopology
-            final List<Node> nodes = snapshot();
-            for (final Node node : nodes) {
-                if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) {
-                    topic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
-                }
-            }
-        }
-
-        private List<Node> snapshot() {
-            try (ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();) {
-
-                final Optional<Topology> data = tx.read(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH).checkedGet();
-
-                if(data.isPresent()) {
-                    final List<Node> nodeList = data.get().getNode();
-                    if(nodeList != null) {
-                        return nodeList;
-                    }
-                }
-                return Collections.emptyList();
-            } catch (final ReadFailedException e) {
-                LOG.error("Unable to retrieve node list.", e);
-                return Collections.emptyList();
-            }
-        }
-    }
 }