03662fd36accb49e127022bb2ce890e0fd8e79c1
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / sal / OFRpcTaskUtil.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  * 
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.openflow.md.core.sal;
9
10 import java.util.Collection;
11 import java.util.Collections;
12 import java.util.concurrent.Future;
13
14 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
15 import org.opendaylight.controller.sal.common.util.RpcErrors;
16 import org.opendaylight.controller.sal.common.util.Rpcs;
17 import org.opendaylight.openflowplugin.openflow.md.OFConstants;
18 import org.opendaylight.openflowplugin.openflow.md.core.MessageFactory;
19 import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
20 import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService;
21 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
24 import org.opendaylight.yangtools.yang.binding.Notification;
25 import org.opendaylight.yangtools.yang.common.RpcError;
26 import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
27 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
28 import org.opendaylight.yangtools.yang.common.RpcResult;
29
30 import com.google.common.base.Objects;
31 import com.google.common.collect.Lists;
32 import com.google.common.util.concurrent.FutureCallback;
33 import com.google.common.util.concurrent.Futures;
34 import com.google.common.util.concurrent.ListenableFuture;
35 import com.google.common.util.concurrent.SettableFuture;
36
37 /**
38  * 
39  */
40 public abstract class OFRpcTaskUtil {
41
42     /**
43      * @param taskContext 
44      * @param isBarrier 
45      * @param cookie 
46      * @return rpcResult of given type, containing wrapped errors of barrier sending (if any) or success
47      */
48     public static Collection<RpcError> manageBarrier(OFRpcTaskContext taskContext, Boolean isBarrier, 
49             SwitchConnectionDistinguisher cookie) {
50         Collection<RpcError> errors = null;
51         if (Objects.firstNonNull(isBarrier, Boolean.FALSE)) {
52             Future<RpcResult<BarrierOutput>> barrierFuture = sendBarrier(taskContext.getSession(), cookie, taskContext.getMessageService());
53             try {
54                 RpcResult<BarrierOutput> barrierResult = barrierFuture.get(
55                         taskContext.getMaxTimeout(), taskContext.getMaxTimeoutUnit());
56                 if (!barrierResult.isSuccessful()) {
57                     errors = barrierResult.getErrors();
58                 }
59             } catch (Exception e) {
60                 RpcError rpcError = RpcErrors.getRpcError(
61                         OFConstants.APPLICATION_TAG, OFConstants.ERROR_TAG_TIMEOUT, 
62                         "barrier sending failed", ErrorSeverity.WARNING, 
63                         "switch failed to respond on barrier request - message ordering is not preserved", ErrorType.RPC, e);
64                 errors = Lists.newArrayList(rpcError);
65             }
66         } 
67         
68         if (errors == null) {
69             errors = Collections.emptyList();
70         }
71         
72         return errors;
73     }
74
75     /**
76      * @param session
77      * @param cookie
78      * @param messageService
79      * @return barrier response
80      */
81     private static Future<RpcResult<BarrierOutput>> sendBarrier(SessionContext session, 
82             SwitchConnectionDistinguisher cookie, IMessageDispatchService messageService) {
83         BarrierInput barrierInput = MessageFactory.createBarrier(
84                 session.getFeatures().getVersion(), session.getNextXid());
85         return messageService.barrier(barrierInput, cookie);
86     }
87
88     /**
89      * @param result rpcResult with success = false, errors = given collection
90      * @param barrierErrors
91      */
92     public static <T> void wrapBarrierErrors(SettableFuture<RpcResult<T>> result,
93             Collection<RpcError> barrierErrors) {
94         result.set(Rpcs.<T>getRpcResult(false, barrierErrors));
95     }
96     
97     /**
98      * @param originalResult
99      * @param notificationProviderService
100      * @param notificationComposer lazy notification composer
101      */
102     public static <R, N extends Notification> void hookFutureNotification(ListenableFuture<R> originalResult, 
103             final NotificationProviderService notificationProviderService, 
104             final NotificationComposer<N> notificationComposer) {
105         Futures.addCallback(originalResult, new FutureCallback<R>() {
106             @Override
107             public void onSuccess(R result) {
108                 if (null != notificationProviderService) {
109                     notificationProviderService.publish(notificationComposer.compose());
110                 }
111             }
112             
113             @Override
114             public void onFailure(Throwable t) {
115                 //NOOP
116             }
117         });
118         
119     }
120
121 }