Post "Clustering optimization" updates
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceContextImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Function;
12 import com.google.common.base.Optional;
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.AsyncFunction;
15 import com.google.common.util.concurrent.CheckedFuture;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.FutureFallback;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import io.netty.util.HashedWheelTimer;
21 import io.netty.util.Timeout;
22 import java.math.BigInteger;
23 import java.util.Collection;
24 import java.util.Collections;
25 import java.util.HashMap;
26 import java.util.HashSet;
27 import java.util.LinkedList;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.concurrent.ConcurrentHashMap;
31 import javax.annotation.CheckForNull;
32 import javax.annotation.Nonnull;
33 import javax.annotation.Nullable;
34
35 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
36 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
37 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
38 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
39 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
40 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
41 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
42 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
43 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
44 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
45 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
46 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
47 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
48 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
49 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
50 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
51 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
52 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceContextClosedHandler;
53 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
54 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
55 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
56 import org.opendaylight.openflowplugin.api.openflow.registry.ItemLifeCycleRegistry;
57 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
58 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowDescriptor;
59 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowRegistryKey;
60 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
61 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
62 import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleKeeper;
63 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
64 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
65 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
66 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
67 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
68 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
69 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
70 import org.opendaylight.openflowplugin.extension.api.exception.ConversionException;
71 import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
72 import org.opendaylight.openflowplugin.impl.common.ItemLifeCycleSourceImpl;
73 import org.opendaylight.openflowplugin.impl.common.NodeStaticReplyTranslatorUtil;
74 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
75 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
76 import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory;
77 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
78 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
79 import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils;
80 import org.opendaylight.openflowplugin.impl.util.MdSalRegistrationUtils;
81 import org.opendaylight.openflowplugin.openflow.md.core.session.SwitchConnectionCookieOFImpl;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
101 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketIn;
102 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
103 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
104 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
105 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
106 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice;
107 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
108 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
109 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
110 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
111 import org.opendaylight.yangtools.yang.binding.DataObject;
112 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
113 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
114 import org.slf4j.Logger;
115 import org.slf4j.LoggerFactory;
116
117 /**
118  *
119  */
120 public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper {
121
122     private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
123
124     // TODO: drain factor should be parametrized
125     public static final float REJECTED_DRAIN_FACTOR = 0.25f;
126     // TODO: low water mark factor should be parametrized
127     private static final float LOW_WATERMARK_FACTOR = 0.75f;
128     // TODO: high water mark factor should be parametrized
129     private static final float HIGH_WATERMARK_FACTOR = 0.95f;
130
131     private final ConnectionContext primaryConnectionContext;
132     private final DeviceState deviceState;
133     private final DataBroker dataBroker;
134     private final HashedWheelTimer hashedWheelTimer;
135     private final Map<SwitchConnectionDistinguisher, ConnectionContext> auxiliaryConnectionContexts;
136     private final TransactionChainManager transactionChainManager;
137     private final DeviceFlowRegistry deviceFlowRegistry;
138     private final DeviceGroupRegistry deviceGroupRegistry;
139     private final DeviceMeterRegistry deviceMeterRegistry;
140     private final Collection<DeviceContextClosedHandler> closeHandlers = new HashSet<>();
141     private final PacketInRateLimiter packetInLimiter;
142     private final MessageSpy messageSpy;
143     private final ItemLifeCycleKeeper flowLifeCycleKeeper;
144     private NotificationPublishService notificationPublishService;
145     private NotificationService notificationService;
146     private final OutboundQueue outboundQueueProvider;
147     private Timeout barrierTaskTimeout;
148     private final MessageTranslator<PortGrouping, FlowCapableNodeConnector> portStatusTranslator;
149     private final MessageTranslator<PacketInMessage, PacketReceived> packetInTranslator;
150     private final MessageTranslator<FlowRemoved, org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved> flowRemovedTranslator;
151     private final TranslatorLibrary translatorLibrary;
152     private final Map<Long, NodeConnectorRef> nodeConnectorCache;
153     private final ItemLifeCycleRegistry itemLifeCycleSourceRegistry;
154     private RpcContext rpcContext;
155     private ExtensionConverterProvider extensionConverterProvider;
156
157     private final boolean switchFeaturesMandatory;
158     private StatisticsContext statisticsContext;
159
160
161     @VisibleForTesting
162     DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
163                       @Nonnull final DeviceState deviceState,
164                       @Nonnull final DataBroker dataBroker,
165                       @Nonnull final HashedWheelTimer hashedWheelTimer,
166                       @Nonnull final MessageSpy _messageSpy,
167                       @Nonnull final OutboundQueueProvider outboundQueueProvider,
168                       @Nonnull final TranslatorLibrary translatorLibrary,
169                       final boolean switchFeaturesMandatory) {
170         this.switchFeaturesMandatory = switchFeaturesMandatory;
171         this.primaryConnectionContext = Preconditions.checkNotNull(primaryConnectionContext);
172         this.deviceState = Preconditions.checkNotNull(deviceState);
173         this.dataBroker = Preconditions.checkNotNull(dataBroker);
174         this.hashedWheelTimer = Preconditions.checkNotNull(hashedWheelTimer);
175         this.outboundQueueProvider = Preconditions.checkNotNull(outboundQueueProvider);
176         primaryConnectionContext.setDeviceDisconnectedHandler(DeviceContextImpl.this);
177         this.transactionChainManager = new TransactionChainManager(dataBroker, deviceState);
178         auxiliaryConnectionContexts = new HashMap<>();
179         deviceFlowRegistry = new DeviceFlowRegistryImpl();
180         deviceGroupRegistry = new DeviceGroupRegistryImpl();
181         deviceMeterRegistry = new DeviceMeterRegistryImpl();
182         messageSpy = _messageSpy;
183
184         packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
185                 /*initial*/ 1000, /*initial*/2000, messageSpy, REJECTED_DRAIN_FACTOR);
186
187         this.translatorLibrary = translatorLibrary;
188         portStatusTranslator = translatorLibrary.lookupTranslator(
189                 new TranslatorKey(deviceState.getVersion(), PortGrouping.class.getName()));
190         packetInTranslator = translatorLibrary.lookupTranslator(
191                 new TranslatorKey(deviceState.getVersion(), PacketIn.class.getName()));
192         flowRemovedTranslator = translatorLibrary.lookupTranslator(
193                 new TranslatorKey(deviceState.getVersion(), FlowRemoved.class.getName()));
194
195
196         nodeConnectorCache = new ConcurrentHashMap<>();
197
198         itemLifeCycleSourceRegistry = new ItemLifeCycleRegistryImpl();
199         flowLifeCycleKeeper = new ItemLifeCycleSourceImpl();
200         itemLifeCycleSourceRegistry.registerLifeCycleSource(flowLifeCycleKeeper);
201     }
202
203     /**
204      * This method is called from {@link DeviceManagerImpl} only. So we could say "posthandshake process finish"
205      * and we are able to set a scheduler for an automatic transaction submitting by time (0,5sec).
206      */
207     void initialSubmitTransaction() {
208         transactionChainManager.initialSubmitWriteTransaction();
209     }
210
211     @Override
212     public Long reservedXidForDeviceMessage() {
213         return outboundQueueProvider.reserveEntry();
214     }
215
216     @Override
217     public void addAuxiliaryConnectionContext(final ConnectionContext connectionContext) {
218         final SwitchConnectionDistinguisher connectionDistinguisher = createConnectionDistinguisher(connectionContext);
219         auxiliaryConnectionContexts.put(connectionDistinguisher, connectionContext);
220     }
221
222     private static SwitchConnectionDistinguisher createConnectionDistinguisher(final ConnectionContext connectionContext) {
223         return new SwitchConnectionCookieOFImpl(connectionContext.getFeatures().getAuxiliaryId());
224     }
225
226     @Override
227     public void removeAuxiliaryConnectionContext(final ConnectionContext connectionContext) {
228         final SwitchConnectionDistinguisher connectionDistinguisher = createConnectionDistinguisher(connectionContext);
229         if (null != connectionDistinguisher) {
230             auxiliaryConnectionContexts.remove(connectionDistinguisher);
231         }
232     }
233
234     @Override
235     public DeviceState getDeviceState() {
236         return deviceState;
237     }
238
239     @Override
240     public ReadOnlyTransaction getReadTransaction() {
241         return dataBroker.newReadOnlyTransaction();
242     }
243
244     @Override
245     public ListenableFuture<Void> onClusterRoleChange(final OfpRole oldRole, @CheckForNull final OfpRole role) {
246         LOG.trace("onClusterRoleChange {} for node:", role, deviceState.getNodeId());
247         Preconditions.checkArgument(role != null);
248         if (role.equals(oldRole)) {
249             LOG.debug("Demanded role change for device {} is not change OldRole: {}, NewRole {}", deviceState.getNodeId(), oldRole, role);
250             return Futures.immediateFuture(null);
251         }
252         if (OfpRole.BECOMEMASTER.equals(role)) {
253             MdSalRegistrationUtils.registerMasterServices(getRpcContext(), DeviceContextImpl.this, role);
254             getRpcContext().registerStatCompatibilityServices();
255             if (!deviceState.deviceSynchronized()) {
256                 //TODO: no necessary code for yet - it needs for initialization phase only
257                 LOG.debug("Setup Empty TxManager {} for initialization phase", getDeviceState().getNodeId());
258                 transactionChainManager.activateTransactionManager();
259                 return Futures.immediateCheckedFuture(null);
260             }
261             /* Relevant for no initial Slave-to-Master scenario in cluster */
262             final ListenableFuture<Void> deviceInitialization = asyncClusterRoleChange();
263             Futures.addCallback(deviceInitialization, new FutureCallback<Void>() {
264
265                 @Override
266                 public void onSuccess(@Nullable Void aVoid) {
267                     //No operation
268                 }
269
270                 @Override
271                 public void onFailure(Throwable throwable) {
272                     LOG.debug("Device {} init unexpected fail. Unregister RPCs", getDeviceState().getNodeId());
273                     MdSalRegistrationUtils.unregisterServices(getRpcContext());
274                 }
275
276             });
277
278             return deviceInitialization;
279
280         } else if (OfpRole.BECOMESLAVE.equals(role)) {
281             if (null != rpcContext) {
282                 MdSalRegistrationUtils.registerSlaveServices(rpcContext, role);
283             }
284             return transactionChainManager.deactivateTransactionManager();
285         } else {
286             LOG.warn("Unknown OFCluster Role {} for Node {}", role, deviceState.getNodeId());
287             if (null != rpcContext) {
288                 MdSalRegistrationUtils.unregisterServices(rpcContext);
289             }
290             return transactionChainManager.deactivateTransactionManager();
291         }
292     }
293
294     /*
295      * we don't have active TxManager so anything will not be stored to DS yet, but we have
296      * check all NodeInformation for statistics otherwise statistics will not contains
297      * all possible MultipartTypes for polling in StatTypeList
298      */
299     private ListenableFuture<Void> asyncClusterRoleChange() {
300         if (statisticsContext == null) {
301             final String errMsg = String.format("DeviceCtx %s is up but we are missing StatisticsContext", deviceState.getNodeId());
302             LOG.warn(errMsg);
303             return Futures.immediateFailedFuture(new IllegalStateException(errMsg));
304         }
305         if (rpcContext == null) {
306             final String errMsg = String.format("DeviceCtx %s is up but we are missing RpcContext", deviceState.getNodeId());
307             LOG.warn(errMsg);
308             return Futures.immediateFailedFuture(new IllegalStateException(errMsg));
309         }
310
311         final InstanceIdentifier<FlowCapableNode> ofNodeII = deviceState.getNodeInstanceIdentifier()
312                 .augmentation(FlowCapableNode.class);
313         final ReadOnlyTransaction readTx = getReadTransaction();
314         final CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> readOfNodeFuture = readTx.read(
315                 LogicalDatastoreType.OPERATIONAL, ofNodeII);
316
317         final ListenableFuture<Void> nodeInitInfoFuture = Futures.transform(readOfNodeFuture,
318                 new AsyncFunction<Optional<FlowCapableNode>, Void>() {
319                     @Override
320                     public ListenableFuture<Void> apply(final Optional<FlowCapableNode> input) throws Exception {
321                         if (!input.isPresent() || input.get().getTable() == null || input.get().getTable().isEmpty()) {
322                             /* Last master close fail scenario so we would like to activate TxManager */
323                             LOG.debug("Operational DS for Device {} has to be replaced", deviceState.getNodeId());
324                             getDeviceState().setDeviceSynchronized(false);
325                             transactionChainManager.activateTransactionManager();
326                         }
327                         return DeviceInitializationUtils.initializeNodeInformation(DeviceContextImpl.this, switchFeaturesMandatory);
328                     }
329                 });
330
331         final ListenableFuture<Boolean> statPollFuture = Futures.transform(nodeInitInfoFuture,
332                 new AsyncFunction<Void, Boolean>() {
333
334                     @Override
335                     public ListenableFuture<Boolean> apply(final Void input) throws Exception {
336                         getStatisticsContext().statListForCollectingInitialization();
337                         if (getDeviceState().deviceSynchronized()) {
338                             return Futures.immediateFuture(Boolean.TRUE);
339                         }
340                         return getStatisticsContext().gatherDynamicData();
341                     }
342                 });
343
344         return Futures.transform(statPollFuture, new Function<Boolean, Void>() {
345
346             @Override
347             public Void apply(final Boolean input) {
348                 if (ConnectionContext.CONNECTION_STATE.RIP.equals(getPrimaryConnectionContext().getConnectionState())) {
349                     final String errMsg = String.format("We lost connection for Device %s, context has to be closed.",
350                             getDeviceState().getNodeId());
351                     LOG.warn(errMsg);
352                     transactionChainManager.clearUnsubmittedTransaction();
353                     throw new IllegalStateException(errMsg);
354                 }
355                 if (!input.booleanValue()) {
356                     final String errMsg = String.format("Get Initial Device %s information fails",
357                             getDeviceState().getNodeId());
358                     LOG.warn(errMsg);
359                     transactionChainManager.clearUnsubmittedTransaction();
360                     throw new IllegalStateException(errMsg);
361                 }
362                 LOG.debug("Get Initial Device {} information is successful", getDeviceState().getNodeId());
363                 getDeviceState().setDeviceSynchronized(true);
364                 transactionChainManager.activateTransactionManager();
365                 initialSubmitTransaction();
366                 getDeviceState().setStatisticsPollingEnabledProp(true);
367                 return null;
368             }
369         });
370     }
371
372     @Override
373     public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
374                                                           final InstanceIdentifier<T> path, final T data) {
375         transactionChainManager.writeToTransaction(store, path, data);
376     }
377
378     @Override
379     public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store, final InstanceIdentifier<T> path) {
380         transactionChainManager.addDeleteOperationTotTxChain(store, path);
381     }
382
383     @Override
384     public boolean submitTransaction() {
385         return transactionChainManager.submitWriteTransaction();
386     }
387
388     @Override
389     public ConnectionContext getPrimaryConnectionContext() {
390         return primaryConnectionContext;
391     }
392
393     @Override
394     public ConnectionContext getAuxiliaryConnectiobContexts(final BigInteger cookie) {
395         return auxiliaryConnectionContexts.get(new SwitchConnectionCookieOFImpl(cookie.longValue()));
396     }
397
398     @Override
399     public DeviceFlowRegistry getDeviceFlowRegistry() {
400         return deviceFlowRegistry;
401     }
402
403     @Override
404     public DeviceGroupRegistry getDeviceGroupRegistry() {
405         return deviceGroupRegistry;
406     }
407
408     @Override
409     public DeviceMeterRegistry getDeviceMeterRegistry() {
410         return deviceMeterRegistry;
411     }
412
413     @Override
414     public void processReply(final OfHeader ofHeader) {
415         if (ofHeader instanceof Error) {
416             messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
417         } else {
418             messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
419         }
420     }
421
422     @Override
423     public void processReply(final Xid xid, final List<MultipartReply> ofHeaderList) {
424         for (final MultipartReply multipartReply : ofHeaderList) {
425             messageSpy.spyMessage(multipartReply.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
426         }
427     }
428
429     @Override
430     public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
431         final ItemLifecycleListener itemLifecycleListener = flowLifeCycleKeeper.getItemLifecycleListener();
432         if (itemLifecycleListener != null) {
433             //1. translate to general flow (table, priority, match, cookie)
434             final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved flowRemovedNotification =
435                     flowRemovedTranslator.translate(flowRemoved, this, null);
436             //2. create registry key
437             final FlowRegistryKey flowRegKey = FlowRegistryKeyFactory.create(flowRemovedNotification);
438             //3. lookup flowId
439             final FlowDescriptor flowDescriptor = deviceFlowRegistry.retrieveIdForFlow(flowRegKey);
440             //4. if flowId present:
441             if (flowDescriptor != null) {
442                 // a) construct flow path
443                 final KeyedInstanceIdentifier<Flow, FlowKey> flowPath = getDeviceState().getNodeInstanceIdentifier()
444                         .augmentation(FlowCapableNode.class)
445                         .child(Table.class, flowDescriptor.getTableKey())
446                         .child(Flow.class, new FlowKey(flowDescriptor.getFlowId()));
447                 // b) notify listener
448                 itemLifecycleListener.onRemoved(flowPath);
449             } else {
450                 LOG.debug("flow id not found: nodeId={} tableId={}, priority={}",
451                         getDeviceState().getNodeId(), flowRegKey.getTableId(), flowRemovedNotification.getPriority());
452             }
453         }
454     }
455
456     @Override
457     public void processPortStatusMessage(final PortStatusMessage portStatus) {
458         messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
459         final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator.translate(portStatus, this, null);
460
461         final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = provideIIToNodeConnector(portStatus.getPortNo(), portStatus.getVersion());
462         if (portStatus.getReason().equals(PortReason.OFPPRADD) || portStatus.getReason().equals(PortReason.OFPPRMODIFY)) {
463             // because of ADD status node connector has to be created
464             final NodeConnectorBuilder nConnectorBuilder = new NodeConnectorBuilder().setKey(iiToNodeConnector.getKey());
465             nConnectorBuilder.addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new FlowCapableNodeConnectorStatisticsDataBuilder().build());
466             nConnectorBuilder.addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector);
467             writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, nConnectorBuilder.build());
468         } else if (portStatus.getReason().equals(PortReason.OFPPRDELETE)) {
469             addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
470         }
471         submitTransaction();
472     }
473
474     private KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> provideIIToNodeConnector(final long portNo, final short version) {
475         final InstanceIdentifier<Node> iiToNodes = deviceState.getNodeInstanceIdentifier();
476         final BigInteger dataPathId = deviceState.getFeatures().getDatapathId();
477         final NodeConnectorId nodeConnectorId = NodeStaticReplyTranslatorUtil.nodeConnectorId(dataPathId.toString(), portNo, version);
478         return iiToNodes.child(NodeConnector.class, new NodeConnectorKey(nodeConnectorId));
479     }
480
481     @Override
482     public void processPacketInMessage(final PacketInMessage packetInMessage) {
483         messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH);
484         final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
485         final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, this, null);
486
487         if (packetReceived == null) {
488             LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
489             messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
490             return;
491         } else {
492             messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
493         }
494
495         if (!packetInLimiter.acquirePermit()) {
496             LOG.debug("Packet limited");
497             // TODO: save packet into emergency slot if possible
498             messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
499             return;
500         }
501
502         final ListenableFuture<? extends Object> offerNotification = notificationPublishService.offerNotification(packetReceived);
503         if (NotificationPublishService.REJECTED.equals(offerNotification)) {
504             LOG.debug("notification offer rejected");
505             messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
506             packetInLimiter.drainLowWaterMark();
507             packetInLimiter.releasePermit();
508             return;
509         }
510
511         Futures.addCallback(offerNotification, new FutureCallback<Object>() {
512             @Override
513             public void onSuccess(final Object result) {
514                 messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
515                 packetInLimiter.releasePermit();
516             }
517
518             @Override
519             public void onFailure(final Throwable t) {
520                 messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
521                 LOG.debug("notification offer failed: {}", t.getMessage());
522                 LOG.trace("notification offer failed..", t);
523                 packetInLimiter.releasePermit();
524             }
525         });
526     }
527
528     @Override
529     public void processExperimenterMessage(final ExperimenterMessage notification) {
530         // lookup converter
531         final ExperimenterDataOfChoice vendorData = notification.getExperimenterDataOfChoice();
532         final MessageTypeKey<? extends ExperimenterDataOfChoice> key = new MessageTypeKey<>(
533                 deviceState.getVersion(),
534                 (Class<? extends ExperimenterDataOfChoice>) vendorData.getImplementedInterface());
535         final ConvertorMessageFromOFJava<ExperimenterDataOfChoice, MessagePath> messageConverter = extensionConverterProvider.getMessageConverter(key);
536         if (messageConverter == null) {
537             LOG.warn("custom converter for {}[OF:{}] not found",
538                     notification.getExperimenterDataOfChoice().getImplementedInterface(),
539                     deviceState.getVersion());
540             return;
541         }
542         // build notification
543         final ExperimenterMessageOfChoice messageOfChoice;
544         try {
545             messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION);
546             final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new ExperimenterMessageFromDevBuilder()
547                 .setNode(new NodeRef(deviceState.getNodeInstanceIdentifier()))
548                     .setExperimenterMessageOfChoice(messageOfChoice);
549             // publish
550             notificationPublishService.offerNotification(experimenterMessageFromDevBld.build());
551         } catch (final ConversionException e) {
552             LOG.warn("Conversion of experimenter notification failed", e);
553         }
554     }
555
556     @Override
557     public TranslatorLibrary oook() {
558         return translatorLibrary;
559     }
560
561     @Override
562     public HashedWheelTimer getTimer() {
563         return hashedWheelTimer;
564     }
565
566     @Override
567     public synchronized void close() {
568         LOG.debug("closing deviceContext: {}, nodeId:{}",
569                 getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress(),
570                 getDeviceState().getNodeId());
571
572         if (deviceState.isValid()) {
573             primaryConnectionContext.closeConnection(false);
574             tearDown();
575         }
576     }
577
578     private synchronized void tearDown() {
579         LOG.trace("tearDown method for node {}", deviceState.getNodeId());
580         if (deviceState.isValid()) {
581             deviceState.setValid(false);
582
583             for (final ConnectionContext connectionContext : auxiliaryConnectionContexts.values()) {
584                 connectionContext.closeConnection(false);
585             }
586
587             deviceGroupRegistry.close();
588             deviceFlowRegistry.close();
589             deviceMeterRegistry.close();
590
591             final ListenableFuture<Void> future = transactionChainManager.shuttingDown();
592             Futures.addCallback(future, new FutureCallback<Void>() {
593
594                 @Override
595                 public void onSuccess(final Void result) {
596                     LOG.info("TxChain {} was shutdown successful.", getDeviceState().getNodeId());
597                     tearDownClean();
598                 }
599
600                 @Override
601                 public void onFailure(final Throwable t) {
602                     LOG.warn("Shutdown TxChain {} fail.", getDeviceState().getNodeId(), t);
603                     tearDownClean();
604                 }
605             });
606         }
607     }
608
609     private void tearDownClean() {
610         LOG.info("Closing transaction chain manager without cleaning inventory operational");
611         transactionChainManager.close();
612
613         final LinkedList<DeviceContextClosedHandler> reversedCloseHandlers = new LinkedList<>(closeHandlers);
614         Collections.reverse(reversedCloseHandlers);
615         for (final DeviceContextClosedHandler deviceContextClosedHandler : reversedCloseHandlers) {
616             deviceContextClosedHandler.onDeviceContextClosed(this);
617         }
618         closeHandlers.clear();
619     }
620
621     @Override
622     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
623         if (getPrimaryConnectionContext().equals(connectionContext)) {
624             try {
625                 tearDown();
626             } catch (final Exception e) {
627                 LOG.trace("Error closing device context.");
628             }
629         } else {
630             LOG.debug("auxiliary connection dropped: {}, nodeId:{}",
631                     connectionContext.getConnectionAdapter().getRemoteAddress(),
632                     getDeviceState().getNodeId());
633             final SwitchConnectionDistinguisher connectionDistinguisher = createConnectionDistinguisher(connectionContext);
634             auxiliaryConnectionContexts.remove(connectionDistinguisher);
635         }
636     }
637
638     @Override
639     public void setCurrentBarrierTimeout(final Timeout timeout) {
640         barrierTaskTimeout = timeout;
641     }
642
643     @Override
644     public Timeout getBarrierTaskTimeout() {
645         return barrierTaskTimeout;
646     }
647
648     @Override
649     public void setNotificationService(final NotificationService notificationService) {
650         this.notificationService = notificationService;
651     }
652
653     @Override
654     public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
655         this.notificationPublishService = notificationPublishService;
656     }
657
658     @Override
659     public MessageSpy getMessageSpy() {
660         return messageSpy;
661     }
662
663     @Override
664     public void addDeviceContextClosedHandler(final DeviceContextClosedHandler deviceContextClosedHandler) {
665         closeHandlers.add(deviceContextClosedHandler);
666     }
667
668     @Override
669     public void onPublished() {
670         primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
671         for (final ConnectionContext switchAuxConnectionContext : auxiliaryConnectionContexts.values()) {
672             switchAuxConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
673         }
674     }
675
676     @Override
677     public MultiMsgCollector getMultiMsgCollector(final RequestContext<List<MultipartReply>> requestContext) {
678         return new MultiMsgCollectorImpl(this, requestContext);
679     }
680
681     @Override
682     public NodeConnectorRef lookupNodeConnectorRef(final Long portNumber) {
683         return nodeConnectorCache.get(portNumber);
684     }
685
686     @Override
687     public void storeNodeConnectorRef(final Long portNumber, final NodeConnectorRef nodeConnectorRef) {
688         nodeConnectorCache.put(
689                 Preconditions.checkNotNull(portNumber),
690                 Preconditions.checkNotNull(nodeConnectorRef));
691     }
692
693     @Override
694     public void updatePacketInRateLimit(final long upperBound) {
695         packetInLimiter.changeWaterMarks((int) (LOW_WATERMARK_FACTOR * upperBound), (int) (HIGH_WATERMARK_FACTOR * upperBound));
696     }
697
698     @Override
699     public ItemLifeCycleRegistry getItemLifeCycleSourceRegistry() {
700         return itemLifeCycleSourceRegistry;
701     }
702
703     @Override
704     public void setRpcContext(final RpcContext rpcContext) {
705         this.rpcContext = rpcContext;
706     }
707
708     @Override
709     public RpcContext getRpcContext() {
710         return rpcContext;
711     }
712
713     @Override
714     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
715         this.extensionConverterProvider = extensionConverterProvider;
716     }
717
718     @Override
719     public ExtensionConverterProvider getExtensionConverterProvider() {
720         return extensionConverterProvider;
721     }
722
723     @Override
724     public void setStatisticsContext(final StatisticsContext statisticsContext) {
725         this.statisticsContext = statisticsContext;
726     }
727
728     @Override
729     public StatisticsContext getStatisticsContext() {
730         return statisticsContext;
731     }
732 }