/** * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.openflowplugin.impl.device; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.opendaylight.openflowplugin.api.openflow.device.MultiMsgCollector; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * openflowplugin-api * org.opendaylight.openflowplugin.impl.openflow.device * * Implementation for {@link MultiMsgCollector} interface * * @author Vaclav Demcak * @author Timotej Kubas * * Created: Mar 23, 2015 */ @VisibleForTesting class MultiMsgCollectorImpl implements MultiMsgCollector { private static final Logger LOG = LoggerFactory.getLogger(MultiMsgCollectorImpl.class); private final Cache cache; public MultiMsgCollectorImpl () { cache = initCacheBuilder(DEFAULT_TIME_OUT).build(); } public MultiMsgCollectorImpl (final int timeout) { cache = initCacheBuilder(timeout).build(); } private RemovalListener getRemovalListener() { return new RemovalListener() { @Override public void onRemoval(final RemovalNotification notification) { notification.getValue().invalidateFutureByTimeout(); } }; } private CacheBuilder initCacheBuilder(final int timeout) { return CacheBuilder.newBuilder() .expireAfterWrite(timeout, TimeUnit.SECONDS) .removalListener(getRemovalListener()) .initialCapacity(200) .maximumSize(500); } @Override public ListenableFuture> registerMultipartMsg(final long xid) { final SettableFuture> future = SettableFuture.create(); cache.put(xid, new MultiCollectorObject(future)); return future; } @Override public void addMultipartMsg(final MultipartReply reply) { Preconditions.checkNotNull(reply); final Long xid = reply.getXid(); final MultiCollectorObject cachedRef = cache.getIfPresent(xid); if (cachedRef == null) { LOG.info("Orphaned multipart msg with XID : {}", xid); return; } cachedRef.add(reply); if ( ! reply.getFlags().isOFPMPFREQMORE()) { // flag OFPMFFREEQMORE false says "I'm a last one' cachedRef.populateSettableFuture(); // settable futue has now whole collection cache.invalidate(xid); // we don't need a reference anymore } } private class MultiCollectorObject { private final SettableFuture> future; private final Collection replyCollection; MultiCollectorObject (final SettableFuture> future) { this.future = future; replyCollection = new ArrayList<>(); } void add(final MultipartReply reply) { replyCollection.add(reply); } void populateSettableFuture() { future.set(replyCollection); } void invalidateFutureByTimeout() { final String msg = "MultiMsgCollector can not wait for last multipart any more"; future.setException(new TimeoutException(msg)); } } }