2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.base.Preconditions;
11 import com.google.common.collect.Sets;
12 import com.google.common.util.concurrent.AsyncFunction;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import io.netty.util.HashedWheelTimer;
17 import java.math.BigInteger;
18 import java.net.Inet4Address;
19 import java.net.Inet6Address;
20 import java.net.InetAddress;
21 import java.net.InetSocketAddress;
22 import java.util.Arrays;
23 import java.util.Collection;
24 import java.util.Collections;
25 import java.util.Iterator;
26 import java.util.List;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.ScheduledThreadPoolExecutor;
30 import java.util.concurrent.TimeUnit;
31 import javax.annotation.CheckForNull;
32 import javax.annotation.Nonnull;
33 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
34 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
35 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
36 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
37 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
38 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
39 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
40 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
41 import org.opendaylight.openflowplugin.api.ConnectionException;
42 import org.opendaylight.openflowplugin.api.OFConstants;
43 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
44 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
45 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
46 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
47 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
48 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
49 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
50 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
51 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
52 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
53 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
54 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
55 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
56 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
57 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
58 import org.opendaylight.openflowplugin.impl.common.MultipartRequestInputFactory;
59 import org.opendaylight.openflowplugin.impl.common.NodeStaticReplyTranslatorUtil;
60 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
61 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
62 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
63 import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
64 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
65 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv4Address;
66 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv6Address;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeBuilder;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsData;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsDataBuilder;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeatures;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterFeatures;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.Capabilities;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.CapabilitiesV10;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.MultipartReplyBody;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyDescCase;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyGroupFeaturesCase;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyMeterFeaturesCase;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyPortDescCase;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyTableFeaturesCase;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.multipart.reply.desc._case.MultipartReplyDesc;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.multipart.reply.group.features._case.MultipartReplyGroupFeatures;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.multipart.reply.meter.features._case.MultipartReplyMeterFeatures;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.multipart.reply.port.desc._case.MultipartReplyPortDesc;
101 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.multipart.reply.table.features._case.MultipartReplyTableFeatures;
102 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
103 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
104 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
105 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures;
106 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
107 import org.opendaylight.yangtools.yang.common.RpcError;
108 import org.opendaylight.yangtools.yang.common.RpcResult;
109 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
110 import org.slf4j.Logger;
111 import org.slf4j.LoggerFactory;
116 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper, AutoCloseable {
118 private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
120 private static final long TICK_DURATION = 10; // 0.5 sec.
121 private final long globalNotificationQuota;
122 private ScheduledThreadPoolExecutor spyPool;
123 private final int spyRate = 10;
125 private final DataBroker dataBroker;
126 private final HashedWheelTimer hashedWheelTimer;
127 private TranslatorLibrary translatorLibrary;
128 private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
129 private NotificationService notificationService;
130 private NotificationPublishService notificationPublishService;
132 private final Set<DeviceContext> deviceContexts = Sets.newConcurrentHashSet();
133 private final MessageIntelligenceAgency messageIntelligenceAgency;
135 private final long barrierNanos = TimeUnit.MILLISECONDS.toNanos(500);
136 private final int maxQueueDepth = 25600;
137 private final boolean switchFeaturesMandatory;
138 private final DeviceTransactionChainManagerProvider deviceTransactionChainManagerProvider;
139 private ExtensionConverterProvider extensionConverterProvider;
141 public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
142 @Nonnull final MessageIntelligenceAgency messageIntelligenceAgency,
143 final boolean switchFeaturesMandatory,
144 final long globalNotificationQuota) {
145 this.globalNotificationQuota = globalNotificationQuota;
146 this.dataBroker = Preconditions.checkNotNull(dataBroker);
147 hashedWheelTimer = new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, 500);
148 /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
149 final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
151 final NodesBuilder nodesBuilder = new NodesBuilder();
152 nodesBuilder.setNode(Collections.<Node>emptyList());
153 tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
156 } catch (ExecutionException | InterruptedException e) {
157 LOG.error("Creation of node failed.", e);
158 throw new IllegalStateException(e);
161 this.messageIntelligenceAgency = messageIntelligenceAgency;
162 this.switchFeaturesMandatory = switchFeaturesMandatory;
163 deviceTransactionChainManagerProvider = new DeviceTransactionChainManagerProvider(dataBroker);
168 public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
169 deviceInitPhaseHandler = handler;
173 public void onDeviceContextLevelUp(final DeviceContext deviceContext) {
174 // final phase - we have to add new Device to MD-SAL DataStore
175 Preconditions.checkNotNull(deviceContext);
178 if (deviceContext.getDeviceState().getRole() != OfpRole.BECOMESLAVE) {
179 ((DeviceContextImpl) deviceContext).initialSubmitTransaction();
180 deviceContext.onPublished();
185 ((DeviceContextImpl) deviceContext).cancelTransaction();
186 } catch (Exception e) {
187 //TODO: how can we avoid it. pingpong does not have cancel
188 LOG.debug("Expected Exception: Cancel Txn exception thrown for slaves", e);
193 } catch (final Exception e) {
194 LOG.warn("Node {} can not be add to OPERATIONAL DataStore yet because {} ", deviceContext.getDeviceState().getNodeId(), e.getMessage());
195 LOG.trace("Problem with add node {} to OPERATIONAL DataStore", deviceContext.getDeviceState().getNodeId(), e);
197 deviceContext.close();
198 } catch (final Exception e1) {
199 LOG.warn("Device context close FAIL - " + deviceContext.getDeviceState().getNodeId());
205 public void deviceConnected(@CheckForNull final ConnectionContext connectionContext) {
206 Preconditions.checkArgument(connectionContext != null);
208 ReadyForNewTransactionChainHandler readyForNewTransactionChainHandler = new ReadyForNewTransactionChainHandlerImpl(this, connectionContext);
209 DeviceTransactionChainManagerProvider.TransactionChainManagerRegistration transactionChainManagerRegistration = deviceTransactionChainManagerProvider.provideTransactionChainManager(connectionContext);
210 TransactionChainManager transactionChainManager = transactionChainManagerRegistration.getTransactionChainManager();
212 if (transactionChainManagerRegistration.ownedByInvokingConnectionContext()) {
213 //this actually is new registration for currently processed connection context
214 initializeDeviceContext(connectionContext, transactionChainManager);
216 else if (TransactionChainManager.TransactionChainManagerStatus.WORKING.equals(transactionChainManager.getTransactionChainManagerStatus())) {
217 //this means there already exists connection described by same NodeId and it is not current connection contexts' registration
218 LOG.info("In deviceConnected, ownedByInvokingConnectionContext is false and TransactionChainManagerStatus.WORKING. Closing connection to device to start again.");
219 connectionContext.closeConnection(false);
221 else if (!transactionChainManager.attemptToRegisterHandler(readyForNewTransactionChainHandler)) {
222 //previous connection is shutting down, we will try to register handler listening on new transaction chain ready
223 // new connection wil be closed if handler registration fails
224 LOG.info("In deviceConnected, ownedByInvokingConnectionContext is false, TransactionChainManagerStatus is not shutting down or readyForNewTransactionChainHandler is null. " +
225 "Closing connection to device to start again.");
226 connectionContext.closeConnection(false);
230 private void initializeDeviceContext(final ConnectionContext connectionContext, final TransactionChainManager transactionChainManager) {
232 // Cache this for clarity
233 final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
235 //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
236 connectionAdapter.setPacketInFiltering(true);
238 final Short version = connectionContext.getFeatures().getVersion();
239 final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(version);
241 connectionContext.setOutboundQueueProvider(outboundQueueProvider);
242 final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
243 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, maxQueueDepth, barrierNanos);
244 connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
246 final NodeId nodeId = connectionContext.getNodeId();
247 final DeviceState deviceState = new DeviceStateImpl(connectionContext.getFeatures(), nodeId);
249 final DeviceContext deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker,
250 hashedWheelTimer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary, transactionChainManager);
251 ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
252 deviceContext.setNotificationService(notificationService);
253 deviceContext.setNotificationPublishService(notificationPublishService);
254 final NodeBuilder nodeBuilder = new NodeBuilder().setId(deviceState.getNodeId()).setNodeConnector(Collections.<NodeConnector>emptyList());
256 deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, deviceState.getNodeInstanceIdentifier(), nodeBuilder.build());
257 } catch (final Exception e) {
258 LOG.debug("Failed to write node to DS ", e);
261 connectionContext.setDeviceDisconnectedHandler(deviceContext);
262 deviceContext.addDeviceContextClosedHandler(this);
263 deviceContexts.add(deviceContext);
265 updatePacketInRateLimiters();
267 final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
268 connectionAdapter, deviceContext);
269 connectionAdapter.setMessageListener(messageListener);
271 final ListenableFuture<List<RpcResult<List<MultipartReply>>>> deviceFeaturesFuture;
272 if (OFConstants.OFP_VERSION_1_0 == version) {
273 final CapabilitiesV10 capabilitiesV10 = connectionContext.getFeatures().getCapabilitiesV10();
275 DeviceStateUtil.setDeviceStateBasedOnV10Capabilities(deviceState, capabilitiesV10);
277 deviceFeaturesFuture = createDeviceFeaturesForOF10(deviceContext, deviceState);
278 // create empty tables after device description is processed
279 chainTableTrunkWriteOF10(deviceContext, deviceFeaturesFuture);
281 final short ofVersion = deviceContext.getDeviceState().getVersion();
282 final TranslatorKey translatorKey = new TranslatorKey(ofVersion, PortGrouping.class.getName());
283 final MessageTranslator<PortGrouping, FlowCapableNodeConnector> translator = deviceContext.oook().lookupTranslator(translatorKey);
284 final BigInteger dataPathId = deviceContext.getPrimaryConnectionContext().getFeatures().getDatapathId();
286 for (final PortGrouping port : connectionContext.getFeatures().getPhyPort()) {
287 final FlowCapableNodeConnector fcNodeConnector = translator.translate(port, deviceContext, null);
289 final NodeConnectorId nodeConnectorId = NodeStaticReplyTranslatorUtil.nodeConnectorId(dataPathId.toString(), port.getPortNo(), ofVersion);
290 final NodeConnectorBuilder ncBuilder = new NodeConnectorBuilder().setId(nodeConnectorId);
291 ncBuilder.addAugmentation(FlowCapableNodeConnector.class, fcNodeConnector);
292 ncBuilder.addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new FlowCapableNodeConnectorStatisticsDataBuilder().build());
293 final NodeConnector connector = ncBuilder.build();
294 final InstanceIdentifier<NodeConnector> connectorII = deviceState.getNodeInstanceIdentifier().child(NodeConnector.class, connector.getKey());
296 deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, connectorII, connector);
297 } catch (final Exception e) {
298 LOG.debug("Failed to write node {} to DS ", deviceContext.getDeviceState().getNodeId().toString(), e);
302 } else if (OFConstants.OFP_VERSION_1_3 == version) {
303 final Capabilities capabilities = connectionContext.getFeatures().getCapabilities();
304 LOG.debug("Setting capabilities for device {}", deviceContext.getDeviceState().getNodeId());
305 DeviceStateUtil.setDeviceStateBasedOnV13Capabilities(deviceState, capabilities);
306 deviceFeaturesFuture = createDeviceFeaturesForOF13(deviceContext, deviceState);
308 deviceFeaturesFuture = Futures.immediateFailedFuture(new ConnectionException("Unsupported version " + version));
311 Futures.addCallback(deviceFeaturesFuture, new FutureCallback<List<RpcResult<List<MultipartReply>>>>() {
313 public void onSuccess(final List<RpcResult<List<MultipartReply>>> result) {
314 deviceCtxLevelUp(deviceContext);
318 public void onFailure(final Throwable t) {
319 LOG.trace("Device capabilities gathering future failed.");
320 LOG.trace("more info in exploration failure..", t);
322 deviceContext.close();
323 } catch (Exception e) {
324 LOG.warn("Failed to close device context: {}", deviceContext.getDeviceState().getNodeId(), t);
330 private void updatePacketInRateLimiters() {
331 synchronized (deviceContexts) {
332 final int deviceContextsSize = deviceContexts.size();
333 if (deviceContextsSize > 0) {
334 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
335 if (freshNotificationLimit < 100) {
336 freshNotificationLimit = 100;
338 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
339 for (DeviceContext deviceContext : deviceContexts) {
340 deviceContext.updatePacketInRateLimit(freshNotificationLimit);
346 void deviceCtxLevelUp(final DeviceContext deviceContext) {
347 deviceContext.getDeviceState().setValid(true);
348 LOG.trace("Device context level up called.");
349 deviceInitPhaseHandler.onDeviceContextLevelUp(deviceContext);
352 static void chainTableTrunkWriteOF10(final DeviceContext deviceContext, final ListenableFuture<List<RpcResult<List<MultipartReply>>>> deviceFeaturesFuture) {
353 Futures.addCallback(deviceFeaturesFuture, new FutureCallback<List<RpcResult<List<MultipartReply>>>>() {
355 public void onSuccess(final List<RpcResult<List<MultipartReply>>> results) {
356 boolean allSucceeded = true;
357 for (final RpcResult<List<MultipartReply>> rpcResult : results) {
358 allSucceeded &= rpcResult.isSuccessful();
361 createEmptyFlowCapableNodeInDs(deviceContext);
362 makeEmptyTables(deviceContext, deviceContext.getDeviceState().getNodeInstanceIdentifier(),
363 deviceContext.getDeviceState().getFeatures().getTables());
368 public void onFailure(final Throwable t) {
375 static ListenableFuture<List<RpcResult<List<MultipartReply>>>> createDeviceFeaturesForOF10(final DeviceContext deviceContext,
376 final DeviceState deviceState) {
377 final ListenableFuture<RpcResult<List<MultipartReply>>> replyDesc = getNodeStaticInfo(MultipartType.OFPMPDESC,
379 deviceState.getNodeInstanceIdentifier(),
380 deviceState.getVersion());
382 return Futures.allAsList(Arrays.asList(replyDesc));
385 ListenableFuture<List<RpcResult<List<MultipartReply>>>> createDeviceFeaturesForOF13(final DeviceContext deviceContext,
386 final DeviceState deviceState) {
388 final ListenableFuture<RpcResult<List<MultipartReply>>> replyDesc = getNodeStaticInfo(MultipartType.OFPMPDESC,
390 deviceState.getNodeInstanceIdentifier(),
391 deviceState.getVersion());
393 //first process description reply, write data to DS and write consequent data if successful
394 return Futures.transform(replyDesc, new AsyncFunction<RpcResult<List<MultipartReply>>, List<RpcResult<List<MultipartReply>>>>() {
396 public ListenableFuture<List<RpcResult<List<MultipartReply>>>> apply(final RpcResult<List<MultipartReply>> rpcResult) throws Exception {
398 translateAndWriteReply(MultipartType.OFPMPDESC, deviceContext, deviceState.getNodeInstanceIdentifier(), rpcResult.getResult());
400 final ListenableFuture<RpcResult<List<MultipartReply>>> replyMeterFeature = getNodeStaticInfo(MultipartType.OFPMPMETERFEATURES,
402 deviceState.getNodeInstanceIdentifier(),
403 deviceState.getVersion());
405 createSuccessProcessingCallback(MultipartType.OFPMPMETERFEATURES,
407 deviceState.getNodeInstanceIdentifier(),
410 final ListenableFuture<RpcResult<List<MultipartReply>>> replyGroupFeatures = getNodeStaticInfo(MultipartType.OFPMPGROUPFEATURES,
412 deviceState.getNodeInstanceIdentifier(),
413 deviceState.getVersion());
414 createSuccessProcessingCallback(MultipartType.OFPMPGROUPFEATURES,
416 deviceState.getNodeInstanceIdentifier(),
419 final ListenableFuture<RpcResult<List<MultipartReply>>> replyTableFeatures = getNodeStaticInfo(MultipartType.OFPMPTABLEFEATURES,
421 deviceState.getNodeInstanceIdentifier(),
422 deviceState.getVersion());
423 createSuccessProcessingCallback(MultipartType.OFPMPTABLEFEATURES,
425 deviceState.getNodeInstanceIdentifier(),
428 final ListenableFuture<RpcResult<List<MultipartReply>>> replyPortDescription = getNodeStaticInfo(MultipartType.OFPMPPORTDESC,
430 deviceState.getNodeInstanceIdentifier(),
431 deviceState.getVersion());
432 createSuccessProcessingCallback(MultipartType.OFPMPPORTDESC,
434 deviceState.getNodeInstanceIdentifier(),
435 replyPortDescription);
436 if (switchFeaturesMandatory) {
437 return Futures.allAsList(Arrays.asList(
441 replyPortDescription));
443 return Futures.successfulAsList(Arrays.asList(
447 replyPortDescription));
455 public TranslatorLibrary oook() {
456 return translatorLibrary;
460 public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
461 this.translatorLibrary = translatorLibrary;
464 static ListenableFuture<RpcResult<List<MultipartReply>>> getNodeStaticInfo(final MultipartType type, final DeviceContext deviceContext,
465 final InstanceIdentifier<Node> nodeII, final short version) {
467 final OutboundQueue queue = deviceContext.getPrimaryConnectionContext().getOutboundQueueProvider();
469 final Long reserved = deviceContext.getReservedXid();
470 final RequestContext<List<MultipartReply>> requestContext = new AbstractRequestContext<List<MultipartReply>>(reserved) {
472 public void close() {
477 final Xid xid = requestContext.getXid();
479 LOG.trace("Hooking xid {} to device context - precaution.", reserved);
481 final MultiMsgCollector multiMsgCollector = deviceContext.getMultiMsgCollector(requestContext);
482 queue.commitEntry(xid.getValue(), MultipartRequestInputFactory.makeMultipartRequestInput(xid.getValue(), version, type), new FutureCallback<OfHeader>() {
484 public void onSuccess(final OfHeader ofHeader) {
485 if (ofHeader instanceof MultipartReply) {
486 final MultipartReply multipartReply = (MultipartReply) ofHeader;
487 multiMsgCollector.addMultipartMsg(multipartReply);
488 } else if (null != ofHeader) {
489 LOG.info("Unexpected response type received {}.", ofHeader.getClass());
491 multiMsgCollector.endCollecting();
492 LOG.info("Response received is null.");
497 public void onFailure(final Throwable t) {
498 LOG.info("Fail response from OutboundQueue for multipart type {}.", type);
499 final RpcResult<List<MultipartReply>> rpcResult = RpcResultBuilder.<List<MultipartReply>>failed().build();
500 requestContext.setResult(rpcResult);
501 if (MultipartType.OFPMPTABLEFEATURES.equals(type)) {
502 makeEmptyTables(deviceContext, nodeII, deviceContext.getPrimaryConnectionContext().getFeatures().getTables());
504 requestContext.close();
508 return requestContext.getFuture();
511 static void createSuccessProcessingCallback(final MultipartType type, final DeviceContext deviceContext, final InstanceIdentifier<Node> nodeII, final ListenableFuture<RpcResult<List<MultipartReply>>> requestContextFuture) {
512 Futures.addCallback(requestContextFuture, new FutureCallback<RpcResult<List<MultipartReply>>>() {
514 public void onSuccess(final RpcResult<List<MultipartReply>> rpcResult) {
515 final List<MultipartReply> result = rpcResult.getResult();
516 if (result != null) {
517 LOG.info("Static node {} info: {} collected", deviceContext.getDeviceState().getNodeId(), type);
518 translateAndWriteReply(type, deviceContext, nodeII, result);
520 final Iterator<RpcError> rpcErrorIterator = rpcResult.getErrors().iterator();
521 while (rpcErrorIterator.hasNext()) {
522 final RpcError rpcError = rpcErrorIterator.next();
523 LOG.info("Failed to retrieve static node {} info: {}", type, rpcError.getMessage());
524 if (null != rpcError.getCause()) {
525 LOG.trace("Detailed error:", rpcError.getCause());
528 if (MultipartType.OFPMPTABLEFEATURES.equals(type)) {
529 makeEmptyTables(deviceContext, nodeII, deviceContext.getPrimaryConnectionContext().getFeatures().getTables());
535 public void onFailure(final Throwable throwable) {
536 LOG.info("Request of type {} for static info of node {} failed.", type, nodeII);
541 // FIXME : remove after ovs tableFeatures fix
542 static void makeEmptyTables(final DeviceContext dContext, final InstanceIdentifier<Node> nodeII, final Short nrOfTables) {
543 LOG.debug("About to create {} empty tables.", nrOfTables);
544 for (int i = 0; i < nrOfTables; i++) {
545 final short tId = (short) i;
546 final InstanceIdentifier<Table> tableII = nodeII.augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tId));
547 final TableBuilder tableBuilder = new TableBuilder().setId(tId).addAugmentation(FlowTableStatisticsData.class, new FlowTableStatisticsDataBuilder().build());
550 dContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, tableII, tableBuilder.build());
551 } catch (final Exception e) {
552 LOG.debug("Failed to write node {} to DS ", dContext.getDeviceState().getNodeId().toString(), e);
558 private static IpAddress getIpAddressOf(final DeviceContext deviceContext) {
560 InetSocketAddress remoteAddress = deviceContext.getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress();
562 if (remoteAddress == null) {
563 LOG.warn("IP address of the node {} cannot be obtained. No connection with switch.", deviceContext.getDeviceState().getNodeId());
566 LOG.info("IP address of switch is :"+remoteAddress);
568 final InetAddress address = remoteAddress.getAddress();
569 String hostAddress = address.getHostAddress();
570 if (address instanceof Inet4Address) {
571 return new IpAddress(new Ipv4Address(hostAddress));
573 if (address instanceof Inet6Address) {
574 return new IpAddress(new Ipv6Address(hostAddress));
576 LOG.info("Illegal IP address {} of switch:{} ", address, deviceContext.getDeviceState().getNodeId());
581 static void translateAndWriteReply(final MultipartType type, final DeviceContext dContext,
582 final InstanceIdentifier<Node> nodeII, final Collection<MultipartReply> result) {
584 for (final MultipartReply reply : result) {
585 final MultipartReplyBody body = reply.getMultipartReplyBody();
588 Preconditions.checkArgument(body instanceof MultipartReplyDescCase);
589 final MultipartReplyDesc replyDesc = ((MultipartReplyDescCase) body).getMultipartReplyDesc();
590 final FlowCapableNode fcNode = NodeStaticReplyTranslatorUtil.nodeDescTranslator(replyDesc, getIpAddressOf(dContext));
591 final InstanceIdentifier<FlowCapableNode> fNodeII = nodeII.augmentation(FlowCapableNode.class);
592 dContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, fNodeII, fcNode);
595 case OFPMPTABLEFEATURES:
596 Preconditions.checkArgument(body instanceof MultipartReplyTableFeaturesCase);
597 final MultipartReplyTableFeatures tableFeatures = ((MultipartReplyTableFeaturesCase) body).getMultipartReplyTableFeatures();
598 final List<TableFeatures> tables = NodeStaticReplyTranslatorUtil.nodeTableFeatureTranslator(tableFeatures);
599 for (final TableFeatures table : tables) {
600 final Short tableId = table.getTableId();
601 final InstanceIdentifier<Table> tableII = nodeII.augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId));
602 final TableBuilder tableBuilder = new TableBuilder().setId(tableId).setTableFeatures(Collections.singletonList(table));
603 tableBuilder.addAugmentation(FlowTableStatisticsData.class, new FlowTableStatisticsDataBuilder().build());
604 dContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, tableII, tableBuilder.build());
608 case OFPMPMETERFEATURES:
609 Preconditions.checkArgument(body instanceof MultipartReplyMeterFeaturesCase);
610 final MultipartReplyMeterFeatures meterFeatures = ((MultipartReplyMeterFeaturesCase) body).getMultipartReplyMeterFeatures();
611 final NodeMeterFeatures mFeature = NodeStaticReplyTranslatorUtil.nodeMeterFeatureTranslator(meterFeatures);
612 final InstanceIdentifier<NodeMeterFeatures> mFeatureII = nodeII.augmentation(NodeMeterFeatures.class);
613 dContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, mFeatureII, mFeature);
614 if (0L < mFeature.getMeterFeatures().getMaxMeter().getValue()) {
615 dContext.getDeviceState().setMeterAvailable(true);
619 case OFPMPGROUPFEATURES:
620 Preconditions.checkArgument(body instanceof MultipartReplyGroupFeaturesCase);
621 final MultipartReplyGroupFeatures groupFeatures = ((MultipartReplyGroupFeaturesCase) body).getMultipartReplyGroupFeatures();
622 final NodeGroupFeatures gFeature = NodeStaticReplyTranslatorUtil.nodeGroupFeatureTranslator(groupFeatures);
623 final InstanceIdentifier<NodeGroupFeatures> gFeatureII = nodeII.augmentation(NodeGroupFeatures.class);
624 dContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, gFeatureII, gFeature);
628 Preconditions.checkArgument(body instanceof MultipartReplyPortDescCase);
629 final MultipartReplyPortDesc portDesc = ((MultipartReplyPortDescCase) body).getMultipartReplyPortDesc();
630 for (final PortGrouping port : portDesc.getPorts()) {
631 final short ofVersion = dContext.getDeviceState().getVersion();
632 final TranslatorKey translatorKey = new TranslatorKey(ofVersion, PortGrouping.class.getName());
633 final MessageTranslator<PortGrouping, FlowCapableNodeConnector> translator = dContext.oook().lookupTranslator(translatorKey);
634 final FlowCapableNodeConnector fcNodeConnector = translator.translate(port, dContext, null);
636 final BigInteger dataPathId = dContext.getPrimaryConnectionContext().getFeatures().getDatapathId();
637 final NodeConnectorId nodeConnectorId = NodeStaticReplyTranslatorUtil.nodeConnectorId(dataPathId.toString(), port.getPortNo(), ofVersion);
638 final NodeConnectorBuilder ncBuilder = new NodeConnectorBuilder().setId(nodeConnectorId);
639 ncBuilder.addAugmentation(FlowCapableNodeConnector.class, fcNodeConnector);
641 ncBuilder.addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new FlowCapableNodeConnectorStatisticsDataBuilder().build());
642 final NodeConnector connector = ncBuilder.build();
644 final InstanceIdentifier<NodeConnector> connectorII = nodeII.child(NodeConnector.class, connector.getKey());
645 dContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, connectorII, connector);
651 throw new IllegalArgumentException("Unnexpected MultipartType " + type);
654 } catch (final Exception e) {
655 LOG.debug("Failed to write node {} to DS ", dContext.getDeviceState().getNodeId().toString(), e);
660 public void setNotificationService(final NotificationService notificationServiceParam) {
661 notificationService = notificationServiceParam;
665 public void setNotificationPublishService(final NotificationPublishService notificationService) {
666 notificationPublishService = notificationService;
670 public void close() throws Exception {
671 for (final DeviceContext deviceContext : deviceContexts) {
672 deviceContext.close();
676 static void createEmptyFlowCapableNodeInDs(final DeviceContext deviceContext) {
677 final FlowCapableNodeBuilder flowCapableNodeBuilder = new FlowCapableNodeBuilder();
678 final InstanceIdentifier<FlowCapableNode> fNodeII = deviceContext.getDeviceState().getNodeInstanceIdentifier().augmentation(FlowCapableNode.class);
680 deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, fNodeII, flowCapableNodeBuilder.build());
681 } catch (final Exception e) {
682 LOG.debug("Failed to write node {} to DS ", deviceContext.getDeviceState().getNodeId().toString(), e);
687 public void onDeviceContextClosed(final DeviceContext deviceContext) {
688 deviceContexts.remove(deviceContext);
689 updatePacketInRateLimiters();
693 public void initialize() {
694 spyPool = new ScheduledThreadPoolExecutor(1);
695 spyPool.scheduleAtFixedRate(messageIntelligenceAgency, spyRate, spyRate, TimeUnit.SECONDS);
699 public void setExtensionConverterProvider(ExtensionConverterProvider extensionConverterProvider) {
700 this.extensionConverterProvider = extensionConverterProvider;
704 public ExtensionConverterProvider getExtensionConverterProvider() {
705 return extensionConverterProvider;