7b33a60a658099cf328137b5fdd4f047a1dda556
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / services / sal / SalFlowsBatchServiceImpl.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.services.sal;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.JdkFutureAdapters;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import java.util.ArrayList;
16 import java.util.List;
17 import java.util.concurrent.Future;
18 import org.opendaylight.openflowplugin.impl.util.BarrierUtil;
19 import org.opendaylight.openflowplugin.impl.util.FlowUtil;
20 import org.opendaylight.openflowplugin.impl.util.PathUtil;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInput;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInputBuilder;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.OriginalFlowBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlowBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowRef;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.AddFlowsBatchInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.AddFlowsBatchOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.BatchFlowInputGrouping;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.BatchFlowInputUpdateGrouping;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.RemoveFlowsBatchInput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.RemoveFlowsBatchOutput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.SalFlowsBatchService;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.UpdateFlowsBatchInput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.UpdateFlowsBatchOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.batch.flow.output.list.grouping.BatchFailedFlowsOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.update.flows.batch.input.BatchUpdateFlows;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
48 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
49 import org.opendaylight.yangtools.yang.common.RpcResult;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52
53 /**
54  * Default implementation of {@link SalFlowsBatchService} - delegates work to {@link SalFlowService}.
55  */
56 public class SalFlowsBatchServiceImpl implements SalFlowsBatchService {
57     private static final Logger LOG = LoggerFactory.getLogger(SalFlowsBatchServiceImpl.class);
58
59     private final SalFlowService salFlowService;
60     private final FlowCapableTransactionService transactionService;
61
62     public SalFlowsBatchServiceImpl(final SalFlowService salFlowService,
63                                     final FlowCapableTransactionService transactionService) {
64         this.salFlowService = Preconditions.checkNotNull(salFlowService, "delegate flow service must not be null");
65         this.transactionService =
66                 Preconditions.checkNotNull(transactionService, "delegate transaction service must not be null");
67     }
68
69     @Override
70     public Future<RpcResult<RemoveFlowsBatchOutput>> removeFlowsBatch(final RemoveFlowsBatchInput input) {
71         LOG.trace("Removing flows @ {} : {}",
72                   PathUtil.extractNodeId(input.getNode()),
73                   input.getBatchRemoveFlows().size());
74         final ArrayList<ListenableFuture<RpcResult<RemoveFlowOutput>>> resultsLot = new ArrayList<>();
75         for (BatchFlowInputGrouping batchFlow : input.getBatchRemoveFlows()) {
76             final RemoveFlowInput removeFlowInput = new RemoveFlowInputBuilder(batchFlow)
77                     .setFlowRef(createFlowRef(input.getNode(), batchFlow))
78                     .setNode(input.getNode())
79                     .build();
80             resultsLot.add(JdkFutureAdapters.listenInPoolThread(salFlowService.removeFlow(removeFlowInput)));
81         }
82
83         final ListenableFuture<RpcResult<List<BatchFailedFlowsOutput>>> commonResult =
84                 Futures.transform(Futures.successfulAsList(resultsLot),
85                         FlowUtil.<RemoveFlowOutput>createCumulatingFunction(input.getBatchRemoveFlows()),
86                         MoreExecutors.directExecutor());
87
88         ListenableFuture<RpcResult<RemoveFlowsBatchOutput>> removeFlowsBulkFuture =
89                 Futures.transform(commonResult, FlowUtil.FLOW_REMOVE_TRANSFORM, MoreExecutors.directExecutor());
90
91         if (input.isBarrierAfter()) {
92             removeFlowsBulkFuture = BarrierUtil.chainBarrier(removeFlowsBulkFuture, input.getNode(),
93                     transactionService, FlowUtil.FLOW_REMOVE_COMPOSING_TRANSFORM);
94         }
95
96         return removeFlowsBulkFuture;
97     }
98
99     @Override
100     public Future<RpcResult<AddFlowsBatchOutput>> addFlowsBatch(final AddFlowsBatchInput input) {
101         LOG.trace("Adding flows @ {} : {}", PathUtil.extractNodeId(input.getNode()), input.getBatchAddFlows().size());
102         final ArrayList<ListenableFuture<RpcResult<AddFlowOutput>>> resultsLot = new ArrayList<>();
103         for (BatchFlowInputGrouping batchFlow : input.getBatchAddFlows()) {
104             final AddFlowInput addFlowInput = new AddFlowInputBuilder(batchFlow)
105                     .setFlowRef(createFlowRef(input.getNode(), batchFlow))
106                     .setNode(input.getNode())
107                     .build();
108             resultsLot.add(JdkFutureAdapters.listenInPoolThread(salFlowService.addFlow(addFlowInput)));
109         }
110
111         final ListenableFuture<RpcResult<List<BatchFailedFlowsOutput>>> commonResult =
112                 Futures.transform(Futures.successfulAsList(resultsLot),
113                         FlowUtil.<AddFlowOutput>createCumulatingFunction(input.getBatchAddFlows()),
114                         MoreExecutors.directExecutor());
115
116         ListenableFuture<RpcResult<AddFlowsBatchOutput>> addFlowsBulkFuture =
117                 Futures.transform(commonResult, FlowUtil.FLOW_ADD_TRANSFORM, MoreExecutors.directExecutor());
118
119         if (input.isBarrierAfter()) {
120             addFlowsBulkFuture = BarrierUtil.chainBarrier(addFlowsBulkFuture, input.getNode(),
121                     transactionService, FlowUtil.FLOW_ADD_COMPOSING_TRANSFORM);
122         }
123
124         return addFlowsBulkFuture;
125     }
126
127     private static FlowRef createFlowRef(final NodeRef nodeRef, final BatchFlowInputGrouping batchFlow) {
128         return FlowUtil.buildFlowPath((InstanceIdentifier<Node>) nodeRef.getValue(),
129                 batchFlow.getTableId(), batchFlow.getFlowId());
130     }
131
132     private static FlowRef createFlowRef(final NodeRef nodeRef, final BatchFlowInputUpdateGrouping batchFlow) {
133         return FlowUtil.buildFlowPath((InstanceIdentifier<Node>) nodeRef.getValue(),
134                 batchFlow.getOriginalBatchedFlow().getTableId(), batchFlow.getFlowId());
135     }
136
137     @Override
138     public Future<RpcResult<UpdateFlowsBatchOutput>> updateFlowsBatch(final UpdateFlowsBatchInput input) {
139         LOG.trace("Updating flows @ {} : {}",
140                   PathUtil.extractNodeId(input.getNode()),
141                   input.getBatchUpdateFlows().size());
142         final ArrayList<ListenableFuture<RpcResult<UpdateFlowOutput>>> resultsLot = new ArrayList<>();
143         for (BatchUpdateFlows batchFlow : input.getBatchUpdateFlows()) {
144             final UpdateFlowInput updateFlowInput = new UpdateFlowInputBuilder(input)
145                     .setOriginalFlow(new OriginalFlowBuilder(batchFlow.getOriginalBatchedFlow()).build())
146                     .setUpdatedFlow(new UpdatedFlowBuilder(batchFlow.getUpdatedBatchedFlow()).build())
147                     .setFlowRef(createFlowRef(input.getNode(), batchFlow))
148                     .setNode(input.getNode())
149                     .build();
150             resultsLot.add(JdkFutureAdapters.listenInPoolThread(salFlowService.updateFlow(updateFlowInput)));
151         }
152
153         final ListenableFuture<RpcResult<List<BatchFailedFlowsOutput>>> commonResult =
154                 Futures.transform(Futures.successfulAsList(resultsLot),
155                                   FlowUtil.<UpdateFlowOutput>createCumulatingFunction(input.getBatchUpdateFlows()),
156                         MoreExecutors.directExecutor());
157
158         ListenableFuture<RpcResult<UpdateFlowsBatchOutput>> updateFlowsBulkFuture =
159                 Futures.transform(commonResult, FlowUtil.FLOW_UPDATE_TRANSFORM, MoreExecutors.directExecutor());
160
161         if (input.isBarrierAfter()) {
162             updateFlowsBulkFuture = BarrierUtil.chainBarrier(updateFlowsBulkFuture, input.getNode(),
163                     transactionService, FlowUtil.FLOW_UPDATE_COMPOSING_TRANSFORM);
164         }
165
166         return updateFlowsBulkFuture;
167     }
168 }