2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.impl.services;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.AsyncFunction;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import java.util.ArrayList;
17 import java.util.List;
18 import java.util.concurrent.Future;
19 import org.opendaylight.openflowplugin.impl.services.batch.BatchPlanStep;
20 import org.opendaylight.openflowplugin.impl.services.batch.FlatBatchFlowAdapters;
21 import org.opendaylight.openflowplugin.impl.services.batch.FlatBatchGroupAdapters;
22 import org.opendaylight.openflowplugin.impl.services.batch.FlatBatchMeterAdapters;
23 import org.opendaylight.openflowplugin.impl.util.FlatBatchUtil;
24 import org.opendaylight.openflowplugin.impl.util.PathUtil;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchInput;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchOutput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchOutputBuilder;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.SalFlatBatchService;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.AddFlowsBatchInput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.AddFlowsBatchOutput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.RemoveFlowsBatchInput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.RemoveFlowsBatchOutput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.SalFlowsBatchService;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.UpdateFlowsBatchInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.UpdateFlowsBatchOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.AddGroupsBatchInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.AddGroupsBatchOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.RemoveGroupsBatchInput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.RemoveGroupsBatchOutput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.SalGroupsBatchService;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.UpdateGroupsBatchInput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.UpdateGroupsBatchOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.AddMetersBatchInput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.AddMetersBatchOutput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.RemoveMetersBatchInput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.RemoveMetersBatchOutput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.SalMetersBatchService;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.UpdateMetersBatchInput;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.UpdateMetersBatchOutput;
52 import org.opendaylight.yangtools.yang.common.RpcResult;
53 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
58 * Default implementation of {@link SalFlatBatchService} - delegates work to {@link SalFlowsBatchService}, {@link SalGroupsBatchService} and {@link SalMetersBatchService}.
60 public class SalFlatBatchServiceImpl implements SalFlatBatchService {
61 private static final Logger LOG = LoggerFactory.getLogger(SalFlatBatchServiceImpl.class);
63 private final SalFlowsBatchService salFlowService;
64 private final SalGroupsBatchService salGroupService;
65 private final SalMetersBatchService salMeterService;
/**
 * Creates the flat batch service on top of the three per-type batch services.
 *
 * @param salFlowBatchService   delegate used for flow batch steps, must not be null
 * @param salGroupsBatchService delegate used for group batch steps, must not be null
 * @param salMetersBatchService delegate used for meter batch steps, must not be null
 * @throws NullPointerException if any of the delegates is null
 */
public SalFlatBatchServiceImpl(final SalFlowsBatchService salFlowBatchService,
                               final SalGroupsBatchService salGroupsBatchService,
                               final SalMetersBatchService salMetersBatchService) {
    // validate all delegates first (flow, group, meter - in this order), then wire them in
    Preconditions.checkNotNull(salFlowBatchService, "delegate flow service must not be null");
    Preconditions.checkNotNull(salGroupsBatchService, "delegate group service must not be null");
    Preconditions.checkNotNull(salMetersBatchService, "delegate meter service must not be null");

    this.salFlowService = salFlowBatchService;
    this.salGroupService = salGroupsBatchService;
    this.salMeterService = salMetersBatchService;
}
76 public Future<RpcResult<ProcessFlatBatchOutput>> processFlatBatch(final ProcessFlatBatchInput input) {
77 LOG.trace("processing flat batch @ {} : {}", PathUtil.extractNodeId(input.getNode()), input.getBatch().size());
80 final List<BatchPlanStep> batchPlan = FlatBatchUtil.assembleBatchPlan(input.getBatch());
81 // add barriers where needed
82 FlatBatchUtil.markBarriersWhereNeeded(batchPlan);
83 // prepare chain elements
84 final List<AsyncFunction<RpcResult<ProcessFlatBatchOutput>, RpcResult<ProcessFlatBatchOutput>>> batchChainElements =
85 prepareBatchChain(batchPlan, input.getNode(), input.isExitOnFirstError());
86 // execute plan with barriers and collect outputs chain correspondingly, collect results
87 return executeBatchPlan(batchChainElements);
91 Future<RpcResult<ProcessFlatBatchOutput>> executeBatchPlan(final List<AsyncFunction<RpcResult<ProcessFlatBatchOutput>, RpcResult<ProcessFlatBatchOutput>>> batchChainElements) {
92 ListenableFuture<RpcResult<ProcessFlatBatchOutput>> chainSummaryResult =
93 RpcResultBuilder.success(new ProcessFlatBatchOutputBuilder().build()).buildFuture();
95 for (AsyncFunction<RpcResult<ProcessFlatBatchOutput>, RpcResult<ProcessFlatBatchOutput>> chainElement : batchChainElements) {
96 chainSummaryResult = Futures.transform(chainSummaryResult, chainElement);
99 return chainSummaryResult;
104 List<AsyncFunction<RpcResult<ProcessFlatBatchOutput>, RpcResult<ProcessFlatBatchOutput>>> prepareBatchChain(
105 final List<BatchPlanStep> batchPlan,
107 final boolean exitOnFirstError) {
109 // create batch API calls based on plan steps
110 final List<AsyncFunction<RpcResult<ProcessFlatBatchOutput>, RpcResult<ProcessFlatBatchOutput>>> chainJobs = new ArrayList<>();
112 for (final BatchPlanStep planStep : batchPlan) {
113 final int currentOffset = stepOffset;
114 chainJobs.add(new AsyncFunction<RpcResult<ProcessFlatBatchOutput>, RpcResult<ProcessFlatBatchOutput>>() {
116 public ListenableFuture<RpcResult<ProcessFlatBatchOutput>> apply(final RpcResult<ProcessFlatBatchOutput> chainInput) throws Exception {
117 if (exitOnFirstError && !chainInput.isSuccessful()) {
118 LOG.debug("error on flat batch chain occurred -> skipping step {}", planStep.getStepType());
119 return Futures.immediateFuture(chainInput);
122 LOG.trace("batch progressing on step type {}", planStep.getStepType());
123 LOG.trace("batch progressing previous step result: {}", chainInput.isSuccessful());
125 final ListenableFuture<RpcResult<ProcessFlatBatchOutput>> chainOutput;
126 switch (planStep.getStepType()) {
128 final AddFlowsBatchInput addFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchAddFlow(
130 final Future<RpcResult<AddFlowsBatchOutput>> resultAddFlowFuture = salFlowService.addFlowsBatch(addFlowsBatchInput);
131 chainOutput = FlatBatchFlowAdapters.adaptFlowBatchFutureForChain(chainInput, resultAddFlowFuture, currentOffset);
134 final RemoveFlowsBatchInput removeFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchRemoveFlow(
136 final Future<RpcResult<RemoveFlowsBatchOutput>> resultRemoveFlowFuture = salFlowService.removeFlowsBatch(removeFlowsBatchInput);
137 chainOutput = FlatBatchFlowAdapters.adaptFlowBatchFutureForChain(chainInput, resultRemoveFlowFuture, currentOffset);
140 final UpdateFlowsBatchInput updateFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchUpdateFlow(
142 final Future<RpcResult<UpdateFlowsBatchOutput>> resultUpdateFlowFuture = salFlowService.updateFlowsBatch(updateFlowsBatchInput);
143 chainOutput = FlatBatchFlowAdapters.adaptFlowBatchFutureForChain(chainInput, resultUpdateFlowFuture, currentOffset);
146 final AddGroupsBatchInput addGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchAddGroup(
148 final Future<RpcResult<AddGroupsBatchOutput>> resultAddGroupFuture = salGroupService.addGroupsBatch(addGroupsBatchInput);
149 chainOutput = FlatBatchGroupAdapters.adaptGroupBatchFutureForChain(chainInput, resultAddGroupFuture, currentOffset);
152 final RemoveGroupsBatchInput removeGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchRemoveGroup(
154 final Future<RpcResult<RemoveGroupsBatchOutput>> resultRemoveGroupFuture = salGroupService.removeGroupsBatch(removeGroupsBatchInput);
155 chainOutput = FlatBatchGroupAdapters.adaptGroupBatchFutureForChain(chainInput, resultRemoveGroupFuture, currentOffset);
158 final UpdateGroupsBatchInput updateGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchUpdateGroup(
160 final Future<RpcResult<UpdateGroupsBatchOutput>> resultUpdateGroupFuture = salGroupService.updateGroupsBatch(updateGroupsBatchInput);
161 chainOutput = FlatBatchGroupAdapters.adaptGroupBatchFutureForChain(chainInput, resultUpdateGroupFuture, currentOffset);
164 final AddMetersBatchInput addMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchAddMeter(
166 final Future<RpcResult<AddMetersBatchOutput>> resultAddMeterFuture = salMeterService.addMetersBatch(addMetersBatchInput);
167 chainOutput = FlatBatchMeterAdapters.adaptMeterBatchFutureForChain(chainInput, resultAddMeterFuture, currentOffset);
170 final RemoveMetersBatchInput removeMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchRemoveMeter(
172 final Future<RpcResult<RemoveMetersBatchOutput>> resultRemoveMeterFuture = salMeterService.removeMetersBatch(removeMetersBatchInput);
173 chainOutput = FlatBatchMeterAdapters.adaptMeterBatchFutureForChain(chainInput, resultRemoveMeterFuture, currentOffset);
176 final UpdateMetersBatchInput updateMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchUpdateMeter(
178 final Future<RpcResult<UpdateMetersBatchOutput>> resultUpdateMeterFuture = salMeterService.updateMetersBatch(updateMetersBatchInput);
179 chainOutput = FlatBatchMeterAdapters.adaptMeterBatchFutureForChain(chainInput, resultUpdateMeterFuture, currentOffset);
182 LOG.warn("Unsupported plan-step type occurred: {} -> OMITTING", planStep.getStepType());
183 chainOutput = Futures.immediateFuture(chainInput);
188 stepOffset += planStep.getTaskBag().size();