MultiMsgCollector for Multipart response messaging
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / MultiMsgCollectorImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.device;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.cache.Cache;
14 import com.google.common.cache.CacheBuilder;
15 import com.google.common.cache.RemovalListener;
16 import com.google.common.cache.RemovalNotification;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.SettableFuture;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.TimeoutException;
23 import org.opendaylight.openflowplugin.api.openflow.device.MultiMsgCollector;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28
29 /**
30  * openflowplugin-api
31  * org.opendaylight.openflowplugin.impl.openflow.device
32  *
33  * Implementation for {@link MultiMsgCollector} interface
34  *
35  * @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
36  * @author <a href="mailto:tkubas@cisco.com">Timotej Kubas</a>
37  *
38  * Created: Mar 23, 2015
39  */
40 @VisibleForTesting
41 class MultiMsgCollectorImpl implements MultiMsgCollector {
42
43     private static final Logger LOG = LoggerFactory.getLogger(MultiMsgCollectorImpl.class);
44
45     private final Cache<Long, MultiCollectorObject> cache;
46
47     public MultiMsgCollectorImpl () {
48         cache = initCacheBuilder(DEFAULT_TIME_OUT).build();
49     }
50
51     public MultiMsgCollectorImpl (final int timeout) {
52         cache = initCacheBuilder(timeout).build();
53     }
54
55     private RemovalListener<Long, MultiCollectorObject> getRemovalListener() {
56         return new RemovalListener<Long, MultiCollectorObject>() {
57             @Override
58             public void onRemoval(final RemovalNotification<Long, MultiCollectorObject> notification) {
59                 notification.getValue().invalidateFutureByTimeout();
60             }
61         };
62     }
63
64     private CacheBuilder<Long, MultiCollectorObject> initCacheBuilder(final int timeout) {
65         return CacheBuilder.newBuilder()
66                 .expireAfterWrite(timeout, TimeUnit.SECONDS)
67                 .removalListener(getRemovalListener())
68                 .initialCapacity(200)
69                 .maximumSize(500);
70     }
71
72     @Override
73     public ListenableFuture<Collection<MultipartReply>> registerMultipartMsg(final long xid) {
74         final SettableFuture<Collection<MultipartReply>> future = SettableFuture.create();
75         cache.put(xid, new MultiCollectorObject(future));
76         return future;
77     }
78
79     @Override
80     public void addMultipartMsg(final MultipartReply reply) {
81         Preconditions.checkNotNull(reply);
82         final Long xid = reply.getXid();
83         final MultiCollectorObject cachedRef = cache.getIfPresent(xid);
84         if (cachedRef == null) {
85             LOG.info("Orphaned multipart msg with XID : {}", xid);
86             return;
87         }
88         cachedRef.add(reply);
89         if ( ! reply.getFlags().isOFPMPFREQMORE()) {
90             // flag OFPMFFREEQMORE false says "I'm a last one'
91             cachedRef.populateSettableFuture(); // settable futue has now whole collection
92             cache.invalidate(xid);              // we don't need a reference anymore
93         }
94     }
95
96     private class MultiCollectorObject {
97         private final SettableFuture<Collection<MultipartReply>> future;
98         private final Collection<MultipartReply> replyCollection;
99
100         MultiCollectorObject (final SettableFuture<Collection<MultipartReply>> future) {
101             this.future = future;
102             replyCollection = new ArrayList<>();
103         }
104
105         void add(final MultipartReply reply) {
106             replyCollection.add(reply);
107         }
108
109         void populateSettableFuture() {
110             future.set(replyCollection);
111         }
112
113         void invalidateFutureByTimeout() {
114             final String msg = "MultiMsgCollector can not wait for last multipart any more";
115             future.setException(new TimeoutException(msg));
116         }
117     }
118 }