X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=applications%2Fsouthbound-cli%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fapplications%2Fsouthboundcli%2FReconciliationServiceImpl.java;h=6a9118eb6a23c43d39df2686fb315364bbb8d2fd;hb=93fa4f844b0712c35a01f16ec321693557496202;hp=6680c58c11e75d0ccdf4411de92641994aa6ba3f;hpb=05f8db12159673d0e0a95642fe86e62c14b7dc7b;p=openflowplugin.git diff --git a/applications/southbound-cli/src/main/java/org/opendaylight/openflowplugin/applications/southboundcli/ReconciliationServiceImpl.java b/applications/southbound-cli/src/main/java/org/opendaylight/openflowplugin/applications/southboundcli/ReconciliationServiceImpl.java index 6680c58c11..6a9118eb6a 100644 --- a/applications/southbound-cli/src/main/java/org/opendaylight/openflowplugin/applications/southboundcli/ReconciliationServiceImpl.java +++ b/applications/southbound-cli/src/main/java/org/opendaylight/openflowplugin/applications/southboundcli/ReconciliationServiceImpl.java @@ -5,7 +5,6 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.openflowplugin.applications.southboundcli; import static java.util.Objects.requireNonNull; @@ -13,17 +12,16 @@ import static org.opendaylight.openflowplugin.api.openflow.ReconciliationState.R import static org.opendaylight.openflowplugin.api.openflow.ReconciliationState.ReconciliationStatus.FAILED; import static org.opendaylight.openflowplugin.api.openflow.ReconciliationState.ReconciliationStatus.STARTED; +import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; -import java.math.BigInteger; import java.text.SimpleDateFormat; import java.time.LocalDateTime; -import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -56,25 +54,25 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.reconciliation.counter.ReconcileCounterBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.reconciliation.counter.ReconcileCounterKey; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.ErrorTag; +import org.opendaylight.yangtools.yang.common.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import org.opendaylight.yangtools.yang.common.Uint32; import org.opendaylight.yangtools.yang.common.Uint64; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ReconciliationServiceImpl implements ReconciliationService, AutoCloseable { - private static final Logger LOG = LoggerFactory.getLogger(ReconciliationServiceImpl.class); private final DataBroker broker; private final FrmReconciliationService frmReconciliationService; private final AlarmAgent alarmAgent; private final NodeListener nodeListener; - private final Long startCount = 1L; - private final int threadPoolSize = 10; - private final ExecutorService executor = Executors.newWorkStealingPool(threadPoolSize); - private volatile Map reconciliationStates = new ConcurrentHashMap(); + private final Map reconciliationStates; + + private ExecutorService executor = Executors.newWorkStealingPool(10); public ReconciliationServiceImpl(final DataBroker broker, final FrmReconciliationService frmReconciliationService, final AlarmAgent alarmAgent, final NodeListener nodeListener, @@ -90,15 +88,16 @@ public class ReconciliationServiceImpl implements ReconciliationService, AutoClo public void close() { if (executor != null) { executor.shutdownNow(); + executor = null; } } @Override - public ListenableFuture> reconcile(ReconcileInput input) { - boolean reconcileAllNodes = input.isReconcileAllNodes(); - List inputNodes = input.getNodes(); + public ListenableFuture> reconcile(final ReconcileInput input) { + boolean reconcileAllNodes = input.getReconcileAllNodes(); + Set inputNodes = input.getNodes(); if (inputNodes == null) { - inputNodes = new ArrayList<>(); + inputNodes = Set.of(); } if (reconcileAllNodes && inputNodes.size() > 0) { return buildErrorResponse("Error executing command reconcile. " @@ -118,7 +117,7 @@ public class ReconciliationServiceImpl implements ReconciliationService, AutoClo return buildErrorResponse("Error executing command reconcile. " + "Node(s) not found: " + String.join(", ", unresolvedNodes.toString())); } - List inprogressNodes = new ArrayList<>(); + ImmutableSet.Builder inprogressNodes = ImmutableSet.builder(); nodesToReconcile.parallelStream().forEach(nodeId -> { ReconciliationState state = getReconciliationState(nodeId); if (state != null && state.getState().equals(STARTED)) { @@ -127,13 +126,12 @@ public class ReconciliationServiceImpl implements ReconciliationService, AutoClo alarmAgent.raiseNodeReconciliationAlarm(nodeId); LOG.info("Executing reconciliation for node {} with state ", nodeId); NodeKey nodeKey = new NodeKey(new NodeId("openflow:" + nodeId)); - ReconciliationTask reconcileTask = new ReconciliationTask(new BigInteger(String.valueOf(nodeId)), - nodeKey); + ReconciliationTask reconcileTask = new ReconciliationTask(Uint64.valueOf(nodeId), nodeKey); executor.execute(reconcileTask); } }); ReconcileOutput reconcilingInProgress = new ReconcileOutputBuilder() - .setInprogressNodes(inprogressNodes) + .setInprogressNodes(inprogressNodes.build()) .build(); result.set(RpcResultBuilder.success(reconcilingInProgress).build()); return result; @@ -147,26 +145,25 @@ public class ReconciliationServiceImpl implements ReconciliationService, AutoClo return reconciliationStates.get(nodeId.toString()); } - private ListenableFuture> buildErrorResponse(String msg) { - SettableFuture> result = SettableFuture.create(); + private static ListenableFuture> buildErrorResponse(final String msg) { LOG.error("Error {}", msg); - RpcError error = RpcResultBuilder.newError(RpcError.ErrorType.PROTOCOL, "reconcile", msg); - result.set(RpcResultBuilder.failed().withRpcError(error).build()); - return result; + return RpcResultBuilder.failed() + .withError(ErrorType.PROTOCOL, new ErrorTag("reconcile"), msg) + .buildFuture(); } private List getAllNodes() { List nodeList = ShellUtil.getAllNodes(nodeListener); - List nodes = nodeList.stream().distinct().map(node -> node.getNodeId()).collect(Collectors.toList()); + List nodes = nodeList.stream().distinct().map(OFNode::getNodeId).collect(Collectors.toList()); return nodes; } private final class ReconciliationTask implements Runnable { private static final String DATE_AND_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"; private final NodeKey nodeKey; - private final BigInteger nodeId; + private final Uint64 nodeId; - private ReconciliationTask(BigInteger nodeId, NodeKey nodeKey) { + private ReconciliationTask(final Uint64 nodeId, final NodeKey nodeKey) { this.nodeId = nodeId; this.nodeKey = nodeKey; } @@ -184,22 +181,23 @@ public class ReconciliationServiceImpl implements ReconciliationService, AutoClo if (rpcResult.isSuccessful()) { increaseReconcileCount(true); updateReconciliationState(COMPLETED); - LOG.info("Reconciliation successfully completed for node {}", this.nodeId); + LOG.info("Reconciliation successfully completed for node {}", nodeId); } else { increaseReconcileCount(false); updateReconciliationState(FAILED); - LOG.error("Reconciliation failed for node {} with error {}", this.nodeId, rpcResult.getErrors()); + LOG.error("Reconciliation failed for node {} with error {}", nodeId, rpcResult.getErrors()); } } catch (ExecutionException | InterruptedException e) { increaseReconcileCount(false); updateReconciliationState(FAILED); - LOG.error("Error occurred while invoking reconcile RPC for node {}", this.nodeId, e); + LOG.error("Error occurred while invoking reconcile RPC for node {}", nodeId, e); } finally { alarmAgent.clearNodeReconciliationAlarm(nodeId.longValue()); } } private void increaseReconcileCount(final boolean isSuccess) { + // FIXME: do not use SimpleDateFormat final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT); InstanceIdentifier instanceIdentifier = InstanceIdentifier .builder(ReconciliationCounter.class).child(ReconcileCounter.class, @@ -212,20 +210,18 @@ public class ReconciliationServiceImpl implements ReconciliationService, AutoClo if (isSuccess) { if (count.isPresent()) { - long successCount = count.get().getSuccessCount().toJava(); - counterBuilder.setSuccessCount(++successCount); + long successCount = count.orElseThrow().getSuccessCount().toJava(); + counterBuilder.setSuccessCount(Uint32.valueOf(++successCount)); LOG.debug("Reconcile success count {} for the node: {} ", successCount, nodeId); } else { - counterBuilder.setSuccessCount(startCount); + counterBuilder.setSuccessCount(Uint32.ONE); } + } else if (count.isPresent()) { + long failureCount = count.orElseThrow().getFailureCount().toJava(); + counterBuilder.setFailureCount(Uint32.valueOf(++failureCount)); + LOG.debug("Reconcile failure count {} for the node: {} ", failureCount, nodeId); } else { - if (count.isPresent()) { - long failureCount = count.get().getFailureCount().toJava(); - counterBuilder.setFailureCount(++failureCount); - LOG.debug("Reconcile failure count {} for the node: {} ", failureCount, nodeId); - } else { - counterBuilder.setFailureCount(startCount); - } + counterBuilder.setFailureCount(Uint32.ONE); } try { tx.mergeParentStructureMerge(LogicalDatastoreType.OPERATIONAL, instanceIdentifier, @@ -236,8 +232,8 @@ public class ReconciliationServiceImpl implements ReconciliationService, AutoClo } } - private Optional getReconciliationCount(ReadWriteTransaction tx, - InstanceIdentifier instanceIdentifier) { + private Optional getReconciliationCount(final ReadWriteTransaction tx, + final InstanceIdentifier instanceIdentifier) { try { return tx.read(LogicalDatastoreType.OPERATIONAL, instanceIdentifier).get(); } catch (InterruptedException | ExecutionException e) { @@ -246,8 +242,7 @@ public class ReconciliationServiceImpl implements ReconciliationService, AutoClo return Optional.empty(); } - - private void updateReconciliationState(ReconciliationState.ReconciliationStatus status) { + private void updateReconciliationState(final ReconciliationState.ReconciliationStatus status) { ReconciliationState state = new ReconciliationState(status, LocalDateTime.now()); reconciliationStates.put(nodeId.toString(),state); }