MultiMsgCollector for Multipart response messaging 57/17057/6
authorMartin Bobak <mbobak@cisco.com>
Tue, 24 Mar 2015 16:18:39 +0000 (17:18 +0100)
committerMartin Bobak <mbobak@cisco.com>
Thu, 26 Mar 2015 14:17:54 +0000 (15:17 +0100)
Note: OF device could return Multipart msg response for a requests.
So we would like to have one-to-one contract for request-to-CollectionResponse
and we will able to identify differences between actualState and
a future state.

* MultiMsgCollector interface - a definition
* MultiMsgCollectorImpl class - implementation of definition with
internal Cache for collecting MultipartResponseMessages
* MultiMsgCollectorImplTest   - test suite for testing basic
functionality

Change-Id: I920e2ee69929ed874304116da6711ab3f2dfb159
Signed-off-by: Timotej Kubas <tkubas@cisco.com>
Signed-off-by: Vaclav Demcak <vdemcak@cisco.com>
Signed-off-by: Martin Bobak <mbobak@cisco.com>
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/MultiMsgCollector.java [new file with mode: 0644]
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/MultiMsgCollectorImpl.java [new file with mode: 0644]
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/MultiMsgCollectorImplTest.java [new file with mode: 0644]

diff --git a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/MultiMsgCollector.java b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/MultiMsgCollector.java
new file mode 100644 (file)
index 0000000..22928de
--- /dev/null
@@ -0,0 +1,52 @@
+/**
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.api.openflow.device;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Collection;
+import javax.annotation.Nonnull;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
+
+/**
+ * openflowplugin-api
+ * org.opendaylight.openflowplugin.api.openflow.device
+ *
+ * Collects multipart msgs from device by provided XID and returns them
+ * to the caller as request/collection response one-to-one contract.
+ *
+ * @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
+ * @author <a href="mailto:tkubas@cisco.com">Timotej Kubas</a>
+ *
+ * Created: Mar 23, 2015
+ */
+public interface MultiMsgCollector {
+
+    /**
+     * Property used to know a max life time of Multipart collection in internal Cache
+     */
+    final int DEFAULT_TIME_OUT = 10;
+
+    /**
+     * Method registers a transaction id xid to the Multipart messages collector
+     * and returns Settable future with all MultipartReply. Method has to be called before
+     * send a request to the device, otherwise there is a small possibility to miss a first msg.
+     *
+     * @param xid
+     * @return
+     */
+    ListenableFuture<Collection<MultipartReply>> registerMultipartMsg(@Nonnull long xid);
+
+    /**
+     * Method adds a reply multipart message to the collection and if the message has marker
+     * "I'M A LAST" method set whole Collection to Future object and remove from cache.
+     *
+     * @param reply
+     */
+    void addMultipartMsg(@Nonnull MultipartReply reply);
+}
diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/MultiMsgCollectorImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/MultiMsgCollectorImpl.java
new file mode 100644 (file)
index 0000000..e59e3c2
--- /dev/null
@@ -0,0 +1,118 @@
+/**
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.impl.device;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.opendaylight.openflowplugin.api.openflow.device.MultiMsgCollector;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * openflowplugin-api
+ * org.opendaylight.openflowplugin.impl.openflow.device
+ *
+ * Implementation for {@link MultiMsgCollector} interface
+ *
+ * @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
+ * @author <a href="mailto:tkubas@cisco.com">Timotej Kubas</a>
+ *
+ * Created: Mar 23, 2015
+ */
+@VisibleForTesting
+class MultiMsgCollectorImpl implements MultiMsgCollector {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MultiMsgCollectorImpl.class);
+
+    private final Cache<Long, MultiCollectorObject> cache;
+
+    public MultiMsgCollectorImpl () {
+        cache = initCacheBuilder(DEFAULT_TIME_OUT).build();
+    }
+
+    public MultiMsgCollectorImpl (final int timeout) {
+        cache = initCacheBuilder(timeout).build();
+    }
+
+    private RemovalListener<Long, MultiCollectorObject> getRemovalListener() {
+        return new RemovalListener<Long, MultiCollectorObject>() {
+            @Override
+            public void onRemoval(final RemovalNotification<Long, MultiCollectorObject> notification) {
+                notification.getValue().invalidateFutureByTimeout();
+            }
+        };
+    }
+
+    private CacheBuilder<Long, MultiCollectorObject> initCacheBuilder(final int timeout) {
+        return CacheBuilder.newBuilder()
+                .expireAfterWrite(timeout, TimeUnit.SECONDS)
+                .removalListener(getRemovalListener())
+                .initialCapacity(200)
+                .maximumSize(500);
+    }
+
+    @Override
+    public ListenableFuture<Collection<MultipartReply>> registerMultipartMsg(final long xid) {
+        final SettableFuture<Collection<MultipartReply>> future = SettableFuture.create();
+        cache.put(xid, new MultiCollectorObject(future));
+        return future;
+    }
+
+    @Override
+    public void addMultipartMsg(final MultipartReply reply) {
+        Preconditions.checkNotNull(reply);
+        final Long xid = reply.getXid();
+        final MultiCollectorObject cachedRef = cache.getIfPresent(xid);
+        if (cachedRef == null) {
+            LOG.info("Orphaned multipart msg with XID : {}", xid);
+            return;
+        }
+        cachedRef.add(reply);
+        if ( ! reply.getFlags().isOFPMPFREQMORE()) {
+            // flag OFPMFFREEQMORE false says "I'm a last one'
+            cachedRef.populateSettableFuture(); // settable futue has now whole collection
+            cache.invalidate(xid);              // we don't need a reference anymore
+        }
+    }
+
+    private class MultiCollectorObject {
+        private final SettableFuture<Collection<MultipartReply>> future;
+        private final Collection<MultipartReply> replyCollection;
+
+        MultiCollectorObject (final SettableFuture<Collection<MultipartReply>> future) {
+            this.future = future;
+            replyCollection = new ArrayList<>();
+        }
+
+        void add(final MultipartReply reply) {
+            replyCollection.add(reply);
+        }
+
+        void populateSettableFuture() {
+            future.set(replyCollection);
+        }
+
+        void invalidateFutureByTimeout() {
+            final String msg = "MultiMsgCollector can not wait for last multipart any more";
+            future.setException(new TimeoutException(msg));
+        }
+    }
+}
diff --git a/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/MultiMsgCollectorImplTest.java b/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/MultiMsgCollectorImplTest.java
new file mode 100644 (file)
index 0000000..0fe9422
--- /dev/null
@@ -0,0 +1,141 @@
+/**
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.impl.device;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartRequestFlags;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessageBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyDescCase;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyDescCaseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.multipart.reply.desc._case.MultipartReplyDesc;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.multipart.reply.desc._case.MultipartReplyDescBuilder;
+
+/**
+ * openflowplugin-api
+ * org.opendaylight.openflowplugin.impl.openflow.device
+ *
+ * Test class for testing basic method functionality for {@link org.opendaylight.openflowplugin.api.openflow.device.MultiMsgCollector}
+ *
+ * @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
+ * @author <a href="mailto:tkubas@cisco.com">Timotej Kubas</a>
+ *
+ * Created: Mar 23, 2015
+ */
+public class MultiMsgCollectorImplTest {
+
+    private MultiMsgCollectorImpl collector;
+
+    @Before
+    public void initialization() {
+        collector = new MultiMsgCollectorImpl(1);
+    }
+
+    /**
+     * Test method for {@link org.opendaylight.openflowplugin.impl.openflow.device.MultiMsgCollectorImpl#registerMultipartMsg(org.opendaylight.openflowplugin.api.openflow.device.Xid)}.
+     * @throws ExecutionException
+     * @throws InterruptedException
+     * @throws TimeoutException
+     */
+    @Test
+    public void testRegisterMultipartMsg() throws InterruptedException, ExecutionException, TimeoutException{
+        final long xid = 45L;
+        final String hwTestValue = "test-value";
+        final ListenableFuture<Collection<MultipartReply>> response = collector.registerMultipartMsg(xid);
+        collector.addMultipartMsg(makeMultipartDescReply(xid, hwTestValue, false));
+
+        validateDescReply(response, xid, Collections.singletonList(hwTestValue));
+    }
+
+    /**
+     * Test method for {@link org.opendaylight.openflowplugin.impl.openflow.device.MultiMsgCollectorImpl#addMultipartMsg(org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply)}.
+     * @throws TimeoutException
+     * @throws ExecutionException
+     * @throws InterruptedException
+     */
+    @Test
+    public void testAddMultipartMsg() throws InterruptedException, ExecutionException, TimeoutException{
+        final long xid = 22L;
+        final String hwTestValue1 = "test-value1";
+        final String hwTestValue2 = "test-value2";
+        final ListenableFuture<Collection<MultipartReply>> response = collector.registerMultipartMsg(xid);
+        collector.addMultipartMsg(makeMultipartDescReply(xid, hwTestValue1, true));
+        collector.addMultipartMsg(makeMultipartDescReply(xid, hwTestValue2, false));
+
+        validateDescReply(response, xid, Arrays.asList(hwTestValue1, hwTestValue2));
+    }
+
+    /**
+     * Test could return NullPointerException if the body of addMultipartMsg not
+     */
+    @Test
+    public void testAddMultipartMsgNotExpectedXid() {
+        final long xid = 23L;
+        final String hwTestValue = "test-value";
+        collector.addMultipartMsg(makeMultipartDescReply(xid, hwTestValue, true));
+    }
+
+    /**
+     * Test could return NullPointerException if the body of addMultipartMsg not
+     * @throws InterruptedException
+     */
+    @Test(timeout=20000)
+    public void testCheckExistMultipartMsgInCacheAfterTimeout() throws InterruptedException, ExecutionException {
+        final long xid = 24L;
+        final ListenableFuture<Collection<MultipartReply>> response = collector.registerMultipartMsg(xid);
+        assertNotNull(response);
+        Thread.sleep(2000);
+        collector.addMultipartMsg(makeMultipartDescReply(xid, "hw-text-value", false));
+        try {
+            response.get(1L, TimeUnit.SECONDS);
+            fail("We expected timeout exception");
+        }
+        catch (final TimeoutException e) {
+            // expected exception
+        }
+    }
+
+    private void validateDescReply(final ListenableFuture<Collection<MultipartReply>> response, final long xid,
+            final Collection<String> hwTestValues) throws InterruptedException, ExecutionException, TimeoutException {
+        assertNotNull(response);
+        assertNotNull(xid);
+        assertNotNull(hwTestValues);
+
+        final Collection<MultipartReply> multipartReplyColl = response.get(1L, TimeUnit.SECONDS);
+        assertNotNull(multipartReplyColl);
+        assertTrue(multipartReplyColl.size() > 0);
+        for (final MultipartReply reply : multipartReplyColl) {
+            assertEquals(xid, reply.getXid().longValue());
+            assertTrue(reply.getMultipartReplyBody() instanceof MultipartReplyDescCase);
+            final String replayHwTestString = ((MultipartReplyDescCase) reply.getMultipartReplyBody())
+                    .getMultipartReplyDesc().getHwDesc();
+            assertTrue(hwTestValues.contains(replayHwTestString));
+        }
+    }
+
+    private MultipartReply makeMultipartDescReply(final long xid, final String value, final boolean isLast) {
+        final MultipartReplyDesc descValue = new MultipartReplyDescBuilder().setHwDesc(value).build();
+        final MultipartReplyDescCase replyBody = new MultipartReplyDescCaseBuilder()
+                                                        .setMultipartReplyDesc(descValue).build();
+        return new MultipartReplyMessageBuilder().setMultipartReplyBody(replyBody)
+                .setXid(xid).setFlags(new MultipartRequestFlags(isLast)).build();
+    }
+}