Quick fix RPCs and DeviceCtx.close
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceContextImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Function;
12 import com.google.common.base.Optional;
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.AsyncFunction;
15 import com.google.common.util.concurrent.CheckedFuture;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.FutureFallback;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import io.netty.util.HashedWheelTimer;
21 import io.netty.util.Timeout;
22 import java.math.BigInteger;
23 import java.util.Collection;
24 import java.util.Collections;
25 import java.util.HashMap;
26 import java.util.HashSet;
27 import java.util.LinkedList;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.concurrent.ConcurrentHashMap;
31 import javax.annotation.CheckForNull;
32 import javax.annotation.Nonnull;
33 import javax.annotation.Nullable;
34
35 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
36 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
37 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
38 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
39 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
40 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
41 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
42 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
43 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
44 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
45 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
46 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
47 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
48 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
49 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
50 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
51 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
52 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceContextClosedHandler;
53 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
54 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
55 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
56 import org.opendaylight.openflowplugin.api.openflow.registry.ItemLifeCycleRegistry;
57 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
58 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowDescriptor;
59 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowRegistryKey;
60 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
61 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
62 import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleKeeper;
63 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
64 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
65 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
66 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
67 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
68 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
69 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
70 import org.opendaylight.openflowplugin.extension.api.exception.ConversionException;
71 import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
72 import org.opendaylight.openflowplugin.impl.common.ItemLifeCycleSourceImpl;
73 import org.opendaylight.openflowplugin.impl.common.NodeStaticReplyTranslatorUtil;
74 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
75 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
76 import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory;
77 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
78 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
79 import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils;
80 import org.opendaylight.openflowplugin.impl.util.MdSalRegistratorUtils;
81 import org.opendaylight.openflowplugin.openflow.md.core.session.SwitchConnectionCookieOFImpl;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
101 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketIn;
102 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
103 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
104 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
105 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
106 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice;
107 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
108 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
109 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
110 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
111 import org.opendaylight.yangtools.yang.binding.DataObject;
112 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
113 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
114 import org.slf4j.Logger;
115 import org.slf4j.LoggerFactory;
116
117 /**
118  *
119  */
120 public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper {
121
122     private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
123
124     // TODO: drain factor should be parametrized
125     public static final float REJECTED_DRAIN_FACTOR = 0.25f;
126     // TODO: low water mark factor should be parametrized
127     private static final float LOW_WATERMARK_FACTOR = 0.75f;
128     // TODO: high water mark factor should be parametrized
129     private static final float HIGH_WATERMARK_FACTOR = 0.95f;
130
131     private final ConnectionContext primaryConnectionContext;
132     private final DeviceState deviceState;
133     private final DataBroker dataBroker;
134     private final HashedWheelTimer hashedWheelTimer;
135     private final Map<SwitchConnectionDistinguisher, ConnectionContext> auxiliaryConnectionContexts;
136     private final TransactionChainManager transactionChainManager;
137     private final DeviceFlowRegistry deviceFlowRegistry;
138     private final DeviceGroupRegistry deviceGroupRegistry;
139     private final DeviceMeterRegistry deviceMeterRegistry;
140     private final Collection<DeviceContextClosedHandler> closeHandlers = new HashSet<>();
141     private final PacketInRateLimiter packetInLimiter;
142     private final MessageSpy messageSpy;
143     private final ItemLifeCycleKeeper flowLifeCycleKeeper;
144     private NotificationPublishService notificationPublishService;
145     private NotificationService notificationService;
146     private final OutboundQueue outboundQueueProvider;
147     private Timeout barrierTaskTimeout;
148     private final MessageTranslator<PortGrouping, FlowCapableNodeConnector> portStatusTranslator;
149     private final MessageTranslator<PacketInMessage, PacketReceived> packetInTranslator;
150     private final MessageTranslator<FlowRemoved, org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved> flowRemovedTranslator;
151     private final TranslatorLibrary translatorLibrary;
152     private final Map<Long, NodeConnectorRef> nodeConnectorCache;
153     private final ItemLifeCycleRegistry itemLifeCycleSourceRegistry;
154     private RpcContext rpcContext;
155     private ExtensionConverterProvider extensionConverterProvider;
156
157     private final boolean switchFeaturesMandatory;
158     private StatisticsContext statCtx;
159
160
161     @VisibleForTesting
162     DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
163                       @Nonnull final DeviceState deviceState,
164                       @Nonnull final DataBroker dataBroker,
165                       @Nonnull final HashedWheelTimer hashedWheelTimer,
166                       @Nonnull final MessageSpy _messageSpy,
167                       @Nonnull final OutboundQueueProvider outboundQueueProvider,
168                       @Nonnull final TranslatorLibrary translatorLibrary,
169                       final boolean switchFeaturesMandatory) {
170         this.switchFeaturesMandatory = switchFeaturesMandatory;
171         this.primaryConnectionContext = Preconditions.checkNotNull(primaryConnectionContext);
172         this.deviceState = Preconditions.checkNotNull(deviceState);
173         this.dataBroker = Preconditions.checkNotNull(dataBroker);
174         this.hashedWheelTimer = Preconditions.checkNotNull(hashedWheelTimer);
175         this.outboundQueueProvider = Preconditions.checkNotNull(outboundQueueProvider);
176         primaryConnectionContext.setDeviceDisconnectedHandler(DeviceContextImpl.this);
177         this.transactionChainManager = new TransactionChainManager(dataBroker, deviceState);
178         auxiliaryConnectionContexts = new HashMap<>();
179         deviceFlowRegistry = new DeviceFlowRegistryImpl();
180         deviceGroupRegistry = new DeviceGroupRegistryImpl();
181         deviceMeterRegistry = new DeviceMeterRegistryImpl();
182         messageSpy = _messageSpy;
183
184         packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
185                 /*initial*/ 1000, /*initial*/2000, messageSpy, REJECTED_DRAIN_FACTOR);
186
187         this.translatorLibrary = translatorLibrary;
188         portStatusTranslator = translatorLibrary.lookupTranslator(
189                 new TranslatorKey(deviceState.getVersion(), PortGrouping.class.getName()));
190         packetInTranslator = translatorLibrary.lookupTranslator(
191                 new TranslatorKey(deviceState.getVersion(), PacketIn.class.getName()));
192         flowRemovedTranslator = translatorLibrary.lookupTranslator(
193                 new TranslatorKey(deviceState.getVersion(), FlowRemoved.class.getName()));
194
195
196         nodeConnectorCache = new ConcurrentHashMap<>();
197
198         itemLifeCycleSourceRegistry = new ItemLifeCycleRegistryImpl();
199         flowLifeCycleKeeper = new ItemLifeCycleSourceImpl();
200         itemLifeCycleSourceRegistry.registerLifeCycleSource(flowLifeCycleKeeper);
201     }
202
203     /**
204      * This method is called from {@link DeviceManagerImpl} only. So we could say "posthandshake process finish"
205      * and we are able to set a scheduler for an automatic transaction submitting by time (0,5sec).
206      */
207     void initialSubmitTransaction() {
208         transactionChainManager.initialSubmitWriteTransaction();
209     }
210
211     @Override
212     public Long reservedXidForDeviceMessage() {
213         return outboundQueueProvider.reserveEntry();
214     }
215
216     @Override
217     public void addAuxiliaryConenctionContext(final ConnectionContext connectionContext) {
218         final SwitchConnectionDistinguisher connectionDistinguisher = createConnectionDistinguisher(connectionContext);
219         auxiliaryConnectionContexts.put(connectionDistinguisher, connectionContext);
220     }
221
222     private static SwitchConnectionDistinguisher createConnectionDistinguisher(final ConnectionContext connectionContext) {
223         return new SwitchConnectionCookieOFImpl(connectionContext.getFeatures().getAuxiliaryId());
224     }
225
226     @Override
227     public void removeAuxiliaryConenctionContext(final ConnectionContext connectionContext) {
228         // TODO Auto-generated method stub
229     }
230
231     @Override
232     public DeviceState getDeviceState() {
233         return deviceState;
234     }
235
236     @Override
237     public ReadOnlyTransaction getReadTransaction() {
238         return dataBroker.newReadOnlyTransaction();
239     }
240
241     @Override
242     public ListenableFuture<Void> onClusterRoleChange(final OfpRole oldRole, @CheckForNull final OfpRole role) {
243         LOG.trace("onClusterRoleChange {} for node:", role, deviceState.getNodeId());
244         Preconditions.checkArgument(role != null);
245         if (role.equals(oldRole)) {
246             LOG.debug("Demanded role change for device {} is not change OldRole: {}, NewRole {}", deviceState.getNodeId(), oldRole, role);
247             return Futures.immediateFuture(null);
248         }
249         if (OfpRole.BECOMEMASTER.equals(role)) {
250             MdSalRegistratorUtils.registerMasterServices(getRpcContext(), DeviceContextImpl.this, role);
251             getRpcContext().registerStatCompatibilityServices();
252             if (!deviceState.deviceSynchronized()) {
253                 //TODO: no necessary code for yet - it needs for initialization phase only
254                 LOG.debug("Setup Empty TxManager {} for initialization phase", getDeviceState().getNodeId());
255                 transactionChainManager.activateTransactionManager();
256                 return Futures.immediateCheckedFuture(null);
257             }
258             /* Relevant for no initial Slave-to-Master scenario in cluster */
259             final ListenableFuture<Void> deviceInitialization = asyncClusterRoleChange();
260             Futures.addCallback(deviceInitialization, new FutureCallback<Void>() {
261
262                 @Override
263                 public void onSuccess(@Nullable Void aVoid) {
264                     //No operation
265                 }
266
267                 @Override
268                 public void onFailure(Throwable throwable) {
269                     LOG.debug("Device {} init unexpected fail. Unregister RPCs", getDeviceState().getNodeId());
270                     MdSalRegistratorUtils.unregisterServices(getRpcContext());
271                 }
272
273             });
274
275             return deviceInitialization;
276
277         } else if (OfpRole.BECOMESLAVE.equals(role)) {
278             if (null != rpcContext) {
279                 MdSalRegistratorUtils.registerSlaveServices(rpcContext, role);
280             }
281             return transactionChainManager.deactivateTransactionManager();
282         } else {
283             LOG.warn("Unknown OFCluster Role {} for Node {}", role, deviceState.getNodeId());
284             if (null != rpcContext) {
285                 MdSalRegistratorUtils.unregisterServices(rpcContext);
286             }
287             return transactionChainManager.deactivateTransactionManager();
288         }
289     }
290
291     /*
292      * we don't have active TxManager so anything will not be stored to DS yet, but we have
293      * check all NodeInformation for statistics otherwise statistics will not contains
294      * all possible MultipartTypes for polling in StatTypeList
295      */
296     private ListenableFuture<Void> asyncClusterRoleChange() {
297         if (statCtx == null) {
298             final String errMsg = String.format("DeviceCtx %s is up but we are missing StatisticsContext", deviceState.getNodeId());
299             LOG.warn(errMsg);
300             return Futures.immediateFailedFuture(new IllegalStateException(errMsg));
301         }
302         if (rpcContext == null) {
303             final String errMsg = String.format("DeviceCtx %s is up but we are missing RpcContext", deviceState.getNodeId());
304             LOG.warn(errMsg);
305             return Futures.immediateFailedFuture(new IllegalStateException(errMsg));
306         }
307
308         final InstanceIdentifier<FlowCapableNode> ofNodeII = deviceState.getNodeInstanceIdentifier()
309                 .augmentation(FlowCapableNode.class);
310         final ReadOnlyTransaction readTx = getReadTransaction();
311         final CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> readOfNodeFuture = readTx.read(
312                 LogicalDatastoreType.OPERATIONAL, ofNodeII);
313
314         final ListenableFuture<Void> nodeInitInfoFuture = Futures.transform(readOfNodeFuture,
315                 new AsyncFunction<Optional<FlowCapableNode>, Void>() {
316                     @Override
317                     public ListenableFuture<Void> apply(final Optional<FlowCapableNode> input) throws Exception {
318                         if (!input.isPresent() || input.get().getTable() == null || input.get().getTable().isEmpty()) {
319                             /* Last master close fail scenario so we would like to activate TxManager */
320                             LOG.debug("Operational DS for Device {} has to be replaced", deviceState.getNodeId());
321                             getDeviceState().setDeviceSynchronized(false);
322                             transactionChainManager.activateTransactionManager();
323                         }
324                         return DeviceInitializationUtils.initializeNodeInformation(DeviceContextImpl.this, switchFeaturesMandatory);
325                     }
326                 });
327
328         final ListenableFuture<Boolean> statPollFuture = Futures.transform(nodeInitInfoFuture,
329                 new AsyncFunction<Void, Boolean>() {
330
331                     @Override
332                     public ListenableFuture<Boolean> apply(final Void input) throws Exception {
333                         getStatisticsContext().statListForCollectingInitialization();
334                         if (getDeviceState().deviceSynchronized()) {
335                             return Futures.immediateFuture(Boolean.TRUE);
336                         }
337                         return getStatisticsContext().gatherDynamicData();
338                     }
339                 });
340
341         return Futures.transform(statPollFuture, new Function<Boolean, Void>() {
342
343             @Override
344             public Void apply(final Boolean input) {
345                 if (ConnectionContext.CONNECTION_STATE.RIP.equals(getPrimaryConnectionContext().getConnectionState())) {
346                     final String errMsg = String.format("We lost connection for Device %s, context has to be closed.",
347                             getDeviceState().getNodeId());
348                     LOG.warn(errMsg);
349                     transactionChainManager.clearUnsubmittedTransaction();
350                     throw new IllegalStateException(errMsg);
351                 }
352                 if (!input.booleanValue()) {
353                     final String errMsg = String.format("Get Initial Device %s information fails",
354                             getDeviceState().getNodeId());
355                     LOG.warn(errMsg);
356                     transactionChainManager.clearUnsubmittedTransaction();
357                     throw new IllegalStateException(errMsg);
358                 }
359                 LOG.debug("Get Initial Device {} information is successful", getDeviceState().getNodeId());
360                 getDeviceState().setDeviceSynchronized(true);
361                 transactionChainManager.activateTransactionManager();
362                 initialSubmitTransaction();
363                 getDeviceState().setStatisticsPollingEnabledProp(true);
364                 return null;
365             }
366         });
367     }
368
369     @Override
370     public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
371                                                           final InstanceIdentifier<T> path, final T data) {
372         transactionChainManager.writeToTransaction(store, path, data);
373     }
374
375     @Override
376     public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store, final InstanceIdentifier<T> path) {
377         transactionChainManager.addDeleteOperationTotTxChain(store, path);
378     }
379
380     @Override
381     public boolean submitTransaction() {
382         return transactionChainManager.submitWriteTransaction();
383     }
384
385     @Override
386     public ConnectionContext getPrimaryConnectionContext() {
387         return primaryConnectionContext;
388     }
389
390     @Override
391     public ConnectionContext getAuxiliaryConnectiobContexts(final BigInteger cookie) {
392         return auxiliaryConnectionContexts.get(new SwitchConnectionCookieOFImpl(cookie.longValue()));
393     }
394
395     @Override
396     public DeviceFlowRegistry getDeviceFlowRegistry() {
397         return deviceFlowRegistry;
398     }
399
400     @Override
401     public DeviceGroupRegistry getDeviceGroupRegistry() {
402         return deviceGroupRegistry;
403     }
404
405     @Override
406     public DeviceMeterRegistry getDeviceMeterRegistry() {
407         return deviceMeterRegistry;
408     }
409
410     @Override
411     public void processReply(final OfHeader ofHeader) {
412         if (ofHeader instanceof Error) {
413             messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
414         } else {
415             messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
416         }
417     }
418
419     @Override
420     public void processReply(final Xid xid, final List<MultipartReply> ofHeaderList) {
421         for (final MultipartReply multipartReply : ofHeaderList) {
422             messageSpy.spyMessage(multipartReply.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
423         }
424     }
425
426     @Override
427     public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
428         final ItemLifecycleListener itemLifecycleListener = flowLifeCycleKeeper.getItemLifecycleListener();
429         if (itemLifecycleListener != null) {
430             //1. translate to general flow (table, priority, match, cookie)
431             final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved flowRemovedNotification =
432                     flowRemovedTranslator.translate(flowRemoved, this, null);
433             //2. create registry key
434             final FlowRegistryKey flowRegKey = FlowRegistryKeyFactory.create(flowRemovedNotification);
435             //3. lookup flowId
436             final FlowDescriptor flowDescriptor = deviceFlowRegistry.retrieveIdForFlow(flowRegKey);
437             //4. if flowId present:
438             if (flowDescriptor != null) {
439                 // a) construct flow path
440                 final KeyedInstanceIdentifier<Flow, FlowKey> flowPath = getDeviceState().getNodeInstanceIdentifier()
441                         .augmentation(FlowCapableNode.class)
442                         .child(Table.class, flowDescriptor.getTableKey())
443                         .child(Flow.class, new FlowKey(flowDescriptor.getFlowId()));
444                 // b) notify listener
445                 itemLifecycleListener.onRemoved(flowPath);
446             } else {
447                 LOG.debug("flow id not found: nodeId={} tableId={}, priority={}",
448                         getDeviceState().getNodeId(), flowRegKey.getTableId(), flowRemovedNotification.getPriority());
449             }
450         }
451     }
452
453     @Override
454     public void processPortStatusMessage(final PortStatusMessage portStatus) {
455         messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
456         final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator.translate(portStatus, this, null);
457
458         final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = provideIIToNodeConnector(portStatus.getPortNo(), portStatus.getVersion());
459         if (portStatus.getReason().equals(PortReason.OFPPRADD) || portStatus.getReason().equals(PortReason.OFPPRMODIFY)) {
460             // because of ADD status node connector has to be created
461             final NodeConnectorBuilder nConnectorBuilder = new NodeConnectorBuilder().setKey(iiToNodeConnector.getKey());
462             nConnectorBuilder.addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new FlowCapableNodeConnectorStatisticsDataBuilder().build());
463             nConnectorBuilder.addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector);
464             writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, nConnectorBuilder.build());
465         } else if (portStatus.getReason().equals(PortReason.OFPPRDELETE)) {
466             addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
467         }
468         submitTransaction();
469     }
470
471     private KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> provideIIToNodeConnector(final long portNo, final short version) {
472         final InstanceIdentifier<Node> iiToNodes = deviceState.getNodeInstanceIdentifier();
473         final BigInteger dataPathId = deviceState.getFeatures().getDatapathId();
474         final NodeConnectorId nodeConnectorId = NodeStaticReplyTranslatorUtil.nodeConnectorId(dataPathId.toString(), portNo, version);
475         return iiToNodes.child(NodeConnector.class, new NodeConnectorKey(nodeConnectorId));
476     }
477
478     @Override
479     public void processPacketInMessage(final PacketInMessage packetInMessage) {
480         messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH);
481         final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
482         final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, this, null);
483
484         if (packetReceived == null) {
485             LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
486             messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
487             return;
488         } else {
489             messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
490         }
491
492         if (!packetInLimiter.acquirePermit()) {
493             LOG.debug("Packet limited");
494             // TODO: save packet into emergency slot if possible
495             messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
496             return;
497         }
498
499         final ListenableFuture<? extends Object> offerNotification = notificationPublishService.offerNotification(packetReceived);
500         if (NotificationPublishService.REJECTED.equals(offerNotification)) {
501             LOG.debug("notification offer rejected");
502             messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
503             packetInLimiter.drainLowWaterMark();
504             packetInLimiter.releasePermit();
505             return;
506         }
507
508         Futures.addCallback(offerNotification, new FutureCallback<Object>() {
509             @Override
510             public void onSuccess(final Object result) {
511                 messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
512                 packetInLimiter.releasePermit();
513             }
514
515             @Override
516             public void onFailure(final Throwable t) {
517                 messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
518                 LOG.debug("notification offer failed: {}", t.getMessage());
519                 LOG.trace("notification offer failed..", t);
520                 packetInLimiter.releasePermit();
521             }
522         });
523     }
524
525     @Override
526     public void processExperimenterMessage(final ExperimenterMessage notification) {
527         // lookup converter
528         final ExperimenterDataOfChoice vendorData = notification.getExperimenterDataOfChoice();
529         final MessageTypeKey<? extends ExperimenterDataOfChoice> key = new MessageTypeKey<>(
530                 deviceState.getVersion(),
531                 (Class<? extends ExperimenterDataOfChoice>) vendorData.getImplementedInterface());
532         final ConvertorMessageFromOFJava<ExperimenterDataOfChoice, MessagePath> messageConverter = extensionConverterProvider.getMessageConverter(key);
533         if (messageConverter == null) {
534             LOG.warn("custom converter for {}[OF:{}] not found",
535                     notification.getExperimenterDataOfChoice().getImplementedInterface(),
536                     deviceState.getVersion());
537             return;
538         }
539         // build notification
540         final ExperimenterMessageOfChoice messageOfChoice;
541         try {
542             messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION);
543             final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new ExperimenterMessageFromDevBuilder()
544                 .setNode(new NodeRef(deviceState.getNodeInstanceIdentifier()))
545                     .setExperimenterMessageOfChoice(messageOfChoice);
546             // publish
547             notificationPublishService.offerNotification(experimenterMessageFromDevBld.build());
548         } catch (final ConversionException e) {
549             LOG.warn("Conversion of experimenter notification failed", e);
550         }
551     }
552
553     @Override
554     public TranslatorLibrary oook() {
555         return translatorLibrary;
556     }
557
558     @Override
559     public HashedWheelTimer getTimer() {
560         return hashedWheelTimer;
561     }
562
563     @Override
564     public synchronized void close() {
565         LOG.debug("closing deviceContext: {}, nodeId:{}",
566                 getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress(),
567                 getDeviceState().getNodeId());
568
569         if (deviceState.isValid()) {
570             primaryConnectionContext.closeConnection(false);
571             tearDown();
572         }
573     }
574
575     private synchronized void tearDown() {
576         LOG.trace("tearDown method for node {}", deviceState.getNodeId());
577         if (deviceState.isValid()) {
578             deviceState.setValid(false);
579
580             for (final ConnectionContext connectionContext : auxiliaryConnectionContexts.values()) {
581                 connectionContext.closeConnection(false);
582             }
583
584             deviceGroupRegistry.close();
585             deviceFlowRegistry.close();
586             deviceMeterRegistry.close();
587
588             final ListenableFuture<Void> future = transactionChainManager.shuttingDown();
589             Futures.addCallback(future, new FutureCallback<Void>() {
590
591                 @Override
592                 public void onSuccess(final Void result) {
593                     LOG.info("TxChain {} was shutdown successfull.", getDeviceState().getNodeId());
594                     tearDownClean();
595                 }
596
597                 @Override
598                 public void onFailure(final Throwable t) {
599                     LOG.warn("Shutdown TxChain {} fail.", getDeviceState().getNodeId(), t);
600                     tearDownClean();
601                 }
602             });
603         }
604     }
605
606     private void tearDownClean() {
607         LOG.info("Closing transaction chain manager without cleaning inventory operational");
608         transactionChainManager.close();
609
610         final LinkedList<DeviceContextClosedHandler> reversedCloseHandlers = new LinkedList<>(closeHandlers);
611         Collections.reverse(reversedCloseHandlers);
612         for (final DeviceContextClosedHandler deviceContextClosedHandler : reversedCloseHandlers) {
613             deviceContextClosedHandler.onDeviceContextClosed(this);
614         }
615         closeHandlers.clear();
616     }
617
618     @Override
619     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
620         if (getPrimaryConnectionContext().equals(connectionContext)) {
621             try {
622                 tearDown();
623             } catch (final Exception e) {
624                 LOG.trace("Error closing device context.");
625             }
626         } else {
627             LOG.debug("auxiliary connection dropped: {}, nodeId:{}",
628                     connectionContext.getConnectionAdapter().getRemoteAddress(),
629                     getDeviceState().getNodeId());
630             final SwitchConnectionDistinguisher connectionDistinguisher = createConnectionDistinguisher(connectionContext);
631             auxiliaryConnectionContexts.remove(connectionDistinguisher);
632         }
633     }
634
635     @Override
636     public void setCurrentBarrierTimeout(final Timeout timeout) {
637         barrierTaskTimeout = timeout;
638     }
639
640     @Override
641     public Timeout getBarrierTaskTimeout() {
642         return barrierTaskTimeout;
643     }
644
645     @Override
646     public void setNotificationService(final NotificationService notificationServiceParam) {
647         notificationService = notificationServiceParam;
648     }
649
650     @Override
651     public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
652         this.notificationPublishService = notificationPublishService;
653     }
654
655     @Override
656     public MessageSpy getMessageSpy() {
657         return messageSpy;
658     }
659
660     @Override
661     public void addDeviceContextClosedHandler(final DeviceContextClosedHandler deviceContextClosedHandler) {
662         closeHandlers.add(deviceContextClosedHandler);
663     }
664
665     @Override
666     public void onPublished() {
667         primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
668         for (final ConnectionContext switchAuxConnectionContext : auxiliaryConnectionContexts.values()) {
669             switchAuxConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
670         }
671     }
672
673     @Override
674     public MultiMsgCollector getMultiMsgCollector(final RequestContext<List<MultipartReply>> requestContext) {
675         return new MultiMsgCollectorImpl(this, requestContext);
676     }
677
678     @Override
679     public NodeConnectorRef lookupNodeConnectorRef(final Long portNumber) {
680         return nodeConnectorCache.get(portNumber);
681     }
682
683     @Override
684     public void storeNodeConnectorRef(final Long portNumber, final NodeConnectorRef nodeConnectorRef) {
685         nodeConnectorCache.put(
686                 Preconditions.checkNotNull(portNumber),
687                 Preconditions.checkNotNull(nodeConnectorRef));
688     }
689
690     @Override
691     public void updatePacketInRateLimit(final long upperBound) {
692         packetInLimiter.changeWaterMarks((int) (LOW_WATERMARK_FACTOR * upperBound), (int) (HIGH_WATERMARK_FACTOR * upperBound));
693     }
694
695     @Override
696     public ItemLifeCycleRegistry getItemLifeCycleSourceRegistry() {
697         return itemLifeCycleSourceRegistry;
698     }
699
700     @Override
701     public void setRpcContext(final RpcContext rpcContext) {
702         this.rpcContext = rpcContext;
703     }
704
705     @Override
706     public RpcContext getRpcContext() {
707         return rpcContext;
708     }
709
710     @Override
711     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
712         this.extensionConverterProvider = extensionConverterProvider;
713     }
714
715     @Override
716     public ExtensionConverterProvider getExtensionConverterProvider() {
717         return extensionConverterProvider;
718     }
719
720     @Override
721     public void setStatisticsContext(final StatisticsContext statisticsContext) {
722         this.statCtx = statisticsContext;
723     }
724
725     @Override
726     public StatisticsContext getStatisticsContext() {
727         return statCtx;
728     }
729 }