Remove a potential NPE
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceContextImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import io.netty.util.HashedWheelTimer;
16 import io.netty.util.Timeout;
17 import java.math.BigInteger;
18 import java.util.Collection;
19 import java.util.HashMap;
20 import java.util.HashSet;
21 import java.util.List;
22 import java.util.Map;
23 import javax.annotation.Nonnull;
24 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
25 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
26 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
27 import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
28 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
29 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
30 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
31 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
32 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
33 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
34 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
35 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
36 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
37 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
38 import org.opendaylight.openflowplugin.api.openflow.device.exception.DeviceDataException;
39 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceContextClosedHandler;
40 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceDisconnectedHandler;
41 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
42 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
43 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
44 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
45 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
46 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
47 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
48 import org.opendaylight.openflowplugin.impl.common.NodeStaticReplyTranslatorUtil;
49 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
50 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
51 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
52 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
53 import org.opendaylight.openflowplugin.openflow.md.core.session.SwitchConnectionCookieOFImpl;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketIn;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
72 import org.opendaylight.yangtools.yang.binding.ChildOf;
73 import org.opendaylight.yangtools.yang.binding.DataObject;
74 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
75 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
76 import org.slf4j.Logger;
77 import org.slf4j.LoggerFactory;
78
79 /**
80  *
81  */
82 public class DeviceContextImpl implements DeviceContext {
83
84     private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
85     public static final String DEVICE_DISCONNECTED = "Device disconnected.";
86
87     private final ConnectionContext primaryConnectionContext;
88     private final DeviceState deviceState;
89     private final DataBroker dataBroker;
90     private final HashedWheelTimer hashedWheelTimer;
91     private final Map<SwitchConnectionDistinguisher, ConnectionContext> auxiliaryConnectionContexts;
92     private final TransactionChainManager txChainManager;
93     private TranslatorLibrary translatorLibrary;
94     private final DeviceFlowRegistry deviceFlowRegistry;
95     private final DeviceGroupRegistry deviceGroupRegistry;
96     private final DeviceMeterRegistry deviceMeterRegistry;
97     private Timeout barrierTaskTimeout;
98     private NotificationService notificationService;
99     private final MessageSpy messageSpy;
100     private DeviceDisconnectedHandler deviceDisconnectedHandler;
101     private final Collection<DeviceContextClosedHandler> closeHandlers = new HashSet<>();
102     private NotificationPublishService notificationPublishService;
103     private final OutboundQueue outboundQueueProvider;
104     private final MultiMsgCollector multiMsgCollector = new MultiMsgCollectorImpl();
105
106     private volatile int outstandingNotificationsAmount = 0;
107     private volatile boolean filteringPacketIn = false;
108     private final Object throttlingLock = new Object();
109     private int filteringHighWaterMark = 0;
110
111     @Override
112     public MultiMsgCollector getMultiMsgCollector() {
113         return multiMsgCollector;
114     }
115
116     @Override
117     public Long getReservedXid() {
118         return outboundQueueProvider.reserveEntry();
119     }
120
121     @VisibleForTesting
122     DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
123                       @Nonnull final DeviceState deviceState,
124                       @Nonnull final DataBroker dataBroker,
125                       @Nonnull final HashedWheelTimer hashedWheelTimer,
126                       @Nonnull final MessageSpy _messageSpy) {
127         this.primaryConnectionContext = Preconditions.checkNotNull(primaryConnectionContext);
128         this.deviceState = Preconditions.checkNotNull(deviceState);
129         this.dataBroker = Preconditions.checkNotNull(dataBroker);
130         this.hashedWheelTimer = Preconditions.checkNotNull(hashedWheelTimer);
131         txChainManager = new TransactionChainManager(dataBroker, hashedWheelTimer, 500L, 500L);
132         auxiliaryConnectionContexts = new HashMap<>();
133         deviceFlowRegistry = new DeviceFlowRegistryImpl();
134         deviceGroupRegistry = new DeviceGroupRegistryImpl();
135         deviceMeterRegistry = new DeviceMeterRegistryImpl();
136         messageSpy = _messageSpy;
137         multiMsgCollector.setDeviceReplyProcessor(this);
138         outboundQueueProvider = Preconditions.checkNotNull(primaryConnectionContext.getOutboundQueueProvider());
139     }
140
141     /**
142      * This method is called from {@link DeviceManagerImpl} only. So we could say "posthandshake process finish"
143      * and we are able to set a scheduler for an automatic transaction submitting by time (0,5sec).
144      */
145     void submitTransaction() {
146         txChainManager.enableSubmit();
147         txChainManager.submitTransaction();
148     }
149
150     @Override
151     public <M extends ChildOf<DataObject>> void onMessage(final M message, final RequestContext<?> requestContext) {
152         // TODO Auto-generated method stub
153     }
154
155     @Override
156     public void addAuxiliaryConenctionContext(final ConnectionContext connectionContext) {
157         final SwitchConnectionDistinguisher connectionDistinguisher = createConnectionDistinguisher(connectionContext);
158         auxiliaryConnectionContexts.put(connectionDistinguisher, connectionContext);
159     }
160
161     private static SwitchConnectionDistinguisher createConnectionDistinguisher(final ConnectionContext connectionContext) {
162         return new SwitchConnectionCookieOFImpl(connectionContext.getFeatures().getAuxiliaryId());
163     }
164
165     @Override
166     public void removeAuxiliaryConenctionContext(final ConnectionContext connectionContext) {
167         // TODO Auto-generated method stub
168     }
169
170     @Override
171     public DeviceState getDeviceState() {
172         return deviceState;
173     }
174
175     @Override
176     public ReadTransaction getReadTransaction() {
177         return dataBroker.newReadOnlyTransaction();
178     }
179
180     @Override
181     public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
182                                                           final InstanceIdentifier<T> path, final T data) {
183         txChainManager.writeToTransaction(store, path, data);
184     }
185
186     @Override
187     public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store, final InstanceIdentifier<T> path) {
188         txChainManager.addDeleteOperationTotTxChain(store, path);
189     }
190
191     @Override
192     public ConnectionContext getPrimaryConnectionContext() {
193         return primaryConnectionContext;
194     }
195
196     @Override
197     public ConnectionContext getAuxiliaryConnectiobContexts(final BigInteger cookie) {
198         return auxiliaryConnectionContexts.get(new SwitchConnectionCookieOFImpl(cookie.longValue()));
199     }
200
201     @Override
202     public DeviceFlowRegistry getDeviceFlowRegistry() {
203         return deviceFlowRegistry;
204     }
205
206     @Override
207     public DeviceGroupRegistry getDeviceGroupRegistry() {
208         return deviceGroupRegistry;
209     }
210
211     @Override
212     public DeviceMeterRegistry getDeviceMeterRegistry() {
213         return deviceMeterRegistry;
214     }
215
216     @Override
217     public void processReply(final OfHeader ofHeader) {
218         if (ofHeader instanceof Error) {
219             messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
220         } else {
221             messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
222         }
223     }
224
225     @Override
226     public void processReply(final Xid xid, final List<MultipartReply> ofHeaderList) {
227         for (final MultipartReply multipartReply : ofHeaderList) {
228             messageSpy.spyMessage(multipartReply.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
229         }
230     }
231
232     @Override
233     public void processException(final Xid xid, final DeviceDataException deviceDataException) {
234         messageSpy.spyMessage(deviceDataException.getClass(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
235     }
236
237     @Override
238     public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
239         //TODO: will be defined later
240     }
241
242     @Override
243     public void processPortStatusMessage(final PortStatusMessage portStatus) {
244         messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
245         final TranslatorKey translatorKey = new TranslatorKey(portStatus.getVersion(), PortGrouping.class.getName());
246         final MessageTranslator<PortGrouping, FlowCapableNodeConnector> messageTranslator = translatorLibrary.lookupTranslator(translatorKey);
247         final FlowCapableNodeConnector flowCapableNodeConnector = messageTranslator.translate(portStatus, this, null);
248
249         final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = provideIIToNodeConnector(portStatus.getPortNo(), portStatus.getVersion());
250         if (portStatus.getReason().equals(PortReason.OFPPRADD) || portStatus.getReason().equals(PortReason.OFPPRMODIFY)) {
251             // because of ADD status node connector has to be created
252             final NodeConnectorBuilder nConnectorBuilder = new NodeConnectorBuilder().setKey(iiToNodeConnector.getKey());
253             nConnectorBuilder.addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new FlowCapableNodeConnectorStatisticsDataBuilder().build());
254             nConnectorBuilder.addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector);
255             writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, nConnectorBuilder.build());
256         } else if (portStatus.getReason().equals(PortReason.OFPPRDELETE)) {
257             addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
258         }
259     }
260
261     private KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> provideIIToNodeConnector(final long portNo, final short version) {
262         final InstanceIdentifier<Node> iiToNodes = deviceState.getNodeInstanceIdentifier();
263         final BigInteger dataPathId = deviceState.getFeatures().getDatapathId();
264         final NodeConnectorId nodeConnectorId = NodeStaticReplyTranslatorUtil.nodeConnectorId(dataPathId.toString(), portNo, version);
265         return iiToNodes.child(NodeConnector.class, new NodeConnectorKey(nodeConnectorId));
266     }
267
268     @Override
269     public void processPacketInMessage(final PacketInMessage packetInMessage) {
270         messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH);
271         final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
272
273         final TranslatorKey translatorKey = new TranslatorKey(packetInMessage.getVersion(), PacketIn.class.getName());
274         final MessageTranslator<PacketInMessage, PacketReceived> messageTranslator = translatorLibrary.lookupTranslator(translatorKey);
275         final PacketReceived packetReceived = messageTranslator.translate(packetInMessage, this, null);
276
277         if (packetReceived == null) {
278             LOG.debug("Received a null packet from switch");
279             return;
280         }
281         messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
282
283         ListenableFuture<? extends Object> offerNotification = notificationPublishService.offerNotification(packetReceived);
284         synchronized (throttlingLock) {
285             outstandingNotificationsAmount += 1;
286         }
287         if (NotificationPublishService.REJECTED.equals(offerNotification)) {
288             LOG.debug("notification offer rejected");
289             synchronized (throttlingLock) {
290                 if (outstandingNotificationsAmount > 1 && !filteringPacketIn) {
291                     connectionAdapter.setPacketInFiltering(true);
292                     messageSpy.spyMessage(DeviceContext.class, MessageSpy.STATISTIC_GROUP.OFJ_BACKPRESSURE_ON);
293                     filteringPacketIn = true;
294                     filteringHighWaterMark = outstandingNotificationsAmount;
295                     LOG.debug("PacketIn filtering on: {}, watermark: {}", connectionAdapter.getRemoteAddress(), outstandingNotificationsAmount);
296                 }
297             }
298         }
299
300         Futures.addCallback(offerNotification,
301                 new FutureCallback<Object>() {
302                     @Override
303                     public void onSuccess(final Object result) {
304                         countdownFiltering();
305                         messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
306                     }
307
308                     @Override
309                     public void onFailure(final Throwable t) {
310                         countdownFiltering();
311                         messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
312                         LOG.debug("notification offer failed: {}, outstanding: {}", t.getMessage(), outstandingNotificationsAmount);
313                         LOG.trace("notification offer failed..", t);
314                     }
315
316                     private void countdownFiltering() {
317                         synchronized (throttlingLock) {
318                             outstandingNotificationsAmount -= 1;
319                             if (outstandingNotificationsAmount == 0 && filteringPacketIn) {
320                                 connectionAdapter.setPacketInFiltering(false);
321                                 messageSpy.spyMessage(DeviceContext.class, MessageSpy.STATISTIC_GROUP.OFJ_BACKPRESSURE_OFF);
322
323                                 filteringPacketIn = false;
324                                 LOG.debug("PacketIn filtering off: {}, outstanding: {}", connectionAdapter.getRemoteAddress(), outstandingNotificationsAmount);
325                             }
326                         }
327                     }
328                 }
329         );
330     }
331
332     @Override
333     public TranslatorLibrary oook() {
334         return translatorLibrary;
335     }
336
337     @Override
338     public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
339         this.translatorLibrary = translatorLibrary;
340     }
341
342     @Override
343     public HashedWheelTimer getTimer() {
344         return hashedWheelTimer;
345     }
346
347     @Override
348     public void close() {
349         deviceState.setValid(false);
350
351         LOG.trace("Removing node {} from operational DS.", getDeviceState().getNodeId());
352         addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, getDeviceState().getNodeInstanceIdentifier());
353
354         deviceGroupRegistry.close();
355         deviceFlowRegistry.close();
356         deviceMeterRegistry.close();
357
358         if (primaryConnectionContext.getConnectionAdapter().isAlive()) {
359             primaryConnectionContext.setConnectionState(ConnectionContext.CONNECTION_STATE.RIP);
360             primaryConnectionContext.getConnectionAdapter().disconnect();
361         }
362         for (final ConnectionContext connectionContext : auxiliaryConnectionContexts.values()) {
363             if (connectionContext.getConnectionAdapter().isAlive()) {
364                 connectionContext.getConnectionAdapter().disconnect();
365             }
366         }
367         for (final DeviceContextClosedHandler deviceContextClosedHandler : closeHandlers) {
368             deviceContextClosedHandler.onDeviceContextClosed(this);
369         }
370     }
371
372     @Override
373     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
374         if (this.getPrimaryConnectionContext().equals(connectionContext)) {
375             try {
376                 close();
377             } catch (final Exception e) {
378                 LOG.trace("Error closing device context.");
379             }
380             if (null != deviceDisconnectedHandler) {
381                 deviceDisconnectedHandler.onDeviceDisconnected(connectionContext);
382             }
383         } else {
384             final SwitchConnectionDistinguisher connectionDistinguisher = createConnectionDistinguisher(connectionContext);
385             auxiliaryConnectionContexts.remove(connectionDistinguisher);
386         }
387     }
388
389     @Override
390     public void setCurrentBarrierTimeout(final Timeout timeout) {
391         barrierTaskTimeout = timeout;
392     }
393
394     @Override
395     public Timeout getBarrierTaskTimeout() {
396         return barrierTaskTimeout;
397     }
398
399     @Override
400     public void setNotificationService(final NotificationService notificationServiceParam) {
401         notificationService = notificationServiceParam;
402     }
403
404     @Override
405     public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
406         this.notificationPublishService = notificationPublishService;
407     }
408
409     @Override
410     public MessageSpy getMessageSpy() {
411         return messageSpy;
412     }
413
414     @Override
415     public void setDeviceDisconnectedHandler(final DeviceDisconnectedHandler deviceDisconnectedHandler) {
416         this.deviceDisconnectedHandler = deviceDisconnectedHandler;
417     }
418
419     @Override
420     public void addDeviceContextClosedHandler(final DeviceContextClosedHandler deviceContextClosedHandler) {
421         this.closeHandlers.add(deviceContextClosedHandler);
422     }
423
424     @Override
425     public void startGatheringOperationsToOneTransaction() {
426         txChainManager.startGatheringOperationsToOneTransaction();
427     }
428
429     @Override
430     public void commitOperationsGatheredInOneTransaction() {
431         txChainManager.commitOperationsGatheredInOneTransaction();
432     }
433
434     @Override
435     public void onPublished() {
436         primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
437         for (ConnectionContext switchAuxConnectionContext : auxiliaryConnectionContexts.values()) {
438             switchAuxConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
439         }
440     }
441 }