/**
* Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
package org.opendaylight.openflowplugin.impl.connection;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.CheckForNull;
import org.opendaylight.openflowplugin.api.openflow.connection.MultiMsgCollector;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* openflowplugin-api
* org.opendaylight.openflowplugin.impl.openflow.device
*
* Implementation for {@link MultiMsgCollector} interface
*
* @author Vaclav Demcak
* @author Timotej Kubas
*
* Created: Mar 23, 2015
*/
@VisibleForTesting
class MultiMsgCollectorImpl implements MultiMsgCollector {
private static final Logger LOG = LoggerFactory.getLogger(MultiMsgCollectorImpl.class);
private final Cache cache;
public MultiMsgCollectorImpl () {
cache = initCacheBuilder(DEFAULT_TIME_OUT).build();
}
public MultiMsgCollectorImpl (final int timeout) {
cache = initCacheBuilder(timeout).build();
}
private RemovalListener getRemovalListener() {
return new RemovalListener() {
@Override
public void onRemoval(final RemovalNotification notification) {
if ( ! notification.getValue().future.isDone()) {
LOG.warn("Removing data with XID {} from cache", notification.getKey());
notification.getValue().invalidateFutureByTimeout();
}
}
};
}
private CacheBuilder initCacheBuilder(final int timeout) {
return CacheBuilder.newBuilder()
.expireAfterAccess(timeout, TimeUnit.SECONDS)
.removalListener(getRemovalListener())
.initialCapacity(200)
.maximumSize(500)
.concurrencyLevel(1);
}
public void registerMultipartFutureMsg(final long xid, @CheckForNull final SettableFuture> future) {
Preconditions.checkArgument(future != null);
cache.put(xid, new MultiCollectorObject(future));
}
@Override
public ListenableFuture> registerMultipartMsg(final long xid) {
final SettableFuture> future = SettableFuture.create();
cache.put(xid, new MultiCollectorObject(future));
return future;
}
@Override
public void addMultipartMsg(final MultipartReply reply) {
Preconditions.checkNotNull(reply);
LOG.trace("Try to add Multipart reply msg with XID {}", reply.getXid());
final Long xid = reply.getXid();
final MultiCollectorObject cachedRef = cache.getIfPresent(xid);
if (cachedRef == null) {
LOG.info("Orphaned multipart msg with XID : {}", xid);
return;
}
cachedRef.add(reply);
if ( ! reply.getFlags().isOFPMPFREQMORE()) {
// flag OFPMFFREEQMORE false says "I'm a last one'
cachedRef.populateSettableFuture(); // settable futue has now whole collection
cache.invalidate(xid); // we don't need a reference anymore
}
}
private class MultiCollectorObject {
private final SettableFuture> future;
private final Collection replyCollection;
private MultipartType msgType;
MultiCollectorObject (final SettableFuture> future) {
this.future = future;
replyCollection = new ArrayList<>();
}
void add(final MultipartReply reply) {
/* Rise possible exception if it possible */
msgTypeValidation(reply.getType());
replyCollection.add(reply);
}
void populateSettableFuture() {
future.set(replyCollection);
}
void invalidateFutureByTimeout() {
final String msg = "MultiMsgCollector can not wait for last multipart any more";
future.setException(new TimeoutException(msg));
}
void invalidateFutureByInputType(final MultipartType type) {
final String msg = "MultiMsgCollector get incorrect multipart msg with type " + type
+ " but expected type is " + msgType;
future.setException(new IllegalArgumentException(msg));
}
private void msgTypeValidation(final MultipartType type) {
if (msgType == null) {
msgType = type;
return;
}
if ( ! msgType.equals(type)) {
invalidateFutureByInputType(type);
}
}
}
}