Merge "Improving logging in DeviceContext and MultiMsgCollector."
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceContextImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.SettableFuture;
13 import io.netty.util.HashedWheelTimer;
14 import io.netty.util.Timeout;
15 import java.math.BigInteger;
16 import java.util.ArrayList;
17 import java.util.HashMap;
18 import java.util.Iterator;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.TreeMap;
22 import java.util.concurrent.atomic.AtomicLong;
23 import javax.annotation.Nonnull;
24 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
25 import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
26 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
27 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
28 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
29 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
30 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
31 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
32 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
33 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
34 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
35 import org.opendaylight.openflowplugin.api.openflow.device.exception.DeviceDataException;
36 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceContextClosedHandler;
37 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceDisconnectedHandler;
38 import org.opendaylight.openflowplugin.api.openflow.device.listener.OpenflowMessageListenerFacade;
39 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
40 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
41 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
42 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
43 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
44 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
45 import org.opendaylight.openflowplugin.impl.common.NodeStaticReplyTranslatorUtil;
46 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
47 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
48 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
49 import org.opendaylight.openflowplugin.impl.services.RequestContextUtil;
50 import org.opendaylight.openflowplugin.openflow.md.core.session.SwitchConnectionCookieOFImpl;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketIn;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
69 import org.opendaylight.yangtools.yang.binding.ChildOf;
70 import org.opendaylight.yangtools.yang.binding.DataObject;
71 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
72 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
73 import org.opendaylight.yangtools.yang.common.RpcError;
74 import org.opendaylight.yangtools.yang.common.RpcResult;
75 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
76 import org.slf4j.Logger;
77 import org.slf4j.LoggerFactory;
78
79 /**
80  *
81  */
82 public class DeviceContextImpl implements DeviceContext {
83
84     private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
85     public static final String DEVICE_DISCONNECTED = "Device disconnected.";
86
87     private final ConnectionContext primaryConnectionContext;
88     private final DeviceState deviceState;
89     private final DataBroker dataBroker;
90     private final XidGenerator xidGenerator;
91     private final HashedWheelTimer hashedWheelTimer;
92     private Map<Long, RequestContext> requests = new TreeMap();
93
94     private final Map<SwitchConnectionDistinguisher, ConnectionContext> auxiliaryConnectionContexts;
95     private final TransactionChainManager txChainManager;
96     private TranslatorLibrary translatorLibrary;
97     private OpenflowMessageListenerFacade openflowMessageListenerFacade;
98     private final DeviceFlowRegistry deviceFlowRegistry;
99     private final DeviceGroupRegistry deviceGroupRegistry;
100     private final DeviceMeterRegistry deviceMeterRegistry;
101     private Timeout barrierTaskTimeout;
102     private NotificationProviderService notificationService;
103     private final MessageSpy<Class> messageSpy;
104     private DeviceDisconnectedHandler deviceDisconnectedHandler;
105     private List<DeviceContextClosedHandler> closeHandlers = new ArrayList<>();
106
107
108     @VisibleForTesting
109     DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
110                       @Nonnull final DeviceState deviceState,
111                       @Nonnull final DataBroker dataBroker,
112                       @Nonnull final HashedWheelTimer hashedWheelTimer,
113                       @Nonnull final MessageSpy _messageSpy) {
114         this.primaryConnectionContext = Preconditions.checkNotNull(primaryConnectionContext);
115         this.deviceState = Preconditions.checkNotNull(deviceState);
116         this.dataBroker = Preconditions.checkNotNull(dataBroker);
117         this.hashedWheelTimer = Preconditions.checkNotNull(hashedWheelTimer);
118         xidGenerator = new XidGenerator();
119         txChainManager = new TransactionChainManager(dataBroker, hashedWheelTimer, 500L, 500L);
120         auxiliaryConnectionContexts = new HashMap<>();
121         deviceFlowRegistry = new DeviceFlowRegistryImpl();
122         deviceGroupRegistry = new DeviceGroupRegistryImpl();
123         deviceMeterRegistry = new DeviceMeterRegistryImpl();
124         messageSpy = _messageSpy;
125     }
126
127     /**
128      * This method is called from {@link DeviceManagerImpl} only. So we could say "posthandshake process finish"
129      * and we are able to set a scheduler for an automatic transaction submitting by time (0,5sec).
130      */
131     void submitTransaction() {
132         txChainManager.submitTransaction();
133         txChainManager.enableCounter();
134     }
135
136     @Override
137     public <M extends ChildOf<DataObject>> void onMessage(final M message, final RequestContext requestContext) {
138         // TODO Auto-generated method stub
139
140     }
141
142     @Override
143     public void addAuxiliaryConenctionContext(final ConnectionContext connectionContext) {
144         final SwitchConnectionDistinguisher connectionDistinguisher = new SwitchConnectionCookieOFImpl(connectionContext.getFeatures().getAuxiliaryId());
145         auxiliaryConnectionContexts.put(connectionDistinguisher, connectionContext);
146     }
147
148     @Override
149     public void removeAuxiliaryConenctionContext(final ConnectionContext connectionContext) {
150         // TODO Auto-generated method stub
151     }
152
153     @Override
154     public DeviceState getDeviceState() {
155         return deviceState;
156     }
157
158     @Override
159     public ReadTransaction getReadTransaction() {
160         return dataBroker.newReadOnlyTransaction();
161     }
162
163     @Override
164     public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
165                                                           final InstanceIdentifier<T> path, final T data) {
166         txChainManager.writeToTransaction(store, path, data);
167     }
168
169     @Override
170     public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store, final InstanceIdentifier<T> path) {
171         txChainManager.addDeleteOperationTotTxChain(store, path);
172     }
173
174     @Override
175     public ConnectionContext getPrimaryConnectionContext() {
176         return primaryConnectionContext;
177     }
178
179     @Override
180     public ConnectionContext getAuxiliaryConnectiobContexts(final BigInteger cookie) {
181         return auxiliaryConnectionContexts.get(new SwitchConnectionCookieOFImpl(cookie.longValue()));
182     }
183
184     @Override
185     public Xid getNextXid() {
186         return xidGenerator.generate();
187     }
188
189     @Override
190     public RequestContext lookupRequest(Xid xid) {
191         return requests.get(xid.getValue());
192     }
193
194     @Override
195     public int getNumberOfOutstandingRequests() {
196         return requests.size();
197     }
198
199     @Override
200     public void hookRequestCtx(final Xid xid, final RequestContext requestFutureContext) {
201         requests.put(xid.getValue(), requestFutureContext);
202     }
203
204     @Override
205     public RequestContext unhookRequestCtx(Xid xid) {
206         return requests.remove(xid.getValue());
207     }
208
209     @Override
210     public void attachOpenflowMessageListener(final OpenflowMessageListenerFacade openflowMessageListenerFacade) {
211         this.openflowMessageListenerFacade = openflowMessageListenerFacade;
212         primaryConnectionContext.getConnectionAdapter().setMessageListener(openflowMessageListenerFacade);
213     }
214
215     @Override
216     public OpenflowMessageListenerFacade getOpenflowMessageListenerFacade() {
217         return openflowMessageListenerFacade;
218     }
219
220     @Override
221     public DeviceFlowRegistry getDeviceFlowRegistry() {
222         return deviceFlowRegistry;
223     }
224
225     @Override
226     public DeviceGroupRegistry getDeviceGroupRegistry() {
227         return deviceGroupRegistry;
228     }
229
230     @Override
231     public DeviceMeterRegistry getDeviceMeterRegistry() {
232         return deviceMeterRegistry;
233     }
234
235     @Override
236     public void processReply(final OfHeader ofHeader) {
237         final RequestContext requestContext = requests.get(ofHeader.getXid());
238         if (null != requestContext) {
239             final SettableFuture replyFuture = requestContext.getFuture();
240             requests.remove(ofHeader.getXid());
241             RpcResult<OfHeader> rpcResult;
242             if (ofHeader instanceof Error) {
243                 //TODO : this is the point, where we can discover that add flow operation failed and where we should
244                 //TODO : remove this flow from deviceFlowRegistry
245                 final Error error = (Error) ofHeader;
246                 final String message = "Operation on device failed";
247                 rpcResult = RpcResultBuilder
248                         .<OfHeader>failed()
249                         .withError(RpcError.ErrorType.APPLICATION, message, new DeviceDataException(message, error))
250                         .build();
251                 messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
252             } else {
253                 rpcResult = RpcResultBuilder
254                         .<OfHeader>success()
255                         .withResult(ofHeader)
256                         .build();
257                 messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
258             }
259
260             replyFuture.set(rpcResult);
261             try {
262                 requestContext.close();
263             } catch (final Exception e) {
264                 LOG.error("Closing RequestContext failed: ", e);
265             }
266         } else {
267             LOG.error("Can't find request context registered for xid : {}. Type of reply: {}. From address: {}", ofHeader.getXid(), ofHeader.getClass().getName(),
268                     getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress());
269         }
270     }
271
272     @Override
273     public void processReply(final Xid xid, final List<MultipartReply> ofHeaderList) {
274         final RequestContext requestContext = requests.get(xid.getValue());
275         if (null != requestContext) {
276             final SettableFuture replyFuture = requestContext.getFuture();
277             requests.remove(xid.getValue());
278             final RpcResult<List<MultipartReply>> rpcResult = RpcResultBuilder
279                     .<List<MultipartReply>>success()
280                     .withResult(ofHeaderList)
281                     .build();
282             replyFuture.set(rpcResult);
283             for (MultipartReply multipartReply : ofHeaderList) {
284                 messageSpy.spyMessage(multipartReply.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
285             }
286
287             try {
288                 requestContext.close();
289             } catch (final Exception e) {
290                 LOG.error("Closing RequestContext failed: ", e);
291             }
292         } else {
293             LOG.error("Can't find request context registered for xid : {}. Type of reply: MULTIPART. From address: {}", xid.getValue(),
294                     getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress());
295         }
296     }
297
298     @Override
299     public void processException(final Xid xid, final DeviceDataException deviceDataException) {
300
301         LOG.trace("Processing exception for xid : {}", xid.getValue());
302
303         final RequestContext requestContext = requests.get(xid.getValue());
304
305         if (null != requestContext) {
306             final SettableFuture replyFuture = requestContext.getFuture();
307             requests.remove(xid.getValue());
308             final RpcResult<List<OfHeader>> rpcResult = RpcResultBuilder
309                     .<List<OfHeader>>failed()
310                     .withError(RpcError.ErrorType.APPLICATION, String.format("Message processing failed : %s", deviceDataException.getError()), deviceDataException)
311                     .build();
312             replyFuture.set(rpcResult);
313             messageSpy.spyMessage(deviceDataException.getClass(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
314             try {
315                 requestContext.close();
316             } catch (final Exception e) {
317                 LOG.warn("Closing RequestContext failed: ", e);
318             }
319         } else {
320             LOG.warn("Can't find request context registered for xid : {}. Exception message {}",
321                     xid.getValue(), deviceDataException.getMessage());
322         }
323     }
324
325     @Override
326     public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
327         //TODO: will be defined later
328     }
329
330     @Override
331     public void processPortStatusMessage(final PortStatusMessage portStatus) {
332         messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
333         final TranslatorKey translatorKey = new TranslatorKey(portStatus.getVersion(), PortGrouping.class.getName());
334         final MessageTranslator<PortGrouping, FlowCapableNodeConnector> messageTranslator = translatorLibrary.lookupTranslator(translatorKey);
335         final FlowCapableNodeConnector flowCapableNodeConnector = messageTranslator.translate(portStatus, this, null);
336
337         final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = provideIIToNodeConnector(portStatus.getPortNo(), portStatus.getVersion());
338         if (portStatus.getReason().equals(PortReason.OFPPRADD) || portStatus.getReason().equals(PortReason.OFPPRMODIFY)) {
339             // because of ADD status node connector has to be created
340             final NodeConnectorBuilder nConnectorBuilder = new NodeConnectorBuilder().setKey(iiToNodeConnector.getKey());
341             nConnectorBuilder.addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new FlowCapableNodeConnectorStatisticsDataBuilder().build());
342             nConnectorBuilder.addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector);
343             writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, nConnectorBuilder.build());
344         } else if (portStatus.getReason().equals(PortReason.OFPPRDELETE)) {
345             addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
346         }
347     }
348
349     private KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> provideIIToNodeConnector(final long portNo, final short version) {
350         final InstanceIdentifier<Node> iiToNodes = deviceState.getNodeInstanceIdentifier();
351         final BigInteger dataPathId = deviceState.getFeatures().getDatapathId();
352         final NodeConnectorId nodeConnectorId = NodeStaticReplyTranslatorUtil.nodeConnectorId(dataPathId.toString(), portNo, version);
353         return iiToNodes.child(NodeConnector.class, new NodeConnectorKey(nodeConnectorId));
354     }
355
356     @Override
357     public void processPacketInMessage(final PacketInMessage packetInMessage) {
358         messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
359         final TranslatorKey translatorKey = new TranslatorKey(packetInMessage.getVersion(), PacketIn.class.getName());
360         final MessageTranslator<PacketInMessage, PacketReceived> messageTranslator = translatorLibrary.lookupTranslator(translatorKey);
361         final PacketReceived packetReceived = messageTranslator.translate(packetInMessage, this, null);
362         notificationService.publish(packetReceived);
363     }
364
365     @Override
366     public TranslatorLibrary oook() {
367         return translatorLibrary;
368     }
369
370     @Override
371     public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
372         this.translatorLibrary = translatorLibrary;
373     }
374
375     @Override
376     public HashedWheelTimer getTimer() {
377         return hashedWheelTimer;
378     }
379
380     @Override
381     public void close() throws Exception {
382         deviceState.setValid(false);
383
384         LOG.trace("Removing node {} from operational DS.", getDeviceState().getNodeId());
385         addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, getDeviceState().getNodeInstanceIdentifier());
386
387         deviceGroupRegistry.close();
388         deviceFlowRegistry.close();
389         deviceMeterRegistry.close();
390
391         if (primaryConnectionContext.getConnectionAdapter().isAlive()) {
392             primaryConnectionContext.setConnectionState(ConnectionContext.CONNECTION_STATE.RIP);
393             primaryConnectionContext.getConnectionAdapter().disconnect();
394         }
395         for (Map.Entry<Long, RequestContext> entry : requests.entrySet()) {
396             RequestContextUtil.closeRequestContextWithRpcError(entry.getValue(), DEVICE_DISCONNECTED);
397         }
398         for (ConnectionContext connectionContext : auxiliaryConnectionContexts.values()) {
399             if (connectionContext.getConnectionAdapter().isAlive()) {
400                 connectionContext.getConnectionAdapter().disconnect();
401             }
402         }
403         for (DeviceContextClosedHandler deviceContextClosedHandler : closeHandlers) {
404             deviceContextClosedHandler.onDeviceContextClosed(this);
405         }
406
407     }
408
409     @Override
410     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
411         if (this.getPrimaryConnectionContext().equals(connectionContext)) {
412             try {
413                 close();
414             } catch (Exception e) {
415                 LOG.trace("Error closing device context.");
416             }
417             if (null != deviceDisconnectedHandler) {
418                 deviceDisconnectedHandler.onDeviceDisconnected(connectionContext);
419             }
420         } else {
421             auxiliaryConnectionContexts.remove(connectionContext);
422         }
423     }
424
425
426     private class XidGenerator {
427
428         private final AtomicLong xid = new AtomicLong(0);
429
430         public Xid generate() {
431             return new Xid(xid.incrementAndGet());
432         }
433     }
434
435     @Override
436     public RequestContext extractNextOutstandingMessage(final long barrierXid) {
437         RequestContext nextMessage = null;
438         final Iterator<Long> keyIterator = requests.keySet().iterator();
439         if (keyIterator.hasNext()) {
440             final Long oldestXid = keyIterator.next();
441             if (oldestXid < barrierXid) {
442                 nextMessage = requests.remove(oldestXid);
443             }
444         }
445         return nextMessage;
446     }
447
448     @Override
449     public void setCurrentBarrierTimeout(final Timeout timeout) {
450         barrierTaskTimeout = timeout;
451     }
452
453     @Override
454     public Timeout getBarrierTaskTimeout() {
455         return barrierTaskTimeout;
456     }
457
458     @Override
459     public void setNotificationService(final NotificationProviderService notificationServiceParam) {
460         notificationService = notificationServiceParam;
461     }
462
463     @Override
464     public MessageSpy getMessageSpy() {
465         return messageSpy;
466     }
467
468     @Override
469     public void setDeviceDisconnectedHandler(final DeviceDisconnectedHandler deviceDisconnectedHandler) {
470         this.deviceDisconnectedHandler = deviceDisconnectedHandler;
471     }
472
473     @Override
474     public void addDeviceContextClosedHandler(final DeviceContextClosedHandler deviceContextClosedHandler) {
475         this.closeHandlers.add(deviceContextClosedHandler);
476     }
477 }