Merge "Bug 8873 - Bundle based reconciliation to enable bundling of messages"
[openflowplugin.git] / openflowplugin / src / test / java / org / opendaylight / openflowplugin / openflow / md / core / plan / ConnectionAdapterStackImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.openflow.md.core.plan;
10
11 import com.google.common.base.Joiner;
12 import com.google.common.collect.Lists;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.SettableFuture;
16 import java.net.InetSocketAddress;
17 import java.util.ArrayList;
18 import java.util.Collection;
19 import java.util.Collections;
20 import java.util.HashMap;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.Set;
24 import java.util.Stack;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.Future;
28 import org.opendaylight.controller.sal.common.util.RpcErrors;
29 import org.opendaylight.controller.sal.common.util.Rpcs;
30 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
31 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
32 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
33 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
34 import org.opendaylight.openflowjava.protocol.api.extensibility.AlienMessageListener;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterInput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncInput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncOutput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigInput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigOutput;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInput;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigInput;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigOutput;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInput;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortModInput;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestInput;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestOutput;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncInput;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEventBuilder;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
74 import org.opendaylight.yangtools.yang.binding.Notification;
75 import org.opendaylight.yangtools.yang.common.RpcError;
76 import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
77 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
78 import org.opendaylight.yangtools.yang.common.RpcResult;
79 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
80 import org.slf4j.Logger;
81 import org.slf4j.LoggerFactory;
82
83 /**
84  * @author mirehak
85  */
86 public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
87
88     /** notify and rpc-response thread start default delay in [ms] */
89     public static final int JOB_DELAY = 50;
90
91     protected static final Logger LOG = LoggerFactory
92             .getLogger(ConnectionAdapterStackImpl.class);
93
94     protected Stack<? extends SwitchTestEvent> eventPlan;
95     protected OpenflowProtocolListener ofListener;
96     protected SystemNotificationsListener systemListener;
97
98     protected Map<Long, SettableFuture<?>> rpcResults = new HashMap<>();
99     protected boolean planTouched = false;
100
101     private long proceedTimeout;
102
103     protected List<Exception> occuredExceptions = new ArrayList<>();
104
105     private ConnectionReadyListener connectionReadyListener;
106
107     private int planItemCounter;
108
109     private boolean autoRead = true;
110     private final ExecutorService pool;
111     private boolean packetInFiltering;
112
113
114     /**
115      * default ctor
116      */
117     public ConnectionAdapterStackImpl() {
118         pool = Executors.newSingleThreadExecutor();
119     }
120
121     @Override
122     public synchronized Future<RpcResult<BarrierOutput>> barrier(
123             final BarrierInput arg0) {
124         checkRpcAndNext(arg0, "barrier");
125         SettableFuture<RpcResult<BarrierOutput>> result = createAndRegisterRpcResult(arg0);
126         return result;
127     }
128
129     @Override
130     public synchronized Future<RpcResult<EchoOutput>> echo(final EchoInput arg0) {
131         checkRpcAndNext(arg0, "echo");
132         Future<RpcResult<EchoOutput>> result = createAndRegisterRpcResult(arg0);
133         return result;
134     }
135
136     @Override
137     public Future<RpcResult<Void>> echoReply(final EchoReplyInput arg0) {
138         checkRpcAndNext(arg0, "echoReply");
139         ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
140         return result;
141     }
142
143     @Override
144     public Future<RpcResult<Void>> experimenter(final ExperimenterInput arg0) {
145         checkRpcAndNext(arg0, "experimenter");
146         ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
147         return result;
148     }
149
150     @Override
151     public Future<RpcResult<Void>> flowMod(final FlowModInput arg0) {
152         checkRpcAndNext(arg0, "flowMod");
153         ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
154         return result;
155     }
156
157     @Override
158     public synchronized Future<RpcResult<GetAsyncOutput>> getAsync(
159             final GetAsyncInput arg0) {
160         checkRpcAndNext(arg0, "echo");
161         Future<RpcResult<GetAsyncOutput>> result = createAndRegisterRpcResult(arg0);
162         return result;
163     }
164
165     @Override
166     public synchronized Future<RpcResult<GetConfigOutput>> getConfig(
167             final GetConfigInput arg0) {
168         checkRpcAndNext(arg0, "echo");
169         Future<RpcResult<GetConfigOutput>> result = createAndRegisterRpcResult(arg0);
170         return result;
171     }
172
173     @Override
174     public synchronized Future<RpcResult<GetFeaturesOutput>> getFeatures(
175             final GetFeaturesInput arg0) {
176         checkRpcAndNext(arg0, "getFeatures");
177         Future<RpcResult<GetFeaturesOutput>> result = createAndRegisterRpcResult(arg0);
178         return result;
179     }
180
181     @Override
182     public synchronized Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(
183             final GetQueueConfigInput arg0) {
184         checkRpcAndNext(arg0, "echo");
185         Future<RpcResult<GetQueueConfigOutput>> result = createAndRegisterRpcResult(arg0);
186         return result;
187     }
188
189     @Override
190     public Future<RpcResult<Void>> groupMod(final GroupModInput arg0) {
191         checkRpcAndNext(arg0, "groupMod");
192         ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
193         return result;
194     }
195
196     @Override
197     public Future<RpcResult<Void>> hello(final HelloInput arg0) {
198         checkRpcAndNext(arg0, "helloReply");
199         ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
200         return result;
201     }
202
203     @Override
204     public Future<RpcResult<Void>> meterMod(final MeterModInput arg0) {
205         checkRpcAndNext(arg0, "meterMod");
206         ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
207         return result;
208     }
209
210     @Override
211     public Future<RpcResult<Void>> packetOut(final PacketOutInput arg0) {
212         checkRpcAndNext(arg0, "packetOut");
213         ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
214         return result;
215     }
216
217     @Override
218     public Future<RpcResult<Void>> portMod(final PortModInput arg0) {
219         checkRpcAndNext(arg0, "portMod");
220         ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
221         return result;
222     }
223
224     @Override
225     public synchronized Future<RpcResult<RoleRequestOutput>> roleRequest(
226             final RoleRequestInput arg0) {
227         checkRpcAndNext(arg0, "echo");
228         Future<RpcResult<RoleRequestOutput>> result = createAndRegisterRpcResult(arg0);
229         return result;
230     }
231
232     @Override
233     public Future<RpcResult<Void>> setAsync(final SetAsyncInput arg0) {
234         checkRpcAndNext(arg0, "setAsync");
235         ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
236         return result;
237     }
238
239     @Override
240     public Future<RpcResult<Void>> setConfig(final SetConfigInput arg0) {
241         checkRpcAndNext(arg0, "setConfig");
242         ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
243         return result;
244     }
245
246     @Override
247     public Future<RpcResult<Void>> tableMod(final TableModInput arg0) {
248         checkRpcAndNext(arg0, "tableMod");
249         ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
250         return result;
251     }
252
253     @Override
254     public Future<Boolean> disconnect() {
255         LOG.debug("adapter is told to disconnect");
256         DisconnectEventBuilder disconnectEventBuilder = new DisconnectEventBuilder();
257         disconnectEventBuilder.setInfo("disconnected by plugin");
258         systemListener.onDisconnectEvent(disconnectEventBuilder.build());
259         return null;
260     }
261
262     @Override
263     public boolean isAlive() {
264         // TODO make dynamic
265         return true;
266     }
267
268     @Override
269     public void setMessageListener(final OpenflowProtocolListener ofListener) {
270         this.ofListener = ofListener;
271     }
272
273     @Override
274     public void checkListeners() {
275         if (ofListener == null || systemListener == null
276                 || connectionReadyListener == null) {
277             occuredExceptions
278                     .add(new IllegalStateException("missing listeners"));
279         }
280     }
281
282     @Override
283     public void setSystemListener(final SystemNotificationsListener systemListener) {
284         this.systemListener = systemListener;
285     }
286
287     @Override
288     public void setAlienMessageListener(final AlienMessageListener alienMessageListener) {
289         
290     }
291
292     /**
293      * @param rpcInput
294      *            rpc call parameter
295      * @param rpcName
296      *            rpc yang name
297      */
298     private boolean checkRpc(final OfHeader rpcInput, final String rpcName) {
299         String msg = null;
300         boolean finished = true;
301
302         if (eventPlan.isEmpty()) {
303             throw new IllegalStateException("eventPlan already depleted");
304         }
305
306         LOG.debug("checking rpc: name={}, ver={}, xid={}", rpcName,
307                 rpcInput.getVersion(), rpcInput.getXid());
308         if (!(eventPlan.peek() instanceof SwitchTestWaitForRpcEvent)
309                 && !(eventPlan.peek() instanceof SwitchTestWaitForAllEvent)) {
310             if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
311                 SwitchTestNotificationEvent notifEvent = (SwitchTestNotificationEvent) (eventPlan
312                         .peek());
313                 msg = "expected [notification: "
314                         + notifEvent.getPlannedNotification() + "], got ["
315                         + rpcInput.getClass().getSimpleName() + "]";
316             } else if (eventPlan.peek() instanceof SwitchTestRcpResponseEvent) {
317                 SwitchTestRcpResponseEvent rpcEvent = (SwitchTestRcpResponseEvent) (eventPlan
318                         .peek());
319                 msg = "expected [rpc: " + rpcEvent.getPlannedRpcResponse()
320                         + "], got [" + rpcInput.getClass().getSimpleName()
321                         + "]";
322             }
323         } else {
324             if (eventPlan.peek() instanceof SwitchTestWaitForAllEvent) {
325                 SwitchTestWaitForAllEvent switchTestWaitForAll = (SwitchTestWaitForAllEvent) eventPlan
326                         .peek();
327                 Set<SwitchTestWaitForRpcEvent> eventBag = switchTestWaitForAll
328                         .getWaitEventBag();
329                 List<String> msgLot = new ArrayList<>();
330
331                 if (eventBag == null || eventBag.isEmpty()) {
332                     msg = "no wait events in bag";
333                 } else {
334                     finished = false;
335                     for (SwitchTestWaitForRpcEvent switchTestWaitForRpc : eventBag) {
336                         String msgPart = checkSingleRpcContent(rpcInput,
337                                 rpcName, switchTestWaitForRpc);
338
339                         if (msgPart != null) {
340                             msgLot.add(msgPart);
341                         } else {
342                             LOG.debug("wait event matched: {}", rpcName);
343                             eventBag.remove(switchTestWaitForRpc);
344                             if (eventBag.isEmpty()) {
345                                 finished = true;
346                             }
347                             msgLot.clear();
348                             break;
349                         }
350                     }
351                 }
352
353                 if (!msgLot.isEmpty()) {
354                     msg = Joiner.on(" | ").join(msgLot);
355                 }
356             } else if (eventPlan.peek() instanceof SwitchTestWaitForRpcEvent) {
357                 SwitchTestWaitForRpcEvent switchTestRpcEvent = (SwitchTestWaitForRpcEvent) eventPlan
358                         .peek();
359                 msg = checkSingleRpcContent(rpcInput, rpcName,
360                         switchTestRpcEvent);
361             }
362         }
363
364         if (msg != null) {
365             LOG.debug("rpc check .. FAILED: " + msg);
366             occuredExceptions.add(new IllegalArgumentException("step:"
367                     + planItemCounter + " | " + msg));
368         } else {
369             LOG.debug("rpc check .. OK");
370         }
371         return finished;
372     }
373
374     /**
375      * @param rpcInput
376      * @param rpcName
377      * @param switchTestWaitForRpc
378      * @return
379      */
380     private static String checkSingleRpcContent(final OfHeader rpcInput,
381             final String rpcName, final SwitchTestWaitForRpcEvent switchTestWaitForRpc) {
382         String failureMsg = null;
383         if (!rpcName.equals(switchTestWaitForRpc.getRpcName())) {
384             failureMsg = "expected rpc name ["
385                     + switchTestWaitForRpc.getRpcName() + "], got [" + rpcName
386                     + "]";
387         } else if (!rpcInput.getXid().equals(switchTestWaitForRpc.getXid())) {
388             failureMsg = "expected " + rpcName + ".xid ["
389                     + switchTestWaitForRpc.getXid() + "], got ["
390                     + rpcInput.getXid() + "]";
391         }
392
393         return failureMsg;
394     }
395
396     /**
397      * @param rpcInput
398      *            rpc call parameter
399      * @param rpcName
400      *            rpc yang name
401      */
402     private synchronized void checkRpcAndNext(final OfHeader rpcInput, final String rpcName) {
403         boolean finished = checkRpc(rpcInput, rpcName);
404         if (finished) {
405             next();
406         }
407     }
408
409     /**
410      * discard current event, execute next, if possible
411      */
412     private void next() {
413         LOG.debug("<---> STEPPING TO NEXT event in plan (leaving [{}] {})",
414                 planItemCounter, eventPlan.peek());
415         eventPlan.pop();
416         planItemCounter++;
417         planTouched = true;
418         try {
419             Thread.sleep(JOB_DELAY);
420         } catch (InterruptedException e) {
421             LOG.error(e.getMessage(), e);
422         }
423         notify();
424     }
425
426     /**
427      * start or continue processing plan
428      */
429     private synchronized void proceed() {
430         boolean processed = false;
431         LOG.debug("proceeding plan item[{}]: {}", planItemCounter,
432                 eventPlan.peek());
433         if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
434             SwitchTestNotificationEvent notification = (SwitchTestNotificationEvent) eventPlan
435                     .peek();
436             processNotification(notification);
437             processed = true;
438         } else if (eventPlan.peek() instanceof SwitchTestRcpResponseEvent) {
439             SwitchTestRcpResponseEvent rpcResponse = (SwitchTestRcpResponseEvent) eventPlan
440                     .peek();
441             processRpcResponse(rpcResponse);
442             processed = true;
443         } else if (eventPlan.peek() instanceof SwitchTestCallbackEvent) {
444             SwitchTestCallbackEvent callbackEvent = (SwitchTestCallbackEvent) eventPlan
445                     .peek();
446             try {
447                 callbackEvent.getCallback().call();
448             } catch (Exception e) {
449                 LOG.error(e.getMessage(), e);
450                 occuredExceptions.add(e);
451             }
452             processed = true;
453         }
454
455         if (processed) {
456             next();
457         } else {
458             try {
459                 LOG.debug("now WAITING for OF_LISTENER to act ..");
460                 wait(proceedTimeout);
461             } catch (InterruptedException e) {
462                 LOG.error(e.getMessage(), e);
463             }
464         }
465     }
466
467     @Override
468     public void run() {
469         LOG.debug("|---> evenPlan STARTING ..");
470         planItemCounter = 0;
471         while (!eventPlan.isEmpty()) {
472             planTouched = false;
473             proceed();
474             if (!planTouched) {
475                 occuredExceptions
476                         .add(new IllegalStateException(
477                                 "eventPlan STALLED, planItemCounter="
478                                         + planItemCounter));
479                 break;
480             }
481         }
482
483         try {
484             Thread.sleep(JOB_DELAY);
485         } catch (InterruptedException e) {
486             LOG.error(e.getMessage(), e);
487         }
488         LOG.debug("<---| eventPlan DONE");
489     }
490
491     /**
492      * @param notificationEvent
493      */
494     private synchronized void processNotification(
495             final SwitchTestNotificationEvent notificationEvent) {
496
497         Notification notification = notificationEvent.getPlannedNotification();
498         LOG.debug("notificating OF_LISTENER: "
499                 + notification.getClass().getSimpleName());
500
501         // system events
502         if (notification instanceof DisconnectEvent) {
503             systemListener.onDisconnectEvent((DisconnectEvent) notification);
504         }
505         // of notifications
506         else if (notification instanceof EchoRequestMessage) {
507             ofListener.onEchoRequestMessage((EchoRequestMessage) notification);
508         } else if (notification instanceof ErrorMessage) {
509             ofListener.onErrorMessage((ErrorMessage) notification);
510         } else if (notification instanceof ExperimenterMessage) {
511             ofListener
512                     .onExperimenterMessage((ExperimenterMessage) notification);
513         } else if (notification instanceof FlowRemovedMessage) {
514             ofListener.onFlowRemovedMessage((FlowRemovedMessage) notification);
515         } else if (notification instanceof HelloMessage) {
516             ofListener.onHelloMessage((HelloMessage) notification);
517         } else if (notification instanceof MultipartReplyMessage) {
518             ofListener
519                     .onMultipartReplyMessage((MultipartReplyMessage) notification);
520         } else if (notification instanceof PacketInMessage) {
521             ofListener.onPacketInMessage((PacketInMessage) notification);
522         } else if (notification instanceof PortStatusMessage) {
523             ofListener.onPortStatusMessage((PortStatusMessage) notification);
524         }
525         // default
526         else {
527             occuredExceptions.add(new IllegalStateException("step:"
528                     + planItemCounter + " | "
529                     + "message listening not supported for type: "
530                     + notification.getClass()));
531         }
532
533         LOG.debug("notification [" + notification.getClass().getSimpleName()
534                 + "] .. done");
535     }
536
537     /**
538      * @param rpcResponse
539      */
540     private synchronized void processRpcResponse(
541             final SwitchTestRcpResponseEvent rpcResponse) {
542         OfHeader plannedRpcResponseValue = rpcResponse.getPlannedRpcResponse();
543         LOG.debug("rpc-responding to OF_LISTENER: {}", rpcResponse.getXid());
544
545         @SuppressWarnings("unchecked")
546         final SettableFuture<RpcResult<?>> response = (SettableFuture<RpcResult<?>>) rpcResults
547                 .get(rpcResponse.getXid());
548
549         if (response != null) {
550             boolean successful = plannedRpcResponseValue != null;
551             Collection<RpcError> errors;
552             if (successful) {
553                 errors = Collections.emptyList();
554             } else {
555                 errors = Lists.newArrayList(RpcErrors.getRpcError("unit",
556                         "unit", "not requested", ErrorSeverity.ERROR,
557                         "planned response to RPC.id = " + rpcResponse.getXid(),
558                         ErrorType.RPC, new Exception(
559                                 "rpc response failed (planned behavior)")));
560             }
561
562             final RpcResult<?> result = Rpcs.getRpcResult(successful,
563                     plannedRpcResponseValue, errors);
564             setFutureViaPool(response, result);
565         } else {
566             String msg = "RpcResponse not expected: xid="
567                     + rpcResponse.getXid() + ", "
568                     + plannedRpcResponseValue.getClass().getSimpleName();
569             LOG.error(msg);
570             occuredExceptions.add(new IllegalStateException("step:"
571                     + planItemCounter + " | " + msg));
572         }
573
574         LOG.debug("rpc [" + rpcResponse.getXid() + "] .. done");
575     }
576
577     private void setFutureViaPool(final SettableFuture<RpcResult<?>> response, final RpcResult<?> result) {
578         pool.execute(new Runnable() {
579             @Override
580             public void run() {
581                 response.set(result);
582             }
583         });
584     }
585
586     /**
587      * @param arg0
588      *            rpc call content
589      * @return rpc future result
590      */
591     private <IN extends OfHeader, OUT extends OfHeader> SettableFuture<RpcResult<OUT>> createAndRegisterRpcResult(
592             final IN arg0) {
593         SettableFuture<RpcResult<OUT>> result = SettableFuture.create();
594         rpcResults.put(arg0.getXid(), result);
595         return result;
596     }
597
598     /**
599      * @return rpc future result
600      */
601     private static ListenableFuture<RpcResult<Void>> createOneWayRpcResult() {
602         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
603     }
604
605     /**
606      * @param eventPlan
607      *            the eventPlan to set
608      */
609     public void setEventPlan(final Stack<? extends SwitchTestEvent> eventPlan) {
610         this.eventPlan = eventPlan;
611     }
612
613     /**
614      * @param proceedTimeout
615      *            max timeout for processing one planned event (in [ms])
616      */
617     public void setProceedTimeout(final long proceedTimeout) {
618         this.proceedTimeout = proceedTimeout;
619     }
620
621     /**
622      * @return the occuredExceptions
623      */
624     public List<Exception> getOccuredExceptions() {
625         return occuredExceptions;
626     }
627
628     @Override
629     public void fireConnectionReadyNotification() {
630         connectionReadyListener.onConnectionReady();
631     }
632
633     @Override
634     public void setConnectionReadyListener(
635             final ConnectionReadyListener connectionReadyListener) {
636         this.connectionReadyListener = connectionReadyListener;
637     }
638
639     @Override
640     public Future<RpcResult<Void>> multipartRequest(final MultipartRequestInput arg0) {
641         checkRpcAndNext(arg0, "multipartRequestInput");
642         ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
643         return result;
644     }
645
646     @Override
647     public InetSocketAddress getRemoteAddress() {
648         return InetSocketAddress.createUnresolved("unittest-odl.example.org", 4242);
649     }
650
651     @Override
652     public boolean isAutoRead() {
653         return autoRead;
654     }
655
656     @Override
657     public <T extends OutboundQueueHandler> OutboundQueueHandlerRegistration<T> registerOutboundQueueHandler(final T t, final int i, final long l) {
658         return null;
659     }
660
661     @Override
662     public void setAutoRead(final boolean autoRead) {
663         this.autoRead = autoRead;
664     }
665
666     @Override
667     public void setPacketInFiltering(final boolean packetInFiltering) {
668         this.packetInFiltering = packetInFiltering;
669     }
670 }