first proposal of integration plugin - library
[openflowplugin.git] / openflowplugin / src / test / java / org / opendaylight / openflowplugin / openflow / md / core / plan / ConnectionAdapterStackImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.openflow.md.core.plan;
10
11 import java.util.ArrayList;
12 import java.util.Collection;
13 import java.util.Collections;
14 import java.util.HashMap;
15 import java.util.List;
16 import java.util.Map;
17 import java.util.Stack;
18 import java.util.concurrent.Callable;
19 import java.util.concurrent.Future;
20 import java.util.concurrent.ScheduledThreadPoolExecutor;
21 import java.util.concurrent.TimeUnit;
22
23 import org.opendaylight.controller.sal.common.util.RpcErrors;
24 import org.opendaylight.controller.sal.common.util.Rpcs;
25 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterInput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigInput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigOutput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigInput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestMessage;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortModInput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestInput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestOutput;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncInput;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
64 import org.opendaylight.yangtools.yang.binding.Notification;
65 import org.opendaylight.yangtools.yang.common.RpcError;
66 import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
67 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
68 import org.opendaylight.yangtools.yang.common.RpcResult;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
71
72 import com.google.common.collect.Lists;
73 import com.google.common.util.concurrent.SettableFuture;
74
75 /**
76  * @author mirehak
77  */
78 public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
79
80     /** notify and rpc-response thread start default delay in [ms] */
81     public static final int JOB_DELAY = 50;
82
83     protected static final Logger LOG = LoggerFactory
84             .getLogger(ConnectionAdapterStackImpl.class);
85
86     protected Stack<? extends SwitchTestEvent> eventPlan;
87     protected OpenflowProtocolListener ofListener;
88     protected SystemNotificationsListener systemListener;
89
90     protected Map<Long, SettableFuture<?>> rpcResults = new HashMap<>();
91     private ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(
92             8);
93     protected boolean planTouched = false;
94
95     private long proceedTimeout;
96
97     protected List<Exception> occuredExceptions = new ArrayList<>();
98
99     /**
100      * default ctor
101      */
102     public ConnectionAdapterStackImpl() {
103         // do nothing
104     }
105
106     @Override
107     public Future<RpcResult<BarrierOutput>> barrier(BarrierInput arg0) {
108         checkRpc(arg0, "barrier");
109         SettableFuture<RpcResult<BarrierOutput>> result = createAndRegisterRpcResult(arg0);
110         next();
111         return result;
112     }
113
114     @Override
115     public Future<RpcResult<EchoOutput>> echo(EchoInput arg0) {
116         checkRpc(arg0, "echo");
117         Future<RpcResult<EchoOutput>> result = createAndRegisterRpcResult(arg0);
118         next();
119         return result;
120     }
121
122     @Override
123     public Future<RpcResult<Void>> echoReply(EchoReplyInput arg0) {
124         checkRpc(arg0, "echoReply");
125         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
126         next();
127         return result;
128     }
129
130     @Override
131     public Future<RpcResult<Void>> experimenter(ExperimenterInput arg0) {
132         checkRpc(arg0, "experimenter");
133         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
134         next();
135         return result;
136     }
137
138     @Override
139     public Future<RpcResult<Void>> flowMod(FlowModInput arg0) {
140         checkRpc(arg0, "flowMod");
141         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
142         next();
143         return result;
144     }
145
146     @Override
147     public Future<RpcResult<GetAsyncOutput>> getAsync(GetAsyncInput arg0) {
148         checkRpc(arg0, "echo");
149         Future<RpcResult<GetAsyncOutput>> result = createAndRegisterRpcResult(arg0);
150         next();
151         return result;
152     }
153
154     @Override
155     public Future<RpcResult<GetConfigOutput>> getConfig(GetConfigInput arg0) {
156         checkRpc(arg0, "echo");
157         Future<RpcResult<GetConfigOutput>> result = createAndRegisterRpcResult(arg0);
158         next();
159         return result;
160     }
161
162     @Override
163     public Future<RpcResult<GetFeaturesOutput>> getFeatures(
164             GetFeaturesInput arg0) {
165         checkRpc(arg0, "getFeatures");
166         Future<RpcResult<GetFeaturesOutput>> result = createAndRegisterRpcResult(arg0);
167         next();
168         return result;
169     }
170
171     @Override
172     public Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(
173             GetQueueConfigInput arg0) {
174         checkRpc(arg0, "echo");
175         Future<RpcResult<GetQueueConfigOutput>> result = createAndRegisterRpcResult(arg0);
176         next();
177         return result;
178     }
179
180     @Override
181     public Future<RpcResult<Void>> groupMod(GroupModInput arg0) {
182         checkRpc(arg0, "groupMod");
183         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
184         next();
185         return result;
186     }
187
188     @Override
189     public Future<RpcResult<Void>> hello(HelloInput arg0) {
190         checkRpc(arg0, "helloReply");
191         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
192         next();
193         return result;
194     }
195
196     @Override
197     public Future<RpcResult<Void>> meterMod(MeterModInput arg0) {
198         checkRpc(arg0, "meterMod");
199         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
200         next();
201         return result;
202     }
203
204     @Override
205     public Future<RpcResult<Void>> packetOut(PacketOutInput arg0) {
206         checkRpc(arg0, "packetOut");
207         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
208         next();
209         return result;
210     }
211
212     @Override
213     public Future<RpcResult<Void>> portMod(PortModInput arg0) {
214         checkRpc(arg0, "portMod");
215         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
216         next();
217         return result;
218     }
219
220     @Override
221     public Future<RpcResult<RoleRequestOutput>> roleRequest(
222             RoleRequestInput arg0) {
223         checkRpc(arg0, "echo");
224         Future<RpcResult<RoleRequestOutput>> result = createAndRegisterRpcResult(arg0);
225         next();
226         return result;
227     }
228
229     @Override
230     public Future<RpcResult<Void>> setAsync(SetAsyncInput arg0) {
231         checkRpc(arg0, "setAsync");
232         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
233         next();
234         return result;
235     }
236
237     @Override
238     public Future<RpcResult<Void>> setConfig(SetConfigInput arg0) {
239         checkRpc(arg0, "setConfig");
240         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
241         next();
242         return result;
243     }
244
245     @Override
246     public Future<RpcResult<Void>> tableMod(TableModInput arg0) {
247         checkRpc(arg0, "tableMod");
248         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
249         next();
250         return result;
251     }
252
253     @Override
254     public Future<Boolean> disconnect() {
255         // TODO Auto-generated method stub
256         return null;
257     }
258
259     @Override
260     public boolean isAlive() {
261         // TODO make dynamic
262         return true;
263     }
264
265     @Override
266     public void setMessageListener(OpenflowProtocolListener ofListener) {
267         this.ofListener = ofListener;
268     }
269
270     @Override
271     public void checkListeners() {
272         if (ofListener == null || systemListener == null) {
273             occuredExceptions
274                     .add(new IllegalStateException("missing listeners"));
275         }
276     }
277
278     @Override
279     public void setSystemListener(SystemNotificationsListener systemListener) {
280         this.systemListener = systemListener;
281     }
282
283     /**
284      * @param rpcInput
285      *            rpc call parameter
286      * @param rpcName
287      *            rpc yang name
288      */
289     private synchronized void checkRpc(OfHeader rpcInput, String rpcName) {
290         String msg = null;
291         LOG.debug("checking rpc: " + rpcName);
292         if (!(eventPlan.peek() instanceof SwitchTestWaitForRpcEvent)) {
293             msg = "expected [rpc], got [" + rpcInput.getClass().getSimpleName()
294                     + "]";
295         } else {
296             SwitchTestWaitForRpcEvent switchTestRpcEvent = (SwitchTestWaitForRpcEvent) eventPlan
297                     .peek();
298             if (!rpcName.equals(switchTestRpcEvent.getRpcName())) {
299                 msg = "expected rpc name [" + switchTestRpcEvent.getRpcName()
300                         + "], got [" + rpcName + "]";
301             } else if (!rpcInput.getXid().equals(switchTestRpcEvent.getXid())) {
302                 msg = "expected xid [" + switchTestRpcEvent.getXid()
303                         + "], got [" + rpcInput.getXid() + "]";
304             }
305         }
306
307         if (msg != null) {
308             LOG.debug("check .. FAILED: " + msg);
309             occuredExceptions.add(new IllegalArgumentException(msg));
310         }
311         LOG.debug("check .. OK");
312     }
313
314     /**
315      * discard current event, execute next, if possible
316      */
317     private synchronized void next() {
318         LOG.debug("STEPPING TO NEXT event in plan");
319         eventPlan.pop();
320         planTouched = true;
321         notify();
322     }
323
324     /**
325      * start or continue processing plan
326      */
327     private synchronized void proceed() {
328         boolean processed = false;
329         LOG.debug("proceeding plan item: " + eventPlan.peek());
330         if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
331             SwitchTestNotificationEvent notification = (SwitchTestNotificationEvent) eventPlan
332                     .peek();
333             processNotification(notification);
334             processed = true;
335         } else if (eventPlan.peek() instanceof SwitchTestRcpResponseEvent) {
336             SwitchTestRcpResponseEvent rpcResponse = (SwitchTestRcpResponseEvent) eventPlan
337                     .peek();
338             processRpcResponse(rpcResponse);
339             processed = true;
340         }
341
342         if (processed) {
343             next();
344         } else {
345             try {
346                 LOG.debug("now waiting for HANDLER to act");
347                 wait(proceedTimeout);
348             } catch (InterruptedException e) {
349                 LOG.error(e.getMessage(), e);
350             }
351         }
352     }
353
354     @Override
355     public void run() {
356         LOG.debug("evenPlan STARTING ..");
357         while (!eventPlan.isEmpty()) {
358             planTouched = false;
359             proceed();
360             if (!planTouched) {
361                 occuredExceptions.add(new IllegalStateException(
362                         "eventPlan STALLED"));
363             }
364         }
365
366         try {
367             pool.awaitTermination(10 * JOB_DELAY, TimeUnit.MILLISECONDS);
368         } catch (InterruptedException e) {
369             LOG.error(e.getMessage(), e);
370         }
371         LOG.debug("eventPlan done");
372     }
373
374     /**
375      * @param notificationEvent
376      */
377     private void processNotification(
378             final SwitchTestNotificationEvent notificationEvent) {
379
380         Callable<Void> notifyCmd = new Callable<Void>() {
381             @Override
382             public Void call() throws Exception {
383                 Notification notification = notificationEvent
384                         .getPlannedNotification();
385                 LOG.debug("notificating HANDLER: "
386                         + notification.getClass().getSimpleName());
387
388                 // system events
389                 if (notification instanceof DisconnectEvent) {
390                     systemListener
391                             .onDisconnectEvent((DisconnectEvent) notification);
392                 }
393                 // of notifications
394                 else if (notification instanceof EchoRequestMessage) {
395                     ofListener
396                             .onEchoRequestMessage((EchoRequestMessage) notification);
397                 } else if (notification instanceof ErrorMessage) {
398                     ofListener.onErrorMessage((ErrorMessage) notification);
399                 } else if (notification instanceof ExperimenterMessage) {
400                     ofListener
401                             .onExperimenterMessage((ExperimenterMessage) notification);
402                 } else if (notification instanceof FlowRemovedMessage) {
403                     ofListener
404                             .onFlowRemovedMessage((FlowRemovedMessage) notification);
405                 } else if (notification instanceof HelloMessage) {
406                     ofListener.onHelloMessage((HelloMessage) notification);
407                 } else if (notification instanceof MultipartReplyMessage) {
408                     ofListener
409                             .onMultipartReplyMessage((MultipartReplyMessage) notification);
410                 } else if (notification instanceof MultipartRequestMessage) {
411                     ofListener
412                             .onMultipartRequestMessage((MultipartRequestMessage) notification);
413                 } else if (notification instanceof PacketInMessage) {
414                     ofListener
415                             .onPacketInMessage((PacketInMessage) notification);
416                 } else if (notification instanceof PortStatusMessage) {
417                     ofListener
418                             .onPortStatusMessage((PortStatusMessage) notification);
419                 }
420                 // default
421                 else {
422                     occuredExceptions.add(new IllegalStateException(
423                             "message listening not supported for type: "
424                                     + notification.getClass()));
425                 }
426
427                 LOG.debug("thread finished");
428                 return null;
429             }
430
431         };
432
433         pool.schedule(notifyCmd, JOB_DELAY, TimeUnit.MILLISECONDS);
434     }
435
436     /**
437      * @param rpcResponse
438      */
439     private void processRpcResponse(final SwitchTestRcpResponseEvent rpcResponse) {
440         Callable<Void> notifyCmd = new Callable<Void>() {
441             @Override
442             public Void call() throws Exception {
443
444                 OfHeader plannedRpcResponseValue = rpcResponse
445                         .getPlannedRpcResponse();
446                 LOG.debug("rpc-responding to HANDLER: " + rpcResponse.getXid());
447
448                 @SuppressWarnings("unchecked")
449                 SettableFuture<RpcResult<?>> response = (SettableFuture<RpcResult<?>>) rpcResults
450                         .get(rpcResponse.getXid());
451
452                 if (response != null) {
453                     boolean successful = plannedRpcResponseValue != null;
454                     Collection<RpcError> errors;
455                     if (successful) {
456                         errors = Collections.emptyList();
457                     } else {
458                         errors = Lists
459                                 .newArrayList(RpcErrors
460                                         .getRpcError(
461                                                 "unit",
462                                                 "unit",
463                                                 "not requested",
464                                                 ErrorSeverity.ERROR,
465                                                 "planned response to RPC.id = "
466                                                         + rpcResponse.getXid(),
467                                                 ErrorType.RPC,
468                                                 new Exception(
469                                                         "rpc response failed (planned behavior)")));
470                     }
471                     RpcResult<?> result = Rpcs.getRpcResult(successful,
472                             plannedRpcResponseValue, errors);
473                     response.set(result);
474                 } else {
475                     String msg = "RpcResponse not expected: xid="
476                             + rpcResponse.getXid()
477                             + ", "
478                             + plannedRpcResponseValue.getClass()
479                                     .getSimpleName();
480                     LOG.error(msg);
481                     occuredExceptions.add(new IllegalStateException(msg));
482                 }
483
484                 LOG.debug("thread finished");
485                 return null;
486             }
487         };
488
489         pool.schedule(notifyCmd, JOB_DELAY, TimeUnit.MILLISECONDS);
490     }
491
492     /**
493      * @param arg0
494      *            rpc call content
495      * @return rpc future result
496      */
497     private <IN extends OfHeader, OUT extends OfHeader> SettableFuture<RpcResult<OUT>> createAndRegisterRpcResult(
498             IN arg0) {
499         SettableFuture<RpcResult<OUT>> result = SettableFuture.create();
500         rpcResults.put(arg0.getXid(), result);
501         return result;
502     }
503
504     /**
505      * @return rpc future result
506      */
507     private static SettableFuture<RpcResult<Void>> createOneWayRpcResult() {
508         SettableFuture<RpcResult<Void>> result = SettableFuture.create();
509         result.set(null);
510         return result;
511     }
512
513     /**
514      * @param eventPlan
515      *            the eventPlan to set
516      */
517     public void setEventPlan(Stack<? extends SwitchTestEvent> eventPlan) {
518         this.eventPlan = eventPlan;
519     }
520
521     /**
522      * @param proceedTimeout
523      *            max timeout for processing one planned event (in [ms])
524      */
525     public void setProceedTimeout(long proceedTimeout) {
526         this.proceedTimeout = proceedTimeout;
527     }
528
529     /**
530      * @return the occuredExceptions
531      */
532     public List<Exception> getOccuredExceptions() {
533         return occuredExceptions;
534     }
535 }