Merge "BUG-2188: To populate the port_number of switches - yang model"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceManagerImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.collect.Sets;
12 import com.google.common.util.concurrent.AsyncFunction;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import io.netty.util.HashedWheelTimer;
17 import java.math.BigInteger;
18 import java.net.Inet4Address;
19 import java.net.Inet6Address;
20 import java.net.InetAddress;
21 import java.net.InetSocketAddress;
22 import java.util.Arrays;
23 import java.util.Collection;
24 import java.util.Collections;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.Set;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.ScheduledThreadPoolExecutor;
30 import java.util.concurrent.TimeUnit;
31 import javax.annotation.CheckForNull;
32 import javax.annotation.Nonnull;
33 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
34 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
35 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
36 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
37 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
38 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
39 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
40 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
41 import org.opendaylight.openflowplugin.api.ConnectionException;
42 import org.opendaylight.openflowplugin.api.OFConstants;
43 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
44 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
45 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
46 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
47 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
48 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
49 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
50 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
51 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
52 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
53 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
54 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
55 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
56 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
57 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
58 import org.opendaylight.openflowplugin.impl.common.MultipartRequestInputFactory;
59 import org.opendaylight.openflowplugin.impl.common.NodeStaticReplyTranslatorUtil;
60 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
61 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
62 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
63 import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
64 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
65 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv4Address;
66 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv6Address;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeBuilder;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsData;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsDataBuilder;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeatures;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterFeatures;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.Capabilities;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.CapabilitiesV10;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.MultipartReplyBody;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyDescCase;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyGroupFeaturesCase;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyMeterFeaturesCase;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyPortDescCase;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyTableFeaturesCase;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.multipart.reply.desc._case.MultipartReplyDesc;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.multipart.reply.group.features._case.MultipartReplyGroupFeatures;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.multipart.reply.meter.features._case.MultipartReplyMeterFeatures;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.multipart.reply.port.desc._case.MultipartReplyPortDesc;
101 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.multipart.reply.table.features._case.MultipartReplyTableFeatures;
102 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
103 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
104 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
105 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures;
106 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
107 import org.opendaylight.yangtools.yang.common.RpcError;
108 import org.opendaylight.yangtools.yang.common.RpcResult;
109 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
110 import org.slf4j.Logger;
111 import org.slf4j.LoggerFactory;
112
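/**
 * {@link DeviceManager} implementation that builds a {@link DeviceContext} for each newly connected
 * device, requests the device's static features and then hands the context over to the registered
 * {@link DeviceInitializationPhaseHandler}.
 */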
116 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper, AutoCloseable {
117
    private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);

    // Tick length of the hashed wheel timer; the constructor passes it as MILLISECONDS (10 ms per tick).
    private static final long TICK_DURATION = 10;
    // Total packet-in notification budget; divided among all device contexts in updatePacketInRateLimiters().
    private final long globalNotificationQuota;
    // NOTE(review): spyPool and spyRate are not used in the visible part of this class - confirm usage further down.
    private ScheduledThreadPoolExecutor spyPool;
    private final int spyRate = 10;

    private final DataBroker dataBroker;
    // Shared timer handed to every DeviceContextImpl created by this manager.
    private final HashedWheelTimer hashedWheelTimer;
    private TranslatorLibrary translatorLibrary;
    // Next initialization phase; invoked from deviceCtxLevelUp().
    private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
    private NotificationService notificationService;
    private NotificationPublishService notificationPublishService;

    // Contexts registered by initializeDeviceContext(); concurrent set, additionally used as a monitor
    // in updatePacketInRateLimiters().
    private final Set<DeviceContext> deviceContexts = Sets.newConcurrentHashSet();
    private final MessageIntelligenceAgency messageIntelligenceAgency;

    // Arguments passed to ConnectionAdapter.registerOutboundQueueHandler(): 500 ms expressed in nanoseconds
    // and the maximum queue depth.
    private final long barrierNanos = TimeUnit.MILLISECONDS.toNanos(500);
    private final int maxQueueDepth = 25600;
    // When true, OF1.3 feature collection uses Futures.allAsList (any failed future fails the whole
    // collection); otherwise Futures.successfulAsList is used.
    private final boolean switchFeaturesMandatory;
    private final DeviceTransactionChainManagerProvider deviceTransactionChainManagerProvider;
    private ExtensionConverterProvider extensionConverterProvider;
140
141     public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
142                              @Nonnull final MessageIntelligenceAgency messageIntelligenceAgency,
143                              final boolean switchFeaturesMandatory,
144                              final long globalNotificationQuota) {
145         this.globalNotificationQuota = globalNotificationQuota;
146         this.dataBroker = Preconditions.checkNotNull(dataBroker);
147         hashedWheelTimer = new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, 500);
148         /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
149         final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
150
151         final NodesBuilder nodesBuilder = new NodesBuilder();
152         nodesBuilder.setNode(Collections.<Node>emptyList());
153         tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
154         try {
155             tx.submit().get();
156         } catch (ExecutionException | InterruptedException e) {
157             LOG.error("Creation of node failed.", e);
158             throw new IllegalStateException(e);
159         }
160
161         this.messageIntelligenceAgency = messageIntelligenceAgency;
162         this.switchFeaturesMandatory = switchFeaturesMandatory;
163         deviceTransactionChainManagerProvider = new DeviceTransactionChainManagerProvider(dataBroker);
164     }
165
166
167     @Override
168     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
169         deviceInitPhaseHandler = handler;
170     }
171
172     @Override
173     public void onDeviceContextLevelUp(final DeviceContext deviceContext) {
174         // final phase - we have to add new Device to MD-SAL DataStore
175         Preconditions.checkNotNull(deviceContext);
176         try {
177
178             if (deviceContext.getDeviceState().getRole() != OfpRole.BECOMESLAVE) {
179                 ((DeviceContextImpl) deviceContext).initialSubmitTransaction();
180                 deviceContext.onPublished();
181
182             } else {
183                 //if role = slave
184                 try {
185                     ((DeviceContextImpl) deviceContext).cancelTransaction();
186                 } catch (Exception e) {
187                     //TODO: how can we avoid it. pingpong does not have cancel
188                     LOG.debug("Expected Exception: Cancel Txn exception thrown for slaves", e);
189                 }
190
191             }
192
193         } catch (final Exception e) {
194             LOG.warn("Node {} can not be add to OPERATIONAL DataStore yet because {} ", deviceContext.getDeviceState().getNodeId(), e.getMessage());
195             LOG.trace("Problem with add node {} to OPERATIONAL DataStore", deviceContext.getDeviceState().getNodeId(), e);
196             try {
197                 deviceContext.close();
198             } catch (final Exception e1) {
199                 LOG.warn("Device context close FAIL - " + deviceContext.getDeviceState().getNodeId());
200             }
201         }
202     }
203
204     @Override
205     public void deviceConnected(@CheckForNull final ConnectionContext connectionContext) {
206         Preconditions.checkArgument(connectionContext != null);
207
208         ReadyForNewTransactionChainHandler readyForNewTransactionChainHandler = new ReadyForNewTransactionChainHandlerImpl(this, connectionContext);
209         DeviceTransactionChainManagerProvider.TransactionChainManagerRegistration transactionChainManagerRegistration = deviceTransactionChainManagerProvider.provideTransactionChainManager(connectionContext);
210         TransactionChainManager transactionChainManager = transactionChainManagerRegistration.getTransactionChainManager();
211
212         if (transactionChainManagerRegistration.ownedByInvokingConnectionContext()) {
213             //this actually is new registration for currently processed connection context
214             initializeDeviceContext(connectionContext, transactionChainManager);
215         }
216         else if (TransactionChainManager.TransactionChainManagerStatus.WORKING.equals(transactionChainManager.getTransactionChainManagerStatus())) {
217             //this means there already exists connection described by same NodeId and it is not current connection contexts' registration
218             LOG.info("In deviceConnected, ownedByInvokingConnectionContext is false and  TransactionChainManagerStatus.WORKING. Closing connection to device to start again.");
219             connectionContext.closeConnection(false);
220         }
221         else if (!transactionChainManager.attemptToRegisterHandler(readyForNewTransactionChainHandler)) {
222             //previous connection is shutting down, we will try to register handler listening on new transaction chain ready
223             // new connection wil be closed if handler registration fails
224             LOG.info("In deviceConnected, ownedByInvokingConnectionContext is false, TransactionChainManagerStatus is not shutting down or readyForNewTransactionChainHandler is null. " +
225                     "Closing connection to device to start again.");
226             connectionContext.closeConnection(false);
227         }
228     }
229
230     private void initializeDeviceContext(final ConnectionContext connectionContext, final TransactionChainManager transactionChainManager) {
231
232         // Cache this for clarity
233         final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
234
235         //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
236         connectionAdapter.setPacketInFiltering(true);
237
238         final Short version = connectionContext.getFeatures().getVersion();
239         final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(version);
240
241         connectionContext.setOutboundQueueProvider(outboundQueueProvider);
242         final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
243                 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, maxQueueDepth, barrierNanos);
244         connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
245
246         final NodeId nodeId = connectionContext.getNodeId();
247         final DeviceState deviceState = new DeviceStateImpl(connectionContext.getFeatures(), nodeId);
248
249         final DeviceContext deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker,
250                 hashedWheelTimer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary, transactionChainManager);
251         ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
252         deviceContext.setNotificationService(notificationService);
253         deviceContext.setNotificationPublishService(notificationPublishService);
254         final NodeBuilder nodeBuilder = new NodeBuilder().setId(deviceState.getNodeId()).setNodeConnector(Collections.<NodeConnector>emptyList());
255         try {
256             deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, deviceState.getNodeInstanceIdentifier(), nodeBuilder.build());
257         } catch (final Exception e) {
258             LOG.debug("Failed to write node to DS ", e);
259         }
260
261         connectionContext.setDeviceDisconnectedHandler(deviceContext);
262         deviceContext.addDeviceContextClosedHandler(this);
263         deviceContexts.add(deviceContext);
264
265         updatePacketInRateLimiters();
266
267         final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
268                 connectionAdapter, deviceContext);
269         connectionAdapter.setMessageListener(messageListener);
270
271         final ListenableFuture<List<RpcResult<List<MultipartReply>>>> deviceFeaturesFuture;
272         if (OFConstants.OFP_VERSION_1_0 == version) {
273             final CapabilitiesV10 capabilitiesV10 = connectionContext.getFeatures().getCapabilitiesV10();
274
275             DeviceStateUtil.setDeviceStateBasedOnV10Capabilities(deviceState, capabilitiesV10);
276
277             deviceFeaturesFuture = createDeviceFeaturesForOF10(deviceContext, deviceState);
278             // create empty tables after device description is processed
279             chainTableTrunkWriteOF10(deviceContext, deviceFeaturesFuture);
280
281             final short ofVersion = deviceContext.getDeviceState().getVersion();
282             final TranslatorKey translatorKey = new TranslatorKey(ofVersion, PortGrouping.class.getName());
283             final MessageTranslator<PortGrouping, FlowCapableNodeConnector> translator = deviceContext.oook().lookupTranslator(translatorKey);
284             final BigInteger dataPathId = deviceContext.getPrimaryConnectionContext().getFeatures().getDatapathId();
285
286             for (final PortGrouping port : connectionContext.getFeatures().getPhyPort()) {
287                 final FlowCapableNodeConnector fcNodeConnector = translator.translate(port, deviceContext, null);
288
289                 final NodeConnectorId nodeConnectorId = NodeStaticReplyTranslatorUtil.nodeConnectorId(dataPathId.toString(), port.getPortNo(), ofVersion);
290                 final NodeConnectorBuilder ncBuilder = new NodeConnectorBuilder().setId(nodeConnectorId);
291                 ncBuilder.addAugmentation(FlowCapableNodeConnector.class, fcNodeConnector);
292                 ncBuilder.addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new FlowCapableNodeConnectorStatisticsDataBuilder().build());
293                 final NodeConnector connector = ncBuilder.build();
294                 final InstanceIdentifier<NodeConnector> connectorII = deviceState.getNodeInstanceIdentifier().child(NodeConnector.class, connector.getKey());
295                 try {
296                     deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, connectorII, connector);
297                 } catch (final Exception e) {
298                     LOG.debug("Failed to write node {} to DS ", deviceContext.getDeviceState().getNodeId().toString(), e);
299                 }
300
301             }
302         } else if (OFConstants.OFP_VERSION_1_3 == version) {
303             final Capabilities capabilities = connectionContext.getFeatures().getCapabilities();
304             LOG.debug("Setting capabilities for device {}", deviceContext.getDeviceState().getNodeId());
305             DeviceStateUtil.setDeviceStateBasedOnV13Capabilities(deviceState, capabilities);
306             deviceFeaturesFuture = createDeviceFeaturesForOF13(deviceContext, deviceState);
307         } else {
308             deviceFeaturesFuture = Futures.immediateFailedFuture(new ConnectionException("Unsupported version " + version));
309         }
310
311         Futures.addCallback(deviceFeaturesFuture, new FutureCallback<List<RpcResult<List<MultipartReply>>>>() {
312             @Override
313             public void onSuccess(final List<RpcResult<List<MultipartReply>>> result) {
314                 deviceCtxLevelUp(deviceContext);
315             }
316
317             @Override
318             public void onFailure(final Throwable t) {
319                 LOG.trace("Device capabilities gathering future failed.");
320                 LOG.trace("more info in exploration failure..", t);
321                 try {
322                     deviceContext.close();
323                 } catch (Exception e) {
324                     LOG.warn("Failed to close device context: {}", deviceContext.getDeviceState().getNodeId(), t);
325                 }
326             }
327         });
328     }
329
330     private void updatePacketInRateLimiters() {
331         synchronized (deviceContexts) {
332             final int deviceContextsSize = deviceContexts.size();
333             if (deviceContextsSize > 0) {
334                 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
335                 if (freshNotificationLimit < 100) {
336                     freshNotificationLimit = 100;
337                 }
338                 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
339                 for (DeviceContext deviceContext : deviceContexts) {
340                     deviceContext.updatePacketInRateLimit(freshNotificationLimit);
341                 }
342             }
343         }
344     }
345
346     void deviceCtxLevelUp(final DeviceContext deviceContext) {
347         deviceContext.getDeviceState().setValid(true);
348         LOG.trace("Device context level up called.");
349         deviceInitPhaseHandler.onDeviceContextLevelUp(deviceContext);
350     }
351
352     static void chainTableTrunkWriteOF10(final DeviceContext deviceContext, final ListenableFuture<List<RpcResult<List<MultipartReply>>>> deviceFeaturesFuture) {
353         Futures.addCallback(deviceFeaturesFuture, new FutureCallback<List<RpcResult<List<MultipartReply>>>>() {
354             @Override
355             public void onSuccess(final List<RpcResult<List<MultipartReply>>> results) {
356                 boolean allSucceeded = true;
357                 for (final RpcResult<List<MultipartReply>> rpcResult : results) {
358                     allSucceeded &= rpcResult.isSuccessful();
359                 }
360                 if (allSucceeded) {
361                     createEmptyFlowCapableNodeInDs(deviceContext);
362                     makeEmptyTables(deviceContext, deviceContext.getDeviceState().getNodeInstanceIdentifier(),
363                             deviceContext.getDeviceState().getFeatures().getTables());
364                 }
365             }
366
367             @Override
368             public void onFailure(final Throwable t) {
369                 //NOOP
370             }
371         });
372     }
373
374
375     static ListenableFuture<List<RpcResult<List<MultipartReply>>>> createDeviceFeaturesForOF10(final DeviceContext deviceContext,
376                                                                                                        final DeviceState deviceState) {
377         final ListenableFuture<RpcResult<List<MultipartReply>>> replyDesc = getNodeStaticInfo(MultipartType.OFPMPDESC,
378                 deviceContext,
379                 deviceState.getNodeInstanceIdentifier(),
380                 deviceState.getVersion());
381
382         return Futures.allAsList(Arrays.asList(replyDesc));
383     }
384
385     ListenableFuture<List<RpcResult<List<MultipartReply>>>> createDeviceFeaturesForOF13(final DeviceContext deviceContext,
386                                                                                                 final DeviceState deviceState) {
387
388         final ListenableFuture<RpcResult<List<MultipartReply>>> replyDesc = getNodeStaticInfo(MultipartType.OFPMPDESC,
389                 deviceContext,
390                 deviceState.getNodeInstanceIdentifier(),
391                 deviceState.getVersion());
392
393         //first process description reply, write data to DS and write consequent data if successful
394         return Futures.transform(replyDesc, new AsyncFunction<RpcResult<List<MultipartReply>>, List<RpcResult<List<MultipartReply>>>>() {
395             @Override
396             public ListenableFuture<List<RpcResult<List<MultipartReply>>>> apply(final RpcResult<List<MultipartReply>> rpcResult) throws Exception {
397
398                 translateAndWriteReply(MultipartType.OFPMPDESC, deviceContext, deviceState.getNodeInstanceIdentifier(), rpcResult.getResult());
399
400                 final ListenableFuture<RpcResult<List<MultipartReply>>> replyMeterFeature = getNodeStaticInfo(MultipartType.OFPMPMETERFEATURES,
401                         deviceContext,
402                         deviceState.getNodeInstanceIdentifier(),
403                         deviceState.getVersion());
404
405                 createSuccessProcessingCallback(MultipartType.OFPMPMETERFEATURES,
406                         deviceContext,
407                         deviceState.getNodeInstanceIdentifier(),
408                         replyMeterFeature);
409
410                 final ListenableFuture<RpcResult<List<MultipartReply>>> replyGroupFeatures = getNodeStaticInfo(MultipartType.OFPMPGROUPFEATURES,
411                         deviceContext,
412                         deviceState.getNodeInstanceIdentifier(),
413                         deviceState.getVersion());
414                 createSuccessProcessingCallback(MultipartType.OFPMPGROUPFEATURES,
415                         deviceContext,
416                         deviceState.getNodeInstanceIdentifier(),
417                         replyGroupFeatures);
418
419                 final ListenableFuture<RpcResult<List<MultipartReply>>> replyTableFeatures = getNodeStaticInfo(MultipartType.OFPMPTABLEFEATURES,
420                         deviceContext,
421                         deviceState.getNodeInstanceIdentifier(),
422                         deviceState.getVersion());
423                 createSuccessProcessingCallback(MultipartType.OFPMPTABLEFEATURES,
424                         deviceContext,
425                         deviceState.getNodeInstanceIdentifier(),
426                         replyTableFeatures);
427
428                 final ListenableFuture<RpcResult<List<MultipartReply>>> replyPortDescription = getNodeStaticInfo(MultipartType.OFPMPPORTDESC,
429                         deviceContext,
430                         deviceState.getNodeInstanceIdentifier(),
431                         deviceState.getVersion());
432                 createSuccessProcessingCallback(MultipartType.OFPMPPORTDESC,
433                         deviceContext,
434                         deviceState.getNodeInstanceIdentifier(),
435                         replyPortDescription);
436                 if (switchFeaturesMandatory) {
437                     return Futures.allAsList(Arrays.asList(
438                             replyMeterFeature,
439                             replyGroupFeatures,
440                             replyTableFeatures,
441                             replyPortDescription));
442                 } else {
443                     return Futures.successfulAsList(Arrays.asList(
444                             replyMeterFeature,
445                             replyGroupFeatures,
446                             replyTableFeatures,
447                             replyPortDescription));
448                 }
449             }
450         });
451
452     }
453
454     @Override
455     public TranslatorLibrary oook() {
456         return translatorLibrary;
457     }
458
    /**
     * Stores the translator library that is returned by {@link #oook()} and passed to every
     * device context created afterwards.
     *
     * @param translatorLibrary library of message translators to use
     */
    @Override
    public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
        this.translatorLibrary = translatorLibrary;
    }
463
464     static ListenableFuture<RpcResult<List<MultipartReply>>> getNodeStaticInfo(final MultipartType type, final DeviceContext deviceContext,
465                                                                                        final InstanceIdentifier<Node> nodeII, final short version) {
466
467         final OutboundQueue queue = deviceContext.getPrimaryConnectionContext().getOutboundQueueProvider();
468
469         final Long reserved = deviceContext.getReservedXid();
470         final RequestContext<List<MultipartReply>> requestContext = new AbstractRequestContext<List<MultipartReply>>(reserved) {
471             @Override
472             public void close() {
473                 //NOOP
474             }
475         };
476
477         final Xid xid = requestContext.getXid();
478
479         LOG.trace("Hooking xid {} to device context - precaution.", reserved);
480
481         final MultiMsgCollector multiMsgCollector = deviceContext.getMultiMsgCollector(requestContext);
482         queue.commitEntry(xid.getValue(), MultipartRequestInputFactory.makeMultipartRequestInput(xid.getValue(), version, type), new FutureCallback<OfHeader>() {
483             @Override
484             public void onSuccess(final OfHeader ofHeader) {
485                 if (ofHeader instanceof MultipartReply) {
486                     final MultipartReply multipartReply = (MultipartReply) ofHeader;
487                     multiMsgCollector.addMultipartMsg(multipartReply);
488                 } else if (null != ofHeader) {
489                     LOG.info("Unexpected response type received {}.", ofHeader.getClass());
490                 } else {
491                     multiMsgCollector.endCollecting();
492                     LOG.info("Response received is null.");
493                 }
494             }
495
496             @Override
497             public void onFailure(final Throwable t) {
498                 LOG.info("Fail response from OutboundQueue for multipart type {}.", type);
499                 final RpcResult<List<MultipartReply>> rpcResult = RpcResultBuilder.<List<MultipartReply>>failed().build();
500                 requestContext.setResult(rpcResult);
501                 if (MultipartType.OFPMPTABLEFEATURES.equals(type)) {
502                     makeEmptyTables(deviceContext, nodeII, deviceContext.getPrimaryConnectionContext().getFeatures().getTables());
503                 }
504                 requestContext.close();
505             }
506         });
507
508         return requestContext.getFuture();
509     }
510
511     static void createSuccessProcessingCallback(final MultipartType type, final DeviceContext deviceContext, final InstanceIdentifier<Node> nodeII, final ListenableFuture<RpcResult<List<MultipartReply>>> requestContextFuture) {
512         Futures.addCallback(requestContextFuture, new FutureCallback<RpcResult<List<MultipartReply>>>() {
513             @Override
514             public void onSuccess(final RpcResult<List<MultipartReply>> rpcResult) {
515                 final List<MultipartReply> result = rpcResult.getResult();
516                 if (result != null) {
517                     LOG.info("Static node {} info: {} collected", deviceContext.getDeviceState().getNodeId(), type);
518                     translateAndWriteReply(type, deviceContext, nodeII, result);
519                 } else {
520                     final Iterator<RpcError> rpcErrorIterator = rpcResult.getErrors().iterator();
521                     while (rpcErrorIterator.hasNext()) {
522                         final RpcError rpcError = rpcErrorIterator.next();
523                         LOG.info("Failed to retrieve static node {} info: {}", type, rpcError.getMessage());
524                         if (null != rpcError.getCause()) {
525                             LOG.trace("Detailed error:", rpcError.getCause());
526                         }
527                     }
528                     if (MultipartType.OFPMPTABLEFEATURES.equals(type)) {
529                         makeEmptyTables(deviceContext, nodeII, deviceContext.getPrimaryConnectionContext().getFeatures().getTables());
530                     }
531                 }
532             }
533
534             @Override
535             public void onFailure(final Throwable throwable) {
536                 LOG.info("Request of type {} for static info of node {} failed.", type, nodeII);
537             }
538         });
539     }
540
541     // FIXME : remove after ovs tableFeatures fix
542     static void makeEmptyTables(final DeviceContext dContext, final InstanceIdentifier<Node> nodeII, final Short nrOfTables) {
543         LOG.debug("About to create {} empty tables.", nrOfTables);
544         for (int i = 0; i < nrOfTables; i++) {
545             final short tId = (short) i;
546             final InstanceIdentifier<Table> tableII = nodeII.augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tId));
547             final TableBuilder tableBuilder = new TableBuilder().setId(tId).addAugmentation(FlowTableStatisticsData.class, new FlowTableStatisticsDataBuilder().build());
548
549             try {
550                 dContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, tableII, tableBuilder.build());
551             } catch (final Exception e) {
552                 LOG.debug("Failed to write node {} to DS ", dContext.getDeviceState().getNodeId().toString(), e);
553             }
554
555         }
556     }
557
558     private static IpAddress getIpAddressOf(final DeviceContext deviceContext) {
559
560         InetSocketAddress remoteAddress = deviceContext.getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress();
561
562         if (remoteAddress == null) {
563             LOG.warn("IP address of the node {} cannot be obtained. No connection with switch.", deviceContext.getDeviceState().getNodeId());
564             return null;
565         }
566         LOG.info("IP address of switch is :"+remoteAddress);
567
568         final InetAddress address = remoteAddress.getAddress();
569         String hostAddress = address.getHostAddress();
570         if (address instanceof Inet4Address) {
571             return new IpAddress(new Ipv4Address(hostAddress));
572         }
573         if (address instanceof Inet6Address) {
574             return new IpAddress(new Ipv6Address(hostAddress));
575         }
576         LOG.info("Illegal IP address {} of switch:{} ", address, deviceContext.getDeviceState().getNodeId());
577         return null;
578
579     }
580
581     static void translateAndWriteReply(final MultipartType type, final DeviceContext dContext,
582                                                final InstanceIdentifier<Node> nodeII, final Collection<MultipartReply> result) {
583         try {
584             for (final MultipartReply reply : result) {
585                 final MultipartReplyBody body = reply.getMultipartReplyBody();
586                 switch (type) {
587                     case OFPMPDESC:
588                         Preconditions.checkArgument(body instanceof MultipartReplyDescCase);
589                         final MultipartReplyDesc replyDesc = ((MultipartReplyDescCase) body).getMultipartReplyDesc();
590                         final FlowCapableNode fcNode = NodeStaticReplyTranslatorUtil.nodeDescTranslator(replyDesc, getIpAddressOf(dContext));
591                         final InstanceIdentifier<FlowCapableNode> fNodeII = nodeII.augmentation(FlowCapableNode.class);
592                         dContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, fNodeII, fcNode);
593                         break;
594
595                     case OFPMPTABLEFEATURES:
596                         Preconditions.checkArgument(body instanceof MultipartReplyTableFeaturesCase);
597                         final MultipartReplyTableFeatures tableFeatures = ((MultipartReplyTableFeaturesCase) body).getMultipartReplyTableFeatures();
598                         final List<TableFeatures> tables = NodeStaticReplyTranslatorUtil.nodeTableFeatureTranslator(tableFeatures);
599                         for (final TableFeatures table : tables) {
600                             final Short tableId = table.getTableId();
601                             final InstanceIdentifier<Table> tableII = nodeII.augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId));
602                             final TableBuilder tableBuilder = new TableBuilder().setId(tableId).setTableFeatures(Collections.singletonList(table));
603                             tableBuilder.addAugmentation(FlowTableStatisticsData.class, new FlowTableStatisticsDataBuilder().build());
604                             dContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, tableII, tableBuilder.build());
605                         }
606                         break;
607
608                     case OFPMPMETERFEATURES:
609                         Preconditions.checkArgument(body instanceof MultipartReplyMeterFeaturesCase);
610                         final MultipartReplyMeterFeatures meterFeatures = ((MultipartReplyMeterFeaturesCase) body).getMultipartReplyMeterFeatures();
611                         final NodeMeterFeatures mFeature = NodeStaticReplyTranslatorUtil.nodeMeterFeatureTranslator(meterFeatures);
612                         final InstanceIdentifier<NodeMeterFeatures> mFeatureII = nodeII.augmentation(NodeMeterFeatures.class);
613                         dContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, mFeatureII, mFeature);
614                         if (0L < mFeature.getMeterFeatures().getMaxMeter().getValue()) {
615                             dContext.getDeviceState().setMeterAvailable(true);
616                         }
617                         break;
618
619                     case OFPMPGROUPFEATURES:
620                         Preconditions.checkArgument(body instanceof MultipartReplyGroupFeaturesCase);
621                         final MultipartReplyGroupFeatures groupFeatures = ((MultipartReplyGroupFeaturesCase) body).getMultipartReplyGroupFeatures();
622                         final NodeGroupFeatures gFeature = NodeStaticReplyTranslatorUtil.nodeGroupFeatureTranslator(groupFeatures);
623                         final InstanceIdentifier<NodeGroupFeatures> gFeatureII = nodeII.augmentation(NodeGroupFeatures.class);
624                         dContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, gFeatureII, gFeature);
625                         break;
626
627                     case OFPMPPORTDESC:
628                         Preconditions.checkArgument(body instanceof MultipartReplyPortDescCase);
629                         final MultipartReplyPortDesc portDesc = ((MultipartReplyPortDescCase) body).getMultipartReplyPortDesc();
630                         for (final PortGrouping port : portDesc.getPorts()) {
631                             final short ofVersion = dContext.getDeviceState().getVersion();
632                             final TranslatorKey translatorKey = new TranslatorKey(ofVersion, PortGrouping.class.getName());
633                             final MessageTranslator<PortGrouping, FlowCapableNodeConnector> translator = dContext.oook().lookupTranslator(translatorKey);
634                             final FlowCapableNodeConnector fcNodeConnector = translator.translate(port, dContext, null);
635
636                             final BigInteger dataPathId = dContext.getPrimaryConnectionContext().getFeatures().getDatapathId();
637                             final NodeConnectorId nodeConnectorId = NodeStaticReplyTranslatorUtil.nodeConnectorId(dataPathId.toString(), port.getPortNo(), ofVersion);
638                             final NodeConnectorBuilder ncBuilder = new NodeConnectorBuilder().setId(nodeConnectorId);
639                             ncBuilder.addAugmentation(FlowCapableNodeConnector.class, fcNodeConnector);
640
641                             ncBuilder.addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new FlowCapableNodeConnectorStatisticsDataBuilder().build());
642                             final NodeConnector connector = ncBuilder.build();
643
644                             final InstanceIdentifier<NodeConnector> connectorII = nodeII.child(NodeConnector.class, connector.getKey());
645                             dContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, connectorII, connector);
646                         }
647
648                         break;
649
650                     default:
651                         throw new IllegalArgumentException("Unnexpected MultipartType " + type);
652                 }
653             }
654         } catch (final Exception e) {
655             LOG.debug("Failed to write node {} to DS ", dContext.getDeviceState().getNodeId().toString(), e);
656         }
657     }
658
659     @Override
660     public void setNotificationService(final NotificationService notificationServiceParam) {
661         notificationService = notificationServiceParam;
662     }
663
664     @Override
665     public void setNotificationPublishService(final NotificationPublishService notificationService) {
666         notificationPublishService = notificationService;
667     }
668
669     @Override
670     public void close() throws Exception {
671         for (final DeviceContext deviceContext : deviceContexts) {
672             deviceContext.close();
673         }
674     }
675
676     static void createEmptyFlowCapableNodeInDs(final DeviceContext deviceContext) {
677         final FlowCapableNodeBuilder flowCapableNodeBuilder = new FlowCapableNodeBuilder();
678         final InstanceIdentifier<FlowCapableNode> fNodeII = deviceContext.getDeviceState().getNodeInstanceIdentifier().augmentation(FlowCapableNode.class);
679         try {
680             deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, fNodeII, flowCapableNodeBuilder.build());
681         } catch (final Exception e) {
682             LOG.debug("Failed to write node {} to DS ", deviceContext.getDeviceState().getNodeId().toString(), e);
683         }
684     }
685
686     @Override
687     public void onDeviceContextClosed(final DeviceContext deviceContext) {
688         deviceContexts.remove(deviceContext);
689         updatePacketInRateLimiters();
690     }
691
692     @Override
693     public void initialize() {
694         spyPool = new ScheduledThreadPoolExecutor(1);
695         spyPool.scheduleAtFixedRate(messageIntelligenceAgency, spyRate, spyRate, TimeUnit.SECONDS);
696     }
697
698     @Override
699     public void setExtensionConverterProvider(ExtensionConverterProvider extensionConverterProvider) {
700         this.extensionConverterProvider = extensionConverterProvider;
701     }
702
703     @Override
704     public ExtensionConverterProvider getExtensionConverterProvider() {
705         return extensionConverterProvider;
706     }
707 }