Merge "OPNFLWPLUG-1000 : Execute reconciliation asynchronously when the user selected...
authorAnil Vishnoi <vishnoianil@gmail.com>
Wed, 6 Jun 2018 09:39:20 +0000 (09:39 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 6 Jun 2018 09:39:20 +0000 (09:39 +0000)
1  2 
applications/southbound-cli/src/main/java/org/opendaylight/openflowplugin/applications/southboundcli/AdminReconciliationServiceImpl.java

index d381c396084280fb0a38dd4597fc74eb7e9dddfa,b1daa2e0f236cd314b6a58a9a469d33b56eaa44d..5bdab2451512437cd6e5434c73b5a92daf280a58
@@@ -17,6 -17,8 +17,8 @@@ import java.util.ArrayList
  import java.util.Collections;
  import java.util.List;
  import java.util.concurrent.ExecutionException;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;
  import java.util.concurrent.Future;
  import java.util.stream.Collectors;
  import org.opendaylight.controller.md.sal.binding.api.DataBroker;
@@@ -48,13 -50,15 +50,15 @@@ import org.opendaylight.yangtools.yang.
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
- public class AdminReconciliationServiceImpl implements AdminReconciliationService {
+ public class AdminReconciliationServiceImpl implements AdminReconciliationService, AutoCloseable {
  
      private static final Logger LOG = LoggerFactory.getLogger(AdminReconciliationServiceImpl.class);
      private final DataBroker broker;
      private final ReconciliationService reconciliationService;
      private final Long startCount = 1L;
      private final AlarmAgent alarmAgent;
+     private static final int THREAD_POOL_SIZE = 10;
+     private final ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
  
      public AdminReconciliationServiceImpl(final DataBroker broker, final ReconciliationService reconciliationService,
                                            final AlarmAgent alarmAgent) {
          this.alarmAgent = alarmAgent;
      }
  
+     @Override
+     public void close() {
+         if (executor != null) {
+             executor.shutdownNow();
+         }
+     }
  
      @Override
      public ListenableFuture<RpcResult<ReconcileOutput>> reconcile(ReconcileInput input) {
              if (!unresolvedNodes.isEmpty()) {
                  return buildErrorResponse("Node(s) not found: " + String.join(", ", unresolvedNodes.toString()));
              }
-             for (Long nodeId : nodesToReconcile) {
+             nodesToReconcile.parallelStream().forEach(nodeId -> {
                  alarmAgent.raiseAdminReconciliationAlarm(nodeId);
                  LOG.info("Executing admin reconciliation for node {}", nodeId);
-                 BigInteger node = new BigInteger(String.valueOf(nodeId));
                  NodeKey nodeKey = new NodeKey(new NodeId("openflow:" + nodeId));
-                 ReconcileNodeInput reconInput = new ReconcileNodeInputBuilder()
-                         .setNodeId(node).setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
-                                 .child(Node.class, nodeKey).build())).build();
-                 Future<RpcResult<ReconcileNodeOutput>> reconOutput = reconciliationService
-                         .reconcileNode(reconInput);
-                 try {
-                     RpcResult<ReconcileNodeOutput> rpcResult = reconOutput.get();
-                     if (rpcResult.isSuccessful()) {
-                         increaseReconcileCount(node, true);
-                         LOG.info("Reconciliation successfully completed for node {}", nodeId);
-                     } else {
-                         increaseReconcileCount(node, false);
-                         LOG.error("Reconciliation failed for node {} with error {}", nodeId, rpcResult.getErrors());
-                     }
-                 } catch (ExecutionException | InterruptedException e) {
-                     LOG.error("Error occurred while invoking reconcile RPC for node {}", nodeId, e);
-                 }
-                 alarmAgent.clearAdminReconciliationAlarm(nodeId);
-             }
+                 ReconciliationTask reconcileTask = new ReconciliationTask(nodeId, nodeKey);
+                 executor.execute(reconcileTask);
+             });
          } else {
              return buildErrorResponse("No node found");
          }
          ReadWriteTransaction tx = broker.newReadWriteTransaction();
          Optional<ReconcileCounter> optional = readReconcileCounterFromDS(tx, instanceIdentifier, nodeId);
          ReconcileCounterBuilder counterBuilder = new ReconcileCounterBuilder()
 -                .setKey(new ReconcileCounterKey(nodeId)).setNodeId(nodeId)
 +                .withKey(new ReconcileCounterKey(nodeId)).setNodeId(nodeId)
                  .setLastRequestTime(LocalDateTime.now().toString());
          if (reconcileState) {
              counterBuilder.setSuccessCount(startCount);
          }
          return Optional.absent();
      }
+     private final class ReconciliationTask implements Runnable {
+         private final NodeKey nodeKey;
+         private final Long nodeId;
+         private ReconciliationTask(Long nodeId, NodeKey nodeKey) {
+             this.nodeId = nodeId;
+             this.nodeKey = nodeKey;
+         }
+         @Override
+         public void run() {
+             BigInteger node = new BigInteger(String.valueOf(nodeId));
+             ReconcileNodeInput reconInput = new ReconcileNodeInputBuilder()
+                     .setNodeId(node).setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
+                             .child(Node.class, nodeKey).build())).build();
+             Future<RpcResult<ReconcileNodeOutput>> reconOutput = reconciliationService
+                     .reconcileNode(reconInput);
+             try {
+                 RpcResult<ReconcileNodeOutput> rpcResult = reconOutput.get();
+                 if (rpcResult.isSuccessful()) {
+                     increaseReconcileCount(node, true);
+                     LOG.info("Reconciliation successfully completed for node {}", nodeId);
+                 } else {
+                     increaseReconcileCount(node, false);
+                     LOG.error("Reconciliation failed for node {} with error {}", nodeId, rpcResult.getErrors());
+                 }
+             } catch (ExecutionException | InterruptedException e) {
+                 LOG.error("Error occurred while invoking reconcile RPC for node {}", nodeId, e);
+             }
+             alarmAgent.clearAdminReconciliationAlarm(nodeId);
+         }
+     }
  }