1 /**
2 	Stream interface for passing data between different tasks.
3 
4 	Copyright: © 2013 Sönke Ludwig
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig
7 */
8 module vibe.stream.taskpipe;
9 
10 public import vibe.core.stream;
11 
12 import core.sync.mutex;
13 import core.time;
14 import std.algorithm : min;
15 import std.exception;
16 import vibe.core.core;
17 import vibe.core.sync;
18 import vibe.utils.array;
19 
20 
/**
	A one-way byte pipe for handing data from one task to another.

	Every operation is forwarded to an internal TaskPipeImpl instance.
*/
final class TaskPipe : ConnectionStream {
	private TaskPipeImpl m_pipe;

	/** Creates a pipe that can be used right away.

		Params:
			grow_when_full = Passed through unchanged to the underlying
				TaskPipeImpl constructor.
	*/
	this(bool grow_when_full = false)
	{
		m_pipe = new TaskPipeImpl(grow_when_full);
	}

	/// Capacity in bytes of the FIFO buffer that carries data between tasks
	@property size_t bufferSize()
	const {
		return m_pipe.bufferSize;
	}

	/// ditto
	@property void bufferSize(size_t nbytes)
	{
		m_pipe.bufferSize = nbytes;
	}

	/// True if `leastSize` reports zero bytes; waits just like `leastSize` does.
	@property bool empty()
	{
		immutable available = leastSize();
		return available == 0;
	}

	/// Waits on the underlying pipe (no time limit) and then reports its fill level.
	@property ulong leastSize()
	{
		m_pipe.waitForData();
		return m_pipe.fill;
	}

	/// True if the buffer currently holds at least one byte; never waits for data.
	@property bool dataAvailableForRead()
	{
		return m_pipe.fill > 0;
	}

	/// True as long as the underlying pipe has not been closed.
	@property bool connected()
	const {
		return m_pipe.open;
	}

	/// Closes the underlying pipe.
	void close()
	{
		m_pipe.close();
	}

	/** Waits for readable data, giving up after `timeout`.

		Returns:
			`true` if data is available for reading, `false` otherwise.
	*/
	bool waitForData(Duration timeout)
	{
		if (!dataAvailableForRead) {
			m_pipe.waitForData(timeout);
			return dataAvailableForRead;
		}
		return true;
	}

	/// Returns the underlying pipe's view of the buffered data without consuming it.
	const(ubyte)[] peek()
	{
		return m_pipe.peek;
	}

	/// Reads from the underlying pipe into `dst`; returns the number of bytes read.
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		return m_pipe.read(dst, mode);
	}
	alias read = ConnectionStream.read;

	/// Writes `bytes` to the underlying pipe; returns the number of bytes written.
	size_t write(in ubyte[] bytes, IOMode mode)
	{
		return m_pipe.write(bytes, mode);
	}
	alias write = ConnectionStream.write;

	/// No-op: this class keeps no additional buffer of its own that could be flushed.
	void flush()
	{
	}

	/// Closes the underlying pipe, exactly like `close`.
	void finalize()
	{
		m_pipe.close();
	}
}
61 
62 
/**
	Underlying pipe implementation for TaskPipe with no Stream interface.

	Data is held in a ring buffer that is guarded by a mutex. A single
	condition variable is shared by readers and writers: it is notified when
	data is put into an empty buffer, when data is taken out of a full buffer
	and when the pipe is closed.
*/
private final class TaskPipeImpl {
	@safe:

	private {
		Mutex m_mutex;
		InterruptibleTaskCondition m_condition;
		vibe.utils.array.FixedRingBuffer!ubyte m_buffer;
		bool m_closed = false;
		bool m_growWhenFull;
	}

	/** Constructs a new pipe ready for use.

		The transfer buffer starts out with a capacity of 2048 bytes.

		Params:
			grow_when_full = If `true`, `write` enlarges the buffer instead of
				waiting for a reader when the buffer is full.
	*/
	this(bool grow_when_full = false)
	{
		m_mutex = new Mutex;
		() @trusted { m_condition = new InterruptibleTaskCondition(m_mutex); } ();
		m_buffer.capacity = 2048;
		m_growWhenFull = grow_when_full;
	}

	/// Size of the (fixed) buffer used to transfer data between tasks
	@property size_t bufferSize() const { return m_buffer.capacity; }
	/// ditto
	@property void bufferSize(size_t nbytes) { m_buffer.capacity = nbytes; }

	/// Number of bytes currently in the transfer buffer
	@property size_t fill()
	const {
		synchronized (m_mutex) {
			return m_buffer.length;
		}
	}

	/// `true` until `close` has been called
	@property bool open() const { return !m_closed; }

	/** Closes the pipe.

		Marks the pipe as closed and wakes up every task that is currently
		waiting on it.
	*/
	void close()
	{
		synchronized (m_mutex) m_closed = true;
		() @trusted { m_condition.notifyAll(); } ();
	}

	/** Blocks until data is available, the pipe gets closed or the timeout
		elapses.

		Params:
			timeout = Maximum time to wait. `Duration.max` as well as any zero
				or negative duration means "no time limit".
	*/
	void waitForData(Duration timeout = Duration.max)
	{
		bool have_timeout = timeout > 0.seconds && timeout != Duration.max;

		// The deadline is tracked on the monotonic clock so that adjustments
		// of the system (wall-clock) time can neither stretch nor cut short
		// the requested timeout.
		MonoTime now = MonoTime.currTime;
		MonoTime timeout_target;
		if (have_timeout) timeout_target = now + timeout;

		synchronized (m_mutex) {
			while (m_buffer.empty && !m_closed && (!have_timeout || now < timeout_target)) {
				if (have_timeout)
					() @trusted { m_condition.wait(timeout_target - now); } ();
				else () @trusted { m_condition.wait(); } ();
				// the condition is also notified for reasons other than new
				// data, so re-evaluate the remaining time before looping
				now = MonoTime.currTime;
			}
		}
	}

	/** Writes the given byte array to the pipe.

		Params:
			data = The bytes to write.
			mode = With `IOMode.immediate` the call returns instead of waiting
				when the buffer is full. With `IOMode.once` it returns instead
				of waiting once at least one byte has been written. Any other
				mode waits until all of `data` has been written.

		Returns:
			The number of bytes that were put into the buffer.

		Throws:
			`Exception` if the pipe is already closed when the call is made, or
			if it is found closed while the call would have to wait for free
			buffer space.
	*/
	size_t write(const(ubyte)[] data, IOMode mode)
	{
		size_t ret = 0;

		enforce(!m_closed, "Writing to closed task pipe.");

		while (data.length > 0){
			bool need_signal;
			synchronized (m_mutex) {
				if (m_growWhenFull && m_buffer.full) {
					// Enlarge the buffer by the number of outstanding bytes,
					// rounded up to an even count. This yields the same
					// capacity as stepping up in increments of two, but
					// without an O(data.length) loop under the mutex.
					immutable extra = data.length + (data.length & 1);
					m_buffer.capacity = m_buffer.capacity + extra;
				} else while (m_buffer.full) {
					if (mode == IOMode.immediate || (mode == IOMode.once && ret > 0))
						return ret;
					if (m_closed) throw new Exception("Pipe closed while writing data");
					() @trusted { m_condition.wait(); } ();
				}

				// only a previously empty buffer can have tasks waiting for data
				need_signal = m_buffer.empty;
				auto len = min(m_buffer.freeSpace, data.length);
				m_buffer.put(data[0 .. len]);
				data = data[len .. $];
				ret += len;
			}
			if (need_signal) () @trusted { m_condition.notifyAll(); } ();
		}
		if (!m_growWhenFull) vibe.core.core.yield();

		return ret;
	}

	/** Returns a temporary view of the beginning of the transfer buffer.

		Note that a call to read invalidates this array slice. This method
		does not wait for data to arrive; it returns whatever the buffer
		reports at the time of the call.
	*/
	const(ubyte[]) peek()
	{
		synchronized (m_mutex) {
			return m_buffer.peek();
		}
	}

	/** Reads data into the supplied buffer.

		Params:
			dst = Destination buffer to fill.
			mode = With `IOMode.immediate` the call returns instead of waiting
				when no data is buffered. With `IOMode.once` it returns instead
				of waiting once at least one byte has been read. Any other mode
				waits until `dst` has been filled completely.

		Returns:
			The number of bytes copied into `dst`.

		Throws:
			`Exception` if the pipe is closed and drained before `dst` could be
			filled in `IOMode.all`, or before any byte could be read in the
			other modes.
	*/
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		size_t ret = 0;

		while (dst.length > 0) {
			bool need_signal;
			size_t len;
			synchronized (m_mutex) {
				while (m_buffer.empty && !m_closed) {
					if (mode == IOMode.immediate || (mode == IOMode.once && ret > 0))
						return ret;
					() @trusted { m_condition.wait(); } ();
				}

				// Reaching this point with an empty buffer means the pipe has
				// been closed. In the partial modes, hand back the bytes that
				// were already copied instead of discarding their count by
				// throwing.
				if (m_buffer.empty && ret > 0 && mode != IOMode.all)
					return ret;

				// only a previously full buffer can have writers waiting for space
				need_signal = m_buffer.full;
				enforce(!m_buffer.empty, "Reading past end of closed pipe.");
				len = min(dst.length, m_buffer.length);
				m_buffer.read(dst[0 .. len]);
				ret += len;
			}
			if (need_signal) () @trusted { m_condition.notifyAll(); } ();
			dst = dst[len .. $];
		}
		vibe.core.core.yield();

		return ret;
	}
}
209 
unittest { // issue #1501 - deadlock in TaskPipe
	import std.datetime : Clock, UTC;
	import core.time : msecs;

	// Two passes: in pass 0 the reading task is started before the writing
	// task, in pass 1 the order is reversed. The 2100 byte payload is larger
	// than the 2048 byte pipe buffer.
	foreach (pass; 0 .. 2) {
		auto pipe = new TaskPipe;
		pipe.bufferSize = 2048;

		Task first, second;
		first = runTask({
			ubyte[2100] payload;
			if (pass == 0) pipe.read(payload, IOMode.all);
			else pipe.write(payload, IOMode.all);
		});
		second = runTask({
			ubyte[2100] payload;
			if (pass == 0) pipe.write(payload, IOMode.all);
			else pipe.read(payload, IOMode.all);
		});

		// Polls both tasks and fails the test if either one is still running
		// after half a second.
		auto watchdog = runTask({
			auto started_at = Clock.currTime(UTC());
			while (first.running || second.running) {
				if (Clock.currTime(UTC()) - started_at > 500.msecs)
					assert(false, "TaskPipe is dead locked.");
				yield();
			}
		});

		watchdog.join();
	}
}
235 
unittest { // TaskPipeImpl.waitForData has to return once its timeout has elapsed
	enum wait_timeout = 10.msecs;
	enum check_delay = 500.msecs;

	// Waits on a pipe that never receives data, then stops the event loop.
	auto waiter = runTask({
		auto impl = new TaskPipeImpl;
		impl.waitForData(wait_timeout);
		exitEventLoop();
	});

	// Fails the test if the waiting task is still running after the delay.
	runTask({
		sleep(check_delay);
		assert(!waiter.running, "TaskPipeImpl.waitForData didn't timeout.");
	});

	runEventLoop();
}