Merge "BUG-4117: notification supplier - rename"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / services / SalFlatBatchServiceImpl.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.services;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.AsyncFunction;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import java.util.ArrayList;
17 import java.util.List;
18 import java.util.concurrent.Future;
19 import org.opendaylight.openflowplugin.impl.services.batch.BatchPlanStep;
20 import org.opendaylight.openflowplugin.impl.services.batch.FlatBatchFlowAdapters;
21 import org.opendaylight.openflowplugin.impl.services.batch.FlatBatchGroupAdapters;
22 import org.opendaylight.openflowplugin.impl.services.batch.FlatBatchMeterAdapters;
23 import org.opendaylight.openflowplugin.impl.util.FlatBatchUtil;
24 import org.opendaylight.openflowplugin.impl.util.PathUtil;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchInput;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchOutput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchOutputBuilder;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.SalFlatBatchService;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.AddFlowsBatchInput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.AddFlowsBatchOutput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.RemoveFlowsBatchInput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.RemoveFlowsBatchOutput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.SalFlowsBatchService;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.UpdateFlowsBatchInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.UpdateFlowsBatchOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.AddGroupsBatchInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.AddGroupsBatchOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.RemoveGroupsBatchInput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.RemoveGroupsBatchOutput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.SalGroupsBatchService;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.UpdateGroupsBatchInput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.UpdateGroupsBatchOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.AddMetersBatchInput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.AddMetersBatchOutput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.RemoveMetersBatchInput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.RemoveMetersBatchOutput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.SalMetersBatchService;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.UpdateMetersBatchInput;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.UpdateMetersBatchOutput;
52 import org.opendaylight.yangtools.yang.common.RpcResult;
53 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57 /**
58  * default implementation of {@link SalFlowsBatchService} - delegates work to {@link SalFlowService}
59  */
60 public class SalFlatBatchServiceImpl implements SalFlatBatchService {
61     private static final Logger LOG = LoggerFactory.getLogger(SalFlatBatchServiceImpl.class);
62
63     private final SalFlowsBatchService salFlowService;
64     private final SalGroupsBatchService salGroupService;
65     private final SalMetersBatchService salMeterService;
66
67     public SalFlatBatchServiceImpl(final SalFlowsBatchService salFlowBatchService,
68                                    final SalGroupsBatchService salGroupsBatchService,
69                                    final SalMetersBatchService salMetersBatchService) {
70         this.salFlowService = Preconditions.checkNotNull(salFlowBatchService, "delegate flow service must not be null");
71         this.salGroupService = Preconditions.checkNotNull(salGroupsBatchService, "delegate group service must not be null");
72         this.salMeterService = Preconditions.checkNotNull(salMetersBatchService, "delegate meter service must not be null");
73     }
74
75     @Override
76     public Future<RpcResult<ProcessFlatBatchOutput>> processFlatBatch(final ProcessFlatBatchInput input) {
77         LOG.trace("processing flat batch @ {} : {}", PathUtil.extractNodeId(input.getNode()), input.getBatch().size());
78
79         // create plan
80         final List<BatchPlanStep> batchPlan = FlatBatchUtil.assembleBatchPlan(input.getBatch());
81         // add barriers where needed
82         FlatBatchUtil.markBarriersWhereNeeded(batchPlan);
83         // prepare chain elements
84         final List<AsyncFunction<RpcResult<ProcessFlatBatchOutput>, RpcResult<ProcessFlatBatchOutput>>> batchChainElements =
85                 prepareBatchChain(batchPlan, input.getNode(), input.isExitOnFirstError());
86         // execute plan with barriers and collect outputs chain correspondingly, collect results
87         return executeBatchPlan(batchChainElements);
88     }
89
90     @VisibleForTesting
91     Future<RpcResult<ProcessFlatBatchOutput>> executeBatchPlan(final List<AsyncFunction<RpcResult<ProcessFlatBatchOutput>, RpcResult<ProcessFlatBatchOutput>>> batchChainElements) {
92         ListenableFuture<RpcResult<ProcessFlatBatchOutput>> chainSummaryResult =
93                 RpcResultBuilder.success(new ProcessFlatBatchOutputBuilder().build()).buildFuture();
94
95         for (AsyncFunction<RpcResult<ProcessFlatBatchOutput>, RpcResult<ProcessFlatBatchOutput>> chainElement : batchChainElements) {
96             chainSummaryResult = Futures.transform(chainSummaryResult, chainElement);
97         }
98
99         return chainSummaryResult;
100
101     }
102
103     @VisibleForTesting
104     List<AsyncFunction<RpcResult<ProcessFlatBatchOutput>, RpcResult<ProcessFlatBatchOutput>>> prepareBatchChain(
105             final List<BatchPlanStep> batchPlan,
106             final NodeRef node,
107             final boolean exitOnFirstError) {
108
109         // create batch API calls based on plan steps
110         final List<AsyncFunction<RpcResult<ProcessFlatBatchOutput>, RpcResult<ProcessFlatBatchOutput>>> chainJobs = new ArrayList<>();
111         int stepOffset = 0;
112         for (final BatchPlanStep planStep : batchPlan) {
113             final int currentOffset = stepOffset;
114             chainJobs.add(new AsyncFunction<RpcResult<ProcessFlatBatchOutput>, RpcResult<ProcessFlatBatchOutput>>() {
115                 @Override
116                 public ListenableFuture<RpcResult<ProcessFlatBatchOutput>> apply(final RpcResult<ProcessFlatBatchOutput> chainInput) throws Exception {
117                     if (exitOnFirstError && !chainInput.isSuccessful()) {
118                         LOG.debug("error on flat batch chain occurred -> skipping step {}", planStep.getStepType());
119                         return Futures.immediateFuture(chainInput);
120                     }
121
122                     LOG.trace("batch progressing on step type {}", planStep.getStepType());
123                     LOG.trace("batch progressing previous step result: {}", chainInput.isSuccessful());
124
125                     final ListenableFuture<RpcResult<ProcessFlatBatchOutput>> chainOutput;
126                     switch (planStep.getStepType()) {
127                         case FLOW_ADD:
128                             final AddFlowsBatchInput addFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchAddFlow(
129                                     planStep, node);
130                             final Future<RpcResult<AddFlowsBatchOutput>> resultAddFlowFuture = salFlowService.addFlowsBatch(addFlowsBatchInput);
131                             chainOutput = FlatBatchFlowAdapters.adaptFlowBatchFutureForChain(chainInput, resultAddFlowFuture, currentOffset);
132                             break;
133                         case FLOW_REMOVE:
134                             final RemoveFlowsBatchInput removeFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchRemoveFlow(
135                                     planStep, node);
136                             final Future<RpcResult<RemoveFlowsBatchOutput>> resultRemoveFlowFuture = salFlowService.removeFlowsBatch(removeFlowsBatchInput);
137                             chainOutput = FlatBatchFlowAdapters.adaptFlowBatchFutureForChain(chainInput, resultRemoveFlowFuture, currentOffset);
138                             break;
139                         case FLOW_UPDATE:
140                             final UpdateFlowsBatchInput updateFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchUpdateFlow(
141                                     planStep, node);
142                             final Future<RpcResult<UpdateFlowsBatchOutput>> resultUpdateFlowFuture = salFlowService.updateFlowsBatch(updateFlowsBatchInput);
143                             chainOutput = FlatBatchFlowAdapters.adaptFlowBatchFutureForChain(chainInput, resultUpdateFlowFuture, currentOffset);
144                             break;
145                         case GROUP_ADD:
146                             final AddGroupsBatchInput addGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchAddGroup(
147                                     planStep, node);
148                             final Future<RpcResult<AddGroupsBatchOutput>> resultAddGroupFuture = salGroupService.addGroupsBatch(addGroupsBatchInput);
149                             chainOutput = FlatBatchGroupAdapters.adaptGroupBatchFutureForChain(chainInput, resultAddGroupFuture, currentOffset);
150                             break;
151                         case GROUP_REMOVE:
152                             final RemoveGroupsBatchInput removeGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchRemoveGroup(
153                                     planStep, node);
154                             final Future<RpcResult<RemoveGroupsBatchOutput>> resultRemoveGroupFuture = salGroupService.removeGroupsBatch(removeGroupsBatchInput);
155                             chainOutput = FlatBatchGroupAdapters.adaptGroupBatchFutureForChain(chainInput, resultRemoveGroupFuture, currentOffset);
156                             break;
157                         case GROUP_UPDATE:
158                             final UpdateGroupsBatchInput updateGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchUpdateGroup(
159                                     planStep, node);
160                             final Future<RpcResult<UpdateGroupsBatchOutput>> resultUpdateGroupFuture = salGroupService.updateGroupsBatch(updateGroupsBatchInput);
161                             chainOutput = FlatBatchGroupAdapters.adaptGroupBatchFutureForChain(chainInput, resultUpdateGroupFuture, currentOffset);
162                             break;
163                         case METER_ADD:
164                             final AddMetersBatchInput addMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchAddMeter(
165                                     planStep, node);
166                             final Future<RpcResult<AddMetersBatchOutput>> resultAddMeterFuture = salMeterService.addMetersBatch(addMetersBatchInput);
167                             chainOutput = FlatBatchMeterAdapters.adaptMeterBatchFutureForChain(chainInput, resultAddMeterFuture, currentOffset);
168                             break;
169                         case METER_REMOVE:
170                             final RemoveMetersBatchInput removeMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchRemoveMeter(
171                                     planStep, node);
172                             final Future<RpcResult<RemoveMetersBatchOutput>> resultRemoveMeterFuture = salMeterService.removeMetersBatch(removeMetersBatchInput);
173                             chainOutput = FlatBatchMeterAdapters.adaptMeterBatchFutureForChain(chainInput, resultRemoveMeterFuture, currentOffset);
174                             break;
175                         case METER_UPDATE:
176                             final UpdateMetersBatchInput updateMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchUpdateMeter(
177                                     planStep, node);
178                             final Future<RpcResult<UpdateMetersBatchOutput>> resultUpdateMeterFuture = salMeterService.updateMetersBatch(updateMetersBatchInput);
179                             chainOutput = FlatBatchMeterAdapters.adaptMeterBatchFutureForChain(chainInput, resultUpdateMeterFuture, currentOffset);
180                             break;
181                         default:
182                             LOG.warn("Unsupported plan-step type occurred: {} -> OMITTING", planStep.getStepType());
183                             chainOutput = Futures.immediateFuture(chainInput);
184                     }
185                     return chainOutput;
186                 }
187             });
188             stepOffset += planStep.getTaskBag().size();
189         }
190
191         return chainJobs;
192     }
193
194 }