added requestMap access methods
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceContextImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.SettableFuture;
13 import io.netty.util.HashedWheelTimer;
14 import io.netty.util.Timeout;
15 import java.math.BigInteger;
16 import java.util.HashMap;
17 import java.util.Iterator;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.TreeMap;
21 import java.util.concurrent.atomic.AtomicLong;
22 import javax.annotation.Nonnull;
23 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
24 import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
25 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
26 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
27 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
28 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
29 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
30 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
31 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
32 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
33 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
34 import org.opendaylight.openflowplugin.api.openflow.device.exception.DeviceDataException;
35 import org.opendaylight.openflowplugin.api.openflow.device.listener.OpenflowMessageListenerFacade;
36 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
37 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
38 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
39 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
40 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
41 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
42 import org.opendaylight.openflowplugin.impl.common.NodeStaticReplyTranslatorUtil;
43 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
44 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
45 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
46 import org.opendaylight.openflowplugin.openflow.md.core.session.SwitchConnectionCookieOFImpl;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketIn;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableFeatures;
66 import org.opendaylight.yangtools.yang.binding.ChildOf;
67 import org.opendaylight.yangtools.yang.binding.DataObject;
68 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
69 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
70 import org.opendaylight.yangtools.yang.common.RpcError;
71 import org.opendaylight.yangtools.yang.common.RpcResult;
72 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
73 import org.slf4j.Logger;
74 import org.slf4j.LoggerFactory;
75
76 /**
77  *
78  */
79 public class DeviceContextImpl implements DeviceContext {
80
81     private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
82
83     private final ConnectionContext primaryConnectionContext;
84     private final DeviceState deviceState;
85     private final DataBroker dataBroker;
86     private final XidGenerator xidGenerator;
87     private final HashedWheelTimer hashedWheelTimer;
88     private Map<Long, RequestContext> requests = new TreeMap<>();
89
90     private final Map<SwitchConnectionDistinguisher, ConnectionContext> auxiliaryConnectionContexts;
91     private final TransactionChainManager txChainManager;
92     private TranslatorLibrary translatorLibrary;
93     private OpenflowMessageListenerFacade openflowMessageListenerFacade;
94     private final DeviceFlowRegistry deviceFlowRegistry;
95     private final DeviceGroupRegistry deviceGroupRegistry;
96     private final DeviceMeterRegistry deviceMeterRegistry;
97     private Timeout barrierTaskTimeout;
98     private NotificationProviderService notificationService;
99     private final MessageSpy<Class> messageSpy;
100
101
102     @VisibleForTesting
103     DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
104                       @Nonnull final DeviceState deviceState,
105                       @Nonnull final DataBroker dataBroker,
106                       @Nonnull final HashedWheelTimer hashedWheelTimer,
107                       @Nonnull final MessageSpy _messageSpy) {
108         this.primaryConnectionContext = Preconditions.checkNotNull(primaryConnectionContext);
109         this.deviceState = Preconditions.checkNotNull(deviceState);
110         this.dataBroker = Preconditions.checkNotNull(dataBroker);
111         this.hashedWheelTimer = Preconditions.checkNotNull(hashedWheelTimer);
112         xidGenerator = new XidGenerator();
113         txChainManager = new TransactionChainManager(dataBroker, hashedWheelTimer, 500L);
114         auxiliaryConnectionContexts = new HashMap<>();
115         requests = new HashMap<>();
116         deviceFlowRegistry = new DeviceFlowRegistryImpl();
117         deviceGroupRegistry = new DeviceGroupRegistryImpl();
118         deviceMeterRegistry = new DeviceMeterRegistryImpl();
119         messageSpy = _messageSpy;
120     }
121
122     /**
123      * This method is called from {@link DeviceManagerImpl} only. So we could say "posthandshake process finish"
124      * and we are able to set a scheduler for an automatic transaction submitting by time (0,5sec).
125      */
126     void submitTransaction() {
127         txChainManager.submitTransaction();
128         txChainManager.enableCounter();
129     }
130
131     @Override
132     public <M extends ChildOf<DataObject>> void onMessage(final M message, final RequestContext requestContext) {
133         // TODO Auto-generated method stub
134
135     }
136
137     @Override
138     public void addAuxiliaryConenctionContext(final ConnectionContext connectionContext) {
139         final SwitchConnectionDistinguisher connectionDistinguisher = new SwitchConnectionCookieOFImpl(connectionContext.getFeatures().getAuxiliaryId());
140         auxiliaryConnectionContexts.put(connectionDistinguisher, connectionContext);
141     }
142
143     @Override
144     public void removeAuxiliaryConenctionContext(final ConnectionContext connectionContext) {
145         // TODO Auto-generated method stub
146     }
147
148     @Override
149     public DeviceState getDeviceState() {
150         return deviceState;
151     }
152
153     @Override
154     public ReadTransaction getReadTransaction() {
155         return dataBroker.newReadOnlyTransaction();
156     }
157
158     @Override
159     public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
160                                                           final InstanceIdentifier<T> path, final T data) {
161         txChainManager.writeToTransaction(store, path, data);
162     }
163
164     @Override
165     public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store, final InstanceIdentifier<T> path) {
166         txChainManager.addDeleteOperationTotTxChain(store, path);
167     }
168
169     @Override
170     public TableFeatures getCapabilities() {
171         // TODO Auto-generated method stub
172         return null;
173     }
174
175     @Override
176     public ConnectionContext getPrimaryConnectionContext() {
177         return primaryConnectionContext;
178     }
179
180     @Override
181     public ConnectionContext getAuxiliaryConnectiobContexts(final BigInteger cookie) {
182         return auxiliaryConnectionContexts.get(new SwitchConnectionCookieOFImpl(cookie.longValue()));
183     }
184
185     @Override
186     public Xid getNextXid() {
187         return xidGenerator.generate();
188     }
189
190     @Override
191     public RequestContext lookupRequest(Xid xid) {
192         return requests.get(xid.getValue());
193     }
194
195     @Override
196     public int getNumberOfOutstandingRequests() {
197         return requests.size();
198     }
199
200     @Override
201     public void hookRequestCtx(final Xid xid, final RequestContext requestFutureContext) {
202         requests.put(xid.getValue(), requestFutureContext);
203     }
204
205     @Override
206     public RequestContext unhookRequestCtx(Xid xid) {
207         return requests.remove(xid.getValue());
208     }
209
210     @Override
211     public void attachOpenflowMessageListener(final OpenflowMessageListenerFacade openflowMessageListenerFacade) {
212         this.openflowMessageListenerFacade = openflowMessageListenerFacade;
213         primaryConnectionContext.getConnectionAdapter().setMessageListener(openflowMessageListenerFacade);
214     }
215
216     @Override
217     public OpenflowMessageListenerFacade getOpenflowMessageListenerFacade() {
218         return openflowMessageListenerFacade;
219     }
220
221     @Override
222     public DeviceFlowRegistry getDeviceFlowRegistry() {
223         return deviceFlowRegistry;
224     }
225
226     @Override
227     public DeviceGroupRegistry getDeviceGroupRegistry() {
228         return deviceGroupRegistry;
229     }
230
231     @Override
232     public DeviceMeterRegistry getDeviceMeterRegistry() {
233         return deviceMeterRegistry;
234     }
235
236     @Override
237     public void processReply(final OfHeader ofHeader) {
238         final RequestContext requestContext = requests.get(ofHeader.getXid());
239         if (null != requestContext) {
240             final SettableFuture replyFuture = requestContext.getFuture();
241             requests.remove(ofHeader.getXid());
242             RpcResult<OfHeader> rpcResult;
243             if (ofHeader instanceof Error) {
244                 final Error error = (Error) ofHeader;
245                 final String message = "Operation on device failed";
246                 rpcResult = RpcResultBuilder
247                         .<OfHeader>failed()
248                         .withError(RpcError.ErrorType.APPLICATION, message, new DeviceDataException(message, error))
249                         .build();
250                 messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
251             } else {
252                 rpcResult = RpcResultBuilder
253                         .<OfHeader>success()
254                         .withResult(ofHeader)
255                         .build();
256                 messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
257             }
258
259             replyFuture.set(rpcResult);
260             try {
261                 requestContext.close();
262             } catch (final Exception e) {
263                 LOG.error("Closing RequestContext failed: ", e);
264             }
265         } else {
266             LOG.error("Can't find request context registered for xid : {}. Type of reply: {}. From address: {}", ofHeader.getXid(), ofHeader.getClass().getName(),
267                     getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress());
268         }
269     }
270
271     @Override
272     public void processReply(final Xid xid, final List<MultipartReply> ofHeaderList) {
273         final RequestContext requestContext = requests.get(xid.getValue());
274         if (null != requestContext) {
275             final SettableFuture replyFuture = requestContext.getFuture();
276             requests.remove(xid.getValue());
277             final RpcResult<List<MultipartReply>> rpcResult = RpcResultBuilder
278                     .<List<MultipartReply>>success()
279                     .withResult(ofHeaderList)
280                     .build();
281             replyFuture.set(rpcResult);
282             for (MultipartReply multipartReply : ofHeaderList) {
283                 messageSpy.spyMessage(multipartReply.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
284             }
285
286             try {
287                 requestContext.close();
288             } catch (final Exception e) {
289                 LOG.error("Closing RequestContext failed: ", e);
290             }
291         } else {
292             LOG.error("Can't find request context registered for xid : {}. Type of reply: MULTIPART. From address: {}", xid.getValue(),
293                     getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress());
294         }
295     }
296
297     @Override
298     public void processException(final Xid xid, final DeviceDataException deviceDataException) {
299
300         LOG.trace("Processing exception for xid : {}", xid.getValue());
301
302         final RequestContext requestContext = requests.get(xid.getValue());
303
304         if (null != requestContext) {
305             final SettableFuture replyFuture = requestContext.getFuture();
306             requests.remove(xid.getValue());
307             final RpcResult<List<OfHeader>> rpcResult = RpcResultBuilder
308                     .<List<OfHeader>>failed()
309                     .withError(RpcError.ErrorType.APPLICATION, String.format("Message processing failed : %s", deviceDataException.getError()), deviceDataException)
310                     .build();
311             replyFuture.set(rpcResult);
312             messageSpy.spyMessage(deviceDataException.getClass(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
313             try {
314                 requestContext.close();
315             } catch (final Exception e) {
316                 LOG.error("Closing RequestContext failed: ", e);
317             }
318         } else {
319             LOG.error("Can't find request context registered for xid : {}", xid.getValue());
320         }
321     }
322
323     @Override
324     public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
325         //TODO: will be defined later
326     }
327
328     @Override
329     public void processPortStatusMessage(final PortStatusMessage portStatus) {
330         messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
331         final TranslatorKey translatorKey = new TranslatorKey(portStatus.getVersion(), PortGrouping.class.getName());
332         final MessageTranslator<PortGrouping, FlowCapableNodeConnector> messageTranslator = translatorLibrary.lookupTranslator(translatorKey);
333         final FlowCapableNodeConnector flowCapableNodeConnector = messageTranslator.translate(portStatus, this, null);
334
335         final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = provideIIToNodeConnector(portStatus.getPortNo(), portStatus.getVersion());
336         if (portStatus.getReason().equals(PortReason.OFPPRADD) || portStatus.getReason().equals(PortReason.OFPPRMODIFY)) {
337             // because of ADD status node connector has to be created
338             final NodeConnectorBuilder nConnectorBuilder = new NodeConnectorBuilder().setKey(iiToNodeConnector.getKey());
339             nConnectorBuilder.addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new FlowCapableNodeConnectorStatisticsDataBuilder().build());
340             nConnectorBuilder.addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector);
341             writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, nConnectorBuilder.build());
342         } else if (portStatus.getReason().equals(PortReason.OFPPRDELETE)) {
343             addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
344         }
345     }
346
347     private KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> provideIIToNodeConnector(final long portNo, final short version) {
348         final InstanceIdentifier<Node> iiToNodes = deviceState.getNodeInstanceIdentifier();
349         final BigInteger dataPathId = deviceState.getFeatures().getDatapathId();
350         final NodeConnectorId nodeConnectorId = NodeStaticReplyTranslatorUtil.nodeConnectorId(dataPathId.toString(), portNo, version);
351         return iiToNodes.child(NodeConnector.class, new NodeConnectorKey(nodeConnectorId));
352     }
353
354     @Override
355     public void processPacketInMessage(final PacketInMessage packetInMessage) {
356         messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
357         final TranslatorKey translatorKey = new TranslatorKey(packetInMessage.getVersion(), PacketIn.class.getName());
358         final MessageTranslator<PacketInMessage, PacketReceived> messageTranslator = translatorLibrary.lookupTranslator(translatorKey);
359         final PacketReceived packetReceived = messageTranslator.translate(packetInMessage, this, null);
360         notificationService.publish(packetReceived);
361     }
362
363     @Override
364     public TranslatorLibrary oook() {
365         return translatorLibrary;
366     }
367
368     @Override
369     public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
370         this.translatorLibrary = translatorLibrary;
371     }
372
373     @Override
374     public HashedWheelTimer getTimer() {
375         return hashedWheelTimer;
376     }
377
378     @Override
379     public void close() throws Exception {
380         for (Map.Entry<Long, RequestContext> entry : requests.entrySet()){
381             entry.getValue().close();
382         }
383     }
384
385
386     private class XidGenerator {
387
388         private final AtomicLong xid = new AtomicLong(0);
389
390         public Xid generate() {
391             return new Xid(xid.incrementAndGet());
392         }
393     }
394
395     @Override
396     public RequestContext extractNextOutstandingMessage(final long barrierXid) {
397         RequestContext nextMessage = null;
398         final Iterator<Long> keyIterator = requests.keySet().iterator();
399         if (keyIterator.hasNext()) {
400             final Long oldestXid = keyIterator.next();
401             if (oldestXid < barrierXid) {
402                 nextMessage = requests.remove(oldestXid);
403             }
404         }
405         return nextMessage;
406     }
407
408     @Override
409     public void setCurrentBarrierTimeout(final Timeout timeout) {
410         barrierTaskTimeout = timeout;
411     }
412
413     @Override
414     public Timeout getBarrierTaskTimeout() {
415         return barrierTaskTimeout;
416     }
417
418     @Override
419     public void setNotificationService(final NotificationProviderService notificationServiceParam) {
420         notificationService = notificationServiceParam;
421     }
422
423     @Override
424     public MessageSpy getMessageSpy() {
425         return messageSpy;
426     }
427 }