2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.services.sal;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import java.util.ArrayList;
17 import java.util.List;
18 import org.opendaylight.openflowplugin.impl.services.batch.BatchPlanStep;
19 import org.opendaylight.openflowplugin.impl.services.batch.BatchStepJob;
20 import org.opendaylight.openflowplugin.impl.services.batch.FlatBatchFlowAdapters;
21 import org.opendaylight.openflowplugin.impl.services.batch.FlatBatchGroupAdapters;
22 import org.opendaylight.openflowplugin.impl.services.batch.FlatBatchMeterAdapters;
23 import org.opendaylight.openflowplugin.impl.util.FlatBatchUtil;
24 import org.opendaylight.openflowplugin.impl.util.PathUtil;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatch;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchInput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchOutput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.AddFlowsBatch;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.RemoveFlowsBatch;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.UpdateFlowsBatch;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.AddGroupsBatch;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.RemoveGroupsBatch;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.UpdateGroupsBatch;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.AddMetersBatch;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.RemoveMetersBatch;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.UpdateMetersBatch;
38 import org.opendaylight.yangtools.yang.common.RpcResult;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
42 public final class ProcessFlatBatchImpl implements ProcessFlatBatch {
43 private static final Logger LOG = LoggerFactory.getLogger(ProcessFlatBatchImpl.class);
45 private final AddFlowsBatch addFlowsBatch;
46 private final RemoveFlowsBatch removeFlowsBatch;
47 private final UpdateFlowsBatch updateFlowsBatch;
48 private final AddGroupsBatch addGroupsBatch;
49 private final RemoveGroupsBatch removeGroupsBatch;
50 private final UpdateGroupsBatch updateGroupsBatch;
51 private final AddMetersBatch addMetersBatch;
52 private final RemoveMetersBatch removeMetersBatch;
53 private final UpdateMetersBatch updateMetersBatch;
55 public ProcessFlatBatchImpl(final AddFlowsBatch addFlowsBatch, final RemoveFlowsBatch removeFlowsBatch,
56 final UpdateFlowsBatch updateFlowsBatch, final AddGroupsBatch addGroupsBatch,
57 final RemoveGroupsBatch removeGroupsBatch, final UpdateGroupsBatch updateGroupsBatch,
58 final AddMetersBatch addMetersBatch, final RemoveMetersBatch removeMetersBatch,
59 final UpdateMetersBatch updateMetersBatch) {
60 this.addFlowsBatch = requireNonNull(addFlowsBatch);
61 this.removeFlowsBatch = requireNonNull(removeFlowsBatch);
62 this.updateFlowsBatch = requireNonNull(updateFlowsBatch);
63 this.addGroupsBatch = requireNonNull(addGroupsBatch);
64 this.removeGroupsBatch = requireNonNull(removeGroupsBatch);
65 this.updateGroupsBatch = requireNonNull(updateGroupsBatch);
66 this.addMetersBatch = requireNonNull(addMetersBatch);
67 this.removeMetersBatch = requireNonNull(removeMetersBatch);
68 this.updateMetersBatch = requireNonNull(updateMetersBatch);
72 public ListenableFuture<RpcResult<ProcessFlatBatchOutput>> invoke(final ProcessFlatBatchInput input) {
73 final var batch = input.nonnullBatch().values();
74 if (LOG.isTraceEnabled()) {
75 LOG.trace("processing flat batch @ {} : {}", PathUtil.extractNodeId(input.getNode()).getValue(),
80 final var batchPlan = FlatBatchUtil.assembleBatchPlan(batch);
81 // add barriers where needed
82 FlatBatchUtil.markBarriersWhereNeeded(batchPlan);
83 // prepare chain elements
84 final var batchChainElements = prepareBatchChain(batchPlan, input.getNode(), input.getExitOnFirstError());
85 // execute plan with barriers and collect outputs chain correspondingly, collect results
86 return executeBatchPlan(batchChainElements);
90 static ListenableFuture<RpcResult<ProcessFlatBatchOutput>> executeBatchPlan(
91 final List<BatchStepJob> batchJobsChain) {
92 BatchStepJob batchJob;
93 final var firedJobs = new ArrayList<ListenableFuture<RpcResult<ProcessFlatBatchOutput>>>();
94 var chainSummaryResult = FlatBatchUtil.createEmptyRpcBatchResultFuture(true);
96 for (int i = 0; i < batchJobsChain.size(); i++) {
97 batchJob = batchJobsChain.get(i);
98 // wire actual job with chain
99 firedJobs.add(Futures.transformAsync(chainSummaryResult, batchJob.getStepFunction(),
100 MoreExecutors.directExecutor()));
101 // if barrier after actual job is needed or it is the last job -> merge fired job results with chain result
102 if (batchJob.getPlanStep().isBarrierAfter() || i == batchJobsChain.size() - 1) {
103 firedJobs.add(0, chainSummaryResult);
104 chainSummaryResult = FlatBatchUtil.mergeJobsResultsFutures(firedJobs);
108 return chainSummaryResult;
112 List<BatchStepJob> prepareBatchChain(final List<BatchPlanStep> batchPlan, final NodeRef node,
113 final boolean exitOnFirstError) {
114 // create batch API calls based on plan steps
115 final var chainJobs = new ArrayList<BatchStepJob>();
117 for (final BatchPlanStep planStep : batchPlan) {
118 final int currentOffset = stepOffset;
119 chainJobs.add(new BatchStepJob(planStep, chainInput -> {
120 if (exitOnFirstError && !chainInput.isSuccessful()) {
121 LOG.debug("error on flat batch chain occurred -> skipping step {}", planStep.getStepType());
122 return FlatBatchUtil.createEmptyRpcBatchResultFuture(false);
124 LOG.trace("batch progressing on step type {}, previous steps result: {}", planStep.getStepType(),
125 chainInput.isSuccessful());
126 return getChainOutput(node, planStep, currentOffset);
128 stepOffset += planStep.getTaskBag().size();
134 private ListenableFuture<RpcResult<ProcessFlatBatchOutput>> getChainOutput(final NodeRef node,
135 final BatchPlanStep planStep, final int currentOffset) {
136 return switch (planStep.getStepType()) {
137 case FLOW_ADD -> FlatBatchFlowAdapters.convertFlowBatchFutureForChain(
138 addFlowsBatch.invoke(FlatBatchFlowAdapters.adaptFlatBatchAddFlow(planStep, node)), currentOffset);
139 case FLOW_REMOVE -> FlatBatchFlowAdapters.convertFlowBatchFutureForChain(
140 removeFlowsBatch.invoke(FlatBatchFlowAdapters.adaptFlatBatchRemoveFlow(planStep, node)), currentOffset);
141 case FLOW_UPDATE -> FlatBatchFlowAdapters.convertFlowBatchFutureForChain(
142 updateFlowsBatch.invoke(FlatBatchFlowAdapters.adaptFlatBatchUpdateFlow(planStep, node)), currentOffset);
143 case GROUP_ADD -> FlatBatchGroupAdapters.convertGroupBatchFutureForChain(
144 addGroupsBatch.invoke(FlatBatchGroupAdapters.adaptFlatBatchAddGroup(planStep, node)), currentOffset);
145 case GROUP_REMOVE -> FlatBatchGroupAdapters.convertGroupBatchFutureForChain(
146 removeGroupsBatch.invoke(FlatBatchGroupAdapters.adaptFlatBatchRemoveGroup(planStep, node)),
148 case GROUP_UPDATE -> FlatBatchGroupAdapters.convertGroupBatchFutureForChain(
149 updateGroupsBatch.invoke(FlatBatchGroupAdapters.adaptFlatBatchUpdateGroup(planStep, node)),
151 case METER_ADD -> FlatBatchMeterAdapters.convertMeterBatchFutureForChain(
152 addMetersBatch.invoke(FlatBatchMeterAdapters.adaptFlatBatchAddMeter(planStep, node)), currentOffset);
153 case METER_REMOVE -> FlatBatchMeterAdapters.convertMeterBatchFutureForChain(
154 removeMetersBatch.invoke(FlatBatchMeterAdapters.adaptFlatBatchRemoveMeter(planStep, node)),
156 case METER_UPDATE -> FlatBatchMeterAdapters.convertMeterBatchFutureForChain(
157 updateMetersBatch.invoke(FlatBatchMeterAdapters.adaptFlatBatchUpdateMeter(planStep, node)),