2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.openflow.md.core.plan;
11 import java.util.ArrayList;
12 import java.util.Collection;
13 import java.util.Collections;
14 import java.util.HashMap;
15 import java.util.List;
17 import java.util.Stack;
18 import java.util.concurrent.Callable;
19 import java.util.concurrent.Future;
20 import java.util.concurrent.ScheduledThreadPoolExecutor;
21 import java.util.concurrent.TimeUnit;
23 import org.opendaylight.controller.sal.common.util.RpcErrors;
24 import org.opendaylight.controller.sal.common.util.Rpcs;
25 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterInput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigInput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigOutput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigInput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestMessage;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortModInput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestInput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestOutput;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncInput;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
64 import org.opendaylight.yangtools.yang.binding.Notification;
65 import org.opendaylight.yangtools.yang.common.RpcError;
66 import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
67 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
68 import org.opendaylight.yangtools.yang.common.RpcResult;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
72 import com.google.common.collect.Lists;
73 import com.google.common.util.concurrent.SettableFuture;
78 public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
80 /** notify and rpc-response thread start default delay in [ms] */
81 public static final int JOB_DELAY = 50;
83 protected static final Logger LOG = LoggerFactory
84 .getLogger(ConnectionAdapterStackImpl.class);
86 protected Stack<? extends SwitchTestEvent> eventPlan;
87 protected OpenflowProtocolListener ofListener;
88 protected SystemNotificationsListener systemListener;
90 protected Map<Long, SettableFuture<?>> rpcResults = new HashMap<>();
91 private ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(
93 protected boolean planTouched = false;
95 private long proceedTimeout;
97 protected List<Exception> occuredExceptions = new ArrayList<>();
102 public ConnectionAdapterStackImpl() {
107 public Future<RpcResult<BarrierOutput>> barrier(BarrierInput arg0) {
108 checkRpc(arg0, "barrier");
109 SettableFuture<RpcResult<BarrierOutput>> result = createAndRegisterRpcResult(arg0);
115 public Future<RpcResult<EchoOutput>> echo(EchoInput arg0) {
116 checkRpc(arg0, "echo");
117 Future<RpcResult<EchoOutput>> result = createAndRegisterRpcResult(arg0);
123 public Future<RpcResult<Void>> echoReply(EchoReplyInput arg0) {
124 checkRpc(arg0, "echoReply");
125 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
131 public Future<RpcResult<Void>> experimenter(ExperimenterInput arg0) {
132 checkRpc(arg0, "experimenter");
133 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
139 public Future<RpcResult<Void>> flowMod(FlowModInput arg0) {
140 checkRpc(arg0, "flowMod");
141 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
147 public Future<RpcResult<GetAsyncOutput>> getAsync(GetAsyncInput arg0) {
148 checkRpc(arg0, "echo");
149 Future<RpcResult<GetAsyncOutput>> result = createAndRegisterRpcResult(arg0);
155 public Future<RpcResult<GetConfigOutput>> getConfig(GetConfigInput arg0) {
156 checkRpc(arg0, "echo");
157 Future<RpcResult<GetConfigOutput>> result = createAndRegisterRpcResult(arg0);
163 public Future<RpcResult<GetFeaturesOutput>> getFeatures(
164 GetFeaturesInput arg0) {
165 checkRpc(arg0, "getFeatures");
166 Future<RpcResult<GetFeaturesOutput>> result = createAndRegisterRpcResult(arg0);
172 public Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(
173 GetQueueConfigInput arg0) {
174 checkRpc(arg0, "echo");
175 Future<RpcResult<GetQueueConfigOutput>> result = createAndRegisterRpcResult(arg0);
181 public Future<RpcResult<Void>> groupMod(GroupModInput arg0) {
182 checkRpc(arg0, "groupMod");
183 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
189 public Future<RpcResult<Void>> hello(HelloInput arg0) {
190 checkRpc(arg0, "helloReply");
191 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
197 public Future<RpcResult<Void>> meterMod(MeterModInput arg0) {
198 checkRpc(arg0, "meterMod");
199 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
205 public Future<RpcResult<Void>> packetOut(PacketOutInput arg0) {
206 checkRpc(arg0, "packetOut");
207 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
213 public Future<RpcResult<Void>> portMod(PortModInput arg0) {
214 checkRpc(arg0, "portMod");
215 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
221 public Future<RpcResult<RoleRequestOutput>> roleRequest(
222 RoleRequestInput arg0) {
223 checkRpc(arg0, "echo");
224 Future<RpcResult<RoleRequestOutput>> result = createAndRegisterRpcResult(arg0);
230 public Future<RpcResult<Void>> setAsync(SetAsyncInput arg0) {
231 checkRpc(arg0, "setAsync");
232 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
238 public Future<RpcResult<Void>> setConfig(SetConfigInput arg0) {
239 checkRpc(arg0, "setConfig");
240 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
246 public Future<RpcResult<Void>> tableMod(TableModInput arg0) {
247 checkRpc(arg0, "tableMod");
248 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
254 public Future<Boolean> disconnect() {
255 // TODO Auto-generated method stub
260 public boolean isAlive() {
266 public void setMessageListener(OpenflowProtocolListener ofListener) {
267 this.ofListener = ofListener;
271 public void checkListeners() {
272 if (ofListener == null || systemListener == null) {
274 .add(new IllegalStateException("missing listeners"));
279 public void setSystemListener(SystemNotificationsListener systemListener) {
280 this.systemListener = systemListener;
289 private synchronized void checkRpc(OfHeader rpcInput, String rpcName) {
291 LOG.debug("checking rpc: " + rpcName);
292 if (!(eventPlan.peek() instanceof SwitchTestWaitForRpcEvent)) {
293 msg = "expected [rpc], got [" + rpcInput.getClass().getSimpleName()
296 SwitchTestWaitForRpcEvent switchTestRpcEvent = (SwitchTestWaitForRpcEvent) eventPlan
298 if (!rpcName.equals(switchTestRpcEvent.getRpcName())) {
299 msg = "expected rpc name [" + switchTestRpcEvent.getRpcName()
300 + "], got [" + rpcName + "]";
301 } else if (!rpcInput.getXid().equals(switchTestRpcEvent.getXid())) {
302 msg = "expected xid [" + switchTestRpcEvent.getXid()
303 + "], got [" + rpcInput.getXid() + "]";
308 LOG.debug("check .. FAILED: " + msg);
309 occuredExceptions.add(new IllegalArgumentException(msg));
311 LOG.debug("check .. OK");
315 * discard current event, execute next, if possible
317 private synchronized void next() {
318 LOG.debug("STEPPING TO NEXT event in plan");
325 * start or continue processing plan
327 private synchronized void proceed() {
328 boolean processed = false;
329 LOG.debug("proceeding plan item: " + eventPlan.peek());
330 if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
331 SwitchTestNotificationEvent notification = (SwitchTestNotificationEvent) eventPlan
333 processNotification(notification);
335 } else if (eventPlan.peek() instanceof SwitchTestRcpResponseEvent) {
336 SwitchTestRcpResponseEvent rpcResponse = (SwitchTestRcpResponseEvent) eventPlan
338 processRpcResponse(rpcResponse);
346 LOG.debug("now waiting for HANDLER to act");
347 wait(proceedTimeout);
348 } catch (InterruptedException e) {
349 LOG.error(e.getMessage(), e);
356 LOG.debug("evenPlan STARTING ..");
357 while (!eventPlan.isEmpty()) {
361 occuredExceptions.add(new IllegalStateException(
362 "eventPlan STALLED"));
367 pool.awaitTermination(10 * JOB_DELAY, TimeUnit.MILLISECONDS);
368 } catch (InterruptedException e) {
369 LOG.error(e.getMessage(), e);
371 LOG.debug("eventPlan done");
375 * @param notificationEvent
377 private void processNotification(
378 final SwitchTestNotificationEvent notificationEvent) {
380 Callable<Void> notifyCmd = new Callable<Void>() {
382 public Void call() throws Exception {
383 Notification notification = notificationEvent
384 .getPlannedNotification();
385 LOG.debug("notificating HANDLER: "
386 + notification.getClass().getSimpleName());
389 if (notification instanceof DisconnectEvent) {
391 .onDisconnectEvent((DisconnectEvent) notification);
394 else if (notification instanceof EchoRequestMessage) {
396 .onEchoRequestMessage((EchoRequestMessage) notification);
397 } else if (notification instanceof ErrorMessage) {
398 ofListener.onErrorMessage((ErrorMessage) notification);
399 } else if (notification instanceof ExperimenterMessage) {
401 .onExperimenterMessage((ExperimenterMessage) notification);
402 } else if (notification instanceof FlowRemovedMessage) {
404 .onFlowRemovedMessage((FlowRemovedMessage) notification);
405 } else if (notification instanceof HelloMessage) {
406 ofListener.onHelloMessage((HelloMessage) notification);
407 } else if (notification instanceof MultipartReplyMessage) {
409 .onMultipartReplyMessage((MultipartReplyMessage) notification);
410 } else if (notification instanceof MultipartRequestMessage) {
412 .onMultipartRequestMessage((MultipartRequestMessage) notification);
413 } else if (notification instanceof PacketInMessage) {
415 .onPacketInMessage((PacketInMessage) notification);
416 } else if (notification instanceof PortStatusMessage) {
418 .onPortStatusMessage((PortStatusMessage) notification);
422 occuredExceptions.add(new IllegalStateException(
423 "message listening not supported for type: "
424 + notification.getClass()));
427 LOG.debug("thread finished");
433 pool.schedule(notifyCmd, JOB_DELAY, TimeUnit.MILLISECONDS);
439 private void processRpcResponse(final SwitchTestRcpResponseEvent rpcResponse) {
440 Callable<Void> notifyCmd = new Callable<Void>() {
442 public Void call() throws Exception {
444 OfHeader plannedRpcResponseValue = rpcResponse
445 .getPlannedRpcResponse();
446 LOG.debug("rpc-responding to HANDLER: " + rpcResponse.getXid());
448 @SuppressWarnings("unchecked")
449 SettableFuture<RpcResult<?>> response = (SettableFuture<RpcResult<?>>) rpcResults
450 .get(rpcResponse.getXid());
452 if (response != null) {
453 boolean successful = plannedRpcResponseValue != null;
454 Collection<RpcError> errors;
456 errors = Collections.emptyList();
459 .newArrayList(RpcErrors
465 "planned response to RPC.id = "
466 + rpcResponse.getXid(),
469 "rpc response failed (planned behavior)")));
471 RpcResult<?> result = Rpcs.getRpcResult(successful,
472 plannedRpcResponseValue, errors);
473 response.set(result);
475 String msg = "RpcResponse not expected: xid="
476 + rpcResponse.getXid()
478 + plannedRpcResponseValue.getClass()
481 occuredExceptions.add(new IllegalStateException(msg));
484 LOG.debug("thread finished");
489 pool.schedule(notifyCmd, JOB_DELAY, TimeUnit.MILLISECONDS);
495 * @return rpc future result
497 private <IN extends OfHeader, OUT extends OfHeader> SettableFuture<RpcResult<OUT>> createAndRegisterRpcResult(
499 SettableFuture<RpcResult<OUT>> result = SettableFuture.create();
500 rpcResults.put(arg0.getXid(), result);
505 * @return rpc future result
507 private static SettableFuture<RpcResult<Void>> createOneWayRpcResult() {
508 SettableFuture<RpcResult<Void>> result = SettableFuture.create();
515 * the eventPlan to set
517 public void setEventPlan(Stack<? extends SwitchTestEvent> eventPlan) {
518 this.eventPlan = eventPlan;
522 * @param proceedTimeout
523 * max timeout for processing one planned event (in [ms])
525 public void setProceedTimeout(long proceedTimeout) {
526 this.proceedTimeout = proceedTimeout;
530 * @return the occuredExceptions
532 public List<Exception> getOccuredExceptions() {
533 return occuredExceptions;