/** * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.openflowplugin.openflow.md.core.sal; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.Future; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; import org.opendaylight.openflowplugin.api.OFConstants; import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher; import org.opendaylight.openflowplugin.api.openflow.md.core.sal.NotificationComposer; import org.opendaylight.openflowplugin.api.statistics.MessageSpy; import org.opendaylight.openflowplugin.openflow.md.core.MessageFactory; import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService; import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext; import org.opendaylight.openflowplugin.openflow.md.util.RpcInputOutputTuple; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput; import org.opendaylight.yangtools.yang.binding.DataContainer; import org.opendaylight.yangtools.yang.binding.Notification; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Function; import com.google.common.base.Objects; import com.google.common.collect.Lists; import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.JdkFutureAdapters; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; /** * */ public abstract class OFRpcTaskUtil { protected static final Logger LOG = LoggerFactory.getLogger(OFRpcTaskUtil.class); /** * @param taskContext * @param isBarrier * @param cookie * @return rpcResult of given type, containing wrapped errors of barrier sending (if any) or success */ public static Collection manageBarrier(OFRpcTaskContext taskContext, Boolean isBarrier, SwitchConnectionDistinguisher cookie) { Collection errors = null; if (Objects.firstNonNull(isBarrier, Boolean.FALSE)) { RpcInputOutputTuple>> sendBarrierRpc = sendBarrier(taskContext.getSession(), cookie, taskContext.getMessageService()); Future> barrierFuture = sendBarrierRpc.getOutput(); try { RpcResult barrierResult = barrierFuture.get( taskContext.getMaxTimeout(), taskContext.getMaxTimeoutUnit()); if (!barrierResult.isSuccessful()) { errors = barrierResult.getErrors(); } } catch (Exception e) { RpcError rpcError = RpcResultBuilder.newWarning( ErrorType.RPC, OFConstants.ERROR_TAG_TIMEOUT, "barrier sending failed", OFConstants.APPLICATION_TAG, "switch failed to respond on barrier request - message ordering is not preserved", e); errors = Lists.newArrayList(rpcError); } } if (errors == null) { errors = Collections.emptyList(); } return errors; } /** * @param session * @param cookie * @param messageService * @return barrier response */ protected static RpcInputOutputTuple>> sendBarrier(SessionContext session, SwitchConnectionDistinguisher cookie, IMessageDispatchService messageService) { BarrierInput barrierInput = MessageFactory.createBarrier( session.getFeatures().getVersion(), session.getNextXid()); Future> barrierResult = messageService.barrier(barrierInput, cookie); ListenableFuture> output = JdkFutureAdapters.listenInPoolThread(barrierResult); return new RpcInputOutputTuple<>(barrierInput, output); } /** * @param task of rpc * @param originalResult * @param notificationProviderService * @param notificationComposer lazy notification composer */ public static , N extends Notification, INPUT extends DataContainer> void hookFutureNotification( final OFRpcTask task, ListenableFuture originalResult, final NotificationProviderService notificationProviderService, final NotificationComposer notificationComposer) { Futures.addCallback(originalResult, new FutureCallback() { @Override public void onSuccess(R result) { if(null == notificationProviderService) { LOG.warn("onSuccess(): notificationServiceProvider is null, could not publish result {}",result); } else if (notificationComposer == null) { LOG.warn("onSuccess(): notificationComposer is null, could not publish result {}",result); } else if(result == null) { LOG.warn("onSuccess(): result is null, could not publish result {}",result); } else if (result.getResult() == null) { LOG.warn("onSuccess(): result.getResult() is null, could not publish result {}",result); } else if (result.getResult().getTransactionId() == null) { LOG.warn("onSuccess(): result.getResult().getTransactionId() is null, could not publish result {}",result); } else { notificationProviderService.publish(notificationComposer.compose(result.getResult().getTransactionId())); task.getTaskContext().getMessageSpy().spyMessage( task.getInput(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMITTED_SUCCESS); } } @Override public void onFailure(Throwable t) { //TODO: good place to notify MD-SAL about errors task.getTaskContext().getMessageSpy().spyMessage( task.getInput(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMITTED_FAILURE); } }); } /** * @param task of rpcl * @param originalResult * @param notificationProviderService * @param notificationComposer lazy notification composer * @return chained result with barrier */ public static ListenableFuture> chainFutureBarrier( final OFRpcTask> task, final ListenableFuture> originalResult) { ListenableFuture> chainResult = originalResult; if (Objects.firstNonNull(task.isBarrier(), Boolean.FALSE)) { chainResult = Futures.transform(originalResult, new AsyncFunction, RpcResult>() { @Override public ListenableFuture> apply(final RpcResult input) throws Exception { if (input.isSuccessful()) { RpcInputOutputTuple>> sendBarrierRpc = sendBarrier( task.getSession(), task.getCookie(), task.getMessageService()); ListenableFuture> barrierTxResult = Futures.transform( sendBarrierRpc.getOutput(), transformBarrierToTransactionAware(input, sendBarrierRpc.getInput())); return barrierTxResult; } else { return Futures.immediateFuture(input); } } }); } return chainResult; } /** * @param originalInput * @return */ protected static Function, RpcResult> transformBarrierToTransactionAware( final RpcResult originalInput, final BarrierInput barrierInput) { return new Function, RpcResult>() { @Override public RpcResult apply(final RpcResult barrierResult) { RpcResultBuilder rpcBuilder = null; if (barrierResult.isSuccessful()) { rpcBuilder = RpcResultBuilder.success(); } else { rpcBuilder = RpcResultBuilder.failed(); RpcError rpcError = RpcResultBuilder.newWarning( ErrorType.RPC, OFConstants.ERROR_TAG_TIMEOUT, "barrier sending failed", OFConstants.APPLICATION_TAG, "switch failed to respond on barrier request, barrier.xid = "+barrierInput.getXid(), null); List chainedErrors = new ArrayList<>(); chainedErrors.add(rpcError); chainedErrors.addAll(barrierResult.getErrors()); rpcBuilder.withRpcErrors(chainedErrors); } rpcBuilder.withResult(originalInput.getResult()); return rpcBuilder.build(); } }; } }