X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=applications%2Fsouthbound-cli%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fapplications%2Fsouthboundcli%2FReconciliationServiceImpl.java;h=d20ba88a75700b86f3b9f8833dcb46d8da106623;hb=777c94332871b8c34f56f7f2010de1536cb759ba;hp=9c0112aaa94d5adb4a4a3eaad4febe7551595f0c;hpb=d9da5c836b10f219dafd9289967a016443b8b881;p=openflowplugin.git diff --git a/applications/southbound-cli/src/main/java/org/opendaylight/openflowplugin/applications/southboundcli/ReconciliationServiceImpl.java b/applications/southbound-cli/src/main/java/org/opendaylight/openflowplugin/applications/southboundcli/ReconciliationServiceImpl.java index 9c0112aaa9..d20ba88a75 100644 --- a/applications/southbound-cli/src/main/java/org/opendaylight/openflowplugin/applications/southboundcli/ReconciliationServiceImpl.java +++ b/applications/southbound-cli/src/main/java/org/opendaylight/openflowplugin/applications/southboundcli/ReconciliationServiceImpl.java @@ -5,27 +5,37 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.openflowplugin.applications.southboundcli; -import com.google.common.base.Optional; +import static java.util.Objects.requireNonNull; +import static org.opendaylight.openflowplugin.api.openflow.ReconciliationState.ReconciliationStatus.COMPLETED; +import static org.opendaylight.openflowplugin.api.openflow.ReconciliationState.ReconciliationStatus.FAILED; +import static org.opendaylight.openflowplugin.api.openflow.ReconciliationState.ReconciliationStatus.STARTED; + +import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; -import java.math.BigInteger; +import java.text.SimpleDateFormat; import java.time.LocalDateTime; -import java.util.ArrayList; +import java.util.Date; import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.stream.Collectors; -import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.mdsal.binding.api.DataBroker; +import org.opendaylight.mdsal.binding.api.ReadWriteTransaction; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.openflowplugin.api.openflow.FlowGroupCacheManager; +import org.opendaylight.openflowplugin.api.openflow.ReconciliationState; import org.opendaylight.openflowplugin.applications.southboundcli.alarm.AlarmAgent; import org.opendaylight.openflowplugin.applications.southboundcli.util.OFNode; import org.opendaylight.openflowplugin.applications.southboundcli.util.ShellUtil; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; @@ -37,48 +47,58 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.ReconcileNodeOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconcileInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconcileOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconcileOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconciliationCounter; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconciliationService; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.reconciliation.counter.ReconcileCounter; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.reconciliation.counter.ReconcileCounterBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.reconciliation.counter.ReconcileCounterKey; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.ErrorTag; +import org.opendaylight.yangtools.yang.common.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import org.opendaylight.yangtools.yang.common.Uint32; +import org.opendaylight.yangtools.yang.common.Uint64; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ReconciliationServiceImpl implements ReconciliationService, AutoCloseable { - private static final Logger LOG = LoggerFactory.getLogger(ReconciliationServiceImpl.class); + private final DataBroker broker; private final FrmReconciliationService frmReconciliationService; - private final Long startCount = 1L; private final AlarmAgent alarmAgent; - private static final int THREAD_POOL_SIZE = 10; - private final ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE); + private final NodeListener nodeListener; + private final int threadPoolSize = 10; + private final Map reconciliationStates; + + private ExecutorService executor = Executors.newWorkStealingPool(threadPoolSize); public ReconciliationServiceImpl(final DataBroker broker, final FrmReconciliationService frmReconciliationService, - final AlarmAgent alarmAgent) { + final AlarmAgent alarmAgent, final NodeListener nodeListener, + final FlowGroupCacheManager flowGroupCacheManager) { this.broker = broker; this.frmReconciliationService = frmReconciliationService; this.alarmAgent = alarmAgent; + this.nodeListener = requireNonNull(nodeListener, "NodeListener cannot be null!"); + reconciliationStates = flowGroupCacheManager.getReconciliationStates(); } @Override public void close() { if (executor != null) { executor.shutdownNow(); + executor = null; } } @Override - public ListenableFuture> reconcile(ReconcileInput input) { - boolean reconcileAllNodes = input.isReconcileAllNodes(); - List inputNodes = input.getNodes(); + public ListenableFuture> reconcile(final ReconcileInput input) { + boolean reconcileAllNodes = input.getReconcileAllNodes(); + Set inputNodes = input.getNodes(); if (inputNodes == null) { - inputNodes = new ArrayList<>(); + inputNodes = Set.of(); } if (reconcileAllNodes && inputNodes.size() > 0) { return buildErrorResponse("Error executing command reconcile. " @@ -90,7 +110,7 @@ public class ReconciliationServiceImpl implements ReconciliationService, AutoClo SettableFuture> result = SettableFuture.create(); List nodeList = getAllNodes(); List nodesToReconcile = reconcileAllNodes ? nodeList : - inputNodes.stream().distinct().map(node -> node.longValue()).collect(Collectors.toList()); + inputNodes.stream().distinct().map(Uint64::longValue).collect(Collectors.toList()); if (nodesToReconcile.size() > 0) { List unresolvedNodes = nodesToReconcile.stream().filter(node -> !nodeList.contains(node)).collect(Collectors.toList()); @@ -98,110 +118,136 @@ public class ReconciliationServiceImpl implements ReconciliationService, AutoClo return buildErrorResponse("Error executing command reconcile. " + "Node(s) not found: " + String.join(", ", unresolvedNodes.toString())); } + ImmutableSet.Builder inprogressNodes = ImmutableSet.builder(); nodesToReconcile.parallelStream().forEach(nodeId -> { - alarmAgent.raiseNodeReconciliationAlarm(nodeId); - LOG.info("Executing reconciliation for node {}", nodeId); - NodeKey nodeKey = new NodeKey(new NodeId("openflow:" + nodeId)); - ReconciliationTask reconcileTask = new ReconciliationTask(nodeId, nodeKey); - executor.execute(reconcileTask); + ReconciliationState state = getReconciliationState(nodeId); + if (state != null && state.getState().equals(STARTED)) { + inprogressNodes.add(Uint64.valueOf(nodeId)); + } else { + alarmAgent.raiseNodeReconciliationAlarm(nodeId); + LOG.info("Executing reconciliation for node {} with state ", nodeId); + NodeKey nodeKey = new NodeKey(new NodeId("openflow:" + nodeId)); + ReconciliationTask reconcileTask = new ReconciliationTask(Uint64.valueOf(nodeId), nodeKey); + executor.execute(reconcileTask); + } }); + ReconcileOutput reconcilingInProgress = new ReconcileOutputBuilder() + .setInprogressNodes(inprogressNodes.build()) + .build(); + result.set(RpcResultBuilder.success(reconcilingInProgress).build()); + return result; } else { return buildErrorResponse("Error executing command reconcile. " + "No node information is found for reconciliation"); } - result.set(RpcResultBuilder.success().build()); - return result; } - private ListenableFuture> buildErrorResponse(String msg) { - SettableFuture> result = SettableFuture.create(); - LOG.error(msg); - RpcError error = RpcResultBuilder.newError(RpcError.ErrorType.PROTOCOL, "reconcile", msg); - result.set(RpcResultBuilder.failed().withRpcError(error).build()); - return result; + private ReconciliationState getReconciliationState(final Long nodeId) { + return reconciliationStates.get(nodeId.toString()); } - public List getAllNodes() { - List nodeList = ShellUtil.getAllNodes(broker); - List nodes = nodeList.stream().distinct().map(node -> node.getNodeId()).collect(Collectors.toList()); - return nodes; + private static ListenableFuture> buildErrorResponse(final String msg) { + LOG.error("Error {}", msg); + return RpcResultBuilder.failed() + .withError(ErrorType.PROTOCOL, new ErrorTag("reconcile"), msg) + .buildFuture(); } - private void increaseReconcileCount(BigInteger nodeId, Boolean reconcileState) { - InstanceIdentifier instanceIdentifier = InstanceIdentifier - .builder(ReconciliationCounter.class).child(ReconcileCounter.class, - new ReconcileCounterKey(nodeId)).build(); - ReadWriteTransaction tx = broker.newReadWriteTransaction(); - Optional optional = readReconcileCounterFromDS(tx, instanceIdentifier, nodeId); - ReconcileCounterBuilder counterBuilder = new ReconcileCounterBuilder() - .withKey(new ReconcileCounterKey(nodeId)).setNodeId(nodeId) - .setLastRequestTime(LocalDateTime.now().toString()); - if (reconcileState) { - counterBuilder.setSuccessCount(startCount); - if (optional.isPresent()) { - ReconcileCounter counter = optional.get(); - Long successCount = counter.getSuccessCount(); - counterBuilder.setSuccessCount(++successCount); - LOG.debug("Reconcile Success count {} for the node: {} ", successCount, nodeId); - } - } else { - counterBuilder.setFailureCount(startCount); - if (optional.isPresent()) { - ReconcileCounter counter = optional.get(); - Long failureCount = counter.getFailureCount(); - counterBuilder.setFailureCount(++failureCount); - LOG.debug("Reconcile Failure count {} for the node: {} ", failureCount, nodeId); - } - } - try { - tx.merge(LogicalDatastoreType.OPERATIONAL, instanceIdentifier, counterBuilder.build(), true); - tx.submit().get(); - } catch (InterruptedException | ExecutionException e) { - LOG.error("Exception while submitting counter {}", nodeId, e); - } - } - - private Optional readReconcileCounterFromDS(ReadWriteTransaction tx, - InstanceIdentifier instanceIdentifier, BigInteger nodeId) { - try { - return tx.read(LogicalDatastoreType.OPERATIONAL, instanceIdentifier).get(); - } catch (InterruptedException | ExecutionException e) { - LOG.error("Exception while reading counter for node: {}", nodeId, e); - } - return Optional.absent(); + private List getAllNodes() { + List nodeList = ShellUtil.getAllNodes(nodeListener); + List nodes = nodeList.stream().distinct().map(OFNode::getNodeId).collect(Collectors.toList()); + return nodes; } private final class ReconciliationTask implements Runnable { + private static final String DATE_AND_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"; private final NodeKey nodeKey; - private final Long nodeId; + private final Uint64 nodeId; - private ReconciliationTask(Long nodeId, NodeKey nodeKey) { + private ReconciliationTask(final Uint64 nodeId, final NodeKey nodeKey) { this.nodeId = nodeId; this.nodeKey = nodeKey; } @Override public void run() { - BigInteger node = new BigInteger(String.valueOf(nodeId)); ReconcileNodeInput reconInput = new ReconcileNodeInputBuilder() - .setNodeId(node).setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class) + .setNodeId(nodeId).setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class) .child(Node.class, nodeKey).build())).build(); + updateReconciliationState(STARTED); Future> reconOutput = frmReconciliationService .reconcileNode(reconInput); try { RpcResult rpcResult = reconOutput.get(); if (rpcResult.isSuccessful()) { - increaseReconcileCount(node, true); + increaseReconcileCount(true); + updateReconciliationState(COMPLETED); LOG.info("Reconciliation successfully completed for node {}", nodeId); } else { - increaseReconcileCount(node, false); + increaseReconcileCount(false); + updateReconciliationState(FAILED); LOG.error("Reconciliation failed for node {} with error {}", nodeId, rpcResult.getErrors()); } } catch (ExecutionException | InterruptedException e) { + increaseReconcileCount(false); + updateReconciliationState(FAILED); LOG.error("Error occurred while invoking reconcile RPC for node {}", nodeId, e); } finally { - alarmAgent.clearNodeReconciliationAlarm(nodeId); + alarmAgent.clearNodeReconciliationAlarm(nodeId.longValue()); + } + } + + private void increaseReconcileCount(final boolean isSuccess) { + // FIXME: do not use SimpleDateFormat + final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT); + InstanceIdentifier instanceIdentifier = InstanceIdentifier + .builder(ReconciliationCounter.class).child(ReconcileCounter.class, + new ReconcileCounterKey(nodeId)).build(); + ReadWriteTransaction tx = broker.newReadWriteTransaction(); + Optional count = getReconciliationCount(tx, instanceIdentifier); + ReconcileCounterBuilder counterBuilder = new ReconcileCounterBuilder() + .withKey(new ReconcileCounterKey(nodeId)) + .setLastRequestTime(new DateAndTime(simpleDateFormat.format(new Date()))); + + if (isSuccess) { + if (count.isPresent()) { + long successCount = count.get().getSuccessCount().toJava(); + counterBuilder.setSuccessCount(Uint32.valueOf(++successCount)); + LOG.debug("Reconcile success count {} for the node: {} ", successCount, nodeId); + } else { + counterBuilder.setSuccessCount(Uint32.ONE); + } + } else { + if (count.isPresent()) { + long failureCount = count.get().getFailureCount().toJava(); + counterBuilder.setFailureCount(Uint32.valueOf(++failureCount)); + LOG.debug("Reconcile failure count {} for the node: {} ", failureCount, nodeId); + } else { + counterBuilder.setFailureCount(Uint32.ONE); + } } + try { + tx.mergeParentStructureMerge(LogicalDatastoreType.OPERATIONAL, instanceIdentifier, + counterBuilder.build()); + tx.commit().get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error("Exception while submitting counter for {}", nodeId, e); + } + } + + private Optional getReconciliationCount(final ReadWriteTransaction tx, + final InstanceIdentifier instanceIdentifier) { + try { + return tx.read(LogicalDatastoreType.OPERATIONAL, instanceIdentifier).get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error("Exception while reading counter for node: {}", nodeId, e); + } + return Optional.empty(); + } + + private void updateReconciliationState(final ReconciliationState.ReconciliationStatus status) { + ReconciliationState state = new ReconciliationState(status, LocalDateTime.now()); + reconciliationStates.put(nodeId.toString(),state); } } }