2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.openflow.md.core.plan;
11 import com.google.common.base.Joiner;
12 import com.google.common.collect.Lists;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.SettableFuture;
16 import java.net.InetSocketAddress;
17 import java.util.ArrayList;
18 import java.util.Collection;
19 import java.util.Collections;
20 import java.util.HashMap;
21 import java.util.List;
24 import java.util.Stack;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.Future;
28 import org.opendaylight.controller.sal.common.util.RpcErrors;
29 import org.opendaylight.controller.sal.common.util.Rpcs;
30 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
31 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
32 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
33 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
34 import org.opendaylight.openflowjava.protocol.api.extensibility.AlienMessageListener;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterInput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncInput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncOutput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigInput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigOutput;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInput;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigInput;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigOutput;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInput;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortModInput;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestInput;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestOutput;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncInput;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEventBuilder;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
74 import org.opendaylight.yangtools.yang.binding.Notification;
75 import org.opendaylight.yangtools.yang.common.RpcError;
76 import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
77 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
78 import org.opendaylight.yangtools.yang.common.RpcResult;
79 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
80 import org.slf4j.Logger;
81 import org.slf4j.LoggerFactory;
86 public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
88 /** notify and rpc-response thread start default delay in [ms] */
89 public static final int JOB_DELAY = 50;
91 protected static final Logger LOG = LoggerFactory
92 .getLogger(ConnectionAdapterStackImpl.class);
94 protected Stack<? extends SwitchTestEvent> eventPlan;
95 protected OpenflowProtocolListener ofListener;
96 protected SystemNotificationsListener systemListener;
98 protected Map<Long, SettableFuture<?>> rpcResults = new HashMap<>();
99 protected boolean planTouched = false;
101 private long proceedTimeout;
103 protected List<Exception> occuredExceptions = new ArrayList<>();
105 private ConnectionReadyListener connectionReadyListener;
107 private int planItemCounter;
109 private boolean autoRead = true;
110 private final ExecutorService pool;
111 private boolean packetInFiltering;
117 public ConnectionAdapterStackImpl() {
118 pool = Executors.newSingleThreadExecutor();
122 public synchronized Future<RpcResult<BarrierOutput>> barrier(
123 final BarrierInput arg0) {
124 checkRpcAndNext(arg0, "barrier");
125 SettableFuture<RpcResult<BarrierOutput>> result = createAndRegisterRpcResult(arg0);
130 public synchronized Future<RpcResult<EchoOutput>> echo(final EchoInput arg0) {
131 checkRpcAndNext(arg0, "echo");
132 Future<RpcResult<EchoOutput>> result = createAndRegisterRpcResult(arg0);
137 public Future<RpcResult<Void>> echoReply(final EchoReplyInput arg0) {
138 checkRpcAndNext(arg0, "echoReply");
139 ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
144 public Future<RpcResult<Void>> experimenter(final ExperimenterInput arg0) {
145 checkRpcAndNext(arg0, "experimenter");
146 ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
151 public Future<RpcResult<Void>> flowMod(final FlowModInput arg0) {
152 checkRpcAndNext(arg0, "flowMod");
153 ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
158 public synchronized Future<RpcResult<GetAsyncOutput>> getAsync(
159 final GetAsyncInput arg0) {
160 checkRpcAndNext(arg0, "echo");
161 Future<RpcResult<GetAsyncOutput>> result = createAndRegisterRpcResult(arg0);
166 public synchronized Future<RpcResult<GetConfigOutput>> getConfig(
167 final GetConfigInput arg0) {
168 checkRpcAndNext(arg0, "echo");
169 Future<RpcResult<GetConfigOutput>> result = createAndRegisterRpcResult(arg0);
174 public synchronized Future<RpcResult<GetFeaturesOutput>> getFeatures(
175 final GetFeaturesInput arg0) {
176 checkRpcAndNext(arg0, "getFeatures");
177 Future<RpcResult<GetFeaturesOutput>> result = createAndRegisterRpcResult(arg0);
182 public synchronized Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(
183 final GetQueueConfigInput arg0) {
184 checkRpcAndNext(arg0, "echo");
185 Future<RpcResult<GetQueueConfigOutput>> result = createAndRegisterRpcResult(arg0);
190 public Future<RpcResult<Void>> groupMod(final GroupModInput arg0) {
191 checkRpcAndNext(arg0, "groupMod");
192 ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
197 public Future<RpcResult<Void>> hello(final HelloInput arg0) {
198 checkRpcAndNext(arg0, "helloReply");
199 ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
204 public Future<RpcResult<Void>> meterMod(final MeterModInput arg0) {
205 checkRpcAndNext(arg0, "meterMod");
206 ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
211 public Future<RpcResult<Void>> packetOut(final PacketOutInput arg0) {
212 checkRpcAndNext(arg0, "packetOut");
213 ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
218 public Future<RpcResult<Void>> portMod(final PortModInput arg0) {
219 checkRpcAndNext(arg0, "portMod");
220 ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
225 public synchronized Future<RpcResult<RoleRequestOutput>> roleRequest(
226 final RoleRequestInput arg0) {
227 checkRpcAndNext(arg0, "echo");
228 Future<RpcResult<RoleRequestOutput>> result = createAndRegisterRpcResult(arg0);
233 public Future<RpcResult<Void>> setAsync(final SetAsyncInput arg0) {
234 checkRpcAndNext(arg0, "setAsync");
235 ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
240 public Future<RpcResult<Void>> setConfig(final SetConfigInput arg0) {
241 checkRpcAndNext(arg0, "setConfig");
242 ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
247 public Future<RpcResult<Void>> tableMod(final TableModInput arg0) {
248 checkRpcAndNext(arg0, "tableMod");
249 ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
254 public Future<Boolean> disconnect() {
255 LOG.debug("adapter is told to disconnect");
256 DisconnectEventBuilder disconnectEventBuilder = new DisconnectEventBuilder();
257 disconnectEventBuilder.setInfo("disconnected by plugin");
258 systemListener.onDisconnectEvent(disconnectEventBuilder.build());
263 public boolean isAlive() {
269 public void setMessageListener(final OpenflowProtocolListener ofListener) {
270 this.ofListener = ofListener;
274 public void checkListeners() {
275 if (ofListener == null || systemListener == null
276 || connectionReadyListener == null) {
278 .add(new IllegalStateException("missing listeners"));
283 public void setSystemListener(final SystemNotificationsListener systemListener) {
284 this.systemListener = systemListener;
288 public void setAlienMessageListener(final AlienMessageListener alienMessageListener) {
298 private boolean checkRpc(final OfHeader rpcInput, final String rpcName) {
300 boolean finished = true;
302 if (eventPlan.isEmpty()) {
303 throw new IllegalStateException("eventPlan already depleted");
306 LOG.debug("checking rpc: name={}, ver={}, xid={}", rpcName,
307 rpcInput.getVersion(), rpcInput.getXid());
308 if (!(eventPlan.peek() instanceof SwitchTestWaitForRpcEvent)
309 && !(eventPlan.peek() instanceof SwitchTestWaitForAllEvent)) {
310 if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
311 SwitchTestNotificationEvent notifEvent = (SwitchTestNotificationEvent) (eventPlan
313 msg = "expected [notification: "
314 + notifEvent.getPlannedNotification() + "], got ["
315 + rpcInput.getClass().getSimpleName() + "]";
316 } else if (eventPlan.peek() instanceof SwitchTestRcpResponseEvent) {
317 SwitchTestRcpResponseEvent rpcEvent = (SwitchTestRcpResponseEvent) (eventPlan
319 msg = "expected [rpc: " + rpcEvent.getPlannedRpcResponse()
320 + "], got [" + rpcInput.getClass().getSimpleName()
324 if (eventPlan.peek() instanceof SwitchTestWaitForAllEvent) {
325 SwitchTestWaitForAllEvent switchTestWaitForAll = (SwitchTestWaitForAllEvent) eventPlan
327 Set<SwitchTestWaitForRpcEvent> eventBag = switchTestWaitForAll
329 List<String> msgLot = new ArrayList<>();
331 if (eventBag == null || eventBag.isEmpty()) {
332 msg = "no wait events in bag";
335 for (SwitchTestWaitForRpcEvent switchTestWaitForRpc : eventBag) {
336 String msgPart = checkSingleRpcContent(rpcInput,
337 rpcName, switchTestWaitForRpc);
339 if (msgPart != null) {
342 LOG.debug("wait event matched: {}", rpcName);
343 eventBag.remove(switchTestWaitForRpc);
344 if (eventBag.isEmpty()) {
353 if (!msgLot.isEmpty()) {
354 msg = Joiner.on(" | ").join(msgLot);
356 } else if (eventPlan.peek() instanceof SwitchTestWaitForRpcEvent) {
357 SwitchTestWaitForRpcEvent switchTestRpcEvent = (SwitchTestWaitForRpcEvent) eventPlan
359 msg = checkSingleRpcContent(rpcInput, rpcName,
365 LOG.debug("rpc check .. FAILED: " + msg);
366 occuredExceptions.add(new IllegalArgumentException("step:"
367 + planItemCounter + " | " + msg));
369 LOG.debug("rpc check .. OK");
377 * @param switchTestWaitForRpc
380 private static String checkSingleRpcContent(final OfHeader rpcInput,
381 final String rpcName, final SwitchTestWaitForRpcEvent switchTestWaitForRpc) {
382 String failureMsg = null;
383 if (!rpcName.equals(switchTestWaitForRpc.getRpcName())) {
384 failureMsg = "expected rpc name ["
385 + switchTestWaitForRpc.getRpcName() + "], got [" + rpcName
387 } else if (!rpcInput.getXid().equals(switchTestWaitForRpc.getXid())) {
388 failureMsg = "expected " + rpcName + ".xid ["
389 + switchTestWaitForRpc.getXid() + "], got ["
390 + rpcInput.getXid() + "]";
402 private synchronized void checkRpcAndNext(final OfHeader rpcInput, final String rpcName) {
403 boolean finished = checkRpc(rpcInput, rpcName);
410 * discard current event, execute next, if possible
412 private void next() {
413 LOG.debug("<---> STEPPING TO NEXT event in plan (leaving [{}] {})",
414 planItemCounter, eventPlan.peek());
419 Thread.sleep(JOB_DELAY);
420 } catch (InterruptedException e) {
421 LOG.error(e.getMessage(), e);
427 * start or continue processing plan
429 private synchronized void proceed() {
430 boolean processed = false;
431 LOG.debug("proceeding plan item[{}]: {}", planItemCounter,
433 if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
434 SwitchTestNotificationEvent notification = (SwitchTestNotificationEvent) eventPlan
436 processNotification(notification);
438 } else if (eventPlan.peek() instanceof SwitchTestRcpResponseEvent) {
439 SwitchTestRcpResponseEvent rpcResponse = (SwitchTestRcpResponseEvent) eventPlan
441 processRpcResponse(rpcResponse);
443 } else if (eventPlan.peek() instanceof SwitchTestCallbackEvent) {
444 SwitchTestCallbackEvent callbackEvent = (SwitchTestCallbackEvent) eventPlan
447 callbackEvent.getCallback().call();
448 } catch (Exception e) {
449 LOG.error(e.getMessage(), e);
450 occuredExceptions.add(e);
459 LOG.debug("now WAITING for OF_LISTENER to act ..");
460 wait(proceedTimeout);
461 } catch (InterruptedException e) {
462 LOG.error(e.getMessage(), e);
469 LOG.debug("|---> evenPlan STARTING ..");
471 while (!eventPlan.isEmpty()) {
476 .add(new IllegalStateException(
477 "eventPlan STALLED, planItemCounter="
484 Thread.sleep(JOB_DELAY);
485 } catch (InterruptedException e) {
486 LOG.error(e.getMessage(), e);
488 LOG.debug("<---| eventPlan DONE");
492 * @param notificationEvent
494 private synchronized void processNotification(
495 final SwitchTestNotificationEvent notificationEvent) {
497 Notification notification = notificationEvent.getPlannedNotification();
498 LOG.debug("notificating OF_LISTENER: "
499 + notification.getClass().getSimpleName());
502 if (notification instanceof DisconnectEvent) {
503 systemListener.onDisconnectEvent((DisconnectEvent) notification);
506 else if (notification instanceof EchoRequestMessage) {
507 ofListener.onEchoRequestMessage((EchoRequestMessage) notification);
508 } else if (notification instanceof ErrorMessage) {
509 ofListener.onErrorMessage((ErrorMessage) notification);
510 } else if (notification instanceof ExperimenterMessage) {
512 .onExperimenterMessage((ExperimenterMessage) notification);
513 } else if (notification instanceof FlowRemovedMessage) {
514 ofListener.onFlowRemovedMessage((FlowRemovedMessage) notification);
515 } else if (notification instanceof HelloMessage) {
516 ofListener.onHelloMessage((HelloMessage) notification);
517 } else if (notification instanceof MultipartReplyMessage) {
519 .onMultipartReplyMessage((MultipartReplyMessage) notification);
520 } else if (notification instanceof PacketInMessage) {
521 ofListener.onPacketInMessage((PacketInMessage) notification);
522 } else if (notification instanceof PortStatusMessage) {
523 ofListener.onPortStatusMessage((PortStatusMessage) notification);
527 occuredExceptions.add(new IllegalStateException("step:"
528 + planItemCounter + " | "
529 + "message listening not supported for type: "
530 + notification.getClass()));
533 LOG.debug("notification [" + notification.getClass().getSimpleName()
540 private synchronized void processRpcResponse(
541 final SwitchTestRcpResponseEvent rpcResponse) {
542 OfHeader plannedRpcResponseValue = rpcResponse.getPlannedRpcResponse();
543 LOG.debug("rpc-responding to OF_LISTENER: {}", rpcResponse.getXid());
545 @SuppressWarnings("unchecked")
546 final SettableFuture<RpcResult<?>> response = (SettableFuture<RpcResult<?>>) rpcResults
547 .get(rpcResponse.getXid());
549 if (response != null) {
550 boolean successful = plannedRpcResponseValue != null;
551 Collection<RpcError> errors;
553 errors = Collections.emptyList();
555 errors = Lists.newArrayList(RpcErrors.getRpcError("unit",
556 "unit", "not requested", ErrorSeverity.ERROR,
557 "planned response to RPC.id = " + rpcResponse.getXid(),
558 ErrorType.RPC, new Exception(
559 "rpc response failed (planned behavior)")));
562 final RpcResult<?> result = Rpcs.getRpcResult(successful,
563 plannedRpcResponseValue, errors);
564 setFutureViaPool(response, result);
566 String msg = "RpcResponse not expected: xid="
567 + rpcResponse.getXid() + ", "
568 + plannedRpcResponseValue.getClass().getSimpleName();
570 occuredExceptions.add(new IllegalStateException("step:"
571 + planItemCounter + " | " + msg));
574 LOG.debug("rpc [" + rpcResponse.getXid() + "] .. done");
577 private void setFutureViaPool(final SettableFuture<RpcResult<?>> response, final RpcResult<?> result) {
578 pool.execute(new Runnable() {
581 response.set(result);
589 * @return rpc future result
591 private <IN extends OfHeader, OUT extends OfHeader> SettableFuture<RpcResult<OUT>> createAndRegisterRpcResult(
593 SettableFuture<RpcResult<OUT>> result = SettableFuture.create();
594 rpcResults.put(arg0.getXid(), result);
599 * @return rpc future result
601 private static ListenableFuture<RpcResult<Void>> createOneWayRpcResult() {
602 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
607 * the eventPlan to set
609 public void setEventPlan(final Stack<? extends SwitchTestEvent> eventPlan) {
610 this.eventPlan = eventPlan;
614 * @param proceedTimeout
615 * max timeout for processing one planned event (in [ms])
617 public void setProceedTimeout(final long proceedTimeout) {
618 this.proceedTimeout = proceedTimeout;
622 * @return the occuredExceptions
624 public List<Exception> getOccuredExceptions() {
625 return occuredExceptions;
629 public void fireConnectionReadyNotification() {
630 connectionReadyListener.onConnectionReady();
634 public void setConnectionReadyListener(
635 final ConnectionReadyListener connectionReadyListener) {
636 this.connectionReadyListener = connectionReadyListener;
640 public Future<RpcResult<Void>> multipartRequest(final MultipartRequestInput arg0) {
641 checkRpcAndNext(arg0, "multipartRequestInput");
642 ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
647 public InetSocketAddress getRemoteAddress() {
648 return InetSocketAddress.createUnresolved("unittest-odl.example.org", 4242);
652 public boolean isAutoRead() {
657 public <T extends OutboundQueueHandler> OutboundQueueHandlerRegistration<T> registerOutboundQueueHandler(final T t, final int i, final long l) {
662 public void setAutoRead(final boolean autoRead) {
663 this.autoRead = autoRead;
667 public void setPacketInFiltering(final boolean packetInFiltering) {
668 this.packetInFiltering = packetInFiltering;