Remove unused routedRpcRegistration
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / services / sal / SalFlatBatchServiceImpl.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.services.sal;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import java.util.ArrayList;
17 import java.util.List;
18 import org.opendaylight.openflowplugin.impl.services.batch.BatchPlanStep;
19 import org.opendaylight.openflowplugin.impl.services.batch.BatchStepJob;
20 import org.opendaylight.openflowplugin.impl.services.batch.FlatBatchFlowAdapters;
21 import org.opendaylight.openflowplugin.impl.services.batch.FlatBatchGroupAdapters;
22 import org.opendaylight.openflowplugin.impl.services.batch.FlatBatchMeterAdapters;
23 import org.opendaylight.openflowplugin.impl.util.FlatBatchUtil;
24 import org.opendaylight.openflowplugin.impl.util.PathUtil;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchInput;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchOutput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.SalFlatBatchService;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.AddFlowsBatchInput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.AddFlowsBatchOutput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.RemoveFlowsBatchInput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.RemoveFlowsBatchOutput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.SalFlowsBatchService;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.UpdateFlowsBatchInput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.UpdateFlowsBatchOutput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.AddGroupsBatchInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.AddGroupsBatchOutput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.RemoveGroupsBatchInput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.RemoveGroupsBatchOutput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.SalGroupsBatchService;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.UpdateGroupsBatchInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.UpdateGroupsBatchOutput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.AddMetersBatchInput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.AddMetersBatchOutput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.RemoveMetersBatchInput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.RemoveMetersBatchOutput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.SalMetersBatchService;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.UpdateMetersBatchInput;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.UpdateMetersBatchOutput;
51 import org.opendaylight.yangtools.yang.common.RpcResult;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 /**
56  * Default implementation of {@link SalFlowsBatchService} - delegates work to {@link SalFlowService}.
57  */
58 public class SalFlatBatchServiceImpl implements SalFlatBatchService {
59     private static final Logger LOG = LoggerFactory.getLogger(SalFlatBatchServiceImpl.class);
60
61     private final SalFlowsBatchService salFlowService;
62     private final SalGroupsBatchService salGroupService;
63     private final SalMetersBatchService salMeterService;
64
65     public SalFlatBatchServiceImpl(final SalFlowsBatchService salFlowBatchService,
66                                    final SalGroupsBatchService salGroupsBatchService,
67                                    final SalMetersBatchService salMetersBatchService) {
68         salFlowService = requireNonNull(salFlowBatchService, "delegate flow service must not be null");
69         salGroupService = requireNonNull(salGroupsBatchService, "delegate group service must not be null");
70         salMeterService = requireNonNull(salMetersBatchService, "delegate meter service must not be null");
71     }
72
73     @Override
74     public ListenableFuture<RpcResult<ProcessFlatBatchOutput>> processFlatBatch(final ProcessFlatBatchInput input) {
75         LOG.trace("processing flat batch @ {} : {}",
76                   PathUtil.extractNodeId(input.getNode()).getValue(),
77                   input.getBatch().size());
78         // create plan
79         final List<BatchPlanStep> batchPlan = FlatBatchUtil.assembleBatchPlan(input.nonnullBatch().values());
80         // add barriers where needed
81         FlatBatchUtil.markBarriersWhereNeeded(batchPlan);
82         // prepare chain elements
83         final List<BatchStepJob> batchChainElements =
84                 prepareBatchChain(batchPlan, input.getNode(), input.getExitOnFirstError());
85         // execute plan with barriers and collect outputs chain correspondingly, collect results
86         return executeBatchPlan(batchChainElements);
87     }
88
89     @VisibleForTesting
90     ListenableFuture<RpcResult<ProcessFlatBatchOutput>> executeBatchPlan(final List<BatchStepJob> batchJobsChain) {
91         BatchStepJob batchJob;
92         final List<ListenableFuture<RpcResult<ProcessFlatBatchOutput>>> firedJobs = new ArrayList<>();
93         ListenableFuture<RpcResult<ProcessFlatBatchOutput>> chainSummaryResult =
94                 FlatBatchUtil.createEmptyRpcBatchResultFuture(true);
95
96         for (int i = 0; i < batchJobsChain.size(); i++)  {
97             batchJob = batchJobsChain.get(i);
98             // wire actual job with chain
99             firedJobs.add(Futures.transformAsync(chainSummaryResult, batchJob.getStepFunction(),
100                     MoreExecutors.directExecutor()));
101             // if barrier after actual job is needed or it is the last job -> merge fired job results with chain result
102             if (batchJob.getPlanStep().isBarrierAfter() || i == batchJobsChain.size() - 1) {
103                 firedJobs.add(0, chainSummaryResult);
104                 chainSummaryResult = FlatBatchUtil.mergeJobsResultsFutures(firedJobs);
105                 firedJobs.clear();
106             }
107         }
108         return chainSummaryResult;
109     }
110
111     @VisibleForTesting
112     List<BatchStepJob> prepareBatchChain(final List<BatchPlanStep> batchPlan,
113                                          final NodeRef node,
114                                          final boolean exitOnFirstError) {
115         // create batch API calls based on plan steps
116         final List<BatchStepJob> chainJobs = new ArrayList<>();
117         int stepOffset = 0;
118         for (final BatchPlanStep planStep : batchPlan) {
119             final int currentOffset = stepOffset;
120             chainJobs.add(new BatchStepJob(planStep, chainInput -> {
121                 if (exitOnFirstError && !chainInput.isSuccessful()) {
122                     LOG.debug("error on flat batch chain occurred -> skipping step {}", planStep.getStepType());
123                     return FlatBatchUtil.createEmptyRpcBatchResultFuture(false);
124                 }
125                 LOG.trace("batch progressing on step type {}, previous steps result: {}", planStep.getStepType(),
126                         chainInput.isSuccessful());
127                 return getChainOutput(node, planStep, currentOffset);
128             }));
129             stepOffset += planStep.getTaskBag().size();
130         }
131
132         return chainJobs;
133     }
134
135     private ListenableFuture<RpcResult<ProcessFlatBatchOutput>> getChainOutput(final NodeRef node,
136                                                                                final BatchPlanStep planStep,
137                                                                                final int currentOffset) {
138         return switch (planStep.getStepType()) {
139             case FLOW_ADD -> {
140                 final AddFlowsBatchInput addFlowsBatchInput =
141                     FlatBatchFlowAdapters.adaptFlatBatchAddFlow(planStep, node);
142                 final ListenableFuture<RpcResult<AddFlowsBatchOutput>> resultAddFlowFuture =
143                     salFlowService.addFlowsBatch(addFlowsBatchInput);
144                 yield FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultAddFlowFuture, currentOffset);
145             }
146             case FLOW_REMOVE -> {
147                 final RemoveFlowsBatchInput removeFlowsBatchInput =
148                     FlatBatchFlowAdapters.adaptFlatBatchRemoveFlow(planStep, node);
149                 final ListenableFuture<RpcResult<RemoveFlowsBatchOutput>> resultRemoveFlowFuture =
150                     salFlowService.removeFlowsBatch(removeFlowsBatchInput);
151                 yield FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultRemoveFlowFuture, currentOffset);
152             }
153             case FLOW_UPDATE -> {
154                 final UpdateFlowsBatchInput updateFlowsBatchInput =
155                     FlatBatchFlowAdapters.adaptFlatBatchUpdateFlow(planStep, node);
156                 final ListenableFuture<RpcResult<UpdateFlowsBatchOutput>> resultUpdateFlowFuture =
157                     salFlowService.updateFlowsBatch(updateFlowsBatchInput);
158                 yield FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultUpdateFlowFuture, currentOffset);
159             }
160             case GROUP_ADD -> {
161                 final AddGroupsBatchInput addGroupsBatchInput =
162                     FlatBatchGroupAdapters.adaptFlatBatchAddGroup(planStep, node);
163                 final ListenableFuture<RpcResult<AddGroupsBatchOutput>> resultAddGroupFuture =
164                     salGroupService.addGroupsBatch(addGroupsBatchInput);
165                 yield FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultAddGroupFuture, currentOffset);
166             }
167             case GROUP_REMOVE -> {
168                 final RemoveGroupsBatchInput removeGroupsBatchInput =
169                     FlatBatchGroupAdapters.adaptFlatBatchRemoveGroup(planStep, node);
170                 final ListenableFuture<RpcResult<RemoveGroupsBatchOutput>> resultRemoveGroupFuture =
171                     salGroupService.removeGroupsBatch(removeGroupsBatchInput);
172                 yield FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultRemoveGroupFuture, currentOffset);
173             }
174             case GROUP_UPDATE -> {
175                 final UpdateGroupsBatchInput updateGroupsBatchInput =
176                     FlatBatchGroupAdapters.adaptFlatBatchUpdateGroup(planStep, node);
177                 final ListenableFuture<RpcResult<UpdateGroupsBatchOutput>> resultUpdateGroupFuture =
178                     salGroupService.updateGroupsBatch(updateGroupsBatchInput);
179                 yield FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultUpdateGroupFuture, currentOffset);
180             }
181             case METER_ADD -> {
182                 final AddMetersBatchInput addMetersBatchInput =
183                     FlatBatchMeterAdapters.adaptFlatBatchAddMeter(planStep, node);
184                 final ListenableFuture<RpcResult<AddMetersBatchOutput>> resultAddMeterFuture =
185                     salMeterService.addMetersBatch(addMetersBatchInput);
186                 yield FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultAddMeterFuture, currentOffset);
187             }
188             case METER_REMOVE -> {
189                 final RemoveMetersBatchInput removeMetersBatchInput =
190                     FlatBatchMeterAdapters.adaptFlatBatchRemoveMeter(planStep, node);
191                 final ListenableFuture<RpcResult<RemoveMetersBatchOutput>> resultRemoveMeterFuture =
192                     salMeterService.removeMetersBatch(removeMetersBatchInput);
193                 yield FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultRemoveMeterFuture, currentOffset);
194             }
195             case METER_UPDATE -> {
196                 final UpdateMetersBatchInput updateMetersBatchInput =
197                     FlatBatchMeterAdapters.adaptFlatBatchUpdateMeter(planStep, node);
198                 final ListenableFuture<RpcResult<UpdateMetersBatchOutput>> resultUpdateMeterFuture =
199                     salMeterService.updateMetersBatch(updateMetersBatchInput);
200                 yield FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultUpdateMeterFuture, currentOffset);
201             }
202             default -> {
203                 LOG.warn("Unsupported plan-step type occurred: {} -> OMITTING", planStep.getStepType());
204                 yield FlatBatchUtil.createEmptyRpcBatchResultFuture(true);
205             }
206         };
207     }
208 }