Add isComplete callback to commitEntry
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueEntry.java
1 /*
2  * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.FutureCallback;
13
14 import java.util.function.Function;
15
16 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
17 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput;
21
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 final class OutboundQueueEntry {
26     private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueEntry.class);
27     public static final Function<OfHeader, Boolean> DEFAULT_IS_COMPLETE = new Function<OfHeader, Boolean>() {
28
29         @Override
30         public Boolean apply(final OfHeader message) {
31             if (message instanceof MultipartReplyMessage) {
32                 return !((MultipartReplyMessage) message).getFlags().isOFPMPFREQMORE();
33             }
34
35             return true;
36         }
37
38     };
39
40     private FutureCallback<OfHeader> callback;
41     private OfHeader message;
42     private boolean completed;
43     private boolean barrier;
44     private volatile boolean committed;
45     private OutboundQueueException lastException = null;
46     private Function<OfHeader, Boolean> isCompletedFunction = DEFAULT_IS_COMPLETE;
47
48     void commit(final OfHeader message, final FutureCallback<OfHeader> callback) {
49         commit(message, callback, DEFAULT_IS_COMPLETE);
50     }
51
52     void commit(final OfHeader message, final FutureCallback<OfHeader> callback,
53             final Function<OfHeader, Boolean> isCompletedFunction) {
54         if (this.completed) {
55             LOG.warn("Can't commit a completed message.");
56             if (callback != null) {
57                 callback.onFailure(lastException);
58             }
59         } else {
60             this.message = message;
61             this.callback = callback;
62             this.barrier = message instanceof BarrierInput;
63             this.isCompletedFunction = isCompletedFunction;
64
65             // Volatile write, needs to be last
66             this.committed = true;
67         }
68     }
69
70     void reset() {
71         barrier = false;
72         callback = null;
73         completed = false;
74         message = null;
75
76         // Volatile write, needs to be last
77         committed = false;
78     }
79
80     boolean isBarrier() {
81         return barrier;
82     }
83
84     boolean isCommitted() {
85         return committed;
86     }
87
88     boolean isCompleted() {
89         return completed;
90     }
91
92     OfHeader takeMessage() {
93         final OfHeader ret = message;
94         if (!barrier) {
95             checkCompletionNeed();
96         }
97         message = null;
98         return ret;
99     }
100
101     private void checkCompletionNeed() {
102         if (callback == null || (message instanceof PacketOutInput)) {
103             completed = true;
104             if (callback != null) {
105                 callback.onSuccess(null);
106                 callback = null;
107             }
108             committed = false;
109         }
110     }
111
112     boolean complete(final OfHeader response) {
113         Preconditions.checkState(!completed, "Attempted to complete a completed message with response %s", response);
114
115         // Multipart requests are special, we have to look at them to see
116         // if there is something outstanding and adjust ourselves accordingly
117         final boolean reallyComplete = isCompletedFunction.apply(response);
118
119         completed = reallyComplete;
120         if (callback != null) {
121             callback.onSuccess(response);
122             if (reallyComplete) {
123                 // We will not need the callback anymore, make sure it can be GC'd
124                 callback = null;
125             }
126         }
127         LOG.debug("Entry {} completed {} with response {}", this, completed, response);
128         return reallyComplete;
129     }
130
131     void fail(final OutboundQueueException cause) {
132         if (!completed) {
133             lastException = cause;
134             completed = true;
135             if (callback != null) {
136                 callback.onFailure(cause);
137                 callback = null;
138             }
139         } else {
140             LOG.warn("Ignoring failure {} for completed message", cause);
141         }
142     }
143
144     @VisibleForTesting
145     /** This method is only for testing to prove that after queue entry is completed there is not callback future */
146     boolean hasCallback() {
147         return (callback != null);
148     }
149 }