Add single-layer deserializer for PacketIn msg
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceContextImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Verify;
12 import com.google.common.collect.Iterators;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.JdkFutureAdapters;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import io.netty.util.HashedWheelTimer;
18 import io.netty.util.TimerTask;
19 import java.util.Collection;
20 import java.util.Collections;
21 import java.util.HashSet;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.Objects;
25 import java.util.Optional;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.Future;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.TimeoutException;
30 import javax.annotation.Nonnull;
31 import javax.annotation.Nullable;
32 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
33 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
34 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
35 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
36 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
37 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
38 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
39 import org.opendaylight.openflowplugin.api.ConnectionException;
40 import org.opendaylight.openflowplugin.api.OFConstants;
41 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
42 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
43 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
44 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
45 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
46 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
47 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
48 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
49 import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
50 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
51 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
52 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
53 import org.opendaylight.openflowplugin.api.openflow.lifecycle.MastershipChangeListener;
54 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
55 import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion;
56 import org.opendaylight.openflowplugin.api.openflow.registry.ItemLifeCycleRegistry;
57 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
58 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowDescriptor;
59 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowRegistryKey;
60 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
61 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
62 import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleKeeper;
63 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
64 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
65 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
66 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
67 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
68 import org.opendaylight.openflowplugin.extension.api.exception.ConversionException;
69 import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
70 import org.opendaylight.openflowplugin.impl.common.ItemLifeCycleSourceImpl;
71 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
72 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory;
73 import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
74 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
75 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
76 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
77 import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory;
78 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
79 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
80 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
81 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
82 import org.opendaylight.openflowplugin.impl.util.MatchUtil;
83 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
84 import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.Match;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
101 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
102 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
103 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
104 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
105 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
106 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
107 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice;
108 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketIn;
109 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
110 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceivedBuilder;
111 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
112 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
113 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
114 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService;
115 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInput;
116 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInputBuilder;
117 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
118 import org.opendaylight.yangtools.yang.binding.DataContainer;
119 import org.opendaylight.yangtools.yang.binding.DataObject;
120 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
121 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
122 import org.opendaylight.yangtools.yang.common.RpcResult;
123 import org.slf4j.Logger;
124 import org.slf4j.LoggerFactory;
125
126 public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper {
127
    private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);

    // Drain factor handed to the PacketInRateLimiter created in the constructor.
    // TODO: drain factor should be parametrized
    private static final float REJECTED_DRAIN_FACTOR = 0.25f;
    // Multiplied by the upper bound in updatePacketInRateLimit() to derive the new low water mark.
    // TODO: low water mark factor should be parametrized
    private static final float LOW_WATERMARK_FACTOR = 0.75f;
    // Multiplied by the upper bound in updatePacketInRateLimit() to derive the new high water mark.
    // TODO: high water mark factor should be parametrized
    private static final float HIGH_WATERMARK_FACTOR = 0.95f;

    // Timeout in seconds after which we give up on propagating the role
    private static final int SET_ROLE_TIMEOUT = 10;

    // Timeout in milliseconds after which we give up on initializing the device
    private static final int DEVICE_INIT_TIMEOUT = 9000;

    // Initial water marks passed to the PacketInRateLimiter in the constructor.
    private static final int LOW_WATERMARK = 1000;
    private static final int HIGH_WATERMARK = 2000;

    // Collaborators and configuration flags assigned once in the constructor.
    private final MultipartWriterProvider writerProvider;
    private final HashedWheelTimer hashedWheelTimer;
    private final DeviceState deviceState;
    private final DataBroker dataBroker;
    private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
    private final MessageSpy messageSpy;
    private final ItemLifeCycleKeeper flowLifeCycleKeeper;
    // Translators are looked up from the translator library in the constructor,
    // keyed by the device's OpenFlow version.
    private final MessageTranslator<PortGrouping, FlowCapableNodeConnector> portStatusTranslator;
    private final MessageTranslator<PacketInMessage, PacketReceived> packetInTranslator;
    private final MessageTranslator<FlowRemoved, org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved> flowRemovedTranslator;
    private final TranslatorLibrary translatorLibrary;
    private final ItemLifeCycleRegistry itemLifeCycleSourceRegistry;
    private final ConvertorExecutor convertorExecutor;
    private final DeviceInitializerProvider deviceInitializerProvider;
    private final PacketInRateLimiter packetInLimiter;
    private final DeviceInfo deviceInfo;
    private final boolean skipTableFeatures;
    private final boolean switchFeaturesMandatory;
    private final boolean isFlowRemovedNotificationOn;
    private final boolean useSingleLayerSerialization;
    // The fields below are not assigned by the constructor (apart from initialized,
    // primaryConnectionContext and state); they are populated later in the object's life.
    private NotificationPublishService notificationPublishService;
    private TransactionChainManager transactionChainManager;
    private DeviceFlowRegistry deviceFlowRegistry;
    private DeviceGroupRegistry deviceGroupRegistry;
    private DeviceMeterRegistry deviceMeterRegistry;
    private ExtensionConverterProvider extensionConverterProvider;
    private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
    private SalRoleService salRoleService;
    // Gates every transaction-chain operation in this class: while false, the write, delete
    // and submit methods do nothing (or return false).
    private boolean initialized;
    // Consulted by processPortStatusMessage(): while the context is not initialized and this is
    // false, port status messages are forwarded to the primary connection context.
    private boolean hasState;
    // Result of the last initialSubmitTransaction() attempt; reported by isTransactionsEnabled().
    private boolean isInitialTransactionSubmitted;
    private volatile ConnectionContext primaryConnectionContext;
    private volatile ContextState state;
179
180     DeviceContextImpl(
181             @Nonnull final ConnectionContext primaryConnectionContext,
182             @Nonnull final DataBroker dataBroker,
183             @Nonnull final MessageSpy messageSpy,
184             @Nonnull final TranslatorLibrary translatorLibrary,
185             final ConvertorExecutor convertorExecutor,
186             final boolean skipTableFeatures,
187             final HashedWheelTimer hashedWheelTimer,
188             final boolean useSingleLayerSerialization,
189             final DeviceInitializerProvider deviceInitializerProvider,
190             final boolean isFlowRemovedNotificationOn,
191             final boolean switchFeaturesMandatory) {
192
193         this.primaryConnectionContext = primaryConnectionContext;
194         this.deviceInfo = primaryConnectionContext.getDeviceInfo();
195         this.hashedWheelTimer = hashedWheelTimer;
196         this.deviceInitializerProvider = deviceInitializerProvider;
197         this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
198         this.switchFeaturesMandatory = switchFeaturesMandatory;
199         this.deviceState = new DeviceStateImpl();
200         this.dataBroker = dataBroker;
201         this.messageSpy = messageSpy;
202
203         this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
204                 /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, this.messageSpy, REJECTED_DRAIN_FACTOR);
205
206         this.translatorLibrary = translatorLibrary;
207         this.portStatusTranslator = translatorLibrary.lookupTranslator(
208                 new TranslatorKey(deviceInfo.getVersion(), PortGrouping.class.getName()));
209         this.packetInTranslator = translatorLibrary.lookupTranslator(
210                 new TranslatorKey(deviceInfo.getVersion(), org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731
211                         .PacketIn.class.getName()));
212         this.flowRemovedTranslator = translatorLibrary.lookupTranslator(
213                 new TranslatorKey(deviceInfo.getVersion(), FlowRemoved.class.getName()));
214
215         this.itemLifeCycleSourceRegistry = new ItemLifeCycleRegistryImpl();
216         this.flowLifeCycleKeeper = new ItemLifeCycleSourceImpl();
217         this.itemLifeCycleSourceRegistry.registerLifeCycleSource(flowLifeCycleKeeper);
218         this.state = ContextState.INITIALIZATION;
219         this.convertorExecutor = convertorExecutor;
220         this.skipTableFeatures = skipTableFeatures;
221         this.useSingleLayerSerialization = useSingleLayerSerialization;
222         this.initialized = false;
223         writerProvider = MultipartWriterProviderFactory.createDefaultProvider(this);
224     }
225
226     @Override
227     public boolean initialSubmitTransaction() {
228         return (initialized &&(isInitialTransactionSubmitted =
229                 transactionChainManager.initialSubmitWriteTransaction()));
230     }
231
232     @Override
233     public DeviceState getDeviceState() {
234         return deviceState;
235     }
236
237     @Override
238     public ReadOnlyTransaction getReadTransaction() {
239         return dataBroker.newReadOnlyTransaction();
240     }
241
242     @Override
243     public boolean isTransactionsEnabled() {
244         return isInitialTransactionSubmitted;
245     }
246
247     @Override
248     public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
249                                                           final InstanceIdentifier<T> path,
250                                                           final T data){
251         if (initialized) {
252             transactionChainManager.writeToTransaction(store, path, data, false);
253         }
254     }
255
256     @Override
257     public <T extends DataObject> void writeToTransactionWithParentsSlow(final LogicalDatastoreType store,
258                                                                          final InstanceIdentifier<T> path,
259                                                                          final T data){
260         if (initialized) {
261             transactionChainManager.writeToTransaction(store, path, data, true);
262         }
263     }
264
265     @Override
266     public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store, final InstanceIdentifier<T> path) {
267         if (initialized) {
268             transactionChainManager.addDeleteOperationTotTxChain(store, path);
269         }
270     }
271
272     @Override
273     public boolean submitTransaction() {
274         return initialized && transactionChainManager.submitWriteTransaction();
275     }
276
277     @Override
278     public ConnectionContext getPrimaryConnectionContext() {
279         return primaryConnectionContext;
280     }
281
282     @Override
283     public DeviceFlowRegistry getDeviceFlowRegistry() {
284         return deviceFlowRegistry;
285     }
286
287     @Override
288     public DeviceGroupRegistry getDeviceGroupRegistry() {
289         return deviceGroupRegistry;
290     }
291
292     @Override
293     public DeviceMeterRegistry getDeviceMeterRegistry() {
294         return deviceMeterRegistry;
295     }
296
297     @Override
298     public void processReply(final OfHeader ofHeader) {
299         messageSpy.spyMessage(
300                 ofHeader.getImplementedInterface(),
301                 (ofHeader instanceof Error)
302                         ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
303                         : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
304     }
305
306     @Override
307     public void processReply(final Xid xid, final List<? extends OfHeader> ofHeaderList) {
308         ofHeaderList.forEach(header -> messageSpy.spyMessage(
309                 header.getImplementedInterface(),
310                 (header instanceof Error)
311                         ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
312                         : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS));
313     }
314
315     @Override
316     public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
317         //1. translate to general flow (table, priority, match, cookie)
318         final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved flowRemovedNotification =
319                 flowRemovedTranslator.translate(flowRemoved, deviceInfo, null);
320
321         if (isFlowRemovedNotificationOn) {
322             // Trigger off a notification
323             notificationPublishService.offerNotification(flowRemovedNotification);
324         }
325
326         final ItemLifecycleListener itemLifecycleListener = flowLifeCycleKeeper.getItemLifecycleListener();
327         if (itemLifecycleListener != null) {
328             //2. create registry key
329             final FlowRegistryKey flowRegKey = FlowRegistryKeyFactory.create(getDeviceInfo().getVersion(), flowRemovedNotification);
330             //3. lookup flowId
331             final FlowDescriptor flowDescriptor = deviceFlowRegistry.retrieveDescriptor(flowRegKey);
332             //4. if flowId present:
333             if (flowDescriptor != null) {
334                 // a) construct flow path
335                 final KeyedInstanceIdentifier<Flow, FlowKey> flowPath = getDeviceInfo().getNodeInstanceIdentifier()
336                         .augmentation(FlowCapableNode.class)
337                         .child(Table.class, flowDescriptor.getTableKey())
338                         .child(Flow.class, new FlowKey(flowDescriptor.getFlowId()));
339                 // b) notify listener
340                 itemLifecycleListener.onRemoved(flowPath);
341             } else {
342                 LOG.debug("flow id not found: nodeId={} tableId={}, priority={}",
343                         getDeviceInfo().getNodeId(), flowRegKey.getTableId(), flowRemovedNotification.getPriority());
344             }
345         }
346     }
347
348     @Override
349     public void processPortStatusMessage(final PortStatusMessage portStatus) {
350         messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
351
352         if (initialized) {
353             try {
354                 writePortStatusMessage(portStatus);
355                 submitTransaction();
356             } catch (final Exception e) {
357                 LOG.warn("Error processing port status message for port {} on device {}",
358                         portStatus.getPortNo(), getDeviceInfo().getLOGValue(), e);
359             }
360         } else if (!hasState) {
361             primaryConnectionContext.handlePortStatusMessage(portStatus);
362         }
363     }
364
365     private void writePortStatusMessage(final PortStatus portStatusMessage) {
366         final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator
367                 .translate(portStatusMessage, getDeviceInfo(), null);
368
369         final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = getDeviceInfo()
370                 .getNodeInstanceIdentifier()
371                 .child(NodeConnector.class, new NodeConnectorKey(InventoryDataServiceUtil
372                         .nodeConnectorIdfromDatapathPortNo(
373                                 deviceInfo.getDatapathId(),
374                                 portStatusMessage.getPortNo(),
375                                 OpenflowVersion.get(deviceInfo.getVersion()))));
376
377         if (PortReason.OFPPRADD.equals(portStatusMessage.getReason()) || PortReason.OFPPRMODIFY.equals(portStatusMessage.getReason())) {
378             // because of ADD status node connector has to be created
379             writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, new NodeConnectorBuilder()
380                     .setKey(iiToNodeConnector.getKey())
381                     .addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new FlowCapableNodeConnectorStatisticsDataBuilder().build())
382                     .addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector)
383                     .build());
384         } else if (PortReason.OFPPRDELETE.equals(portStatusMessage.getReason())) {
385             addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
386         }
387     }
388
389     @Override
390     public void processPacketInMessage(final PacketInMessage packetInMessage) {
391         final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null);
392         handlePacketInMessage(packetReceived, packetInMessage.getImplementedInterface(), packetReceived.getMatch());
393     }
394
395     private void handlePacketInMessage(final PacketIn packetIn,
396                                        final Class<?> implementedInterface,
397                                        final Match match) {
398         messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH);
399         final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
400
401         if (packetIn == null) {
402             LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
403             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
404             return;
405         }
406
407         final OpenflowVersion openflowVersion = OpenflowVersion.get(deviceInfo.getVersion());
408
409         // Try to get ingress from match
410         final Optional<NodeConnectorRef> nodeConnectorRef = Optional.ofNullable(match)
411                 .flatMap(m -> Optional.ofNullable(m.getInPort()))
412                 .flatMap(nodeConnectorId -> Optional.ofNullable(InventoryDataServiceUtil
413                         .portNumberfromNodeConnectorId(
414                                 openflowVersion,
415                                 nodeConnectorId)))
416                 .map(portNumber -> InventoryDataServiceUtil
417                         .nodeConnectorRefFromDatapathIdPortno(
418                                 deviceInfo.getDatapathId(),
419                                 portNumber,
420                                 openflowVersion));
421
422         if (!nodeConnectorRef.isPresent()) {
423             LOG.debug("Received packet from switch {}  but couldn't find an input port", connectionAdapter.getRemoteAddress());
424             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
425             return;
426         }
427
428         messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
429
430         if (!packetInLimiter.acquirePermit()) {
431             LOG.debug("Packet limited");
432             // TODO: save packet into emergency slot if possible
433             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
434             return;
435         }
436
437         final ListenableFuture<?> offerNotification = notificationPublishService
438                 .offerNotification(new PacketReceivedBuilder(packetIn)
439                         .setIngress(nodeConnectorRef.get())
440                         .setMatch(MatchUtil.transformMatch(match,
441                                 org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.packet.received
442                                         .Match.class))
443                         .build());
444
445         if (NotificationPublishService.REJECTED.equals(offerNotification)) {
446             LOG.debug("notification offer rejected");
447             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
448             packetInLimiter.drainLowWaterMark();
449             packetInLimiter.releasePermit();
450             return;
451         }
452
453         Futures.addCallback(offerNotification, new FutureCallback<Object>() {
454             @Override
455             public void onSuccess(final Object result) {
456                 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
457                 packetInLimiter.releasePermit();
458             }
459
460             @Override
461             public void onFailure(final Throwable t) {
462                 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
463                 LOG.debug("notification offer failed: {}", t.getMessage());
464                 LOG.trace("notification offer failed..", t);
465                 packetInLimiter.releasePermit();
466             }
467         });
468     }
469
470     @Override
471     public void processExperimenterMessage(final ExperimenterMessage notification) {
472         // lookup converter
473         final ExperimenterDataOfChoice vendorData = notification.getExperimenterDataOfChoice();
474         final MessageTypeKey<? extends ExperimenterDataOfChoice> key = new MessageTypeKey<>(
475                 getDeviceInfo().getVersion(),
476                 (Class<? extends ExperimenterDataOfChoice>) vendorData.getImplementedInterface());
477         final ConvertorMessageFromOFJava<ExperimenterDataOfChoice, MessagePath> messageConverter = extensionConverterProvider.getMessageConverter(key);
478         if (messageConverter == null) {
479             LOG.warn("custom converter for {}[OF:{}] not found",
480                     notification.getExperimenterDataOfChoice().getImplementedInterface(),
481                     getDeviceInfo().getVersion());
482             return;
483         }
484         // build notification
485         final ExperimenterMessageOfChoice messageOfChoice;
486         try {
487             messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION);
488             final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new ExperimenterMessageFromDevBuilder()
489                     .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()))
490                     .setExperimenterMessageOfChoice(messageOfChoice);
491             // publish
492             notificationPublishService.offerNotification(experimenterMessageFromDevBld.build());
493         } catch (final ConversionException e) {
494             LOG.error("Conversion of experimenter notification failed", e);
495         }
496     }
497
498     @Override
499     public void processAlienMessage(final OfHeader message) {
500         final Class<? extends DataContainer> implementedInterface = message.getImplementedInterface();
501
502         if (org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709
503                 .PacketInMessage.class.equals(implementedInterface)) {
504             final org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709
505                     .PacketInMessage packetInMessage = org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709
506                     .PacketInMessage.class.cast(message);
507             handlePacketInMessage(packetInMessage, implementedInterface, packetInMessage.getMatch());
508         }
509     }
510
511     @Override
512     public TranslatorLibrary oook() {
513         return translatorLibrary;
514     }
515
516     @Override
517     public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
518         this.notificationPublishService = notificationPublishService;
519     }
520
521     @Override
522     public MessageSpy getMessageSpy() {
523         return messageSpy;
524     }
525
526     @Override
527     public void onPublished() {
528         Verify.verify(ContextState.INITIALIZATION.equals(state));
529         this.state = ContextState.WORKING;
530         primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
531     }
532
533     @Override
534     public <T extends OfHeader> MultiMsgCollector<T> getMultiMsgCollector(final RequestContext<List<T>> requestContext) {
535         return new MultiMsgCollectorImpl<>(this, requestContext);
536     }
537
538     @Override
539     public void updatePacketInRateLimit(final long upperBound) {
540         packetInLimiter.changeWaterMarks((int) (LOW_WATERMARK_FACTOR * upperBound), (int) (HIGH_WATERMARK_FACTOR * upperBound));
541     }
542
543     @Override
544     public ItemLifeCycleRegistry getItemLifeCycleSourceRegistry() {
545         return itemLifeCycleSourceRegistry;
546     }
547
548     @Override
549     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
550         this.extensionConverterProvider = extensionConverterProvider;
551     }
552
553     @Override
554     public ExtensionConverterProvider getExtensionConverterProvider() {
555         return extensionConverterProvider;
556     }
557
558     @VisibleForTesting
559     TransactionChainManager getTransactionChainManager() {
560         return this.transactionChainManager;
561     }
562
563     @Override
564     public ListenableFuture<Void> stopClusterServices() {
565         return initialized
566                 ? transactionChainManager.deactivateTransactionManager()
567                 : Futures.immediateFuture(null);
568     }
569
570     @Override
571     public ServiceGroupIdentifier getServiceIdentifier() {
572         return this.deviceInfo.getServiceIdentifier();
573     }
574
575     @Override
576     public DeviceInfo getDeviceInfo() {
577         return this.deviceInfo;
578     }
579
580     @Override
581     public void close() {
582         if (ContextState.TERMINATION.equals(state)) {
583             if (LOG.isDebugEnabled()) {
584                 LOG.debug("DeviceContext for node {} is already in TERMINATION state.", getDeviceInfo().getLOGValue());
585             }
586
587             return;
588         }
589
590         state = ContextState.TERMINATION;
591
592         // Close all datastore registries and transactions
593         if (initialized) {
594             initialized = false;
595             deviceGroupRegistry.close();
596             deviceFlowRegistry.close();
597             deviceMeterRegistry.close();
598
599             Futures.addCallback(transactionChainManager.shuttingDown(), new FutureCallback<Void>() {
600                 @Override
601                 public void onSuccess(@Nullable final Void result) {
602                     transactionChainManager.close();
603                     transactionChainManager = null;
604                 }
605
606                 @Override
607                 public void onFailure(final Throwable t) {
608                     transactionChainManager.close();
609                     transactionChainManager = null;
610                 }
611             });
612         }
613
614         for (final Iterator<RequestContext<?>> iterator = Iterators
615                 .consumingIterator(requestContexts.iterator()); iterator.hasNext();) {
616             RequestContextUtil.closeRequestContextWithRpcError(iterator.next(), "Connection closed.");
617         }
618     }
619
620     @Override
621     public boolean canUseSingleLayerSerialization() {
622         return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3;
623     }
624
625     @Override
626     public void setSalRoleService(@Nonnull SalRoleService salRoleService) {
627         this.salRoleService = salRoleService;
628     }
629
630     @Override
631     public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) {
632         this.clusterInitializationPhaseHandler = handler;
633     }
634
635     @Override
636     public boolean onContextInstantiateService(final MastershipChangeListener mastershipChangeListener) {
637         LOG.info("Starting device context cluster services for node {}", deviceInfo.getLOGValue());
638         lazyTransactionManagerInitialization();
639
640         try {
641             final List<PortStatusMessage> portStatusMessages = primaryConnectionContext
642                     .retrieveAndClearPortStatusMessages();
643
644             portStatusMessages.forEach(this::writePortStatusMessage);
645             submitTransaction();
646         } catch (final Exception ex) {
647             LOG.warn("Error processing port status messages from device {}", getDeviceInfo().getLOGValue(), ex);
648             return false;
649         }
650
651         try {
652             final Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
653                     .lookup(deviceInfo.getVersion());
654
655             if (initializer.isPresent()) {
656                 initializer
657                         .get()
658                         .initialize(this, switchFeaturesMandatory, skipTableFeatures, writerProvider, convertorExecutor)
659                         .get(DEVICE_INIT_TIMEOUT, TimeUnit.MILLISECONDS);
660             } else {
661                 throw new ExecutionException(new ConnectionException("Unsupported version " + deviceInfo.getVersion()));
662             }
663         } catch (ExecutionException | InterruptedException | TimeoutException ex) {
664             LOG.warn("Device {} cannot be initialized: {}", deviceInfo.getLOGValue(), ex.getMessage());
665             LOG.trace("Device {} cannot be initialized: ", deviceInfo.getLOGValue(), ex);
666             return false;
667         }
668
669         Futures.addCallback(sendRoleChangeToDevice(OfpRole.BECOMEMASTER),
670                 new RpcResultFutureCallback(mastershipChangeListener));
671
672         final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill = getDeviceFlowRegistry().fill();
673         Futures.addCallback(deviceFlowRegistryFill,
674                 new DeviceFlowRegistryCallback(deviceFlowRegistryFill, mastershipChangeListener));
675
676         return this.clusterInitializationPhaseHandler.onContextInstantiateService(mastershipChangeListener);
677     }
678
679     @VisibleForTesting
680     void lazyTransactionManagerInitialization() {
681         if (!this.initialized) {
682             if (LOG.isDebugEnabled()) {
683                 LOG.debug("Transaction chain manager for node {} created", deviceInfo.getLOGValue());
684             }
685             this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo);
686             this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker, deviceInfo.getNodeInstanceIdentifier());
687             this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
688             this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
689             this.transactionChainManager.activateTransactionManager();
690             this.initialized = true;
691         }
692     }
693
694     @Nullable
695     @Override
696     public <T> RequestContext<T> createRequestContext() {
697         final Long xid = deviceInfo.reserveXidForDeviceMessage();
698
699         final AbstractRequestContext<T> abstractRequestContext = new AbstractRequestContext<T>(xid) {
700             @Override
701             public void close() {
702                 requestContexts.remove(this);
703             }
704         };
705
706         requestContexts.add(abstractRequestContext);
707         return abstractRequestContext;
708     }
709
710     private ListenableFuture<RpcResult<SetRoleOutput>> sendRoleChangeToDevice(final OfpRole newRole) {
711         if (LOG.isDebugEnabled()) {
712             LOG.debug("Sending new role {} to device {}", newRole, deviceInfo.getNodeId());
713         }
714
715         final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture;
716
717         if (deviceInfo.getVersion() >= OFConstants.OFP_VERSION_1_3) {
718             final SetRoleInput setRoleInput = (new SetRoleInputBuilder()).setControllerRole(newRole)
719                     .setNode(new NodeRef(deviceInfo.getNodeInstanceIdentifier())).build();
720
721             setRoleOutputFuture = this.salRoleService.setRole(setRoleInput);
722
723             final TimerTask timerTask = timeout -> {
724                 if (!setRoleOutputFuture.isDone()) {
725                     LOG.warn("New role {} was not propagated to device {} during {} sec", newRole, deviceInfo.getLOGValue(), SET_ROLE_TIMEOUT);
726                     setRoleOutputFuture.cancel(true);
727                 }
728             };
729
730             hashedWheelTimer.newTimeout(timerTask, SET_ROLE_TIMEOUT, TimeUnit.SECONDS);
731         } else {
732             LOG.info("Device: {} with version: {} does not support role", deviceInfo.getLOGValue(), deviceInfo.getVersion());
733             return Futures.immediateFuture(null);
734         }
735
736         return JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture);
737     }
738
739     @Override
740     public ListenableFuture<RpcResult<SetRoleOutput>> makeDeviceSlave() {
741         return sendRoleChangeToDevice(OfpRole.BECOMESLAVE);
742     }
743
744     @Override
745     public void onStateAcquired(final ContextChainState state) {
746         hasState = true;
747     }
748
749     private class RpcResultFutureCallback implements FutureCallback<RpcResult<SetRoleOutput>> {
750
751         private final MastershipChangeListener mastershipChangeListener;
752
753         RpcResultFutureCallback(final MastershipChangeListener mastershipChangeListener) {
754             this.mastershipChangeListener = mastershipChangeListener;
755         }
756
757         @Override
758         public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
759             this.mastershipChangeListener.onMasterRoleAcquired(
760                     deviceInfo,
761                     ContextChainMastershipState.MASTER_ON_DEVICE
762             );
763             if (LOG.isDebugEnabled()) {
764                 LOG.debug("Role MASTER was successfully set on device, node {}", deviceInfo.getLOGValue());
765             }
766         }
767
768         @Override
769         public void onFailure(final Throwable throwable) {
770             mastershipChangeListener.onNotAbleToStartMastershipMandatory(
771                     deviceInfo,
772                     "Was not able to set MASTER role on device");
773         }
774     }
775
776     private class DeviceFlowRegistryCallback implements FutureCallback<List<com.google.common.base.Optional<FlowCapableNode>>> {
777         private final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill;
778         private final MastershipChangeListener mastershipChangeListener;
779
780         DeviceFlowRegistryCallback(
781                 ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill,
782                 MastershipChangeListener mastershipChangeListener) {
783             this.deviceFlowRegistryFill = deviceFlowRegistryFill;
784             this.mastershipChangeListener = mastershipChangeListener;
785         }
786
787         @Override
788         public void onSuccess(@Nullable List<com.google.common.base.Optional<FlowCapableNode>> result) {
789             if (LOG.isDebugEnabled()) {
790                 // Count all flows we read from datastore for debugging purposes.
791                 // This number do not always represent how many flows were actually added
792                 // to DeviceFlowRegistry, because of possible duplicates.
793                 long flowCount = Optional.ofNullable(result)
794                         .map(Collections::singleton)
795                         .orElse(Collections.emptySet())
796                         .stream()
797                         .flatMap(Collection::stream)
798                         .filter(Objects::nonNull)
799                         .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
800                         .filter(Objects::nonNull)
801                         .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
802                         .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
803                         .filter(Objects::nonNull)
804                         .filter(table -> Objects.nonNull(table.getFlow()))
805                         .flatMap(table -> table.getFlow().stream())
806                         .filter(Objects::nonNull)
807                         .count();
808
809                 LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo.getLOGValue());
810             }
811             this.mastershipChangeListener.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.INITIAL_FLOW_REGISTRY_FILL);
812         }
813
814         @Override
815         public void onFailure(Throwable t) {
816             if (deviceFlowRegistryFill.isCancelled()) {
817                 if (LOG.isDebugEnabled()) {
818                     LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo.getLOGValue());
819                 }
820             } else {
821                 LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo.getLOGValue(), t);
822             }
823             mastershipChangeListener.onNotAbleToStartMastership(
824                     deviceInfo,
825                     "Was not able to fill flow registry on device",
826                     false);
827         }
828     }
829
830 }