From 89246202a625302ae22911da3dd47f2929d1630d Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Wed, 6 Mar 2024 15:28:29 +0100 Subject: [PATCH] Fix drop-test-karaf service injection drop-test-karaf is based on blueprint, hence it can only consume interface-based services. Introduce DropTest{,Commiter,Sender} interfaces to allow injection to work, fixing a NPE seen in CSIT. JIRA: OPNFLWPLUG-1112 Change-Id: I3b929fe4f2c01761c0451c440e9408c144a8bc73 Signed-off-by: Robert Varga --- .../testcommon/AbstractDropTest.java | 4 +- .../openflowplugin/testcommon/DropTest.java | 19 +++ .../testcommon/DropTestCommiter.java | 137 +--------------- .../testcommon/DropTestCommiterImpl.java | 147 ++++++++++++++++++ .../testcommon/DropTestRpcSender.java | 123 +-------------- .../testcommon/DropTestRpcSenderImpl.java | 133 ++++++++++++++++ 6 files changed, 306 insertions(+), 257 deletions(-) create mode 100644 test-common/src/main/java/org/opendaylight/openflowplugin/testcommon/DropTest.java create mode 100644 test-common/src/main/java/org/opendaylight/openflowplugin/testcommon/DropTestCommiterImpl.java create mode 100644 test-common/src/main/java/org/opendaylight/openflowplugin/testcommon/DropTestRpcSenderImpl.java diff --git a/test-common/src/main/java/org/opendaylight/openflowplugin/testcommon/AbstractDropTest.java b/test-common/src/main/java/org/opendaylight/openflowplugin/testcommon/AbstractDropTest.java index d7703b5e6c..0b00f2e782 100644 --- a/test-common/src/main/java/org/opendaylight/openflowplugin/testcommon/AbstractDropTest.java +++ b/test-common/src/main/java/org/opendaylight/openflowplugin/testcommon/AbstractDropTest.java @@ -44,7 +44,7 @@ import org.opendaylight.yangtools.yang.common.Uint8; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -abstract class AbstractDropTest implements Listener, AutoCloseable { +abstract class AbstractDropTest implements Listener, AutoCloseable, DropTest { private static final Logger LOG = LoggerFactory.getLogger(AbstractDropTest.class); protected static final Uint16 PRIORITY = Uint16.valueOf(4); @@ -86,6 +86,7 @@ abstract class AbstractDropTest implements Listener, AutoCloseab .newUpdater(AbstractDropTest.class, "runablesRejected"); protected volatile int runablesRejected; + @Override public final DropTestStats getStats() { return new DropTestStats(sent, rcvd, excs, ftrFailed, ftrSuccess, runablesExecuted, runablesRejected); } @@ -107,6 +108,7 @@ abstract class AbstractDropTest implements Listener, AutoCloseab executorService = threadPool; } + @Override public final void clearStats() { sent = 0; rcvd = 0; diff --git a/test-common/src/main/java/org/opendaylight/openflowplugin/testcommon/DropTest.java b/test-common/src/main/java/org/opendaylight/openflowplugin/testcommon/DropTest.java new file mode 100644 index 0000000000..7f07c70f3b --- /dev/null +++ b/test-common/src/main/java/org/opendaylight/openflowplugin/testcommon/DropTest.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.testcommon; + +public interface DropTest { + + DropTestStats getStats(); + + void clearStats(); + + boolean start(); + + boolean stop(); +} diff --git a/test-common/src/main/java/org/opendaylight/openflowplugin/testcommon/DropTestCommiter.java b/test-common/src/main/java/org/opendaylight/openflowplugin/testcommon/DropTestCommiter.java index 08d9f16b9a..44d0ebff55 100644 --- a/test-common/src/main/java/org/opendaylight/openflowplugin/testcommon/DropTestCommiter.java +++ b/test-common/src/main/java/org/opendaylight/openflowplugin/testcommon/DropTestCommiter.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -7,139 +7,6 @@ */ package org.opendaylight.openflowplugin.testcommon; -import static java.util.Objects.requireNonNull; +public interface DropTestCommiter extends DropTest { -import java.util.concurrent.atomic.AtomicLong; -import javax.annotation.PreDestroy; -import javax.inject.Inject; -import javax.inject.Singleton; -import org.opendaylight.mdsal.binding.api.DataBroker; -import org.opendaylight.mdsal.binding.api.NotificationService; -import org.opendaylight.mdsal.common.api.LogicalDatastoreType; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowCookie; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowModFlags; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Instructions; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; -import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived; -import org.opendaylight.yangtools.concepts.Registration; -import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.opendaylight.yangtools.yang.common.Uint64; -import org.opendaylight.yangtools.yang.common.Uint8; -import org.osgi.service.component.annotations.Activate; -import org.osgi.service.component.annotations.Component; -import org.osgi.service.component.annotations.Deactivate; -import org.osgi.service.component.annotations.Reference; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Provides cbench responder behavior: upon packetIn arrival addFlow action is sent out to device using dataStore - * strategy (FRM involved). - */ -@Singleton -@Component(service = DropTestCommiter.class, immediate = true) -public final class DropTestCommiter extends AbstractDropTest { - private static final Logger LOG = LoggerFactory.getLogger(DropTestCommiter.class); - private static final TableKey ZERO_TABLE = new TableKey(Uint8.ZERO); - private static final ThreadLocal BUILDER = ThreadLocal.withInitial(() -> { - final var cookie = new FlowCookie(Uint64.TEN); - return new FlowBuilder() - .setPriority(PRIORITY) - .setBufferId(BUFFER_ID) - .setCookie(cookie) - .setCookieMask(cookie) - .setTableId(TABLE_ID) - .setHardTimeout(HARD_TIMEOUT) - .setIdleTimeout(IDLE_TIMEOUT) - .setFlags(new FlowModFlags(false, false, false, false, false)); - }); - - private final AtomicLong idCounter = new AtomicLong(); - private final DataBroker dataBroker; - private final NotificationService notificationService; - - private Registration reg = null; - - @Inject - @Activate - public DropTestCommiter(@Reference final DataBroker dataBroker, - @Reference final NotificationService notificationService) { - this.dataBroker = requireNonNull(dataBroker); - this.notificationService = requireNonNull(notificationService); - } - - @PreDestroy - @Deactivate - @Override - public void close() { - stop(); - super.close(); - LOG.debug("DropTestProvider terminated"); - } - - /** - * Start listening on packetIn. - * - * @return {@code false} if already started - */ - public synchronized boolean start() { - if (reg != null) { - return false; - } - reg = notificationService.registerListener(PacketReceived.class, this); - LOG.debug("DropTestProvider started"); - return true; - } - - /** - * Stop listening on packetIn. - * - * @return {@code false} if already stopped - */ - public synchronized boolean stop() { - if (reg == null) { - return false; - } - reg.close(); - reg = null; - LOG.debug("DropTestProvider stopped"); - return true; - } - - @Override - protected void processPacket(final InstanceIdentifier node, final Match match, - final Instructions instructions) { - - // Finally build our flow - final FlowBuilder fb = BUILDER.get(); - fb.setMatch(match); - fb.setInstructions(instructions); - fb.setId(new FlowId(String.valueOf(fb.hashCode()) + "." + idCounter.getAndIncrement())); - - // Construct the flow instance id - final var flowInstanceId = node.builder() - // That is flow capable, only FlowCapableNodes have tables - .augmentation(FlowCapableNode.class) - // In the table identified by TableKey - .child(Table.class, ZERO_TABLE) - // A flow identified by flowKey - .child(Flow.class, new FlowKey(fb.getId())) - .build(); - - final var flow = fb.build(); - final var transaction = dataBroker.newReadWriteTransaction(); - - LOG.debug("onPacketReceived - About to write flow {}", flow); - transaction.mergeParentStructurePut(LogicalDatastoreType.CONFIGURATION, flowInstanceId, flow); - transaction.commit(); - LOG.debug("onPacketReceived - About to write flow commited"); - } } diff --git a/test-common/src/main/java/org/opendaylight/openflowplugin/testcommon/DropTestCommiterImpl.java b/test-common/src/main/java/org/opendaylight/openflowplugin/testcommon/DropTestCommiterImpl.java new file mode 100644 index 0000000000..fb73898e3a --- /dev/null +++ b/test-common/src/main/java/org/opendaylight/openflowplugin/testcommon/DropTestCommiterImpl.java @@ -0,0 +1,147 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.testcommon; + +import static java.util.Objects.requireNonNull; + +import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import javax.inject.Singleton; +import org.opendaylight.mdsal.binding.api.DataBroker; +import org.opendaylight.mdsal.binding.api.NotificationService; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowCookie; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowModFlags; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Instructions; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; +import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived; +import org.opendaylight.yangtools.concepts.Registration; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.common.Uint64; +import org.opendaylight.yangtools.yang.common.Uint8; +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Deactivate; +import org.osgi.service.component.annotations.Reference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Provides cbench responder behavior: upon packetIn arrival addFlow action is sent out to device using dataStore + * strategy (FRM involved). + */ +@Singleton +@Component(service = DropTestCommiter.class, immediate = true) +public final class DropTestCommiterImpl extends AbstractDropTest implements DropTestCommiter { + private static final Logger LOG = LoggerFactory.getLogger(DropTestCommiterImpl.class); + private static final TableKey ZERO_TABLE = new TableKey(Uint8.ZERO); + private static final ThreadLocal BUILDER = ThreadLocal.withInitial(() -> { + final var cookie = new FlowCookie(Uint64.TEN); + return new FlowBuilder() + .setPriority(PRIORITY) + .setBufferId(BUFFER_ID) + .setCookie(cookie) + .setCookieMask(cookie) + .setTableId(TABLE_ID) + .setHardTimeout(HARD_TIMEOUT) + .setIdleTimeout(IDLE_TIMEOUT) + .setFlags(new FlowModFlags(false, false, false, false, false)); + }); + + private final AtomicLong idCounter = new AtomicLong(); + private final DataBroker dataBroker; + private final NotificationService notificationService; + + private Registration reg = null; + + @Inject + @Activate + public DropTestCommiterImpl(@Reference final DataBroker dataBroker, + @Reference final NotificationService notificationService) { + this.dataBroker = requireNonNull(dataBroker); + this.notificationService = requireNonNull(notificationService); + } + + @PreDestroy + @Deactivate + @Override + public void close() { + stop(); + super.close(); + LOG.debug("DropTestProvider terminated"); + } + + /** + * Start listening on packetIn. + * + * @return {@code false} if already started + */ + @Override + public synchronized boolean start() { + if (reg != null) { + return false; + } + reg = notificationService.registerListener(PacketReceived.class, this); + LOG.debug("DropTestProvider started"); + return true; + } + + /** + * Stop listening on packetIn. + * + * @return {@code false} if already stopped + */ + @Override + public synchronized boolean stop() { + if (reg == null) { + return false; + } + reg.close(); + reg = null; + LOG.debug("DropTestProvider stopped"); + return true; + } + + @Override + protected void processPacket(final InstanceIdentifier node, final Match match, + final Instructions instructions) { + + // Finally build our flow + final FlowBuilder fb = BUILDER.get(); + fb.setMatch(match); + fb.setInstructions(instructions); + fb.setId(new FlowId(String.valueOf(fb.hashCode()) + "." + idCounter.getAndIncrement())); + + // Construct the flow instance id + final var flowInstanceId = node.builder() + // That is flow capable, only FlowCapableNodes have tables + .augmentation(FlowCapableNode.class) + // In the table identified by TableKey + .child(Table.class, ZERO_TABLE) + // A flow identified by flowKey + .child(Flow.class, new FlowKey(fb.getId())) + .build(); + + final var flow = fb.build(); + final var transaction = dataBroker.newReadWriteTransaction(); + + LOG.debug("onPacketReceived - About to write flow {}", flow); + transaction.mergeParentStructurePut(LogicalDatastoreType.CONFIGURATION, flowInstanceId, flow); + transaction.commit(); + LOG.debug("onPacketReceived - About to write flow commited"); + } +} diff --git a/test-common/src/main/java/org/opendaylight/openflowplugin/testcommon/DropTestRpcSender.java b/test-common/src/main/java/org/opendaylight/openflowplugin/testcommon/DropTestRpcSender.java index 8b52ba2a6d..e8922da432 100644 --- a/test-common/src/main/java/org/opendaylight/openflowplugin/testcommon/DropTestRpcSender.java +++ b/test-common/src/main/java/org/opendaylight/openflowplugin/testcommon/DropTestRpcSender.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -7,125 +7,6 @@ */ package org.opendaylight.openflowplugin.testcommon; -import static java.util.Objects.requireNonNull; +public interface DropTestRpcSender extends DropTest { -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.MoreExecutors; -import javax.annotation.PreDestroy; -import javax.inject.Inject; -import javax.inject.Singleton; -import org.opendaylight.mdsal.binding.api.NotificationService; -import org.opendaylight.mdsal.binding.api.RpcService; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlow; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowCookie; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowModFlags; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Instructions; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; -import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived; -import org.opendaylight.yangtools.concepts.Registration; -import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.common.Uint64; -import org.osgi.service.component.annotations.Activate; -import org.osgi.service.component.annotations.Component; -import org.osgi.service.component.annotations.Deactivate; -import org.osgi.service.component.annotations.Reference; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Provides cbench responder behavior: upon packetIn arrival addFlow action is sent out to device using - * {@link AddFlow} strategy. - */ -@Singleton -@Component(service = DropTestRpcSender.class, immediate = true) -public final class DropTestRpcSender extends AbstractDropTest { - private static final Logger LOG = LoggerFactory.getLogger(DropTestRpcSender.class); - private static final ThreadLocal BUILDER = ThreadLocal.withInitial(() -> { - final var cookie = new FlowCookie(Uint64.TEN); - return new AddFlowInputBuilder() - .setPriority(PRIORITY) - .setBufferId(BUFFER_ID) - .setCookie(cookie) - .setCookieMask(cookie) - .setTableId(TABLE_ID) - .setHardTimeout(HARD_TIMEOUT) - .setIdleTimeout(IDLE_TIMEOUT) - .setFlags(new FlowModFlags(false, false, false, false, false)); - }); - - private final NotificationService notificationService; - private final AddFlow addFlow; - - private Registration reg = null; - - @Inject - @Activate - public DropTestRpcSender(@Reference final NotificationService notificationService, - @Reference final RpcService rpcService) { - this.notificationService = requireNonNull(notificationService); - addFlow = rpcService.getRpc(AddFlow.class); - } - - @PreDestroy - @Deactivate - @Override - public void close() { - stop(); - super.close(); - LOG.debug("DropTestProvider terminated"); - } - - public synchronized boolean start() { - if (reg != null) { - return false; - } - reg = notificationService.registerListener(PacketReceived.class, this); - LOG.debug("DropTestProvider started"); - return true; - } - - public synchronized boolean stop() { - if (reg == null) { - return false; - } - reg.close(); - reg = null; - LOG.debug("DropTestProvider stopped"); - return true; - } - - @Override - protected void processPacket(final InstanceIdentifier node, final Match match, - final Instructions instructions) { - final AddFlowInputBuilder fb = BUILDER.get(); - - // Finally build our flow - fb.setMatch(match); - fb.setInstructions(instructions); - - // Construct the flow instance id - - fb.setNode(new NodeRef(node)); - - // Add flow - final var flow = fb.build(); - LOG.debug("onPacketReceived - About to write flow (via SalFlowService) {}", flow); - Futures.addCallback(addFlow.invoke(flow), new FutureCallback>() { - @Override - public void onSuccess(final RpcResult result) { - countFutureSuccess(); - } - - @Override - public void onFailure(final Throwable throwable) { - countFutureError(); - } - }, MoreExecutors.directExecutor()); - } } diff --git a/test-common/src/main/java/org/opendaylight/openflowplugin/testcommon/DropTestRpcSenderImpl.java b/test-common/src/main/java/org/opendaylight/openflowplugin/testcommon/DropTestRpcSenderImpl.java new file mode 100644 index 0000000000..fb492debdb --- /dev/null +++ b/test-common/src/main/java/org/opendaylight/openflowplugin/testcommon/DropTestRpcSenderImpl.java @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.testcommon; + +import static java.util.Objects.requireNonNull; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import javax.inject.Singleton; +import org.opendaylight.mdsal.binding.api.NotificationService; +import org.opendaylight.mdsal.binding.api.RpcService; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlow; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowCookie; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowModFlags; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Instructions; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; +import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived; +import org.opendaylight.yangtools.concepts.Registration; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.common.Uint64; +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Deactivate; +import org.osgi.service.component.annotations.Reference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Provides cbench responder behavior: upon packetIn arrival addFlow action is sent out to device using + * {@link AddFlow} strategy. + */ +@Singleton +@Component(service = DropTestRpcSender.class, immediate = true) +public final class DropTestRpcSenderImpl extends AbstractDropTest implements DropTestRpcSender { + private static final Logger LOG = LoggerFactory.getLogger(DropTestRpcSenderImpl.class); + private static final ThreadLocal BUILDER = ThreadLocal.withInitial(() -> { + final var cookie = new FlowCookie(Uint64.TEN); + return new AddFlowInputBuilder() + .setPriority(PRIORITY) + .setBufferId(BUFFER_ID) + .setCookie(cookie) + .setCookieMask(cookie) + .setTableId(TABLE_ID) + .setHardTimeout(HARD_TIMEOUT) + .setIdleTimeout(IDLE_TIMEOUT) + .setFlags(new FlowModFlags(false, false, false, false, false)); + }); + + private final NotificationService notificationService; + private final AddFlow addFlow; + + private Registration reg = null; + + @Inject + @Activate + public DropTestRpcSenderImpl(@Reference final NotificationService notificationService, + @Reference final RpcService rpcService) { + this.notificationService = requireNonNull(notificationService); + addFlow = rpcService.getRpc(AddFlow.class); + } + + @PreDestroy + @Deactivate + @Override + public void close() { + stop(); + super.close(); + LOG.debug("DropTestProvider terminated"); + } + + @Override + public synchronized boolean start() { + if (reg != null) { + return false; + } + reg = notificationService.registerListener(PacketReceived.class, this); + LOG.debug("DropTestProvider started"); + return true; + } + + @Override + public synchronized boolean stop() { + if (reg == null) { + return false; + } + reg.close(); + reg = null; + LOG.debug("DropTestProvider stopped"); + return true; + } + + @Override + protected void processPacket(final InstanceIdentifier node, final Match match, + final Instructions instructions) { + final AddFlowInputBuilder fb = BUILDER.get(); + + // Finally build our flow + fb.setMatch(match); + fb.setInstructions(instructions); + + // Construct the flow instance id + + fb.setNode(new NodeRef(node)); + + // Add flow + final var flow = fb.build(); + LOG.debug("onPacketReceived - About to write flow (via SalFlowService) {}", flow); + Futures.addCallback(addFlow.invoke(flow), new FutureCallback>() { + @Override + public void onSuccess(final RpcResult result) { + countFutureSuccess(); + } + + @Override + public void onFailure(final Throwable throwable) { + countFutureError(); + } + }, MoreExecutors.directExecutor()); + } +} -- 2.36.6