/** * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.openflowplugin.openflow.md.core.sal; import java.util.Collection; import java.util.Collections; import java.util.concurrent.Future; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; import org.opendaylight.openflowplugin.api.OFConstants; import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher; import org.opendaylight.openflowplugin.api.openflow.md.core.sal.NotificationComposer; import org.opendaylight.openflowplugin.api.statistics.MessageSpy; import org.opendaylight.openflowplugin.openflow.md.core.MessageFactory; import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService; import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput; import org.opendaylight.yangtools.yang.binding.DataContainer; import org.opendaylight.yangtools.yang.binding.Notification; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import com.google.common.base.Objects; import com.google.common.collect.Lists; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; /** * */ public abstract class OFRpcTaskUtil { /** * @param taskContext * @param isBarrier * @param cookie * @return rpcResult of given type, containing wrapped errors of barrier sending (if any) or success */ public static Collection manageBarrier(OFRpcTaskContext taskContext, Boolean isBarrier, SwitchConnectionDistinguisher cookie) { Collection errors = null; if (Objects.firstNonNull(isBarrier, Boolean.FALSE)) { Future> barrierFuture = sendBarrier(taskContext.getSession(), cookie, taskContext.getMessageService()); try { RpcResult barrierResult = barrierFuture.get( taskContext.getMaxTimeout(), taskContext.getMaxTimeoutUnit()); if (!barrierResult.isSuccessful()) { errors = barrierResult.getErrors(); } } catch (Exception e) { RpcError rpcError = RpcResultBuilder.newWarning( ErrorType.RPC, OFConstants.ERROR_TAG_TIMEOUT, "barrier sending failed", OFConstants.APPLICATION_TAG, "switch failed to respond on barrier request - message ordering is not preserved", e); errors = Lists.newArrayList(rpcError); } } if (errors == null) { errors = Collections.emptyList(); } return errors; } /** * @param session * @param cookie * @param messageService * @return barrier response */ private static Future> sendBarrier(SessionContext session, SwitchConnectionDistinguisher cookie, IMessageDispatchService messageService) { BarrierInput barrierInput = MessageFactory.createBarrier( session.getFeatures().getVersion(), session.getNextXid()); return messageService.barrier(barrierInput, cookie); } /** * @param result rpcResult with success = false, errors = given collection * @param barrierErrors */ public static void wrapBarrierErrors(SettableFuture> result, Collection barrierErrors) { result.set(RpcResultBuilder.failed().withRpcErrors(barrierErrors).build()); } /** * @param task of rpc * @param originalResult * @param notificationProviderService * @param notificationComposer lazy notification composer */ public static , N extends Notification, INPUT extends DataContainer> void hookFutureNotification( final OFRpcTask task, ListenableFuture originalResult, final NotificationProviderService notificationProviderService, final NotificationComposer notificationComposer) { Futures.addCallback(originalResult, new FutureCallback() { @Override public void onSuccess(R result) { if (null != notificationProviderService) { notificationProviderService.publish(notificationComposer.compose(result.getResult().getTransactionId())); } task.getTaskContext().getMessageSpy().spyMessage( task.getInput(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMITTED_SUCCESS); } @Override public void onFailure(Throwable t) { task.getTaskContext().getMessageSpy().spyMessage( task.getInput(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMITTED_FAILURE); } }); } }