Reconciliation Failure Count is not increased in failure scenarios.
[openflowplugin.git] / applications / southbound-cli / src / main / java / org / opendaylight / openflowplugin / applications / southboundcli / ReconciliationServiceImpl.java
1 /*
2  * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.southboundcli;
10
11 import com.google.common.base.Optional;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.SettableFuture;
14 import java.math.BigInteger;
15 import java.time.LocalDateTime;
16 import java.util.ArrayList;
17 import java.util.List;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.Future;
22 import java.util.stream.Collectors;
23 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
24 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
25 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
26 import org.opendaylight.openflowplugin.applications.southboundcli.alarm.AlarmAgent;
27 import org.opendaylight.openflowplugin.applications.southboundcli.util.OFNode;
28 import org.opendaylight.openflowplugin.applications.southboundcli.util.ShellUtil;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.FrmReconciliationService;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.ReconcileNodeInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.ReconcileNodeInputBuilder;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.ReconcileNodeOutput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconcileInput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconcileOutput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconciliationCounter;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconciliationService;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.reconciliation.counter.ReconcileCounter;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.reconciliation.counter.ReconcileCounterBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.reconciliation.counter.ReconcileCounterKey;
45 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
46 import org.opendaylight.yangtools.yang.common.RpcError;
47 import org.opendaylight.yangtools.yang.common.RpcResult;
48 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 public class ReconciliationServiceImpl implements ReconciliationService, AutoCloseable {
53
54     private static final Logger LOG = LoggerFactory.getLogger(ReconciliationServiceImpl.class);
55     private final DataBroker broker;
56     private final FrmReconciliationService frmReconciliationService;
57     private final Long startCount = 1L;
58     private final AlarmAgent alarmAgent;
59     private static final int THREAD_POOL_SIZE = 10;
60     private final ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
61
62     public ReconciliationServiceImpl(final DataBroker broker, final FrmReconciliationService frmReconciliationService,
63                                      final AlarmAgent alarmAgent) {
64         this.broker = broker;
65         this.frmReconciliationService = frmReconciliationService;
66         this.alarmAgent = alarmAgent;
67     }
68
69     @Override
70     public void close() {
71         if (executor != null) {
72             executor.shutdownNow();
73         }
74     }
75
76     @Override
77     public ListenableFuture<RpcResult<ReconcileOutput>> reconcile(ReconcileInput input) {
78         boolean reconcileAllNodes = input.isReconcileAllNodes();
79         List<BigInteger> inputNodes = input.getNodes();
80         if (inputNodes == null) {
81             inputNodes = new ArrayList<>();
82         }
83         if (reconcileAllNodes && inputNodes.size() > 0) {
84             return buildErrorResponse("Error executing command reconcile. "
85                     + "If 'all' option is enabled, no Node must be specified as input parameter.");
86         }
87         if (!reconcileAllNodes && inputNodes.size() == 0) {
88             return buildErrorResponse("Error executing command reconcile. No Node information was specified.");
89         }
90         SettableFuture<RpcResult<ReconcileOutput>> result = SettableFuture.create();
91         List<Long> nodeList = getAllNodes();
92         List<Long> nodesToReconcile = reconcileAllNodes ? nodeList :
93                 inputNodes.stream().distinct().map(node -> node.longValue()).collect(Collectors.toList());
94         if (nodesToReconcile.size() > 0) {
95             List<Long> unresolvedNodes =
96                     nodesToReconcile.stream().filter(node -> !nodeList.contains(node)).collect(Collectors.toList());
97             if (!unresolvedNodes.isEmpty()) {
98                 return buildErrorResponse("Error executing command reconcile. "
99                         + "Node(s) not found: " + String.join(", ", unresolvedNodes.toString()));
100             }
101             nodesToReconcile.parallelStream().forEach(nodeId -> {
102                 alarmAgent.raiseNodeReconciliationAlarm(nodeId);
103                 LOG.info("Executing reconciliation for node {}", nodeId);
104                 NodeKey nodeKey = new NodeKey(new NodeId("openflow:" + nodeId));
105                 ReconciliationTask reconcileTask = new ReconciliationTask(nodeId, nodeKey);
106                 executor.execute(reconcileTask);
107             });
108         } else {
109             return buildErrorResponse("Error executing command reconcile. "
110                     + "No node information is found for reconciliation");
111         }
112         result.set(RpcResultBuilder.<ReconcileOutput>success().build());
113         return result;
114     }
115
116     private ListenableFuture<RpcResult<ReconcileOutput>> buildErrorResponse(String msg) {
117         SettableFuture<RpcResult<ReconcileOutput>> result = SettableFuture.create();
118         LOG.error(msg);
119         RpcError error = RpcResultBuilder.newError(RpcError.ErrorType.PROTOCOL, "reconcile", msg);
120         result.set(RpcResultBuilder.<ReconcileOutput>failed().withRpcError(error).build());
121         return result;
122     }
123
124     public List<Long> getAllNodes() {
125         List<OFNode> nodeList = ShellUtil.getAllNodes(broker);
126         List<Long> nodes = nodeList.stream().distinct().map(node -> node.getNodeId()).collect(Collectors.toList());
127         return nodes;
128     }
129
130     private void increaseReconcileCount(BigInteger nodeId, Boolean reconcileState) {
131         InstanceIdentifier<ReconcileCounter> instanceIdentifier = InstanceIdentifier
132                 .builder(ReconciliationCounter.class).child(ReconcileCounter.class,
133                         new ReconcileCounterKey(nodeId)).build();
134         ReadWriteTransaction tx = broker.newReadWriteTransaction();
135         Optional<ReconcileCounter> optional = readReconcileCounterFromDS(tx, instanceIdentifier, nodeId);
136         ReconcileCounterBuilder counterBuilder = new ReconcileCounterBuilder()
137                 .withKey(new ReconcileCounterKey(nodeId)).setNodeId(nodeId)
138                 .setLastRequestTime(LocalDateTime.now().toString());
139         if (reconcileState) {
140             counterBuilder.setSuccessCount(startCount);
141             if (optional.isPresent()) {
142                 ReconcileCounter counter = optional.get();
143                 Long successCount = counter.getSuccessCount();
144                 counterBuilder.setSuccessCount(++successCount);
145                 LOG.debug("Reconcile Success count {} for the node: {} ", successCount, nodeId);
146             }
147         } else {
148             counterBuilder.setFailureCount(startCount);
149             if (optional.isPresent()) {
150                 ReconcileCounter counter = optional.get();
151                 Long failureCount = counter.getFailureCount();
152                 counterBuilder.setFailureCount(++failureCount);
153                 LOG.debug("Reconcile Failure count {} for the node: {} ", failureCount, nodeId);
154             }
155         }
156         try {
157             tx.merge(LogicalDatastoreType.OPERATIONAL, instanceIdentifier, counterBuilder.build(), true);
158             tx.submit().get();
159         } catch (InterruptedException | ExecutionException e) {
160             LOG.error("Exception while submitting counter {}", nodeId, e);
161         }
162     }
163
164     private Optional<ReconcileCounter> readReconcileCounterFromDS(ReadWriteTransaction tx,
165                 InstanceIdentifier<ReconcileCounter> instanceIdentifier, BigInteger nodeId) {
166         try {
167             return tx.read(LogicalDatastoreType.OPERATIONAL, instanceIdentifier).get();
168         } catch (InterruptedException | ExecutionException e) {
169             LOG.error("Exception while reading counter for node: {}", nodeId, e);
170         }
171         return Optional.absent();
172     }
173
174     private final class ReconciliationTask implements Runnable {
175         private final NodeKey nodeKey;
176         private final Long nodeId;
177
178         private ReconciliationTask(Long nodeId, NodeKey nodeKey) {
179             this.nodeId = nodeId;
180             this.nodeKey = nodeKey;
181         }
182
183         @Override
184         public void run() {
185             BigInteger node = new BigInteger(String.valueOf(nodeId));
186             ReconcileNodeInput reconInput = new ReconcileNodeInputBuilder()
187                     .setNodeId(node).setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
188                             .child(Node.class, nodeKey).build())).build();
189             Future<RpcResult<ReconcileNodeOutput>> reconOutput = frmReconciliationService
190                     .reconcileNode(reconInput);
191             try {
192                 RpcResult<ReconcileNodeOutput> rpcResult = reconOutput.get();
193                 if (rpcResult.isSuccessful()) {
194                     increaseReconcileCount(node, true);
195                     LOG.info("Reconciliation successfully completed for node {}", nodeId);
196                 } else {
197                     increaseReconcileCount(node, false);
198                     LOG.error("Reconciliation failed for node {} with error {}", nodeId, rpcResult.getErrors());
199                 }
200             } catch (ExecutionException | InterruptedException e) {
201                 increaseReconcileCount(node, false);
202                 LOG.error("Error occurred while invoking reconcile RPC for node {}", nodeId, e);
203             } finally {
204                 alarmAgent.clearNodeReconciliationAlarm(nodeId);
205             }
206         }
207     }
208 }
209