Bug 2446 - High priority (control) queue stops reading from channel if it is full
[openflowplugin.git] / openflowplugin / src / test / java / org / opendaylight / openflowplugin / openflow / md / core / plan / ConnectionAdapterStackImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.openflow.md.core.plan;
10
11 import java.net.InetSocketAddress;
12 import java.util.ArrayList;
13 import java.util.Collection;
14 import java.util.Collections;
15 import java.util.HashMap;
16 import java.util.List;
17 import java.util.Map;
18 import java.util.Set;
19 import java.util.Stack;
20 import java.util.concurrent.Future;
21
22 import org.opendaylight.controller.sal.common.util.RpcErrors;
23 import org.opendaylight.controller.sal.common.util.Rpcs;
24 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
25 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterInput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigInput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigOutput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigInput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInput;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortModInput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestInput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestOutput;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncInput;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEventBuilder;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
65 import org.opendaylight.yangtools.yang.binding.Notification;
66 import org.opendaylight.yangtools.yang.common.RpcError;
67 import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
68 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
69 import org.opendaylight.yangtools.yang.common.RpcResult;
70 import org.slf4j.Logger;
71 import org.slf4j.LoggerFactory;
72
73 import com.google.common.base.Joiner;
74 import com.google.common.collect.Lists;
75 import com.google.common.util.concurrent.SettableFuture;
76
77 /**
78  * @author mirehak
79  */
80 public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
81
82     /** notify and rpc-response thread start default delay in [ms] */
83     public static final int JOB_DELAY = 50;
84
85     protected static final Logger LOG = LoggerFactory
86             .getLogger(ConnectionAdapterStackImpl.class);
87
88     protected Stack<? extends SwitchTestEvent> eventPlan;
89     protected OpenflowProtocolListener ofListener;
90     protected SystemNotificationsListener systemListener;
91
92     protected Map<Long, SettableFuture<?>> rpcResults = new HashMap<>();
93     protected boolean planTouched = false;
94
95     private long proceedTimeout;
96
97     protected List<Exception> occuredExceptions = new ArrayList<>();
98
99     private ConnectionReadyListener connectionReadyListener;
100
101     private int planItemCounter;
102
103     private boolean autoRead = true;
104
105     /**
106      * default ctor
107      */
108     public ConnectionAdapterStackImpl() {
109         // do nothing
110     }
111
112     @Override
113     public synchronized Future<RpcResult<BarrierOutput>> barrier(
114             BarrierInput arg0) {
115         checkRpcAndNext(arg0, "barrier");
116         SettableFuture<RpcResult<BarrierOutput>> result = createAndRegisterRpcResult(arg0);
117         return result;
118     }
119
120     @Override
121     public synchronized Future<RpcResult<EchoOutput>> echo(EchoInput arg0) {
122         checkRpcAndNext(arg0, "echo");
123         Future<RpcResult<EchoOutput>> result = createAndRegisterRpcResult(arg0);
124         return result;
125     }
126
127     @Override
128     public Future<RpcResult<Void>> echoReply(EchoReplyInput arg0) {
129         checkRpcAndNext(arg0, "echoReply");
130         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
131         return result;
132     }
133
134     @Override
135     public Future<RpcResult<Void>> experimenter(ExperimenterInput arg0) {
136         checkRpcAndNext(arg0, "experimenter");
137         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
138         return result;
139     }
140
141     @Override
142     public Future<RpcResult<Void>> flowMod(FlowModInput arg0) {
143         checkRpcAndNext(arg0, "flowMod");
144         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
145         return result;
146     }
147
148     @Override
149     public synchronized Future<RpcResult<GetAsyncOutput>> getAsync(
150             GetAsyncInput arg0) {
151         checkRpcAndNext(arg0, "echo");
152         Future<RpcResult<GetAsyncOutput>> result = createAndRegisterRpcResult(arg0);
153         return result;
154     }
155
156     @Override
157     public synchronized Future<RpcResult<GetConfigOutput>> getConfig(
158             GetConfigInput arg0) {
159         checkRpcAndNext(arg0, "echo");
160         Future<RpcResult<GetConfigOutput>> result = createAndRegisterRpcResult(arg0);
161         return result;
162     }
163
164     @Override
165     public synchronized Future<RpcResult<GetFeaturesOutput>> getFeatures(
166             GetFeaturesInput arg0) {
167         checkRpcAndNext(arg0, "getFeatures");
168         Future<RpcResult<GetFeaturesOutput>> result = createAndRegisterRpcResult(arg0);
169         return result;
170     }
171
172     @Override
173     public synchronized Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(
174             GetQueueConfigInput arg0) {
175         checkRpcAndNext(arg0, "echo");
176         Future<RpcResult<GetQueueConfigOutput>> result = createAndRegisterRpcResult(arg0);
177         return result;
178     }
179
180     @Override
181     public Future<RpcResult<Void>> groupMod(GroupModInput arg0) {
182         checkRpcAndNext(arg0, "groupMod");
183         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
184         return result;
185     }
186
187     @Override
188     public Future<RpcResult<Void>> hello(HelloInput arg0) {
189         checkRpcAndNext(arg0, "helloReply");
190         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
191         return result;
192     }
193
194     @Override
195     public Future<RpcResult<Void>> meterMod(MeterModInput arg0) {
196         checkRpcAndNext(arg0, "meterMod");
197         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
198         return result;
199     }
200
201     @Override
202     public Future<RpcResult<Void>> packetOut(PacketOutInput arg0) {
203         checkRpcAndNext(arg0, "packetOut");
204         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
205         return result;
206     }
207
208     @Override
209     public Future<RpcResult<Void>> portMod(PortModInput arg0) {
210         checkRpcAndNext(arg0, "portMod");
211         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
212         return result;
213     }
214
215     @Override
216     public synchronized Future<RpcResult<RoleRequestOutput>> roleRequest(
217             RoleRequestInput arg0) {
218         checkRpcAndNext(arg0, "echo");
219         Future<RpcResult<RoleRequestOutput>> result = createAndRegisterRpcResult(arg0);
220         return result;
221     }
222
223     @Override
224     public Future<RpcResult<Void>> setAsync(SetAsyncInput arg0) {
225         checkRpcAndNext(arg0, "setAsync");
226         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
227         return result;
228     }
229
230     @Override
231     public Future<RpcResult<Void>> setConfig(SetConfigInput arg0) {
232         checkRpcAndNext(arg0, "setConfig");
233         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
234         return result;
235     }
236
237     @Override
238     public Future<RpcResult<Void>> tableMod(TableModInput arg0) {
239         checkRpcAndNext(arg0, "tableMod");
240         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
241         return result;
242     }
243
244     @Override
245     public Future<Boolean> disconnect() {
246         LOG.debug("adapter is told to disconnect");
247         DisconnectEventBuilder disconnectEventBuilder = new DisconnectEventBuilder();
248         disconnectEventBuilder.setInfo("disconnected by plugin");
249         systemListener.onDisconnectEvent(disconnectEventBuilder.build());
250         return null;
251     }
252
253     @Override
254     public boolean isAlive() {
255         // TODO make dynamic
256         return true;
257     }
258
259     @Override
260     public void setMessageListener(OpenflowProtocolListener ofListener) {
261         this.ofListener = ofListener;
262     }
263
264     @Override
265     public void checkListeners() {
266         if (ofListener == null || systemListener == null
267                 || connectionReadyListener == null) {
268             occuredExceptions
269                     .add(new IllegalStateException("missing listeners"));
270         }
271     }
272
273     @Override
274     public void setSystemListener(SystemNotificationsListener systemListener) {
275         this.systemListener = systemListener;
276     }
277
278     /**
279      * @param rpcInput
280      *            rpc call parameter
281      * @param rpcName
282      *            rpc yang name
283      */
284     private boolean checkRpc(OfHeader rpcInput, String rpcName) {
285         String msg = null;
286         boolean finished = true;
287
288         if (eventPlan.isEmpty()) {
289             throw new IllegalStateException("eventPlan already depleted");
290         }
291
292         LOG.debug("checking rpc: name={}, ver={}, xid={}", rpcName,
293                 rpcInput.getVersion(), rpcInput.getXid());
294         if (!(eventPlan.peek() instanceof SwitchTestWaitForRpcEvent)
295                 && !(eventPlan.peek() instanceof SwitchTestWaitForAllEvent)) {
296             if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
297                 SwitchTestNotificationEvent notifEvent = (SwitchTestNotificationEvent) (eventPlan
298                         .peek());
299                 msg = "expected [notification: "
300                         + notifEvent.getPlannedNotification() + "], got ["
301                         + rpcInput.getClass().getSimpleName() + "]";
302             } else if (eventPlan.peek() instanceof SwitchTestRcpResponseEvent) {
303                 SwitchTestRcpResponseEvent rpcEvent = (SwitchTestRcpResponseEvent) (eventPlan
304                         .peek());
305                 msg = "expected [rpc: " + rpcEvent.getPlannedRpcResponse()
306                         + "], got [" + rpcInput.getClass().getSimpleName()
307                         + "]";
308             }
309         } else {
310             if (eventPlan.peek() instanceof SwitchTestWaitForAllEvent) {
311                 SwitchTestWaitForAllEvent switchTestWaitForAll = (SwitchTestWaitForAllEvent) eventPlan
312                         .peek();
313                 Set<SwitchTestWaitForRpcEvent> eventBag = switchTestWaitForAll
314                         .getWaitEventBag();
315                 List<String> msgLot = new ArrayList<>();
316
317                 if (eventBag == null || eventBag.isEmpty()) {
318                     msg = "no wait events in bag";
319                 } else {
320                     finished = false;
321                     for (SwitchTestWaitForRpcEvent switchTestWaitForRpc : eventBag) {
322                         String msgPart = checkSingleRpcContent(rpcInput,
323                                 rpcName, switchTestWaitForRpc);
324
325                         if (msgPart != null) {
326                             msgLot.add(msgPart);
327                         } else {
328                             LOG.debug("wait event matched: {}", rpcName);
329                             eventBag.remove(switchTestWaitForRpc);
330                             if (eventBag.isEmpty()) {
331                                 finished = true;
332                             }
333                             msgLot.clear();
334                             break;
335                         }
336                     }
337                 }
338
339                 if (!msgLot.isEmpty()) {
340                     msg = Joiner.on(" | ").join(msgLot);
341                 }
342             } else if (eventPlan.peek() instanceof SwitchTestWaitForRpcEvent) {
343                 SwitchTestWaitForRpcEvent switchTestRpcEvent = (SwitchTestWaitForRpcEvent) eventPlan
344                         .peek();
345                 msg = checkSingleRpcContent(rpcInput, rpcName,
346                         switchTestRpcEvent);
347             }
348         }
349
350         if (msg != null) {
351             LOG.debug("rpc check .. FAILED: " + msg);
352             occuredExceptions.add(new IllegalArgumentException("step:"
353                     + planItemCounter + " | " + msg));
354         } else {
355             LOG.debug("rpc check .. OK");
356         }
357         return finished;
358     }
359
360     /**
361      * @param rpcInput
362      * @param rpcName
363      * @param msgTmp
364      * @param switchTestWaitForRpc
365      * @return
366      */
367     private static String checkSingleRpcContent(OfHeader rpcInput,
368             String rpcName, SwitchTestWaitForRpcEvent switchTestWaitForRpc) {
369         String failureMsg = null;
370         if (!rpcName.equals(switchTestWaitForRpc.getRpcName())) {
371             failureMsg = "expected rpc name ["
372                     + switchTestWaitForRpc.getRpcName() + "], got [" + rpcName
373                     + "]";
374         } else if (!rpcInput.getXid().equals(switchTestWaitForRpc.getXid())) {
375             failureMsg = "expected " + rpcName + ".xid ["
376                     + switchTestWaitForRpc.getXid() + "], got ["
377                     + rpcInput.getXid() + "]";
378         }
379
380         return failureMsg;
381     }
382
383     /**
384      * @param rpcInput
385      *            rpc call parameter
386      * @param rpcName
387      *            rpc yang name
388      */
389     private synchronized void checkRpcAndNext(OfHeader rpcInput, String rpcName) {
390         boolean finished = checkRpc(rpcInput, rpcName);
391         if (finished) {
392             next();
393         }
394     }
395
396     /**
397      * discard current event, execute next, if possible
398      */
399     private void next() {
400         LOG.debug("<---> STEPPING TO NEXT event in plan (leaving [{}] {})",
401                 planItemCounter, eventPlan.peek());
402         eventPlan.pop();
403         planItemCounter++;
404         planTouched = true;
405         try {
406             Thread.sleep(JOB_DELAY);
407         } catch (InterruptedException e) {
408             LOG.error(e.getMessage(), e);
409         }
410         notify();
411     }
412
413     /**
414      * start or continue processing plan
415      */
416     private synchronized void proceed() {
417         boolean processed = false;
418         LOG.debug("proceeding plan item[{}]: {}", planItemCounter,
419                 eventPlan.peek());
420         if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
421             SwitchTestNotificationEvent notification = (SwitchTestNotificationEvent) eventPlan
422                     .peek();
423             processNotification(notification);
424             processed = true;
425         } else if (eventPlan.peek() instanceof SwitchTestRcpResponseEvent) {
426             SwitchTestRcpResponseEvent rpcResponse = (SwitchTestRcpResponseEvent) eventPlan
427                     .peek();
428             processRpcResponse(rpcResponse);
429             processed = true;
430         } else if (eventPlan.peek() instanceof SwitchTestCallbackEvent) {
431             SwitchTestCallbackEvent callbackEvent = (SwitchTestCallbackEvent) eventPlan
432                     .peek();
433             try {
434                 callbackEvent.getCallback().call();
435             } catch (Exception e) {
436                 LOG.error(e.getMessage(), e);
437                 occuredExceptions.add(e);
438             }
439             processed = true;
440         }
441
442         if (processed) {
443             next();
444         } else {
445             try {
446                 LOG.debug("now WAITING for OF_LISTENER to act ..");
447                 wait(proceedTimeout);
448             } catch (InterruptedException e) {
449                 LOG.error(e.getMessage(), e);
450             }
451         }
452     }
453
454     @Override
455     public void run() {
456         LOG.debug("|---> evenPlan STARTING ..");
457         planItemCounter = 0;
458         while (!eventPlan.isEmpty()) {
459             planTouched = false;
460             proceed();
461             if (!planTouched) {
462                 occuredExceptions
463                         .add(new IllegalStateException(
464                                 "eventPlan STALLED, planItemCounter="
465                                         + planItemCounter));
466                 break;
467             }
468         }
469
470         try {
471             Thread.sleep(JOB_DELAY);
472         } catch (InterruptedException e) {
473             LOG.error(e.getMessage(), e);
474         }
475         LOG.debug("<---| eventPlan DONE");
476     }
477
478     /**
479      * @param notificationEvent
480      */
481     private synchronized void processNotification(
482             final SwitchTestNotificationEvent notificationEvent) {
483
484         Notification notification = notificationEvent.getPlannedNotification();
485         LOG.debug("notificating OF_LISTENER: "
486                 + notification.getClass().getSimpleName());
487
488         // system events
489         if (notification instanceof DisconnectEvent) {
490             systemListener.onDisconnectEvent((DisconnectEvent) notification);
491         }
492         // of notifications
493         else if (notification instanceof EchoRequestMessage) {
494             ofListener.onEchoRequestMessage((EchoRequestMessage) notification);
495         } else if (notification instanceof ErrorMessage) {
496             ofListener.onErrorMessage((ErrorMessage) notification);
497         } else if (notification instanceof ExperimenterMessage) {
498             ofListener
499                     .onExperimenterMessage((ExperimenterMessage) notification);
500         } else if (notification instanceof FlowRemovedMessage) {
501             ofListener.onFlowRemovedMessage((FlowRemovedMessage) notification);
502         } else if (notification instanceof HelloMessage) {
503             ofListener.onHelloMessage((HelloMessage) notification);
504         } else if (notification instanceof MultipartReplyMessage) {
505             ofListener
506                     .onMultipartReplyMessage((MultipartReplyMessage) notification);
507         } else if (notification instanceof PacketInMessage) {
508             ofListener.onPacketInMessage((PacketInMessage) notification);
509         } else if (notification instanceof PortStatusMessage) {
510             ofListener.onPortStatusMessage((PortStatusMessage) notification);
511         }
512         // default
513         else {
514             occuredExceptions.add(new IllegalStateException("step:"
515                     + planItemCounter + " | "
516                     + "message listening not supported for type: "
517                     + notification.getClass()));
518         }
519
520         LOG.debug("notification [" + notification.getClass().getSimpleName()
521                 + "] .. done");
522     }
523
524     /**
525      * @param rpcResponse
526      */
527     private synchronized void processRpcResponse(
528             final SwitchTestRcpResponseEvent rpcResponse) {
529         OfHeader plannedRpcResponseValue = rpcResponse.getPlannedRpcResponse();
530         LOG.debug("rpc-responding to OF_LISTENER: " + rpcResponse.getXid());
531
532         @SuppressWarnings("unchecked")
533         SettableFuture<RpcResult<?>> response = (SettableFuture<RpcResult<?>>) rpcResults
534                 .get(rpcResponse.getXid());
535
536         if (response != null) {
537             boolean successful = plannedRpcResponseValue != null;
538             Collection<RpcError> errors;
539             if (successful) {
540                 errors = Collections.emptyList();
541             } else {
542                 errors = Lists.newArrayList(RpcErrors.getRpcError("unit",
543                         "unit", "not requested", ErrorSeverity.ERROR,
544                         "planned response to RPC.id = " + rpcResponse.getXid(),
545                         ErrorType.RPC, new Exception(
546                                 "rpc response failed (planned behavior)")));
547             }
548             RpcResult<?> result = Rpcs.getRpcResult(successful,
549                     plannedRpcResponseValue, errors);
550             response.set(result);
551         } else {
552             String msg = "RpcResponse not expected: xid="
553                     + rpcResponse.getXid() + ", "
554                     + plannedRpcResponseValue.getClass().getSimpleName();
555             LOG.error(msg);
556             occuredExceptions.add(new IllegalStateException("step:"
557                     + planItemCounter + " | " + msg));
558         }
559
560         LOG.debug("rpc [" + rpcResponse.getXid() + "] .. done");
561     }
562
563     /**
564      * @param arg0
565      *            rpc call content
566      * @return rpc future result
567      */
568     private <IN extends OfHeader, OUT extends OfHeader> SettableFuture<RpcResult<OUT>> createAndRegisterRpcResult(
569             IN arg0) {
570         SettableFuture<RpcResult<OUT>> result = SettableFuture.create();
571         rpcResults.put(arg0.getXid(), result);
572         return result;
573     }
574
575     /**
576      * @return rpc future result
577      */
578     private synchronized SettableFuture<RpcResult<Void>> createOneWayRpcResult() {
579         SettableFuture<RpcResult<Void>> result = SettableFuture.create();
580         List<RpcError> errors = Collections.emptyList();
581         result.set(Rpcs.getRpcResult(true, (Void) null, errors));
582         return result;
583     }
584
585     /**
586      * @param eventPlan
587      *            the eventPlan to set
588      */
589     public void setEventPlan(Stack<? extends SwitchTestEvent> eventPlan) {
590         this.eventPlan = eventPlan;
591     }
592
593     /**
594      * @param proceedTimeout
595      *            max timeout for processing one planned event (in [ms])
596      */
597     public void setProceedTimeout(long proceedTimeout) {
598         this.proceedTimeout = proceedTimeout;
599     }
600
601     /**
602      * @return the occuredExceptions
603      */
604     public List<Exception> getOccuredExceptions() {
605         return occuredExceptions;
606     }
607
608     @Override
609     public void fireConnectionReadyNotification() {
610         connectionReadyListener.onConnectionReady();
611     }
612
613     @Override
614     public void setConnectionReadyListener(
615             ConnectionReadyListener connectionReadyListener) {
616         this.connectionReadyListener = connectionReadyListener;
617     }
618
619     @Override
620     public Future<RpcResult<Void>> multipartRequest(MultipartRequestInput arg0) {
621         checkRpcAndNext(arg0, "multipartRequestInput");
622         SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
623         return result;
624     }
625
626     @Override
627     public InetSocketAddress getRemoteAddress() {
628         // TODO Auto-generated method stub
629         return null;
630     }
631
632     @Override
633     public boolean isAutoRead() {
634         return autoRead;
635     }
636
637     @Override
638     public void setAutoRead(boolean autoRead) {
639         this.autoRead = autoRead;
640     }
641
642 }