Merge "OPNFLWPLUG-929 : Remove deprecated guava library"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / services / sal / SalFlatBatchServiceImpl.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.services.sal;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import java.util.ArrayList;
17 import java.util.List;
18 import java.util.concurrent.Future;
19 import org.opendaylight.openflowplugin.impl.services.batch.BatchPlanStep;
20 import org.opendaylight.openflowplugin.impl.services.batch.BatchStepJob;
21 import org.opendaylight.openflowplugin.impl.services.batch.FlatBatchFlowAdapters;
22 import org.opendaylight.openflowplugin.impl.services.batch.FlatBatchGroupAdapters;
23 import org.opendaylight.openflowplugin.impl.services.batch.FlatBatchMeterAdapters;
24 import org.opendaylight.openflowplugin.impl.util.FlatBatchUtil;
25 import org.opendaylight.openflowplugin.impl.util.PathUtil;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchInput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchOutput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.SalFlatBatchService;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.AddFlowsBatchInput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.AddFlowsBatchOutput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.RemoveFlowsBatchInput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.RemoveFlowsBatchOutput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.SalFlowsBatchService;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.UpdateFlowsBatchInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.UpdateFlowsBatchOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.AddGroupsBatchInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.AddGroupsBatchOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.RemoveGroupsBatchInput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.RemoveGroupsBatchOutput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.SalGroupsBatchService;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.UpdateGroupsBatchInput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.UpdateGroupsBatchOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.AddMetersBatchInput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.AddMetersBatchOutput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.RemoveMetersBatchInput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.RemoveMetersBatchOutput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.SalMetersBatchService;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.UpdateMetersBatchInput;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.UpdateMetersBatchOutput;
52 import org.opendaylight.yangtools.yang.common.RpcResult;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 /**
57  * Default implementation of {@link SalFlowsBatchService} - delegates work to {@link SalFlowService}.
58  */
59 public class SalFlatBatchServiceImpl implements SalFlatBatchService {
60     private static final Logger LOG = LoggerFactory.getLogger(SalFlatBatchServiceImpl.class);
61
62     private final SalFlowsBatchService salFlowService;
63     private final SalGroupsBatchService salGroupService;
64     private final SalMetersBatchService salMeterService;
65
66     public SalFlatBatchServiceImpl(final SalFlowsBatchService salFlowBatchService,
67                                    final SalGroupsBatchService salGroupsBatchService,
68                                    final SalMetersBatchService salMetersBatchService) {
69         this.salFlowService = Preconditions.checkNotNull(salFlowBatchService, "delegate flow service must not be null");
70         this.salGroupService =
71                 Preconditions.checkNotNull(salGroupsBatchService, "delegate group service must not be null");
72         this.salMeterService =
73                 Preconditions.checkNotNull(salMetersBatchService, "delegate meter service must not be null");
74     }
75
76     @Override
77     public Future<RpcResult<ProcessFlatBatchOutput>> processFlatBatch(final ProcessFlatBatchInput input) {
78         LOG.trace("processing flat batch @ {} : {}",
79                   PathUtil.extractNodeId(input.getNode()).getValue(),
80                   input.getBatch().size());
81         // create plan
82         final List<BatchPlanStep> batchPlan = FlatBatchUtil.assembleBatchPlan(input.getBatch());
83         // add barriers where needed
84         FlatBatchUtil.markBarriersWhereNeeded(batchPlan);
85         // prepare chain elements
86         final List<BatchStepJob> batchChainElements =
87                 prepareBatchChain(batchPlan, input.getNode(), input.isExitOnFirstError());
88         // execute plan with barriers and collect outputs chain correspondingly, collect results
89         return executeBatchPlan(batchChainElements);
90     }
91
92     @VisibleForTesting
93     Future<RpcResult<ProcessFlatBatchOutput>> executeBatchPlan(final List<BatchStepJob> batchJobsChain) {
94         BatchStepJob batchJob;
95         final List<ListenableFuture<RpcResult<ProcessFlatBatchOutput>>> firedJobs = new ArrayList<>();
96         ListenableFuture<RpcResult<ProcessFlatBatchOutput>> chainSummaryResult =
97                 FlatBatchUtil.createEmptyRpcBatchResultFuture(true);
98
99         for (int i = 0; i < batchJobsChain.size(); i++)  {
100             batchJob = batchJobsChain.get(i);
101             // wire actual job with chain
102             firedJobs.add(Futures.transformAsync(chainSummaryResult, batchJob.getStepFunction(),
103                     MoreExecutors.directExecutor()));
104             // if barrier after actual job is needed or it is the last job -> merge fired job results with chain result
105             if ((batchJob.getPlanStep().isBarrierAfter()) || (i == batchJobsChain.size() - 1)) {
106                 firedJobs.add(0, chainSummaryResult);
107                 chainSummaryResult = FlatBatchUtil.mergeJobsResultsFutures(firedJobs);
108                 firedJobs.clear();
109             }
110         }
111         return chainSummaryResult;
112     }
113
114     @VisibleForTesting
115     List<BatchStepJob> prepareBatchChain(final List<BatchPlanStep> batchPlan,
116                                          final NodeRef node,
117                                          final boolean exitOnFirstError) {
118         // create batch API calls based on plan steps
119         final List<BatchStepJob> chainJobs = new ArrayList<>();
120         int stepOffset = 0;
121         for (final BatchPlanStep planStep : batchPlan) {
122             final int currentOffset = stepOffset;
123             chainJobs.add(new BatchStepJob(planStep, chainInput -> {
124                 if (exitOnFirstError && !chainInput.isSuccessful()) {
125                     LOG.debug("error on flat batch chain occurred -> skipping step {}", planStep.getStepType());
126                     return FlatBatchUtil.createEmptyRpcBatchResultFuture(false);
127                 }
128                 LOG.trace("batch progressing on step type {}, previous steps result: {}", planStep.getStepType(),
129                         chainInput.isSuccessful());
130                 return getChainOutput(node, planStep, currentOffset);
131             }));
132             stepOffset += planStep.getTaskBag().size();
133         }
134
135         return chainJobs;
136     }
137
138     private ListenableFuture<RpcResult<ProcessFlatBatchOutput>> getChainOutput(final NodeRef node,
139                                                                                final BatchPlanStep planStep,
140                                                                                final int currentOffset) {
141         final ListenableFuture<RpcResult<ProcessFlatBatchOutput>> chainOutput;
142
143         switch (planStep.getStepType()) {
144             case FLOW_ADD:
145                 final AddFlowsBatchInput addFlowsBatchInput =
146                         FlatBatchFlowAdapters.adaptFlatBatchAddFlow(planStep, node);
147                 final Future<RpcResult<AddFlowsBatchOutput>> resultAddFlowFuture =
148                         salFlowService.addFlowsBatch(addFlowsBatchInput);
149                 chainOutput = FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultAddFlowFuture, currentOffset);
150                 break;
151             case FLOW_REMOVE:
152                 final RemoveFlowsBatchInput removeFlowsBatchInput =
153                         FlatBatchFlowAdapters.adaptFlatBatchRemoveFlow(planStep, node);
154                 final Future<RpcResult<RemoveFlowsBatchOutput>> resultRemoveFlowFuture =
155                         salFlowService.removeFlowsBatch(removeFlowsBatchInput);
156                 chainOutput =
157                         FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultRemoveFlowFuture, currentOffset);
158                 break;
159             case FLOW_UPDATE:
160                 final UpdateFlowsBatchInput updateFlowsBatchInput =
161                         FlatBatchFlowAdapters.adaptFlatBatchUpdateFlow(planStep, node);
162                 final Future<RpcResult<UpdateFlowsBatchOutput>> resultUpdateFlowFuture =
163                         salFlowService.updateFlowsBatch(updateFlowsBatchInput);
164                 chainOutput =
165                         FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultUpdateFlowFuture, currentOffset);
166                 break;
167             case GROUP_ADD:
168                 final AddGroupsBatchInput addGroupsBatchInput =
169                         FlatBatchGroupAdapters.adaptFlatBatchAddGroup(planStep, node);
170                 final Future<RpcResult<AddGroupsBatchOutput>> resultAddGroupFuture =
171                         salGroupService.addGroupsBatch(addGroupsBatchInput);
172                 chainOutput =
173                         FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultAddGroupFuture, currentOffset);
174                 break;
175             case GROUP_REMOVE:
176                 final RemoveGroupsBatchInput removeGroupsBatchInput =
177                         FlatBatchGroupAdapters.adaptFlatBatchRemoveGroup(planStep, node);
178                 final Future<RpcResult<RemoveGroupsBatchOutput>> resultRemoveGroupFuture =
179                         salGroupService.removeGroupsBatch(removeGroupsBatchInput);
180                 chainOutput =
181                         FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultRemoveGroupFuture, currentOffset);
182                 break;
183             case GROUP_UPDATE:
184                 final UpdateGroupsBatchInput updateGroupsBatchInput =
185                         FlatBatchGroupAdapters.adaptFlatBatchUpdateGroup(planStep, node);
186                 final Future<RpcResult<UpdateGroupsBatchOutput>> resultUpdateGroupFuture =
187                         salGroupService.updateGroupsBatch(updateGroupsBatchInput);
188                 chainOutput =
189                         FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultUpdateGroupFuture, currentOffset);
190                 break;
191             case METER_ADD:
192                 final AddMetersBatchInput addMetersBatchInput =
193                         FlatBatchMeterAdapters.adaptFlatBatchAddMeter(planStep, node);
194                 final Future<RpcResult<AddMetersBatchOutput>> resultAddMeterFuture =
195                         salMeterService.addMetersBatch(addMetersBatchInput);
196                 chainOutput =
197                         FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultAddMeterFuture, currentOffset);
198                 break;
199             case METER_REMOVE:
200                 final RemoveMetersBatchInput removeMetersBatchInput =
201                         FlatBatchMeterAdapters.adaptFlatBatchRemoveMeter(planStep, node);
202                 final Future<RpcResult<RemoveMetersBatchOutput>> resultRemoveMeterFuture =
203                         salMeterService.removeMetersBatch(removeMetersBatchInput);
204                 chainOutput =
205                         FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultRemoveMeterFuture, currentOffset);
206                 break;
207             case METER_UPDATE:
208                 final UpdateMetersBatchInput updateMetersBatchInput =
209                         FlatBatchMeterAdapters.adaptFlatBatchUpdateMeter(planStep, node);
210                 final Future<RpcResult<UpdateMetersBatchOutput>> resultUpdateMeterFuture =
211                         salMeterService.updateMetersBatch(updateMetersBatchInput);
212                 chainOutput =
213                         FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultUpdateMeterFuture, currentOffset);
214                 break;
215             default:
216                 LOG.warn("Unsupported plan-step type occurred: {} -> OMITTING", planStep.getStepType());
217                 chainOutput = FlatBatchUtil.createEmptyRpcBatchResultFuture(true);
218         }
219
220         return chainOutput;
221     }
222 }