Adding Listener Registration for OF Messages in plugin
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / ConnectionConductorImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.openflow.md.core;
10
11 import java.util.Collection;
12 import java.util.List;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.LinkedBlockingQueue;
15 import java.util.concurrent.TimeUnit;
16
17 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
18 import org.opendaylight.openflowplugin.openflow.core.IMessageListener;
19 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
20 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
21 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionManager;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInputBuilder;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInputBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestMessage;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
41 import org.opendaylight.yangtools.yang.binding.DataObject;
42 import org.opendaylight.yangtools.yang.common.RpcError;
43 import org.opendaylight.yangtools.yang.common.RpcResult;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 import com.google.common.collect.ImmutableMap;
48 import com.google.common.collect.Lists;
49
50 /**
51  * @author mirehak
52  */
53 public class ConnectionConductorImpl implements OpenflowProtocolListener,
54         SystemNotificationsListener, ConnectionConductor {
55
56     private static final Logger LOG = LoggerFactory
57             .getLogger(ConnectionConductorImpl.class);
58
59     private LinkedBlockingQueue<Exception> errorQueue = new LinkedBlockingQueue<>();
60
61     private final ConnectionAdapter connectionAdapter;
62     private final List<Short> versionOrder;
63     private ConnectionConductor.CONDUCTOR_STATE conductorState;
64     private Short version;
65
66     private SwitchConnectionDistinguisher auxiliaryKey;
67
68     private SessionContext sessionContext;
69
70     private ImmutableMap<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping;
71
72     /**
73      * @param connectionAdapter
74      */
75     public ConnectionConductorImpl(ConnectionAdapter connectionAdapter) {
76         this.connectionAdapter = connectionAdapter;
77         conductorState = CONDUCTOR_STATE.HANDSHAKING;
78         versionOrder = Lists.newArrayList((short) 0x04, (short) 0x01);
79         new Thread(new ErrorQueueHandler(errorQueue)).start();
80     }
81
82     @Override
83     public void init() {
84         connectionAdapter.setMessageListener(this);
85         connectionAdapter.setSystemListener(this);
86     }
87
88     @Override
89     public void onEchoRequestMessage(EchoRequestMessage echoRequestMessage) {
90         LOG.debug("echo request received: " + echoRequestMessage.getXid());
91         EchoReplyInputBuilder builder = new EchoReplyInputBuilder();
92         builder.setVersion(echoRequestMessage.getVersion());
93         builder.setXid(echoRequestMessage.getXid());
94         builder.setData(echoRequestMessage.getData());
95
96         connectionAdapter.echoReply(builder.build());
97     }
98
99     @Override
100     public void onErrorMessage(ErrorMessage errorMessage) {
101         // TODO Auto-generated method stub
102         LOG.debug("error received, type: " + errorMessage.getType()
103                 + "; code: " + errorMessage.getCode());
104     }
105
106     @Override
107     public void onExperimenterMessage(ExperimenterMessage experimenterMessage) {
108         // TODO Auto-generated method stub
109         LOG.debug("experimenter received, type: "
110                 + experimenterMessage.getExpType());
111     }
112
113     @Override
114     public void onFlowRemovedMessage(FlowRemovedMessage arg0) {
115         // TODO Auto-generated method stub
116     }
117
118     @Override
119     public void onHelloMessage(HelloMessage hello) {
120         // do handshake
121         LOG.info("handshake STARTED");
122         checkState(CONDUCTOR_STATE.HANDSHAKING);
123
124         Short remoteVersion = hello.getVersion();
125         long xid = hello.getXid();
126         short proposedVersion;
127         try {
128             proposedVersion = proposeVersion(remoteVersion);
129         } catch (Exception e) {
130             handleException(e);
131             throw e;
132         }
133         HelloInputBuilder helloBuilder = new HelloInputBuilder();
134         xid++;
135         helloBuilder.setVersion(proposedVersion).setXid(xid);
136         LOG.debug("sending helloReply");
137         connectionAdapter.hello(helloBuilder.build());
138
139         if (proposedVersion != remoteVersion) {
140             // need to wait for another hello
141         } else {
142             // sent version is equal to remote --> version is negotiated
143             version = proposedVersion;
144             LOG.debug("version set: " + proposedVersion);
145
146             // request features
147             GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();
148             xid++;
149             featuresBuilder.setVersion(version).setXid(xid);
150             Future<RpcResult<GetFeaturesOutput>> featuresFuture = connectionAdapter
151                     .getFeatures(featuresBuilder.build());
152             LOG.debug("waiting for features");
153             RpcResult<GetFeaturesOutput> rpcFeatures;
154             try {
155                 rpcFeatures = featuresFuture.get(getMaxTimeout(),
156                         TimeUnit.MILLISECONDS);
157                 if (!rpcFeatures.isSuccessful()) {
158                     LOG.error("obtained features problem: "
159                             + rpcFeatures.getErrors());
160                 } else {
161                     LOG.debug("obtained features: datapathId="
162                             + rpcFeatures.getResult().getDatapathId());
163                     conductorState = CONDUCTOR_STATE.WORKING;
164
165                     OFSessionUtil.registerSession(this,
166                             rpcFeatures.getResult(), version);
167                     LOG.info("handshake SETTLED");
168                 }
169             } catch (Exception e) {
170                 handleException(e);
171             }
172         }
173     }
174
175     /**
176      * @return rpc-response timeout in [ms]
177      */
178     private long getMaxTimeout() {
179         // TODO:: get from configuration
180         return 2000;
181     }
182
183     /**
184      * @param e
185      */
186     private void handleException(Exception e) {
187         try {
188             errorQueue.put(e);
189         } catch (InterruptedException e1) {
190             LOG.error(e1.getMessage(), e1);
191         }
192     }
193
194     @Override
195     public void onMultipartReplyMessage(MultipartReplyMessage arg0) {
196         // TODO Auto-generated method stub
197     }
198
199     @Override
200     public void onMultipartRequestMessage(MultipartRequestMessage arg0) {
201         // TODO Auto-generated method stub
202     }
203
204     @Override
205     public void onPacketInMessage(PacketInMessage message) {
206         notifyListeners(PacketInMessage.class, message);
207     }
208
209     @Override
210     public void onPortStatusMessage(PortStatusMessage arg0) {
211         // TODO Auto-generated method stub
212     }
213
214     @Override
215     public void onSwitchIdleEvent(SwitchIdleEvent notification) {
216         if (!CONDUCTOR_STATE.WORKING.equals(conductorState)) {
217             // idle state in any other conductorState than WORKING means real
218             // problem and wont be handled by echoReply, but disconnection
219             OFSessionUtil.getSessionManager().invalidateOnDisconnect(this);
220         } else {
221             LOG.debug("first idle state occured");
222             EchoInputBuilder builder = new EchoInputBuilder();
223             builder.setVersion(version);
224             // TODO: get xid from sessionContext
225             builder.setXid(42L);
226
227             Future<RpcResult<EchoOutput>> echoReplyFuture = connectionAdapter
228                     .echo(builder.build());
229
230             try {
231                 // TODO: read timeout from config
232                 RpcResult<EchoOutput> echoReplyValue = echoReplyFuture.get(5,
233                         TimeUnit.SECONDS);
234                 if (echoReplyValue.isSuccessful()) {
235                     conductorState = CONDUCTOR_STATE.WORKING;
236                 } else {
237                     for (RpcError replyError : echoReplyValue.getErrors()) {
238                         Throwable cause = replyError.getCause();
239                         LOG.error(
240                                 "while receiving echoReply in TIMEOUTING state: "
241                                         + cause.getMessage(), cause);
242                     }
243                 }
244             } catch (Exception e) {
245                 LOG.error("while waiting for echoReply in TIMEOUTING state: "
246                         + e.getMessage(), e);
247             }
248         }
249     }
250
251     /**
252      * @param conductorState
253      *            the connectionState to set
254      */
255     @Override
256     public void setConductorState(CONDUCTOR_STATE conductorState) {
257         this.conductorState = conductorState;
258     }
259
260     @Override
261     public CONDUCTOR_STATE getConductorState() {
262         return conductorState;
263     }
264
265     /**
266      * @param handshaking
267      */
268     private void checkState(CONDUCTOR_STATE expectedState) {
269         if (!conductorState.equals(expectedState)) {
270             throw new IllegalStateException("Expected state: " + expectedState
271                     + ", actual state:" + conductorState);
272         }
273     }
274
275     @Override
276     public void onDisconnectEvent(DisconnectEvent arg0) {
277         SessionManager sessionManager = OFSessionUtil.getSessionManager();
278         sessionManager.invalidateOnDisconnect(this);
279     }
280
281     protected short proposeVersion(short remoteVersion) {
282         Short proposal = null;
283         for (short offer : versionOrder) {
284             if (offer <= remoteVersion) {
285                 proposal = offer;
286                 break;
287             }
288         }
289         if (proposal == null) {
290             throw new IllegalArgumentException("unsupported version: "
291                     + remoteVersion);
292         }
293         return proposal;
294     }
295
296     @Override
297     public Short getVersion() {
298         return version;
299     }
300
301     @Override
302     public Future<Boolean> disconnect() {
303         return connectionAdapter.disconnect();
304     }
305
306     @Override
307     public void setConnectionCookie(SwitchConnectionDistinguisher auxiliaryKey) {
308         this.auxiliaryKey = auxiliaryKey;
309     }
310
311     @Override
312     public void setSessionContext(SessionContext sessionContext) {
313         this.sessionContext = sessionContext;
314     }
315
316     @Override
317     public SwitchConnectionDistinguisher getAuxiliaryKey() {
318         return auxiliaryKey;
319     }
320
321     @Override
322     public SessionContext getSessionContext() {
323         return sessionContext;
324     }
325
326     /**
327      * @param listenerMapping the listenerMapping to set
328      */
329     public void setListenerMapping(
330             ImmutableMap<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping) {
331         //TODO: adjust the listener interface
332         this.listenerMapping = listenerMapping;
333     }
334
335     /**
336      * @param messageType
337      * @param message
338      */
339     private void notifyListeners(Class<? extends DataObject> messageType, DataObject message) {
340         Collection<IMDMessageListener> listeners = listenerMapping.get(messageType);
341         if (listeners != null) {
342                 for (IMDMessageListener listener : listeners) {
343                 //TODO : need to add unit-tests
344                 //listener.receive(this.getAuxiliaryKey().getId(), this.getSessionContext(), message);
345             }
346         }
347     }
348 }