X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fdevice%2FDeviceContextImpl.java;h=b49773ac3f4a8b62ce1ff9e5e8582183ba723c33;hb=f55fba5cc124ab2ad5990e60c492f9e92e08085f;hp=d84a5e8e4930a75cf29b018bba90702682315710;hpb=38eaf1d1424b401f8614c18c25a7bf2e1346f88d;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java index d84a5e8e49..b49773ac3f 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java @@ -7,25 +7,23 @@ */ package org.opendaylight.openflowplugin.impl.device; -import javax.annotation.Nonnull; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.SettableFuture; +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; import java.math.BigInteger; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.TreeMap; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.SettableFuture; -import io.netty.util.HashedWheelTimer; -import io.netty.util.Timeout; -import io.netty.util.TimerTask; +import javax.annotation.Nonnull; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.ReadTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.sal.binding.api.NotificationProviderService; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceState; @@ -34,40 +32,51 @@ import org.opendaylight.openflowplugin.api.openflow.device.RequestContext; import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary; import org.opendaylight.openflowplugin.api.openflow.device.Xid; import org.opendaylight.openflowplugin.api.openflow.device.exception.DeviceDataException; -import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceReplyProcessor; +import org.opendaylight.openflowplugin.api.openflow.device.listener.OpenflowMessageListenerFacade; import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher; import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey; -import org.opendaylight.openflowplugin.impl.translator.PacketReceivedTranslator; +import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry; +import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry; +import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry; +import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy; +import org.opendaylight.openflowplugin.impl.common.NodeStaticReplyTranslatorUtil; +import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl; +import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl; +import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl; import org.opendaylight.openflowplugin.openflow.md.core.session.SwitchConnectionCookieOFImpl; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketIn; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage; import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived; +import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData; +import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableFeatures; import org.opendaylight.yangtools.yang.binding.ChildOf; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; -import java.math.BigInteger; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; /** * */ -public class DeviceContextImpl implements DeviceContext, DeviceReplyProcessor { +public class DeviceContextImpl implements DeviceContext { private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class); @@ -81,19 +90,33 @@ public class DeviceContextImpl implements DeviceContext, DeviceReplyProcessor { private final Map auxiliaryConnectionContexts; private final TransactionChainManager txChainManager; private TranslatorLibrary translatorLibrary; + private OpenflowMessageListenerFacade openflowMessageListenerFacade; + private final DeviceFlowRegistry deviceFlowRegistry; + private final DeviceGroupRegistry deviceGroupRegistry; + private final DeviceMeterRegistry deviceMeterRegistry; + private Timeout barrierTaskTimeout; + private NotificationProviderService notificationService; + private final MessageSpy messageSpy; + @VisibleForTesting DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext, - @Nonnull final DeviceState deviceState, @Nonnull final DataBroker dataBroker, - @Nonnull final HashedWheelTimer hashedWheelTimer) { + @Nonnull final DeviceState deviceState, + @Nonnull final DataBroker dataBroker, + @Nonnull final HashedWheelTimer hashedWheelTimer, + @Nonnull final MessageSpy _messageSpy) { this.primaryConnectionContext = Preconditions.checkNotNull(primaryConnectionContext); this.deviceState = Preconditions.checkNotNull(deviceState); this.dataBroker = Preconditions.checkNotNull(dataBroker); this.hashedWheelTimer = Preconditions.checkNotNull(hashedWheelTimer); xidGenerator = new XidGenerator(); - txChainManager = new TransactionChainManager(dataBroker, 500L); + txChainManager = new TransactionChainManager(dataBroker, hashedWheelTimer, 500L); auxiliaryConnectionContexts = new HashMap<>(); requests = new HashMap<>(); + deviceFlowRegistry = new DeviceFlowRegistryImpl(); + deviceGroupRegistry = new DeviceGroupRegistryImpl(); + deviceMeterRegistry = new DeviceMeterRegistryImpl(); + messageSpy = _messageSpy; } /** @@ -103,12 +126,6 @@ public class DeviceContextImpl implements DeviceContext, DeviceReplyProcessor { void submitTransaction() { txChainManager.submitTransaction(); txChainManager.enableCounter(); - hashedWheelTimer.newTimeout(new TimerTask() { - @Override - public void run(final Timeout timeout) throws Exception { - submitTransaction(); - } - }, 0, TimeUnit.MILLISECONDS); } @Override @@ -144,6 +161,11 @@ public class DeviceContextImpl implements DeviceContext, DeviceReplyProcessor { txChainManager.writeToTransaction(store, path, data); } + @Override + public void addDeleteToTxChain(final LogicalDatastoreType store, final InstanceIdentifier path) { + txChainManager.addDeleteOperationTotTxChain(store, path); + } + @Override public TableFeatures getCapabilities() { // TODO Auto-generated method stub @@ -165,59 +187,110 @@ public class DeviceContextImpl implements DeviceContext, DeviceReplyProcessor { return xidGenerator.generate(); } - public Map getRequests() { - return requests; + @Override + public RequestContext lookupRequest(Xid xid) { + return requests.get(xid.getValue()); + } + + @Override + public int getNumberOfOutstandingRequests() { + return requests.size(); } @Override public void hookRequestCtx(final Xid xid, final RequestContext requestFutureContext) { - // TODO Auto-generated method stub requests.put(xid.getValue(), requestFutureContext); } + @Override + public RequestContext unhookRequestCtx(Xid xid) { + return requests.remove(xid.getValue()); + } + + @Override + public void attachOpenflowMessageListener(final OpenflowMessageListenerFacade openflowMessageListenerFacade) { + this.openflowMessageListenerFacade = openflowMessageListenerFacade; + primaryConnectionContext.getConnectionAdapter().setMessageListener(openflowMessageListenerFacade); + } + + @Override + public OpenflowMessageListenerFacade getOpenflowMessageListenerFacade() { + return openflowMessageListenerFacade; + } + + @Override + public DeviceFlowRegistry getDeviceFlowRegistry() { + return deviceFlowRegistry; + } + + @Override + public DeviceGroupRegistry getDeviceGroupRegistry() { + return deviceGroupRegistry; + } + + @Override + public DeviceMeterRegistry getDeviceMeterRegistry() { + return deviceMeterRegistry; + } + @Override public void processReply(final OfHeader ofHeader) { - final RequestContext requestContext = getRequests().get(ofHeader.getXid()); - final SettableFuture replyFuture = requestContext.getFuture(); - getRequests().remove(ofHeader.getXid()); - RpcResult rpcResult; - - if (ofHeader instanceof Error) { - final Error error = (Error) ofHeader; - final String message = "Operation on device failed"; - rpcResult = RpcResultBuilder - .failed() - .withError(RpcError.ErrorType.APPLICATION, message, new DeviceDataException(message, error)) - .build(); - } else { - rpcResult = RpcResultBuilder - .success() - .withResult(ofHeader) - .build(); - } + final RequestContext requestContext = requests.get(ofHeader.getXid()); + if (null != requestContext) { + final SettableFuture replyFuture = requestContext.getFuture(); + requests.remove(ofHeader.getXid()); + RpcResult rpcResult; + if (ofHeader instanceof Error) { + final Error error = (Error) ofHeader; + final String message = "Operation on device failed"; + rpcResult = RpcResultBuilder + .failed() + .withError(RpcError.ErrorType.APPLICATION, message, new DeviceDataException(message, error)) + .build(); + messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE); + } else { + rpcResult = RpcResultBuilder + .success() + .withResult(ofHeader) + .build(); + messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS); + } - replyFuture.set(rpcResult); - try { - requestContext.close(); - } catch (final Exception e) { - LOG.error("Closing RequestContext failed: ", e); + replyFuture.set(rpcResult); + try { + requestContext.close(); + } catch (final Exception e) { + LOG.error("Closing RequestContext failed: ", e); + } + } else { + LOG.error("Can't find request context registered for xid : {}. Type of reply: {}. From address: {}", ofHeader.getXid(), ofHeader.getClass().getName(), + getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress()); } } @Override public void processReply(final Xid xid, final List ofHeaderList) { - final RequestContext requestContext = getRequests().get(xid.getValue()); - final SettableFuture replyFuture = requestContext.getFuture(); - getRequests().remove(xid.getValue()); - final RpcResult> rpcResult = RpcResultBuilder - .>success() - .withResult(ofHeaderList) - .build(); - replyFuture.set(rpcResult); - try { - requestContext.close(); - } catch (final Exception e) { - LOG.error("Closing RequestContext failed: ", e); + final RequestContext requestContext = requests.get(xid.getValue()); + if (null != requestContext) { + final SettableFuture replyFuture = requestContext.getFuture(); + requests.remove(xid.getValue()); + final RpcResult> rpcResult = RpcResultBuilder + .>success() + .withResult(ofHeaderList) + .build(); + replyFuture.set(rpcResult); + for (MultipartReply multipartReply : ofHeaderList) { + messageSpy.spyMessage(multipartReply.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE); + } + + try { + requestContext.close(); + } catch (final Exception e) { + LOG.error("Closing RequestContext failed: ", e); + } + } else { + LOG.error("Can't find request context registered for xid : {}. Type of reply: MULTIPART. From address: {}", xid.getValue(), + getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress()); } } @@ -226,23 +299,24 @@ public class DeviceContextImpl implements DeviceContext, DeviceReplyProcessor { LOG.trace("Processing exception for xid : {}", xid.getValue()); - final RequestContext requestContext = getRequests().get(xid.getValue()); + final RequestContext requestContext = requests.get(xid.getValue()); if (null != requestContext) { final SettableFuture replyFuture = requestContext.getFuture(); - getRequests().remove(xid.getValue()); + requests.remove(xid.getValue()); final RpcResult> rpcResult = RpcResultBuilder .>failed() .withError(RpcError.ErrorType.APPLICATION, String.format("Message processing failed : %s", deviceDataException.getError()), deviceDataException) .build(); replyFuture.set(rpcResult); + messageSpy.spyMessage(deviceDataException.getClass(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE); try { requestContext.close(); } catch (final Exception e) { LOG.error("Closing RequestContext failed: ", e); } } else { - LOG.error("Can't find request context registered for xid : {}", xid); + LOG.error("Can't find request context registered for xid : {}", xid.getValue()); } } @@ -253,18 +327,37 @@ public class DeviceContextImpl implements DeviceContext, DeviceReplyProcessor { @Override public void processPortStatusMessage(final PortStatusMessage portStatus) { - final TranslatorKey translatorKey = new TranslatorKey(portStatus.getVersion(), PortStatusMessage.class.getName()); - final MessageTranslator messageTranslator = translatorLibrary.lookupTranslator(translatorKey); - final FlowCapableNodeConnector nodeConnector = messageTranslator.translate(portStatus, this, null); - //TODO write into datastore + messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS); + final TranslatorKey translatorKey = new TranslatorKey(portStatus.getVersion(), PortGrouping.class.getName()); + final MessageTranslator messageTranslator = translatorLibrary.lookupTranslator(translatorKey); + final FlowCapableNodeConnector flowCapableNodeConnector = messageTranslator.translate(portStatus, this, null); + + final KeyedInstanceIdentifier iiToNodeConnector = provideIIToNodeConnector(portStatus.getPortNo(), portStatus.getVersion()); + if (portStatus.getReason().equals(PortReason.OFPPRADD) || portStatus.getReason().equals(PortReason.OFPPRMODIFY)) { + // because of ADD status node connector has to be created + final NodeConnectorBuilder nConnectorBuilder = new NodeConnectorBuilder().setKey(iiToNodeConnector.getKey()); + nConnectorBuilder.addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new FlowCapableNodeConnectorStatisticsDataBuilder().build()); + nConnectorBuilder.addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector); + writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, nConnectorBuilder.build()); + } else if (portStatus.getReason().equals(PortReason.OFPPRDELETE)) { + addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector); + } + } + + private KeyedInstanceIdentifier provideIIToNodeConnector(final long portNo, final short version) { + final InstanceIdentifier iiToNodes = deviceState.getNodeInstanceIdentifier(); + final BigInteger dataPathId = deviceState.getFeatures().getDatapathId(); + final NodeConnectorId nodeConnectorId = NodeStaticReplyTranslatorUtil.nodeConnectorId(dataPathId.toString(), portNo, version); + return iiToNodes.child(NodeConnector.class, new NodeConnectorKey(nodeConnectorId)); } @Override public void processPacketInMessage(final PacketInMessage packetInMessage) { - final TranslatorKey translatorKey = new TranslatorKey(packetInMessage.getVersion(), PacketReceivedTranslator.class.getName()); + messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS); + final TranslatorKey translatorKey = new TranslatorKey(packetInMessage.getVersion(), PacketIn.class.getName()); final MessageTranslator messageTranslator = translatorLibrary.lookupTranslator(translatorKey); final PacketReceived packetReceived = messageTranslator.translate(packetInMessage, this, null); - //TODO publish to MD-SAL + notificationService.publish(packetReceived); } @Override @@ -277,6 +370,18 @@ public class DeviceContextImpl implements DeviceContext, DeviceReplyProcessor { this.translatorLibrary = translatorLibrary; } + @Override + public HashedWheelTimer getTimer() { + return hashedWheelTimer; + } + + @Override + public void close() throws Exception { + for (Map.Entry entry : requests.entrySet()){ + entry.getValue().close(); + } + } + private class XidGenerator { @@ -288,15 +393,35 @@ public class DeviceContextImpl implements DeviceContext, DeviceReplyProcessor { } @Override - public RequestContext extractNextOutstandingMessage(long barrierXid) { + public RequestContext extractNextOutstandingMessage(final long barrierXid) { RequestContext nextMessage = null; - Iterator keyIterator = requests.keySet().iterator(); + final Iterator keyIterator = requests.keySet().iterator(); if (keyIterator.hasNext()) { - Long oldestXid = keyIterator.next(); + final Long oldestXid = keyIterator.next(); if (oldestXid < barrierXid) { nextMessage = requests.remove(oldestXid); } } return nextMessage; } + + @Override + public void setCurrentBarrierTimeout(final Timeout timeout) { + barrierTaskTimeout = timeout; + } + + @Override + public Timeout getBarrierTaskTimeout() { + return barrierTaskTimeout; + } + + @Override + public void setNotificationService(final NotificationProviderService notificationServiceParam) { + notificationService = notificationServiceParam; + } + + @Override + public MessageSpy getMessageSpy() { + return messageSpy; + } }