Merge "BUG-1952 Flow update rpc"
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / sal / OFRpcTaskUtil.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  * 
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.openflow.md.core.sal;
9
10 import java.util.Collection;
11 import java.util.Collections;
12 import java.util.concurrent.Future;
13
14 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
15 import org.opendaylight.openflowplugin.api.OFConstants;
16 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
17 import org.opendaylight.openflowplugin.api.openflow.md.core.sal.NotificationComposer;
18 import org.opendaylight.openflowplugin.api.statistics.MessageSpy;
19 import org.opendaylight.openflowplugin.openflow.md.core.MessageFactory;
20 import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService;
21 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
25 import org.opendaylight.yangtools.yang.binding.DataContainer;
26 import org.opendaylight.yangtools.yang.binding.Notification;
27 import org.opendaylight.yangtools.yang.common.RpcError;
28 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
29 import org.opendaylight.yangtools.yang.common.RpcResult;
30 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
31
32 import com.google.common.base.Objects;
33 import com.google.common.collect.Lists;
34 import com.google.common.util.concurrent.FutureCallback;
35 import com.google.common.util.concurrent.Futures;
36 import com.google.common.util.concurrent.ListenableFuture;
37 import com.google.common.util.concurrent.SettableFuture;
38
39 /**
40  * 
41  */
42 public abstract class OFRpcTaskUtil {
43
44     /**
45      * @param taskContext 
46      * @param isBarrier 
47      * @param cookie 
48      * @return rpcResult of given type, containing wrapped errors of barrier sending (if any) or success
49      */
50     public static Collection<RpcError> manageBarrier(OFRpcTaskContext taskContext, Boolean isBarrier,
51             SwitchConnectionDistinguisher cookie) {
52         Collection<RpcError> errors = null;
53         if (Objects.firstNonNull(isBarrier, Boolean.FALSE)) {
54             Future<RpcResult<BarrierOutput>> barrierFuture = sendBarrier(taskContext.getSession(), cookie, taskContext.getMessageService());
55             try {
56                 RpcResult<BarrierOutput> barrierResult = barrierFuture.get(
57                         taskContext.getMaxTimeout(), taskContext.getMaxTimeoutUnit());
58                 if (!barrierResult.isSuccessful()) {
59                     errors = barrierResult.getErrors();
60                 }
61             } catch (Exception e) {
62                 RpcError rpcError = RpcResultBuilder.newWarning(
63                         ErrorType.RPC, 
64                         OFConstants.ERROR_TAG_TIMEOUT, 
65                         "barrier sending failed", 
66                         OFConstants.APPLICATION_TAG, 
67                         "switch failed to respond on barrier request - message ordering is not preserved", 
68                         e);
69                 errors = Lists.newArrayList(rpcError);
70             }
71         } 
72         
73         if (errors == null) {
74             errors = Collections.emptyList();
75         }
76         
77         return errors;
78     }
79
80     /**
81      * @param session
82      * @param cookie
83      * @param messageService
84      * @return barrier response
85      */
86     private static Future<RpcResult<BarrierOutput>> sendBarrier(SessionContext session, 
87             SwitchConnectionDistinguisher cookie, IMessageDispatchService messageService) {
88         BarrierInput barrierInput = MessageFactory.createBarrier(
89                 session.getFeatures().getVersion(), session.getNextXid());
90         return messageService.barrier(barrierInput, cookie);
91     }
92
93     /**
94      * @param result rpcResult with success = false, errors = given collection
95      * @param barrierErrors
96      */
97     public static <T> void wrapBarrierErrors(SettableFuture<RpcResult<T>> result,
98             Collection<RpcError> barrierErrors) {
99         result.set(RpcResultBuilder.<T>failed().withRpcErrors(barrierErrors).build());
100     }
101     
102     /**
103      * @param task of rpc
104      * @param originalResult
105      * @param notificationProviderService
106      * @param notificationComposer lazy notification composer
107      */
108     public static <R extends RpcResult<? extends TransactionAware>, N extends Notification, INPUT extends DataContainer> 
109     void hookFutureNotification(
110             final OFRpcTask<INPUT, R> task,
111             ListenableFuture<R> originalResult, 
112             final NotificationProviderService notificationProviderService, 
113             final NotificationComposer<N> notificationComposer) {
114         Futures.addCallback(originalResult, new FutureCallback<R>() {
115             @Override
116             public void onSuccess(R result) {
117                 if (null != notificationProviderService) {
118                     notificationProviderService.publish(notificationComposer.compose(result.getResult().getTransactionId()));
119                 }
120                 task.getTaskContext().getMessageSpy().spyMessage(
121                         task.getInput(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMITTED_SUCCESS);
122             }
123             
124             @Override
125             public void onFailure(Throwable t) {
126                 task.getTaskContext().getMessageSpy().spyMessage(
127                         task.getInput(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMITTED_FAILURE);
128             }
129         });
130     }
131
132 }