038612c609436c6c9ef7576f227af6c1083746e4
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / services / AbstractMultipartRequestOnTheFlyCallback.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.services;
9
10 import com.google.common.util.concurrent.Service;
11 import java.util.Collections;
12 import java.util.List;
13 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
14 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
15 import org.opendaylight.openflowplugin.api.openflow.device.DeviceRegistry;
16 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
17 import org.opendaylight.openflowplugin.api.openflow.device.TxFacade;
18 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.EventIdentifier;
19 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
20 import org.opendaylight.openflowplugin.impl.common.MultipartReplyTranslatorUtil;
21 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
22 import org.opendaylight.openflowplugin.impl.statistics.StatisticsGatheringUtils;
23 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.EventsTimeCounter;
24 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
28 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
29 import org.opendaylight.yangtools.yang.common.RpcError;
30 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 public abstract class AbstractMultipartRequestOnTheFlyCallback<T extends OfHeader>
35                                                         extends AbstractMultipartRequestCallback<T> {
36     private static final Logger LOG = LoggerFactory.getLogger(AbstractMultipartRequestOnTheFlyCallback.class);
37     private final DeviceInfo deviceInfo;
38     private final EventIdentifier doneEventIdentifier;
39     private final TxFacade txFacade;
40     private final MultipartWriterProvider statisticsWriterProvider;
41     private final DeviceRegistry deviceRegistry;
42     private volatile Service.State gatheringState = Service.State.NEW;
43     private final ConvertorExecutor convertorExecutor;
44
45     public AbstractMultipartRequestOnTheFlyCallback(final RequestContext<List<T>> context, Class<?> requestType,
46                                                     final DeviceContext deviceContext,
47                                                     final EventIdentifier eventIdentifier,
48                                                     final MultipartWriterProvider statisticsWriterProvider,
49                                                     final ConvertorExecutor convertorExecutor) {
50         super(context, requestType, deviceContext, eventIdentifier);
51         deviceInfo = deviceContext.getDeviceInfo();
52         doneEventIdentifier =
53                 new EventIdentifier(getMultipartType().name(), deviceContext.getDeviceInfo().getNodeId().toString());
54         txFacade = deviceContext;
55         deviceRegistry = deviceContext;
56         this.statisticsWriterProvider = statisticsWriterProvider;
57         this.convertorExecutor = convertorExecutor;
58     }
59
60     @Override
61     @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch"})
62     public void onSuccess(final OfHeader result) {
63         if (result == null) {
64             LOG.warn("Response received was null.");
65
66             if (!Service.State.TERMINATED.equals(gatheringState)) {
67                 endCollecting(true);
68             }
69
70             return;
71         } else if (Service.State.TERMINATED.equals(gatheringState)) {
72             LOG.warn("Unexpected response received: xid={}, {}", result.getXid(), result.implementedInterface());
73             return;
74         }
75
76         if (!isMultipart(result)) {
77             LOG.warn("Unexpected response type received: {}.", result.getClass());
78             setResult(RpcResultBuilder.<List<T>>failed().withError(RpcError.ErrorType.APPLICATION,
79                     String.format("Unexpected response type received: %s.", result.getClass())).build());
80             endCollecting(false);
81         } else {
82             final T resultCast = (T) result;
83
84             if (Service.State.NEW.equals(gatheringState)) {
85                 startCollecting();
86             }
87
88             try {
89                 MultipartReplyTranslatorUtil
90                         .translate(resultCast, deviceInfo, convertorExecutor, null)
91                         .ifPresent(reply -> {
92                             try {
93                                 statisticsWriterProvider
94                                         .lookup(getMultipartType())
95                                         .ifPresent(writer -> writer.write(reply, false));
96                             } catch (final Exception ex) {
97                                 LOG.warn("Stats processing of type {} for node {} failed during write-to-tx step",
98                                         getMultipartType(), deviceInfo, ex);
99                             }
100                         });
101             } catch (final Exception ex) {
102                 LOG.warn("Unexpected exception occurred while translating response: {}.", result.getClass(), ex);
103                 setResult(RpcResultBuilder.<List<T>>failed().withError(RpcError.ErrorType.APPLICATION,
104                         String.format("Unexpected exception occurred while translating response: %s. %s",
105                                       result.getClass(),
106                                       ex)).build());
107                 endCollecting(false);
108                 return;
109             }
110
111             if (!isReqMore(resultCast)) {
112                 endCollecting(true);
113             }
114         }
115     }
116
117     /**
118      * Get tx facade.
119      * @return tx facade
120      */
121     protected TxFacade getTxFacade() {
122         return txFacade;
123     }
124
125     /**
126      * Starts collecting of multipart data.
127      */
128     private synchronized void startCollecting() {
129         EventsTimeCounter.markStart(doneEventIdentifier);
130         gatheringState = Service.State.RUNNING;
131
132         final InstanceIdentifier<FlowCapableNode> instanceIdentifier = deviceInfo
133                 .getNodeInstanceIdentifier()
134                 .augmentation(FlowCapableNode.class);
135
136         switch (getMultipartType()) {
137             case OFPMPFLOW:
138                 StatisticsGatheringUtils.deleteAllKnownFlows(
139                         getTxFacade(),
140                         instanceIdentifier,
141                         deviceRegistry.getDeviceFlowRegistry());
142                 deviceRegistry.getDeviceFlowRegistry().processMarks();
143                 break;
144             case OFPMPMETERCONFIG:
145                 StatisticsGatheringUtils.deleteAllKnownMeters(
146                         getTxFacade(),
147                         instanceIdentifier,
148                         deviceRegistry.getDeviceMeterRegistry());
149                 deviceRegistry.getDeviceMeterRegistry().processMarks();
150                 break;
151             case OFPMPGROUPDESC:
152                 StatisticsGatheringUtils.deleteAllKnownGroups(
153                         getTxFacade(),
154                         instanceIdentifier,
155                         deviceRegistry.getDeviceGroupRegistry());
156                 deviceRegistry.getDeviceGroupRegistry().processMarks();
157                 break;
158             default:
159                 // no operation
160         }
161     }
162
163     /**
164      * Ends collecting of multipart data.
165      * @param setResult set empty success result
166      */
167     private void endCollecting(final boolean setResult) {
168         gatheringState = Service.State.TERMINATED;
169         EventsTimeCounter.markEnd(doneEventIdentifier);
170         EventsTimeCounter.markEnd(getEventIdentifier());
171         spyMessage(MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
172
173         if (setResult) {
174             setResult(RpcResultBuilder.success(Collections.<T>emptyList()).build());
175         }
176
177         txFacade.submitTransaction();
178
179         switch (getMultipartType()) {
180             case OFPMPFLOW:
181                 deviceRegistry.getDeviceFlowRegistry().processMarks();
182                 break;
183             case OFPMPMETERCONFIG:
184                 deviceRegistry.getDeviceMeterRegistry().processMarks();
185                 break;
186             case OFPMPGROUPDESC:
187                 deviceRegistry.getDeviceGroupRegistry().processMarks();
188                 break;
189             default:
190                 // no operation
191         }
192     }
193
194     /**
195      * Get multipart type.
196      * @return multipart type
197      */
198     protected abstract MultipartType getMultipartType();
199 }