1 /**
2 	High level stream manipulation functions.
3 
4 	Copyright: © 2012-2016 RejectedSoftware e.K.
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig
7 */
8 module vibe.stream.operations;
9 
10 public import vibe.core.stream;
11 
12 import vibe.core.log;
13 import vibe.utils.array : AllocAppender;
14 import vibe.internal.allocator;
15 import vibe.internal.freelistref;
16 import vibe.stream.wrapper : ProxyStream;
17 
18 import std.algorithm;
19 import std.array;
20 import std.datetime;
21 import std.exception;
22 import std.range : isOutputRange;
23 
24 
25 /**************************************************************************************************/
26 /* Public functions                                                                               */
27 /**************************************************************************************************/
28 
/**
	Reads a single line from the stream and returns its contents without the
	line separator.

	Params:
		stream = The input stream to read the line from
		max_bytes = Limit for the amount of data that may be read while
			searching for the line separator
		linesep = The byte sequence that terminates a line
		alloc = Allocator used to build the returned array (array variant only)
		dst = Output stream or output range that receives the line contents

	Throws:
		An exception if either the stream end was hit without hitting a newline first, or
		if more than max_bytes have been read from the stream.
*/
ubyte[] readLine(InputStream)(InputStream stream, size_t max_bytes = size_t.max, string linesep = "\r\n", IAllocator alloc = vibeThreadAllocator()) /*@ufcs*/
	if (isInputStream!InputStream)
{
	// start out with a small buffer, but never reserve more than the limit allows
	size_t initial_capacity = 64;
	if (max_bytes < initial_capacity) initial_capacity = max_bytes;

	auto line = AllocAppender!(ubyte[])(alloc);
	line.reserve(initial_capacity);

	// the output range overload performs the actual work
	readLine(stream, line, max_bytes, linesep);
	return line.data();
}
/// ditto
void readLine(InputStream, OutputStream)(InputStream stream, OutputStream dst, size_t max_bytes = size_t.max, string linesep = "\r\n")
	if (isInputStream!InputStream && isOutputStream!OutputStream)
{
	import vibe.stream.wrapper;

	// Wrap the destination stream in an output range using the same factory
	// function as the corresponding readUntil overload; the factory deduces
	// the stream type, which a direct StreamOutputRange(...) call cannot do
	// if the struct is a template.
	auto dstrng = streamOutputRange(dst);
	readLine(stream, dstrng, max_bytes, linesep);
}
/// ditto
void readLine(R, InputStream)(InputStream stream, ref R dst, size_t max_bytes = size_t.max, string linesep = "\r\n")
	if (isOutputRange!(R, ubyte))
{
	// a line is everything up to (and excluding) the separator byte sequence
	const(ubyte)[] separator = cast(const(ubyte)[])linesep;
	readUntil(stream, dst, separator, max_bytes);
}
58 
@safe unittest {
	import vibe.stream.memory : createMemoryOutputStream, createMemoryStream;

	// two terminated lines followed by a rest without a line separator
	auto input = createMemoryStream(cast(ubyte[])"Hello, World!\r\nThis is a test.\r\nNot a full line.".dup);

	// array returning variant
	assert(input.readLine() == cast(const(ubyte)[])"Hello, World!");
	assert(input.readLine() == cast(const(ubyte)[])"This is a test.");
	// the remaining data is not terminated by a line separator
	assertThrown(input.readLine);

	// rewind to the beginning
	input.seek(0);

	// output range variant
	auto line_range = appender!(ubyte[]);
	input.readLine(line_range);
	assert(line_range.data == cast(const(ubyte)[])"Hello, World!");

	// output stream variant
	auto line_stream = createMemoryOutputStream();
	input.readLine(line_stream);
	assert(line_stream.data == cast(const(ubyte)[])"This is a test.");
}
80 
81 
/**
	Reads all data of a stream until the specified end marker is detected.

	Params:
		stream = The input stream which is searched for end_marker
		end_marker = The byte sequence which is searched in the stream
		max_bytes = An optional limit of how much data is to be read from the
			input stream; if the limit is reached before hitting the end
			marker, an exception is thrown.
		alloc = An optional allocator that is used to build the result array
			in the array returning variant of this function
		dst = The output stream or output range, to which the prefix to the
			end marker of the input stream is written

	Returns:
		The array returning variant of this function returns the complete
		prefix to the end marker of the input stream, excluding the end marker
		itself.

	Throws:
		An exception if either the stream end was hit without hitting a marker
		first, or if more than max_bytes have been read from the stream in
		case of max_bytes != 0.

	Remarks:
		This function uses an algorithm inspired by the
		$(LINK2 http://en.wikipedia.org/wiki/Boyer%E2%80%93Moore_string_search_algorithm,
		Boyer-Moore string search algorithm). However, contrary to the original
		algorithm, it will scan the whole input string exactly once, without
		jumping over portions of it. This allows the algorithm to work with
		constant memory requirements and without the memory copies that would
		be necessary for streams that do not hold their complete data in
		memory.

		The current implementation has a run time complexity of O(n*m+m²) and
		O(n+m) in typical cases, with n being the length of the scanned input
		string and m the length of the marker.
*/
ubyte[] readUntil(InputStream)(InputStream stream, in ubyte[] end_marker, size_t max_bytes = size_t.max, IAllocator alloc = vibeThreadAllocator()) /*@ufcs*/
	if (isInputStream!InputStream)
{
	// start out with a small buffer, but never reserve more than the limit allows
	size_t initial_capacity = 64;
	if (max_bytes < initial_capacity) initial_capacity = max_bytes;

	auto prefix = AllocAppender!(ubyte[])(alloc);
	prefix.reserve(initial_capacity);

	// the output range overload performs the actual search
	readUntil(stream, prefix, end_marker, max_bytes);
	return prefix.data();
}
/// ditto
void readUntil(InputStream, OutputStream)(InputStream stream, OutputStream dst, in ubyte[] end_marker, ulong max_bytes = ulong.max) /*@ufcs*/
	if (isInputStream!InputStream && isOutputStream!OutputStream)
{
	import vibe.stream.wrapper;

	// adapt the destination stream to the output range based overload
	auto sink = streamOutputRange(dst);
	readUntil(stream, sink, end_marker, max_bytes);
}
/// ditto
void readUntil(R, InputStream)(InputStream stream, ref R dst, in ubyte[] end_marker, ulong max_bytes = ulong.max) /*@ufcs*/
	if (isOutputRange!(R, ubyte) && isInputStream!InputStream)
{
	assert(max_bytes > 0 && end_marker.length > 0);

	// markers of more than two bytes need the table driven search
	if (end_marker.length > 2) {
		readUntilGeneric(stream, dst, end_marker, max_bytes);
		return;
	}

	// one and two byte markers are handled by a specialized routine
	readUntilSmall(stream, dst, end_marker, max_bytes);
}
146 
@safe unittest {
	import vibe.stream.memory;

	auto haystack = "1231234123111223123334221111112221231333123123123123123213123111111111114".dup;
	auto stream = createMemoryStream(cast(ubyte[])haystack);

	// Searches for marker from the start of the stream, once using the stream
	// directly and once through a proxy whose peek() yields no data, and
	// verifies the returned prefix as well as the amount of data left over.
	void check(string marker, size_t expected) @safe {
		stream.seek(0);
		auto prefix = cast(char[])readUntil(stream, cast(const(ubyte)[])marker);
		assert(prefix.length == expected, "Wrong result index");
		assert(prefix == haystack[0 .. prefix.length], "Wrong result contents: "~prefix~" vs "~haystack[0 .. prefix.length]);
		assert(stream.leastSize() == stream.size() - expected - marker.length, "Wrong number of bytes left in stream");

		stream.seek(0);
		auto nopeek = new NoPeekProxy!InputStream(stream);
		prefix = cast(char[])readUntil(nopeek, cast(const(ubyte)[])marker);
		assert(prefix.length == expected, "Wrong result index");
		assert(prefix == haystack[0 .. prefix.length], "Wrong result contents: "~prefix~" vs "~haystack[0 .. prefix.length]);
		assert(stream.leastSize() == stream.size() - expected - marker.length, "Wrong number of bytes left in stream");
	}

	// repeat all searches for every possible peek window size
	foreach (window; 0 .. haystack.length) {
		stream.peekWindow = window;
		check("1", 0);
		check("2", 1);
		check("3", 2);
		check("12", 0);
		check("23", 1);
		check("31", 2);
		check("123", 0);
		check("231", 1);
		check("1231", 0);
		check("3123", 2);
		check("11223", 11);
		check("11222", 28);
		check("114", 70);
		check("111111111114", 61);
	}
	// TODO: test
}
185 
@safe unittest {
	import vibe.stream.memory : createMemoryOutputStream, createMemoryStream, MemoryStream;
	import vibe.stream.wrapper : ProxyStream;

	auto data = cast(ubyte[])"ab\nc\rd\r\ne".dup;

	// Verifies that marker is found at offset idx of data, both for a stream
	// that offers peek data and for one that does not.
	void check(string marker, size_t idx)
	{
		auto rest = data[idx+marker.length .. $];

		// code path for peek support
		auto input = createMemoryStream(data);
		auto prefix = appender!(ubyte[]);
		readUntil(input, prefix, cast(const(ubyte)[])marker);
		assert(prefix.data == data[0 .. idx]);
		assert(input.peek == rest);

		// code path for no peek support
		input.seek(0);
		prefix = appender!(ubyte[]);
		auto nopeek = new NoPeekProxy!MemoryStream(input);
		readUntil(nopeek, prefix, cast(const(ubyte)[])marker);
		assert(prefix.data == data[0 .. idx]);
		assert(input.readAll() == rest);
	}

	check("\r\n", 6);
	check("\r", 4);
	check("\n", 2);
}
212 
/**
	Reads the complete contents of a stream, optionally limited by max_bytes.

	Params:
		stream = The input stream to read until it is empty
		max_bytes = Limit for the total amount of data; passing zero disables
			the limit, but is deprecated in favor of size_t.max
		reserve_bytes = Minimum number of bytes to reserve in the result
			buffer before reading

	Returns:
		An array containing all data that was read from the stream.

	Throws:
		An exception is thrown if the stream contains more than max_bytes data.
*/
ubyte[] readAll(InputStream)(InputStream stream, size_t max_bytes = size_t.max, size_t reserve_bytes = 0) /*@ufcs*/
	if (isInputStream!InputStream)
{
	import vibe.internal.freelistref;

	if (max_bytes == 0) logDebug("Deprecated behavior: readAll() called with max_bytes==0, use max_bytes==size_t.max instead.");

	// the result buffer is allocated through the GC allocator
	auto dst = AllocAppender!(ubyte[])(() @trusted { return GCAllocator.instance.allocatorObject; } ());

	// reserve at least as much as the stream currently reports, capped by the limit
	auto size_hint = min(max_bytes, stream.leastSize);
	reserve_bytes = max(reserve_bytes, size_hint);
	if (reserve_bytes != 0) dst.reserve(reserve_bytes);

	size_t total = 0;
	for (;;) {
		if (stream.empty) break;

		size_t pending = min(stream.leastSize, size_t.max);
		total += pending;
		enforce(max_bytes == 0 || total <= max_bytes, "Input data too long!");

		// read directly into the appender's buffer
		dst.reserve(pending);
		dst.append((scope target) {
			stream.read(target[0 .. pending]);
			return pending;
		});
	}
	return dst.data;
}
244 
/**
	Reads the complete contents of a stream, assuming UTF-8 encoding.

	Params:
		stream = Specifies the stream from which to read.
		sanitize = If true, the input data will not be validated but will instead be made valid UTF-8.
		max_bytes = Optional size limit of the data that is read.

	Returns:
		The full contents of the stream, excluding a possible BOM, are returned as a UTF-8 string.

	Throws:
		An exception is thrown if max_bytes != 0 and the stream contains more than max_bytes data.
		If the sanitize parameter is false and the stream contains invalid UTF-8 code sequences,
		a UTFException is thrown.
*/
string readAllUTF8(InputStream)(InputStream stream, bool sanitize = false, size_t max_bytes = size_t.max)
	if (isInputStream!InputStream)
{
	import std.utf;
	import vibe.utils.string;

	auto raw = readAll(stream, max_bytes);

	if (!sanitize) {
		// strict mode: reinterpret the bytes as a string and reject invalid sequences
		auto text = () @trusted { return cast(string)raw; } ();
		validate(text);
		return stripUTF8Bom(text);
	}

	// lenient mode: make the data valid UTF-8 instead of validating it
	return stripUTF8Bom(sanitizeUTF8(raw));
}
274 
/**
	Pipes a stream to another while keeping the latency within the specified threshold.

	Params:
		destination = The destination stream to pipe into
		source =      The source stream to read data from
		nbytes =      Number of bytes to pipe through. The default of zero means to pipe
					  the whole input stream.
		max_latency = The maximum time before data is flushed to destination. The default value
					  of 0 s will flush after each chunk of data read from source.

	Throws:
		An exception is thrown if a non-zero nbytes was requested and the source
		ends before that many bytes have been piped.

	See_also: OutputStream.write
*/
void pipeRealtime(OutputStream, ConnectionStream)(OutputStream destination, ConnectionStream source, ulong nbytes = 0, Duration max_latency = 0.seconds)
	if (isOutputStream!OutputStream && isConnectionStream!ConnectionStream)
{
	import vibe.internal.freelistref;

	static struct Buffer { ubyte[64*1024] bytes = void; }
	auto bufferobj = FreeListRef!(Buffer, false)();
	auto buffer = bufferobj.bytes[];

	// Remember whether a byte limit was requested. nbytes is counted down
	// while piping, so testing it for zero later on cannot distinguish
	// "limit reached" from "no limit", which would result in piping the rest
	// of the source after the requested amount has already been transferred.
	immutable bool limited = nbytes > 0;

	//logTrace("default write %d bytes, empty=%s", nbytes, stream.empty);
	auto least_size = source.leastSize;
	StopWatch sw;
	sw.start();
	while (limited ? nbytes > 0 : least_size > 0) {
		// only reachable with least_size == 0 if a limit was requested and
		// the source is already exhausted
		enforce(least_size > 0, "Reading past end of input.");

		size_t chunk = min(limited ? nbytes : ulong.max, least_size, buffer.length);
		assert(chunk > 0, "leastSize returned zero for non-empty stream.");
		//logTrace("read pipe chunk %d", chunk);
		source.read(buffer[0 .. chunk]);
		destination.write(buffer[0 .. chunk]);

		if (limited) {
			nbytes -= chunk;
			// requested amount transferred - don't wait for or touch any further data
			if (nbytes == 0) break;
		}

		// give the source a chance to deliver more data within the latency budget
		auto remaining_latency = max_latency - cast(Duration)sw.peek();
		if (remaining_latency > 0.seconds)
			source.waitForData(remaining_latency);

		if (cast(Duration)sw.peek >= max_latency) {
			logTrace("pipeRealtime flushing.");
			destination.flush();
			sw.reset();
		} else {
			logTrace("pipeRealtime not flushing.");
		}

		least_size = source.leastSize;
		if (!least_size) {
			// the source ended; this is an error if a limit is still outstanding
			enforce(nbytes == 0, "Reading past end of input.");
			break;
		}
	}
	destination.flush();
}
329 
/**
	Consumes `bytes.length` bytes of the stream and determines if the contents
	match up.

	All `bytes.length` bytes are consumed, even if a mismatch is detected
	early.

	Returns: True $(I iff) the consumed bytes equal the passed array.
	Throws: Throws an exception if reading from the stream fails.
*/
bool skipBytes(InputStream)(InputStream stream, const(ubyte)[] bytes)
	if (isInputStream!InputStream)
{
	ubyte[128] chunk = void;
	bool all_equal = true;

	// compare the stream contents chunk by chunk against the expected bytes
	for (auto expected = bytes; expected.length > 0; ) {
		auto n = min(chunk.length, expected.length);
		stream.read(chunk[0 .. n], IOMode.all);
		all_equal = all_equal && chunk[0 .. n] == expected[0 .. n];
		expected = expected[n .. $];
	}

	return all_equal;
}
350 
// Scratch buffer type used by readUntilGeneric to hold data read from streams
// that provide no peek data.
private struct Buffer {
	// 64k - 4 bytes for reference count
	ubyte[64*1024-4] bytes = void;
}
352 
/*
	Specialized variant of readUntil for end markers of one or two bytes.

	Data is taken from the peek buffer of the stream whenever possible. If the
	stream offers no peek data, or if the first byte of a two byte marker has
	already been consumed, the bytes are read individually instead.

	Params:
		stream = The input stream which is searched for end_marker
		dst = Output range receiving all data preceding the marker
		end_marker = The marker to search for (one or two bytes)
		max_bytes = Limit for the number of data bytes preceding the marker;
			the limit is enforced on the peek based and on the read based
			code path

	Throws:
		An exception if the end of the stream or the byte limit is reached
		before the marker was found.
*/
private void readUntilSmall(R, InputStream)(InputStream stream, ref R dst, in ubyte[] end_marker, ulong max_bytes = ulong.max)
	if (isInputStream!InputStream)
{
	assert(end_marker.length >= 1 && end_marker.length <= 2);

	size_t nmatched = 0;
	size_t nmarker = end_marker.length;

	while (true) {
		enforce(!stream.empty, "Reached EOF while searching for end marker.");
		enforce(max_bytes > 0, "Reached maximum number of bytes while searching for end marker.");
		// never look at more peek data than the remaining byte budget allows
		auto pm = stream.peek()[0 .. min($, max_bytes)];
		if (!pm.length || nmatched == 1) { // no peek support - inefficient route
			ubyte[2] buf = void;
			// read exactly as many bytes as are missing for a full match, so
			// that nothing beyond the marker gets consumed
			auto l = nmarker - nmatched;
			stream.read(buf[0 .. l], IOMode.all);
			foreach (i; 0 .. l) {
				if (buf[i] == end_marker[nmatched]) {
					nmatched++;
				} else if (buf[i] == end_marker[0]) {
					// false match, but the current byte may start a new match
					foreach (j; 0 .. nmatched) putLimited(dst, end_marker[j], max_bytes);
					nmatched = 1;
				} else {
					// false match - the withheld marker bytes are regular data
					foreach (j; 0 .. nmatched) putLimited(dst, end_marker[j], max_bytes);
					nmatched = 0;
					putLimited(dst, buf[i], max_bytes);
				}
				if (nmatched == nmarker) return;
			}
		} else {
			assert(nmatched == 0);

			auto idx = pm.countUntil(end_marker[0]);
			if (idx < 0) {
				// no marker byte in sight - everything is data
				dst.put(pm);
				max_bytes -= pm.length;
				stream.skip(pm.length);
			} else {
				dst.put(pm[0 .. idx]);
				if (nmarker == 1) {
					stream.skip(idx+1);
					return;
				} else if (idx+1 < pm.length && pm[idx+1] == end_marker[1]) {
					assert(nmarker == 2);
					stream.skip(idx+2);
					return;
				} else {
					// The first marker byte was found, but the next byte is
					// either not available yet or does not match. Account
					// for the data written so far (idx < pm.length <=
					// max_bytes, so the budget stays positive) and continue
					// on the read based route.
					max_bytes -= idx;
					nmatched++;
					stream.skip(idx+1);
				}
			}
		}
	}
}

// Writes a single data byte to dst and charges it against the remaining byte
// budget, throwing if the budget is already used up.
private void putLimited(R)(ref R dst, ubyte b, ref ulong max_bytes)
{
	enforce(max_bytes > 0, "Reached maximum number of bytes while searching for end marker.");
	dst.put(b);
	max_bytes--;
}
408 
@safe unittest { // issue #1741
	// Input stream whose peek buffer stays empty until leastSize or read
	// has been called for the first time.
	static class LazyPeekStream : InputStream {
		ubyte[] source;
		ubyte[] window;
		size_t consumed;

		this(scope ubyte[] bytes...)
		{
			source = bytes.dup;
		}

		@property bool empty()
		{
			return consumed >= source.length;
		}

		@property ulong leastSize()
		{
			if (!window.length && !consumed) window = source;
			return source.length - consumed;
		}

		@property bool dataAvailableForRead()
		{
			return window.length > 0;
		}

		const(ubyte)[] peek()
		{
			return window;
		}

		size_t read(scope ubyte[] dst, IOMode)
		{
			if (!window.length) window = source;
			dst[] = window[0 .. dst.length];
			consumed += dst.length;
			window = window[dst.length .. $];
			return dst.length;
		}

		alias read = InputStream.read;
	}


	// the marker must be found and only the data in front of it written out
	auto input = new LazyPeekStream('X', '\r', '\n');
	auto prefix = appender!(ubyte[]);
	readUntilSmall(input, prefix, ['\r', '\n']);
	assert(prefix.data == ['X']);
}
440 
441 
/*
	General purpose implementation of readUntil for end markers of arbitrary
	length.

	The stream is scanned exactly once. A jump table computed from the marker
	determines how much of an already matched marker prefix has to be
	discarded after a mismatch, so that no input byte needs to be looked at a
	second time.

	Params:
		stream = The input stream which is searched for end_marker
		dst = Output range receiving all data preceding the marker
		end_marker = The byte sequence to search for
		max_bytes = Limit for the number of bytes taken from the stream

	Throws:
		An exception if the end of the stream or the byte limit is reached
		before the marker was found.
*/
private void readUntilGeneric(R, InputStream)(InputStream stream, ref R dst, in ubyte[] end_marker, ulong max_bytes = ulong.max) /*@ufcs*/
	if (isOutputRange!(R, ubyte) && isInputStream!InputStream)
{
	import core.memory : GC;

	// allocate internal jump table to optimize the number of comparisons
	size_t[8] nmatchoffsetbuffer = void;
	size_t[] nmatchoffset;
	if (end_marker.length <= nmatchoffsetbuffer.length) nmatchoffset = nmatchoffsetbuffer[0 .. end_marker.length];
	else nmatchoffset = new size_t[end_marker.length];

	// Precompute the jump table. nmatchoffset[i] is the number of matched
	// bytes that have to be discarded when a mismatch occurs after i bytes
	// of the marker were matched. This is the smallest shift j > 0 for which
	// the remaining matched bytes are still a prefix of the marker, so that
	// the longest possible partial match (i-j bytes) is retained. Retaining
	// anything else either claims a match that doesn't exist or misses
	// overlapping occurrences (e.g. "abac" within "ababac").
	nmatchoffset[0] = 0;
	foreach (i; 1 .. end_marker.length) {
		nmatchoffset[i] = i; // default: discard everything
		foreach (j; 1 .. i)
			if (end_marker[j .. i] == end_marker[0 .. i-j]) {
				nmatchoffset[i] = j;
				break;
			}
		assert(nmatchoffset[i] > 0 && nmatchoffset[i] <= i);
	}

	size_t nmatched = 0;
	// scratch buffer for streams without peek data; released explicitly
	// instead of relying on the deprecated delete expression
	auto bufferobj = new Buffer;
	scope (exit) () @trusted { GC.free(bufferobj); } ();
	auto buf = bufferobj.bytes[];

	ulong bytes_read = 0;

	// skips nbytes of the stream and accounts for them in bytes_read
	void skip2(size_t nbytes)
	{
		bytes_read += nbytes;
		stream.skip(nbytes);
	}

	while( !stream.empty ){
		enforce(bytes_read < max_bytes, "Reached byte limit before reaching end marker.");

		// try to get as much data as possible, either by peeking into the stream or
		// by reading as much as is guaranteed to not exceed the end marker length
		// the block size is also always limited by the max_bytes parameter.
		size_t nread = 0;
		auto least_size = stream.leastSize(); // NOTE: blocks until data is available
		auto max_read = max_bytes - bytes_read;
		auto str = stream.peek(); // try to get some data for free
		if( str.length == 0 ){ // if not, read as much as possible without reading past the end
			nread = min(least_size, end_marker.length-nmatched, buf.length, max_read);
			stream.read(buf[0 .. nread]);
			str = buf[0 .. nread];
			bytes_read += nread;
		} else if( str.length > max_read ){
			str.length = cast(size_t)max_read;
		}

		// remember how much of the marker was already matched before processing the current block
		size_t nmatched_start = nmatched;

		// go through the current block trying to match the marker
		size_t i = 0;
		for (i = 0; i < str.length; i++) {
			auto ch = str[i];
			// if we have a mismatch, use the jump table to try other possible prefixes
			// of the marker
			while( nmatched > 0 && ch != end_marker[nmatched] )
				nmatched -= nmatchoffset[nmatched];

			// if we then have a match, increase the match count and test for full match
			if (ch == end_marker[nmatched])
				if (++nmatched == end_marker.length) {
					i++;
					break;
				}
		}


		// write out any false match part of previous blocks
		if( nmatched_start > 0 ){
			if( nmatched <= i ) () @trusted { dst.put(end_marker[0 .. nmatched_start]); } ();
			else () @trusted { dst.put(end_marker[0 .. nmatched_start-nmatched+i]); } ();
		}

		// write out any unmatched part of the current block
		if( nmatched < i ) () @trusted { dst.put(str[0 .. i-nmatched]); } ();

		// got a full match => out
		if (nmatched >= end_marker.length) {
			// in case of a full match skip data in the stream until the end of
			// the marker (data obtained using read() is already consumed)
			skip2(i - nread);
			return;
		}

		// otherwise skip this block in the stream
		skip2(str.length - nread);
	}

	enforce(false, "Reached EOF before reaching end marker.");
}
540 
// Discards count bytes of the stream by reading them into a scratch buffer.
private void skip(InputStream)(InputStream str, ulong count)
	if (isInputStream!InputStream)
{
	ubyte[256] scratch = void;
	for (ulong remaining = count; remaining > 0; ) {
		auto n = min(scratch.length, remaining);
		str.read(scratch[0 .. n], IOMode.all);
		remaining -= n;
	}
}
551 
// Stream proxy used by the unit tests of this module: its peek() never yields
// any data, so the readUntil implementations take their read based code paths.
private class NoPeekProxy(InputStream) : ProxyStream
	if (isInputStream!InputStream)
{
	this(InputStream stream)
	{
		import vibe.internal.interfaceproxy : InterfaceProxy, interfaceProxy;

		// only an input side is supplied; the output side is left at its init value
		super(
			interfaceProxy!(.InputStream)(stream),
			InterfaceProxy!OutputStream.init,
			true
		);
	}

	override const(ubyte)[] peek()
	{
		return null;
	}
}