1 /**
2 	Stream interface for passing data between different tasks.
3 
4 	Copyright: © 2013 Sönke Ludwig
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig
7 */
8 module vibe.stream.taskpipe;
9 
10 public import vibe.core.stream;
11 
12 import core.sync.mutex;
13 import core.time;
14 import std.algorithm : min;
15 import std.exception;
16 import vibe.core.core;
17 import vibe.core.sync;
18 import vibe.utils.array;
19 
20 
21 /**
22 	Implements a unidirectional data pipe between two tasks.
23 */
final class TaskPipe : ConnectionStream {
	private TaskPipeImpl m_pipe;

	/** Creates a pipe that is immediately usable.

		Params:
			grow_when_full = Forwarded to the underlying pipe implementation;
				when set, a full buffer is enlarged on write instead of
				making the writer wait.
	*/
	this(bool grow_when_full = false)
	{
		m_pipe = new TaskPipeImpl(grow_when_full);
	}

	/// Capacity in bytes of the FIFO buffer that carries data between tasks
	@property size_t bufferSize()
	const {
		return m_pipe.bufferSize;
	}
	/// ditto
	@property void bufferSize(size_t nbytes)
	{
		m_pipe.bufferSize = nbytes;
	}

	/// True if no data is left to read; waits like `leastSize` does.
	@property bool empty()
	{
		return leastSize == 0;
	}

	/// Waits (without timeout) for data or for the pipe to be closed, then
	/// reports the number of buffered bytes.
	@property ulong leastSize()
	{
		m_pipe.waitForData();
		return m_pipe.fill;
	}

	/// True if at least one byte is currently buffered. Does not wait.
	@property bool dataAvailableForRead()
	{
		return m_pipe.fill > 0;
	}

	/// True as long as the pipe has not been closed.
	@property bool connected()
	const {
		return m_pipe.open;
	}

	/// Closes the underlying pipe.
	void close()
	{
		m_pipe.close();
	}

	/** Waits for data to become readable.

		Returns immediately with `true` if data is already buffered.
		Otherwise waits on the underlying pipe using `timeout` and reports
		whether data is buffered afterwards.
	*/
	bool waitForData(Duration timeout)
	{
		if (m_pipe.fill > 0) return true;
		m_pipe.waitForData(timeout);
		return m_pipe.fill > 0;
	}

	/// Temporary view of the data at the front of the transfer buffer.
	const(ubyte)[] peek()
	{
		return m_pipe.peek;
	}

	/// Reads from the pipe into `dst`; returns the number of bytes read.
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		return m_pipe.read(dst, mode);
	}
	alias read = ConnectionStream.read;

	// The signature of `write` depends on the version of the OutputStream interface.
	static if (is(typeof(.OutputStream.outputStreamVersion)) && .OutputStream.outputStreamVersion > 1) {
		override size_t write(scope const(ubyte)[] bytes, IOMode mode) { return m_pipe.write(bytes, mode); }
	} else {
		override size_t write(in ubyte[] bytes, IOMode mode) { return m_pipe.write(bytes, mode); }
	}
	alias write = ConnectionStream.write;

	/// No-op: nothing is buffered at this level.
	void flush()
	{
	}

	/// Closes the underlying pipe.
	void finalize()
	{
		m_pipe.close();
	}
}
65 
66 
/**
	Underlying pipe implementation for TaskPipe with no Stream interface.
*/
private final class TaskPipeImpl {
	@safe:

	private {
		Mutex m_mutex;
		InterruptibleTaskCondition m_condition;
		vibe.utils.array.FixedRingBuffer!ubyte m_buffer;
		bool m_closed = false;
		bool m_growWhenFull;
	}

	/** Constructs a new pipe ready for use.

		The transfer buffer starts with a capacity of 2048 bytes.

		Params:
			grow_when_full = If set, `write` enlarges a full buffer instead
				of waiting for the reader to drain it.
	*/
	this(bool grow_when_full = false)
	{
		m_mutex = new Mutex;
		() @trusted { m_condition = new InterruptibleTaskCondition(m_mutex); } ();
		m_buffer.capacity = 2048;
		m_growWhenFull = grow_when_full;
	}

	/// Size of the (fixed) buffer used to transfer data between tasks
	@property size_t bufferSize() const { return m_buffer.capacity; }
	/// ditto
	@property void bufferSize(size_t nbytes) { m_buffer.capacity = nbytes; }

	/// Number of bytes currently in the transfer buffer
	@property size_t fill()
	const {
		synchronized (m_mutex) {
			return m_buffer.length;
		}
	}

	/// True as long as `close` has not been called.
	@property bool open() const { return !m_closed; }

	/** Closes the pipe and wakes up all tasks waiting on it.
	*/
	void close()
	{
		synchronized (m_mutex) m_closed = true;
		() @trusted { m_condition.notifyAll(); } ();
	}

	/** Blocks until at least one byte of data is available in the pipe,
		the pipe has been closed, or the timeout has elapsed.

		Params:
			timeout = Maximum time to wait. `Duration.max` as well as zero
				or negative durations are treated as "no timeout".
	*/
	void waitForData(Duration timeout = Duration.max)
	{
		immutable have_timeout = timeout > 0.seconds && timeout != Duration.max;

		// Use the monotonic clock so that adjustments of the system time
		// can neither shorten nor extend the wait.
		MonoTime now = MonoTime.currTime;
		MonoTime timeout_target;
		if (have_timeout) timeout_target = now + timeout;

		synchronized (m_mutex) {
			while (m_buffer.empty && !m_closed && (!have_timeout || now < timeout_target)) {
				if (have_timeout)
					() @trusted { m_condition.wait(timeout_target - now); } ();
				else () @trusted { m_condition.wait(); } ();
				now = MonoTime.currTime;
			}
		}
	}

	/** Writes the given byte array to the pipe.

		If the buffer is full, it is either enlarged (grow-when-full mode)
		or the call waits until the reader has freed up space.

		Params:
			data = The bytes to write.
			mode = `IOMode.immediate` never waits; `IOMode.once` stops
				waiting as soon as at least one byte has been written.

		Returns:
			The number of bytes that were put into the buffer.

		Throws:
			An `Exception` if the pipe is already closed, or if it is found
			closed while waiting for free buffer space.
	*/
	size_t write(scope const(ubyte)[] data, IOMode mode)
	{
		size_t ret = 0;

		enforce(!m_closed, "Writing to closed task pipe.");

		while (data.length > 0){
			bool need_signal;
			synchronized (m_mutex) {
				if (m_growWhenFull && m_buffer.full) {
					// Make room for the remaining data, rounded up to an
					// even number of bytes. This yields the same capacity as
					// stepping in increments of two, but in constant time.
					immutable grow_by = data.length + (data.length & 1);
					m_buffer.capacity = m_buffer.capacity + grow_by;
				} else while (m_buffer.full) {
					if (mode == IOMode.immediate || mode == IOMode.once && ret > 0)
						return ret;
					if (m_closed) throw new Exception("Pipe closed while writing data");
					() @trusted { m_condition.wait(); } ();
				}

				// a reader can only be waiting if the buffer was empty
				need_signal = m_buffer.empty;
				auto len = min(m_buffer.freeSpace, data.length);
				m_buffer.put(data[0 .. len]);
				data = data[len .. $];
				ret += len;
			}
			if (need_signal) () @trusted { m_condition.notifyAll(); } ();
		}
		if (!m_growWhenFull) vibe.core.core.yield();

		return ret;
	}

	/** Returns a temporary view of the beginning of the transfer buffer.

		Note that a call to read invalidates this array slice. This method
		does not wait for data; it returns whatever the buffer currently
		exposes.
	*/
	const(ubyte[]) peek()
	{
		synchronized (m_mutex) {
			return m_buffer.peek();
		}
	}

	/** Reads data into the supplied buffer.

		Blocks until a sufficient amount of data has been written to the pipe.

		Params:
			dst = Destination buffer.
			mode = `IOMode.immediate` never waits; `IOMode.once` stops
				waiting as soon as at least one byte has been read.

		Returns:
			The number of bytes copied into `dst`.

		Throws:
			An `Exception` if the pipe is closed and empty before the
			request could be satisfied. In `IOMode.immediate` and
			`IOMode.once`, bytes that were already read are returned instead
			of being discarded by the exception.
	*/
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		size_t ret = 0;

		while (dst.length > 0) {
			bool need_signal;
			size_t len;
			synchronized (m_mutex) {
				while (m_buffer.empty && !m_closed) {
					if (mode == IOMode.immediate || mode == IOMode.once && ret > 0)
						return ret;
					() @trusted { m_condition.wait(); } ();
				}

				if (m_buffer.empty) {
					// The pipe was closed and is drained. For partial read
					// modes, report what has been read so far rather than
					// losing it to the exception below.
					if ((mode == IOMode.immediate || mode == IOMode.once) && ret > 0)
						return ret;
					enforce(false, "Reading past end of closed pipe.");
				}

				// a writer can only be waiting if the buffer was full
				need_signal = m_buffer.full;
				len = min(dst.length, m_buffer.length);
				m_buffer.read(dst[0 .. len]);
				ret += len;
			}
			if (need_signal) () @trusted { m_condition.notifyAll(); } ();
			dst = dst[len .. $];
		}
		vibe.core.core.yield();

		return ret;
	}
}
213 
unittest { // issue #1501 - deadlock in TaskPipe
	import std.datetime : Clock, UTC;
	import core.time : msecs;

	// Round 0 starts the reading task first, round 1 the writing task.
	foreach (round; 0 .. 2) {
		auto pipe = new TaskPipe;
		pipe.bufferSize = 2048;

		// The 2100 byte payload is larger than the 2048 byte pipe buffer.
		Task first = runTask({
			ubyte[2100] payload;
			try {
				if (round != 0) pipe.write(payload, IOMode.all);
				else pipe.read(payload, IOMode.all);
			} catch (Exception e) assert(false, e.msg);
		});
		Task second = runTask({
			ubyte[2100] payload;
			try {
				if (round != 0) pipe.read(payload, IOMode.all);
				else pipe.write(payload, IOMode.all);
			} catch (Exception e) assert(false, e.msg);
		});

		// Polls both tasks and fails the test if they have not finished
		// within 500 ms.
		auto watchdog = runTask({
			try {
				auto started = Clock.currTime(UTC());
				while (first.running || second.running) {
					auto elapsed = Clock.currTime(UTC()) - started;
					if (elapsed > 500.msecs)
						assert(false, "TaskPipe is dead locked.");
					yield();
				}
			} catch (Exception e) assert(false, e.msg);
		});

		watchdog.join();
	}
}
253 
unittest { // TaskPipeImpl.waitForData has to return once its timeout has elapsed
	enum wait_timeout = 10.msecs;
	enum grace_period = 500.msecs;

	// Waits on a pipe that never receives data, then stops the event loop.
	auto waiter = runTask({
		try {
			auto pipe = new TaskPipeImpl;
			pipe.waitForData(wait_timeout);
			exitEventLoop();
		} catch (Exception e) assert(false, e.msg);
	});

	// Fails the test if the waiting task is still running after the grace period.
	runTask({
		sleepUninterruptible(grace_period);
		assert(!waiter.running, "TaskPipeImpl.waitForData didn't timeout.");
	});

	runEventLoop();
}