Merge "Write to operational DS response of update-table rpc."
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceContextImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.SettableFuture;
13 import io.netty.util.HashedWheelTimer;
14 import io.netty.util.Timeout;
15 import java.math.BigInteger;
16 import java.util.ArrayList;
17 import java.util.HashMap;
18 import java.util.Iterator;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.TreeMap;
22 import java.util.concurrent.atomic.AtomicLong;
23 import javax.annotation.Nonnull;
24 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
25 import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
26 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
27 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
28 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
29 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
30 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
31 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
32 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
33 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
34 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
35 import org.opendaylight.openflowplugin.api.openflow.device.exception.DeviceDataException;
36 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceContextClosedHandler;
37 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceDisconnectedHandler;
38 import org.opendaylight.openflowplugin.api.openflow.device.listener.OpenflowMessageListenerFacade;
39 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
40 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
41 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
42 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
43 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
44 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
45 import org.opendaylight.openflowplugin.impl.common.NodeStaticReplyTranslatorUtil;
46 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
47 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
48 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
49 import org.opendaylight.openflowplugin.impl.services.RequestContextUtil;
50 import org.opendaylight.openflowplugin.openflow.md.core.session.SwitchConnectionCookieOFImpl;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketIn;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
69 import org.opendaylight.yangtools.yang.binding.ChildOf;
70 import org.opendaylight.yangtools.yang.binding.DataObject;
71 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
72 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
73 import org.opendaylight.yangtools.yang.common.RpcError;
74 import org.opendaylight.yangtools.yang.common.RpcResult;
75 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
76 import org.slf4j.Logger;
77 import org.slf4j.LoggerFactory;
78
79 /**
80  *
81  */
82 public class DeviceContextImpl implements DeviceContext {
83
84     private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
85     public static final String DEVICE_DISCONNECTED = "Device disconnected.";
86
87     private final ConnectionContext primaryConnectionContext;
88     private final DeviceState deviceState;
89     private final DataBroker dataBroker;
90     private final XidGenerator xidGenerator;
91     private final HashedWheelTimer hashedWheelTimer;
92     private Map<Long, RequestContext> requests = new TreeMap();
93
94     private final Map<SwitchConnectionDistinguisher, ConnectionContext> auxiliaryConnectionContexts;
95     private final TransactionChainManager txChainManager;
96     private TranslatorLibrary translatorLibrary;
97     private OpenflowMessageListenerFacade openflowMessageListenerFacade;
98     private final DeviceFlowRegistry deviceFlowRegistry;
99     private final DeviceGroupRegistry deviceGroupRegistry;
100     private final DeviceMeterRegistry deviceMeterRegistry;
101     private Timeout barrierTaskTimeout;
102     private NotificationProviderService notificationService;
103     private final MessageSpy<Class> messageSpy;
104     private DeviceDisconnectedHandler deviceDisconnectedHandler;
105     private List<DeviceContextClosedHandler> closeHandlers = new ArrayList<>();
106
107
108     @VisibleForTesting
109     DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
110                       @Nonnull final DeviceState deviceState,
111                       @Nonnull final DataBroker dataBroker,
112                       @Nonnull final HashedWheelTimer hashedWheelTimer,
113                       @Nonnull final MessageSpy _messageSpy) {
114         this.primaryConnectionContext = Preconditions.checkNotNull(primaryConnectionContext);
115         this.deviceState = Preconditions.checkNotNull(deviceState);
116         this.dataBroker = Preconditions.checkNotNull(dataBroker);
117         this.hashedWheelTimer = Preconditions.checkNotNull(hashedWheelTimer);
118         xidGenerator = new XidGenerator();
119         txChainManager = new TransactionChainManager(dataBroker, hashedWheelTimer, 500L, 500L);
120         auxiliaryConnectionContexts = new HashMap<>();
121         deviceFlowRegistry = new DeviceFlowRegistryImpl();
122         deviceGroupRegistry = new DeviceGroupRegistryImpl();
123         deviceMeterRegistry = new DeviceMeterRegistryImpl();
124         messageSpy = _messageSpy;
125     }
126
127     /**
128      * This method is called from {@link DeviceManagerImpl} only. So we could say "posthandshake process finish"
129      * and we are able to set a scheduler for an automatic transaction submitting by time (0,5sec).
130      */
131     void submitTransaction() {
132         txChainManager.enableCounter();
133         txChainManager.submitTransaction();
134     }
135
136     @Override
137     public <M extends ChildOf<DataObject>> void onMessage(final M message, final RequestContext requestContext) {
138         // TODO Auto-generated method stub
139
140     }
141
142     @Override
143     public void addAuxiliaryConenctionContext(final ConnectionContext connectionContext) {
144         final SwitchConnectionDistinguisher connectionDistinguisher = new SwitchConnectionCookieOFImpl(connectionContext.getFeatures().getAuxiliaryId());
145         auxiliaryConnectionContexts.put(connectionDistinguisher, connectionContext);
146     }
147
148     @Override
149     public void removeAuxiliaryConenctionContext(final ConnectionContext connectionContext) {
150         // TODO Auto-generated method stub
151     }
152
153     @Override
154     public DeviceState getDeviceState() {
155         return deviceState;
156     }
157
158     @Override
159     public ReadTransaction getReadTransaction() {
160         return dataBroker.newReadOnlyTransaction();
161     }
162
163     @Override
164     public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
165                                                           final InstanceIdentifier<T> path, final T data) {
166         txChainManager.writeToTransaction(store, path, data);
167     }
168
169     @Override
170     public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store, final InstanceIdentifier<T> path) {
171         txChainManager.addDeleteOperationTotTxChain(store, path);
172     }
173
174     @Override
175     public ConnectionContext getPrimaryConnectionContext() {
176         return primaryConnectionContext;
177     }
178
179     @Override
180     public ConnectionContext getAuxiliaryConnectiobContexts(final BigInteger cookie) {
181         return auxiliaryConnectionContexts.get(new SwitchConnectionCookieOFImpl(cookie.longValue()));
182     }
183
184     @Override
185     public Xid getNextXid() {
186         return xidGenerator.generate();
187     }
188
189     @Override
190     public RequestContext lookupRequest(Xid xid) {
191         return requests.get(xid.getValue());
192     }
193
194     @Override
195     public int getNumberOfOutstandingRequests() {
196         return requests.size();
197     }
198
199     @Override
200     public void hookRequestCtx(final Xid xid, final RequestContext requestFutureContext) {
201         requests.put(xid.getValue(), requestFutureContext);
202     }
203
204     @Override
205     public RequestContext unhookRequestCtx(Xid xid) {
206         return requests.remove(xid.getValue());
207     }
208
209     @Override
210     public void attachOpenflowMessageListener(final OpenflowMessageListenerFacade openflowMessageListenerFacade) {
211         this.openflowMessageListenerFacade = openflowMessageListenerFacade;
212         primaryConnectionContext.getConnectionAdapter().setMessageListener(openflowMessageListenerFacade);
213     }
214
215     @Override
216     public OpenflowMessageListenerFacade getOpenflowMessageListenerFacade() {
217         return openflowMessageListenerFacade;
218     }
219
220     @Override
221     public DeviceFlowRegistry getDeviceFlowRegistry() {
222         return deviceFlowRegistry;
223     }
224
225     @Override
226     public DeviceGroupRegistry getDeviceGroupRegistry() {
227         return deviceGroupRegistry;
228     }
229
230     @Override
231     public DeviceMeterRegistry getDeviceMeterRegistry() {
232         return deviceMeterRegistry;
233     }
234
235     @Override
236     public void processReply(final OfHeader ofHeader) {
237         final RequestContext requestContext = requests.get(ofHeader.getXid());
238         if (null != requestContext) {
239             final SettableFuture replyFuture = requestContext.getFuture();
240             requests.remove(ofHeader.getXid());
241             RpcResult<OfHeader> rpcResult;
242             if (ofHeader instanceof Error) {
243                 //TODO : this is the point, where we can discover that add flow operation failed and where we should
244                 //TODO : remove this flow from deviceFlowRegistry
245                 final Error error = (Error) ofHeader;
246                 final String message = "Operation on device failed with xid "+ofHeader.getXid()+".";
247                 rpcResult = RpcResultBuilder
248                         .<OfHeader>failed()
249                         .withError(RpcError.ErrorType.APPLICATION, message, new DeviceDataException(message, error))
250                         .build();
251                 messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
252             } else {
253                 rpcResult = RpcResultBuilder
254                         .<OfHeader>success()
255                         .withResult(ofHeader)
256                         .build();
257                 messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
258             }
259
260             replyFuture.set(rpcResult);
261             try {
262                 requestContext.close();
263             } catch (final Exception e) {
264                 LOG.warn("Closing RequestContext failed: {}", e.getMessage());
265                 LOG.debug("Closing RequestContext failed.. ", e);
266             }
267         } else {
268             LOG.warn("Can't find request context registered for xid : {}. Type of reply: {}. From address: {}", ofHeader.getXid(), ofHeader.getClass().getName(),
269                     getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress());
270         }
271     }
272
273     @Override
274     public void processReply(final Xid xid, final List<MultipartReply> ofHeaderList) {
275         final RequestContext requestContext = requests.get(xid.getValue());
276         if (null != requestContext) {
277             final SettableFuture replyFuture = requestContext.getFuture();
278             requests.remove(xid.getValue());
279             final RpcResult<List<MultipartReply>> rpcResult = RpcResultBuilder
280                     .<List<MultipartReply>>success()
281                     .withResult(ofHeaderList)
282                     .build();
283             replyFuture.set(rpcResult);
284             for (MultipartReply multipartReply : ofHeaderList) {
285                 messageSpy.spyMessage(multipartReply.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
286             }
287
288             try {
289                 requestContext.close();
290             } catch (final Exception e) {
291                 LOG.warn("Closing RequestContext failed: {}", e.getMessage());
292                 LOG.debug("Closing RequestContext failed.. ", e);
293             }
294         } else {
295             LOG.warn("Can't find request context registered for xid : {}. Type of reply: MULTIPART. From address: {}", xid.getValue(),
296                     getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress());
297         }
298     }
299
300     @Override
301     public void processException(final Xid xid, final DeviceDataException deviceDataException) {
302
303         LOG.trace("Processing exception for xid : {}", xid.getValue());
304
305         final RequestContext requestContext = requests.get(xid.getValue());
306
307         if (null != requestContext) {
308             final SettableFuture replyFuture = requestContext.getFuture();
309             requests.remove(xid.getValue());
310             final RpcResult<List<OfHeader>> rpcResult = RpcResultBuilder
311                     .<List<OfHeader>>failed()
312                     .withError(RpcError.ErrorType.APPLICATION, String.format("Message processing failed : %s", deviceDataException.getError()), deviceDataException)
313                     .build();
314             replyFuture.set(rpcResult);
315             messageSpy.spyMessage(deviceDataException.getClass(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
316             try {
317                 requestContext.close();
318             } catch (final Exception e) {
319                 LOG.warn("Closing RequestContext failed: ", e);
320                 LOG.debug("Closing RequestContext failed..", e);
321             }
322         } else {
323             LOG.warn("Can't find request context registered for xid : {}. Exception message {}",
324                     xid.getValue(), deviceDataException.getMessage());
325         }
326     }
327
328     @Override
329     public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
330         //TODO: will be defined later
331     }
332
333     @Override
334     public void processPortStatusMessage(final PortStatusMessage portStatus) {
335         messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
336         final TranslatorKey translatorKey = new TranslatorKey(portStatus.getVersion(), PortGrouping.class.getName());
337         final MessageTranslator<PortGrouping, FlowCapableNodeConnector> messageTranslator = translatorLibrary.lookupTranslator(translatorKey);
338         final FlowCapableNodeConnector flowCapableNodeConnector = messageTranslator.translate(portStatus, this, null);
339
340         final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = provideIIToNodeConnector(portStatus.getPortNo(), portStatus.getVersion());
341         if (portStatus.getReason().equals(PortReason.OFPPRADD) || portStatus.getReason().equals(PortReason.OFPPRMODIFY)) {
342             // because of ADD status node connector has to be created
343             final NodeConnectorBuilder nConnectorBuilder = new NodeConnectorBuilder().setKey(iiToNodeConnector.getKey());
344             nConnectorBuilder.addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new FlowCapableNodeConnectorStatisticsDataBuilder().build());
345             nConnectorBuilder.addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector);
346             writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, nConnectorBuilder.build());
347         } else if (portStatus.getReason().equals(PortReason.OFPPRDELETE)) {
348             addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
349         }
350     }
351
352     private KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> provideIIToNodeConnector(final long portNo, final short version) {
353         final InstanceIdentifier<Node> iiToNodes = deviceState.getNodeInstanceIdentifier();
354         final BigInteger dataPathId = deviceState.getFeatures().getDatapathId();
355         final NodeConnectorId nodeConnectorId = NodeStaticReplyTranslatorUtil.nodeConnectorId(dataPathId.toString(), portNo, version);
356         return iiToNodes.child(NodeConnector.class, new NodeConnectorKey(nodeConnectorId));
357     }
358
359     @Override
360     public void processPacketInMessage(final PacketInMessage packetInMessage) {
361         messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
362         final TranslatorKey translatorKey = new TranslatorKey(packetInMessage.getVersion(), PacketIn.class.getName());
363         final MessageTranslator<PacketInMessage, PacketReceived> messageTranslator = translatorLibrary.lookupTranslator(translatorKey);
364         final PacketReceived packetReceived = messageTranslator.translate(packetInMessage, this, null);
365         notificationService.publish(packetReceived);
366     }
367
368     @Override
369     public TranslatorLibrary oook() {
370         return translatorLibrary;
371     }
372
373     @Override
374     public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
375         this.translatorLibrary = translatorLibrary;
376     }
377
378     @Override
379     public HashedWheelTimer getTimer() {
380         return hashedWheelTimer;
381     }
382
383     @Override
384     public void close() throws Exception {
385         deviceState.setValid(false);
386
387         LOG.trace("Removing node {} from operational DS.", getDeviceState().getNodeId());
388         addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, getDeviceState().getNodeInstanceIdentifier());
389
390         deviceGroupRegistry.close();
391         deviceFlowRegistry.close();
392         deviceMeterRegistry.close();
393
394         if (primaryConnectionContext.getConnectionAdapter().isAlive()) {
395             primaryConnectionContext.setConnectionState(ConnectionContext.CONNECTION_STATE.RIP);
396             primaryConnectionContext.getConnectionAdapter().disconnect();
397         }
398         for (Map.Entry<Long, RequestContext> entry : requests.entrySet()) {
399             RequestContextUtil.closeRequestContextWithRpcError(entry.getValue(), DEVICE_DISCONNECTED);
400         }
401         for (ConnectionContext connectionContext : auxiliaryConnectionContexts.values()) {
402             if (connectionContext.getConnectionAdapter().isAlive()) {
403                 connectionContext.getConnectionAdapter().disconnect();
404             }
405         }
406         for (DeviceContextClosedHandler deviceContextClosedHandler : closeHandlers) {
407             deviceContextClosedHandler.onDeviceContextClosed(this);
408         }
409
410     }
411
412     @Override
413     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
414         if (this.getPrimaryConnectionContext().equals(connectionContext)) {
415             try {
416                 close();
417             } catch (Exception e) {
418                 LOG.trace("Error closing device context.");
419             }
420             if (null != deviceDisconnectedHandler) {
421                 deviceDisconnectedHandler.onDeviceDisconnected(connectionContext);
422             }
423         } else {
424             auxiliaryConnectionContexts.remove(connectionContext);
425         }
426     }
427
428
429     private class XidGenerator {
430
431         private final AtomicLong xid = new AtomicLong(0);
432
433         public Xid generate() {
434             return new Xid(xid.incrementAndGet());
435         }
436     }
437
438     @Override
439     public RequestContext extractNextOutstandingMessage(final long barrierXid) {
440         RequestContext nextMessage = null;
441         final Iterator<Long> keyIterator = requests.keySet().iterator();
442         if (keyIterator.hasNext()) {
443             final Long oldestXid = keyIterator.next();
444             if (oldestXid < barrierXid) {
445                 nextMessage = requests.remove(oldestXid);
446             }
447         }
448         return nextMessage;
449     }
450
451     @Override
452     public void setCurrentBarrierTimeout(final Timeout timeout) {
453         barrierTaskTimeout = timeout;
454     }
455
456     @Override
457     public Timeout getBarrierTaskTimeout() {
458         return barrierTaskTimeout;
459     }
460
461     @Override
462     public void setNotificationService(final NotificationProviderService notificationServiceParam) {
463         notificationService = notificationServiceParam;
464     }
465
466     @Override
467     public MessageSpy getMessageSpy() {
468         return messageSpy;
469     }
470
471     @Override
472     public void setDeviceDisconnectedHandler(final DeviceDisconnectedHandler deviceDisconnectedHandler) {
473         this.deviceDisconnectedHandler = deviceDisconnectedHandler;
474     }
475
476     @Override
477     public void addDeviceContextClosedHandler(final DeviceContextClosedHandler deviceContextClosedHandler) {
478         this.closeHandlers.add(deviceContextClosedHandler);
479     }
480
481     @Override
482     public void startGatheringOperationsToOneTransaction() {
483         txChainManager.startGatheringOperationsToOneTransaction();
484     }
485
486     @Override
487     public void commitOperationsGatheredInOneTransaction() {
488         txChainManager.commitOperationsGatheredInOneTransaction();
489     }
490
491 }