Decompose RPC implementation classes
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / services / sal / ProcessFlatBatchImpl.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.services.sal;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import java.util.ArrayList;
17 import java.util.List;
18 import org.opendaylight.openflowplugin.impl.services.batch.BatchPlanStep;
19 import org.opendaylight.openflowplugin.impl.services.batch.BatchStepJob;
20 import org.opendaylight.openflowplugin.impl.services.batch.FlatBatchFlowAdapters;
21 import org.opendaylight.openflowplugin.impl.services.batch.FlatBatchGroupAdapters;
22 import org.opendaylight.openflowplugin.impl.services.batch.FlatBatchMeterAdapters;
23 import org.opendaylight.openflowplugin.impl.util.FlatBatchUtil;
24 import org.opendaylight.openflowplugin.impl.util.PathUtil;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatch;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchInput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchOutput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.AddFlowsBatch;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.RemoveFlowsBatch;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.UpdateFlowsBatch;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.AddGroupsBatch;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.RemoveGroupsBatch;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.UpdateGroupsBatch;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.AddMetersBatch;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.RemoveMetersBatch;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.UpdateMetersBatch;
38 import org.opendaylight.yangtools.yang.common.RpcResult;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 public final class ProcessFlatBatchImpl implements ProcessFlatBatch {
43     private static final Logger LOG = LoggerFactory.getLogger(ProcessFlatBatchImpl.class);
44
45     private final AddFlowsBatch addFlowsBatch;
46     private final RemoveFlowsBatch removeFlowsBatch;
47     private final UpdateFlowsBatch updateFlowsBatch;
48     private final AddGroupsBatch addGroupsBatch;
49     private final RemoveGroupsBatch removeGroupsBatch;
50     private final UpdateGroupsBatch updateGroupsBatch;
51     private final AddMetersBatch addMetersBatch;
52     private final RemoveMetersBatch removeMetersBatch;
53     private final UpdateMetersBatch updateMetersBatch;
54
55     public ProcessFlatBatchImpl(final AddFlowsBatch addFlowsBatch, final RemoveFlowsBatch removeFlowsBatch,
56             final UpdateFlowsBatch updateFlowsBatch, final AddGroupsBatch addGroupsBatch,
57             final RemoveGroupsBatch removeGroupsBatch, final UpdateGroupsBatch updateGroupsBatch,
58             final AddMetersBatch addMetersBatch, final RemoveMetersBatch removeMetersBatch,
59             final UpdateMetersBatch updateMetersBatch) {
60         this.addFlowsBatch = requireNonNull(addFlowsBatch);
61         this.removeFlowsBatch = requireNonNull(removeFlowsBatch);
62         this.updateFlowsBatch = requireNonNull(updateFlowsBatch);
63         this.addGroupsBatch = requireNonNull(addGroupsBatch);
64         this.removeGroupsBatch = requireNonNull(removeGroupsBatch);
65         this.updateGroupsBatch = requireNonNull(updateGroupsBatch);
66         this.addMetersBatch = requireNonNull(addMetersBatch);
67         this.removeMetersBatch = requireNonNull(removeMetersBatch);
68         this.updateMetersBatch = requireNonNull(updateMetersBatch);
69     }
70
71     @Override
72     public ListenableFuture<RpcResult<ProcessFlatBatchOutput>> invoke(final ProcessFlatBatchInput input) {
73         final var batch = input.nonnullBatch().values();
74         if (LOG.isTraceEnabled()) {
75             LOG.trace("processing flat batch @ {} : {}", PathUtil.extractNodeId(input.getNode()).getValue(),
76                 batch.size());
77         }
78
79         // create plan
80         final var batchPlan = FlatBatchUtil.assembleBatchPlan(batch);
81         // add barriers where needed
82         FlatBatchUtil.markBarriersWhereNeeded(batchPlan);
83         // prepare chain elements
84         final var batchChainElements = prepareBatchChain(batchPlan, input.getNode(), input.getExitOnFirstError());
85         // execute plan with barriers and collect outputs chain correspondingly, collect results
86         return executeBatchPlan(batchChainElements);
87     }
88
89     @VisibleForTesting
90     static ListenableFuture<RpcResult<ProcessFlatBatchOutput>> executeBatchPlan(
91             final List<BatchStepJob> batchJobsChain) {
92         BatchStepJob batchJob;
93         final var firedJobs = new ArrayList<ListenableFuture<RpcResult<ProcessFlatBatchOutput>>>();
94         var chainSummaryResult = FlatBatchUtil.createEmptyRpcBatchResultFuture(true);
95
96         for (int i = 0; i < batchJobsChain.size(); i++)  {
97             batchJob = batchJobsChain.get(i);
98             // wire actual job with chain
99             firedJobs.add(Futures.transformAsync(chainSummaryResult, batchJob.getStepFunction(),
100                     MoreExecutors.directExecutor()));
101             // if barrier after actual job is needed or it is the last job -> merge fired job results with chain result
102             if (batchJob.getPlanStep().isBarrierAfter() || i == batchJobsChain.size() - 1) {
103                 firedJobs.add(0, chainSummaryResult);
104                 chainSummaryResult = FlatBatchUtil.mergeJobsResultsFutures(firedJobs);
105                 firedJobs.clear();
106             }
107         }
108         return chainSummaryResult;
109     }
110
111     @VisibleForTesting
112     List<BatchStepJob> prepareBatchChain(final List<BatchPlanStep> batchPlan, final NodeRef node,
113             final boolean exitOnFirstError) {
114         // create batch API calls based on plan steps
115         final var chainJobs = new ArrayList<BatchStepJob>();
116         int stepOffset = 0;
117         for (final BatchPlanStep planStep : batchPlan) {
118             final int currentOffset = stepOffset;
119             chainJobs.add(new BatchStepJob(planStep, chainInput -> {
120                 if (exitOnFirstError && !chainInput.isSuccessful()) {
121                     LOG.debug("error on flat batch chain occurred -> skipping step {}", planStep.getStepType());
122                     return FlatBatchUtil.createEmptyRpcBatchResultFuture(false);
123                 }
124                 LOG.trace("batch progressing on step type {}, previous steps result: {}", planStep.getStepType(),
125                         chainInput.isSuccessful());
126                 return getChainOutput(node, planStep, currentOffset);
127             }));
128             stepOffset += planStep.getTaskBag().size();
129         }
130
131         return chainJobs;
132     }
133
134     private ListenableFuture<RpcResult<ProcessFlatBatchOutput>> getChainOutput(final NodeRef node,
135             final BatchPlanStep planStep, final int currentOffset) {
136         return switch (planStep.getStepType()) {
137             case FLOW_ADD -> FlatBatchFlowAdapters.convertFlowBatchFutureForChain(
138                 addFlowsBatch.invoke(FlatBatchFlowAdapters.adaptFlatBatchAddFlow(planStep, node)), currentOffset);
139             case FLOW_REMOVE -> FlatBatchFlowAdapters.convertFlowBatchFutureForChain(
140                 removeFlowsBatch.invoke(FlatBatchFlowAdapters.adaptFlatBatchRemoveFlow(planStep, node)), currentOffset);
141             case FLOW_UPDATE -> FlatBatchFlowAdapters.convertFlowBatchFutureForChain(
142                 updateFlowsBatch.invoke(FlatBatchFlowAdapters.adaptFlatBatchUpdateFlow(planStep, node)), currentOffset);
143             case GROUP_ADD -> FlatBatchGroupAdapters.convertGroupBatchFutureForChain(
144                 addGroupsBatch.invoke(FlatBatchGroupAdapters.adaptFlatBatchAddGroup(planStep, node)), currentOffset);
145             case GROUP_REMOVE -> FlatBatchGroupAdapters.convertGroupBatchFutureForChain(
146                 removeGroupsBatch.invoke(FlatBatchGroupAdapters.adaptFlatBatchRemoveGroup(planStep, node)),
147                 currentOffset);
148             case GROUP_UPDATE -> FlatBatchGroupAdapters.convertGroupBatchFutureForChain(
149                 updateGroupsBatch.invoke(FlatBatchGroupAdapters.adaptFlatBatchUpdateGroup(planStep, node)),
150                 currentOffset);
151             case METER_ADD -> FlatBatchMeterAdapters.convertMeterBatchFutureForChain(
152                 addMetersBatch.invoke(FlatBatchMeterAdapters.adaptFlatBatchAddMeter(planStep, node)), currentOffset);
153             case METER_REMOVE -> FlatBatchMeterAdapters.convertMeterBatchFutureForChain(
154                 removeMetersBatch.invoke(FlatBatchMeterAdapters.adaptFlatBatchRemoveMeter(planStep, node)),
155                 currentOffset);
156             case METER_UPDATE -> FlatBatchMeterAdapters.convertMeterBatchFutureForChain(
157                 updateMetersBatch.invoke(FlatBatchMeterAdapters.adaptFlatBatchUpdateMeter(planStep, node)),
158                 currentOffset);
159         };
160     }
161 }