a1b5e19d7d11e95f39dda3e7201eca813d45b985
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / services / AbstractMultipartRequestOnTheFlyCallback.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.services;
9
10 import com.google.common.util.concurrent.Service;
11 import java.util.Collections;
12 import java.util.List;
13 import java.util.Objects;
14 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
15 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
16 import org.opendaylight.openflowplugin.api.openflow.device.DeviceRegistry;
17 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
18 import org.opendaylight.openflowplugin.api.openflow.device.TxFacade;
19 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.EventIdentifier;
20 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
21 import org.opendaylight.openflowplugin.impl.common.MultipartReplyTranslatorUtil;
22 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
23 import org.opendaylight.openflowplugin.impl.statistics.StatisticsGatheringUtils;
24 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.EventsTimeCounter;
25 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
29 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
30 import org.opendaylight.yangtools.yang.common.RpcError;
31 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 public abstract class AbstractMultipartRequestOnTheFlyCallback<T extends OfHeader>
36                                                         extends AbstractMultipartRequestCallback<T> {
37     private static final Logger LOG = LoggerFactory.getLogger(AbstractMultipartRequestOnTheFlyCallback.class);
38     private final DeviceInfo deviceInfo;
39     private final EventIdentifier doneEventIdentifier;
40     private final TxFacade txFacade;
41     private final MultipartWriterProvider statisticsWriterProvider;
42     private final DeviceRegistry deviceRegistry;
43     private volatile Service.State gatheringState = Service.State.NEW;
44     private ConvertorExecutor convertorExecutor;
45
46     public AbstractMultipartRequestOnTheFlyCallback(final RequestContext<List<T>> context, Class<?> requestType,
47                                                     final DeviceContext deviceContext,
48                                                     final EventIdentifier eventIdentifier,
49                                                     final MultipartWriterProvider statisticsWriterProvider,
50                                                     final ConvertorExecutor convertorExecutor) {
51         super(context, requestType, deviceContext, eventIdentifier);
52         deviceInfo = deviceContext.getDeviceInfo();
53         doneEventIdentifier =
54                 new EventIdentifier(getMultipartType().name(), deviceContext.getDeviceInfo().getNodeId().toString());
55         txFacade = deviceContext;
56         deviceRegistry = deviceContext;
57         this.statisticsWriterProvider = statisticsWriterProvider;
58         this.convertorExecutor = convertorExecutor;
59     }
60
61     @Override
62     @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch"})
63     public void onSuccess(final OfHeader result) {
64         if (Objects.isNull(result)) {
65             LOG.warn("Response received was null.");
66
67             if (!Service.State.TERMINATED.equals(gatheringState)) {
68                 endCollecting(true);
69             }
70
71             return;
72         } else if (Service.State.TERMINATED.equals(gatheringState)) {
73             LOG.warn("Unexpected response received: xid={}, {}", result.getXid(), result.getImplementedInterface());
74             return;
75         }
76
77         if (!isMultipart(result)) {
78             LOG.warn("Unexpected response type received: {}.", result.getClass());
79             setResult(RpcResultBuilder.<List<T>>failed().withError(RpcError.ErrorType.APPLICATION,
80                     String.format("Unexpected response type received: %s.", result.getClass())).build());
81             endCollecting(false);
82         } else {
83             final T resultCast = (T) result;
84
85             if (Service.State.NEW.equals(gatheringState)) {
86                 startCollecting();
87             }
88
89             try {
90                 MultipartReplyTranslatorUtil
91                         .translate(resultCast, deviceInfo, convertorExecutor, null)
92                         .ifPresent(reply -> {
93                             try {
94                                 statisticsWriterProvider
95                                         .lookup(getMultipartType())
96                                         .ifPresent(writer -> writer.write(reply, false));
97                             } catch (final Exception ex) {
98                                 LOG.warn("Stats processing of type {} for node {} failed during write-to-tx step",
99                                         getMultipartType(), deviceInfo.getLOGValue(), ex);
100                             }
101                         });
102             } catch (final Exception ex) {
103                 LOG.warn("Unexpected exception occurred while translating response: {}.", result.getClass(), ex);
104                 setResult(RpcResultBuilder.<List<T>>failed().withError(RpcError.ErrorType.APPLICATION,
105                         String.format("Unexpected exception occurred while translating response: %s. %s",
106                                       result.getClass(),
107                                       ex)).build());
108                 endCollecting(false);
109                 return;
110             }
111
112             if (!isReqMore(resultCast)) {
113                 endCollecting(true);
114             }
115         }
116     }
117
118     /**
119      * Get tx facade.
120      * @return tx facade
121      */
122     protected TxFacade getTxFacade() {
123         return txFacade;
124     }
125
126     /**
127      * Starts collecting of multipart data.
128      */
129     private synchronized void startCollecting() {
130         EventsTimeCounter.markStart(doneEventIdentifier);
131         gatheringState = Service.State.RUNNING;
132
133         final InstanceIdentifier<FlowCapableNode> instanceIdentifier = deviceInfo
134                 .getNodeInstanceIdentifier()
135                 .augmentation(FlowCapableNode.class);
136
137         switch (getMultipartType()) {
138             case OFPMPFLOW:
139                 StatisticsGatheringUtils.deleteAllKnownFlows(
140                         getTxFacade(),
141                         instanceIdentifier,
142                         deviceRegistry.getDeviceFlowRegistry());
143                 break;
144             case OFPMPMETERCONFIG:
145                 StatisticsGatheringUtils.deleteAllKnownMeters(
146                         getTxFacade(),
147                         instanceIdentifier,
148                         deviceRegistry.getDeviceMeterRegistry());
149                 break;
150             case OFPMPGROUPDESC:
151                 StatisticsGatheringUtils.deleteAllKnownGroups(
152                         getTxFacade(),
153                         instanceIdentifier,
154                         deviceRegistry.getDeviceGroupRegistry());
155                 break;
156             default:
157                 // no operation
158         }
159     }
160
161     /**
162      * Ends collecting of multipart data.
163      * @param setResult set empty success result
164      */
165     private void endCollecting(final boolean setResult) {
166         gatheringState = Service.State.TERMINATED;
167         EventsTimeCounter.markEnd(doneEventIdentifier);
168         EventsTimeCounter.markEnd(getEventIdentifier());
169         spyMessage(MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
170
171         if (setResult) {
172             setResult(RpcResultBuilder.success(Collections.<T>emptyList()).build());
173         }
174
175         txFacade.submitTransaction();
176
177         switch (getMultipartType()) {
178             case OFPMPFLOW:
179                 deviceRegistry.getDeviceFlowRegistry().processMarks();
180                 break;
181             case OFPMPMETERCONFIG:
182                 deviceRegistry.getDeviceMeterRegistry().processMarks();
183                 break;
184             case OFPMPGROUPDESC:
185                 deviceRegistry.getDeviceGroupRegistry().processMarks();
186                 break;
187             default:
188                 // no operation
189         }
190     }
191
192     /**
193      * Get multipart type.
194      * @return multipart type
195      */
196     protected abstract MultipartType getMultipartType();
197 }