Bump MRI upstreams
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / services / AbstractMultipartRequestOnTheFlyCallback.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.services;
9
10 import com.google.common.util.concurrent.Service;
11 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
12 import java.util.Collections;
13 import java.util.List;
14 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
15 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
16 import org.opendaylight.openflowplugin.api.openflow.device.DeviceRegistry;
17 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
18 import org.opendaylight.openflowplugin.api.openflow.device.TxFacade;
19 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.EventIdentifier;
20 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
21 import org.opendaylight.openflowplugin.impl.common.MultipartReplyTranslatorUtil;
22 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
23 import org.opendaylight.openflowplugin.impl.statistics.StatisticsGatheringUtils;
24 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.EventsTimeCounter;
25 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
29 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
30 import org.opendaylight.yangtools.yang.common.ErrorType;
31 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 public abstract class AbstractMultipartRequestOnTheFlyCallback<T extends OfHeader>
36                                                         extends AbstractMultipartRequestCallback<T> {
37     private static final Logger LOG = LoggerFactory.getLogger(AbstractMultipartRequestOnTheFlyCallback.class);
38
39     private final DeviceInfo deviceInfo;
40     private final EventIdentifier doneEventIdentifier;
41     private final TxFacade txFacade;
42     private final MultipartWriterProvider statisticsWriterProvider;
43     private final DeviceRegistry deviceRegistry;
44     private final ConvertorExecutor convertorExecutor;
45
46     private volatile Service.State gatheringState = Service.State.NEW;
47
48     @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR",
49         justification = "getMultipartType() should be okay to call")
50     public AbstractMultipartRequestOnTheFlyCallback(final RequestContext<List<T>> context, final Class<?> requestType,
51                                                     final DeviceContext deviceContext,
52                                                     final EventIdentifier eventIdentifier,
53                                                     final MultipartWriterProvider statisticsWriterProvider,
54                                                     final ConvertorExecutor convertorExecutor) {
55         super(context, requestType, deviceContext, eventIdentifier);
56         deviceInfo = deviceContext.getDeviceInfo();
57         doneEventIdentifier =
58                 new EventIdentifier(getMultipartType().name(), deviceContext.getDeviceInfo().getNodeId().toString());
59         txFacade = deviceContext;
60         deviceRegistry = deviceContext;
61         this.statisticsWriterProvider = statisticsWriterProvider;
62         this.convertorExecutor = convertorExecutor;
63     }
64
65     @Override
66     @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch"})
67     public void onSuccess(final OfHeader result) {
68         if (result == null) {
69             LOG.warn("Response received was null.");
70
71             if (!Service.State.TERMINATED.equals(gatheringState)) {
72                 endCollecting(true);
73             }
74
75             return;
76         } else if (Service.State.TERMINATED.equals(gatheringState)) {
77             LOG.warn("Unexpected response received: xid={}, {}", result.getXid(), result.implementedInterface());
78             return;
79         }
80
81         if (!isMultipart(result)) {
82             LOG.warn("Unexpected response type received: {}.", result.getClass());
83             setResult(RpcResultBuilder.<List<T>>failed().withError(ErrorType.APPLICATION,
84                     String.format("Unexpected response type received: %s.", result.getClass())).build());
85             endCollecting(false);
86         } else {
87             final T resultCast = (T) result;
88
89             if (Service.State.NEW.equals(gatheringState)) {
90                 startCollecting();
91             }
92
93             try {
94                 MultipartReplyTranslatorUtil
95                         .translate(resultCast, deviceInfo, convertorExecutor, null)
96                         .ifPresent(reply -> {
97                             try {
98                                 statisticsWriterProvider
99                                         .lookup(getMultipartType())
100                                         .ifPresent(writer -> writer.write(reply, false));
101                             } catch (final Exception ex) {
102                                 LOG.warn("Stats processing of type {} for node {} failed during write-to-tx step",
103                                         getMultipartType(), deviceInfo, ex);
104                             }
105                         });
106             } catch (final Exception ex) {
107                 LOG.warn("Unexpected exception occurred while translating response: {}.", result.getClass(), ex);
108                 setResult(RpcResultBuilder.<List<T>>failed().withError(ErrorType.APPLICATION,
109                         String.format("Unexpected exception occurred while translating response: %s. %s",
110                                       result.getClass(),
111                                       ex)).build());
112                 endCollecting(false);
113                 return;
114             }
115
116             if (!isReqMore(resultCast)) {
117                 endCollecting(true);
118             }
119         }
120     }
121
122     /**
123      * Get tx facade.
124      * @return tx facade
125      */
126     protected TxFacade getTxFacade() {
127         return txFacade;
128     }
129
130     /**
131      * Starts collecting of multipart data.
132      */
133     private synchronized void startCollecting() {
134         EventsTimeCounter.markStart(doneEventIdentifier);
135         gatheringState = Service.State.RUNNING;
136
137         final InstanceIdentifier<FlowCapableNode> instanceIdentifier = deviceInfo
138                 .getNodeInstanceIdentifier()
139                 .augmentation(FlowCapableNode.class);
140
141         switch (getMultipartType()) {
142             case OFPMPFLOW:
143                 StatisticsGatheringUtils.deleteAllKnownFlows(
144                         getTxFacade(),
145                         instanceIdentifier,
146                         deviceRegistry.getDeviceFlowRegistry());
147                 deviceRegistry.getDeviceFlowRegistry().processMarks();
148                 break;
149             case OFPMPMETERCONFIG:
150                 StatisticsGatheringUtils.deleteAllKnownMeters(
151                         getTxFacade(),
152                         instanceIdentifier,
153                         deviceRegistry.getDeviceMeterRegistry());
154                 deviceRegistry.getDeviceMeterRegistry().processMarks();
155                 break;
156             case OFPMPGROUPDESC:
157                 StatisticsGatheringUtils.deleteAllKnownGroups(
158                         getTxFacade(),
159                         instanceIdentifier,
160                         deviceRegistry.getDeviceGroupRegistry());
161                 deviceRegistry.getDeviceGroupRegistry().processMarks();
162                 break;
163             default:
164                 // no operation
165         }
166     }
167
168     /**
169      * Ends collecting of multipart data.
170      * @param setResult set empty success result
171      */
172     private void endCollecting(final boolean setResult) {
173         gatheringState = Service.State.TERMINATED;
174         EventsTimeCounter.markEnd(doneEventIdentifier);
175         EventsTimeCounter.markEnd(getEventIdentifier());
176         spyMessage(MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
177
178         if (setResult) {
179             setResult(RpcResultBuilder.success(Collections.<T>emptyList()).build());
180         }
181
182         txFacade.submitTransaction();
183
184         switch (getMultipartType()) {
185             case OFPMPFLOW:
186                 deviceRegistry.getDeviceFlowRegistry().processMarks();
187                 break;
188             case OFPMPMETERCONFIG:
189                 deviceRegistry.getDeviceMeterRegistry().processMarks();
190                 break;
191             case OFPMPGROUPDESC:
192                 deviceRegistry.getDeviceGroupRegistry().processMarks();
193                 break;
194             default:
195                 // no operation
196         }
197     }
198
199     /**
200      * Get multipart type.
201      * @return multipart type
202      */
203     protected abstract MultipartType getMultipartType();
204 }