Merge "BUG-2661: Sonar issues for ofp-extension-api"
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / sal / OFRpcTaskUtil.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.openflow.md.core.sal;
9
10 import java.util.ArrayList;
11 import java.util.Collection;
12 import java.util.Collections;
13 import java.util.List;
14 import java.util.concurrent.Future;
15
16 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
17 import org.opendaylight.openflowplugin.api.OFConstants;
18 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
19 import org.opendaylight.openflowplugin.api.openflow.md.core.sal.NotificationComposer;
20 import org.opendaylight.openflowplugin.api.statistics.MessageSpy;
21 import org.opendaylight.openflowplugin.openflow.md.core.MessageFactory;
22 import org.opendaylight.openflowplugin.api.openflow.md.core.session.IMessageDispatchService;
23 import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
24 import org.opendaylight.openflowplugin.openflow.md.util.RpcInputOutputTuple;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
28 import org.opendaylight.yangtools.yang.binding.DataContainer;
29 import org.opendaylight.yangtools.yang.binding.Notification;
30 import org.opendaylight.yangtools.yang.common.RpcError;
31 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
32 import org.opendaylight.yangtools.yang.common.RpcResult;
33 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import com.google.common.base.Function;
38 import com.google.common.base.Objects;
39 import com.google.common.collect.Lists;
40 import com.google.common.util.concurrent.AsyncFunction;
41 import com.google.common.util.concurrent.FutureCallback;
42 import com.google.common.util.concurrent.Futures;
43 import com.google.common.util.concurrent.JdkFutureAdapters;
44 import com.google.common.util.concurrent.ListenableFuture;
45
46 /**
47  *
48  */
49 public abstract class OFRpcTaskUtil {
50     protected static final Logger LOG = LoggerFactory.getLogger(OFRpcTaskUtil.class);
51     /**
52      * @param taskContext
53      * @param isBarrier
54      * @param cookie
55      * @return rpcResult of given type, containing wrapped errors of barrier sending (if any) or success
56      */
57     private OFRpcTaskUtil() {
58         //hiding implicit constructor
59     }
60
61     public static Collection<RpcError> manageBarrier(OFRpcTaskContext taskContext, Boolean isBarrier,
62             SwitchConnectionDistinguisher cookie) {
63         Collection<RpcError> errors = null;
64         if (Objects.firstNonNull(isBarrier, Boolean.FALSE)) {
65             RpcInputOutputTuple<BarrierInput, ListenableFuture<RpcResult<BarrierOutput>>> sendBarrierRpc =
66                     sendBarrier(taskContext.getSession(), cookie, taskContext.getMessageService());
67             Future<RpcResult<BarrierOutput>> barrierFuture = sendBarrierRpc.getOutput();
68             try {
69                 RpcResult<BarrierOutput> barrierResult = barrierFuture.get(
70                         taskContext.getMaxTimeout(), taskContext.getMaxTimeoutUnit());
71                 if (!barrierResult.isSuccessful()) {
72                     errors = barrierResult.getErrors();
73                 }
74             } catch (Exception e) {
75                 RpcError rpcError = RpcResultBuilder.newWarning(
76                         ErrorType.RPC,
77                         OFConstants.ERROR_TAG_TIMEOUT,
78                         "barrier sending failed",
79                         OFConstants.APPLICATION_TAG,
80                         "switch failed to respond on barrier request - message ordering is not preserved",
81                         e);
82                 errors = Lists.newArrayList(rpcError);
83             }
84         }
85
86         if (errors == null) {
87             errors = Collections.emptyList();
88         }
89
90         return errors;
91     }
92
93     /**
94      * @param session
95      * @param cookie
96      * @param messageService
97      * @return barrier response
98      */
99     protected static RpcInputOutputTuple<BarrierInput, ListenableFuture<RpcResult<BarrierOutput>>> sendBarrier(SessionContext session,
100             SwitchConnectionDistinguisher cookie, IMessageDispatchService messageService) {
101         BarrierInput barrierInput = MessageFactory.createBarrier(
102                 session.getFeatures().getVersion(), session.getNextXid());
103         Future<RpcResult<BarrierOutput>> barrierResult = messageService.barrier(barrierInput, cookie);
104         ListenableFuture<RpcResult<BarrierOutput>> output = JdkFutureAdapters.listenInPoolThread(barrierResult);
105
106         return new RpcInputOutputTuple<>(barrierInput, output);
107     }
108
109     /**
110      * @param task of rpc
111      * @param originalResult
112      * @param notificationProviderService
113      * @param notificationComposer lazy notification composer
114      */
115     public static <R extends RpcResult<? extends TransactionAware>, N extends Notification, I extends DataContainer>
116     void hookFutureNotification(
117             final OFRpcTask<I, R> task,
118             ListenableFuture<R> originalResult,
119             final NotificationProviderService notificationProviderService,
120             final NotificationComposer<N> notificationComposer) {
121
122         class FutureCallbackImpl implements FutureCallback<R> {
123             @Override
124             public void onSuccess(R result) {
125                 if(null == notificationProviderService) {
126                     LOG.warn("onSuccess(): notificationServiceProvider is null, could not publish result {}",result);
127                 } else if (notificationComposer == null) {
128                     LOG.warn("onSuccess(): notificationComposer is null, could not publish result {}",result);
129                 } else if(result == null) {
130                     LOG.warn("onSuccess(): result is null, could not publish result {}",result);
131                 } else if (result.getResult() == null) {
132                     LOG.warn("onSuccess(): result.getResult() is null, could not publish result {}",result);
133                 } else if (result.getResult().getTransactionId() == null) {
134                     LOG.warn("onSuccess(): result.getResult().getTransactionId() is null, could not publish result {}",result);
135                 } else {
136                     notificationProviderService.publish(notificationComposer.compose(result.getResult().getTransactionId()));
137                     task.getTaskContext().getMessageSpy().spyMessage(
138                             task.getInput(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMITTED_SUCCESS);
139                 }
140             }
141
142             @Override
143             public void onFailure(Throwable t) {
144                 //TODO: good place to notify MD-SAL about errors
145                 task.getTaskContext().getMessageSpy().spyMessage(
146                         task.getInput(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMITTED_FAILURE);
147             }
148         }
149
150         Futures.addCallback(originalResult, new FutureCallbackImpl());
151     }
152
153     /**
154      * @param task of rpcl
155      * @param originalResult
156      * @param notificationProviderService
157      * @param notificationComposer lazy notification composer
158      * @return chained result with barrier
159      */
160     public static <T extends TransactionAware, I extends DataContainer>
161     ListenableFuture<RpcResult<T>> chainFutureBarrier(
162             final OFRpcTask<I, RpcResult<T>> task,
163             final ListenableFuture<RpcResult<T>> originalResult) {
164
165         ListenableFuture<RpcResult<T>> chainResult = originalResult;
166         if (Objects.firstNonNull(task.isBarrier(), Boolean.FALSE)) {
167
168             chainResult = Futures.transform(originalResult, new AsyncFunction<RpcResult<T>, RpcResult<T>>() {
169
170                 @Override
171                 public ListenableFuture<RpcResult<T>> apply(final RpcResult<T> input) throws Exception {
172                     if (input.isSuccessful()) {
173                         RpcInputOutputTuple<BarrierInput, ListenableFuture<RpcResult<BarrierOutput>>> sendBarrierRpc = sendBarrier(
174                                 task.getSession(), task.getCookie(), task.getMessageService());
175                         ListenableFuture<RpcResult<T>> barrierTxResult = Futures.transform(
176                                 sendBarrierRpc.getOutput(),
177                                 transformBarrierToTransactionAware(input, sendBarrierRpc.getInput()));
178                         return barrierTxResult;
179                     } else {
180                         return Futures.immediateFuture(input);
181                     }
182                 }
183
184             });
185         }
186
187         return chainResult;
188     }
189
190     /**
191      * @param originalInput
192      * @return
193      */
194     protected static <T extends TransactionAware> Function<RpcResult<BarrierOutput>, RpcResult<T>> transformBarrierToTransactionAware(
195             final RpcResult<T> originalInput, final BarrierInput barrierInput) {
196
197         class FunctionImpl implements Function<RpcResult<BarrierOutput>, RpcResult<T>> {
198
199             @Override
200             public RpcResult<T> apply(final RpcResult<BarrierOutput> barrierResult) {
201                 RpcResultBuilder<T> rpcBuilder = null;
202                 if (barrierResult.isSuccessful()) {
203                     rpcBuilder = RpcResultBuilder.<T>success();
204                 } else {
205                     rpcBuilder = RpcResultBuilder.<T>failed();
206                     RpcError rpcError = RpcResultBuilder.newWarning(
207                             ErrorType.RPC,
208                             OFConstants.ERROR_TAG_TIMEOUT,
209                             "barrier sending failed",
210                             OFConstants.APPLICATION_TAG,
211                             "switch failed to respond on barrier request, barrier.xid = "+barrierInput.getXid(),
212                             null);
213                     List<RpcError> chainedErrors = new ArrayList<>();
214                     chainedErrors.add(rpcError);
215                     chainedErrors.addAll(barrierResult.getErrors());
216                     rpcBuilder.withRpcErrors(chainedErrors);
217                 }
218
219                 rpcBuilder.withResult(originalInput.getResult());
220
221                 return rpcBuilder.build();
222             }
223         }
224
225         return new FunctionImpl();
226     }
227 }