Slice front-end request messages
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / test / java / org / opendaylight / controller / cluster / messaging / MessageSlicerTest.java
index 20b6cab818ae641786361cb90874190dde1e8f49..e71b7154e4a5ff4fb9dcea0e31634d835c85c745 100644 (file)
@@ -8,11 +8,14 @@
 package org.opendaylight.controller.cluster.messaging;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 
@@ -26,6 +29,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
 import org.opendaylight.yangtools.concepts.Identifier;
 
 /**
@@ -47,13 +51,18 @@ public class MessageSlicerTest extends AbstractMessagingTest {
 
     @Test
     public void testHandledMessages() {
-        final MessageSliceReply reply = MessageSliceReply.success(IDENTIFIER, 1, testProbe.ref());
-        assertEquals("isHandledMessage", Boolean.TRUE, MessageSlicer.isHandledMessage(reply));
-        assertEquals("isHandledMessage", Boolean.FALSE, MessageSlicer.isHandledMessage(new Object()));
-
         try (MessageSlicer slicer = newMessageSlicer("testHandledMessages", 100)) {
+            MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(IDENTIFIER, slicer.getId());
+            final MessageSliceReply reply = MessageSliceReply.success(messageSliceId, 1, testProbe.ref());
+            assertEquals("isHandledMessage", Boolean.TRUE, MessageSlicer.isHandledMessage(reply));
+            assertEquals("isHandledMessage", Boolean.FALSE, MessageSlicer.isHandledMessage(new Object()));
+
             assertEquals("handledMessage", Boolean.TRUE, slicer.handleMessage(reply));
             assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(new Object()));
+            assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(MessageSliceReply.success(
+                    IDENTIFIER, 1,testProbe.ref())));
+            assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(MessageSliceReply.success(
+                    new MessageSliceIdentifier(IDENTIFIER, slicer.getId() + 1), 1,testProbe.ref())));
         }
     }
 
@@ -66,8 +75,9 @@ public class MessageSlicerTest extends AbstractMessagingTest {
         doThrow(mockFailure).when(mockFiledBackedStream).flush();
 
         try (MessageSlicer slicer = newMessageSlicer("testSliceWithFailedSerialization", 100)) {
-            slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(), testProbe.ref(),
-                    mockOnFailureCallback);
+            final boolean wasSliced = slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(),
+                    testProbe.ref(), mockOnFailureCallback);
+            assertFalse(wasSliced);
 
             assertFailureCallback(IOException.class);
             verify(mockFiledBackedStream).cleanup();
@@ -81,8 +91,9 @@ public class MessageSlicerTest extends AbstractMessagingTest {
         doThrow(mockFailure).when(mockByteSource).openBufferedStream();
 
         try (MessageSlicer slicer = newMessageSlicer("testSliceWithByteSourceFailure", 100)) {
-            slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(), testProbe.ref(),
-                    mockOnFailureCallback);
+            final boolean wasSliced = slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(),
+                    testProbe.ref(), mockOnFailureCallback);
+            assertFalse(wasSliced);
 
             assertFailureCallback(IOException.class);
             verify(mockFiledBackedStream).cleanup();
@@ -94,8 +105,9 @@ public class MessageSlicerTest extends AbstractMessagingTest {
         doReturn(0).when(mockInputStream).read(any(byte[].class));
 
         try (MessageSlicer slicer = newMessageSlicer("testSliceWithInputStreamFailure", 2)) {
-            slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(), testProbe.ref(),
-                    mockOnFailureCallback);
+            final boolean wasSliced = slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(),
+                    testProbe.ref(), mockOnFailureCallback);
+            assertFalse(wasSliced);
 
             assertFailureCallback(IOException.class);
             verify(mockFiledBackedStream).cleanup();
@@ -105,9 +117,10 @@ public class MessageSlicerTest extends AbstractMessagingTest {
     @Test
     public void testMessageSliceReplyWithNoState() {
         try (MessageSlicer slicer = newMessageSlicer("testMessageSliceReplyWithNoState", 1000)) {
-            slicer.handleMessage(MessageSliceReply.success(IDENTIFIER, 1, testProbe.ref()));
+            MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(IDENTIFIER, slicer.getId());
+            slicer.handleMessage(MessageSliceReply.success(messageSliceId, 1, testProbe.ref()));
             final AbortSlicing abortSlicing = testProbe.expectMsgClass(AbortSlicing.class);
-            assertEquals("Identifier", IDENTIFIER, abortSlicing.getIdentifier());
+            assertEquals("Identifier", messageSliceId, abortSlicing.getIdentifier());
         }
     }
 
@@ -125,6 +138,27 @@ public class MessageSlicerTest extends AbstractMessagingTest {
         verifyNoMoreInteractions(mockOnFailureCallback);
     }
 
+    @Test
+    public void testCancelSlicing() throws IOException {
+        doReturn(1).when(mockInputStream).read(any(byte[].class));
+
+        final MessageSlicer slicer = newMessageSlicer("testCloseAllSlicedMessageState", 1);
+        slicer.slice(SliceOptions.builder().identifier(IDENTIFIER).fileBackedOutputStream(mockFiledBackedStream)
+                .sendTo(testProbe.ref()).replyTo(testProbe.ref()).onFailureCallback(mockOnFailureCallback).build());
+
+        final FileBackedOutputStream mockFiledBackedStream2 = mock(FileBackedOutputStream.class);
+        setupMockFiledBackedStream(mockFiledBackedStream2);
+        slicer.slice(SliceOptions.builder().identifier(new StringIdentifier("test2"))
+                .fileBackedOutputStream(mockFiledBackedStream2).sendTo(testProbe.ref()).replyTo(testProbe.ref())
+                .onFailureCallback(mockOnFailureCallback).build());
+
+        slicer.cancelSlicing(id -> id.equals(IDENTIFIER));
+
+        verify(mockFiledBackedStream).cleanup();
+        verify(mockFiledBackedStream2, never()).cleanup();
+        verifyNoMoreInteractions(mockOnFailureCallback);
+    }
+
     @Test
     public void testCheckExpiredSlicedMessageState() throws IOException {
         doReturn(1).when(mockInputStream).read(any(byte[].class));
@@ -132,7 +166,7 @@ public class MessageSlicerTest extends AbstractMessagingTest {
         final int expiryDuration = 200;
         try (MessageSlicer slicer = MessageSlicer.builder().messageSliceSize(1)
                 .logContext("testCheckExpiredSlicedMessageState")
-                .filedBackedStreamFactory(mockFiledBackedStreamFactory)
+                .fileBackedStreamFactory(mockFiledBackedStreamFactory)
                 .expireStateAfterInactivity(expiryDuration, TimeUnit.MILLISECONDS).build()) {
             slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{1, 2}), testProbe.ref(), testProbe.ref(),
                     mockOnFailureCallback);
@@ -153,12 +187,12 @@ public class MessageSlicerTest extends AbstractMessagingTest {
 
     private MessageSlicer newMessageSlicer(String logContext, final int messageSliceSize) {
         return MessageSlicer.builder().messageSliceSize(messageSliceSize).logContext(logContext)
-                .filedBackedStreamFactory(mockFiledBackedStreamFactory).build();
+                .fileBackedStreamFactory(mockFiledBackedStreamFactory).build();
     }
 
-    static void slice(MessageSlicer slicer, Identifier identifier, Serializable message, ActorRef sendTo,
+    static boolean slice(MessageSlicer slicer, Identifier identifier, Serializable message, ActorRef sendTo,
             ActorRef replyTo, Consumer<Throwable> onFailureCallback) {
-        slicer.slice(SliceOptions.builder().identifier(identifier).message(message).sendTo(sendTo).replyTo(replyTo)
-                .onFailureCallback(onFailureCallback).build());
+        return slicer.slice(SliceOptions.builder().identifier(identifier).message(message).sendTo(sendTo)
+                .replyTo(replyTo).onFailureCallback(onFailureCallback).build());
     }
 }