2 * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.FutureCallback;
14 import java.util.function.Function;
16 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
17 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
25 final class OutboundQueueEntry {
26 private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueEntry.class);
27 public static final Function<OfHeader, Boolean> DEFAULT_IS_COMPLETE = new Function<OfHeader, Boolean>() {
30 public Boolean apply(final OfHeader message) {
31 if (message instanceof MultipartReplyMessage) {
32 return !((MultipartReplyMessage) message).getFlags().isOFPMPFREQMORE();
40 private FutureCallback<OfHeader> callback;
41 private OfHeader message;
42 private boolean completed;
43 private boolean barrier;
44 private volatile boolean committed;
45 private Function<OfHeader, Boolean> isCompletedFunction = DEFAULT_IS_COMPLETE;
47 void commit(final OfHeader messageToCommit, final FutureCallback<OfHeader> commitCallback) {
48 commit(messageToCommit, commitCallback, DEFAULT_IS_COMPLETE);
51 void commit(final OfHeader messageToCommit, final FutureCallback<OfHeader> commitCallback,
52 final Function<OfHeader, Boolean> isCommitCompletedFunction) {
54 LOG.warn("Can't commit a completed message.");
55 if (commitCallback != null) {
56 commitCallback.onFailure(new OutboundQueueException("Can't commit a completed message."));
59 this.message = messageToCommit;
60 this.callback = commitCallback;
61 this.barrier = messageToCommit instanceof BarrierInput;
62 this.isCompletedFunction = isCommitCompletedFunction;
64 // Volatile write, needs to be last
65 this.committed = true;
75 // Volatile write, needs to be last
83 boolean isCommitted() {
87 boolean isCompleted() {
91 OfHeader takeMessage() {
92 final OfHeader ret = message;
94 checkCompletionNeed();
100 private void checkCompletionNeed() {
101 if (callback == null || (message instanceof PacketOutInput)) {
103 if (callback != null) {
104 callback.onSuccess(null);
111 boolean complete(final OfHeader response) {
112 Preconditions.checkState(!completed, "Attempted to complete a completed message with response %s", response);
114 // Multipart requests are special, we have to look at them to see
115 // if there is something outstanding and adjust ourselves accordingly
116 final boolean reallyComplete = isCompletedFunction.apply(response);
118 completed = reallyComplete;
119 if (callback != null) {
120 callback.onSuccess(response);
121 if (reallyComplete) {
122 // We will not need the callback anymore, make sure it can be GC'd
126 LOG.debug("Entry {} completed {} with response {}", this, completed, response);
127 return reallyComplete;
130 void fail(final OutboundQueueException cause) {
133 if (callback != null) {
134 callback.onFailure(cause);
138 LOG.warn("Ignoring failure {} for completed message", cause);
143 /** This method is only for testing to prove that after queue entry is completed there is not callback future */
144 boolean hasCallback() {
145 return (callback != null);