fix port update - honor message version
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / ConnectionConductorImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.openflow.md.core;
10
11 import java.util.concurrent.ExecutorService;
12 import java.util.concurrent.Executors;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15
16 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
17 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
18 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
19 import org.opendaylight.openflowplugin.openflow.md.core.session.PortFeaturesUtil;
20 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
21 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionManager;
22 import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartRequestFlags;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInputBuilder;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Port;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestDescBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestGroupFeaturesBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestMeterFeaturesBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestPortDescBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
49 import org.opendaylight.yangtools.yang.binding.DataObject;
50 import org.opendaylight.yangtools.yang.common.RpcError;
51 import org.opendaylight.yangtools.yang.common.RpcResult;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 import com.google.common.util.concurrent.Futures;
56
57 /**
58  * @author mirehak
59  */
60 public class ConnectionConductorImpl implements OpenflowProtocolListener,
61         SystemNotificationsListener, ConnectionConductor, ConnectionReadyListener, HandshakeListener {
62
63     protected static final Logger LOG = LoggerFactory
64             .getLogger(ConnectionConductorImpl.class);
65
66     /* variable to make BitMap-based negotiation enabled / disabled.
67      * it will help while testing and isolating issues related to processing of
68      * BitMaps from switches.
69      */
70     private boolean isBitmapNegotiationEnable = true;
71     protected ErrorHandler errorHandler;
72
73     private final ConnectionAdapter connectionAdapter;
74     private ConnectionConductor.CONDUCTOR_STATE conductorState;
75     private Short version;
76
77     private SwitchConnectionDistinguisher auxiliaryKey;
78
79     private SessionContext sessionContext;
80
81     private QueueKeeper<OfHeader, DataObject> queueKeeper;
82     private ExecutorService hsPool;
83     private HandshakeManager handshakeManager;
84
85     private boolean firstHelloProcessed;
86     
87     private PortFeaturesUtil portFeaturesUtils;
88
89     /**
90      * @param connectionAdapter
91      */
92     public ConnectionConductorImpl(ConnectionAdapter connectionAdapter) {
93         this.connectionAdapter = connectionAdapter;
94         conductorState = CONDUCTOR_STATE.HANDSHAKING;
95         hsPool = Executors.newFixedThreadPool(1);
96         firstHelloProcessed = false;
97         handshakeManager = new HandshakeManagerImpl(connectionAdapter,
98                 ConnectionConductor.versionOrder.get(0), ConnectionConductor.versionOrder);
99         handshakeManager.setUseVersionBitmap(isBitmapNegotiationEnable);
100         handshakeManager.setHandshakeListener(this);
101         portFeaturesUtils = PortFeaturesUtil.getInstance();
102     }
103
104     @Override
105     public void init() {
106         connectionAdapter.setMessageListener(this);
107         connectionAdapter.setSystemListener(this);
108         connectionAdapter.setConnectionReadyListener(this);
109     }
110
111     @Override
112     public void setQueueKeeper(QueueKeeper<OfHeader, DataObject> queueKeeper) {
113         this.queueKeeper = queueKeeper;
114     }
115
116     /**
117      * @param errorHandler the errorHandler to set
118      */
119     @Override
120     public void setErrorHandler(ErrorHandler errorHandler) {
121         this.errorHandler = errorHandler;
122         handshakeManager.setErrorHandler(errorHandler);
123     }
124
125     @Override
126     public void onEchoRequestMessage(final EchoRequestMessage echoRequestMessage) {
127         new Thread(new Runnable() {
128             @Override
129             public void run() {
130                 LOG.debug("echo request received: " + echoRequestMessage.getXid());
131                 EchoReplyInputBuilder builder = new EchoReplyInputBuilder();
132                 builder.setVersion(echoRequestMessage.getVersion());
133                 builder.setXid(echoRequestMessage.getXid());
134                 builder.setData(echoRequestMessage.getData());
135
136                 getConnectionAdapter().echoReply(builder.build());
137             }
138         }).start();
139     }
140
141     @Override
142     public void onErrorMessage(ErrorMessage errorMessage) {
143         queueKeeper.push(errorMessage, this);
144     }
145
146     @Override
147     public void onExperimenterMessage(ExperimenterMessage experimenterMessage) {
148         queueKeeper.push(experimenterMessage, this);
149     }
150
151     @Override
152     public void onFlowRemovedMessage(FlowRemovedMessage message) {
153         queueKeeper.push(message, this);
154     }
155
156
157     /**
158      * version negotiation happened as per following steps:
159      * 1. If HelloMessage version field has same version, continue connection processing.
160      *    If HelloMessage version is lower than supported versions, just disconnect.
161      * 2. If HelloMessage contains bitmap and common version found in bitmap
162      *    then continue connection processing. if no common version found, just disconnect.
163      * 3. If HelloMessage version is not supported, send HelloMessage with lower supported version.
164      * 4. If Hello message received again with not supported version, just disconnect.
165      *
166      *   TODO: Better to handle handshake into a maintainable innerclass which uses State-Pattern.
167      */
168     @Override
169     public synchronized void onHelloMessage(final HelloMessage hello) {
170         LOG.debug("processing HELLO.xid: {}", hello.getXid());
171         firstHelloProcessed = true;
172         checkState(CONDUCTOR_STATE.HANDSHAKING);
173         HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper(
174                 hello, handshakeManager, connectionAdapter);
175         hsPool.execute(handshakeStepWrapper);
176     }
177
178     /**
179      * @return rpc-response timeout in [ms]
180      */
181     protected long getMaxTimeout() {
182         // TODO:: get from configuration
183         return 2000;
184     }
185
186     /**
187      * @return milliseconds
188      */
189     protected TimeUnit getMaxTimeoutUnit() {
190         // TODO:: get from configuration
191         return TimeUnit.MILLISECONDS;
192     }
193
194     @Override
195     public void onMultipartReplyMessage(MultipartReplyMessage message) {
196         queueKeeper.push(message, this);
197     }
198
199     @Override
200     public void onPacketInMessage(PacketInMessage message) {
201         queueKeeper.push(message, this);
202     }
203
204     @Override
205     public void onPortStatusMessage(PortStatusMessage message) {
206         processPortStatusMsg(message);
207         queueKeeper.push(message, this);
208     }
209     
210     protected void processPortStatusMsg(PortStatus msg) {
211         if (msg.getReason().getIntValue() == 2) {
212             updatePort(msg);
213         } else if (msg.getReason().getIntValue() == 0) {
214             updatePort(msg);
215         } else if (msg.getReason().getIntValue() == 1) {
216             deletePort(msg);
217         }
218     }
219     
220     protected void updatePort(PortStatus msg) {
221         Long portNumber = msg.getPortNo();        
222         Boolean portBandwidth = portFeaturesUtils.getPortBandwidth(msg);
223         
224         if(portBandwidth == null) {
225             LOG.warn("can't get bandwidth info from port: {}, aborting port update", msg.toString());
226         } else {
227             this.getSessionContext().getPhysicalPorts().put(portNumber, msg);
228             this.getSessionContext().getPortsBandwidth().put(portNumber, portBandwidth);                   
229         }            
230     }
231     
232     protected void deletePort(Port port) {
233         Long portNumber = port.getPortNo();
234         
235         this.getSessionContext().getPhysicalPorts().remove(portNumber);
236         this.getSessionContext().getPortsBandwidth().remove(portNumber);
237     }
238
239     @Override
240     public void onSwitchIdleEvent(SwitchIdleEvent notification) {
241         new Thread(new Runnable() {
242             @Override
243             public void run() {
244                 if (!CONDUCTOR_STATE.WORKING.equals(getConductorState())) {
245                     // idle state in any other conductorState than WORKING means real
246                     // problem and wont be handled by echoReply, but disconnection
247                     disconnect();
248                     OFSessionUtil.getSessionManager().invalidateOnDisconnect(ConnectionConductorImpl.this);
249                 } else {
250                     LOG.debug("first idle state occured");
251                     EchoInputBuilder builder = new EchoInputBuilder();
252                     builder.setVersion(getVersion());
253                     builder.setXid(getSessionContext().getNextXid());
254
255                     Future<RpcResult<EchoOutput>> echoReplyFuture = getConnectionAdapter()
256                             .echo(builder.build());
257
258                     try {
259                         RpcResult<EchoOutput> echoReplyValue = echoReplyFuture.get(getMaxTimeout(),
260                                 getMaxTimeoutUnit());
261                         if (echoReplyValue.isSuccessful()) {
262                             setConductorState(CONDUCTOR_STATE.WORKING);
263                         } else {
264                             for (RpcError replyError : echoReplyValue.getErrors()) {
265                                 Throwable cause = replyError.getCause();
266                                 LOG.error(
267                                         "while receiving echoReply in TIMEOUTING state: "
268                                                 + cause.getMessage(), cause);
269                             }
270                             //switch issue occurred
271                             throw new Exception("switch issue occurred");
272                         }
273                     } catch (Exception e) {
274                         LOG.error("while waiting for echoReply in TIMEOUTING state: "
275                                 + e.getMessage());
276                         errorHandler.handleException(e, sessionContext);
277                         //switch is not responding
278                         disconnect();
279                         OFSessionUtil.getSessionManager().invalidateOnDisconnect(ConnectionConductorImpl.this);
280                     }
281                 }
282             }
283
284         }).start();
285     }
286
287     /**
288      * @param conductorState
289      *            the connectionState to set
290      */
291     @Override
292     public void setConductorState(CONDUCTOR_STATE conductorState) {
293         this.conductorState = conductorState;
294     }
295
296     @Override
297     public CONDUCTOR_STATE getConductorState() {
298         return conductorState;
299     }
300
301     /**
302      * @param handshaking
303      */
304     protected void checkState(CONDUCTOR_STATE expectedState) {
305         if (!conductorState.equals(expectedState)) {
306             throw new IllegalStateException("Expected state: " + expectedState
307                     + ", actual state:" + conductorState);
308         }
309     }
310
311     @Override
312     public void onDisconnectEvent(DisconnectEvent arg0) {
313         SessionManager sessionManager = OFSessionUtil.getSessionManager();
314         sessionManager.invalidateOnDisconnect(this);
315     }
316
317     @Override
318     public Short getVersion() {
319         return version;
320     }
321
322     @Override
323     public Future<Boolean> disconnect() {
324         LOG.info("disconnecting: sessionCtx="+sessionContext+"|auxId="+auxiliaryKey);
325
326         Future<Boolean> result = null;
327         if (connectionAdapter.isAlive()) {
328             result = connectionAdapter.disconnect();
329         } else {
330             LOG.debug("connection already disconnected");
331             result = Futures.immediateFuture(true);
332         }
333
334         return result;
335     }
336
337     @Override
338     public void setConnectionCookie(SwitchConnectionDistinguisher auxiliaryKey) {
339         this.auxiliaryKey = auxiliaryKey;
340     }
341
342     @Override
343     public void setSessionContext(SessionContext sessionContext) {
344         this.sessionContext = sessionContext;
345     }
346
347     @Override
348     public SwitchConnectionDistinguisher getAuxiliaryKey() {
349         return auxiliaryKey;
350     }
351
352     @Override
353     public SessionContext getSessionContext() {
354         return sessionContext;
355     }
356
357     @Override
358     public ConnectionAdapter getConnectionAdapter() {
359         return connectionAdapter;
360     }
361
362     @Override
363     public synchronized void onConnectionReady() {
364         LOG.debug("connection is ready-to-use");
365         if (! firstHelloProcessed) {
366             HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper(
367                     null, handshakeManager, connectionAdapter);
368             hsPool.execute(handshakeStepWrapper);
369             firstHelloProcessed = true;
370         } else {
371             LOG.debug("already touched by hello message");
372         }
373     }
374
375     @Override
376     public void onHandshakeSuccessfull(GetFeaturesOutput featureOutput,
377             Short negotiatedVersion) {
378         version = negotiatedVersion;
379         conductorState = CONDUCTOR_STATE.WORKING;
380         OFSessionUtil.registerSession(this, featureOutput, negotiatedVersion);
381         requestDesc();
382         requestPorts();
383         requestGroupFeatures();
384         requestMeterFeatures();
385     }
386
387     /*
388      *  Send an OFPMP_DESC request message to the switch
389      */
390
391     private void requestDesc() {
392         MultipartRequestInputBuilder builder = new MultipartRequestInputBuilder();
393         builder.setType(MultipartType.OFPMPDESC);
394         builder.setVersion(getVersion());
395         builder.setFlags(new MultipartRequestFlags(false));
396         builder.setMultipartRequestBody(new MultipartRequestDescBuilder().build());
397         builder.setXid(getSessionContext().getNextXid());
398         getConnectionAdapter().multipartRequest(builder.build());
399     }
400
401     private void requestPorts() {
402         MultipartRequestInputBuilder builder = new MultipartRequestInputBuilder();
403         builder.setType(MultipartType.OFPMPPORTDESC);
404         builder.setVersion(getVersion());
405         builder.setFlags(new MultipartRequestFlags(false));
406         builder.setMultipartRequestBody(new MultipartRequestPortDescBuilder().build());
407         builder.setXid(getSessionContext().getNextXid());
408         getConnectionAdapter().multipartRequest(builder.build());
409     }
410     private void requestGroupFeatures(){
411         MultipartRequestInputBuilder mprInput = new MultipartRequestInputBuilder();
412         mprInput.setType(MultipartType.OFPMPGROUPFEATURES);
413         mprInput.setVersion(getVersion());
414         mprInput.setFlags(new MultipartRequestFlags(false));
415         mprInput.setXid(getSessionContext().getNextXid());
416
417         MultipartRequestGroupFeaturesBuilder mprGroupFeaturesBuild = new MultipartRequestGroupFeaturesBuilder();
418         mprInput.setMultipartRequestBody(mprGroupFeaturesBuild.build());
419
420         LOG.debug("Send group features statistics request :{}",mprGroupFeaturesBuild);
421         getConnectionAdapter().multipartRequest(mprInput.build());
422         
423     }
424     private void requestMeterFeatures(){
425         MultipartRequestInputBuilder mprInput = new MultipartRequestInputBuilder();
426         mprInput.setType(MultipartType.OFPMPMETERFEATURES);
427         mprInput.setVersion(getVersion());
428         mprInput.setFlags(new MultipartRequestFlags(false));
429         mprInput.setXid(getSessionContext().getNextXid());
430
431         MultipartRequestMeterFeaturesBuilder mprMeterFeaturesBuild = new MultipartRequestMeterFeaturesBuilder();
432         mprInput.setMultipartRequestBody(mprMeterFeaturesBuild.build());
433
434         LOG.debug("Send meter features statistics request :{}",mprMeterFeaturesBuild);
435         getConnectionAdapter().multipartRequest(mprInput.build());
436         
437     }
438     /**
439      * @param isBitmapNegotiationEnable the isBitmapNegotiationEnable to set
440      */
441     public void setBitmapNegotiationEnable(
442             boolean isBitmapNegotiationEnable) {
443         this.isBitmapNegotiationEnable = isBitmapNegotiationEnable;
444     }
445
446     protected void shutdownPool() {
447         hsPool.shutdownNow();
448         LOG.debug("pool is terminated: {}", hsPool.isTerminated());
449     }
450 }