2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
11 import com.google.common.base.MoreObjects;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.JdkFutureAdapters;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.SettableFuture;
19 import java.util.ArrayList;
20 import java.util.Collections;
21 import java.util.List;
22 import java.util.concurrent.Future;
23 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
24 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
25 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
26 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsDsInput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsRpcInput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.BulkFlowBaseContentGrouping;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsDsInput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpcInput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.SalBulkFlowService;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.bulk.flow.ds.list.grouping.BulkFlowDsItem;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
48 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
49 import org.opendaylight.yangtools.yang.common.RpcError;
50 import org.opendaylight.yangtools.yang.common.RpcResult;
51 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
54 * Simple implementation providing bulk flows operations.
56 public class SalBulkFlowServiceImpl implements SalBulkFlowService {
58 private final SalFlowService flowService;
59 private final DataBroker dataBroker;
/**
 * Creates the bulk flow service.
 *
 * @param flowService service used to invoke the individual add/remove flow RPCs; must not be null
 * @param dataBroker broker used to open datastore write transactions; must not be null
 * @throws NullPointerException if either argument is null
 */
public SalBulkFlowServiceImpl(SalFlowService flowService, DataBroker dataBroker) {
    // Validate in declaration order so a null flowService is reported first.
    Preconditions.checkNotNull(flowService);
    Preconditions.checkNotNull(dataBroker);
    this.flowService = flowService;
    this.dataBroker = dataBroker;
}
68 public Future<RpcResult<Void>> addFlowsDs(AddFlowsDsInput input) {
69 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
70 boolean createParentsNextTime = MoreObjects.firstNonNull(input.isAlwaysCreateParents(), Boolean.FALSE);
71 boolean createParents = true;
72 for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
73 FlowBuilder flowBuilder = new FlowBuilder(bulkFlow);
74 flowBuilder.setTableId(bulkFlow.getTableId());
75 flowBuilder.setId(new FlowId(bulkFlow.getFlowId()));
76 writeTransaction.put(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow), flowBuilder.build(), createParents);
77 createParents = createParentsNextTime;
79 CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTransaction.submit();
80 return handleResultFuture(submitFuture);
83 private InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow> getFlowInstanceIdentifier(BulkFlowDsItem bulkFlow) {
84 final NodeRef nodeRef = bulkFlow.getNode();
85 return ((InstanceIdentifier<Node>) nodeRef.getValue())
86 .augmentation(FlowCapableNode.class)
87 .child(Table.class, new TableKey(bulkFlow.getTableId()))
88 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow.class,
89 new FlowKey(new FlowId(bulkFlow.getFlowId())));
93 public Future<RpcResult<Void>> removeFlowsDs(RemoveFlowsDsInput input) {
94 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
95 for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
96 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow));
98 CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTransaction.submit();
99 return handleResultFuture(submitFuture);
102 private ListenableFuture<RpcResult<Void>> handleResultFuture(CheckedFuture<Void, TransactionCommitFailedException> submitFuture) {
103 final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
104 Futures.addCallback(submitFuture, new FutureCallback<Void>() {
106 public void onSuccess(Void result) {
107 rpcResult.set(RpcResultBuilder.success(result).build());
111 public void onFailure(Throwable t) {
112 RpcResultBuilder<Void> rpcResultBld = RpcResultBuilder.<Void>failed()
113 .withRpcErrors(Collections.singleton(
114 RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, null, t.getMessage())
116 rpcResult.set(rpcResultBld.build());
122 private <T> ListenableFuture<RpcResult<Void>> handleResultFuture(ListenableFuture<List<T>> submitFuture) {
123 final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
124 Futures.addCallback(submitFuture, new FutureCallback<List<T>>() {
126 public void onSuccess(List<T> result) {
127 rpcResult.set(RpcResultBuilder.success((Void) null).build());
131 public void onFailure(Throwable t) {
132 RpcResultBuilder<Void> rpcResultBld = RpcResultBuilder.<Void>failed()
133 .withRpcErrors(Collections.singleton(
134 RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, null, t.getMessage())
136 rpcResult.set(rpcResultBld.build());
143 public Future<RpcResult<Void>> addFlowsRpc(AddFlowsRpcInput input) {
144 List<ListenableFuture<RpcResult<AddFlowOutput>>> bulkResults = new ArrayList<>();
146 for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
147 AddFlowInputBuilder flowInputBuilder = new AddFlowInputBuilder((Flow) bulkFlow);
148 final NodeRef nodeRef = bulkFlow.getNode();
149 flowInputBuilder.setNode(nodeRef);
150 flowInputBuilder.setTableId(bulkFlow.getTableId());
151 Future<RpcResult<AddFlowOutput>> rpcAddFlowResult = flowService.addFlow(flowInputBuilder.build());
152 bulkResults.add(JdkFutureAdapters.listenInPoolThread(rpcAddFlowResult));
154 return handleResultFuture(Futures.allAsList(bulkResults));
158 public Future<RpcResult<Void>> removeFlowsRpc(RemoveFlowsRpcInput input) {
159 List<ListenableFuture<RpcResult<RemoveFlowOutput>>> bulkResults = new ArrayList<>();
161 for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
162 RemoveFlowInputBuilder flowInputBuilder = new RemoveFlowInputBuilder((Flow) bulkFlow);
163 final NodeRef nodeRef = bulkFlow.getNode();
164 flowInputBuilder.setNode(nodeRef);
165 flowInputBuilder.setTableId(bulkFlow.getTableId());
166 Future<RpcResult<RemoveFlowOutput>> rpcAddFlowResult = flowService.removeFlow(flowInputBuilder.build());
167 bulkResults.add(JdkFutureAdapters.listenInPoolThread(rpcAddFlowResult));
169 return handleResultFuture(Futures.allAsList(bulkResults));