Merge "BUG-8607: Fix issues in checkstyle enforcement for module bulk-o-matic"
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / FlowWriterDirectOFRpc.java
1 /*
2  * Copyright (c) 2016, 2017 Ericsson Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
9
10 import com.google.common.base.Optional;
11 import java.util.Collections;
12 import java.util.HashSet;
13 import java.util.List;
14 import java.util.Set;
15 import java.util.concurrent.ExecutorService;
16 import java.util.concurrent.TimeUnit;
17 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
18 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
19 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
20 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowTableRef;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowRef;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
32 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 public class FlowWriterDirectOFRpc {
37
38     private static final Logger LOG = LoggerFactory.getLogger(FlowWriterDirectOFRpc.class);
39     private final DataBroker dataBroker;
40     private final SalFlowService flowService;
41     private final ExecutorService flowPusher;
42     private static final long PAUSE_BETWEEN_BATCH_MILLIS = 40;
43
44     public FlowWriterDirectOFRpc(final DataBroker dataBroker, final SalFlowService salFlowService,
45             final ExecutorService flowPusher) {
46         this.dataBroker = dataBroker;
47         this.flowService = salFlowService;
48         this.flowPusher = flowPusher;
49     }
50
51     public void rpcFlowAdd(String dpId, int flowsPerDpn, int batchSize) {
52         if (!getAllNodes().isEmpty() && getAllNodes().contains(dpId)) {
53             FlowRPCHandlerTask addFlowRpcTask = new FlowRPCHandlerTask(dpId, flowsPerDpn, batchSize);
54             flowPusher.execute(addFlowRpcTask);
55         }
56     }
57
58     public void rpcFlowAddAll(int flowsPerDpn, int batchSize) {
59         Set<String> nodeIdSet = getAllNodes();
60         if (nodeIdSet.isEmpty()) {
61             LOG.warn("No nodes seen on OPERATIONAL DS. Aborting !!!!");
62         } else {
63             for (String dpId : nodeIdSet) {
64                 LOG.info("Starting FlowRPCTaskHandler for switch id {}", dpId);
65                 FlowRPCHandlerTask addFlowRpcTask = new FlowRPCHandlerTask(dpId, flowsPerDpn, batchSize);
66                 flowPusher.execute(addFlowRpcTask);
67             }
68         }
69     }
70
71     private Set<String> getAllNodes() {
72
73         Set<String> nodeIds = new HashSet<>();
74         InstanceIdentifier<Nodes> nodes = InstanceIdentifier.create(Nodes.class);
75         ReadOnlyTransaction readOnlyTransaction = dataBroker.newReadOnlyTransaction();
76
77         try {
78             Optional<Nodes> nodesDataNode = readOnlyTransaction.read(LogicalDatastoreType.OPERATIONAL, nodes)
79                     .checkedGet();
80             if (nodesDataNode.isPresent()) {
81                 List<Node> nodesCollection = nodesDataNode.get().getNode();
82                 if (nodesCollection != null && !nodesCollection.isEmpty()) {
83                     for (Node node : nodesCollection) {
84                         LOG.info("Switch with ID {} discovered !!", node.getId().getValue());
85                         nodeIds.add(node.getId().getValue());
86                     }
87                 } else {
88                     return Collections.emptySet();
89                 }
90             } else {
91                 return Collections.emptySet();
92             }
93         } catch (ReadFailedException rdFailedException) {
94             LOG.error("Failed to read connected nodes {}", rdFailedException);
95         }
96         return nodeIds;
97     }
98
99     public class FlowRPCHandlerTask implements Runnable {
100         private final String dpId;
101         private final int flowsPerDpn;
102         private final int batchSize;
103
104         public FlowRPCHandlerTask(final String dpId, final int flowsPerDpn, final int batchSize) {
105             this.dpId = dpId;
106             this.flowsPerDpn = flowsPerDpn;
107             this.batchSize = batchSize;
108         }
109
110         @Override
111         public void run() {
112
113             short tableId = (short) 1;
114             int initFlowId = 500;
115
116             for (int i = 1; i <= flowsPerDpn; i++) {
117
118                 String flowId = Integer.toString(initFlowId + i);
119
120                 LOG.debug("Framing AddFlowInput for flow-id {}", flowId);
121
122                 Match match = BulkOMaticUtils.getMatch(i);
123                 InstanceIdentifier<Node> nodeIId = BulkOMaticUtils.getFlowCapableNodeId(dpId);
124                 InstanceIdentifier<Table> tableIId = BulkOMaticUtils.getTableId(tableId, dpId);
125                 InstanceIdentifier<Flow> flowIId = BulkOMaticUtils.getFlowId(tableIId, flowId);
126
127                 Flow flow = BulkOMaticUtils.buildFlow(tableId, flowId, match);
128
129                 AddFlowInputBuilder builder = new AddFlowInputBuilder(flow);
130                 builder.setNode(new NodeRef(nodeIId));
131                 builder.setFlowTable(new FlowTableRef(tableIId));
132                 builder.setFlowRef(new FlowRef(flowIId));
133
134                 AddFlowInput addFlowInput = builder.build();
135
136                 LOG.debug("RPC invocation for adding flow-id {} with input {}", flowId, addFlowInput.toString());
137                 flowService.addFlow(addFlowInput);
138
139                 if (i % batchSize == 0) {
140                     try {
141                         LOG.info("Pausing for {} MILLISECONDS after batch of {} RPC invocations",
142                                 PAUSE_BETWEEN_BATCH_MILLIS, batchSize);
143
144                         TimeUnit.MILLISECONDS.sleep(PAUSE_BETWEEN_BATCH_MILLIS);
145                     } catch (InterruptedException iEx) {
146                         LOG.error("Interrupted while pausing after batched push upto {}. Ex {}", i, iEx);
147                     }
148                 }
149             }
150         }
151     }
152 }