import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
- public class AdminReconciliationServiceImpl implements AdminReconciliationService {
+ public class AdminReconciliationServiceImpl implements AdminReconciliationService, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(AdminReconciliationServiceImpl.class);
private final DataBroker broker;
private final ReconciliationService reconciliationService;
private final Long startCount = 1L;
private final AlarmAgent alarmAgent;
+ private static final int THREAD_POOL_SIZE = 10;
+ private final ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
public AdminReconciliationServiceImpl(final DataBroker broker, final ReconciliationService reconciliationService,
final AlarmAgent alarmAgent) {
this.alarmAgent = alarmAgent;
}
+ @Override
+ public void close() {
+ if (executor != null) {
+ executor.shutdownNow();
+ }
+ }
@Override
public ListenableFuture<RpcResult<ReconcileOutput>> reconcile(ReconcileInput input) {
if (!unresolvedNodes.isEmpty()) {
return buildErrorResponse("Node(s) not found: " + String.join(", ", unresolvedNodes.toString()));
}
- for (Long nodeId : nodesToReconcile) {
+ nodesToReconcile.parallelStream().forEach(nodeId -> {
alarmAgent.raiseAdminReconciliationAlarm(nodeId);
LOG.info("Executing admin reconciliation for node {}", nodeId);
- BigInteger node = new BigInteger(String.valueOf(nodeId));
NodeKey nodeKey = new NodeKey(new NodeId("openflow:" + nodeId));
- ReconcileNodeInput reconInput = new ReconcileNodeInputBuilder()
- .setNodeId(node).setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
- .child(Node.class, nodeKey).build())).build();
- Future<RpcResult<ReconcileNodeOutput>> reconOutput = reconciliationService
- .reconcileNode(reconInput);
- try {
- RpcResult<ReconcileNodeOutput> rpcResult = reconOutput.get();
- if (rpcResult.isSuccessful()) {
- increaseReconcileCount(node, true);
- LOG.info("Reconciliation successfully completed for node {}", nodeId);
- } else {
- increaseReconcileCount(node, false);
- LOG.error("Reconciliation failed for node {} with error {}", nodeId, rpcResult.getErrors());
- }
- } catch (ExecutionException | InterruptedException e) {
- LOG.error("Error occurred while invoking reconcile RPC for node {}", nodeId, e);
- }
- alarmAgent.clearAdminReconciliationAlarm(nodeId);
- }
+ ReconciliationTask reconcileTask = new ReconciliationTask(nodeId, nodeKey);
+ executor.execute(reconcileTask);
+ });
} else {
return buildErrorResponse("No node found");
}
ReadWriteTransaction tx = broker.newReadWriteTransaction();
Optional<ReconcileCounter> optional = readReconcileCounterFromDS(tx, instanceIdentifier, nodeId);
ReconcileCounterBuilder counterBuilder = new ReconcileCounterBuilder()
- .setKey(new ReconcileCounterKey(nodeId)).setNodeId(nodeId)
+ .withKey(new ReconcileCounterKey(nodeId)).setNodeId(nodeId)
.setLastRequestTime(LocalDateTime.now().toString());
if (reconcileState) {
counterBuilder.setSuccessCount(startCount);
}
return Optional.absent();
}
+
+ private final class ReconciliationTask implements Runnable {
+ private final NodeKey nodeKey;
+ private final Long nodeId;
+
+ private ReconciliationTask(Long nodeId, NodeKey nodeKey) {
+ this.nodeId = nodeId;
+ this.nodeKey = nodeKey;
+ }
+
+ @Override
+ public void run() {
+ BigInteger node = new BigInteger(String.valueOf(nodeId));
+ ReconcileNodeInput reconInput = new ReconcileNodeInputBuilder()
+ .setNodeId(node).setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
+ .child(Node.class, nodeKey).build())).build();
+ Future<RpcResult<ReconcileNodeOutput>> reconOutput = reconciliationService
+ .reconcileNode(reconInput);
+ try {
+ RpcResult<ReconcileNodeOutput> rpcResult = reconOutput.get();
+ if (rpcResult.isSuccessful()) {
+ increaseReconcileCount(node, true);
+ LOG.info("Reconciliation successfully completed for node {}", nodeId);
+ } else {
+ increaseReconcileCount(node, false);
+ LOG.error("Reconciliation failed for node {} with error {}", nodeId, rpcResult.getErrors());
+ }
+ } catch (ExecutionException | InterruptedException e) {
+ LOG.error("Error occurred while invoking reconcile RPC for node {}", nodeId, e);
+ }
+ alarmAgent.clearAdminReconciliationAlarm(nodeId);
+ }
+ }
}