Bump upstreams
[openflowplugin.git] / applications / southbound-cli / src / main / java / org / opendaylight / openflowplugin / applications / southboundcli / ReconciliationServiceImpl.java
1 /*
2  * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.southboundcli;
9
10 import static java.util.Objects.requireNonNull;
11 import static org.opendaylight.openflowplugin.api.openflow.ReconciliationState.ReconciliationStatus.COMPLETED;
12 import static org.opendaylight.openflowplugin.api.openflow.ReconciliationState.ReconciliationStatus.FAILED;
13 import static org.opendaylight.openflowplugin.api.openflow.ReconciliationState.ReconciliationStatus.STARTED;
14
15 import com.google.common.collect.ImmutableSet;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.SettableFuture;
18 import java.text.SimpleDateFormat;
19 import java.time.LocalDateTime;
20 import java.util.Date;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.Optional;
24 import java.util.Set;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.Future;
29 import java.util.stream.Collectors;
30 import org.opendaylight.mdsal.binding.api.DataBroker;
31 import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
32 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
33 import org.opendaylight.openflowplugin.api.openflow.FlowGroupCacheManager;
34 import org.opendaylight.openflowplugin.api.openflow.ReconciliationState;
35 import org.opendaylight.openflowplugin.applications.southboundcli.alarm.AlarmAgent;
36 import org.opendaylight.openflowplugin.applications.southboundcli.util.OFNode;
37 import org.opendaylight.openflowplugin.applications.southboundcli.util.ShellUtil;
38 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.FrmReconciliationService;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.ReconcileNodeInput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.ReconcileNodeInputBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.ReconcileNodeOutput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconcileInput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconcileOutput;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconcileOutputBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconciliationCounter;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconciliationService;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.reconciliation.counter.ReconcileCounter;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.reconciliation.counter.ReconcileCounterBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.reconciliation.counter.ReconcileCounterKey;
56 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
57 import org.opendaylight.yangtools.yang.common.ErrorTag;
58 import org.opendaylight.yangtools.yang.common.ErrorType;
59 import org.opendaylight.yangtools.yang.common.RpcResult;
60 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
61 import org.opendaylight.yangtools.yang.common.Uint32;
62 import org.opendaylight.yangtools.yang.common.Uint64;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65
66 public class ReconciliationServiceImpl implements ReconciliationService, AutoCloseable {
67     private static final Logger LOG = LoggerFactory.getLogger(ReconciliationServiceImpl.class);
68
69     private final DataBroker broker;
70     private final FrmReconciliationService frmReconciliationService;
71     private final AlarmAgent alarmAgent;
72     private final NodeListener nodeListener;
73     private final Map<String, ReconciliationState> reconciliationStates;
74
75     private ExecutorService executor = Executors.newWorkStealingPool(10);
76
77     public ReconciliationServiceImpl(final DataBroker broker, final FrmReconciliationService frmReconciliationService,
78                                      final AlarmAgent alarmAgent, final NodeListener nodeListener,
79                                      final FlowGroupCacheManager flowGroupCacheManager) {
80         this.broker = broker;
81         this.frmReconciliationService = frmReconciliationService;
82         this.alarmAgent = alarmAgent;
83         this.nodeListener = requireNonNull(nodeListener, "NodeListener cannot be null!");
84         reconciliationStates = flowGroupCacheManager.getReconciliationStates();
85     }
86
87     @Override
88     public void close() {
89         if (executor != null) {
90             executor.shutdownNow();
91             executor = null;
92         }
93     }
94
95     @Override
96     public ListenableFuture<RpcResult<ReconcileOutput>> reconcile(final ReconcileInput input) {
97         boolean reconcileAllNodes = input.getReconcileAllNodes();
98         Set<Uint64> inputNodes = input.getNodes();
99         if (inputNodes == null) {
100             inputNodes = Set.of();
101         }
102         if (reconcileAllNodes && inputNodes.size() > 0) {
103             return buildErrorResponse("Error executing command reconcile. "
104                     + "If 'all' option is enabled, no Node must be specified as input parameter.");
105         }
106         if (!reconcileAllNodes && inputNodes.size() == 0) {
107             return buildErrorResponse("Error executing command reconcile. No Node information was specified.");
108         }
109         SettableFuture<RpcResult<ReconcileOutput>> result = SettableFuture.create();
110         List<Long> nodeList = getAllNodes();
111         List<Long> nodesToReconcile = reconcileAllNodes ? nodeList :
112                 inputNodes.stream().distinct().map(Uint64::longValue).collect(Collectors.toList());
113         if (nodesToReconcile.size() > 0) {
114             List<Long> unresolvedNodes =
115                     nodesToReconcile.stream().filter(node -> !nodeList.contains(node)).collect(Collectors.toList());
116             if (!unresolvedNodes.isEmpty()) {
117                 return buildErrorResponse("Error executing command reconcile. "
118                         + "Node(s) not found: " + String.join(", ", unresolvedNodes.toString()));
119             }
120             ImmutableSet.Builder<Uint64> inprogressNodes = ImmutableSet.builder();
121             nodesToReconcile.parallelStream().forEach(nodeId -> {
122                 ReconciliationState state = getReconciliationState(nodeId);
123                 if (state != null && state.getState().equals(STARTED)) {
124                     inprogressNodes.add(Uint64.valueOf(nodeId));
125                 } else {
126                     alarmAgent.raiseNodeReconciliationAlarm(nodeId);
127                     LOG.info("Executing reconciliation for node {} with state ", nodeId);
128                     NodeKey nodeKey = new NodeKey(new NodeId("openflow:" + nodeId));
129                     ReconciliationTask reconcileTask = new ReconciliationTask(Uint64.valueOf(nodeId), nodeKey);
130                     executor.execute(reconcileTask);
131                 }
132             });
133             ReconcileOutput reconcilingInProgress = new ReconcileOutputBuilder()
134                     .setInprogressNodes(inprogressNodes.build())
135                     .build();
136             result.set(RpcResultBuilder.success(reconcilingInProgress).build());
137             return result;
138         } else {
139             return buildErrorResponse("Error executing command reconcile. "
140                     + "No node information is found for reconciliation");
141         }
142     }
143
144     private ReconciliationState getReconciliationState(final Long nodeId) {
145         return reconciliationStates.get(nodeId.toString());
146     }
147
148     private static ListenableFuture<RpcResult<ReconcileOutput>> buildErrorResponse(final String msg) {
149         LOG.error("Error {}", msg);
150         return RpcResultBuilder.<ReconcileOutput>failed()
151                 .withError(ErrorType.PROTOCOL, new ErrorTag("reconcile"), msg)
152                 .buildFuture();
153     }
154
155     private List<Long> getAllNodes() {
156         List<OFNode> nodeList = ShellUtil.getAllNodes(nodeListener);
157         List<Long> nodes = nodeList.stream().distinct().map(OFNode::getNodeId).collect(Collectors.toList());
158         return nodes;
159     }
160
161     private final class ReconciliationTask implements Runnable {
162         private static final String DATE_AND_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
163         private final NodeKey nodeKey;
164         private final Uint64 nodeId;
165
166         private ReconciliationTask(final Uint64 nodeId, final NodeKey nodeKey) {
167             this.nodeId = nodeId;
168             this.nodeKey = nodeKey;
169         }
170
171         @Override
172         public void run() {
173             ReconcileNodeInput reconInput = new ReconcileNodeInputBuilder()
174                     .setNodeId(nodeId).setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
175                             .child(Node.class, nodeKey).build())).build();
176             updateReconciliationState(STARTED);
177             Future<RpcResult<ReconcileNodeOutput>> reconOutput = frmReconciliationService
178                     .reconcileNode(reconInput);
179             try {
180                 RpcResult<ReconcileNodeOutput> rpcResult = reconOutput.get();
181                 if (rpcResult.isSuccessful()) {
182                     increaseReconcileCount(true);
183                     updateReconciliationState(COMPLETED);
184                     LOG.info("Reconciliation successfully completed for node {}", nodeId);
185                 } else {
186                     increaseReconcileCount(false);
187                     updateReconciliationState(FAILED);
188                     LOG.error("Reconciliation failed for node {} with error {}", nodeId, rpcResult.getErrors());
189                 }
190             } catch (ExecutionException | InterruptedException e) {
191                 increaseReconcileCount(false);
192                 updateReconciliationState(FAILED);
193                 LOG.error("Error occurred while invoking reconcile RPC for node {}", nodeId, e);
194             } finally {
195                 alarmAgent.clearNodeReconciliationAlarm(nodeId.longValue());
196             }
197         }
198
199         private void increaseReconcileCount(final boolean isSuccess) {
200             // FIXME: do not use SimpleDateFormat
201             final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT);
202             InstanceIdentifier<ReconcileCounter> instanceIdentifier = InstanceIdentifier
203                     .builder(ReconciliationCounter.class).child(ReconcileCounter.class,
204                             new ReconcileCounterKey(nodeId)).build();
205             ReadWriteTransaction tx = broker.newReadWriteTransaction();
206             Optional<ReconcileCounter> count = getReconciliationCount(tx, instanceIdentifier);
207             ReconcileCounterBuilder counterBuilder = new ReconcileCounterBuilder()
208                     .withKey(new ReconcileCounterKey(nodeId))
209                     .setLastRequestTime(new DateAndTime(simpleDateFormat.format(new Date())));
210
211             if (isSuccess) {
212                 if (count.isPresent()) {
213                     long successCount = count.orElseThrow().getSuccessCount().toJava();
214                     counterBuilder.setSuccessCount(Uint32.valueOf(++successCount));
215                     LOG.debug("Reconcile success count {} for the node: {} ", successCount, nodeId);
216                 } else {
217                     counterBuilder.setSuccessCount(Uint32.ONE);
218                 }
219             } else if (count.isPresent()) {
220                 long failureCount = count.orElseThrow().getFailureCount().toJava();
221                 counterBuilder.setFailureCount(Uint32.valueOf(++failureCount));
222                 LOG.debug("Reconcile failure count {} for the node: {} ", failureCount, nodeId);
223             } else {
224                 counterBuilder.setFailureCount(Uint32.ONE);
225             }
226             try {
227                 tx.mergeParentStructureMerge(LogicalDatastoreType.OPERATIONAL, instanceIdentifier,
228                         counterBuilder.build());
229                 tx.commit().get();
230             } catch (InterruptedException | ExecutionException e) {
231                 LOG.error("Exception while submitting counter for {}", nodeId, e);
232             }
233         }
234
235         private Optional<ReconcileCounter> getReconciliationCount(final ReadWriteTransaction tx,
236                 final InstanceIdentifier<ReconcileCounter> instanceIdentifier) {
237             try {
238                 return tx.read(LogicalDatastoreType.OPERATIONAL, instanceIdentifier).get();
239             } catch (InterruptedException | ExecutionException e) {
240                 LOG.error("Exception while reading counter for node: {}", nodeId, e);
241             }
242             return Optional.empty();
243         }
244
245         private void updateReconciliationState(final ReconciliationState.ReconciliationStatus status) {
246             ReconciliationState state = new ReconciliationState(status, LocalDateTime.now());
247             reconciliationStates.put(nodeId.toString(),state);
248         }
249     }
250 }
251