From 8081eb379aa0128eae6826d234b76ece66fcfbfd Mon Sep 17 00:00:00 2001 From: Michal Rehak Date: Mon, 22 Sep 2014 22:39:12 +0200 Subject: [PATCH] BUG-1997: moving barrier after message - before .isBarrier rendered into barrier exchange before message now barrier exchange occurs after message Change-Id: I1e4c4738eedef50b737a2d395123873156740df6 Signed-off-by: Michal Rehak --- .../openflow/md/core/sal/OFRpcTask.java | 7 + .../md/core/sal/OFRpcTaskFactory.java | 343 +++++++++--------- .../openflow/md/core/sal/OFRpcTaskUtil.java | 90 ++++- .../openflow/md/util/RpcInputOutputTuple.java | 42 +++ 4 files changed, 298 insertions(+), 184 deletions(-) create mode 100644 openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/util/RpcInputOutputTuple.java diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTask.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTask.java index b43e1586dc..4ef57265e3 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTask.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTask.java @@ -129,4 +129,11 @@ public abstract class OFRpcTask implements Callable> { ListenableFuture> compoundResult = getTaskContext().getRpcPool().submit(this); return Futures.dereference(compoundResult); } + + /** + * @return required barrier value + */ + public Boolean isBarrier() { + return null; + } } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskFactory.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskFactory.java index 0bc2b67bbe..5c5e9de7cc 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskFactory.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskFactory.java @@ -7,17 +7,16 @@ */ package org.opendaylight.openflowplugin.openflow.md.core.sal; -import com.google.common.util.concurrent.AsyncFunction; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.JdkFutureAdapters; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Future; import org.opendaylight.openflowjava.protocol.api.util.BinContent; import org.opendaylight.openflowplugin.api.OFConstants; import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher; import org.opendaylight.openflowplugin.api.openflow.md.core.sal.NotificationComposer; +import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion; import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.FlowConvertor; import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.GroupConvertor; import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.MeterConvertor; @@ -26,7 +25,6 @@ import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.TableFeatu import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.match.MatchReactor; import org.opendaylight.openflowplugin.openflow.md.util.FlowCreatorUtil; import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil; -import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded; @@ -158,19 +156,18 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216. import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.UpdateTableInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.UpdateTableOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.UpdateTableOutputBuilder; -import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.math.BigInteger; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.Future; +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.JdkFutureAdapters; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; /** * @@ -194,24 +191,26 @@ public abstract class OFRpcTaskFactory { public ListenableFuture> call() { ListenableFuture> result = SettableFuture.create(); - Collection barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), getInput().isBarrier(), getCookie()); - if (!barrierErrors.isEmpty()) { - OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture>) result), barrierErrors); - } else { - // Convert the AddFlowInput to FlowModInput - List ofFlowModInputs = FlowConvertor.toFlowModInputs(getInput(), - getVersion(), getSession().getFeatures().getDatapathId()); + // Convert the AddFlowInput to FlowModInput + List ofFlowModInputs = FlowConvertor.toFlowModInputs(getInput(), + getVersion(), getSession().getFeatures().getDatapathId()); - logger.debug("Number of flows to push to switch: {}", ofFlowModInputs.size()); + logger.debug("Number of flows to push to switch: {}", ofFlowModInputs.size()); - result = chainFlowMods(ofFlowModInputs, 0, getTaskContext(), getCookie()); + result = chainFlowMods(ofFlowModInputs, 0, getTaskContext(), getCookie()); - OFRpcTaskUtil.hookFutureNotification(this, result, - getRpcNotificationProviderService(), - createFlowAddedNotification(getInput())); - } + + result = OFRpcTaskUtil.chainFutureBarrier(this, result); + OFRpcTaskUtil.hookFutureNotification(this, result, + getRpcNotificationProviderService(), + createFlowAddedNotification(getInput())); return result; } + + @Override + public Boolean isBarrier() { + return getInput().isBarrier(); + } }; return task; } @@ -295,45 +294,44 @@ public abstract class OFRpcTaskFactory { @Override public ListenableFuture> call() { ListenableFuture> result = null; - Collection barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), - getInput().getUpdatedFlow().isBarrier(), getCookie()); - if (!barrierErrors.isEmpty()) { - OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture>) result), barrierErrors); - - } else { - boolean updatedFlow = (getInput().getUpdatedFlow().getMatch().equals(getInput().getOriginalFlow().getMatch())) && - (getInput().getUpdatedFlow().getPriority().equals(getInput().getOriginalFlow().getPriority())); + + boolean updatedFlow = (getInput().getUpdatedFlow().getMatch().equals(getInput().getOriginalFlow().getMatch())) && + (getInput().getUpdatedFlow().getPriority().equals(getInput().getOriginalFlow().getPriority())); - List allFlowMods = new ArrayList<>(); - List ofFlowModInputs; + List allFlowMods = new ArrayList<>(); + List ofFlowModInputs; - if (updatedFlow == false) { - // if neither match nor priority matches, then we would need to remove the flow and add it - //remove flow - RemoveFlowInputBuilder removeflow = new RemoveFlowInputBuilder(getInput().getOriginalFlow()); - List ofFlowRemoveInput = FlowConvertor.toFlowModInputs(removeflow.build(), + if (updatedFlow == false) { + // if neither match nor priority matches, then we would need to remove the flow and add it + //remove flow + RemoveFlowInputBuilder removeflow = new RemoveFlowInputBuilder(getInput().getOriginalFlow()); + List ofFlowRemoveInput = FlowConvertor.toFlowModInputs(removeflow.build(), getVersion(),getSession().getFeatures().getDatapathId()); - // remove flow should be the first - allFlowMods.addAll(ofFlowRemoveInput); - AddFlowInputBuilder addFlowInputBuilder = new AddFlowInputBuilder(getInput().getUpdatedFlow()); - ofFlowModInputs = FlowConvertor.toFlowModInputs(addFlowInputBuilder.build(), - getVersion(), getSession().getFeatures().getDatapathId()); - } else { - ofFlowModInputs = FlowConvertor.toFlowModInputs(getInput().getUpdatedFlow(), - getVersion(), getSession().getFeatures().getDatapathId()); - } + // remove flow should be the first + allFlowMods.addAll(ofFlowRemoveInput); + AddFlowInputBuilder addFlowInputBuilder = new AddFlowInputBuilder(getInput().getUpdatedFlow()); + ofFlowModInputs = FlowConvertor.toFlowModInputs(addFlowInputBuilder.build(), + getVersion(), getSession().getFeatures().getDatapathId()); + } else { + ofFlowModInputs = FlowConvertor.toFlowModInputs(getInput().getUpdatedFlow(), + getVersion(), getSession().getFeatures().getDatapathId()); + } - allFlowMods.addAll(ofFlowModInputs); - logger.debug("Number of flows to push to switch: {}", allFlowMods.size()); - result = chainFlowMods(allFlowMods, 0, getTaskContext(), getCookie()); + allFlowMods.addAll(ofFlowModInputs); + logger.debug("Number of flows to push to switch: {}", allFlowMods.size()); + result = chainFlowMods(allFlowMods, 0, getTaskContext(), getCookie()); - - OFRpcTaskUtil.hookFutureNotification(this, result, + result = OFRpcTaskUtil.chainFutureBarrier(this, result); + OFRpcTaskUtil.hookFutureNotification(this, result, getRpcNotificationProviderService(), createFlowUpdatedNotification(getInput())); - } return result; } + + @Override + public Boolean isBarrier() { + return getInput().getUpdatedFlow().isBarrier(); + } }; return task; } @@ -372,26 +370,27 @@ public abstract class OFRpcTaskFactory { public ListenableFuture> call() { ListenableFuture> result = SettableFuture.create(); - Collection barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), getInput().isBarrier(), getCookie()); - if (!barrierErrors.isEmpty()) { - OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture>) result), barrierErrors); - } else { - // Convert the AddGroupInput to GroupModInput - GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput(getInput(), - getVersion(), getSession().getFeatures().getDatapathId()); - final Long xId = getSession().getNextXid(); - ofGroupModInput.setXid(xId); - - Future> resultFromOFLib = getMessageService() - .groupMod(ofGroupModInput.build(), getCookie()); - result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib); - - OFRpcTaskUtil.hookFutureNotification(this, result, - getRpcNotificationProviderService(), createGroupAddedNotification(getInput())); - } + // Convert the AddGroupInput to GroupModInput + GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput(getInput(), + getVersion(), getSession().getFeatures().getDatapathId()); + final Long xId = getSession().getNextXid(); + ofGroupModInput.setXid(xId); + + Future> resultFromOFLib = getMessageService() + .groupMod(ofGroupModInput.build(), getCookie()); + result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib); + + result = OFRpcTaskUtil.chainFutureBarrier(this, result); + OFRpcTaskUtil.hookFutureNotification(this, result, + getRpcNotificationProviderService(), createGroupAddedNotification(getInput())); return result; } + + @Override + public Boolean isBarrier() { + return getInput().isBarrier(); + } }; return task; @@ -431,25 +430,26 @@ public abstract class OFRpcTaskFactory { public ListenableFuture> call() { ListenableFuture> result = SettableFuture.create(); - Collection barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), getInput().isBarrier(), getCookie()); - if (!barrierErrors.isEmpty()) { - OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture>) result), barrierErrors); - } else { - // Convert the AddGroupInput to GroupModInput - MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput(getInput(), getVersion()); - final Long xId = getSession().getNextXid(); - ofMeterModInput.setXid(xId); - - Future> resultFromOFLib = getMessageService() - .meterMod(ofMeterModInput.build(), getCookie()); - result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib); - - OFRpcTaskUtil.hookFutureNotification(this, result, - getRpcNotificationProviderService(), createMeterAddedNotification(getInput())); - } + // Convert the AddGroupInput to GroupModInput + MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput(getInput(), getVersion()); + final Long xId = getSession().getNextXid(); + ofMeterModInput.setXid(xId); + + Future> resultFromOFLib = getMessageService() + .meterMod(ofMeterModInput.build(), getCookie()); + result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib); + + result = OFRpcTaskUtil.chainFutureBarrier(this, result); + OFRpcTaskUtil.hookFutureNotification(this, result, + getRpcNotificationProviderService(), createMeterAddedNotification(getInput())); return result; } + + @Override + public Boolean isBarrier() { + return getInput().isBarrier(); + } }; return task; @@ -488,25 +488,22 @@ public abstract class OFRpcTaskFactory { @Override public ListenableFuture> call() { ListenableFuture> result = null; - Collection barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), - getInput().getUpdatedGroup().isBarrier(), getCookie()); - if (!barrierErrors.isEmpty()) { - OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture>) result), barrierErrors); - } else { - // Convert the UpdateGroupInput to GroupModInput - GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput( - getInput().getUpdatedGroup(), getVersion(), - getSession().getFeatures().getDatapathId()); - final Long xId = getSession().getNextXid(); - ofGroupModInput.setXid(xId); - - Future> resultFromOFLib = - getMessageService().groupMod(ofGroupModInput.build(), getCookie()); - result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib); - - OFRpcTaskUtil.hookFutureNotification(this, result, - getRpcNotificationProviderService(), createGroupUpdatedNotification(getInput())); - } + + // Convert the UpdateGroupInput to GroupModInput + GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput( + getInput().getUpdatedGroup(), getVersion(), + getSession().getFeatures().getDatapathId()); + final Long xId = getSession().getNextXid(); + ofGroupModInput.setXid(xId); + + Future> resultFromOFLib = + getMessageService().groupMod(ofGroupModInput.build(), getCookie()); + result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib); + + result = OFRpcTaskUtil.chainFutureBarrier(this, result); + OFRpcTaskUtil.hookFutureNotification(this, result, + getRpcNotificationProviderService(), createGroupUpdatedNotification(getInput())); + return result; } }; @@ -545,24 +542,20 @@ public abstract class OFRpcTaskFactory { @Override public ListenableFuture> call() { ListenableFuture> result = null; - Collection barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), - getInput().getUpdatedMeter().isBarrier(), getCookie()); - if (!barrierErrors.isEmpty()) { - OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture>) result), barrierErrors); - } else { - // Convert the UpdateMeterInput to MeterModInput - MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput( - getInput().getUpdatedMeter(), getVersion()); - final Long xId = getSession().getNextXid(); - ofMeterModInput.setXid(xId); - - Future> resultFromOFLib = - getMessageService().meterMod(ofMeterModInput.build(), getCookie()); - result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib); - - OFRpcTaskUtil.hookFutureNotification(this, result, - getRpcNotificationProviderService(), createMeterUpdatedNotification(getInput())); - } + + // Convert the UpdateMeterInput to MeterModInput + MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput( + getInput().getUpdatedMeter(), getVersion()); + final Long xId = getSession().getNextXid(); + ofMeterModInput.setXid(xId); + + Future> resultFromOFLib = + getMessageService().meterMod(ofMeterModInput.build(), getCookie()); + result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib); + + result = OFRpcTaskUtil.chainFutureBarrier(this, result); + OFRpcTaskUtil.hookFutureNotification(this, result, + getRpcNotificationProviderService(), createMeterUpdatedNotification(getInput())); return result; } }; @@ -602,24 +595,20 @@ public abstract class OFRpcTaskFactory { @Override public ListenableFuture> call() { ListenableFuture> result = SettableFuture.create(); - - Collection barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), getInput().isBarrier(), getCookie()); - if (!barrierErrors.isEmpty()) { - OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture>) result), barrierErrors); - } else { - // Convert the AddFlowInput to FlowModInput - FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(getInput(), - getVersion(), getSession().getFeatures().getDatapathId()); - final Long xId = getSession().getNextXid(); - ofFlowModInput.setXid(xId); - - Future> resultFromOFLib = - getMessageService().flowMod(ofFlowModInput.build(), getCookie()); - result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib); - - OFRpcTaskUtil.hookFutureNotification(this, result, - getRpcNotificationProviderService(), createFlowRemovedNotification(getInput())); - } + + // Convert the AddFlowInput to FlowModInput + FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(getInput(), + getVersion(), getSession().getFeatures().getDatapathId()); + final Long xId = getSession().getNextXid(); + ofFlowModInput.setXid(xId); + + Future> resultFromOFLib = + getMessageService().flowMod(ofFlowModInput.build(), getCookie()); + result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib); + + result = OFRpcTaskUtil.chainFutureBarrier(this, result); + OFRpcTaskUtil.hookFutureNotification(this, result, + getRpcNotificationProviderService(), createFlowRemovedNotification(getInput())); return result; } @@ -661,29 +650,25 @@ public abstract class OFRpcTaskFactory { @Override public ListenableFuture> call() { ListenableFuture> result = SettableFuture.create(); - - Collection barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), getInput().isBarrier(), getCookie()); - if (!barrierErrors.isEmpty()) { - OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture>) result), barrierErrors); - } else { - // Convert the AddGroupInput to GroupModInput - GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput(getInput(), - getVersion(), getSession().getFeatures().getDatapathId()); - final Long xId = getSession().getNextXid(); - ofGroupModInput.setXid(xId); - - Future> resultFromOFLib = getMessageService() - .groupMod(ofGroupModInput.build(), getCookie()); - result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib); - - OFRpcTaskUtil.hookFutureNotification(this, result, - getRpcNotificationProviderService(), createGroupRemovedNotification(getInput())); - } + + // Convert the AddGroupInput to GroupModInput + GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput(getInput(), + getVersion(), getSession().getFeatures().getDatapathId()); + final Long xId = getSession().getNextXid(); + ofGroupModInput.setXid(xId); + + Future> resultFromOFLib = getMessageService() + .groupMod(ofGroupModInput.build(), getCookie()); + result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib); + + result = OFRpcTaskUtil.chainFutureBarrier(this, result); + OFRpcTaskUtil.hookFutureNotification(this, result, + getRpcNotificationProviderService(), createGroupRemovedNotification(getInput())); return result; } }; - + return task; } @@ -719,23 +704,19 @@ public abstract class OFRpcTaskFactory { @Override public ListenableFuture> call() { ListenableFuture> result = SettableFuture.create(); - - Collection barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), getInput().isBarrier(), getCookie()); - if (!barrierErrors.isEmpty()) { - OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture>) result), barrierErrors); - } else { - // Convert the AddGroupInput to GroupModInput - MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput(getInput(), getVersion()); - final Long xId = getSession().getNextXid(); - ofMeterModInput.setXid(xId); - - Future> resultFromOFLib = getMessageService() - .meterMod(ofMeterModInput.build(), getCookie()); - result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib); - - OFRpcTaskUtil.hookFutureNotification(this, result, - getRpcNotificationProviderService(), createMeterRemovedNotification(getInput())); - } + + // Convert the AddGroupInput to GroupModInput + MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput(getInput(), getVersion()); + final Long xId = getSession().getNextXid(); + ofMeterModInput.setXid(xId); + + Future> resultFromOFLib = getMessageService() + .meterMod(ofMeterModInput.build(), getCookie()); + result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib); + + result = OFRpcTaskUtil.chainFutureBarrier(this, result); + OFRpcTaskUtil.hookFutureNotification(this, result, + getRpcNotificationProviderService(), createMeterRemovedNotification(getInput())); return result; } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskUtil.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskUtil.java index c44d16db32..da5bbe947e 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskUtil.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskUtil.java @@ -7,8 +7,10 @@ */ package org.opendaylight.openflowplugin.openflow.md.core.sal; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.concurrent.Future; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; @@ -19,6 +21,7 @@ import org.opendaylight.openflowplugin.api.statistics.MessageSpy; import org.opendaylight.openflowplugin.openflow.md.core.MessageFactory; import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService; import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext; +import org.opendaylight.openflowplugin.openflow.md.util.RpcInputOutputTuple; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput; @@ -29,10 +32,13 @@ import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import com.google.common.base.Function; import com.google.common.base.Objects; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.JdkFutureAdapters; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; @@ -51,7 +57,9 @@ public abstract class OFRpcTaskUtil { SwitchConnectionDistinguisher cookie) { Collection errors = null; if (Objects.firstNonNull(isBarrier, Boolean.FALSE)) { - Future> barrierFuture = sendBarrier(taskContext.getSession(), cookie, taskContext.getMessageService()); + RpcInputOutputTuple>> sendBarrierRpc = + sendBarrier(taskContext.getSession(), cookie, taskContext.getMessageService()); + Future> barrierFuture = sendBarrierRpc.getOutput(); try { RpcResult barrierResult = barrierFuture.get( taskContext.getMaxTimeout(), taskContext.getMaxTimeoutUnit()); @@ -83,11 +91,14 @@ public abstract class OFRpcTaskUtil { * @param messageService * @return barrier response */ - private static Future> sendBarrier(SessionContext session, + protected static RpcInputOutputTuple>> sendBarrier(SessionContext session, SwitchConnectionDistinguisher cookie, IMessageDispatchService messageService) { BarrierInput barrierInput = MessageFactory.createBarrier( session.getFeatures().getVersion(), session.getNextXid()); - return messageService.barrier(barrierInput, cookie); + Future> barrierResult = messageService.barrier(barrierInput, cookie); + ListenableFuture> output = JdkFutureAdapters.listenInPoolThread(barrierResult); + + return new RpcInputOutputTuple<>(barrierInput, output); } /** @@ -123,10 +134,83 @@ public abstract class OFRpcTaskUtil { @Override public void onFailure(Throwable t) { + //TODO: good place to notify MD-SAL about errors task.getTaskContext().getMessageSpy().spyMessage( task.getInput(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMITTED_FAILURE); } }); } + + /** + * @param task of rpc + * @param originalResult + * @param notificationProviderService + * @param notificationComposer lazy notification composer + * @return chained result with barrier + */ + public static + ListenableFuture> chainFutureBarrier( + final OFRpcTask> task, + final ListenableFuture> originalResult) { + + ListenableFuture> chainResult = originalResult; + if (Objects.firstNonNull(task.isBarrier(), Boolean.FALSE)) { + + chainResult = Futures.transform(originalResult, new AsyncFunction, RpcResult>() { + @Override + public ListenableFuture> apply(final RpcResult input) throws Exception { + if (input.isSuccessful()) { + RpcInputOutputTuple>> sendBarrierRpc = sendBarrier( + task.getSession(), task.getCookie(), task.getMessageService()); + ListenableFuture> barrierTxResult = Futures.transform( + sendBarrierRpc.getOutput(), + transformBarrierToTransactionAware(input, sendBarrierRpc.getInput())); + return barrierTxResult; + } else { + return Futures.immediateFuture(input); + } + } + + }); + } + + return chainResult; + } + + /** + * @param originalInput + * @return + */ + protected static Function, RpcResult> transformBarrierToTransactionAware( + final RpcResult originalInput, final BarrierInput barrierInput) { + return new Function, RpcResult>() { + + @Override + public RpcResult apply(final RpcResult barrierResult) { + RpcResultBuilder rpcBuilder = null; + if (barrierResult.isSuccessful()) { + rpcBuilder = RpcResultBuilder.success(); + } else { + rpcBuilder = RpcResultBuilder.failed(); + RpcError rpcError = RpcResultBuilder.newWarning( + ErrorType.RPC, + OFConstants.ERROR_TAG_TIMEOUT, + "barrier sending failed", + OFConstants.APPLICATION_TAG, + "switch failed to respond on barrier request, barrier.xid = "+barrierInput.getXid(), + null); + List chainedErrors = new ArrayList<>(); + chainedErrors.add(rpcError); + chainedErrors.addAll(barrierResult.getErrors()); + rpcBuilder.withRpcErrors(chainedErrors); + } + + rpcBuilder.withResult(originalInput.getResult()); + + return rpcBuilder.build(); + } + + }; + } } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/util/RpcInputOutputTuple.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/util/RpcInputOutputTuple.java new file mode 100644 index 0000000000..42360e9fc1 --- /dev/null +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/util/RpcInputOutputTuple.java @@ -0,0 +1,42 @@ +/** + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.openflow.md.util; + +/** + * @param rpc input value type + * @param rpc output value type + */ +public class RpcInputOutputTuple { + + private IN input; + private OUT output; + + + /** + * @param input + * @param output + */ + public RpcInputOutputTuple(IN input, OUT output) { + this.input = input; + this.output = output; + } + + /** + * @return the input + */ + public IN getInput() { + return input; + } + /** + * @return the output + */ + public OUT getOutput() { + return output; + } + +} -- 2.36.6