OPNFLWPLUG-1034
[openflowplugin.git] / openflowjava / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueEntry.java
1 /*
2  * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.FutureCallback;
13
14 import java.util.function.Function;
15
16 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
17 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput;
21
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 final class OutboundQueueEntry {
26     private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueEntry.class);
27     public static final Function<OfHeader, Boolean> DEFAULT_IS_COMPLETE = new Function<OfHeader, Boolean>() {
28
29         @Override
30         public Boolean apply(final OfHeader message) {
31             if (message instanceof MultipartReplyMessage) {
32                 return !((MultipartReplyMessage) message).getFlags().isOFPMPFREQMORE();
33             }
34
35             return true;
36         }
37
38     };
39
40     private FutureCallback<OfHeader> callback;
41     private OfHeader message;
42     private boolean completed;
43     private boolean barrier;
44     private volatile boolean committed;
45     private Function<OfHeader, Boolean> isCompletedFunction = DEFAULT_IS_COMPLETE;
46
47     void commit(final OfHeader messageToCommit, final FutureCallback<OfHeader> commitCallback) {
48         commit(messageToCommit, commitCallback, DEFAULT_IS_COMPLETE);
49     }
50
51     void commit(final OfHeader messageToCommit, final FutureCallback<OfHeader> commitCallback,
52             final Function<OfHeader, Boolean> isCommitCompletedFunction) {
53         if (this.completed) {
54             LOG.warn("Can't commit a completed message.");
55             if (commitCallback != null) {
56                 commitCallback.onFailure(new OutboundQueueException("Can't commit a completed message."));
57             }
58         } else {
59             this.message = messageToCommit;
60             this.callback = commitCallback;
61             this.barrier = messageToCommit instanceof BarrierInput;
62             this.isCompletedFunction = isCommitCompletedFunction;
63
64             // Volatile write, needs to be last
65             this.committed = true;
66         }
67     }
68
69     void reset() {
70         barrier = false;
71         callback = null;
72         completed = false;
73         message = null;
74
75         // Volatile write, needs to be last
76         committed = false;
77     }
78
79     boolean isBarrier() {
80         return barrier;
81     }
82
83     boolean isCommitted() {
84         return committed;
85     }
86
87     boolean isCompleted() {
88         return completed;
89     }
90
91     OfHeader takeMessage() {
92         final OfHeader ret = message;
93         if (!barrier) {
94             checkCompletionNeed();
95         }
96         message = null;
97         return ret;
98     }
99
100     private void checkCompletionNeed() {
101         if (callback == null || (message instanceof PacketOutInput)) {
102             completed = true;
103             if (callback != null) {
104                 callback.onSuccess(null);
105                 callback = null;
106             }
107             committed = false;
108         }
109     }
110
111     boolean complete(final OfHeader response) {
112         Preconditions.checkState(!completed, "Attempted to complete a completed message with response %s", response);
113
114         // Multipart requests are special, we have to look at them to see
115         // if there is something outstanding and adjust ourselves accordingly
116         final boolean reallyComplete = isCompletedFunction.apply(response);
117
118         completed = reallyComplete;
119         if (callback != null) {
120             callback.onSuccess(response);
121             if (reallyComplete) {
122                 // We will not need the callback anymore, make sure it can be GC'd
123                 callback = null;
124             }
125         }
126         LOG.debug("Entry {} completed {} with response {}", this, completed, response);
127         return reallyComplete;
128     }
129
130     void fail(final OutboundQueueException cause) {
131         if (!completed) {
132             completed = true;
133             if (callback != null) {
134                 callback.onFailure(cause);
135                 callback = null;
136             }
137         } else {
138             LOG.warn("Ignoring failure {} for completed message", cause);
139         }
140     }
141
142     @VisibleForTesting
143     /** This method is only for testing to prove that after queue entry is completed there is not callback future */
144     boolean hasCallback() {
145         return (callback != null);
146     }
147 }