2 * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.applications.southboundcli;
11 import com.google.common.base.Optional;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.SettableFuture;
14 import java.math.BigInteger;
15 import java.time.LocalDateTime;
16 import java.util.ArrayList;
17 import java.util.List;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.Future;
22 import java.util.stream.Collectors;
23 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
24 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
25 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
26 import org.opendaylight.openflowplugin.applications.southboundcli.alarm.AlarmAgent;
27 import org.opendaylight.openflowplugin.applications.southboundcli.util.OFNode;
28 import org.opendaylight.openflowplugin.applications.southboundcli.util.ShellUtil;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.FrmReconciliationService;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.ReconcileNodeInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.ReconcileNodeInputBuilder;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.ReconcileNodeOutput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconcileInput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconcileOutput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconciliationCounter;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconciliationService;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.reconciliation.counter.ReconcileCounter;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.reconciliation.counter.ReconcileCounterBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.reconciliation.counter.ReconcileCounterKey;
45 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
46 import org.opendaylight.yangtools.yang.common.RpcError;
47 import org.opendaylight.yangtools.yang.common.RpcResult;
48 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
52 public class ReconciliationServiceImpl implements ReconciliationService, AutoCloseable {
54 private static final Logger LOG = LoggerFactory.getLogger(ReconciliationServiceImpl.class);
55 private final DataBroker broker;
56 private final FrmReconciliationService frmReconciliationService;
57 private final Long startCount = 1L;
58 private final AlarmAgent alarmAgent;
59 private static final int THREAD_POOL_SIZE = 10;
60 private final ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
/**
 * Creates the reconciliation service.
 *
 * @param broker data broker used to list nodes and to maintain the reconcile counters
 * @param frmReconciliationService RPC service that performs the reconciliation of one node
 * @param alarmAgent agent used to raise and clear the node reconciliation alarm
 */
public ReconciliationServiceImpl(final DataBroker broker, final FrmReconciliationService frmReconciliationService,
        final AlarmAgent alarmAgent) {
    // The broker is a final field that the rest of the class reads; it must be assigned here.
    this.broker = broker;
    this.frmReconciliationService = frmReconciliationService;
    this.alarmAgent = alarmAgent;
}
71 if (executor != null) {
72 executor.shutdownNow();
77 public ListenableFuture<RpcResult<ReconcileOutput>> reconcile(ReconcileInput input) {
78 boolean reconcileAllNodes = input.isReconcileAllNodes();
79 List<BigInteger> inputNodes = input.getNodes();
80 if (inputNodes == null) {
81 inputNodes = new ArrayList<>();
83 if (reconcileAllNodes && inputNodes.size() > 0) {
84 return buildErrorResponse("Error executing command reconcile. "
85 + "If 'all' option is enabled, no Node must be specified as input parameter.");
87 if (!reconcileAllNodes && inputNodes.size() == 0) {
88 return buildErrorResponse("Error executing command reconcile. No Node information was specified.");
90 SettableFuture<RpcResult<ReconcileOutput>> result = SettableFuture.create();
91 List<Long> nodeList = getAllNodes();
92 List<Long> nodesToReconcile = reconcileAllNodes ? nodeList :
93 inputNodes.stream().distinct().map(node -> node.longValue()).collect(Collectors.toList());
94 if (nodesToReconcile.size() > 0) {
95 List<Long> unresolvedNodes =
96 nodesToReconcile.stream().filter(node -> !nodeList.contains(node)).collect(Collectors.toList());
97 if (!unresolvedNodes.isEmpty()) {
98 return buildErrorResponse("Error executing command reconcile. "
99 + "Node(s) not found: " + String.join(", ", unresolvedNodes.toString()));
101 nodesToReconcile.parallelStream().forEach(nodeId -> {
102 alarmAgent.raiseNodeReconciliationAlarm(nodeId);
103 LOG.info("Executing reconciliation for node {}", nodeId);
104 NodeKey nodeKey = new NodeKey(new NodeId("openflow:" + nodeId));
105 ReconciliationTask reconcileTask = new ReconciliationTask(nodeId, nodeKey);
106 executor.execute(reconcileTask);
109 return buildErrorResponse("Error executing command reconcile. "
110 + "No node information is found for reconciliation");
112 result.set(RpcResultBuilder.<ReconcileOutput>success().build());
116 private ListenableFuture<RpcResult<ReconcileOutput>> buildErrorResponse(String msg) {
117 SettableFuture<RpcResult<ReconcileOutput>> result = SettableFuture.create();
119 RpcError error = RpcResultBuilder.newError(RpcError.ErrorType.PROTOCOL, "reconcile", msg);
120 result.set(RpcResultBuilder.<ReconcileOutput>failed().withRpcError(error).build());
124 public List<Long> getAllNodes() {
125 List<OFNode> nodeList = ShellUtil.getAllNodes(broker);
126 List<Long> nodes = nodeList.stream().distinct().map(node -> node.getNodeId()).collect(Collectors.toList());
130 private void increaseReconcileCount(BigInteger nodeId, Boolean reconcileState) {
131 InstanceIdentifier<ReconcileCounter> instanceIdentifier = InstanceIdentifier
132 .builder(ReconciliationCounter.class).child(ReconcileCounter.class,
133 new ReconcileCounterKey(nodeId)).build();
134 ReadWriteTransaction tx = broker.newReadWriteTransaction();
135 Optional<ReconcileCounter> optional = readReconcileCounterFromDS(tx, instanceIdentifier, nodeId);
136 ReconcileCounterBuilder counterBuilder = new ReconcileCounterBuilder()
137 .withKey(new ReconcileCounterKey(nodeId)).setNodeId(nodeId)
138 .setLastRequestTime(LocalDateTime.now().toString());
139 if (reconcileState) {
140 counterBuilder.setSuccessCount(startCount);
141 if (optional.isPresent()) {
142 ReconcileCounter counter = optional.get();
143 Long successCount = counter.getSuccessCount();
144 counterBuilder.setSuccessCount(++successCount);
145 LOG.debug("Reconcile Success count {} for the node: {} ", successCount, nodeId);
148 counterBuilder.setFailureCount(startCount);
149 if (optional.isPresent()) {
150 ReconcileCounter counter = optional.get();
151 Long failureCount = counter.getFailureCount();
152 counterBuilder.setFailureCount(++failureCount);
153 LOG.debug("Reconcile Failure count {} for the node: {} ", failureCount, nodeId);
157 tx.merge(LogicalDatastoreType.OPERATIONAL, instanceIdentifier, counterBuilder.build(), true);
159 } catch (InterruptedException | ExecutionException e) {
160 LOG.error("Exception while submitting counter {}", nodeId, e);
164 private Optional<ReconcileCounter> readReconcileCounterFromDS(ReadWriteTransaction tx,
165 InstanceIdentifier<ReconcileCounter> instanceIdentifier, BigInteger nodeId) {
167 return tx.read(LogicalDatastoreType.OPERATIONAL, instanceIdentifier).get();
168 } catch (InterruptedException | ExecutionException e) {
169 LOG.error("Exception while reading counter for node: {}", nodeId, e);
171 return Optional.absent();
174 private final class ReconciliationTask implements Runnable {
175 private final NodeKey nodeKey;
176 private final Long nodeId;
178 private ReconciliationTask(Long nodeId, NodeKey nodeKey) {
179 this.nodeId = nodeId;
180 this.nodeKey = nodeKey;
185 BigInteger node = new BigInteger(String.valueOf(nodeId));
186 ReconcileNodeInput reconInput = new ReconcileNodeInputBuilder()
187 .setNodeId(node).setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
188 .child(Node.class, nodeKey).build())).build();
189 Future<RpcResult<ReconcileNodeOutput>> reconOutput = frmReconciliationService
190 .reconcileNode(reconInput);
192 RpcResult<ReconcileNodeOutput> rpcResult = reconOutput.get();
193 if (rpcResult.isSuccessful()) {
194 increaseReconcileCount(node, true);
195 LOG.info("Reconciliation successfully completed for node {}", nodeId);
197 increaseReconcileCount(node, false);
198 LOG.error("Reconciliation failed for node {} with error {}", nodeId, rpcResult.getErrors());
200 } catch (ExecutionException | InterruptedException e) {
201 LOG.error("Error occurred while invoking reconcile RPC for node {}", nodeId, e);
203 alarmAgent.clearNodeReconciliationAlarm(nodeId);