From e51e4efb67681879f90685e4fa7c588f2e2a2f4d Mon Sep 17 00:00:00 2001 From: Martin Bobak Date: Tue, 24 Mar 2015 17:18:39 +0100 Subject: [PATCH] MultiMsgCollector for Multipart response messaging Note: OF device could return Multipart msg response for a requests. So we would like to have one-to-one contract for request-to-CollectionResponse and we will able to identify differences between actualState and a future state. * MultiMsgCollector interface - a definition * MultiMsgCollectorImpl class - implementation of definition with internal Cache for collecting MultipartResponseMessages * MultiMsgCollectorImplTest - test suite for testing basic functionality Change-Id: I920e2ee69929ed874304116da6711ab3f2dfb159 Signed-off-by: Timotej Kubas Signed-off-by: Vaclav Demcak Signed-off-by: Martin Bobak --- .../openflow/device/MultiMsgCollector.java | 52 +++++++ .../impl/device/MultiMsgCollectorImpl.java | 118 +++++++++++++++ .../device/MultiMsgCollectorImplTest.java | 141 ++++++++++++++++++ 3 files changed, 311 insertions(+) create mode 100644 openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/MultiMsgCollector.java create mode 100644 openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/MultiMsgCollectorImpl.java create mode 100644 openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/MultiMsgCollectorImplTest.java diff --git a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/MultiMsgCollector.java b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/MultiMsgCollector.java new file mode 100644 index 0000000000..22928deb0e --- /dev/null +++ b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/MultiMsgCollector.java @@ -0,0 +1,52 @@ +/** + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.openflowplugin.api.openflow.device; + +import com.google.common.util.concurrent.ListenableFuture; +import java.util.Collection; +import javax.annotation.Nonnull; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply; + +/** + * openflowplugin-api + * org.opendaylight.openflowplugin.api.openflow.device + * + * Collects multipart msgs from device by provided XID and returns them + * to the caller as request/collection response one-to-one contract. + * + * @author Vaclav Demcak + * @author Timotej Kubas + * + * Created: Mar 23, 2015 + */ +public interface MultiMsgCollector { + + /** + * Property used to know a max life time of Multipart collection in internal Cache + */ + final int DEFAULT_TIME_OUT = 10; + + /** + * Method registers a transaction id xid to the Multipart messages collector + * and returns Settable future with all MultipartReply. Method has to be called before + * send a request to the device, otherwise there is a small possibility to miss a first msg. + * + * @param xid + * @return + */ + ListenableFuture> registerMultipartMsg(@Nonnull long xid); + + /** + * Method adds a reply multipart message to the collection and if the message has marker + * "I'M A LAST" method set whole Collection to Future object and remove from cache. + * + * @param reply + */ + void addMultipartMsg(@Nonnull MultipartReply reply); +} diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/MultiMsgCollectorImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/MultiMsgCollectorImpl.java new file mode 100644 index 0000000000..e59e3c2db7 --- /dev/null +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/MultiMsgCollectorImpl.java @@ -0,0 +1,118 @@ +/** + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.openflowplugin.impl.device; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.opendaylight.openflowplugin.api.openflow.device.MultiMsgCollector; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * openflowplugin-api + * org.opendaylight.openflowplugin.impl.openflow.device + * + * Implementation for {@link MultiMsgCollector} interface + * + * @author Vaclav Demcak + * @author Timotej Kubas + * + * Created: Mar 23, 2015 + */ +@VisibleForTesting +class MultiMsgCollectorImpl implements MultiMsgCollector { + + private static final Logger LOG = LoggerFactory.getLogger(MultiMsgCollectorImpl.class); + + private final Cache cache; + + public MultiMsgCollectorImpl () { + cache = initCacheBuilder(DEFAULT_TIME_OUT).build(); + } + + public MultiMsgCollectorImpl (final int timeout) { + cache = initCacheBuilder(timeout).build(); + } + + private RemovalListener getRemovalListener() { + return new RemovalListener() { + @Override + public void onRemoval(final RemovalNotification notification) { + notification.getValue().invalidateFutureByTimeout(); + } + }; + } + + private CacheBuilder initCacheBuilder(final int timeout) { + return CacheBuilder.newBuilder() + .expireAfterWrite(timeout, TimeUnit.SECONDS) + .removalListener(getRemovalListener()) + .initialCapacity(200) + .maximumSize(500); + } + + @Override + public ListenableFuture> registerMultipartMsg(final long xid) { + final SettableFuture> future = SettableFuture.create(); + cache.put(xid, new MultiCollectorObject(future)); + return future; + } + + @Override + public void addMultipartMsg(final MultipartReply reply) { + Preconditions.checkNotNull(reply); + final Long xid = reply.getXid(); + final MultiCollectorObject cachedRef = cache.getIfPresent(xid); + if (cachedRef == null) { + LOG.info("Orphaned multipart msg with XID : {}", xid); + return; + } + cachedRef.add(reply); + if ( ! reply.getFlags().isOFPMPFREQMORE()) { + // flag OFPMFFREEQMORE false says "I'm a last one' + cachedRef.populateSettableFuture(); // settable futue has now whole collection + cache.invalidate(xid); // we don't need a reference anymore + } + } + + private class MultiCollectorObject { + private final SettableFuture> future; + private final Collection replyCollection; + + MultiCollectorObject (final SettableFuture> future) { + this.future = future; + replyCollection = new ArrayList<>(); + } + + void add(final MultipartReply reply) { + replyCollection.add(reply); + } + + void populateSettableFuture() { + future.set(replyCollection); + } + + void invalidateFutureByTimeout() { + final String msg = "MultiMsgCollector can not wait for last multipart any more"; + future.setException(new TimeoutException(msg)); + } + } +} diff --git a/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/MultiMsgCollectorImplTest.java b/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/MultiMsgCollectorImplTest.java new file mode 100644 index 0000000000..0fe9422146 --- /dev/null +++ b/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/MultiMsgCollectorImplTest.java @@ -0,0 +1,141 @@ +/** + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.openflowplugin.impl.device; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import com.google.common.util.concurrent.ListenableFuture; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartRequestFlags; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessageBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyDescCase; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyDescCaseBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.multipart.reply.desc._case.MultipartReplyDesc; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.multipart.reply.desc._case.MultipartReplyDescBuilder; + +/** + * openflowplugin-api + * org.opendaylight.openflowplugin.impl.openflow.device + * + * Test class for testing basic method functionality for {@link org.opendaylight.openflowplugin.api.openflow.device.MultiMsgCollector} + * + * @author Vaclav Demcak + * @author Timotej Kubas + * + * Created: Mar 23, 2015 + */ +public class MultiMsgCollectorImplTest { + + private MultiMsgCollectorImpl collector; + + @Before + public void initialization() { + collector = new MultiMsgCollectorImpl(1); + } + + /** + * Test method for {@link org.opendaylight.openflowplugin.impl.openflow.device.MultiMsgCollectorImpl#registerMultipartMsg(org.opendaylight.openflowplugin.api.openflow.device.Xid)}. + * @throws ExecutionException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testRegisterMultipartMsg() throws InterruptedException, ExecutionException, TimeoutException{ + final long xid = 45L; + final String hwTestValue = "test-value"; + final ListenableFuture> response = collector.registerMultipartMsg(xid); + collector.addMultipartMsg(makeMultipartDescReply(xid, hwTestValue, false)); + + validateDescReply(response, xid, Collections.singletonList(hwTestValue)); + } + + /** + * Test method for {@link org.opendaylight.openflowplugin.impl.openflow.device.MultiMsgCollectorImpl#addMultipartMsg(org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply)}. + * @throws TimeoutException + * @throws ExecutionException + * @throws InterruptedException + */ + @Test + public void testAddMultipartMsg() throws InterruptedException, ExecutionException, TimeoutException{ + final long xid = 22L; + final String hwTestValue1 = "test-value1"; + final String hwTestValue2 = "test-value2"; + final ListenableFuture> response = collector.registerMultipartMsg(xid); + collector.addMultipartMsg(makeMultipartDescReply(xid, hwTestValue1, true)); + collector.addMultipartMsg(makeMultipartDescReply(xid, hwTestValue2, false)); + + validateDescReply(response, xid, Arrays.asList(hwTestValue1, hwTestValue2)); + } + + /** + * Test could return NullPointerException if the body of addMultipartMsg not + */ + @Test + public void testAddMultipartMsgNotExpectedXid() { + final long xid = 23L; + final String hwTestValue = "test-value"; + collector.addMultipartMsg(makeMultipartDescReply(xid, hwTestValue, true)); + } + + /** + * Test could return NullPointerException if the body of addMultipartMsg not + * @throws InterruptedException + */ + @Test(timeout=20000) + public void testCheckExistMultipartMsgInCacheAfterTimeout() throws InterruptedException, ExecutionException { + final long xid = 24L; + final ListenableFuture> response = collector.registerMultipartMsg(xid); + assertNotNull(response); + Thread.sleep(2000); + collector.addMultipartMsg(makeMultipartDescReply(xid, "hw-text-value", false)); + try { + response.get(1L, TimeUnit.SECONDS); + fail("We expected timeout exception"); + } + catch (final TimeoutException e) { + // expected exception + } + } + + private void validateDescReply(final ListenableFuture> response, final long xid, + final Collection hwTestValues) throws InterruptedException, ExecutionException, TimeoutException { + assertNotNull(response); + assertNotNull(xid); + assertNotNull(hwTestValues); + + final Collection multipartReplyColl = response.get(1L, TimeUnit.SECONDS); + assertNotNull(multipartReplyColl); + assertTrue(multipartReplyColl.size() > 0); + for (final MultipartReply reply : multipartReplyColl) { + assertEquals(xid, reply.getXid().longValue()); + assertTrue(reply.getMultipartReplyBody() instanceof MultipartReplyDescCase); + final String replayHwTestString = ((MultipartReplyDescCase) reply.getMultipartReplyBody()) + .getMultipartReplyDesc().getHwDesc(); + assertTrue(hwTestValues.contains(replayHwTestString)); + } + } + + private MultipartReply makeMultipartDescReply(final long xid, final String value, final boolean isLast) { + final MultipartReplyDesc descValue = new MultipartReplyDescBuilder().setHwDesc(value).build(); + final MultipartReplyDescCase replyBody = new MultipartReplyDescCaseBuilder() + .setMultipartReplyDesc(descValue).build(); + return new MultipartReplyMessageBuilder().setMultipartReplyBody(replyBody) + .setXid(xid).setFlags(new MultipartRequestFlags(isLast)).build(); + } +} -- 2.36.6