X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fdevice%2FDeviceContextImpl.java;h=01227abbb4d5f966e5234e882878d9c097678b7f;hb=fc2c4994e93731fa8c4cf685e3896bd63856e62b;hp=78d772c36da17694899edfa101d4a31382819ad8;hpb=bde7aff07dcf6a01cd19074339db65732a9a7e12;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java index 78d772c36d..01227abbb4 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java @@ -7,41 +7,55 @@ */ package org.opendaylight.openflowplugin.impl.device; +import org.opendaylight.controller.sal.binding.api.NotificationProviderService; + +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnectorBuilder; +import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil; +import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion; +import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.SettableFuture; +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; import java.math.BigInteger; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.Future; - +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nonnull; - -import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.ReadTransaction; -import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; -import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; -import org.opendaylight.controller.md.sal.common.api.data.TransactionChain; -import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; -import org.opendaylight.openflowplugin.api.openflow.device.DeviceReplyProcessor; import org.opendaylight.openflowplugin.api.openflow.device.DeviceState; import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator; import org.opendaylight.openflowplugin.api.openflow.device.RequestContext; import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary; import org.opendaylight.openflowplugin.api.openflow.device.Xid; -import org.opendaylight.openflowplugin.api.openflow.device.XidGenerator; import org.opendaylight.openflowplugin.api.openflow.device.exception.DeviceDataException; +import org.opendaylight.openflowplugin.api.openflow.device.listener.OpenflowMessageListenerFacade; +import org.opendaylight.openflowplugin.api.openflow.flow.registry.DeviceFlowRegistry; import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher; import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey; -import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion; -import org.opendaylight.openflowplugin.impl.device.translator.PacketReceivedTranslator; -import org.opendaylight.openflowplugin.impl.device.translator.PortUpdateTranslator; +import org.opendaylight.openflowplugin.impl.flow.registry.DeviceFlowRegistryImpl; +import org.opendaylight.openflowplugin.impl.translator.PacketReceivedTranslator; import org.opendaylight.openflowplugin.openflow.md.core.session.SwitchConnectionCookieOFImpl; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage; @@ -49,20 +63,17 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.Pa import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableFeatures; import org.opendaylight.yangtools.yang.binding.ChildOf; import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.SettableFuture; - /** * */ -public class DeviceContextImpl implements DeviceContext, DeviceReplyProcessor, TransactionChainListener { +public class DeviceContextImpl implements DeviceContext { private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class); @@ -70,23 +81,39 @@ public class DeviceContextImpl implements DeviceContext, DeviceReplyProcessor, T private final DeviceState deviceState; private final DataBroker dataBroker; private final XidGenerator xidGenerator; - private Map requests = - new HashMap(); + private final HashedWheelTimer hashedWheelTimer; + private Map requests = new TreeMap<>(); private final Map auxiliaryConnectionContexts; - private BindingTransactionChain txChainFactory; + private final TransactionChainManager txChainManager; private TranslatorLibrary translatorLibrary; + private OpenflowMessageListenerFacade openflowMessageListenerFacade; + private DeviceFlowRegistry deviceFlowRegistry; + private Timeout barrierTaskTimeout; + private NotificationProviderService notificationService; @VisibleForTesting DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext, - @Nonnull final DeviceState deviceState, @Nonnull final DataBroker dataBroker) { + @Nonnull final DeviceState deviceState, @Nonnull final DataBroker dataBroker, + @Nonnull final HashedWheelTimer hashedWheelTimer) { this.primaryConnectionContext = Preconditions.checkNotNull(primaryConnectionContext); this.deviceState = Preconditions.checkNotNull(deviceState); this.dataBroker = Preconditions.checkNotNull(dataBroker); + this.hashedWheelTimer = Preconditions.checkNotNull(hashedWheelTimer); xidGenerator = new XidGenerator(); - txChainFactory = dataBroker.createTransactionChain(DeviceContextImpl.this); + txChainManager = new TransactionChainManager(dataBroker, hashedWheelTimer, 500L); auxiliaryConnectionContexts = new HashMap<>(); requests = new HashMap<>(); + deviceFlowRegistry = new DeviceFlowRegistryImpl(); + } + + /** + * This method is called from {@link DeviceManagerImpl} only. So we could say "posthandshake process finish" + * and we are able to set a scheduler for an automatic transaction submitting by time (0,5sec). + */ + void submitTransaction() { + txChainManager.submitTransaction(); + txChainManager.enableCounter(); } @Override @@ -104,7 +131,6 @@ public class DeviceContextImpl implements DeviceContext, DeviceReplyProcessor, T @Override public void removeAuxiliaryConenctionContext(final ConnectionContext connectionContext) { // TODO Auto-generated method stub - } @Override @@ -118,10 +144,14 @@ public class DeviceContextImpl implements DeviceContext, DeviceReplyProcessor, T } @Override - public WriteTransaction getWriteTransaction() { - // FIXME : we wana to have only one WriteTransaction exposed in one time - // so thing about blocking notification mechanism for wait to new transaction - return txChainFactory.newWriteOnlyTransaction(); + public void writeToTransaction(final LogicalDatastoreType store, + final InstanceIdentifier path, final T data) { + txChainManager.writeToTransaction(store, path, data); + } + + @Override + public void addDeleteToTxChain(final LogicalDatastoreType store, final InstanceIdentifier path) { + txChainManager.addDeleteOperationTotTxChain(store, path); } @Override @@ -145,116 +175,212 @@ public class DeviceContextImpl implements DeviceContext, DeviceReplyProcessor, T return xidGenerator.generate(); } + @Override public Map getRequests() { return requests; } @Override - public void hookRequestCtx(Xid xid, RequestContext requestFutureContext) { + public void hookRequestCtx(final Xid xid, final RequestContext requestFutureContext) { // TODO Auto-generated method stub requests.put(xid.getValue(), requestFutureContext); } @Override - public void processReply(OfHeader ofHeader) { - RequestContext requestContext = getRequests().get(ofHeader.getXid()); - SettableFuture replyFuture = requestContext.getFuture(); - getRequests().remove(ofHeader.getXid()); - RpcResult rpcResult; - - if(ofHeader instanceof Error) { - Error error = (Error) ofHeader; - String message = "Operation on device failed"; - rpcResult= RpcResultBuilder - .failed() - .withError(RpcError.ErrorType.APPLICATION, message, new DeviceDataException(message, error)) - .build(); + public void attachOpenflowMessageListener(final OpenflowMessageListenerFacade openflowMessageListenerFacade) { + this.openflowMessageListenerFacade = openflowMessageListenerFacade; + primaryConnectionContext.getConnectionAdapter().setMessageListener(openflowMessageListenerFacade); + } + + @Override + public OpenflowMessageListenerFacade getOpenflowMessageListenerFacade() { + return openflowMessageListenerFacade; + } + + @Override + public DeviceFlowRegistry getDeviceFlowRegistry() { + return deviceFlowRegistry; + } + + @Override + public void processReply(final OfHeader ofHeader) { + final RequestContext requestContext = getRequests().get(ofHeader.getXid()); + if (null != requestContext) { + final SettableFuture replyFuture = requestContext.getFuture(); + getRequests().remove(ofHeader.getXid()); + RpcResult rpcResult; + if (ofHeader instanceof Error) { + final Error error = (Error) ofHeader; + final String message = "Operation on device failed"; + rpcResult = RpcResultBuilder + .failed() + .withError(RpcError.ErrorType.APPLICATION, message, new DeviceDataException(message, error)) + .build(); + } else { + rpcResult = RpcResultBuilder + .success() + .withResult(ofHeader) + .build(); + } + + replyFuture.set(rpcResult); + try { + requestContext.close(); + } catch (final Exception e) { + LOG.error("Closing RequestContext failed: ", e); + } } else { - rpcResult= RpcResultBuilder - .success() - .withResult(ofHeader) - .build(); + LOG.error("Can't find request context registered for xid : {}", ofHeader.getXid()); } + } - replyFuture.set(rpcResult); - try { - requestContext.close(); - } catch (Exception e) { - LOG.error("Closing RequestContext failed: ", e); + @Override + public void processReply(final Xid xid, final List ofHeaderList) { + final RequestContext requestContext = getRequests().get(xid.getValue()); + if (null != requestContext) { + final SettableFuture replyFuture = requestContext.getFuture(); + getRequests().remove(xid.getValue()); + final RpcResult> rpcResult = RpcResultBuilder + .>success() + .withResult(ofHeaderList) + .build(); + replyFuture.set(rpcResult); + try { + requestContext.close(); + } catch (final Exception e) { + LOG.error("Closing RequestContext failed: ", e); + } + } else { + LOG.error("Can't find request context registered for xid : {}", xid.getValue()); } } @Override - public void processReply(Xid xid, List ofHeaderList) { - RequestContext requestContext = getRequests().get(xid.getValue()); - SettableFuture replyFuture = requestContext.getFuture(); - getRequests().remove(xid.getValue()); - RpcResult> rpcResult= RpcResultBuilder - .>success() - .withResult(ofHeaderList) - .build(); - replyFuture.set(rpcResult); - try { - requestContext.close(); - } catch (Exception e) { - LOG.error("Closing RequestContext failed: ", e); + public void processException(final Xid xid, final DeviceDataException deviceDataException) { + + LOG.trace("Processing exception for xid : {}", xid.getValue()); + + final RequestContext requestContext = getRequests().get(xid.getValue()); + + if (null != requestContext) { + final SettableFuture replyFuture = requestContext.getFuture(); + getRequests().remove(xid.getValue()); + final RpcResult> rpcResult = RpcResultBuilder + .>failed() + .withError(RpcError.ErrorType.APPLICATION, String.format("Message processing failed : %s", deviceDataException.getError()), deviceDataException) + .build(); + replyFuture.set(rpcResult); + try { + requestContext.close(); + } catch (final Exception e) { + LOG.error("Closing RequestContext failed: ", e); + } + } else { + LOG.error("Can't find request context registered for xid : {}", xid.getValue()); } } @Override - public void processException(Xid xid, DeviceDataException deviceDataException) { - RequestContext requestContext = getRequests().get(xid.getValue()); - - SettableFuture replyFuture = requestContext.getFuture(); - getRequests().remove(xid.getValue()); - RpcResult> rpcResult= RpcResultBuilder - .>failed() - .withError(RpcError.ErrorType.APPLICATION, "Message processing failed", deviceDataException) - .build(); - replyFuture.set(rpcResult); - try { - requestContext.close(); - } catch (Exception e) { - LOG.error("Closing RequestContext failed: ", e); + public void processFlowRemovedMessage(final FlowRemoved flowRemoved) { + //TODO: will be defined later + } + + @Override + public void processPortStatusMessage(final PortStatusMessage portStatus) { + final TranslatorKey translatorKey = new TranslatorKey(portStatus.getVersion(), PortStatusMessage.class.getName()); + final MessageTranslator messageTranslator = translatorLibrary.lookupTranslator(translatorKey); + FlowCapableNodeConnector flowCapableNodeConnector = messageTranslator.translate(portStatus, this, null); + + final KeyedInstanceIdentifier iiToNodeConnector = + provideIIToNodeConnector(portStatus.getPortNo(), portStatus.getVersion()); + if (portStatus.getReason().equals(PortReason.OFPPRADD) ) { + // because of ADD status node connector has to be created + createNodeConnectorInDS(iiToNodeConnector); + } else if (portStatus.getReason().equals(PortReason.OFPPRDELETE) ) { + //only put operation over datastore is available. therefore delete is + //inserting of empty FlowCapableNodeConnector + flowCapableNodeConnector = new FlowCapableNodeConnectorBuilder().build(); } + + InstanceIdentifier iiToFlowCapableNodeConnector = iiToNodeConnector.augmentation(FlowCapableNodeConnector.class); + writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToFlowCapableNodeConnector, flowCapableNodeConnector); + } + + private void createNodeConnectorInDS(final KeyedInstanceIdentifier iiToNodeConnector) { + writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, new NodeConnectorBuilder().setKey(iiToNodeConnector.getKey()).build()); + } + + private KeyedInstanceIdentifier provideIIToNodes() { + return InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(deviceState.getNodeId())); + } + + private KeyedInstanceIdentifier provideIIToNodeConnector(final Long portNo, final Short version) { + final KeyedInstanceIdentifier iiToNodes = provideIIToNodes(); + final NodeConnectorId nodeConnectorId = InventoryDataServiceUtil.nodeConnectorIdfromDatapathPortNo( + deviceState.getFeatures().getDatapathId(), portNo, OpenflowVersion.get(version)); + final NodeConnectorKey nodeConnectorKey = new NodeConnectorKey(new NodeConnectorId(nodeConnectorId)); + return iiToNodes.child(NodeConnector.class, nodeConnectorKey); + } @Override - public void processFlowRemovedMessage(FlowRemoved flowRemoved) { - //TODO: will be defined later + public void processPacketInMessage(final PacketInMessage packetInMessage) { + final TranslatorKey translatorKey = new TranslatorKey(packetInMessage.getVersion(), PacketReceivedTranslator.class.getName()); + final MessageTranslator messageTranslator = translatorLibrary.lookupTranslator(translatorKey); + final PacketReceived packetReceived = messageTranslator.translate(packetInMessage, this, null); + notificationService.publish(packetReceived); } @Override - public void processPortStatusMessage(PortStatusMessage portStatus) { - TranslatorKey translatorKey = new TranslatorKey(portStatus.getVersion(), PortUpdateTranslator.class.getName()); - MessageTranslator messageTranslator = translatorLibrary.lookupTranslator(translatorKey); - FlowCapableNodeConnector nodeConnector = messageTranslator.translate(portStatus, this, null); - //TODO write into datastore + public TranslatorLibrary oook() { + return translatorLibrary; } @Override - public void processPacketInMessage(PacketInMessage packetInMessage) { - TranslatorKey translatorKey = new TranslatorKey(packetInMessage.getVersion(), PacketReceivedTranslator.class.getName()); - MessageTranslator messageTranslator = translatorLibrary.lookupTranslator(translatorKey); - PacketReceived packetReceived = messageTranslator.translate(packetInMessage, this, null); - //TODO publish to MD-SAL + public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) { + this.translatorLibrary = translatorLibrary; } @Override - public void onTransactionChainFailed(TransactionChain chain, - AsyncTransaction transaction, Throwable cause) { - txChainFactory.close(); - txChainFactory = dataBroker.createTransactionChain(DeviceContextImpl.this); + public HashedWheelTimer getTimer() { + return hashedWheelTimer; + } + + + private class XidGenerator { + + private final AtomicLong xid = new AtomicLong(0); + + public Xid generate() { + return new Xid(xid.incrementAndGet()); + } + } + @Override + public RequestContext extractNextOutstandingMessage(final long barrierXid) { + RequestContext nextMessage = null; + final Iterator keyIterator = requests.keySet().iterator(); + if (keyIterator.hasNext()) { + final Long oldestXid = keyIterator.next(); + if (oldestXid < barrierXid) { + nextMessage = requests.remove(oldestXid); + } + } + return nextMessage; } @Override - public void onTransactionChainSuccessful(TransactionChain chain) { - // NOOP - only yet, here is probably place for notification to get new WriteTransaction + public void setCurrentBarrierTimeout(Timeout timeout) { + barrierTaskTimeout = timeout; + } + @Override + public Timeout getBarrierTaskTimeout() { + return barrierTaskTimeout; } - public void setTranslatorLibrary(TranslatorLibrary translatorLibrary) { - this.translatorLibrary = translatorLibrary; + @Override + public void setNotificationService(final NotificationProviderService notificationServiceParam) { + notificationService = notificationServiceParam; } }