From 89db8a3bf04990f7c4eebe7ff51c1faacd1343a1 Mon Sep 17 00:00:00 2001 From: Jozef Bacigal Date: Mon, 27 Jun 2016 15:28:59 +0200 Subject: [PATCH] Bug 5928 - Future error after OutboundQueueEntry.java failed In case that OutboundQueueEntry has failed and after that is called commit again on the same "entry" (completed entry) with callback, on this callback was never called onFailure. - try to store last OutboundQueueErorr - if commit called on completed entry and it has a callback send onFailure to the callback - added test for entire class Change-Id: Ie347811ac6dc2c95d58ad49bbf2aa4d69033b33c Signed-off-by: Jozef Bacigal --- .../core/connection/OutboundQueueEntry.java | 26 +++- .../connection/OutboundQueueEntryTest.java | 140 ++++++++++++++++++ 2 files changed, 161 insertions(+), 5 deletions(-) create mode 100644 openflow-protocol-impl/src/test/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueEntryTest.java diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueEntry.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueEntry.java index 70900cad..72efc18a 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueEntry.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueEntry.java @@ -7,6 +7,7 @@ */ package org.opendaylight.openflowjava.protocol.impl.core.connection; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.FutureCallback; import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException; @@ -24,14 +25,22 @@ final class OutboundQueueEntry { private boolean completed; private boolean barrier; private volatile boolean committed; + private OutboundQueueException lastException = null; void commit(final OfHeader message, final FutureCallback callback) { - this.message = message; - this.callback = callback; - this.barrier = message instanceof BarrierInput; + if (this.completed) { + LOG.warn("Can't commit a completed message."); + if (callback != null) { + callback.onFailure(lastException); + } + } else { + this.message = message; + this.callback = callback; + this.barrier = message instanceof BarrierInput; - // Volatile write, needs to be last - committed = true; + // Volatile write, needs to be last + this.committed = true; + } } void reset() { @@ -103,6 +112,7 @@ final class OutboundQueueEntry { void fail(final OutboundQueueException cause) { if (!completed) { + lastException = cause; completed = true; if (callback != null) { callback.onFailure(cause); @@ -112,4 +122,10 @@ final class OutboundQueueEntry { LOG.warn("Ignoring failure {} for completed message", cause); } } + + @VisibleForTesting + /** This method is only for testing to prove that after queue entry is completed there is not callback future */ + boolean hasCallback() { + return (callback != null); + } } diff --git a/openflow-protocol-impl/src/test/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueEntryTest.java b/openflow-protocol-impl/src/test/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueEntryTest.java new file mode 100644 index 00000000..7e0c71f7 --- /dev/null +++ b/openflow-protocol-impl/src/test/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueEntryTest.java @@ -0,0 +1,140 @@ +package org.opendaylight.openflowjava.protocol.impl.core.connection; + +import com.google.common.util.concurrent.FutureCallback; +import javax.annotation.Nullable; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; +import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartRequestFlags; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessageBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessageBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInputBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link OutboundQueueEntry} class test + */ +@RunWith(MockitoJUnitRunner.class) +public class OutboundQueueEntryTest { + + private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueEntryTest.class); + + private static final short VERSION = (short) 13; + private static final long VALUE = 1L; + + private Integer failCounter = 0; + + @Mock + private OfHeader ofHeader; + @Mock + private FutureCallback futureCallback; + + private final OutboundQueueEntry outboundQueueEntry = new OutboundQueueEntry(); + private final OfHeader barrierInput = new BarrierInputBuilder().setVersion(VERSION).setXid(VALUE).build(); + private final OfHeader packetOutInput = new PacketOutInputBuilder().setVersion(VERSION).setXid(VALUE).build(); + private final OfHeader multipartReplyMessage = + new MultipartReplyMessageBuilder().setVersion(VERSION).setXid(VALUE).setFlags(new MultipartRequestFlags(false)).build(); + private final OfHeader flowModInput = new FlowModInputBuilder().setVersion(VERSION).setXid(VALUE).build(); + private final OfHeader flowRemoved = new FlowRemovedMessageBuilder().setVersion(VERSION).setXid(VALUE).build(); + + @Test + public void commit() throws Exception { + outboundQueueEntry.commit(ofHeader, futureCallback); + Assert.assertTrue(outboundQueueEntry.isCommitted()); + Assert.assertFalse(outboundQueueEntry.isCompleted()); + Assert.assertFalse(outboundQueueEntry.isBarrier()); + } + + @Test + public void reset() throws Exception { + outboundQueueEntry.commit(ofHeader, futureCallback); + Assert.assertTrue(outboundQueueEntry.isCommitted()); + + outboundQueueEntry.reset(); + Assert.assertFalse(outboundQueueEntry.isCommitted()); + } + + @Test + public void isBarrier() throws Exception { + outboundQueueEntry.commit(barrierInput, futureCallback); + Assert.assertTrue(outboundQueueEntry.isBarrier()); + } + + @Test + public void takeMessage() throws Exception { + outboundQueueEntry.commit(packetOutInput, futureCallback); + outboundQueueEntry.takeMessage(); + Mockito.verify(futureCallback).onSuccess(Mockito.any()); + } + + @Test + public void complete() throws Exception { + final boolean result = outboundQueueEntry.complete(multipartReplyMessage); + Assert.assertTrue(result); + Assert.assertTrue(outboundQueueEntry.isCompleted()); + } + + @Test(expected = IllegalStateException.class) + public void completeTwice() throws Exception { + outboundQueueEntry.complete(multipartReplyMessage); + outboundQueueEntry.complete(multipartReplyMessage); + } + + @Test + public void fail() throws Exception { + outboundQueueEntry.commit(ofHeader, futureCallback); + outboundQueueEntry.fail(null); + Mockito.verify(futureCallback).onFailure(Mockito.any()); + } + + private Integer increaseFailCounter() { + return ++this.failCounter; + } + + @Test + public void test() throws Exception { + + final FutureCallback result = + new FutureCallback() { + + @Override + public void onSuccess(@Nullable OfHeader ofHeader) { + LOG.info("onSuccess: xid: {}", ofHeader.getXid()); + } + + @Override + public void onFailure(Throwable throwable) { + LOG.info("onFailure! Error: {}", throwable); + LOG.info("Failure called {} time", increaseFailCounter()); + } + }; + + /** This scenario creates entry with XID 1 then commit it, fail it and again commit it */ + /** Simulates behavior when entry is committed after fail */ + /** It shouldn't be in state completed and still have callback, it can consume all threads in thread pool */ + + /** Entry but no callback */ + outboundQueueEntry.commit(flowModInput, null); + /** Failed entry for whatever reason */ + outboundQueueEntry.fail(null); + /** Commit the same entry adding callback */ + outboundQueueEntry.commit(flowModInput, result); + + Assert.assertTrue(outboundQueueEntry.isCompleted()); + Assert.assertTrue(outboundQueueEntry.isCommitted()); + + /** This is check that no callback is in entry stuck */ + Assert.assertFalse(outboundQueueEntry.hasCallback()); + + Assert.assertTrue(this.failCounter == 1); + } + +} \ No newline at end of file -- 2.36.6