0accedc5a3bc4053c4cb4c6f73f3afa3d987c409
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceContextImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.SettableFuture;
13 import io.netty.util.HashedWheelTimer;
14 import io.netty.util.Timeout;
15 import java.math.BigInteger;
16 import java.util.ArrayList;
17 import java.util.Collections;
18 import java.util.HashMap;
19 import java.util.Iterator;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.TreeMap;
23 import java.util.concurrent.atomic.AtomicLong;
24 import javax.annotation.Nonnull;
25 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
26 import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
27 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
28 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
29 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
30 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
31 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
32 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
33 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
34 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
35 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
36 import org.opendaylight.openflowplugin.api.openflow.device.exception.DeviceDataException;
37 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceContextClosedHandler;
38 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceDisconnectedHandler;
39 import org.opendaylight.openflowplugin.api.openflow.device.listener.OpenflowMessageListenerFacade;
40 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
41 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
42 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
43 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
44 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
45 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
46 import org.opendaylight.openflowplugin.impl.common.NodeStaticReplyTranslatorUtil;
47 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
48 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
49 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
50 import org.opendaylight.openflowplugin.impl.services.RequestContextUtil;
51 import org.opendaylight.openflowplugin.openflow.md.core.session.SwitchConnectionCookieOFImpl;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketIn;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
70 import org.opendaylight.yangtools.yang.binding.ChildOf;
71 import org.opendaylight.yangtools.yang.binding.DataObject;
72 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
73 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
74 import org.opendaylight.yangtools.yang.common.RpcError;
75 import org.opendaylight.yangtools.yang.common.RpcResult;
76 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
77 import org.slf4j.Logger;
78 import org.slf4j.LoggerFactory;
79
80 /**
81  *
82  */
83 public class DeviceContextImpl implements DeviceContext {
84
85     private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
86     public static final String DEVICE_DISCONNECTED = "Device disconnected.";
87
88     private final ConnectionContext primaryConnectionContext;
89     private final DeviceState deviceState;
90     private final DataBroker dataBroker;
91     private final XidGenerator xidGenerator;
92     private final HashedWheelTimer hashedWheelTimer;
93     private Map<Long, RequestContext> requests = new TreeMap();
94
95     private final Map<SwitchConnectionDistinguisher, ConnectionContext> auxiliaryConnectionContexts;
96     private final TransactionChainManager txChainManager;
97     private TranslatorLibrary translatorLibrary;
98     private OpenflowMessageListenerFacade openflowMessageListenerFacade;
99     private final DeviceFlowRegistry deviceFlowRegistry;
100     private final DeviceGroupRegistry deviceGroupRegistry;
101     private final DeviceMeterRegistry deviceMeterRegistry;
102     private Timeout barrierTaskTimeout;
103     private NotificationProviderService notificationService;
104     private final MessageSpy<Class> messageSpy;
105     private DeviceDisconnectedHandler deviceDisconnectedHandler;
106     private List<DeviceContextClosedHandler> closeHandlers = new ArrayList<>();
107
108
109     @VisibleForTesting
110     DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
111                       @Nonnull final DeviceState deviceState,
112                       @Nonnull final DataBroker dataBroker,
113                       @Nonnull final HashedWheelTimer hashedWheelTimer,
114                       @Nonnull final MessageSpy _messageSpy) {
115         this.primaryConnectionContext = Preconditions.checkNotNull(primaryConnectionContext);
116         this.deviceState = Preconditions.checkNotNull(deviceState);
117         this.dataBroker = Preconditions.checkNotNull(dataBroker);
118         this.hashedWheelTimer = Preconditions.checkNotNull(hashedWheelTimer);
119         xidGenerator = new XidGenerator();
120         txChainManager = new TransactionChainManager(dataBroker, hashedWheelTimer, 500L, 500L);
121         auxiliaryConnectionContexts = new HashMap<>();
122         deviceFlowRegistry = new DeviceFlowRegistryImpl();
123         deviceGroupRegistry = new DeviceGroupRegistryImpl();
124         deviceMeterRegistry = new DeviceMeterRegistryImpl();
125         messageSpy = _messageSpy;
126     }
127
128     /**
129      * This method is called from {@link DeviceManagerImpl} only. So we could say "posthandshake process finish"
130      * and we are able to set a scheduler for an automatic transaction submitting by time (0,5sec).
131      */
132     void submitTransaction() {
133         txChainManager.submitTransaction();
134         txChainManager.enableCounter();
135     }
136
137     @Override
138     public <M extends ChildOf<DataObject>> void onMessage(final M message, final RequestContext requestContext) {
139         // TODO Auto-generated method stub
140
141     }
142
143     @Override
144     public void addAuxiliaryConenctionContext(final ConnectionContext connectionContext) {
145         final SwitchConnectionDistinguisher connectionDistinguisher = new SwitchConnectionCookieOFImpl(connectionContext.getFeatures().getAuxiliaryId());
146         auxiliaryConnectionContexts.put(connectionDistinguisher, connectionContext);
147     }
148
149     @Override
150     public void removeAuxiliaryConenctionContext(final ConnectionContext connectionContext) {
151         // TODO Auto-generated method stub
152     }
153
154     @Override
155     public DeviceState getDeviceState() {
156         return deviceState;
157     }
158
159     @Override
160     public ReadTransaction getReadTransaction() {
161         return dataBroker.newReadOnlyTransaction();
162     }
163
164     @Override
165     public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
166                                                           final InstanceIdentifier<T> path, final T data) {
167         txChainManager.writeToTransaction(store, path, data);
168     }
169
170     @Override
171     public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store, final InstanceIdentifier<T> path) {
172         txChainManager.addDeleteOperationTotTxChain(store, path);
173     }
174
175     @Override
176     public ConnectionContext getPrimaryConnectionContext() {
177         return primaryConnectionContext;
178     }
179
180     @Override
181     public ConnectionContext getAuxiliaryConnectiobContexts(final BigInteger cookie) {
182         return auxiliaryConnectionContexts.get(new SwitchConnectionCookieOFImpl(cookie.longValue()));
183     }
184
185     @Override
186     public Xid getNextXid() {
187         return xidGenerator.generate();
188     }
189
190     @Override
191     public RequestContext lookupRequest(Xid xid) {
192         return requests.get(xid.getValue());
193     }
194
195     @Override
196     public int getNumberOfOutstandingRequests() {
197         return requests.size();
198     }
199
200     @Override
201     public void hookRequestCtx(final Xid xid, final RequestContext requestFutureContext) {
202         requests.put(xid.getValue(), requestFutureContext);
203     }
204
205     @Override
206     public RequestContext unhookRequestCtx(Xid xid) {
207         return requests.remove(xid.getValue());
208     }
209
210     @Override
211     public void attachOpenflowMessageListener(final OpenflowMessageListenerFacade openflowMessageListenerFacade) {
212         this.openflowMessageListenerFacade = openflowMessageListenerFacade;
213         primaryConnectionContext.getConnectionAdapter().setMessageListener(openflowMessageListenerFacade);
214     }
215
216     @Override
217     public OpenflowMessageListenerFacade getOpenflowMessageListenerFacade() {
218         return openflowMessageListenerFacade;
219     }
220
221     @Override
222     public DeviceFlowRegistry getDeviceFlowRegistry() {
223         return deviceFlowRegistry;
224     }
225
226     @Override
227     public DeviceGroupRegistry getDeviceGroupRegistry() {
228         return deviceGroupRegistry;
229     }
230
231     @Override
232     public DeviceMeterRegistry getDeviceMeterRegistry() {
233         return deviceMeterRegistry;
234     }
235
236     @Override
237     public void processReply(final OfHeader ofHeader) {
238         final RequestContext requestContext = requests.get(ofHeader.getXid());
239         if (null != requestContext) {
240             final SettableFuture replyFuture = requestContext.getFuture();
241             requests.remove(ofHeader.getXid());
242             RpcResult<OfHeader> rpcResult;
243             if (ofHeader instanceof Error) {
244                 //TODO : this is the point, where we can discover that add flow operation failed and where we should
245                 //TODO : remove this flow from deviceFlowRegistry
246                 final Error error = (Error) ofHeader;
247                 final String message = "Operation on device failed";
248                 rpcResult = RpcResultBuilder
249                         .<OfHeader>failed()
250                         .withError(RpcError.ErrorType.APPLICATION, message, new DeviceDataException(message, error))
251                         .build();
252                 messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
253             } else {
254                 rpcResult = RpcResultBuilder
255                         .<OfHeader>success()
256                         .withResult(ofHeader)
257                         .build();
258                 messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
259             }
260
261             replyFuture.set(rpcResult);
262             try {
263                 requestContext.close();
264             } catch (final Exception e) {
265                 LOG.error("Closing RequestContext failed: ", e);
266             }
267         } else {
268             LOG.error("Can't find request context registered for xid : {}. Type of reply: {}. From address: {}", ofHeader.getXid(), ofHeader.getClass().getName(),
269                     getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress());
270         }
271     }
272
273     @Override
274     public void processReply(final Xid xid, final List<MultipartReply> ofHeaderList) {
275         final RequestContext requestContext = requests.get(xid.getValue());
276         if (null != requestContext) {
277             final SettableFuture replyFuture = requestContext.getFuture();
278             requests.remove(xid.getValue());
279             final RpcResult<List<MultipartReply>> rpcResult = RpcResultBuilder
280                     .<List<MultipartReply>>success()
281                     .withResult(ofHeaderList)
282                     .build();
283             replyFuture.set(rpcResult);
284             for (MultipartReply multipartReply : ofHeaderList) {
285                 messageSpy.spyMessage(multipartReply.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
286             }
287
288             try {
289                 requestContext.close();
290             } catch (final Exception e) {
291                 LOG.error("Closing RequestContext failed: ", e);
292             }
293         } else {
294             LOG.error("Can't find request context registered for xid : {}. Type of reply: MULTIPART. From address: {}", xid.getValue(),
295                     getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress());
296         }
297     }
298
299     @Override
300     public void processException(final Xid xid, final DeviceDataException deviceDataException) {
301
302         LOG.trace("Processing exception for xid : {}", xid.getValue());
303
304         final RequestContext requestContext = requests.get(xid.getValue());
305
306         if (null != requestContext) {
307             final SettableFuture replyFuture = requestContext.getFuture();
308             requests.remove(xid.getValue());
309             final RpcResult<List<OfHeader>> rpcResult = RpcResultBuilder
310                     .<List<OfHeader>>failed()
311                     .withError(RpcError.ErrorType.APPLICATION, String.format("Message processing failed : %s", deviceDataException.getError()), deviceDataException)
312                     .build();
313             replyFuture.set(rpcResult);
314             messageSpy.spyMessage(deviceDataException.getClass(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
315             try {
316                 requestContext.close();
317             } catch (final Exception e) {
318                 LOG.error("Closing RequestContext failed: ", e);
319             }
320         } else {
321             LOG.error("Can't find request context registered for xid : {}", xid.getValue());
322         }
323     }
324
325     @Override
326     public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
327         //TODO: will be defined later
328     }
329
330     @Override
331     public void processPortStatusMessage(final PortStatusMessage portStatus) {
332         messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
333         final TranslatorKey translatorKey = new TranslatorKey(portStatus.getVersion(), PortGrouping.class.getName());
334         final MessageTranslator<PortGrouping, FlowCapableNodeConnector> messageTranslator = translatorLibrary.lookupTranslator(translatorKey);
335         final FlowCapableNodeConnector flowCapableNodeConnector = messageTranslator.translate(portStatus, this, null);
336
337         final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = provideIIToNodeConnector(portStatus.getPortNo(), portStatus.getVersion());
338         if (portStatus.getReason().equals(PortReason.OFPPRADD) || portStatus.getReason().equals(PortReason.OFPPRMODIFY)) {
339             // because of ADD status node connector has to be created
340             final NodeConnectorBuilder nConnectorBuilder = new NodeConnectorBuilder().setKey(iiToNodeConnector.getKey());
341             nConnectorBuilder.addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new FlowCapableNodeConnectorStatisticsDataBuilder().build());
342             nConnectorBuilder.addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector);
343             writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, nConnectorBuilder.build());
344         } else if (portStatus.getReason().equals(PortReason.OFPPRDELETE)) {
345             addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
346         }
347     }
348
349     private KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> provideIIToNodeConnector(final long portNo, final short version) {
350         final InstanceIdentifier<Node> iiToNodes = deviceState.getNodeInstanceIdentifier();
351         final BigInteger dataPathId = deviceState.getFeatures().getDatapathId();
352         final NodeConnectorId nodeConnectorId = NodeStaticReplyTranslatorUtil.nodeConnectorId(dataPathId.toString(), portNo, version);
353         return iiToNodes.child(NodeConnector.class, new NodeConnectorKey(nodeConnectorId));
354     }
355
356     @Override
357     public void processPacketInMessage(final PacketInMessage packetInMessage) {
358         messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
359         final TranslatorKey translatorKey = new TranslatorKey(packetInMessage.getVersion(), PacketIn.class.getName());
360         final MessageTranslator<PacketInMessage, PacketReceived> messageTranslator = translatorLibrary.lookupTranslator(translatorKey);
361         final PacketReceived packetReceived = messageTranslator.translate(packetInMessage, this, null);
362         notificationService.publish(packetReceived);
363     }
364
365     @Override
366     public TranslatorLibrary oook() {
367         return translatorLibrary;
368     }
369
370     @Override
371     public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
372         this.translatorLibrary = translatorLibrary;
373     }
374
375     @Override
376     public HashedWheelTimer getTimer() {
377         return hashedWheelTimer;
378     }
379
380     @Override
381     public void close() throws Exception {
382         deviceState.setValid(false);
383
384         LOG.trace("Removing node {} from operational DS.", getDeviceState().getNodeId());
385         addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, getDeviceState().getNodeInstanceIdentifier());
386
387         deviceGroupRegistry.close();
388         deviceFlowRegistry.close();
389         deviceMeterRegistry.close();
390
391         if (primaryConnectionContext.getConnectionAdapter().isAlive()) {
392             primaryConnectionContext.setConnectionState(ConnectionContext.CONNECTION_STATE.RIP);
393             primaryConnectionContext.getConnectionAdapter().disconnect();
394         }
395         for (Map.Entry<Long, RequestContext> entry : requests.entrySet()) {
396             RequestContextUtil.closeRequestContextWithRpcError(entry.getValue(), DEVICE_DISCONNECTED);
397         }
398         for (ConnectionContext connectionContext : auxiliaryConnectionContexts.values()) {
399             if (connectionContext.getConnectionAdapter().isAlive()) {
400                 connectionContext.getConnectionAdapter().disconnect();
401             }
402         }
403         for (DeviceContextClosedHandler deviceContextClosedHandler : closeHandlers) {
404             deviceContextClosedHandler.onDeviceContextClosed(this);
405         }
406
407     }
408
409     @Override
410     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
411         if (this.getPrimaryConnectionContext().equals(connectionContext)) {
412             try {
413                 close();
414             } catch (Exception e) {
415                 LOG.trace("Error closing device context.");
416             }
417             if (null != deviceDisconnectedHandler) {
418                 deviceDisconnectedHandler.onDeviceDisconnected(connectionContext);
419             }
420         } else {
421             auxiliaryConnectionContexts.remove(connectionContext);
422         }
423     }
424
425
426     private class XidGenerator {
427
428         private final AtomicLong xid = new AtomicLong(0);
429
430         public Xid generate() {
431             return new Xid(xid.incrementAndGet());
432         }
433     }
434
435     @Override
436     public RequestContext extractNextOutstandingMessage(final long barrierXid) {
437         RequestContext nextMessage = null;
438         final Iterator<Long> keyIterator = requests.keySet().iterator();
439         if (keyIterator.hasNext()) {
440             final Long oldestXid = keyIterator.next();
441             if (oldestXid < barrierXid) {
442                 nextMessage = requests.remove(oldestXid);
443             }
444         }
445         return nextMessage;
446     }
447
448     @Override
449     public void setCurrentBarrierTimeout(final Timeout timeout) {
450         barrierTaskTimeout = timeout;
451     }
452
453     @Override
454     public Timeout getBarrierTaskTimeout() {
455         return barrierTaskTimeout;
456     }
457
458     @Override
459     public void setNotificationService(final NotificationProviderService notificationServiceParam) {
460         notificationService = notificationServiceParam;
461     }
462
463     @Override
464     public MessageSpy getMessageSpy() {
465         return messageSpy;
466     }
467
468     @Override
469     public void setDeviceDisconnectedHandler(final DeviceDisconnectedHandler deviceDisconnectedHandler) {
470         this.deviceDisconnectedHandler = deviceDisconnectedHandler;
471     }
472
473     @Override
474     public void addDeviceContextClosedHandler(final DeviceContextClosedHandler deviceContextClosedHandler) {
475         this.closeHandlers.add(deviceContextClosedHandler);
476     }
477 }