OPNFLWPLUG-1071 : Removal of javax.annotation.Nonnnull and replacement of javax.annot...
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / listener / MultiMsgCollectorImpl.java
index 9d0a6449eddc03a0e5e6ca69d7d087c3c27969d0..ac100b5267e6b3a59a6c406615a247d97f6ae9e4 100644 (file)
-/**
+/*
  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.openflowplugin.impl.device.listener;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
-import org.opendaylight.openflowplugin.api.openflow.device.Xid;
-import org.opendaylight.openflowplugin.api.openflow.device.exception.DeviceDataException;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceReplyProcessor;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
+import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.EventIdentifier;
+import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.EventsTimeCounter;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
-import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
- * <p>
- * openflowplugin-api
- * org.opendaylight.openflowplugin.impl.openflow.device
- *
- * Implementation for {@link MultiMsgCollector} interface
+ * Implementation for {@link MultiMsgCollector} interface.
  *
  * @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
  * @author <a href="mailto:tkubas@cisco.com">Timotej Kubas</a>
- *         </p>
- *         Created: Mar 23, 2015
  */
-@VisibleForTesting
-public class MultiMsgCollectorImpl implements MultiMsgCollector {
-
+public class MultiMsgCollectorImpl<T extends OfHeader> implements MultiMsgCollector<T> {
     private static final Logger LOG = LoggerFactory.getLogger(MultiMsgCollectorImpl.class);
-
-    private final Cache<Long, MultiCollectorObject> cache;
-    private DeviceReplyProcessor deviceReplyProcessor;
-
-    public MultiMsgCollectorImpl() {
-        this(DEFAULT_TIME_OUT);
-    }
-
-    public MultiMsgCollectorImpl(final int timeout) {
-        cache = initCacheBuilder(timeout).build();
-    }
-
-    private static RemovalListener<Long, MultiCollectorObject> getRemovalListener() {
-        return new RemovalListener<Long, MultiCollectorObject>() {
-            @Override
-            public void onRemoval(final RemovalNotification<Long, MultiCollectorObject> notification) {
-                LOG.trace("Removing data with XID {} from cache, cause: {}", notification.getKey(), notification.getCause());
-                switch (notification.getCause()) {
-                    case EXPIRED:
-                        notification.getValue().invalidateFutureByTimeout(notification.getKey());
-                }
-            }
-        };
-    }
-
-    private static CacheBuilder<Long, MultiCollectorObject> initCacheBuilder(final int timeout) {
-        return CacheBuilder.newBuilder()
-                .expireAfterAccess(timeout, TimeUnit.SECONDS)
-                .removalListener(getRemovalListener())
-                .initialCapacity(200)
-                .maximumSize(500)
-                .concurrencyLevel(1);
-    }
-
-    @Override
-    public void registerMultipartRequestContext(final RequestContext requestContext) {
-        cache.put(requestContext.getXid().getValue(), new MultiCollectorObject(requestContext));
+    private final List<T> replyCollection = new ArrayList<>();
+    private final RequestContext<List<T>> requestContext;
+    private final DeviceReplyProcessor deviceReplyProcessor;
+
+    public MultiMsgCollectorImpl(final DeviceReplyProcessor deviceReplyProcessor,
+                                 final RequestContext<List<T>> requestContext) {
+        this.deviceReplyProcessor = Preconditions.checkNotNull(deviceReplyProcessor);
+        this.requestContext = Preconditions.checkNotNull(requestContext);
     }
 
     @Override
-    public void addMultipartMsg(final MultipartReply reply) {
+    public void addMultipartMsg(@NonNull final T reply, final boolean reqMore,
+                                @Nullable final EventIdentifier eventIdentifier) {
         Preconditions.checkNotNull(reply);
+        Preconditions.checkNotNull(requestContext.getXid());
+        Preconditions.checkArgument(requestContext.getXid().getValue().equals(reply.getXid()));
         LOG.trace("Try to add Multipart reply msg with XID {}", reply.getXid());
-        final long xid = reply.getXid();
-        final MultiCollectorObject cachedRef = cache.getIfPresent(xid);
-        if (cachedRef == null) {
-            MultipartType multipartType = reply.getType();
-            LOG.trace("Orphaned multipart msg with XID : {} of type {}", xid, multipartType);
-            deviceReplyProcessor.processException(new Xid(xid),
-                    new DeviceDataException("unknown xid received for multipart of type " + multipartType));
-            return;
-        }
+        replyCollection.add(reply);
 
-        try {
-            cachedRef.add(reply);
-            LOG.trace("Multipart reply msg with XID {} added successfully.", reply.getXid());
-            if (!reply.getFlags().isOFPMPFREQMORE()) {
-                // flag OFPMFFREEQMORE false says "I'm a last one'
-                cachedRef.publishCollection(xid); // settable future has now whole collection
-                cache.invalidate(xid);              // we don't need a reference anymore - remove explicitly
-            }
-        } catch (DeviceDataException e) {
-            deviceReplyProcessor.processException(new Xid(xid), e);
+        if (!reqMore) {
+            endCollecting(eventIdentifier);
         }
     }
 
     @Override
-    public void setDeviceReplyProcessor(final DeviceReplyProcessor deviceReplyProcessor) {
-        this.deviceReplyProcessor = deviceReplyProcessor;
-    }
-
-    private class MultiCollectorObject {
-        private final List<MultipartReply> replyCollection;
-        private MultipartType msgType;
-        private final RequestContext requestContext;
-
-        MultiCollectorObject(final RequestContext requestContext) {
-            replyCollection = new ArrayList<>();
-            this.requestContext = requestContext;
-        }
+    public void endCollecting(@Nullable final EventIdentifier eventIdentifier) {
+        final RpcResult<List<T>> rpcResult = RpcResultBuilder.success(replyCollection).build();
 
-        void add(final MultipartReply reply) throws DeviceDataException {
-            /* Rise possible exception if it possible */
-            msgTypeValidation(reply.getType(), reply.getXid());
-            replyCollection.add(reply);
+        if (eventIdentifier != null) {
+            EventsTimeCounter.markEnd(eventIdentifier);
         }
 
-        void publishCollection(final long xid) {
-            final RpcResult<List<MultipartReply>> rpcResult = RpcResultBuilder
-                    .<List<MultipartReply>>success()
-                    .withResult(replyCollection)
-                    .build();
-            requestContext.setResult(rpcResult);
-            try {
-                requestContext.close();
-            } catch (final Exception e) {
-                LOG.warn("Closing RequestContext failed: {}", e.getMessage());
-                LOG.debug("Closing RequestContext failed.. ", e);
-            }
-            deviceReplyProcessor.processReply(new Xid(xid), replyCollection);
-        }
-
-        void invalidateFutureByTimeout(final long key) {
-            final String msg = "MultiMsgCollector can not wait for last multipart any more";
-            DeviceDataException deviceDataException = new DeviceDataException(msg);
-            final RpcResult<List<OfHeader>> rpcResult = RpcResultBuilder
-                    .<List<OfHeader>>failed()
-                    .withError(RpcError.ErrorType.APPLICATION, String.format("Message processing failed : %s", deviceDataException.getError()), deviceDataException)
-                    .build();
-            requestContext.setResult(rpcResult);
-            try {
-                requestContext.close();
-            } catch (final Exception e) {
-                LOG.warn("Closing RequestContext failed: ", e);
-                LOG.debug("Closing RequestContext failed..", e);
-            }
-            deviceReplyProcessor.processException(new Xid(key), deviceDataException);
-        }
-
-        public RequestContext getRequestContext() {
-            return requestContext;
-        }
-
-        private void msgTypeValidation(final MultipartType type, final long key) throws DeviceDataException {
-            if (msgType == null) {
-                msgType = type;
-                return;
-            }
-            if (!msgType.equals(type)) {
-                final String msg = "MultiMsgCollector get incorrect multipart msg with type {}"
-                        + " but expected type is {}";
-                LOG.trace(msg, type, msgType);
-                throw new DeviceDataException("multipart message type mismatch");
-            }
-        }
+        requestContext.setResult(rpcResult);
+        requestContext.close();
+        deviceReplyProcessor.processReply(requestContext.getXid(), replyCollection);
     }
 }