2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.impl.services.sal;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import java.util.ArrayList;
17 import java.util.List;
18 import org.opendaylight.openflowplugin.impl.services.batch.BatchPlanStep;
19 import org.opendaylight.openflowplugin.impl.services.batch.BatchStepJob;
20 import org.opendaylight.openflowplugin.impl.services.batch.FlatBatchFlowAdapters;
21 import org.opendaylight.openflowplugin.impl.services.batch.FlatBatchGroupAdapters;
22 import org.opendaylight.openflowplugin.impl.services.batch.FlatBatchMeterAdapters;
23 import org.opendaylight.openflowplugin.impl.util.FlatBatchUtil;
24 import org.opendaylight.openflowplugin.impl.util.PathUtil;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchInput;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchOutput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.SalFlatBatchService;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.AddFlowsBatchInput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.AddFlowsBatchOutput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.RemoveFlowsBatchInput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.RemoveFlowsBatchOutput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.SalFlowsBatchService;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.UpdateFlowsBatchInput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.UpdateFlowsBatchOutput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.AddGroupsBatchInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.AddGroupsBatchOutput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.RemoveGroupsBatchInput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.RemoveGroupsBatchOutput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.SalGroupsBatchService;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.UpdateGroupsBatchInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.UpdateGroupsBatchOutput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.AddMetersBatchInput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.AddMetersBatchOutput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.RemoveMetersBatchInput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.RemoveMetersBatchOutput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.SalMetersBatchService;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.UpdateMetersBatchInput;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.UpdateMetersBatchOutput;
51 import org.opendaylight.yangtools.yang.common.RpcResult;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
56 * Default implementation of {@link SalFlowsBatchService} - delegates work to {@link SalFlowService}.
58 public class SalFlatBatchServiceImpl implements SalFlatBatchService {
59 private static final Logger LOG = LoggerFactory.getLogger(SalFlatBatchServiceImpl.class);
61 private final SalFlowsBatchService salFlowService;
62 private final SalGroupsBatchService salGroupService;
63 private final SalMetersBatchService salMeterService;
65 public SalFlatBatchServiceImpl(final SalFlowsBatchService salFlowBatchService,
66 final SalGroupsBatchService salGroupsBatchService,
67 final SalMetersBatchService salMetersBatchService) {
68 this.salFlowService = Preconditions.checkNotNull(salFlowBatchService, "delegate flow service must not be null");
69 this.salGroupService =
70 Preconditions.checkNotNull(salGroupsBatchService, "delegate group service must not be null");
71 this.salMeterService =
72 Preconditions.checkNotNull(salMetersBatchService, "delegate meter service must not be null");
76 public ListenableFuture<RpcResult<ProcessFlatBatchOutput>> processFlatBatch(final ProcessFlatBatchInput input) {
77 LOG.trace("processing flat batch @ {} : {}",
78 PathUtil.extractNodeId(input.getNode()).getValue(),
79 input.getBatch().size());
81 final List<BatchPlanStep> batchPlan = FlatBatchUtil.assembleBatchPlan(input.getBatch());
82 // add barriers where needed
83 FlatBatchUtil.markBarriersWhereNeeded(batchPlan);
84 // prepare chain elements
85 final List<BatchStepJob> batchChainElements =
86 prepareBatchChain(batchPlan, input.getNode(), input.isExitOnFirstError());
87 // execute plan with barriers and collect outputs chain correspondingly, collect results
88 return executeBatchPlan(batchChainElements);
92 ListenableFuture<RpcResult<ProcessFlatBatchOutput>> executeBatchPlan(final List<BatchStepJob> batchJobsChain) {
93 BatchStepJob batchJob;
94 final List<ListenableFuture<RpcResult<ProcessFlatBatchOutput>>> firedJobs = new ArrayList<>();
95 ListenableFuture<RpcResult<ProcessFlatBatchOutput>> chainSummaryResult =
96 FlatBatchUtil.createEmptyRpcBatchResultFuture(true);
98 for (int i = 0; i < batchJobsChain.size(); i++) {
99 batchJob = batchJobsChain.get(i);
100 // wire actual job with chain
101 firedJobs.add(Futures.transformAsync(chainSummaryResult, batchJob.getStepFunction(),
102 MoreExecutors.directExecutor()));
103 // if barrier after actual job is needed or it is the last job -> merge fired job results with chain result
104 if (batchJob.getPlanStep().isBarrierAfter() || i == batchJobsChain.size() - 1) {
105 firedJobs.add(0, chainSummaryResult);
106 chainSummaryResult = FlatBatchUtil.mergeJobsResultsFutures(firedJobs);
110 return chainSummaryResult;
114 List<BatchStepJob> prepareBatchChain(final List<BatchPlanStep> batchPlan,
116 final boolean exitOnFirstError) {
117 // create batch API calls based on plan steps
118 final List<BatchStepJob> chainJobs = new ArrayList<>();
120 for (final BatchPlanStep planStep : batchPlan) {
121 final int currentOffset = stepOffset;
122 chainJobs.add(new BatchStepJob(planStep, chainInput -> {
123 if (exitOnFirstError && !chainInput.isSuccessful()) {
124 LOG.debug("error on flat batch chain occurred -> skipping step {}", planStep.getStepType());
125 return FlatBatchUtil.createEmptyRpcBatchResultFuture(false);
127 LOG.trace("batch progressing on step type {}, previous steps result: {}", planStep.getStepType(),
128 chainInput.isSuccessful());
129 return getChainOutput(node, planStep, currentOffset);
131 stepOffset += planStep.getTaskBag().size();
137 private ListenableFuture<RpcResult<ProcessFlatBatchOutput>> getChainOutput(final NodeRef node,
138 final BatchPlanStep planStep,
139 final int currentOffset) {
140 final ListenableFuture<RpcResult<ProcessFlatBatchOutput>> chainOutput;
142 switch (planStep.getStepType()) {
144 final AddFlowsBatchInput addFlowsBatchInput =
145 FlatBatchFlowAdapters.adaptFlatBatchAddFlow(planStep, node);
146 final ListenableFuture<RpcResult<AddFlowsBatchOutput>> resultAddFlowFuture =
147 salFlowService.addFlowsBatch(addFlowsBatchInput);
148 chainOutput = FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultAddFlowFuture, currentOffset);
151 final RemoveFlowsBatchInput removeFlowsBatchInput =
152 FlatBatchFlowAdapters.adaptFlatBatchRemoveFlow(planStep, node);
153 final ListenableFuture<RpcResult<RemoveFlowsBatchOutput>> resultRemoveFlowFuture =
154 salFlowService.removeFlowsBatch(removeFlowsBatchInput);
156 FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultRemoveFlowFuture, currentOffset);
159 final UpdateFlowsBatchInput updateFlowsBatchInput =
160 FlatBatchFlowAdapters.adaptFlatBatchUpdateFlow(planStep, node);
161 final ListenableFuture<RpcResult<UpdateFlowsBatchOutput>> resultUpdateFlowFuture =
162 salFlowService.updateFlowsBatch(updateFlowsBatchInput);
164 FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultUpdateFlowFuture, currentOffset);
167 final AddGroupsBatchInput addGroupsBatchInput =
168 FlatBatchGroupAdapters.adaptFlatBatchAddGroup(planStep, node);
169 final ListenableFuture<RpcResult<AddGroupsBatchOutput>> resultAddGroupFuture =
170 salGroupService.addGroupsBatch(addGroupsBatchInput);
172 FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultAddGroupFuture, currentOffset);
175 final RemoveGroupsBatchInput removeGroupsBatchInput =
176 FlatBatchGroupAdapters.adaptFlatBatchRemoveGroup(planStep, node);
177 final ListenableFuture<RpcResult<RemoveGroupsBatchOutput>> resultRemoveGroupFuture =
178 salGroupService.removeGroupsBatch(removeGroupsBatchInput);
180 FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultRemoveGroupFuture, currentOffset);
183 final UpdateGroupsBatchInput updateGroupsBatchInput =
184 FlatBatchGroupAdapters.adaptFlatBatchUpdateGroup(planStep, node);
185 final ListenableFuture<RpcResult<UpdateGroupsBatchOutput>> resultUpdateGroupFuture =
186 salGroupService.updateGroupsBatch(updateGroupsBatchInput);
188 FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultUpdateGroupFuture, currentOffset);
191 final AddMetersBatchInput addMetersBatchInput =
192 FlatBatchMeterAdapters.adaptFlatBatchAddMeter(planStep, node);
193 final ListenableFuture<RpcResult<AddMetersBatchOutput>> resultAddMeterFuture =
194 salMeterService.addMetersBatch(addMetersBatchInput);
196 FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultAddMeterFuture, currentOffset);
199 final RemoveMetersBatchInput removeMetersBatchInput =
200 FlatBatchMeterAdapters.adaptFlatBatchRemoveMeter(planStep, node);
201 final ListenableFuture<RpcResult<RemoveMetersBatchOutput>> resultRemoveMeterFuture =
202 salMeterService.removeMetersBatch(removeMetersBatchInput);
204 FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultRemoveMeterFuture, currentOffset);
207 final UpdateMetersBatchInput updateMetersBatchInput =
208 FlatBatchMeterAdapters.adaptFlatBatchUpdateMeter(planStep, node);
209 final ListenableFuture<RpcResult<UpdateMetersBatchOutput>> resultUpdateMeterFuture =
210 salMeterService.updateMetersBatch(updateMetersBatchInput);
212 FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultUpdateMeterFuture, currentOffset);
215 LOG.warn("Unsupported plan-step type occurred: {} -> OMITTING", planStep.getStepType());
216 chainOutput = FlatBatchUtil.createEmptyRpcBatchResultFuture(true);