OPNFLWPLUG-1071 : Removal of javax.annotation.Nonnnull and replacement of javax.annot...
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / listener / MultiMsgCollectorImpl.java
index 6c67a3144af198612db7988c7921daf21835a534..ac100b5267e6b3a59a6c406615a247d97f6ae9e4 100644 (file)
-/**
+/*
  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.openflowplugin.impl.device.listener;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import javax.annotation.CheckForNull;
-
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceReplyProcessor;
-import org.opendaylight.openflowplugin.api.openflow.device.Xid;
-import org.opendaylight.openflowplugin.api.openflow.device.exception.DeviceDataException;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
+import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.EventIdentifier;
+import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.EventsTimeCounter;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
- * openflowplugin-api
- * org.opendaylight.openflowplugin.impl.openflow.device
- *
- * Implementation for {@link MultiMsgCollector} interface
+ * Implementation for {@link MultiMsgCollector} interface.
  *
  * @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
  * @author <a href="mailto:tkubas@cisco.com">Timotej Kubas</a>
- *
- * Created: Mar 23, 2015
  */
-@VisibleForTesting
-class MultiMsgCollectorImpl implements MultiMsgCollector {
-
+public class MultiMsgCollectorImpl<T extends OfHeader> implements MultiMsgCollector<T> {
     private static final Logger LOG = LoggerFactory.getLogger(MultiMsgCollectorImpl.class);
-
-    private final Cache<Long, MultiCollectorObject> cache;
-    private DeviceReplyProcessor deviceReplyProcessor;
-
-    public MultiMsgCollectorImpl () {
-        cache = initCacheBuilder(DEFAULT_TIME_OUT).build();
-    }
-
-    public MultiMsgCollectorImpl (final int timeout) {
-        cache = initCacheBuilder(timeout).build();
-    }
-
-    private RemovalListener<Long, MultiCollectorObject> getRemovalListener() {
-        return new RemovalListener<Long, MultiCollectorObject>() {
-            @Override
-            public void onRemoval(final RemovalNotification<Long, MultiCollectorObject> notification) {
-                if ( ! notification.getValue().future.isDone()) {
-                    LOG.warn("Removing data with XID {} from cache", notification.getKey());
-                    notification.getValue().invalidateFutureByTimeout();
-                }
-            }
-        };
-    }
-
-    private CacheBuilder<Long, MultiCollectorObject> initCacheBuilder(final int timeout) {
-        return CacheBuilder.newBuilder()
-                .expireAfterAccess(timeout, TimeUnit.SECONDS)
-                .removalListener(getRemovalListener())
-                .initialCapacity(200)
-                .maximumSize(500)
-                .concurrencyLevel(1);
-    }
-
-    public void registerMultipartFutureMsg(final long xid, @CheckForNull final SettableFuture<Collection<MultipartReply>> future) {
-        Preconditions.checkArgument(future != null);
-        cache.put(xid, new MultiCollectorObject(future));
+    private final List<T> replyCollection = new ArrayList<>();
+    private final RequestContext<List<T>> requestContext;
+    private final DeviceReplyProcessor deviceReplyProcessor;
+
+    public MultiMsgCollectorImpl(final DeviceReplyProcessor deviceReplyProcessor,
+                                 final RequestContext<List<T>> requestContext) {
+        this.deviceReplyProcessor = Preconditions.checkNotNull(deviceReplyProcessor);
+        this.requestContext = Preconditions.checkNotNull(requestContext);
     }
 
     @Override
-    public ListenableFuture<Collection<MultipartReply>> registerMultipartMsg(final long xid) {
-        final SettableFuture<Collection<MultipartReply>> future = SettableFuture.create();
-        cache.put(xid, new MultiCollectorObject(future));
-        return future;
-    }
-
-    @Override
-    public void addMultipartMsg(final MultipartReply reply) {
+    public void addMultipartMsg(@NonNull final T reply, final boolean reqMore,
+                                @Nullable final EventIdentifier eventIdentifier) {
         Preconditions.checkNotNull(reply);
+        Preconditions.checkNotNull(requestContext.getXid());
+        Preconditions.checkArgument(requestContext.getXid().getValue().equals(reply.getXid()));
         LOG.trace("Try to add Multipart reply msg with XID {}", reply.getXid());
-        final Long xid = reply.getXid();
-        final MultiCollectorObject cachedRef = cache.getIfPresent(xid);
-        if (cachedRef == null) {
-            LOG.trace("Orphaned multipart msg with XID : {}", xid);
-            deviceReplyProcessor.processException(new Xid(xid), new DeviceDataException("unknown xid received"));
-            return;
-        }
-        cachedRef.add(reply);
-        if ( ! reply.getFlags().isOFPMPFREQMORE()) {
-            // flag OFPMFFREEQMORE false says "I'm a last one'
-            cachedRef.populateSettableFuture(xid); // settable futue has now whole collection
-            cache.invalidate(xid);              // we don't need a reference anymore
-        }
-    }
-
-    public void setDeviceReplyProcessor(DeviceReplyProcessor deviceReplyProcessor) {
-        this.deviceReplyProcessor = deviceReplyProcessor;
-    }
-
-    private class MultiCollectorObject {
-        private final SettableFuture<Collection<MultipartReply>> future;
-        private final List<MultipartReply> replyCollection;
-        private MultipartType msgType;
+        replyCollection.add(reply);
 
-        MultiCollectorObject (final SettableFuture<Collection<MultipartReply>> future) {
-            this.future = future;
-            replyCollection = new ArrayList<>();
-        }
-
-        void add(final MultipartReply reply) {
-            /* Rise possible exception if it possible */
-            msgTypeValidation(reply.getType());
-            replyCollection.add(reply);
-        }
-
-        void populateSettableFuture(long xid) {
-            future.set(replyCollection);
-            deviceReplyProcessor.processReply(new Xid(xid), replyCollection);
+        if (!reqMore) {
+            endCollecting(eventIdentifier);
         }
+    }
 
-        void invalidateFutureByTimeout() {
-            final String msg = "MultiMsgCollector can not wait for last multipart any more";
-            future.setException(new TimeoutException(msg));
-        }
+    @Override
+    public void endCollecting(@Nullable final EventIdentifier eventIdentifier) {
+        final RpcResult<List<T>> rpcResult = RpcResultBuilder.success(replyCollection).build();
 
-        void invalidateFutureByInputType(final MultipartType type) {
-            final String msg = "MultiMsgCollector get incorrect multipart msg with type " + type
-                    + " but expected type is " + msgType;
-            future.setException(new IllegalArgumentException(msg));
+        if (eventIdentifier != null) {
+            EventsTimeCounter.markEnd(eventIdentifier);
         }
 
-        private void msgTypeValidation(final MultipartType type) {
-            if (msgType == null) {
-                msgType = type;
-                return;
-            }
-            if ( ! msgType.equals(type)) {
-                invalidateFutureByInputType(type);
-            }
-        }
+        requestContext.setResult(rpcResult);
+        requestContext.close();
+        deviceReplyProcessor.processReply(requestContext.getXid(), replyCollection);
     }
 }