Merge "bulk support application - initial proposal"
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / SalBulkFlowServiceImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
10
11 import com.google.common.base.MoreObjects;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.JdkFutureAdapters;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.SettableFuture;
19 import java.util.ArrayList;
20 import java.util.Collections;
21 import java.util.List;
22 import java.util.concurrent.Future;
23 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
24 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
25 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
26 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsDsInput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsRpcInput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.BulkFlowBaseContentGrouping;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsDsInput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpcInput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.SalBulkFlowService;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.bulk.flow.ds.list.grouping.BulkFlowDsItem;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
48 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
49 import org.opendaylight.yangtools.yang.common.RpcError;
50 import org.opendaylight.yangtools.yang.common.RpcResult;
51 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
52
53 /**
54  * Simple implementation providing bulk flows operations.
55  */
56 public class SalBulkFlowServiceImpl implements SalBulkFlowService {
57
58     private final SalFlowService flowService;
59     private final DataBroker dataBroker;
60
61     public SalBulkFlowServiceImpl(SalFlowService flowService, DataBroker dataBroker) {
62         this.flowService = Preconditions.checkNotNull(flowService);
63         this.dataBroker = Preconditions.checkNotNull(dataBroker);
64     }
65
66
67     @Override
68     public Future<RpcResult<Void>> addFlowsDs(AddFlowsDsInput input) {
69         WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
70         boolean createParentsNextTime = MoreObjects.firstNonNull(input.isAlwaysCreateParents(), Boolean.FALSE);
71         boolean createParents = true;
72         for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
73             FlowBuilder flowBuilder = new FlowBuilder(bulkFlow);
74             flowBuilder.setTableId(bulkFlow.getTableId());
75             flowBuilder.setId(new FlowId(bulkFlow.getFlowId()));
76             writeTransaction.put(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow), flowBuilder.build(), createParents);
77             createParents = createParentsNextTime;
78         }
79         CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTransaction.submit();
80         return handleResultFuture(submitFuture);
81     }
82
83     private InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow> getFlowInstanceIdentifier(BulkFlowDsItem bulkFlow) {
84         final NodeRef nodeRef = bulkFlow.getNode();
85         return ((InstanceIdentifier<Node>) nodeRef.getValue())
86                 .augmentation(FlowCapableNode.class)
87                 .child(Table.class, new TableKey(bulkFlow.getTableId()))
88                 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow.class,
89                         new FlowKey(new FlowId(bulkFlow.getFlowId())));
90     }
91
92     @Override
93     public Future<RpcResult<Void>> removeFlowsDs(RemoveFlowsDsInput input) {
94         WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
95         for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
96             writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow));
97         }
98         CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTransaction.submit();
99         return handleResultFuture(submitFuture);
100     }
101
102     private ListenableFuture<RpcResult<Void>> handleResultFuture(CheckedFuture<Void, TransactionCommitFailedException> submitFuture) {
103         final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
104         Futures.addCallback(submitFuture, new FutureCallback<Void>() {
105             @Override
106             public void onSuccess(Void result) {
107                 rpcResult.set(RpcResultBuilder.success(result).build());
108             }
109
110             @Override
111             public void onFailure(Throwable t) {
112                 RpcResultBuilder<Void> rpcResultBld = RpcResultBuilder.<Void>failed()
113                         .withRpcErrors(Collections.singleton(
114                                 RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, null, t.getMessage())
115                         ));
116                 rpcResult.set(rpcResultBld.build());
117             }
118         });
119         return rpcResult;
120     }
121
122     private <T> ListenableFuture<RpcResult<Void>> handleResultFuture(ListenableFuture<List<T>> submitFuture) {
123         final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
124         Futures.addCallback(submitFuture, new FutureCallback<List<T>>() {
125             @Override
126             public void onSuccess(List<T> result) {
127                 rpcResult.set(RpcResultBuilder.success((Void) null).build());
128             }
129
130             @Override
131             public void onFailure(Throwable t) {
132                 RpcResultBuilder<Void> rpcResultBld = RpcResultBuilder.<Void>failed()
133                         .withRpcErrors(Collections.singleton(
134                                 RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, null, t.getMessage())
135                         ));
136                 rpcResult.set(rpcResultBld.build());
137             }
138         });
139         return rpcResult;
140     }
141
142     @Override
143     public Future<RpcResult<Void>> addFlowsRpc(AddFlowsRpcInput input) {
144         List<ListenableFuture<RpcResult<AddFlowOutput>>> bulkResults = new ArrayList<>();
145
146         for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
147             AddFlowInputBuilder flowInputBuilder = new AddFlowInputBuilder((Flow) bulkFlow);
148             final NodeRef nodeRef = bulkFlow.getNode();
149             flowInputBuilder.setNode(nodeRef);
150             flowInputBuilder.setTableId(bulkFlow.getTableId());
151             Future<RpcResult<AddFlowOutput>> rpcAddFlowResult = flowService.addFlow(flowInputBuilder.build());
152             bulkResults.add(JdkFutureAdapters.listenInPoolThread(rpcAddFlowResult));
153         }
154         return handleResultFuture(Futures.allAsList(bulkResults));
155     }
156
157     @Override
158     public Future<RpcResult<Void>> removeFlowsRpc(RemoveFlowsRpcInput input) {
159         List<ListenableFuture<RpcResult<RemoveFlowOutput>>> bulkResults = new ArrayList<>();
160
161         for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
162             RemoveFlowInputBuilder flowInputBuilder = new RemoveFlowInputBuilder((Flow) bulkFlow);
163             final NodeRef nodeRef = bulkFlow.getNode();
164             flowInputBuilder.setNode(nodeRef);
165             flowInputBuilder.setTableId(bulkFlow.getTableId());
166             Future<RpcResult<RemoveFlowOutput>> rpcAddFlowResult = flowService.removeFlow(flowInputBuilder.build());
167             bulkResults.add(JdkFutureAdapters.listenInPoolThread(rpcAddFlowResult));
168         }
169         return handleResultFuture(Futures.allAsList(bulkResults));
170     }
171 }