fixed API change initiated by fix in library
[openflowplugin.git] / openflowplugin / src / test / java / org / opendaylight / openflowplugin / openflow / md / core / plan / ConnectionAdapterStackImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.openflow.md.core.plan;
10
11 import java.util.ArrayList;
12 import java.util.Collection;
13 import java.util.Collections;
14 import java.util.HashMap;
15 import java.util.List;
16 import java.util.Map;
17 import java.util.Set;
18 import java.util.Stack;
19 import java.util.concurrent.Future;
20
21 import org.opendaylight.controller.sal.common.util.RpcErrors;
22 import org.opendaylight.controller.sal.common.util.Rpcs;
23 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
24 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterInput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncOutput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigInput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigOutput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigInput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInput;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortModInput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestInput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestOutput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncInput;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEventBuilder;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
64 import org.opendaylight.yangtools.yang.binding.Notification;
65 import org.opendaylight.yangtools.yang.common.RpcError;
66 import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
67 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
68 import org.opendaylight.yangtools.yang.common.RpcResult;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
71
72 import com.google.common.base.Joiner;
73 import com.google.common.collect.Lists;
74 import com.google.common.util.concurrent.SettableFuture;
75
76 /**
77  * @author mirehak
78  */
79 public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
80
81     /** notify and rpc-response thread start default delay in [ms] */
82     public static final int JOB_DELAY = 50;
83
84     protected static final Logger LOG = LoggerFactory
85             .getLogger(ConnectionAdapterStackImpl.class);
86
87     protected Stack<? extends SwitchTestEvent> eventPlan;
88     protected OpenflowProtocolListener ofListener;
89     protected SystemNotificationsListener systemListener;
90
91     protected Map<Long, SettableFuture<?>> rpcResults = new HashMap<>();
92     protected boolean planTouched = false;
93
94     private long proceedTimeout;
95
96     protected List<Exception> occuredExceptions = new ArrayList<>();
97
98     private ConnectionReadyListener connectionReadyListener;
99
100     private int planItemCounter;
101
102     /**
103      * default ctor
104      */
105     public ConnectionAdapterStackImpl() {
106         // do nothing
107     }
108
109     @Override
110     public synchronized Future<RpcResult<BarrierOutput>> barrier(BarrierInput arg0) {
111         checkRpcAndNext(arg0, "barrier");
112         SettableFuture<RpcResult<BarrierOutput>> result = createAndRegisterRpcResult(arg0);
113         return result;
114     }
115
116     @Override
117     public synchronized Future<RpcResult<EchoOutput>> echo(EchoInput arg0) {
118         checkRpcAndNext(arg0, "echo");
119         Future<RpcResult<EchoOutput>> result = createAndRegisterRpcResult(arg0);
120         return result;
121     }
122
123     @Override
124     public Future<RpcResult<Void>> echoReply(EchoReplyInput arg0) {
125         checkRpcAndNext(arg0, "echoReply");
126         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
127         return result;
128     }
129
130     @Override
131     public Future<RpcResult<Void>> experimenter(ExperimenterInput arg0) {
132         checkRpcAndNext(arg0, "experimenter");
133         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
134         return result;
135     }
136
137     @Override
138     public Future<RpcResult<Void>> flowMod(FlowModInput arg0) {
139         checkRpcAndNext(arg0, "flowMod");
140         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
141         return result;
142     }
143
144     @Override
145     public synchronized Future<RpcResult<GetAsyncOutput>> getAsync(GetAsyncInput arg0) {
146         checkRpcAndNext(arg0, "echo");
147         Future<RpcResult<GetAsyncOutput>> result = createAndRegisterRpcResult(arg0);
148         return result;
149     }
150
151     @Override
152     public synchronized Future<RpcResult<GetConfigOutput>> getConfig(GetConfigInput arg0) {
153         checkRpcAndNext(arg0, "echo");
154         Future<RpcResult<GetConfigOutput>> result = createAndRegisterRpcResult(arg0);
155         return result;
156     }
157
158     @Override
159     public synchronized Future<RpcResult<GetFeaturesOutput>> getFeatures(
160             GetFeaturesInput arg0) {
161         checkRpcAndNext(arg0, "getFeatures");
162         Future<RpcResult<GetFeaturesOutput>> result = createAndRegisterRpcResult(arg0);
163         return result;
164     }
165
166     @Override
167     public synchronized Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(
168             GetQueueConfigInput arg0) {
169         checkRpcAndNext(arg0, "echo");
170         Future<RpcResult<GetQueueConfigOutput>> result = createAndRegisterRpcResult(arg0);
171         return result;
172     }
173
174     @Override
175     public Future<RpcResult<Void>> groupMod(GroupModInput arg0) {
176         checkRpcAndNext(arg0, "groupMod");
177         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
178         return result;
179     }
180
181     @Override
182     public Future<RpcResult<Void>> hello(HelloInput arg0) {
183         checkRpcAndNext(arg0, "helloReply");
184         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
185         return result;
186     }
187
188     @Override
189     public Future<RpcResult<Void>> meterMod(MeterModInput arg0) {
190         checkRpcAndNext(arg0, "meterMod");
191         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
192         return result;
193     }
194
195     @Override
196     public Future<RpcResult<Void>> packetOut(PacketOutInput arg0) {
197         checkRpcAndNext(arg0, "packetOut");
198         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
199         return result;
200     }
201
202     @Override
203     public Future<RpcResult<Void>> portMod(PortModInput arg0) {
204         checkRpcAndNext(arg0, "portMod");
205         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
206         return result;
207     }
208
209     @Override
210     public synchronized Future<RpcResult<RoleRequestOutput>> roleRequest(
211             RoleRequestInput arg0) {
212         checkRpcAndNext(arg0, "echo");
213         Future<RpcResult<RoleRequestOutput>> result = createAndRegisterRpcResult(arg0);
214         return result;
215     }
216
217     @Override
218     public Future<RpcResult<Void>> setAsync(SetAsyncInput arg0) {
219         checkRpcAndNext(arg0, "setAsync");
220         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
221         return result;
222     }
223
224     @Override
225     public Future<RpcResult<Void>> setConfig(SetConfigInput arg0) {
226         checkRpcAndNext(arg0, "setConfig");
227         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
228         return result;
229     }
230
231     @Override
232     public Future<RpcResult<Void>> tableMod(TableModInput arg0) {
233         checkRpcAndNext(arg0, "tableMod");
234         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
235         return result;
236     }
237
238     @Override
239     public Future<Boolean> disconnect() {
240         LOG.info("adapter is told to disconnect");
241         DisconnectEventBuilder disconnectEventBuilder = new DisconnectEventBuilder();
242         disconnectEventBuilder.setInfo("disconnected by plugin");
243         systemListener.onDisconnectEvent(disconnectEventBuilder.build());
244         return null;
245     }
246
247     @Override
248     public boolean isAlive() {
249         // TODO make dynamic
250         return true;
251     }
252
253     @Override
254     public void setMessageListener(OpenflowProtocolListener ofListener) {
255         this.ofListener = ofListener;
256     }
257
258     @Override
259     public void checkListeners() {
260         if (ofListener == null || systemListener == null || connectionReadyListener == null) {
261             occuredExceptions
262                     .add(new IllegalStateException("missing listeners"));
263         }
264     }
265
266     @Override
267     public void setSystemListener(SystemNotificationsListener systemListener) {
268         this.systemListener = systemListener;
269     }
270
271     /**
272      * @param rpcInput
273      *            rpc call parameter
274      * @param rpcName
275      *            rpc yang name
276      */
277     private boolean checkRpc(OfHeader rpcInput, String rpcName) {
278         String msg = null;
279         boolean finished = true;
280
281         if (eventPlan.isEmpty()) {
282             throw new IllegalStateException("eventPlan already depleted");
283         }
284
285         LOG.debug("checking rpc: name={}, ver={}, xid={}", rpcName, rpcInput.getVersion(), rpcInput.getXid());
286         if (!(eventPlan.peek() instanceof SwitchTestWaitForRpcEvent)
287                 && !(eventPlan.peek() instanceof SwitchTestWaitForAllEvent)) {
288             if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
289                 SwitchTestNotificationEvent notifEvent = (SwitchTestNotificationEvent) (eventPlan.peek());
290                 msg = "expected [notification: " +notifEvent.getPlannedNotification()+ "], got [" + rpcInput.getClass().getSimpleName()
291                         + "]";
292             } else if (eventPlan.peek() instanceof SwitchTestRcpResponseEvent) {
293                 SwitchTestRcpResponseEvent rpcEvent = (SwitchTestRcpResponseEvent) (eventPlan.peek());
294                 msg = "expected [rpc: " +rpcEvent.getPlannedRpcResponse()+ "], got [" + rpcInput.getClass().getSimpleName()
295                         + "]";
296             }
297         } else {
298             if (eventPlan.peek() instanceof SwitchTestWaitForAllEvent) {
299                 SwitchTestWaitForAllEvent switchTestWaitForAll = (SwitchTestWaitForAllEvent) eventPlan
300                         .peek();
301                 Set<SwitchTestWaitForRpcEvent> eventBag = switchTestWaitForAll.getWaitEventBag();
302                 List<String> msgLot = new ArrayList<>();
303
304                 if (eventBag == null || eventBag.isEmpty()) {
305                     msg = "no wait events in bag";
306                 } else {
307                     finished = false;
308                     for (SwitchTestWaitForRpcEvent switchTestWaitForRpc : eventBag) {
309                         String msgPart = checkSingleRpcContent(rpcInput, rpcName, switchTestWaitForRpc);
310
311                         if (msgPart != null) {
312                             msgLot.add(msgPart);
313                         } else {
314                             LOG.debug("wait event matched: {}", rpcName);
315                             eventBag.remove(switchTestWaitForRpc);
316                             if (eventBag.isEmpty()) {
317                                 finished = true;
318                             }
319                             msgLot.clear();
320                             break;
321                         }
322                     }
323                 }
324
325                 if (!msgLot.isEmpty()) {
326                     msg = Joiner.on(" | ").join(msgLot);
327                 }
328             } else if (eventPlan.peek() instanceof SwitchTestWaitForRpcEvent) {
329                 SwitchTestWaitForRpcEvent switchTestRpcEvent = (SwitchTestWaitForRpcEvent) eventPlan
330                         .peek();
331                 msg = checkSingleRpcContent(rpcInput, rpcName, switchTestRpcEvent);
332             }
333         }
334
335         if (msg != null) {
336             LOG.debug("rpc check .. FAILED: " + msg);
337             occuredExceptions.add(new IllegalArgumentException("step:"+planItemCounter+" | "+msg));
338         } else {
339             LOG.debug("rpc check .. OK");
340         }
341         return finished;
342     }
343
344     /**
345      * @param rpcInput
346      * @param rpcName
347      * @param msgTmp
348      * @param switchTestWaitForRpc
349      * @return
350      */
351     private static String checkSingleRpcContent(OfHeader rpcInput, String rpcName,
352             SwitchTestWaitForRpcEvent switchTestWaitForRpc) {
353         String failureMsg = null;
354         if (!rpcName.equals(switchTestWaitForRpc.getRpcName())) {
355             failureMsg = "expected rpc name [" + switchTestWaitForRpc.getRpcName()
356                     + "], got [" + rpcName + "]";
357         } else if (!rpcInput.getXid().equals(switchTestWaitForRpc.getXid())) {
358             failureMsg = "expected "+rpcName+".xid [" + switchTestWaitForRpc.getXid()
359                     + "], got [" + rpcInput.getXid() + "]";
360         }
361
362         return failureMsg;
363     }
364
365     /**
366      * @param rpcInput
367      *            rpc call parameter
368      * @param rpcName
369      *            rpc yang name
370      */
371     private synchronized void checkRpcAndNext(OfHeader rpcInput, String rpcName) {
372         boolean finished = checkRpc(rpcInput, rpcName);
373         if (finished) {
374             next();
375         }
376     }
377
378     /**
379      * discard current event, execute next, if possible
380      */
381     private void next() {
382         LOG.debug("<---> STEPPING TO NEXT event in plan (leaving [{}] {})", planItemCounter, eventPlan.peek());
383         eventPlan.pop();
384         planItemCounter ++;
385         planTouched = true;
386         try {
387             Thread.sleep(JOB_DELAY);
388         } catch (InterruptedException e) {
389             LOG.error(e.getMessage(), e);
390         }
391         notify();
392     }
393
394     /**
395      * start or continue processing plan
396      */
397     private synchronized void proceed() {
398         boolean processed = false;
399         LOG.debug("proceeding plan item[{}]: {}", planItemCounter, eventPlan.peek());
400         if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
401             SwitchTestNotificationEvent notification = (SwitchTestNotificationEvent) eventPlan
402                     .peek();
403             processNotification(notification);
404             processed = true;
405         } else if (eventPlan.peek() instanceof SwitchTestRcpResponseEvent) {
406             SwitchTestRcpResponseEvent rpcResponse = (SwitchTestRcpResponseEvent) eventPlan
407                     .peek();
408             processRpcResponse(rpcResponse);
409             processed = true;
410         } else if (eventPlan.peek() instanceof SwitchTestCallbackEvent) {
411             SwitchTestCallbackEvent callbackEvent = (SwitchTestCallbackEvent) eventPlan.peek();
412             try {
413                 callbackEvent.getCallback().call();
414             } catch (Exception e) {
415                 LOG.error(e.getMessage(), e);
416                occuredExceptions.add(e);
417             }
418             processed = true;
419         }
420
421         if (processed) {
422             next();
423         } else {
424             try {
425                 LOG.debug("now WAITING for OF_LISTENER to act ..");
426                 wait(proceedTimeout);
427             } catch (InterruptedException e) {
428                 LOG.error(e.getMessage(), e);
429             }
430         }
431     }
432
433     @Override
434     public void run() {
435         LOG.debug("|---> evenPlan STARTING ..");
436         planItemCounter = 0;
437         while (!eventPlan.isEmpty()) {
438             planTouched = false;
439             proceed();
440             if (!planTouched) {
441                 occuredExceptions.add(new IllegalStateException(
442                         "eventPlan STALLED, planItemCounter="+planItemCounter));
443                 break;
444             }
445         }
446
447         try {
448             Thread.sleep(JOB_DELAY);
449         } catch (InterruptedException e) {
450             LOG.error(e.getMessage(), e);
451         }
452         LOG.debug("<---| eventPlan DONE");
453     }
454
455     /**
456      * @param notificationEvent
457      */
458     private synchronized void processNotification(
459             final SwitchTestNotificationEvent notificationEvent) {
460
461         Notification notification = notificationEvent
462                 .getPlannedNotification();
463         LOG.debug("notificating OF_LISTENER: "
464                 + notification.getClass().getSimpleName());
465
466         // system events
467         if (notification instanceof DisconnectEvent) {
468             systemListener
469             .onDisconnectEvent((DisconnectEvent) notification);
470         }
471         // of notifications
472         else if (notification instanceof EchoRequestMessage) {
473             ofListener
474             .onEchoRequestMessage((EchoRequestMessage) notification);
475         } else if (notification instanceof ErrorMessage) {
476             ofListener.onErrorMessage((ErrorMessage) notification);
477         } else if (notification instanceof ExperimenterMessage) {
478             ofListener
479             .onExperimenterMessage((ExperimenterMessage) notification);
480         } else if (notification instanceof FlowRemovedMessage) {
481             ofListener
482             .onFlowRemovedMessage((FlowRemovedMessage) notification);
483         } else if (notification instanceof HelloMessage) {
484             ofListener.onHelloMessage((HelloMessage) notification);
485         } else if (notification instanceof MultipartReplyMessage) {
486             ofListener
487             .onMultipartReplyMessage((MultipartReplyMessage) notification);
488         } else if (notification instanceof PacketInMessage) {
489             ofListener
490             .onPacketInMessage((PacketInMessage) notification);
491         } else if (notification instanceof PortStatusMessage) {
492             ofListener
493             .onPortStatusMessage((PortStatusMessage) notification);
494         }
495         // default
496         else {
497             occuredExceptions.add(new IllegalStateException("step:"+planItemCounter+" | " +
498                     "message listening not supported for type: "
499                             + notification.getClass()));
500         }
501
502         LOG.debug("notification ["+notification.getClass().getSimpleName()+"] .. done");
503     }
504
505     /**
506      * @param rpcResponse
507      */
508     private synchronized void processRpcResponse(final SwitchTestRcpResponseEvent rpcResponse) {
509         OfHeader plannedRpcResponseValue = rpcResponse
510                 .getPlannedRpcResponse();
511         LOG.debug("rpc-responding to OF_LISTENER: " + rpcResponse.getXid());
512
513         @SuppressWarnings("unchecked")
514         SettableFuture<RpcResult<?>> response = (SettableFuture<RpcResult<?>>) rpcResults
515         .get(rpcResponse.getXid());
516
517         if (response != null) {
518             boolean successful = plannedRpcResponseValue != null;
519             Collection<RpcError> errors;
520             if (successful) {
521                 errors = Collections.emptyList();
522             } else {
523                 errors = Lists
524                         .newArrayList(RpcErrors
525                                 .getRpcError(
526                                         "unit",
527                                         "unit",
528                                         "not requested",
529                                         ErrorSeverity.ERROR,
530                                         "planned response to RPC.id = "
531                                                 + rpcResponse.getXid(),
532                                                 ErrorType.RPC,
533                                                 new Exception(
534                                                         "rpc response failed (planned behavior)")));
535             }
536             RpcResult<?> result = Rpcs.getRpcResult(successful,
537                     plannedRpcResponseValue, errors);
538             response.set(result);
539         } else {
540             String msg = "RpcResponse not expected: xid="
541                     + rpcResponse.getXid()
542                     + ", "
543                     + plannedRpcResponseValue.getClass()
544                     .getSimpleName();
545             LOG.error(msg);
546             occuredExceptions.add(new IllegalStateException("step:"+planItemCounter+" | " + msg));
547         }
548
549         LOG.debug("rpc ["+rpcResponse.getXid()+"] .. done");
550     }
551
552     /**
553      * @param arg0
554      *            rpc call content
555      * @return rpc future result
556      */
557     private <IN extends OfHeader, OUT extends OfHeader> SettableFuture<RpcResult<OUT>> createAndRegisterRpcResult(
558             IN arg0) {
559         SettableFuture<RpcResult<OUT>> result = SettableFuture.create();
560         rpcResults.put(arg0.getXid(), result);
561         return result;
562     }
563
564     /**
565      * @return rpc future result
566      */
567     private synchronized SettableFuture<RpcResult<Void>> createOneWayRpcResult() {
568         SettableFuture<RpcResult<Void>> result = SettableFuture.create();
569         List<RpcError> errors = Collections.emptyList();
570         result.set(Rpcs.getRpcResult(true, (Void) null, errors));
571         return result;
572     }
573
574     /**
575      * @param eventPlan
576      *            the eventPlan to set
577      */
578     public void setEventPlan(Stack<? extends SwitchTestEvent> eventPlan) {
579         this.eventPlan = eventPlan;
580     }
581
582     /**
583      * @param proceedTimeout
584      *            max timeout for processing one planned event (in [ms])
585      */
586     public void setProceedTimeout(long proceedTimeout) {
587         this.proceedTimeout = proceedTimeout;
588     }
589
590     /**
591      * @return the occuredExceptions
592      */
593     public List<Exception> getOccuredExceptions() {
594         return occuredExceptions;
595     }
596
597     @Override
598     public void fireConnectionReadyNotification() {
599             connectionReadyListener.onConnectionReady();
600     }
601
602     @Override
603     public void setConnectionReadyListener(
604             ConnectionReadyListener connectionReadyListener) {
605         this.connectionReadyListener = connectionReadyListener;
606     }
607
608     @Override
609     public Future<RpcResult<Void>> multipartRequest(
610             MultipartRequestInput arg0) {
611         checkRpcAndNext(arg0, "multipartRequestInput");
612         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
613         return result;
614     }
615 }