1 /**
2 	Stream interface for passing data between different tasks.
3 
4 	Copyright: © 2013 Sönke Ludwig
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig
7 */
8 module vibe.stream.taskpipe;
9 
10 public import vibe.core.stream;
11 
12 import core.sync.mutex;
13 import core.time;
14 import std.algorithm : min;
15 import std.exception;
16 import vibe.core.core;
17 import vibe.core.sync;
18 import vibe.utils.array;
19 
20 
21 /**
22 	Implements a unidirectional data pipe between two tasks.
23 */
24 final class TaskPipe : ConnectionStream {
25 	private {
26 		TaskPipeImpl m_pipe;
27 	}
28 
29 	/** Constructs a new pipe ready for use.
30 	*/
31 	this(bool grow_when_full = false)
32 	{
33 		m_pipe = new TaskPipeImpl(grow_when_full);
34 	}
35 
36 	/// Size of the (fixed) FIFO buffer used to transfer data between tasks
37 	@property size_t bufferSize() const { return m_pipe.bufferSize; }
38 	/// ditto
39 	@property void bufferSize(size_t nbytes) { m_pipe.bufferSize = nbytes; }
40 
41 	@property bool empty() { return leastSize() == 0; }
42 	@property ulong leastSize() { m_pipe.waitForData(); return m_pipe.fill; }
43 	@property bool dataAvailableForRead() { return m_pipe.fill > 0; }
44 	@property bool connected() const { return m_pipe.open; }
45 
46 	void close() { m_pipe.close(); }
47 	bool waitForData(Duration timeout)
48 	{
49 		if (dataAvailableForRead) return true;
50 		m_pipe.waitForData(timeout);
51 		return dataAvailableForRead;
52 	}
53 	const(ubyte)[] peek() { return m_pipe.peek; }
54 	size_t read(scope ubyte[] dst, IOMode mode) { return m_pipe.read(dst, mode); }
55 	alias read = ConnectionStream.read;
56 	size_t write(in ubyte[] bytes, IOMode mode) { return m_pipe.write(bytes, mode); }
57 	alias write = ConnectionStream.write;
58 	void flush() {}
59 	void finalize() { m_pipe.close(); }
60 }
61 
62 
63 /**
64 	Underyling pipe implementation for TaskPipe with no Stream interface.
65 */
66 private final class TaskPipeImpl {
67 	@safe:
68 
69 	private {
70 		Mutex m_mutex;
71 		InterruptibleTaskCondition m_condition;
72 		vibe.utils.array.FixedRingBuffer!ubyte m_buffer;
73 		bool m_closed = false;
74 		bool m_growWhenFull;
75 	}
76 
77 	/** Constructs a new pipe ready for use.
78 	*/
79 	this(bool grow_when_full = false)
80 	{
81 		m_mutex = new Mutex;
82 		() @trusted { m_condition = new InterruptibleTaskCondition(m_mutex); } ();
83 		m_buffer.capacity = 2048;
84 		m_growWhenFull = grow_when_full;
85 	}
86 
87 	/// Size of the (fixed) buffer used to transfer data between tasks
88 	@property size_t bufferSize() const { return m_buffer.capacity; }
89 	/// ditto
90 	@property void bufferSize(size_t nbytes) { m_buffer.capacity = nbytes; }
91 
92 	/// Number of bytes currently in the transfer buffer
93 	@property size_t fill()
94 	const {
95 		synchronized (m_mutex) {
96 			return m_buffer.length;
97 		}
98 	}
99 
100 	@property bool open() const { return !m_closed; }
101 
102 	/** Closes the pipe.
103 	*/
104 	void close()
105 	{
106 		synchronized (m_mutex) m_closed = true;
107 		() @trusted { m_condition.notifyAll(); } ();
108 	}
109 
110 	/** Blocks until at least one byte of data has been written to the pipe.
111 	*/
112 	void waitForData(Duration timeout = Duration.max)
113 	{
114 		import std.datetime : Clock, SysTime, UTC;
115 		bool have_timeout = timeout > 0.seconds && timeout != Duration.max;
116 		SysTime now = Clock.currTime(UTC());
117 		SysTime timeout_target;
118 		if (have_timeout) timeout_target = now + timeout;
119 
120 		synchronized (m_mutex) {
121 			while (m_buffer.empty && !m_closed && (!have_timeout || now < timeout_target)) {
122 				if (have_timeout)
123 					() @trusted { m_condition.wait(timeout_target - now); } ();
124 				else () @trusted { m_condition.wait(); } ();
125 				now = Clock.currTime(UTC());
126 			}
127 		}
128 	}
129 
130 	/** Writes the given byte array to the pipe.
131 	*/
132 	size_t write(const(ubyte)[] data, IOMode mode)
133 	{
134 		size_t ret = 0;
135 
136 		enforce(!m_closed, "Writing to closed task pipe.");
137 
138 		while (data.length > 0){
139 			bool need_signal;
140 			synchronized (m_mutex) {
141 				if (m_growWhenFull && m_buffer.full) {
142 					size_t new_sz = m_buffer.capacity;
143 					while (new_sz - m_buffer.capacity < data.length) new_sz += 2;
144 					m_buffer.capacity = new_sz;
145 				} else while (m_buffer.full) {
146 					if (mode == IOMode.immediate || mode == IOMode.once && ret > 0)
147 						return ret;
148 					() @trusted { m_condition.wait(); } ();
149 				}
150 
151 				need_signal = m_buffer.empty;
152 				auto len = min(m_buffer.freeSpace, data.length);
153 				m_buffer.put(data[0 .. len]);
154 				data = data[len .. $];
155 				ret += len;
156 			}
157 			if (need_signal) () @trusted { m_condition.notifyAll(); } ();
158 		}
159 		if (!m_growWhenFull) vibe.core.core.yield();
160 
161 		return ret;
162 	}
163 
164 	/** Returns a temporary view of the beginning of the transfer buffer.
165 
166 		Note that a call to read invalidates this array slice. Blocks in case
167 		of a filled up transfer buffer.
168 	*/
169 	const(ubyte[]) peek()
170 	{
171 		synchronized (m_mutex) {
172 			return m_buffer.peek();
173 		}
174 	}
175 
176 	/** Reads data into the supplied buffer.
177 
178 		Blocks until a sufficient amount of data has been written to the pipe.
179 	*/
180 	size_t read(scope ubyte[] dst, IOMode mode)
181 	{
182 		size_t ret = 0;
183 
184 		while (dst.length > 0) {
185 			bool need_signal;
186 			size_t len;
187 			synchronized (m_mutex) {
188 				while (m_buffer.empty && !m_closed) {
189 					if (mode == IOMode.immediate || mode == IOMode.once && ret > 0)
190 						return ret;
191 					() @trusted { m_condition.wait(); } ();
192 				}
193 
194 				need_signal = m_buffer.full;
195 				enforce(!m_buffer.empty, "Reading past end of closed pipe.");
196 				len = min(dst.length, m_buffer.length);
197 				m_buffer.read(dst[0 .. len]);
198 				ret += len;
199 			}
200 			if (need_signal) () @trusted { m_condition.notifyAll(); } ();
201 			dst = dst[len .. $];
202 		}
203 		vibe.core.core.yield();
204 
205 		return ret;
206 	}
207 }
208 
209 unittest { // issue #1501 - deadlock in TaskPipe
210 	import std.datetime : Clock, UTC;
211 	import core.time : msecs;
212 
213 	// test read after write and write after read
214 	foreach (i; 0 .. 2) {
215 		auto p = new TaskPipe;
216 		p.bufferSize = 2048;
217 
218 		Task a, b;
219 		a = runTask({ ubyte[2100] buf; if (i == 0) p.read(buf, IOMode.all); else p.write(buf, IOMode.all); });
220 		b = runTask({ ubyte[2100] buf; if (i == 0) p.write(buf, IOMode.all); else p.read(buf, IOMode.all); });
221 
222 		auto joiner = runTask({
223 			auto starttime = Clock.currTime(UTC());
224 			while (a.running || b.running) {
225 				if (Clock.currTime(UTC()) - starttime > 500.msecs)
226 					assert(false, "TaskPipe is dead locked.");
227 				yield();
228 			}
229 		});
230 
231 		joiner.join();
232 	}
233 }
234 
235 unittest { // issue #
236 	auto t = runTask({
237 		auto tp = new TaskPipeImpl;
238 		tp.waitForData(10.msecs);
239 		exitEventLoop();
240 	});
241 	runTask({
242 		sleep(500.msecs);
243 		assert(!t.running, "TaskPipeImpl.waitForData didn't timeout.");
244 	});
245 	runEventLoop();
246 }