Fixed breakage from changes in openflowjava (which were my fault :( ).
[openflowplugin.git] / openflowplugin / src / test / java / org / opendaylight / openflowplugin / openflow / md / core / plan / ConnectionAdapterStackImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.openflow.md.core.plan;
10
11 import java.util.ArrayList;
12 import java.util.Collection;
13 import java.util.Collections;
14 import java.util.HashMap;
15 import java.util.List;
16 import java.util.Map;
17 import java.util.Set;
18 import java.util.Stack;
19 import java.util.concurrent.Future;
20
21 import org.opendaylight.controller.sal.common.util.RpcErrors;
22 import org.opendaylight.controller.sal.common.util.Rpcs;
23 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
24 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterInput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncOutput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigInput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigOutput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigInput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestMessage;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortModInput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestInput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestOutput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SendMultipartRequestMessageInput;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncInput;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEventBuilder;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
65 import org.opendaylight.yangtools.yang.binding.Notification;
66 import org.opendaylight.yangtools.yang.common.RpcError;
67 import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
68 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
69 import org.opendaylight.yangtools.yang.common.RpcResult;
70 import org.slf4j.Logger;
71 import org.slf4j.LoggerFactory;
72
73 import com.google.common.base.Joiner;
74 import com.google.common.collect.Lists;
75 import com.google.common.util.concurrent.SettableFuture;
76
77 /**
78  * @author mirehak
79  */
80 public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
81
82     /** notify and rpc-response thread start default delay in [ms] */
83     public static final int JOB_DELAY = 50;
84
85     protected static final Logger LOG = LoggerFactory
86             .getLogger(ConnectionAdapterStackImpl.class);
87
88     protected Stack<? extends SwitchTestEvent> eventPlan;
89     protected OpenflowProtocolListener ofListener;
90     protected SystemNotificationsListener systemListener;
91
92     protected Map<Long, SettableFuture<?>> rpcResults = new HashMap<>();
93     protected boolean planTouched = false;
94
95     private long proceedTimeout;
96
97     protected List<Exception> occuredExceptions = new ArrayList<>();
98
99     private ConnectionReadyListener connectionReadyListener;
100
101     private int planItemCounter;
102
103     /**
104      * default ctor
105      */
106     public ConnectionAdapterStackImpl() {
107         // do nothing
108     }
109
110     @Override
111     public synchronized Future<RpcResult<BarrierOutput>> barrier(BarrierInput arg0) {
112         checkRpcAndNext(arg0, "barrier");
113         SettableFuture<RpcResult<BarrierOutput>> result = createAndRegisterRpcResult(arg0);
114         return result;
115     }
116
117     @Override
118     public synchronized Future<RpcResult<EchoOutput>> echo(EchoInput arg0) {
119         checkRpcAndNext(arg0, "echo");
120         Future<RpcResult<EchoOutput>> result = createAndRegisterRpcResult(arg0);
121         return result;
122     }
123
124     @Override
125     public Future<RpcResult<Void>> echoReply(EchoReplyInput arg0) {
126         checkRpcAndNext(arg0, "echoReply");
127         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
128         return result;
129     }
130
131     @Override
132     public Future<RpcResult<Void>> experimenter(ExperimenterInput arg0) {
133         checkRpcAndNext(arg0, "experimenter");
134         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
135         return result;
136     }
137
138     @Override
139     public Future<RpcResult<Void>> flowMod(FlowModInput arg0) {
140         checkRpcAndNext(arg0, "flowMod");
141         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
142         return result;
143     }
144
145     @Override
146     public synchronized Future<RpcResult<GetAsyncOutput>> getAsync(GetAsyncInput arg0) {
147         checkRpcAndNext(arg0, "echo");
148         Future<RpcResult<GetAsyncOutput>> result = createAndRegisterRpcResult(arg0);
149         return result;
150     }
151
152     @Override
153     public synchronized Future<RpcResult<GetConfigOutput>> getConfig(GetConfigInput arg0) {
154         checkRpcAndNext(arg0, "echo");
155         Future<RpcResult<GetConfigOutput>> result = createAndRegisterRpcResult(arg0);
156         return result;
157     }
158
159     @Override
160     public synchronized Future<RpcResult<GetFeaturesOutput>> getFeatures(
161             GetFeaturesInput arg0) {
162         checkRpcAndNext(arg0, "getFeatures");
163         Future<RpcResult<GetFeaturesOutput>> result = createAndRegisterRpcResult(arg0);
164         return result;
165     }
166
167     @Override
168     public synchronized Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(
169             GetQueueConfigInput arg0) {
170         checkRpcAndNext(arg0, "echo");
171         Future<RpcResult<GetQueueConfigOutput>> result = createAndRegisterRpcResult(arg0);
172         return result;
173     }
174
175     @Override
176     public Future<RpcResult<Void>> groupMod(GroupModInput arg0) {
177         checkRpcAndNext(arg0, "groupMod");
178         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
179         return result;
180     }
181
182     @Override
183     public Future<RpcResult<Void>> hello(HelloInput arg0) {
184         checkRpcAndNext(arg0, "helloReply");
185         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
186         return result;
187     }
188
189     @Override
190     public Future<RpcResult<Void>> meterMod(MeterModInput arg0) {
191         checkRpcAndNext(arg0, "meterMod");
192         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
193         return result;
194     }
195
196     @Override
197     public Future<RpcResult<Void>> packetOut(PacketOutInput arg0) {
198         checkRpcAndNext(arg0, "packetOut");
199         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
200         return result;
201     }
202
203     @Override
204     public Future<RpcResult<Void>> portMod(PortModInput arg0) {
205         checkRpcAndNext(arg0, "portMod");
206         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
207         return result;
208     }
209
210     @Override
211     public synchronized Future<RpcResult<RoleRequestOutput>> roleRequest(
212             RoleRequestInput arg0) {
213         checkRpcAndNext(arg0, "echo");
214         Future<RpcResult<RoleRequestOutput>> result = createAndRegisterRpcResult(arg0);
215         return result;
216     }
217
218     @Override
219     public Future<RpcResult<Void>> setAsync(SetAsyncInput arg0) {
220         checkRpcAndNext(arg0, "setAsync");
221         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
222         return result;
223     }
224
225     @Override
226     public Future<RpcResult<Void>> setConfig(SetConfigInput arg0) {
227         checkRpcAndNext(arg0, "setConfig");
228         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
229         return result;
230     }
231
232     @Override
233     public Future<RpcResult<Void>> tableMod(TableModInput arg0) {
234         checkRpcAndNext(arg0, "tableMod");
235         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
236         return result;
237     }
238
239     @Override
240     public Future<Boolean> disconnect() {
241         LOG.info("adapter is told to disconnect");
242         DisconnectEventBuilder disconnectEventBuilder = new DisconnectEventBuilder();
243         disconnectEventBuilder.setInfo("disconnected by plugin");
244         systemListener.onDisconnectEvent(disconnectEventBuilder.build());
245         return null;
246     }
247
248     @Override
249     public boolean isAlive() {
250         // TODO make dynamic
251         return true;
252     }
253
254     @Override
255     public void setMessageListener(OpenflowProtocolListener ofListener) {
256         this.ofListener = ofListener;
257     }
258
259     @Override
260     public void checkListeners() {
261         if (ofListener == null || systemListener == null || connectionReadyListener == null) {
262             occuredExceptions
263                     .add(new IllegalStateException("missing listeners"));
264         }
265     }
266
267     @Override
268     public void setSystemListener(SystemNotificationsListener systemListener) {
269         this.systemListener = systemListener;
270     }
271
272     /**
273      * @param rpcInput
274      *            rpc call parameter
275      * @param rpcName
276      *            rpc yang name
277      */
278     private boolean checkRpc(OfHeader rpcInput, String rpcName) {
279         String msg = null;
280         boolean finished = true;
281
282         if (eventPlan.isEmpty()) {
283             throw new IllegalStateException("eventPlan already depleted");
284         }
285
286         LOG.debug("checking rpc: name={}, ver={}, xid={}", rpcName, rpcInput.getVersion(), rpcInput.getXid());
287         if (!(eventPlan.peek() instanceof SwitchTestWaitForRpcEvent)
288                 && !(eventPlan.peek() instanceof SwitchTestWaitForAllEvent)) {
289             if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
290                 SwitchTestNotificationEvent notifEvent = (SwitchTestNotificationEvent) (eventPlan.peek());
291                 msg = "expected [notification: " +notifEvent.getPlannedNotification()+ "], got [" + rpcInput.getClass().getSimpleName()
292                         + "]";
293             } else if (eventPlan.peek() instanceof SwitchTestRcpResponseEvent) {
294                 SwitchTestRcpResponseEvent rpcEvent = (SwitchTestRcpResponseEvent) (eventPlan.peek());
295                 msg = "expected [rpc: " +rpcEvent.getPlannedRpcResponse()+ "], got [" + rpcInput.getClass().getSimpleName()
296                         + "]";
297             }
298         } else {
299             if (eventPlan.peek() instanceof SwitchTestWaitForAllEvent) {
300                 SwitchTestWaitForAllEvent switchTestWaitForAll = (SwitchTestWaitForAllEvent) eventPlan
301                         .peek();
302                 Set<SwitchTestWaitForRpcEvent> eventBag = switchTestWaitForAll.getWaitEventBag();
303                 List<String> msgLot = new ArrayList<>();
304
305                 if (eventBag == null || eventBag.isEmpty()) {
306                     msg = "no wait events in bag";
307                 } else {
308                     finished = false;
309                     for (SwitchTestWaitForRpcEvent switchTestWaitForRpc : eventBag) {
310                         String msgPart = checkSingleRpcContent(rpcInput, rpcName, switchTestWaitForRpc);
311
312                         if (msgPart != null) {
313                             msgLot.add(msgPart);
314                         } else {
315                             LOG.debug("wait event matched: {}", rpcName);
316                             eventBag.remove(switchTestWaitForRpc);
317                             if (eventBag.isEmpty()) {
318                                 finished = true;
319                             }
320                             msgLot.clear();
321                             break;
322                         }
323                     }
324                 }
325
326                 if (!msgLot.isEmpty()) {
327                     msg = Joiner.on(" | ").join(msgLot);
328                 }
329             } else if (eventPlan.peek() instanceof SwitchTestWaitForRpcEvent) {
330                 SwitchTestWaitForRpcEvent switchTestRpcEvent = (SwitchTestWaitForRpcEvent) eventPlan
331                         .peek();
332                 msg = checkSingleRpcContent(rpcInput, rpcName, switchTestRpcEvent);
333             }
334         }
335
336         if (msg != null) {
337             LOG.debug("rpc check .. FAILED: " + msg);
338             occuredExceptions.add(new IllegalArgumentException("step:"+planItemCounter+" | "+msg));
339         } else {
340             LOG.debug("rpc check .. OK");
341         }
342         return finished;
343     }
344
345     /**
346      * @param rpcInput
347      * @param rpcName
348      * @param msgTmp
349      * @param switchTestWaitForRpc
350      * @return
351      */
352     private static String checkSingleRpcContent(OfHeader rpcInput, String rpcName,
353             SwitchTestWaitForRpcEvent switchTestWaitForRpc) {
354         String failureMsg = null;
355         if (!rpcName.equals(switchTestWaitForRpc.getRpcName())) {
356             failureMsg = "expected rpc name [" + switchTestWaitForRpc.getRpcName()
357                     + "], got [" + rpcName + "]";
358         } else if (!rpcInput.getXid().equals(switchTestWaitForRpc.getXid())) {
359             failureMsg = "expected "+rpcName+".xid [" + switchTestWaitForRpc.getXid()
360                     + "], got [" + rpcInput.getXid() + "]";
361         }
362
363         return failureMsg;
364     }
365
366     /**
367      * @param rpcInput
368      *            rpc call parameter
369      * @param rpcName
370      *            rpc yang name
371      */
372     private synchronized void checkRpcAndNext(OfHeader rpcInput, String rpcName) {
373         boolean finished = checkRpc(rpcInput, rpcName);
374         if (finished) {
375             next();
376         }
377     }
378
379     /**
380      * discard current event, execute next, if possible
381      */
382     private void next() {
383         LOG.debug("<---> STEPPING TO NEXT event in plan (leaving [{}] {})", planItemCounter, eventPlan.peek());
384         eventPlan.pop();
385         planItemCounter ++;
386         planTouched = true;
387         try {
388             Thread.sleep(JOB_DELAY);
389         } catch (InterruptedException e) {
390             LOG.error(e.getMessage(), e);
391         }
392         notify();
393     }
394
395     /**
396      * start or continue processing plan
397      */
398     private synchronized void proceed() {
399         boolean processed = false;
400         LOG.debug("proceeding plan item[{}]: {}", planItemCounter, eventPlan.peek());
401         if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
402             SwitchTestNotificationEvent notification = (SwitchTestNotificationEvent) eventPlan
403                     .peek();
404             processNotification(notification);
405             processed = true;
406         } else if (eventPlan.peek() instanceof SwitchTestRcpResponseEvent) {
407             SwitchTestRcpResponseEvent rpcResponse = (SwitchTestRcpResponseEvent) eventPlan
408                     .peek();
409             processRpcResponse(rpcResponse);
410             processed = true;
411         } else if (eventPlan.peek() instanceof SwitchTestCallbackEvent) {
412             SwitchTestCallbackEvent callbackEvent = (SwitchTestCallbackEvent) eventPlan.peek();
413             try {
414                 callbackEvent.getCallback().call();
415             } catch (Exception e) {
416                 LOG.error(e.getMessage(), e);
417                occuredExceptions.add(e);
418             }
419             processed = true;
420         }
421
422         if (processed) {
423             next();
424         } else {
425             try {
426                 LOG.debug("now WAITING for OF_LISTENER to act ..");
427                 wait(proceedTimeout);
428             } catch (InterruptedException e) {
429                 LOG.error(e.getMessage(), e);
430             }
431         }
432     }
433
434     @Override
435     public void run() {
436         LOG.debug("|---> evenPlan STARTING ..");
437         planItemCounter = 0;
438         while (!eventPlan.isEmpty()) {
439             planTouched = false;
440             proceed();
441             if (!planTouched) {
442                 occuredExceptions.add(new IllegalStateException(
443                         "eventPlan STALLED, planItemCounter="+planItemCounter));
444                 break;
445             }
446         }
447
448         try {
449             Thread.sleep(JOB_DELAY);
450         } catch (InterruptedException e) {
451             LOG.error(e.getMessage(), e);
452         }
453         LOG.debug("<---| eventPlan DONE");
454     }
455
456     /**
457      * @param notificationEvent
458      */
459     private synchronized void processNotification(
460             final SwitchTestNotificationEvent notificationEvent) {
461
462         Notification notification = notificationEvent
463                 .getPlannedNotification();
464         LOG.debug("notificating OF_LISTENER: "
465                 + notification.getClass().getSimpleName());
466
467         // system events
468         if (notification instanceof DisconnectEvent) {
469             systemListener
470             .onDisconnectEvent((DisconnectEvent) notification);
471         }
472         // of notifications
473         else if (notification instanceof EchoRequestMessage) {
474             ofListener
475             .onEchoRequestMessage((EchoRequestMessage) notification);
476         } else if (notification instanceof ErrorMessage) {
477             ofListener.onErrorMessage((ErrorMessage) notification);
478         } else if (notification instanceof ExperimenterMessage) {
479             ofListener
480             .onExperimenterMessage((ExperimenterMessage) notification);
481         } else if (notification instanceof FlowRemovedMessage) {
482             ofListener
483             .onFlowRemovedMessage((FlowRemovedMessage) notification);
484         } else if (notification instanceof HelloMessage) {
485             ofListener.onHelloMessage((HelloMessage) notification);
486         } else if (notification instanceof MultipartReplyMessage) {
487             ofListener
488             .onMultipartReplyMessage((MultipartReplyMessage) notification);
489         } else if (notification instanceof MultipartRequestMessage) {
490             ofListener
491             .onMultipartRequestMessage((MultipartRequestMessage) notification);
492         } else if (notification instanceof PacketInMessage) {
493             ofListener
494             .onPacketInMessage((PacketInMessage) notification);
495         } else if (notification instanceof PortStatusMessage) {
496             ofListener
497             .onPortStatusMessage((PortStatusMessage) notification);
498         }
499         // default
500         else {
501             occuredExceptions.add(new IllegalStateException("step:"+planItemCounter+" | " +
502                     "message listening not supported for type: "
503                             + notification.getClass()));
504         }
505
506         LOG.debug("notification ["+notification.getClass().getSimpleName()+"] .. done");
507     }
508
509     /**
510      * @param rpcResponse
511      */
512     private synchronized void processRpcResponse(final SwitchTestRcpResponseEvent rpcResponse) {
513         OfHeader plannedRpcResponseValue = rpcResponse
514                 .getPlannedRpcResponse();
515         LOG.debug("rpc-responding to OF_LISTENER: " + rpcResponse.getXid());
516
517         @SuppressWarnings("unchecked")
518         SettableFuture<RpcResult<?>> response = (SettableFuture<RpcResult<?>>) rpcResults
519         .get(rpcResponse.getXid());
520
521         if (response != null) {
522             boolean successful = plannedRpcResponseValue != null;
523             Collection<RpcError> errors;
524             if (successful) {
525                 errors = Collections.emptyList();
526             } else {
527                 errors = Lists
528                         .newArrayList(RpcErrors
529                                 .getRpcError(
530                                         "unit",
531                                         "unit",
532                                         "not requested",
533                                         ErrorSeverity.ERROR,
534                                         "planned response to RPC.id = "
535                                                 + rpcResponse.getXid(),
536                                                 ErrorType.RPC,
537                                                 new Exception(
538                                                         "rpc response failed (planned behavior)")));
539             }
540             RpcResult<?> result = Rpcs.getRpcResult(successful,
541                     plannedRpcResponseValue, errors);
542             response.set(result);
543         } else {
544             String msg = "RpcResponse not expected: xid="
545                     + rpcResponse.getXid()
546                     + ", "
547                     + plannedRpcResponseValue.getClass()
548                     .getSimpleName();
549             LOG.error(msg);
550             occuredExceptions.add(new IllegalStateException("step:"+planItemCounter+" | " + msg));
551         }
552
553         LOG.debug("rpc ["+rpcResponse.getXid()+"] .. done");
554     }
555
556     /**
557      * @param arg0
558      *            rpc call content
559      * @return rpc future result
560      */
561     private <IN extends OfHeader, OUT extends OfHeader> SettableFuture<RpcResult<OUT>> createAndRegisterRpcResult(
562             IN arg0) {
563         SettableFuture<RpcResult<OUT>> result = SettableFuture.create();
564         rpcResults.put(arg0.getXid(), result);
565         return result;
566     }
567
568     /**
569      * @return rpc future result
570      */
571     private synchronized SettableFuture<RpcResult<Void>> createOneWayRpcResult() {
572         SettableFuture<RpcResult<Void>> result = SettableFuture.create();
573         List<RpcError> errors = Collections.emptyList();
574         result.set(Rpcs.getRpcResult(true, (Void) null, errors));
575         return result;
576     }
577
578     /**
579      * @param eventPlan
580      *            the eventPlan to set
581      */
582     public void setEventPlan(Stack<? extends SwitchTestEvent> eventPlan) {
583         this.eventPlan = eventPlan;
584     }
585
586     /**
587      * @param proceedTimeout
588      *            max timeout for processing one planned event (in [ms])
589      */
590     public void setProceedTimeout(long proceedTimeout) {
591         this.proceedTimeout = proceedTimeout;
592     }
593
594     /**
595      * @return the occuredExceptions
596      */
597     public List<Exception> getOccuredExceptions() {
598         return occuredExceptions;
599     }
600
601     @Override
602     public void fireConnectionReadyNotification() {
603             connectionReadyListener.onConnectionReady();
604     }
605
606     @Override
607     public void setConnectionReadyListener(
608             ConnectionReadyListener connectionReadyListener) {
609         this.connectionReadyListener = connectionReadyListener;
610     }
611
612     @Override
613     public Future<RpcResult<Void>> sendMultipartRequestMessage(
614             SendMultipartRequestMessageInput arg0) {
615         checkRpcAndNext(arg0, "sendMultipartRequestMessage");
616         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
617         return result;
618     }
619 }