Merge "Rework of handleServiceCall method"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / listener / MultiMsgCollectorImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.device.listener;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.cache.Cache;
14 import com.google.common.cache.CacheBuilder;
15 import com.google.common.cache.RemovalListener;
16 import com.google.common.cache.RemovalNotification;
17 import java.util.ArrayList;
18 import java.util.List;
19 import java.util.concurrent.TimeUnit;
20 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
21 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
22 import org.opendaylight.openflowplugin.api.openflow.device.exception.DeviceDataException;
23 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceReplyProcessor;
24 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
27 import org.opendaylight.yangtools.yang.common.RpcError;
28 import org.opendaylight.yangtools.yang.common.RpcResult;
29 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33
34 /**
35  * <p>
36  * openflowplugin-api
37  * org.opendaylight.openflowplugin.impl.openflow.device
38  *
39  * Implementation for {@link MultiMsgCollector} interface
40  *
41  * @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
42  * @author <a href="mailto:tkubas@cisco.com">Timotej Kubas</a>
43  *         </p>
44  *         Created: Mar 23, 2015
45  */
46 @VisibleForTesting
47 public class MultiMsgCollectorImpl implements MultiMsgCollector {
48
49     private static final Logger LOG = LoggerFactory.getLogger(MultiMsgCollectorImpl.class);
50
51     private final Cache<Long, MultiCollectorObject> cache;
52     private DeviceReplyProcessor deviceReplyProcessor;
53
54     public MultiMsgCollectorImpl() {
55         this(DEFAULT_TIME_OUT);
56     }
57
58     public MultiMsgCollectorImpl(final int timeout) {
59         cache = initCacheBuilder(timeout).build();
60     }
61
62     private static RemovalListener<Long, MultiCollectorObject> getRemovalListener() {
63         return new RemovalListener<Long, MultiCollectorObject>() {
64             @Override
65             public void onRemoval(final RemovalNotification<Long, MultiCollectorObject> notification) {
66                 LOG.trace("Removing data with XID {} from cache, cause: {}", notification.getKey(), notification.getCause());
67                 switch (notification.getCause()) {
68                     case EXPIRED:
69                         notification.getValue().invalidateFutureByTimeout(notification.getKey());
70                 }
71             }
72         };
73     }
74
75     private static CacheBuilder<Long, MultiCollectorObject> initCacheBuilder(final int timeout) {
76         return CacheBuilder.newBuilder()
77                 .expireAfterAccess(timeout, TimeUnit.SECONDS)
78                 .removalListener(getRemovalListener())
79                 .initialCapacity(200)
80                 .maximumSize(500)
81                 .concurrencyLevel(1);
82     }
83
84     @Override
85     public void registerMultipartRequestContext(final RequestContext<List<MultipartReply>> requestContext) {
86         cache.put(requestContext.getXid().getValue(), new MultiCollectorObject(requestContext));
87     }
88
89     @Override
90     public void addMultipartMsg(final MultipartReply reply) {
91         Preconditions.checkNotNull(reply);
92         LOG.trace("Try to add Multipart reply msg with XID {}", reply.getXid());
93         final long xid = reply.getXid();
94         final MultiCollectorObject cachedRef = cache.getIfPresent(xid);
95         if (cachedRef == null) {
96             MultipartType multipartType = reply.getType();
97             LOG.trace("Orphaned multipart msg with XID : {} of type {}", xid, multipartType);
98             deviceReplyProcessor.processException(new Xid(xid),
99                     new DeviceDataException("unknown xid received for multipart of type " + multipartType));
100             return;
101         }
102
103         try {
104             cachedRef.add(reply);
105             LOG.trace("Multipart reply msg with XID {} added successfully.", reply.getXid());
106             if (!reply.getFlags().isOFPMPFREQMORE()) {
107                 // flag OFPMFFREEQMORE false says "I'm a last one'
108                 cachedRef.publishCollection(xid); // settable future has now whole collection
109                 cache.invalidate(xid);              // we don't need a reference anymore - remove explicitly
110             }
111         } catch (DeviceDataException e) {
112             deviceReplyProcessor.processException(new Xid(xid), e);
113         }
114     }
115
116     @Override
117     public void setDeviceReplyProcessor(final DeviceReplyProcessor deviceReplyProcessor) {
118         this.deviceReplyProcessor = deviceReplyProcessor;
119     }
120
121     @Override
122     public void invalidateRequestContext(final RequestContext<List<MultipartReply>> requestContext) {
123         MultiCollectorObject  multiCollectorObject = cache.getIfPresent(requestContext);
124         if (null != multiCollectorObject){
125             multiCollectorObject.invalidateFutureByTimeout(requestContext.getXid().getValue());
126         }
127     }
128
129     private class MultiCollectorObject {
130         private final List<MultipartReply> replyCollection = new ArrayList<>();
131         private final RequestContext<List<MultipartReply>> requestContext;
132         private MultipartType msgType;
133
134         MultiCollectorObject(final RequestContext<List<MultipartReply>> requestContext) {
135             this.requestContext = Preconditions.checkNotNull(requestContext);
136         }
137
138         void add(final MultipartReply reply) throws DeviceDataException {
139             /* Rise possible exception if it possible */
140             msgTypeValidation(reply.getType(), reply.getXid());
141             replyCollection.add(reply);
142         }
143
144         void publishCollection(final long xid) {
145             final RpcResult<List<MultipartReply>> rpcResult = RpcResultBuilder
146                     .<List<MultipartReply>>success()
147                     .withResult(replyCollection)
148                     .build();
149             requestContext.setResult(rpcResult);
150             try {
151                 requestContext.close();
152             } catch (final Exception e) {
153                 LOG.warn("Closing RequestContext failed: {}", e.getMessage());
154                 LOG.debug("Closing RequestContext failed.. ", e);
155             }
156             deviceReplyProcessor.processReply(new Xid(xid), replyCollection);
157         }
158
159         void invalidateFutureByTimeout(final long key) {
160             final String msg = "MultiMsgCollector can not wait for last multipart any more";
161             DeviceDataException deviceDataException = new DeviceDataException(msg);
162             final RpcResult<List<MultipartReply>> rpcResult = RpcResultBuilder
163                     .<List<MultipartReply>>failed()
164                     .withError(RpcError.ErrorType.APPLICATION, String.format("Message processing failed : %s", deviceDataException.getError()), deviceDataException)
165                     .build();
166             requestContext.setResult(rpcResult);
167             try {
168                 requestContext.close();
169             } catch (final Exception e) {
170                 LOG.warn("Closing RequestContext failed: ", e);
171                 LOG.debug("Closing RequestContext failed..", e);
172             }
173             deviceReplyProcessor.processException(new Xid(key), deviceDataException);
174         }
175
176
177         private void msgTypeValidation(final MultipartType type, final long key) throws DeviceDataException {
178             if (msgType == null) {
179                 msgType = type;
180                 return;
181             }
182             if (!msgType.equals(type)) {
183                 final String msg = "MultiMsgCollector get incorrect multipart msg with type {}"
184                         + " but expected type is {}";
185                 LOG.trace(msg, type, msgType);
186                 throw new DeviceDataException("multipart message type mismatch");
187             }
188         }
189     }
190 }