2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.impl.device;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.cache.Cache;
14 import com.google.common.cache.CacheBuilder;
15 import com.google.common.cache.RemovalListener;
16 import com.google.common.cache.RemovalNotification;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.SettableFuture;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.TimeoutException;
23 import org.opendaylight.openflowplugin.api.openflow.device.MultiMsgCollector;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
31 * org.opendaylight.openflowplugin.impl.openflow.device
33 * Implementation for {@link MultiMsgCollector} interface
35 * @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
36 * @author <a href="mailto:tkubas@cisco.com">Timotej Kubas</a>
38 * Created: Mar 23, 2015
41 class MultiMsgCollectorImpl implements MultiMsgCollector {
43 private static final Logger LOG = LoggerFactory.getLogger(MultiMsgCollectorImpl.class);
45 private final Cache<Long, MultiCollectorObject> cache;
47 public MultiMsgCollectorImpl () {
48 cache = initCacheBuilder(DEFAULT_TIME_OUT).build();
51 public MultiMsgCollectorImpl (final int timeout) {
52 cache = initCacheBuilder(timeout).build();
55 private RemovalListener<Long, MultiCollectorObject> getRemovalListener() {
56 return new RemovalListener<Long, MultiCollectorObject>() {
58 public void onRemoval(final RemovalNotification<Long, MultiCollectorObject> notification) {
59 notification.getValue().invalidateFutureByTimeout();
64 private CacheBuilder<Long, MultiCollectorObject> initCacheBuilder(final int timeout) {
65 return CacheBuilder.newBuilder()
66 .expireAfterWrite(timeout, TimeUnit.SECONDS)
67 .removalListener(getRemovalListener())
73 public ListenableFuture<Collection<MultipartReply>> registerMultipartMsg(final long xid) {
74 final SettableFuture<Collection<MultipartReply>> future = SettableFuture.create();
75 cache.put(xid, new MultiCollectorObject(future));
80 public void addMultipartMsg(final MultipartReply reply) {
81 Preconditions.checkNotNull(reply);
82 final Long xid = reply.getXid();
83 final MultiCollectorObject cachedRef = cache.getIfPresent(xid);
84 if (cachedRef == null) {
85 LOG.info("Orphaned multipart msg with XID : {}", xid);
89 if ( ! reply.getFlags().isOFPMPFREQMORE()) {
90 // flag OFPMFFREEQMORE false says "I'm a last one'
91 cachedRef.populateSettableFuture(); // settable futue has now whole collection
92 cache.invalidate(xid); // we don't need a reference anymore
96 private class MultiCollectorObject {
97 private final SettableFuture<Collection<MultipartReply>> future;
98 private final Collection<MultipartReply> replyCollection;
100 MultiCollectorObject (final SettableFuture<Collection<MultipartReply>> future) {
101 this.future = future;
102 replyCollection = new ArrayList<>();
105 void add(final MultipartReply reply) {
106 replyCollection.add(reply);
109 void populateSettableFuture() {
110 future.set(replyCollection);
113 void invalidateFutureByTimeout() {
114 final String msg = "MultiMsgCollector can not wait for last multipart any more";
115 future.setException(new TimeoutException(msg));