fixed ActionType mismatch, FlowConvertor - flow.isStrict() NPE
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / ConnectionConductorImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.openflow.md.core;
10
11 import java.util.concurrent.ExecutorService;
12 import java.util.concurrent.Executors;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15
16 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
17 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
18 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
19 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
20 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionManager;
21 import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartRequestFlags;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInputBuilder;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestDescBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestGroupBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestGroupFeaturesBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestMeterFeaturesBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestPortDescBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
47 import org.opendaylight.yangtools.yang.binding.DataObject;
48 import org.opendaylight.yangtools.yang.common.RpcError;
49 import org.opendaylight.yangtools.yang.common.RpcResult;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52
53 import com.google.common.util.concurrent.Futures;
54
55 /**
56  * @author mirehak
57  */
58 public class ConnectionConductorImpl implements OpenflowProtocolListener,
59         SystemNotificationsListener, ConnectionConductor, ConnectionReadyListener, HandshakeListener {
60
61     protected static final Logger LOG = LoggerFactory
62             .getLogger(ConnectionConductorImpl.class);
63
64     /* variable to make BitMap-based negotiation enabled / disabled.
65      * it will help while testing and isolating issues related to processing of
66      * BitMaps from switches.
67      */
68     private boolean isBitmapNegotiationEnable = true;
69     protected ErrorHandler errorHandler;
70
71     private final ConnectionAdapter connectionAdapter;
72     private ConnectionConductor.CONDUCTOR_STATE conductorState;
73     private Short version;
74
75     private SwitchConnectionDistinguisher auxiliaryKey;
76
77     private SessionContext sessionContext;
78
79     private QueueKeeper<OfHeader, DataObject> queueKeeper;
80     private ExecutorService hsPool;
81     private HandshakeManager handshakeManager;
82
83     private boolean firstHelloProcessed;
84
85     /**
86      * @param connectionAdapter
87      */
88     public ConnectionConductorImpl(ConnectionAdapter connectionAdapter) {
89         this.connectionAdapter = connectionAdapter;
90         conductorState = CONDUCTOR_STATE.HANDSHAKING;
91         hsPool = Executors.newFixedThreadPool(1);
92         firstHelloProcessed = false;
93         handshakeManager = new HandshakeManagerImpl(connectionAdapter,
94                 ConnectionConductor.versionOrder.get(0), ConnectionConductor.versionOrder);
95         handshakeManager.setUseVersionBitmap(isBitmapNegotiationEnable);
96         handshakeManager.setHandshakeListener(this);
97     }
98
99     @Override
100     public void init() {
101         connectionAdapter.setMessageListener(this);
102         connectionAdapter.setSystemListener(this);
103         connectionAdapter.setConnectionReadyListener(this);
104     }
105
106     @Override
107     public void setQueueKeeper(QueueKeeper<OfHeader, DataObject> queueKeeper) {
108         this.queueKeeper = queueKeeper;
109     }
110
111     /**
112      * @param errorHandler the errorHandler to set
113      */
114     @Override
115     public void setErrorHandler(ErrorHandler errorHandler) {
116         this.errorHandler = errorHandler;
117         handshakeManager.setErrorHandler(errorHandler);
118     }
119
120     @Override
121     public void onEchoRequestMessage(final EchoRequestMessage echoRequestMessage) {
122         new Thread(new Runnable() {
123             @Override
124             public void run() {
125                 LOG.debug("echo request received: " + echoRequestMessage.getXid());
126                 EchoReplyInputBuilder builder = new EchoReplyInputBuilder();
127                 builder.setVersion(echoRequestMessage.getVersion());
128                 builder.setXid(echoRequestMessage.getXid());
129                 builder.setData(echoRequestMessage.getData());
130
131                 getConnectionAdapter().echoReply(builder.build());
132             }
133         }).start();
134     }
135
136     @Override
137     public void onErrorMessage(ErrorMessage errorMessage) {
138         queueKeeper.push(errorMessage, this);
139     }
140
141     @Override
142     public void onExperimenterMessage(ExperimenterMessage experimenterMessage) {
143         queueKeeper.push(experimenterMessage, this);
144     }
145
146     @Override
147     public void onFlowRemovedMessage(FlowRemovedMessage message) {
148         queueKeeper.push(message, this);
149     }
150
151
152     /**
153      * version negotiation happened as per following steps:
154      * 1. If HelloMessage version field has same version, continue connection processing.
155      *    If HelloMessage version is lower than supported versions, just disconnect.
156      * 2. If HelloMessage contains bitmap and common version found in bitmap
157      *    then continue connection processing. if no common version found, just disconnect.
158      * 3. If HelloMessage version is not supported, send HelloMessage with lower supported version.
159      * 4. If Hello message received again with not supported version, just disconnect.
160      *
161      *   TODO: Better to handle handshake into a maintainable innerclass which uses State-Pattern.
162      */
163     @Override
164     public synchronized void onHelloMessage(final HelloMessage hello) {
165         LOG.debug("processing HELLO.xid: {}", hello.getXid());
166         firstHelloProcessed = true;
167         checkState(CONDUCTOR_STATE.HANDSHAKING);
168         HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper(
169                 hello, handshakeManager, connectionAdapter);
170         hsPool.execute(handshakeStepWrapper);
171     }
172
173     /**
174      * @return rpc-response timeout in [ms]
175      */
176     protected long getMaxTimeout() {
177         // TODO:: get from configuration
178         return 2000;
179     }
180
181     /**
182      * @return milliseconds
183      */
184     protected TimeUnit getMaxTimeoutUnit() {
185         // TODO:: get from configuration
186         return TimeUnit.MILLISECONDS;
187     }
188
189     @Override
190     public void onMultipartReplyMessage(MultipartReplyMessage message) {
191         queueKeeper.push(message, this);
192     }
193
194     @Override
195     public void onPacketInMessage(PacketInMessage message) {
196         queueKeeper.push(message, this);
197     }
198
199     @Override
200     public void onPortStatusMessage(PortStatusMessage message) {
201         this.getSessionContext().processPortStatusMsg(message);
202         queueKeeper.push(message, this);
203     }
204
205     @Override
206     public void onSwitchIdleEvent(SwitchIdleEvent notification) {
207         new Thread(new Runnable() {
208             @Override
209             public void run() {
210                 if (!CONDUCTOR_STATE.WORKING.equals(getConductorState())) {
211                     // idle state in any other conductorState than WORKING means real
212                     // problem and wont be handled by echoReply, but disconnection
213                     disconnect();
214                     OFSessionUtil.getSessionManager().invalidateOnDisconnect(ConnectionConductorImpl.this);
215                 } else {
216                     LOG.debug("first idle state occured");
217                     EchoInputBuilder builder = new EchoInputBuilder();
218                     builder.setVersion(getVersion());
219                     builder.setXid(getSessionContext().getNextXid());
220
221                     Future<RpcResult<EchoOutput>> echoReplyFuture = getConnectionAdapter()
222                             .echo(builder.build());
223
224                     try {
225                         RpcResult<EchoOutput> echoReplyValue = echoReplyFuture.get(getMaxTimeout(),
226                                 getMaxTimeoutUnit());
227                         if (echoReplyValue.isSuccessful()) {
228                             setConductorState(CONDUCTOR_STATE.WORKING);
229                         } else {
230                             for (RpcError replyError : echoReplyValue.getErrors()) {
231                                 Throwable cause = replyError.getCause();
232                                 LOG.error(
233                                         "while receiving echoReply in TIMEOUTING state: "
234                                                 + cause.getMessage(), cause);
235                             }
236                             //switch issue occurred
237                             throw new Exception("switch issue occurred");
238                         }
239                     } catch (Exception e) {
240                         LOG.error("while waiting for echoReply in TIMEOUTING state: "
241                                 + e.getMessage());
242                         errorHandler.handleException(e, sessionContext);
243                         //switch is not responding
244                         disconnect();
245                         OFSessionUtil.getSessionManager().invalidateOnDisconnect(ConnectionConductorImpl.this);
246                     }
247                 }
248             }
249
250         }).start();
251     }
252
253     /**
254      * @param conductorState
255      *            the connectionState to set
256      */
257     @Override
258     public void setConductorState(CONDUCTOR_STATE conductorState) {
259         this.conductorState = conductorState;
260     }
261
262     @Override
263     public CONDUCTOR_STATE getConductorState() {
264         return conductorState;
265     }
266
267     /**
268      * @param handshaking
269      */
270     protected void checkState(CONDUCTOR_STATE expectedState) {
271         if (!conductorState.equals(expectedState)) {
272             throw new IllegalStateException("Expected state: " + expectedState
273                     + ", actual state:" + conductorState);
274         }
275     }
276
277     @Override
278     public void onDisconnectEvent(DisconnectEvent arg0) {
279         SessionManager sessionManager = OFSessionUtil.getSessionManager();
280         sessionManager.invalidateOnDisconnect(this);
281     }
282
283     @Override
284     public Short getVersion() {
285         return version;
286     }
287
288     @Override
289     public Future<Boolean> disconnect() {
290         LOG.info("disconnecting: sessionCtx="+sessionContext+"|auxId="+auxiliaryKey);
291
292         Future<Boolean> result = null;
293         if (connectionAdapter.isAlive()) {
294             result = connectionAdapter.disconnect();
295         } else {
296             LOG.debug("connection already disconnected");
297             result = Futures.immediateFuture(true);
298         }
299
300         return result;
301     }
302
303     @Override
304     public void setConnectionCookie(SwitchConnectionDistinguisher auxiliaryKey) {
305         this.auxiliaryKey = auxiliaryKey;
306     }
307
308     @Override
309     public void setSessionContext(SessionContext sessionContext) {
310         this.sessionContext = sessionContext;
311     }
312
313     @Override
314     public SwitchConnectionDistinguisher getAuxiliaryKey() {
315         return auxiliaryKey;
316     }
317
318     @Override
319     public SessionContext getSessionContext() {
320         return sessionContext;
321     }
322
323     @Override
324     public ConnectionAdapter getConnectionAdapter() {
325         return connectionAdapter;
326     }
327
328     @Override
329     public synchronized void onConnectionReady() {
330         LOG.debug("connection is ready-to-use");
331         if (! firstHelloProcessed) {
332             HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper(
333                     null, handshakeManager, connectionAdapter);
334             hsPool.execute(handshakeStepWrapper);
335             firstHelloProcessed = true;
336         } else {
337             LOG.debug("already touched by hello message");
338         }
339     }
340
341     @Override
342     public void onHandshakeSuccessfull(GetFeaturesOutput featureOutput,
343             Short negotiatedVersion) {
344         version = negotiatedVersion;
345         conductorState = CONDUCTOR_STATE.WORKING;
346         OFSessionUtil.registerSession(this, featureOutput, negotiatedVersion);
347         requestDesc();
348         requestPorts();
349         requestGroupFeatures();
350         requestMeterFeatures();
351     }
352
353     /*
354      *  Send an OFPMP_DESC request message to the switch
355      */
356
357     private void requestDesc() {
358         MultipartRequestInputBuilder builder = new MultipartRequestInputBuilder();
359         builder.setType(MultipartType.OFPMPDESC);
360         builder.setVersion(getVersion());
361         builder.setFlags(new MultipartRequestFlags(false));
362         builder.setMultipartRequestBody(new MultipartRequestDescBuilder().build());
363         builder.setXid(getSessionContext().getNextXid());
364         getConnectionAdapter().multipartRequest(builder.build());
365     }
366
367     private void requestPorts() {
368         MultipartRequestInputBuilder builder = new MultipartRequestInputBuilder();
369         builder.setType(MultipartType.OFPMPPORTDESC);
370         builder.setVersion(getVersion());
371         builder.setFlags(new MultipartRequestFlags(false));
372         builder.setMultipartRequestBody(new MultipartRequestPortDescBuilder().build());
373         builder.setXid(getSessionContext().getNextXid());
374         getConnectionAdapter().multipartRequest(builder.build());
375     }
376     private void requestGroupFeatures(){
377         MultipartRequestInputBuilder mprInput = new MultipartRequestInputBuilder();
378         mprInput.setType(MultipartType.OFPMPGROUPFEATURES);
379         mprInput.setVersion(getVersion());
380         mprInput.setFlags(new MultipartRequestFlags(false));
381         mprInput.setXid(getSessionContext().getNextXid());
382
383         MultipartRequestGroupFeaturesBuilder mprGroupFeaturesBuild = new MultipartRequestGroupFeaturesBuilder();
384         mprInput.setMultipartRequestBody(mprGroupFeaturesBuild.build());
385
386         LOG.debug("Send group features statistics request :{}",mprGroupFeaturesBuild);
387         getConnectionAdapter().multipartRequest(mprInput.build());
388         
389     }
390     private void requestMeterFeatures(){
391         MultipartRequestInputBuilder mprInput = new MultipartRequestInputBuilder();
392         mprInput.setType(MultipartType.OFPMPMETERFEATURES);
393         mprInput.setVersion(getVersion());
394         mprInput.setFlags(new MultipartRequestFlags(false));
395         mprInput.setXid(getSessionContext().getNextXid());
396
397         MultipartRequestMeterFeaturesBuilder mprMeterFeaturesBuild = new MultipartRequestMeterFeaturesBuilder();
398         mprInput.setMultipartRequestBody(mprMeterFeaturesBuild.build());
399
400         LOG.debug("Send meter features statistics request :{}",mprMeterFeaturesBuild);
401         getConnectionAdapter().multipartRequest(mprInput.build());
402         
403     }
404     /**
405      * @param isBitmapNegotiationEnable the isBitmapNegotiationEnable to set
406      */
407     public void setBitmapNegotiationEnable(
408             boolean isBitmapNegotiationEnable) {
409         this.isBitmapNegotiationEnable = isBitmapNegotiationEnable;
410     }
411
412     protected void shutdownPool() {
413         hsPool.shutdownNow();
414         LOG.debug("pool is terminated: {}", hsPool.isTerminated());
415     }
416 }