2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.impl.device.listener;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.cache.Cache;
14 import com.google.common.cache.CacheBuilder;
15 import com.google.common.cache.RemovalListener;
16 import com.google.common.cache.RemovalNotification;
17 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
18 import org.opendaylight.openflowplugin.api.openflow.device.exception.DeviceDataException;
19 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceReplyProcessor;
20 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.concurrent.TimeUnit;
32 * org.opendaylight.openflowplugin.impl.openflow.device
34 * Implementation for {@link MultiMsgCollector} interface
36 * @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
37 * @author <a href="mailto:tkubas@cisco.com">Timotej Kubas</a>
39 * Created: Mar 23, 2015
42 public class MultiMsgCollectorImpl implements MultiMsgCollector {
44 private static final Logger LOG = LoggerFactory.getLogger(MultiMsgCollectorImpl.class);
46 private final Cache<Long, MultiCollectorObject> cache;
47 private DeviceReplyProcessor deviceReplyProcessor;
49 public MultiMsgCollectorImpl() {
50 cache = initCacheBuilder(DEFAULT_TIME_OUT).build();
53 public MultiMsgCollectorImpl(final int timeout) {
54 cache = initCacheBuilder(timeout).build();
57 private RemovalListener<Long, MultiCollectorObject> getRemovalListener() {
58 return new RemovalListener<Long, MultiCollectorObject>() {
60 public void onRemoval(final RemovalNotification<Long, MultiCollectorObject> notification) {
61 LOG.warn("Removing data with XID {} from cache", notification.getKey());
62 deviceReplyProcessor.processException(new Xid(notification.getKey()), new DeviceDataException("Data removed from cache"));
63 notification.getValue().invalidateFutureByTimeout(notification.getKey());
68 private CacheBuilder<Long, MultiCollectorObject> initCacheBuilder(final int timeout) {
69 return CacheBuilder.newBuilder()
70 .expireAfterAccess(timeout, TimeUnit.SECONDS)
71 .removalListener(getRemovalListener())
78 public void registerMultipartXid(final long xid) {
79 cache.put(xid, new MultiCollectorObject());
83 public void addMultipartMsg(final MultipartReply reply) {
84 Preconditions.checkNotNull(reply);
85 LOG.trace("Try to add Multipart reply msg with XID {}", reply.getXid());
86 final long xid = reply.getXid();
87 final MultiCollectorObject cachedRef = cache.getIfPresent(xid);
88 if (cachedRef == null) {
89 LOG.trace("Orphaned multipart msg with XID : {}", xid);
90 deviceReplyProcessor.processException(new Xid(xid), new DeviceDataException("unknown xid received"));
94 if (!reply.getFlags().isOFPMPFREQMORE()) {
95 // flag OFPMFFREEQMORE false says "I'm a last one'
96 cachedRef.populateSettableFuture(xid); // settable futue has now whole collection
97 cache.invalidate(xid); // we don't need a reference anymore
102 public void setDeviceReplyProcessor(DeviceReplyProcessor deviceReplyProcessor) {
103 this.deviceReplyProcessor = deviceReplyProcessor;
106 private class MultiCollectorObject {
107 private final List<MultipartReply> replyCollection;
108 private MultipartType msgType;
110 MultiCollectorObject() {
111 replyCollection = new ArrayList<>();
114 void add(final MultipartReply reply) {
115 /* Rise possible exception if it possible */
116 msgTypeValidation(reply.getType(), reply.getXid());
117 replyCollection.add(reply);
120 void populateSettableFuture(long xid) {
121 deviceReplyProcessor.processReply(new Xid(xid), replyCollection);
124 void invalidateFutureByTimeout(final long key) {
125 final String msg = "MultiMsgCollector can not wait for last multipart any more";
126 deviceReplyProcessor.processException(new Xid(key), new DeviceDataException(msg));
129 void invalidateFutureByInputType(final MultipartType type, final long key) {
130 final String msg = "MultiMsgCollector get incorrect multipart msg with type " + type
131 + " but expected type is " + msgType;
132 deviceReplyProcessor.processException(new Xid(key), new DeviceDataException(msg));
135 private void msgTypeValidation(final MultipartType type, final long key) {
136 if (msgType == null) {
140 if (!msgType.equals(type)) {
141 invalidateFutureByInputType(type, key);