X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=applications%2Fbulk-o-matic%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fapplications%2Fbulk%2Fo%2Fmatic%2FFlowWriterDirectOFRpc.java;h=5632425239fc11673341ca3ba6a29603b2fd5a39;hb=310c8fa14f33cbc4e5cfa7887bc3af9f3b8d1990;hp=59d8409e56de777777b31604f05a7997502d3241;hpb=2f235c206ba79d90e5cfc3ef18d229d830697572;p=openflowplugin.git

diff --git a/applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/FlowWriterDirectOFRpc.java b/applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/FlowWriterDirectOFRpc.java
index 59d8409e56..5632425239 100644
--- a/applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/FlowWriterDirectOFRpc.java
+++ b/applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/FlowWriterDirectOFRpc.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016 Ericsson Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2016, 2017 Ericsson Systems, Inc. and others. All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -13,15 +13,18 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.infrautils.utils.concurrent.JdkFutures;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowTableRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowRef;
@@ -30,6 +33,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,29 +45,26 @@ public class FlowWriterDirectOFRpc {
     private final ExecutorService flowPusher;
     private static final long PAUSE_BETWEEN_BATCH_MILLIS = 40;
 
-    public FlowWriterDirectOFRpc(final DataBroker dataBroker,
-                                 final SalFlowService salFlowService,
-                                 final ExecutorService flowPusher) {
+    public FlowWriterDirectOFRpc(final DataBroker dataBroker, final SalFlowService salFlowService,
+            final ExecutorService flowPusher) {
         this.dataBroker = dataBroker;
         this.flowService = salFlowService;
         this.flowPusher = flowPusher;
     }
 
-
-    public void rpcFlowAdd(String dpId, int flowsPerDpn, int batchSize){
+    public void rpcFlowAdd(String dpId, int flowsPerDpn, int batchSize) {
         if (!getAllNodes().isEmpty() && getAllNodes().contains(dpId)) {
             FlowRPCHandlerTask addFlowRpcTask = new FlowRPCHandlerTask(dpId, flowsPerDpn, batchSize);
             flowPusher.execute(addFlowRpcTask);
         }
     }
 
-    public void rpcFlowAddAll(int flowsPerDpn, int batchSize){
+    public void rpcFlowAddAll(int flowsPerDpn, int batchSize) {
         Set<String> nodeIdSet = getAllNodes();
-        if (nodeIdSet.isEmpty()){
+        if (nodeIdSet.isEmpty()) {
             LOG.warn("No nodes seen on OPERATIONAL DS. Aborting !!!!");
-        }
-        else{
-            for (String dpId : nodeIdSet){
+        } else {
+            for (String dpId : nodeIdSet) {
                 LOG.info("Starting FlowRPCTaskHandler for switch id {}", dpId);
                 FlowRPCHandlerTask addFlowRpcTask = new FlowRPCHandlerTask(dpId, flowsPerDpn, batchSize);
                 flowPusher.execute(addFlowRpcTask);
@@ -71,31 +72,28 @@ public class FlowWriterDirectOFRpc {
         }
     }
 
-    private Set<String> getAllNodes(){
+    private Set<String> getAllNodes() {
         Set<String> nodeIds = new HashSet<>();
         InstanceIdentifier<Nodes> nodes = InstanceIdentifier.create(Nodes.class);
-        ReadOnlyTransaction rTx = dataBroker.newReadOnlyTransaction();
-        try {
-            Optional<Nodes> nodesDataNode = rTx.read(LogicalDatastoreType.OPERATIONAL, nodes).checkedGet();
-            if (nodesDataNode.isPresent()){
+        try (ReadOnlyTransaction readOnlyTransaction = dataBroker.newReadOnlyTransaction()) {
+            Optional<Nodes> nodesDataNode = readOnlyTransaction.read(LogicalDatastoreType.OPERATIONAL, nodes)
+                    .checkedGet();
+            if (nodesDataNode.isPresent()) {
                 List<Node> nodesCollection = nodesDataNode.get().getNode();
                 if (nodesCollection != null && !nodesCollection.isEmpty()) {
                     for (Node node : nodesCollection) {
                         LOG.info("Switch with ID {} discovered !!", node.getId().getValue());
                         nodeIds.add(node.getId().getValue());
                     }
-                }
-                else{
+                } else {
                     return Collections.emptySet();
                 }
-            }
-            else{
+            } else {
                 return Collections.emptySet();
             }
-        }
-        catch(ReadFailedException rdFailedException){
+        } catch (ReadFailedException rdFailedException) {
             LOG.error("Failed to read connected nodes {}", rdFailedException);
         }
         return nodeIds;
@@ -106,9 +104,7 @@ public class FlowWriterDirectOFRpc {
         private final int flowsPerDpn;
         private final int batchSize;
 
-        public FlowRPCHandlerTask(final String dpId,
-                                  final int flowsPerDpn,
-                                  final int batchSize){
+        public FlowRPCHandlerTask(final String dpId, final int flowsPerDpn, final int batchSize) {
             this.dpId = dpId;
             this.flowsPerDpn = flowsPerDpn;
             this.batchSize = batchSize;
@@ -117,10 +113,10 @@ public class FlowWriterDirectOFRpc {
 
         @Override
         public void run() {
-            short tableId = (short)1;
+            short tableId = (short) 1;
             int initFlowId = 500;
 
-            for (int i=1; i<= flowsPerDpn; i++){
+            for (int i = 1; i <= flowsPerDpn; i++) {
 
                 String flowId = Integer.toString(initFlowId + i);
 
@@ -140,9 +136,9 @@ public class FlowWriterDirectOFRpc {
 
                 AddFlowInput addFlowInput = builder.build();
 
-                LOG.debug("RPC invocation for adding flow-id {} with input {}", flowId,
-                        addFlowInput.toString());
-                flowService.addFlow(addFlowInput);
+                LOG.debug("RPC invocation for adding flow-id {} with input {}", flowId, addFlowInput.toString());
+                final Future<RpcResult<AddFlowOutput>> resultFuture = flowService.addFlow(addFlowInput);
+                JdkFutures.addErrorLogging(resultFuture, LOG, "addFlow");
 
                 if (i % batchSize == 0) {
                     try {
@@ -151,7 +147,7 @@ public class FlowWriterDirectOFRpc {
                         TimeUnit.MILLISECONDS.sleep(PAUSE_BETWEEN_BATCH_MILLIS);
                     } catch (InterruptedException iEx) {
-                        LOG.error("Interrupted while pausing after batched push upto {}. Ex {}", i, iEx);
+                        LOG.error("Interrupted while pausing after batched push upto {} Ex ", i, iEx);
                     }
                 }
             }