2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.services.sal;
10 import com.google.common.base.Preconditions;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import java.util.ArrayList;
15 import java.util.List;
16 import org.opendaylight.openflowplugin.impl.util.BarrierUtil;
17 import org.opendaylight.openflowplugin.impl.util.FlowUtil;
18 import org.opendaylight.openflowplugin.impl.util.PathUtil;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInput;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInputBuilder;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.OriginalFlowBuilder;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlowBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowRef;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.AddFlowsBatchInput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.AddFlowsBatchOutput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.BatchFlowInputGrouping;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.BatchFlowInputUpdateGrouping;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.RemoveFlowsBatchInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.RemoveFlowsBatchOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.SalFlowsBatchService;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.UpdateFlowsBatchInput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.UpdateFlowsBatchOutput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.batch.flow.output.list.grouping.BatchFailedFlowsOutput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.update.flows.batch.input.BatchUpdateFlows;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
46 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
47 import org.opendaylight.yangtools.yang.common.RpcResult;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
52 * Default implementation of {@link SalFlowsBatchService} - delegates work to {@link SalFlowService}.
54 public class SalFlowsBatchServiceImpl implements SalFlowsBatchService {
55 private static final Logger LOG = LoggerFactory.getLogger(SalFlowsBatchServiceImpl.class);
57 private final SalFlowService salFlowService;
58 private final FlowCapableTransactionService transactionService;
60 public SalFlowsBatchServiceImpl(final SalFlowService salFlowService,
61 final FlowCapableTransactionService transactionService) {
62 this.salFlowService = Preconditions.checkNotNull(salFlowService, "delegate flow service must not be null");
63 this.transactionService =
64 Preconditions.checkNotNull(transactionService, "delegate transaction service must not be null");
68 public ListenableFuture<RpcResult<RemoveFlowsBatchOutput>> removeFlowsBatch(final RemoveFlowsBatchInput input) {
69 LOG.trace("Removing flows @ {} : {}",
70 PathUtil.extractNodeId(input.getNode()),
71 input.getBatchRemoveFlows().size());
72 final ArrayList<ListenableFuture<RpcResult<RemoveFlowOutput>>> resultsLot = new ArrayList<>();
73 for (BatchFlowInputGrouping batchFlow : input.nonnullBatchRemoveFlows().values()) {
74 final RemoveFlowInput removeFlowInput = new RemoveFlowInputBuilder(batchFlow)
75 .setFlowRef(createFlowRef(input.getNode(), batchFlow))
76 .setNode(input.getNode())
78 resultsLot.add(salFlowService.removeFlow(removeFlowInput));
81 final ListenableFuture<RpcResult<List<BatchFailedFlowsOutput>>> commonResult =
82 Futures.transform(Futures.successfulAsList(resultsLot),
83 FlowUtil.createCumulatingFunction(input.nonnullBatchRemoveFlows().values()),
84 MoreExecutors.directExecutor());
86 ListenableFuture<RpcResult<RemoveFlowsBatchOutput>> removeFlowsBulkFuture =
87 Futures.transform(commonResult, FlowUtil.FLOW_REMOVE_TRANSFORM, MoreExecutors.directExecutor());
89 if (input.isBarrierAfter()) {
90 removeFlowsBulkFuture = BarrierUtil.chainBarrier(removeFlowsBulkFuture, input.getNode(),
91 transactionService, FlowUtil.FLOW_REMOVE_COMPOSING_TRANSFORM);
94 return removeFlowsBulkFuture;
98 public ListenableFuture<RpcResult<AddFlowsBatchOutput>> addFlowsBatch(final AddFlowsBatchInput input) {
99 LOG.trace("Adding flows @ {} : {}", PathUtil.extractNodeId(input.getNode()), input.getBatchAddFlows().size());
100 final ArrayList<ListenableFuture<RpcResult<AddFlowOutput>>> resultsLot = new ArrayList<>();
101 for (BatchFlowInputGrouping batchFlow : input.nonnullBatchAddFlows().values()) {
102 final AddFlowInput addFlowInput = new AddFlowInputBuilder(batchFlow)
103 .setFlowRef(createFlowRef(input.getNode(), batchFlow))
104 .setNode(input.getNode())
106 resultsLot.add(salFlowService.addFlow(addFlowInput));
109 final ListenableFuture<RpcResult<List<BatchFailedFlowsOutput>>> commonResult =
110 Futures.transform(Futures.successfulAsList(resultsLot),
111 FlowUtil.createCumulatingFunction(input.nonnullBatchAddFlows().values()),
112 MoreExecutors.directExecutor());
114 ListenableFuture<RpcResult<AddFlowsBatchOutput>> addFlowsBulkFuture =
115 Futures.transform(commonResult, FlowUtil.FLOW_ADD_TRANSFORM, MoreExecutors.directExecutor());
117 if (input.isBarrierAfter()) {
118 addFlowsBulkFuture = BarrierUtil.chainBarrier(addFlowsBulkFuture, input.getNode(),
119 transactionService, FlowUtil.FLOW_ADD_COMPOSING_TRANSFORM);
122 return addFlowsBulkFuture;
125 private static FlowRef createFlowRef(final NodeRef nodeRef, final BatchFlowInputGrouping batchFlow) {
126 return FlowUtil.buildFlowPath((InstanceIdentifier<Node>) nodeRef.getValue(),
127 batchFlow.getTableId(), batchFlow.getFlowId());
130 private static FlowRef createFlowRef(final NodeRef nodeRef, final BatchFlowInputUpdateGrouping batchFlow) {
131 return FlowUtil.buildFlowPath((InstanceIdentifier<Node>) nodeRef.getValue(),
132 batchFlow.getOriginalBatchedFlow().getTableId(), batchFlow.getFlowId());
136 public ListenableFuture<RpcResult<UpdateFlowsBatchOutput>> updateFlowsBatch(final UpdateFlowsBatchInput input) {
137 LOG.trace("Updating flows @ {} : {}",
138 PathUtil.extractNodeId(input.getNode()),
139 input.getBatchUpdateFlows().size());
140 final ArrayList<ListenableFuture<RpcResult<UpdateFlowOutput>>> resultsLot = new ArrayList<>();
141 for (BatchUpdateFlows batchFlow : input.nonnullBatchUpdateFlows().values()) {
142 final UpdateFlowInput updateFlowInput = new UpdateFlowInputBuilder(input)
143 .setOriginalFlow(new OriginalFlowBuilder(batchFlow.getOriginalBatchedFlow()).build())
144 .setUpdatedFlow(new UpdatedFlowBuilder(batchFlow.getUpdatedBatchedFlow()).build())
145 .setFlowRef(createFlowRef(input.getNode(), batchFlow))
146 .setNode(input.getNode())
148 resultsLot.add(salFlowService.updateFlow(updateFlowInput));
151 final ListenableFuture<RpcResult<List<BatchFailedFlowsOutput>>> commonResult =
152 Futures.transform(Futures.successfulAsList(resultsLot),
153 FlowUtil.createCumulatingFunction(input.nonnullBatchUpdateFlows().values()),
154 MoreExecutors.directExecutor());
156 ListenableFuture<RpcResult<UpdateFlowsBatchOutput>> updateFlowsBulkFuture =
157 Futures.transform(commonResult, FlowUtil.FLOW_UPDATE_TRANSFORM, MoreExecutors.directExecutor());
159 if (input.isBarrierAfter()) {
160 updateFlowsBulkFuture = BarrierUtil.chainBarrier(updateFlowsBulkFuture, input.getNode(),
161 transactionService, FlowUtil.FLOW_UPDATE_COMPOSING_TRANSFORM);
164 return updateFlowsBulkFuture;