Bug 8223: Fixed incorrect enable-flow-removed-notification check.
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceContextImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Function;
12 import com.google.common.base.Optional;
13 import com.google.common.base.Verify;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.JdkFutureAdapters;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import io.netty.util.HashedWheelTimer;
19 import io.netty.util.Timeout;
20 import io.netty.util.TimerTask;
21 import java.math.BigInteger;
22 import java.util.Collection;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Objects;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.Future;
29 import java.util.concurrent.TimeUnit;
30 import javax.annotation.Nonnull;
31 import javax.annotation.Nullable;
32 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
33 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
34 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
35 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
36 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
37 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
38 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
39 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
40 import org.opendaylight.openflowplugin.api.ConnectionException;
41 import org.opendaylight.openflowplugin.api.OFConstants;
42 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
43 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
44 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
45 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
46 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
47 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
48 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
49 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
50 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
51 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
52 import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
53 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
54 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
55 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
56 import org.opendaylight.openflowplugin.api.openflow.lifecycle.MastershipChangeListener;
57 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
58 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
59 import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion;
60 import org.opendaylight.openflowplugin.api.openflow.registry.ItemLifeCycleRegistry;
61 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
62 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowDescriptor;
63 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowRegistryKey;
64 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
65 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
66 import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleKeeper;
67 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
68 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
69 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
70 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
71 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
72 import org.opendaylight.openflowplugin.extension.api.exception.ConversionException;
73 import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
74 import org.opendaylight.openflowplugin.impl.common.ItemLifeCycleSourceImpl;
75 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
76 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory;
77 import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
78 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
79 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
80 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
81 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
82 import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory;
83 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
84 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
85 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
86 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
87 import org.opendaylight.openflowplugin.openflow.md.core.session.SwitchConnectionCookieOFImpl;
88 import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
101 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
102 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
103 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
104 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
105 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
106 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketIn;
107 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
108 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
109 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
110 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
111 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice;
112 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
113 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
114 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
115 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
116 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService;
117 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInput;
118 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInputBuilder;
119 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
120 import org.opendaylight.yangtools.yang.binding.DataObject;
121 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
122 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
123 import org.opendaylight.yangtools.yang.common.RpcResult;
124 import org.slf4j.Logger;
125 import org.slf4j.LoggerFactory;
126
127 public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper {
128
129     private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
130
131     // TODO: drain factor should be parametrized
132     private static final float REJECTED_DRAIN_FACTOR = 0.25f;
133     // TODO: low water mark factor should be parametrized
134     private static final float LOW_WATERMARK_FACTOR = 0.75f;
135     // TODO: high water mark factor should be parametrized
136     private static final float HIGH_WATERMARK_FACTOR = 0.95f;
137
138     // Timeout in seconds after what we will give up on propagating role
139     private static final int SET_ROLE_TIMEOUT = 10;
140
141     private static final int LOW_WATERMARK = 1000;
142     private static final int HIGH_WATERMARK = 2000;
143
144     private boolean initialized;
145
146     private SalRoleService salRoleService = null;
147     private final HashedWheelTimer hashedWheelTimer;
148     private volatile ConnectionContext primaryConnectionContext;
149     private final DeviceState deviceState;
150     private final DataBroker dataBroker;
151     private final Map<SwitchConnectionDistinguisher, ConnectionContext> auxiliaryConnectionContexts;
152     private TransactionChainManager transactionChainManager;
153     private DeviceFlowRegistry deviceFlowRegistry;
154     private DeviceGroupRegistry deviceGroupRegistry;
155     private DeviceMeterRegistry deviceMeterRegistry;
156     private PacketInRateLimiter packetInLimiter;
157     private final MessageSpy messageSpy;
158     private final ItemLifeCycleKeeper flowLifeCycleKeeper;
159     private NotificationPublishService notificationPublishService;
160     private Timeout barrierTaskTimeout;
161     private final MessageTranslator<PortGrouping, FlowCapableNodeConnector> portStatusTranslator;
162     private final MessageTranslator<PacketInMessage, PacketReceived> packetInTranslator;
163     private final MessageTranslator<FlowRemoved, org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved> flowRemovedTranslator;
164     private final TranslatorLibrary translatorLibrary;
165     private final ItemLifeCycleRegistry itemLifeCycleSourceRegistry;
166     private ExtensionConverterProvider extensionConverterProvider;
167     private boolean skipTableFeatures;
168     private boolean switchFeaturesMandatory;
169     private DeviceInfo deviceInfo;
170     private final ConvertorExecutor convertorExecutor;
171     private volatile CONTEXT_STATE state;
172     private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
173     private final DeviceManager myManager;
174     private final DeviceInitializerProvider deviceInitializerProvider;
175     private final boolean useSingleLayerSerialization;
176     private OutboundQueueProvider outboundQueueProvider;
177
178     DeviceContextImpl(
179         @Nonnull final ConnectionContext primaryConnectionContext,
180         @Nonnull final DataBroker dataBroker,
181         @Nonnull final MessageSpy messageSpy,
182         @Nonnull final TranslatorLibrary translatorLibrary,
183         @Nonnull final DeviceManager contextManager,
184         final ConvertorExecutor convertorExecutor,
185         final boolean skipTableFeatures,
186         final HashedWheelTimer hashedWheelTimer,
187         final boolean useSingleLayerSerialization,
188         final DeviceInitializerProvider deviceInitializerProvider) {
189
190         this.primaryConnectionContext = primaryConnectionContext;
191         this.outboundQueueProvider = (OutboundQueueProvider) primaryConnectionContext.getOutboundQueueProvider();
192         this.deviceInfo = primaryConnectionContext.getDeviceInfo();
193         this.hashedWheelTimer = hashedWheelTimer;
194         this.deviceInitializerProvider = deviceInitializerProvider;
195         this.myManager = contextManager;
196         this.deviceState = new DeviceStateImpl();
197         this.dataBroker = dataBroker;
198         this.auxiliaryConnectionContexts = new HashMap<>();
199         this.messageSpy = messageSpy;
200
201         this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
202                 /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, this.messageSpy, REJECTED_DRAIN_FACTOR);
203
204         this.translatorLibrary = translatorLibrary;
205         this.portStatusTranslator = translatorLibrary.lookupTranslator(
206                 new TranslatorKey(deviceInfo.getVersion(), PortGrouping.class.getName()));
207         this.packetInTranslator = translatorLibrary.lookupTranslator(
208                 new TranslatorKey(deviceInfo.getVersion(), PacketIn.class.getName()));
209         this.flowRemovedTranslator = translatorLibrary.lookupTranslator(
210                 new TranslatorKey(deviceInfo.getVersion(), FlowRemoved.class.getName()));
211
212         this.itemLifeCycleSourceRegistry = new ItemLifeCycleRegistryImpl();
213         this.flowLifeCycleKeeper = new ItemLifeCycleSourceImpl();
214         this.itemLifeCycleSourceRegistry.registerLifeCycleSource(flowLifeCycleKeeper);
215         this.state = CONTEXT_STATE.INITIALIZATION;
216         this.convertorExecutor = convertorExecutor;
217         this.skipTableFeatures = skipTableFeatures;
218         this.useSingleLayerSerialization = useSingleLayerSerialization;
219         this.initialized = false;
220     }
221
222     @Override
223     public boolean initialSubmitTransaction() {
224         return initialized && transactionChainManager.initialSubmitWriteTransaction();
225     }
226
227     @Override
228     public void addAuxiliaryConnectionContext(final ConnectionContext connectionContext) {
229         final SwitchConnectionDistinguisher connectionDistinguisher = createConnectionDistinguisher(connectionContext);
230         auxiliaryConnectionContexts.put(connectionDistinguisher, connectionContext);
231     }
232
233     private static SwitchConnectionDistinguisher createConnectionDistinguisher(final ConnectionContext connectionContext) {
234         return new SwitchConnectionCookieOFImpl(connectionContext.getFeatures().getAuxiliaryId());
235     }
236
237     @Override
238     public void removeAuxiliaryConnectionContext(final ConnectionContext connectionContext) {
239         final SwitchConnectionDistinguisher connectionDistinguisher = createConnectionDistinguisher(connectionContext);
240         LOG.debug("auxiliary connection dropped: {}, nodeId:{}", connectionContext.getConnectionAdapter()
241                 .getRemoteAddress(), getDeviceInfo().getLOGValue());
242         auxiliaryConnectionContexts.remove(connectionDistinguisher);
243     }
244
245     @Override
246     public DeviceState getDeviceState() {
247         return deviceState;
248     }
249
250     @Override
251     public ReadOnlyTransaction getReadTransaction() {
252         return dataBroker.newReadOnlyTransaction();
253     }
254
255     @Override
256     public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
257                                                           final InstanceIdentifier<T> path,
258                                                           final T data){
259         if (initialized) {
260             transactionChainManager.writeToTransaction(store, path, data, false);
261         }
262     }
263
264     @Override
265     public <T extends DataObject> void writeToTransactionWithParentsSlow(final LogicalDatastoreType store,
266                                                                          final InstanceIdentifier<T> path,
267                                                                          final T data){
268         if (initialized) {
269             transactionChainManager.writeToTransaction(store, path, data, true);
270         }
271     }
272
273     @Override
274     public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store, final InstanceIdentifier<T> path) {
275         if (initialized) {
276             transactionChainManager.addDeleteOperationTotTxChain(store, path);
277         }
278     }
279
280     @Override
281     public boolean submitTransaction() {
282         return initialized && transactionChainManager.submitWriteTransaction();
283     }
284
285     @Override
286     public ConnectionContext getPrimaryConnectionContext() {
287         return primaryConnectionContext;
288     }
289
290     @Override
291     public ConnectionContext getAuxiliaryConnectionContexts(final BigInteger cookie) {
292         return auxiliaryConnectionContexts.get(new SwitchConnectionCookieOFImpl(cookie.longValue()));
293     }
294
295     @Override
296     public DeviceFlowRegistry getDeviceFlowRegistry() {
297         return deviceFlowRegistry;
298     }
299
300     @Override
301     public DeviceGroupRegistry getDeviceGroupRegistry() {
302         return deviceGroupRegistry;
303     }
304
305     @Override
306     public DeviceMeterRegistry getDeviceMeterRegistry() {
307         return deviceMeterRegistry;
308     }
309
310     @Override
311     public void processReply(final OfHeader ofHeader) {
312         messageSpy.spyMessage(
313             ofHeader.getImplementedInterface(),
314             (ofHeader instanceof Error)
315                 ? MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE
316                 : MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
317     }
318
319     @Override
320     public void processReply(final Xid xid, final List<? extends OfHeader> ofHeaderList) {
321         ofHeaderList.forEach(header -> messageSpy.spyMessage(
322             header.getImplementedInterface(),
323             (header instanceof Error)
324                 ? MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE
325                 : MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS));
326     }
327
328     @Override
329     public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
330         //1. translate to general flow (table, priority, match, cookie)
331         final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved flowRemovedNotification =
332                 flowRemovedTranslator.translate(flowRemoved, deviceInfo, null);
333
334         if(myManager.isFlowRemovedNotificationOn()) {
335             // Trigger off a notification
336             notificationPublishService.offerNotification(flowRemovedNotification);
337         } else if(LOG.isDebugEnabled()) {
338             LOG.debug("For nodeId={} isNotificationFlowRemovedOn={}", getDeviceInfo().getLOGValue(), myManager.isFlowRemovedNotificationOn());
339         }
340
341         final ItemLifecycleListener itemLifecycleListener = flowLifeCycleKeeper.getItemLifecycleListener();
342         if (itemLifecycleListener != null) {
343             //2. create registry key
344             final FlowRegistryKey flowRegKey = FlowRegistryKeyFactory.create(getDeviceInfo().getVersion(), flowRemovedNotification);
345             //3. lookup flowId
346             final FlowDescriptor flowDescriptor = deviceFlowRegistry.retrieveDescriptor(flowRegKey);
347             //4. if flowId present:
348             if (flowDescriptor != null) {
349                 // a) construct flow path
350                 final KeyedInstanceIdentifier<Flow, FlowKey> flowPath = getDeviceInfo().getNodeInstanceIdentifier()
351                         .augmentation(FlowCapableNode.class)
352                         .child(Table.class, flowDescriptor.getTableKey())
353                         .child(Flow.class, new FlowKey(flowDescriptor.getFlowId()));
354                 // b) notify listener
355                 itemLifecycleListener.onRemoved(flowPath);
356             } else {
357                 LOG.debug("flow id not found: nodeId={} tableId={}, priority={}",
358                         getDeviceInfo().getNodeId(), flowRegKey.getTableId(), flowRemovedNotification.getPriority());
359             }
360         }
361     }
362
363     @Override
364     public void processPortStatusMessage(final PortStatusMessage portStatus) {
365         messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
366         final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator.translate(portStatus, getDeviceInfo(), null);
367
368         final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = provideIIToNodeConnector(portStatus.getPortNo(), portStatus.getVersion());
369         try {
370             if (portStatus.getReason().equals(PortReason.OFPPRADD) || portStatus.getReason().equals(PortReason.OFPPRMODIFY)) {
371                 // because of ADD status node connector has to be created
372                 final NodeConnectorBuilder nConnectorBuilder = new NodeConnectorBuilder().setKey(iiToNodeConnector.getKey());
373                 nConnectorBuilder.addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new FlowCapableNodeConnectorStatisticsDataBuilder().build());
374                 nConnectorBuilder.addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector);
375                 writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, nConnectorBuilder.build());
376             } else if (portStatus.getReason().equals(PortReason.OFPPRDELETE)) {
377                 addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
378             }
379             submitTransaction();
380         } catch (final Exception e) {
381             LOG.warn("Error processing port status message for port {} on device {} : {}", portStatus.getPortNo(),
382                     getDeviceInfo().getNodeId().toString(), e);
383         }
384     }
385
386     private KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> provideIIToNodeConnector(final long portNo, final short version) {
387         final InstanceIdentifier<Node> iiToNodes = getDeviceInfo().getNodeInstanceIdentifier();
388         final BigInteger dataPathId = getDeviceInfo().getDatapathId();
389         final NodeConnectorId nodeConnectorId = InventoryDataServiceUtil.nodeConnectorIdfromDatapathPortNo(dataPathId, portNo, OpenflowVersion.get(version));
390         return iiToNodes.child(NodeConnector.class, new NodeConnectorKey(nodeConnectorId));
391     }
392
393     @Override
394     public void processPacketInMessage(final PacketInMessage packetInMessage) {
395         messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH);
396         final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
397         final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null);
398
399         if (packetReceived == null) {
400             LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
401             messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
402             return;
403         } else {
404             messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
405         }
406
407         if (!packetInLimiter.acquirePermit()) {
408             LOG.debug("Packet limited");
409             // TODO: save packet into emergency slot if possible
410             messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
411             return;
412         }
413
414         final ListenableFuture<?> offerNotification = notificationPublishService.offerNotification(packetReceived);
415         if (NotificationPublishService.REJECTED.equals(offerNotification)) {
416             LOG.debug("notification offer rejected");
417             messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
418             packetInLimiter.drainLowWaterMark();
419             packetInLimiter.releasePermit();
420             return;
421         }
422
423         Futures.addCallback(offerNotification, new FutureCallback<Object>() {
424             @Override
425             public void onSuccess(final Object result) {
426                 messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
427                 packetInLimiter.releasePermit();
428             }
429
430             @Override
431             public void onFailure(final Throwable t) {
432                 messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
433                 LOG.debug("notification offer failed: {}", t.getMessage());
434                 LOG.trace("notification offer failed..", t);
435                 packetInLimiter.releasePermit();
436             }
437         });
438     }
439
440     @Override
441     public void processExperimenterMessage(final ExperimenterMessage notification) {
442         // lookup converter
443         final ExperimenterDataOfChoice vendorData = notification.getExperimenterDataOfChoice();
444         final MessageTypeKey<? extends ExperimenterDataOfChoice> key = new MessageTypeKey<>(
445                 getDeviceInfo().getVersion(),
446                 (Class<? extends ExperimenterDataOfChoice>) vendorData.getImplementedInterface());
447         final ConvertorMessageFromOFJava<ExperimenterDataOfChoice, MessagePath> messageConverter = extensionConverterProvider.getMessageConverter(key);
448         if (messageConverter == null) {
449             LOG.warn("custom converter for {}[OF:{}] not found",
450                     notification.getExperimenterDataOfChoice().getImplementedInterface(),
451                     getDeviceInfo().getVersion());
452             return;
453         }
454         // build notification
455         final ExperimenterMessageOfChoice messageOfChoice;
456         try {
457             messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION);
458             final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new ExperimenterMessageFromDevBuilder()
459                 .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()))
460                     .setExperimenterMessageOfChoice(messageOfChoice);
461             // publish
462             notificationPublishService.offerNotification(experimenterMessageFromDevBld.build());
463         } catch (final ConversionException e) {
464             LOG.error("Conversion of experimenter notification failed", e);
465         }
466     }
467
468     @Override
469     public TranslatorLibrary oook() {
470         return translatorLibrary;
471     }
472
473     @Override
474     public void setCurrentBarrierTimeout(final Timeout timeout) {
475         barrierTaskTimeout = timeout;
476     }
477
478     @Override
479     public Timeout getBarrierTaskTimeout() {
480         return barrierTaskTimeout;
481     }
482
483     @Override
484     public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
485         this.notificationPublishService = notificationPublishService;
486     }
487
488     @Override
489     public MessageSpy getMessageSpy() {
490         return messageSpy;
491     }
492
493     @Override
494     public void onPublished() {
495         Verify.verify(CONTEXT_STATE.INITIALIZATION.equals(getState()));
496         this.state = CONTEXT_STATE.WORKING;
497         primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
498     }
499
500     @Override
501     public <T extends OfHeader> MultiMsgCollector<T> getMultiMsgCollector(final RequestContext<List<T>> requestContext) {
502         return new MultiMsgCollectorImpl<>(this, requestContext);
503     }
504
505     @Override
506     public void updatePacketInRateLimit(final long upperBound) {
507         packetInLimiter.changeWaterMarks((int) (LOW_WATERMARK_FACTOR * upperBound), (int) (HIGH_WATERMARK_FACTOR * upperBound));
508     }
509
510     @Override
511     public ItemLifeCycleRegistry getItemLifeCycleSourceRegistry() {
512         return itemLifeCycleSourceRegistry;
513     }
514
515     @Override
516     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
517         this.extensionConverterProvider = extensionConverterProvider;
518     }
519
520     @Override
521     public ExtensionConverterProvider getExtensionConverterProvider() {
522         return extensionConverterProvider;
523     }
524
525     @Override
526     public synchronized void shutdownConnection() {
527         if (LOG.isDebugEnabled()) {
528             LOG.debug("Shutdown method for node {}", getDeviceInfo().getLOGValue());
529         }
530         if (CONTEXT_STATE.TERMINATION.equals(getState())) {
531             LOG.debug("DeviceCtx for Node {} is in termination process.", getDeviceInfo().getLOGValue());
532             return;
533         }
534
535         if (ConnectionContext.CONNECTION_STATE.RIP.equals(getPrimaryConnectionContext().getConnectionState())) {
536             LOG.debug("ConnectionCtx for Node {} is in RIP state.", getDeviceInfo().getLOGValue());
537             return;
538         }
539
540         // Terminate Auxiliary Connection
541         for (final ConnectionContext connectionContext : auxiliaryConnectionContexts.values()) {
542             LOG.debug("Closing auxiliary connection {}", connectionContext.getNodeId());
543             connectionContext.closeConnection(false);
544         }
545
546         // Terminate Primary Connection
547         getPrimaryConnectionContext().closeConnection(true);
548
549         // Close all datastore registries
550         if (initialized) {
551             deviceGroupRegistry.close();
552             deviceFlowRegistry.close();
553             deviceMeterRegistry.close();
554         }
555     }
556
557     @Override
558     public ListenableFuture<Void> shuttingDownDataStoreTransactions() {
559         return initialized
560                 ? this.transactionChainManager.shuttingDown()
561                 : Futures.immediateFuture(null);
562     }
563
564     @VisibleForTesting
565     TransactionChainManager getTransactionChainManager() {
566         return this.transactionChainManager;
567     }
568
569     @Override
570     public void setSwitchFeaturesMandatory(boolean switchFeaturesMandatory) {
571         this.switchFeaturesMandatory = switchFeaturesMandatory;
572     }
573
574     @Override
575     public synchronized void replaceConnection(final ConnectionContext connectionContext) {
576
577         primaryConnectionContext = null;
578         deviceInfo = null;
579         packetInLimiter = null;
580
581         primaryConnectionContext = connectionContext;
582         deviceInfo = primaryConnectionContext.getDeviceInfo();
583
584         packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
585                 /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, messageSpy, REJECTED_DRAIN_FACTOR);
586
587         primaryConnectionContext.setOutboundQueueProvider(outboundQueueProvider);
588
589         final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
590                 primaryConnectionContext.getConnectionAdapter().registerOutboundQueueHandler(
591                         outboundQueueProvider,
592                         myManager.getBarrierCountLimit(),
593                         myManager.getBarrierIntervalNanos());
594
595         primaryConnectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
596
597         final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
598                 primaryConnectionContext.getConnectionAdapter(), this);
599
600         primaryConnectionContext.getConnectionAdapter().setMessageListener(messageListener);
601
602         LOG.info("ConnectionEvent: Connection on device:{}, NodeId:{} switched.",
603                 primaryConnectionContext.getConnectionAdapter().getRemoteAddress(),
604                 primaryConnectionContext.getDeviceInfo().getNodeId());
605
606     }
607
608     @Override
609     public CONTEXT_STATE getState() {
610         return this.state;
611     }
612
613     @Override
614     public ListenableFuture<Void> stopClusterServices(boolean connectionInterrupted) {
615         final ListenableFuture<Void> deactivateTxManagerFuture = initialized
616                 ? transactionChainManager.deactivateTransactionManager()
617                 : Futures.immediateFuture(null);
618
619         if (!connectionInterrupted) {
620             final ListenableFuture<Void> makeSlaveFuture
621                     = Futures.transform(makeDeviceSlave(), new Function<RpcResult<SetRoleOutput>, Void>() {
622                 @Nullable
623                 @Override
624                 public Void apply(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
625                     return null;
626                 }
627             });
628
629             Futures.addCallback(makeSlaveFuture, new FutureCallback<Void>() {
630                 @Override
631                 public void onSuccess(@Nullable Void aVoid) {
632                     if (LOG.isDebugEnabled()) {
633                         LOG.debug("Role SLAVE was successfully propagated on device, node {}", deviceInfo.getLOGValue());
634                     }
635                 }
636
637                 @Override
638                 public void onFailure(final Throwable throwable) {
639                     LOG.warn("Was not able to set role SLAVE to device on node {} ", deviceInfo.getLOGValue());
640                     LOG.trace("Error occurred on device role setting, probably connection loss: ", throwable);
641                 }
642             });
643         }
644         return deactivateTxManagerFuture;
645     }
646
647     @Override
648     public ServiceGroupIdentifier getServiceIdentifier() {
649         return this.deviceInfo.getServiceIdentifier();
650     }
651
652     @Override
653     public DeviceInfo getDeviceInfo() {
654         return this.deviceInfo;
655     }
656
657     @Override
658     public void close() {
659         //NOOP
660     }
661
662     @Override
663     public void putLifecycleServiceIntoTxChainManager(final LifecycleService lifecycleService){
664         if (initialized) {
665             this.transactionChainManager.setLifecycleService(lifecycleService);
666         }
667     }
668
669     @Override
670     public boolean canUseSingleLayerSerialization() {
671         return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3;
672     }
673
674     @Override
675     public boolean isSkipTableFeatures() {
676         return this.skipTableFeatures;
677     }
678
679     @Override
680     public void setSalRoleService(@Nonnull SalRoleService salRoleService) {
681         this.salRoleService = salRoleService;
682     }
683
684     @Override
685     public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) {
686         this.clusterInitializationPhaseHandler = handler;
687     }
688
689     @Override
690     public boolean onContextInstantiateService(final MastershipChangeListener mastershipChangeListener) {
691
692         LOG.info("Starting device context cluster services for node {}", deviceInfo.getLOGValue());
693
694         lazyTransactionManagerInitialization();
695
696         this.transactionChainManager.activateTransactionManager();
697
698         try {
699             final java.util.Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
700                 .lookup(deviceInfo.getVersion());
701
702             if (initializer.isPresent()) {
703                 final MultipartWriterProvider writerProvider = MultipartWriterProviderFactory.createDefaultProvider(this);
704                 initializer.get().initialize(this, switchFeaturesMandatory, writerProvider, convertorExecutor);
705             } else {
706                 throw new ExecutionException(new ConnectionException("Unsupported version " + deviceInfo.getVersion()));
707             }
708         } catch (ExecutionException | InterruptedException e) {
709             LOG.warn("Device {} cannot be initialized: ", deviceInfo.getLOGValue(), e);
710             return false;
711         }
712
713         Futures.addCallback(sendRoleChangeToDevice(OfpRole.BECOMEMASTER),
714                 new RpcResultFutureCallback(mastershipChangeListener));
715
716         final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill = getDeviceFlowRegistry().fill();
717         Futures.addCallback(deviceFlowRegistryFill,
718                 new DeviceFlowRegistryCallback(deviceFlowRegistryFill, mastershipChangeListener));
719
720         return this.clusterInitializationPhaseHandler.onContextInstantiateService(mastershipChangeListener);
721     }
722
723     @VisibleForTesting
724     void lazyTransactionManagerInitialization() {
725         if (!this.initialized) {
726             if (LOG.isDebugEnabled()) {
727                 LOG.debug("Transaction chain manager for node {} created", deviceInfo.getLOGValue());
728             }
729             this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo);
730             this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker, deviceInfo.getNodeInstanceIdentifier());
731             this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
732             this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
733             this.initialized = true;
734         }
735     }
736
737     @Nullable
738     @Override
739     public <T> RequestContext<T> createRequestContext() {
740         return new AbstractRequestContext<T>(deviceInfo.reserveXidForDeviceMessage()) {
741             @Override
742             public void close() {
743             }
744         };
745
746     }
747
748     ListenableFuture<RpcResult<SetRoleOutput>> sendRoleChangeToDevice(final OfpRole newRole) {
749         if (LOG.isDebugEnabled()) {
750             LOG.debug("Sending new role {} to device {}", newRole, deviceInfo.getNodeId());
751         }
752
753         final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture;
754
755         if (deviceInfo.getVersion() >= OFConstants.OFP_VERSION_1_3) {
756             final SetRoleInput setRoleInput = (new SetRoleInputBuilder()).setControllerRole(newRole)
757                     .setNode(new NodeRef(deviceInfo.getNodeInstanceIdentifier())).build();
758
759             setRoleOutputFuture = this.salRoleService.setRole(setRoleInput);
760
761             final TimerTask timerTask = timeout -> {
762                 if (!setRoleOutputFuture.isDone()) {
763                     LOG.warn("New role {} was not propagated to device {} during {} sec", newRole, deviceInfo.getLOGValue(), SET_ROLE_TIMEOUT);
764                     setRoleOutputFuture.cancel(true);
765                 }
766             };
767
768             hashedWheelTimer.newTimeout(timerTask, SET_ROLE_TIMEOUT, TimeUnit.SECONDS);
769         } else {
770             LOG.info("Device: {} with version: {} does not support role", deviceInfo.getLOGValue(), deviceInfo.getVersion());
771             return Futures.immediateFuture(null);
772         }
773
774         return JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture);
775     }
776
777     @Override
778     public ListenableFuture<RpcResult<SetRoleOutput>> makeDeviceSlave() {
779         return sendRoleChangeToDevice(OfpRole.BECOMESLAVE);
780     }
781
782     private class RpcResultFutureCallback implements FutureCallback<RpcResult<SetRoleOutput>> {
783
784         private final MastershipChangeListener mastershipChangeListener;
785
786         RpcResultFutureCallback(final MastershipChangeListener mastershipChangeListener) {
787             this.mastershipChangeListener = mastershipChangeListener;
788         }
789
790         @Override
791         public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
792             this.mastershipChangeListener.onMasterRoleAcquired(
793                     deviceInfo,
794                     ContextChainMastershipState.MASTER_ON_DEVICE
795             );
796             if (LOG.isDebugEnabled()) {
797                 LOG.debug("Role MASTER was successfully set on device, node {}", deviceInfo.getLOGValue());
798             }
799         }
800
801         @Override
802         public void onFailure(final Throwable throwable) {
803             mastershipChangeListener.onNotAbleToStartMastership(
804                     deviceInfo,
805                     "Was not able to set MASTER role on device");
806         }
807     }
808
809     private class DeviceFlowRegistryCallback implements FutureCallback<List<Optional<FlowCapableNode>>> {
810         private final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill;
811         private final MastershipChangeListener mastershipChangeListener;
812
813         DeviceFlowRegistryCallback(
814                 ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill,
815                 MastershipChangeListener mastershipChangeListener) {
816             this.deviceFlowRegistryFill = deviceFlowRegistryFill;
817             this.mastershipChangeListener = mastershipChangeListener;
818         }
819
820         @Override
821         public void onSuccess(@Nullable List<Optional<FlowCapableNode>> result) {
822             if (LOG.isDebugEnabled()) {
823                 // Count all flows we read from datastore for debugging purposes.
824                 // This number do not always represent how many flows were actually added
825                 // to DeviceFlowRegistry, because of possible duplicates.
826                 long flowCount = Optional.fromNullable(result).asSet().stream()
827                         .flatMap(Collection::stream)
828                         .filter(Objects::nonNull)
829                         .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
830                         .filter(Objects::nonNull)
831                         .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
832                         .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
833                         .filter(Objects::nonNull)
834                         .filter(table -> Objects.nonNull(table.getFlow()))
835                         .flatMap(table -> table.getFlow().stream())
836                         .filter(Objects::nonNull)
837                         .count();
838
839                 LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo.getLOGValue());
840             }
841             this.mastershipChangeListener.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.INITIAL_FLOW_REGISTRY_FILL);
842         }
843
844         @Override
845         public void onFailure(Throwable t) {
846             if (deviceFlowRegistryFill.isCancelled()) {
847                 if (LOG.isDebugEnabled()) {
848                     LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo.getLOGValue());
849                 }
850             } else {
851                 LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo.getLOGValue(), t);
852             }
853             mastershipChangeListener.onNotAbleToStartMastership(
854                     deviceInfo,
855                     "Was not able to fill flow registry on device");
856         }
857     }
858
859 }