Merge "BUG-6118: making the OFentityListener aware of the InJeopardy() flag"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceContextImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import io.netty.util.Timeout;
17 import java.math.BigInteger;
18 import java.util.HashMap;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.concurrent.ExecutionException;
22 import javax.annotation.Nonnull;
23 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
24 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
25 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
26 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
27 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
28 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
29 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
30 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
31 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
32 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
33 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
34 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
35 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
36 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
37 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
38 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
39 import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
40 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
41 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
42 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
43 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
44 import org.opendaylight.openflowplugin.api.openflow.registry.ItemLifeCycleRegistry;
45 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
46 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowDescriptor;
47 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowRegistryKey;
48 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
49 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
50 import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleKeeper;
51 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
52 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
53 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
54 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
55 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
56 import org.opendaylight.openflowplugin.extension.api.exception.ConversionException;
57 import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
58 import org.opendaylight.openflowplugin.impl.common.ItemLifeCycleSourceImpl;
59 import org.opendaylight.openflowplugin.impl.common.NodeStaticReplyTranslatorUtil;
60 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
61 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
62 import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory;
63 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
64 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
65 import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils;
66 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
67 import org.opendaylight.openflowplugin.openflow.md.core.session.SwitchConnectionCookieOFImpl;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketIn;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
95 import org.opendaylight.yangtools.yang.binding.DataObject;
96 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
97 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
98 import org.slf4j.Logger;
99 import org.slf4j.LoggerFactory;
100
101 public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper{
102
103     private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
104
105     // TODO: drain factor should be parametrized
106     private static final float REJECTED_DRAIN_FACTOR = 0.25f;
107     // TODO: low water mark factor should be parametrized
108     private static final float LOW_WATERMARK_FACTOR = 0.75f;
109     // TODO: high water mark factor should be parametrized
110     private static final float HIGH_WATERMARK_FACTOR = 0.95f;
111     private boolean initialized;
112
113     private ConnectionContext primaryConnectionContext;
114     private final DeviceState deviceState;
115     private final DataBroker dataBroker;
116     private final Map<SwitchConnectionDistinguisher, ConnectionContext> auxiliaryConnectionContexts;
117     private TransactionChainManager transactionChainManager;
118     private DeviceFlowRegistry deviceFlowRegistry;
119     private DeviceGroupRegistry deviceGroupRegistry;
120     private DeviceMeterRegistry deviceMeterRegistry;
121     private final PacketInRateLimiter packetInLimiter;
122     private final MessageSpy messageSpy;
123     private final ItemLifeCycleKeeper flowLifeCycleKeeper;
124     private NotificationPublishService notificationPublishService;
125     private Timeout barrierTaskTimeout;
126     private final MessageTranslator<PortGrouping, FlowCapableNodeConnector> portStatusTranslator;
127     private final MessageTranslator<PacketInMessage, PacketReceived> packetInTranslator;
128     private final MessageTranslator<FlowRemoved, org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved> flowRemovedTranslator;
129     private final TranslatorLibrary translatorLibrary;
130     private final ItemLifeCycleRegistry itemLifeCycleSourceRegistry;
131     private ExtensionConverterProvider extensionConverterProvider;
132     private final DeviceManager deviceManager;
133     private boolean skipTableFeatures;
134     private boolean switchFeaturesMandatory;
135     private final DeviceInfo deviceInfo;
136     private final ConvertorExecutor convertorExecutor;
137     private volatile CONTEXT_STATE state;
138     private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
139
140     DeviceContextImpl(
141             @Nonnull final ConnectionContext primaryConnectionContext,
142             @Nonnull final DataBroker dataBroker,
143             @Nonnull final MessageSpy messageSpy,
144             @Nonnull final TranslatorLibrary translatorLibrary,
145             @Nonnull final DeviceManager manager,
146             final ConvertorExecutor convertorExecutor,
147             final boolean skipTableFeatures) {
148         this.primaryConnectionContext = primaryConnectionContext;
149         this.deviceInfo = primaryConnectionContext.getDeviceInfo();
150         this.deviceState = new DeviceStateImpl();
151         this.dataBroker = dataBroker;
152         this.auxiliaryConnectionContexts = new HashMap<>();
153         this.messageSpy = Preconditions.checkNotNull(messageSpy);
154         this.deviceManager = manager;
155
156         this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
157                 /*initial*/ 1000, /*initial*/2000, this.messageSpy, REJECTED_DRAIN_FACTOR);
158
159         this.translatorLibrary = translatorLibrary;
160         this.portStatusTranslator = translatorLibrary.lookupTranslator(
161                 new TranslatorKey(deviceInfo.getVersion(), PortGrouping.class.getName()));
162         this.packetInTranslator = translatorLibrary.lookupTranslator(
163                 new TranslatorKey(deviceInfo.getVersion(), PacketIn.class.getName()));
164         this.flowRemovedTranslator = translatorLibrary.lookupTranslator(
165                 new TranslatorKey(deviceInfo.getVersion(), FlowRemoved.class.getName()));
166
167         this.itemLifeCycleSourceRegistry = new ItemLifeCycleRegistryImpl();
168         this.flowLifeCycleKeeper = new ItemLifeCycleSourceImpl();
169         this.itemLifeCycleSourceRegistry.registerLifeCycleSource(flowLifeCycleKeeper);
170         this.state = CONTEXT_STATE.INITIALIZATION;
171         this.convertorExecutor = convertorExecutor;
172         this.skipTableFeatures = skipTableFeatures;
173         this.initialized = false;
174     }
175
176     @Override
177     public void initialSubmitTransaction() {
178         if (initialized) {
179             transactionChainManager.initialSubmitWriteTransaction();
180         }
181     }
182
183     @Override
184     public void addAuxiliaryConnectionContext(final ConnectionContext connectionContext) {
185         final SwitchConnectionDistinguisher connectionDistinguisher = createConnectionDistinguisher(connectionContext);
186         auxiliaryConnectionContexts.put(connectionDistinguisher, connectionContext);
187     }
188
189     private static SwitchConnectionDistinguisher createConnectionDistinguisher(final ConnectionContext connectionContext) {
190         return new SwitchConnectionCookieOFImpl(connectionContext.getFeatures().getAuxiliaryId());
191     }
192
193     @Override
194     public void removeAuxiliaryConnectionContext(final ConnectionContext connectionContext) {
195         final SwitchConnectionDistinguisher connectionDistinguisher = createConnectionDistinguisher(connectionContext);
196         LOG.debug("auxiliary connection dropped: {}, nodeId:{}", connectionContext.getConnectionAdapter()
197                 .getRemoteAddress(), getDeviceInfo().getLOGValue());
198         auxiliaryConnectionContexts.remove(connectionDistinguisher);
199     }
200
201     @Override
202     public DeviceState getDeviceState() {
203         return deviceState;
204     }
205
206     @Override
207     public ReadOnlyTransaction getReadTransaction() {
208         return dataBroker.newReadOnlyTransaction();
209     }
210
211     @Override
212     public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
213                                                           final InstanceIdentifier<T> path,
214                                                           final T data){
215         if (initialized) {
216             transactionChainManager.writeToTransaction(store, path, data, false);
217         }
218     }
219
220     @Override
221     public <T extends DataObject> void writeToTransactionWithParentsSlow(final LogicalDatastoreType store,
222                                                                          final InstanceIdentifier<T> path,
223                                                                          final T data){
224         if (initialized) {
225             transactionChainManager.writeToTransaction(store, path, data, true);
226         }
227     }
228
229     @Override
230     public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store, final InstanceIdentifier<T> path) {
231         if (initialized) {
232             transactionChainManager.addDeleteOperationTotTxChain(store, path);
233         }
234     }
235
236     @Override
237     public boolean submitTransaction() {
238         return initialized && transactionChainManager.submitWriteTransaction();
239     }
240
241     @Override
242     public ConnectionContext getPrimaryConnectionContext() {
243         return primaryConnectionContext;
244     }
245
246     @Override
247     public ConnectionContext getAuxiliaryConnectionContexts(final BigInteger cookie) {
248         return auxiliaryConnectionContexts.get(new SwitchConnectionCookieOFImpl(cookie.longValue()));
249     }
250
251     @Override
252     public DeviceFlowRegistry getDeviceFlowRegistry() {
253         return deviceFlowRegistry;
254     }
255
256     @Override
257     public DeviceGroupRegistry getDeviceGroupRegistry() {
258         return deviceGroupRegistry;
259     }
260
261     @Override
262     public DeviceMeterRegistry getDeviceMeterRegistry() {
263         return deviceMeterRegistry;
264     }
265
266     @Override
267     public void processReply(final OfHeader ofHeader) {
268         if (ofHeader instanceof Error) {
269             messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
270         } else {
271             messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
272         }
273     }
274
275     @Override
276     public void processReply(final Xid xid, final List<MultipartReply> ofHeaderList) {
277         for (final MultipartReply multipartReply : ofHeaderList) {
278             messageSpy.spyMessage(multipartReply.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
279         }
280     }
281
282     @Override
283     public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
284         //1. translate to general flow (table, priority, match, cookie)
285         final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved flowRemovedNotification =
286                 flowRemovedTranslator.translate(flowRemoved, deviceInfo, null);
287
288         if(!deviceManager.getIsNotificationFlowRemovedOff()) {
289             // Trigger off a notification
290             notificationPublishService.offerNotification(flowRemovedNotification);
291         } else if(LOG.isDebugEnabled()) {
292             LOG.debug("For nodeId={} isNotificationFlowRemovedOff={}", getDeviceInfo().getLOGValue(), deviceManager.getIsNotificationFlowRemovedOff());
293         }
294
295         final ItemLifecycleListener itemLifecycleListener = flowLifeCycleKeeper.getItemLifecycleListener();
296         if (itemLifecycleListener != null) {
297             //2. create registry key
298             final FlowRegistryKey flowRegKey = FlowRegistryKeyFactory.create(flowRemovedNotification);
299             //3. lookup flowId
300             final FlowDescriptor flowDescriptor = deviceFlowRegistry.retrieveIdForFlow(flowRegKey);
301             //4. if flowId present:
302             if (flowDescriptor != null) {
303                 // a) construct flow path
304                 final KeyedInstanceIdentifier<Flow, FlowKey> flowPath = getDeviceInfo().getNodeInstanceIdentifier()
305                         .augmentation(FlowCapableNode.class)
306                         .child(Table.class, flowDescriptor.getTableKey())
307                         .child(Flow.class, new FlowKey(flowDescriptor.getFlowId()));
308                 // b) notify listener
309                 itemLifecycleListener.onRemoved(flowPath);
310             } else {
311                 LOG.debug("flow id not found: nodeId={} tableId={}, priority={}",
312                         getDeviceInfo().getNodeId(), flowRegKey.getTableId(), flowRemovedNotification.getPriority());
313             }
314         }
315     }
316
317     @Override
318     public void processPortStatusMessage(final PortStatusMessage portStatus) {
319         messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
320         final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator.translate(portStatus, getDeviceInfo(), null);
321
322         final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = provideIIToNodeConnector(portStatus.getPortNo(), portStatus.getVersion());
323         try {
324             if (portStatus.getReason().equals(PortReason.OFPPRADD) || portStatus.getReason().equals(PortReason.OFPPRMODIFY)) {
325                 // because of ADD status node connector has to be created
326                 final NodeConnectorBuilder nConnectorBuilder = new NodeConnectorBuilder().setKey(iiToNodeConnector.getKey());
327                 nConnectorBuilder.addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new FlowCapableNodeConnectorStatisticsDataBuilder().build());
328                 nConnectorBuilder.addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector);
329                 writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, nConnectorBuilder.build());
330             } else if (portStatus.getReason().equals(PortReason.OFPPRDELETE)) {
331                 addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
332             }
333             submitTransaction();
334         } catch (final Exception e) {
335             LOG.warn("Error processing port status message for port {} on device {} : {}", portStatus.getPortNo(),
336                     getDeviceInfo().getNodeId().toString(), e);
337         }
338     }
339
340     private KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> provideIIToNodeConnector(final long portNo, final short version) {
341         final InstanceIdentifier<Node> iiToNodes = getDeviceInfo().getNodeInstanceIdentifier();
342         final BigInteger dataPathId = getDeviceInfo().getDatapathId();
343         final NodeConnectorId nodeConnectorId = NodeStaticReplyTranslatorUtil.nodeConnectorId(dataPathId.toString(), portNo, version);
344         return iiToNodes.child(NodeConnector.class, new NodeConnectorKey(nodeConnectorId));
345     }
346
347     @Override
348     public void processPacketInMessage(final PacketInMessage packetInMessage) {
349         messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH);
350         final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
351         final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null);
352
353         if (packetReceived == null) {
354             LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
355             messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
356             return;
357         } else {
358             messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
359         }
360
361         if (!packetInLimiter.acquirePermit()) {
362             LOG.debug("Packet limited");
363             // TODO: save packet into emergency slot if possible
364             messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
365             return;
366         }
367
368         final ListenableFuture<?> offerNotification = notificationPublishService.offerNotification(packetReceived);
369         if (NotificationPublishService.REJECTED.equals(offerNotification)) {
370             LOG.debug("notification offer rejected");
371             messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
372             packetInLimiter.drainLowWaterMark();
373             packetInLimiter.releasePermit();
374             return;
375         }
376
377         Futures.addCallback(offerNotification, new FutureCallback<Object>() {
378             @Override
379             public void onSuccess(final Object result) {
380                 messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
381                 packetInLimiter.releasePermit();
382             }
383
384             @Override
385             public void onFailure(final Throwable t) {
386                 messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
387                 LOG.debug("notification offer failed: {}", t.getMessage());
388                 LOG.trace("notification offer failed..", t);
389                 packetInLimiter.releasePermit();
390             }
391         });
392     }
393
394     @Override
395     public void processExperimenterMessage(final ExperimenterMessage notification) {
396         // lookup converter
397         final ExperimenterDataOfChoice vendorData = notification.getExperimenterDataOfChoice();
398         final MessageTypeKey<? extends ExperimenterDataOfChoice> key = new MessageTypeKey<>(
399                 getDeviceInfo().getVersion(),
400                 (Class<? extends ExperimenterDataOfChoice>) vendorData.getImplementedInterface());
401         final ConvertorMessageFromOFJava<ExperimenterDataOfChoice, MessagePath> messageConverter = extensionConverterProvider.getMessageConverter(key);
402         if (messageConverter == null) {
403             LOG.warn("custom converter for {}[OF:{}] not found",
404                     notification.getExperimenterDataOfChoice().getImplementedInterface(),
405                     getDeviceInfo().getVersion());
406             return;
407         }
408         // build notification
409         final ExperimenterMessageOfChoice messageOfChoice;
410         try {
411             messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION);
412             final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new ExperimenterMessageFromDevBuilder()
413                 .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()))
414                     .setExperimenterMessageOfChoice(messageOfChoice);
415             // publish
416             notificationPublishService.offerNotification(experimenterMessageFromDevBld.build());
417         } catch (final ConversionException e) {
418             LOG.error("Conversion of experimenter notification failed", e);
419         }
420     }
421
422     @Override
423     public TranslatorLibrary oook() {
424         return translatorLibrary;
425     }
426
427     @Override
428     public synchronized void close() {
429         LOG.debug("closing deviceContext: {}, nodeId:{}",
430                 getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress(),
431                 getDeviceInfo().getLOGValue());
432         // NOOP
433         throw new UnsupportedOperationException("Autocloseble.close will be removed soon");
434     }
435
436     @Override
437     public void setCurrentBarrierTimeout(final Timeout timeout) {
438         barrierTaskTimeout = timeout;
439     }
440
441     @Override
442     public Timeout getBarrierTaskTimeout() {
443         return barrierTaskTimeout;
444     }
445
446     @Override
447     public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
448         this.notificationPublishService = notificationPublishService;
449     }
450
451     @Override
452     public MessageSpy getMessageSpy() {
453         return messageSpy;
454     }
455
456     @Override
457     public void onPublished() {
458         Verify.verify(CONTEXT_STATE.INITIALIZATION.equals(getState()));
459         setState(CONTEXT_STATE.WORKING);
460         primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
461         for (final ConnectionContext switchAuxConnectionContext : auxiliaryConnectionContexts.values()) {
462             switchAuxConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
463         }
464     }
465
466     @Override
467     public MultiMsgCollector getMultiMsgCollector(final RequestContext<List<MultipartReply>> requestContext) {
468         return new MultiMsgCollectorImpl(this, requestContext);
469     }
470
471     @Override
472     public void updatePacketInRateLimit(final long upperBound) {
473         packetInLimiter.changeWaterMarks((int) (LOW_WATERMARK_FACTOR * upperBound), (int) (HIGH_WATERMARK_FACTOR * upperBound));
474     }
475
476     @Override
477     public ItemLifeCycleRegistry getItemLifeCycleSourceRegistry() {
478         return itemLifeCycleSourceRegistry;
479     }
480
481     @Override
482     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
483         this.extensionConverterProvider = extensionConverterProvider;
484     }
485
486     @Override
487     public ExtensionConverterProvider getExtensionConverterProvider() {
488         return extensionConverterProvider;
489     }
490
491     @Override
492     public synchronized void shutdownConnection() {
493         if (LOG.isDebugEnabled()) {
494             LOG.debug("Shutdown method for node {}", getDeviceInfo().getLOGValue());
495         }
496         if (CONTEXT_STATE.TERMINATION.equals(getState())) {
497             LOG.debug("DeviceCtx for Node {} is in termination process.", getDeviceInfo().getLOGValue());
498             return;
499         }
500
501         if (ConnectionContext.CONNECTION_STATE.RIP.equals(getPrimaryConnectionContext().getConnectionState())) {
502             LOG.debug("ConnectionCtx for Node {} is in RIP state.", getDeviceInfo().getLOGValue());
503             return;
504         }
505
506         // Terminate Auxiliary Connection
507         for (final ConnectionContext connectionContext : auxiliaryConnectionContexts.values()) {
508             LOG.debug("Closing auxiliary connection {}", connectionContext.getNodeId());
509             connectionContext.closeConnection(false);
510         }
511
512         // Terminate Primary Connection
513         getPrimaryConnectionContext().closeConnection(true);
514
515         // Close all datastore registries
516         if (initialized) {
517             deviceGroupRegistry.close();
518             deviceFlowRegistry.close();
519             deviceMeterRegistry.close();
520         }
521     }
522
523     @Override
524     public ListenableFuture<Void> shuttingDownDataStoreTransactions() {
525         return initialized
526                 ? this.transactionChainManager.shuttingDown()
527                 : Futures.immediateFuture(null);
528     }
529
530     @VisibleForTesting
531     TransactionChainManager getTransactionChainManager() {
532         return this.transactionChainManager;
533     }
534
535     @Override
536     public void setSwitchFeaturesMandatory(boolean switchFeaturesMandatory) {
537         this.switchFeaturesMandatory = switchFeaturesMandatory;
538     }
539
540     @Override
541     public CONTEXT_STATE getState() {
542         return this.state;
543     }
544
545     @Override
546     public void setState(CONTEXT_STATE state) {
547         this.state = state;
548     }
549
550     @Override
551     public ListenableFuture<Void> stopClusterServices(boolean deviceDisconnected) {
552         return initialized
553                 ? this.transactionChainManager.deactivateTransactionManager()
554                 : Futures.immediateFuture(null);
555     }
556
557     @Override
558     public ServiceGroupIdentifier getServiceIdentifier() {
559         return this.deviceInfo.getServiceIdentifier();
560     }
561
562     @Override
563     public DeviceInfo getDeviceInfo() {
564         return this.deviceInfo;
565     }
566
567     @Override
568     public void putLifecycleServiceIntoTxChainManager(final LifecycleService lifecycleService){
569         if (initialized) {
570             this.transactionChainManager.setLifecycleService(lifecycleService);
571         }
572     }
573
574     @Override
575     public void replaceConnectionContext(final ConnectionContext connectionContext){
576         // Act like we are initializing the context
577         setState(CONTEXT_STATE.INITIALIZATION);
578         this.primaryConnectionContext = connectionContext;
579         this.onPublished();
580     }
581
582     @Override
583     public boolean isSkipTableFeatures() {
584         return this.skipTableFeatures;
585     }
586
587     @Override
588     public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) {
589         this.clusterInitializationPhaseHandler = handler;
590     }
591
592     @Override
593     public boolean onContextInstantiateService(final ConnectionContext connectionContext) {
594
595         if (getPrimaryConnectionContext().getConnectionState().equals(ConnectionContext.CONNECTION_STATE.RIP)) {
596             LOG.warn("Connection on device {} was interrupted, will stop starting master services.", deviceInfo.getLOGValue());
597             return false;
598         }
599
600         LOG.info("Starting device context cluster services for node {}", deviceInfo.getLOGValue());
601
602         lazyTransactionManagerInitialization();
603
604         this.transactionChainManager.activateTransactionManager();
605
606         try {
607             DeviceInitializationUtils.initializeNodeInformation(this, switchFeaturesMandatory, this.convertorExecutor);
608         } catch (ExecutionException | InterruptedException e) {
609             LOG.warn("Device {} cannot be initialized: ", deviceInfo.getLOGValue(), e);
610             return false;
611         }
612
613         return this.clusterInitializationPhaseHandler.onContextInstantiateService(getPrimaryConnectionContext());
614     }
615
616     @VisibleForTesting
617     void lazyTransactionManagerInitialization() {
618         if (!this.initialized) {
619             if (LOG.isDebugEnabled()) {
620                 LOG.debug("Transaction chain manager for node {} created", deviceInfo.getLOGValue());
621             }
622             this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo);
623             this.deviceFlowRegistry = new DeviceFlowRegistryImpl(dataBroker, deviceInfo.getNodeInstanceIdentifier());
624             this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
625             this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
626             this.initialized = true;
627         }
628     }
629 }