1 /**
2 	High level stream manipulation functions.
3 
4 	Copyright: © 2012-2016 Sönke Ludwig
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig
7 */
8 module vibe.stream.operations;
9 
10 public import vibe.core.stream;
11 
12 import vibe.core.log;
13 import vibe.utils.array : AllocAppender;
14 import vibe.internal.allocator;
15 import vibe.internal.freelistref;
16 import vibe.stream.wrapper : ProxyStream;
17 
18 import std.algorithm;
19 import std.array;
20 import std.exception;
21 import std.range : isOutputRange;
22 import core.time : Duration, seconds;
23 
24 
25 /**************************************************************************************************/
26 /* Public functions                                                                               */
27 /**************************************************************************************************/
28 
/**
	Reads a single line from the stream and returns it without the line separator.

	Params:
		stream = The stream to read the line from
		max_bytes = Upper limit for the number of bytes read while searching for linesep
		linesep = The byte sequence that terminates a line
		alloc = Allocator used for the returned buffer (allocating variant only)
		dst = Output stream or output range that receives the line (non-allocating variants)

	Throws:
		An exception if either the stream end was hit without hitting a newline first, or
		if more than max_bytes have been read from the stream.
*/
ubyte[] readLine(InputStream)(InputStream stream, size_t max_bytes = size_t.max, string linesep = "\r\n", IAllocator alloc = vibeThreadAllocator()) /*@ufcs*/
	if (isInputStream!InputStream)
{
	// start out with a small buffer (never more than the limit); it grows on demand
	auto line = AllocAppender!(ubyte[])(alloc);
	line.reserve(min(max_bytes, size_t(64)));
	readLine(stream, line, max_bytes, linesep);
	return line.data();
}
44 /// ditto
void readLine(InputStream, OutputStream)(InputStream stream, OutputStream dst, size_t max_bytes = size_t.max, string linesep = "\r\n")
	if (isInputStream!InputStream && isOutputStream!OutputStream)
{
	import vibe.stream.wrapper : streamOutputRange;

	// adapt the destination stream to an output range and reuse the range based overload
	auto sink = streamOutputRange(dst);
	readLine(stream, sink, max_bytes, linesep);
}
52 /// ditto
void readLine(R, InputStream)(InputStream stream, ref R dst, size_t max_bytes = size_t.max, string linesep = "\r\n")
	if (isOutputRange!(R, ubyte) && isInputStream!InputStream)
{
	// The line separator is simply the end marker for readUntil: it is consumed
	// from the stream, but not written to dst. The constraint mirrors the one of
	// the corresponding readUntil overload, so that non-stream arguments are
	// rejected at the call site instead of deep inside readUntil.
	readUntil(stream, dst, cast(const(ubyte)[])linesep, max_bytes);
}
58 
// Exercises all three readLine overloads on an in-memory stream: the allocating
// variant (including the failure on a trailing line without "\r\n"), an
// appender output range and an output stream.
@safe unittest {
	import vibe.stream.memory : createMemoryOutputStream, createMemoryStream;

	auto inp = createMemoryStream(cast(ubyte[])"Hello, World!\r\nThis is a test.\r\nNot a full line.".dup);
	assert(inp.readLine() == cast(const(ubyte)[])"Hello, World!");
	assert(inp.readLine() == cast(const(ubyte)[])"This is a test.");
	// the remaining data is not terminated by the separator, so this must throw
	assertThrown(inp.readLine);

	// start over
	inp.seek(0);

	// read into an output buffer
	auto app = appender!(ubyte[]);
	inp.readLine(app);
	assert(app.data == cast(const(ubyte)[])"Hello, World!");

	// read into an output stream (continues after the line consumed above)
	auto os = createMemoryOutputStream();
	inp.readLine(os);
	assert(os.data == cast(const(ubyte)[])"This is a test.");
}
80 
81 
82 /**
83 	Reads all data of a stream until the specified end marker is detected.
84 
85 	Params:
86 		stream = The input stream which is searched for end_marker
87 		end_marker = The byte sequence which is searched in the stream
88 		max_bytes = An optional limit of how much data is to be read from the
			input stream; if the limit is reached before hitting the end
90 			marker, an exception is thrown.
91 		alloc = An optional allocator that is used to build the result string
92 			in the string variant of this function
93 		dst = The output stream, to which the prefix to the end marker of the
94 			input stream is written
95 
96 	Returns:
97 		The string variant of this function returns the complete prefix to the
98 		end marker of the input stream, excluding the end marker itself.
99 
100 	Throws:
		An exception if either the stream end was hit without hitting a marker
		first, or if more than max_bytes have been read from the stream. Note
		that max_bytes must not be zero; keep the default value for no limit.
104 
105 	Remarks:
106 		This function uses an algorithm inspired by the
107 		$(LINK2 http://en.wikipedia.org/wiki/Boyer%E2%80%93Moore_string_search_algorithm,
108 		Boyer-Moore string search algorithm). However, contrary to the original
109 		algorithm, it will scan the whole input string exactly once, without
110 		jumping over portions of it. This allows the algorithm to work with
111 		constant memory requirements and without the memory copies that would
112 		be necessary for streams that do not hold their complete data in
113 		memory.
114 
115 		The current implementation has a run time complexity of O(n*m+m²) and
116 		O(n+m) in typical cases, with n being the length of the scanned input
117 		string and m the length of the marker.
118 */
ubyte[] readUntil(InputStream)(InputStream stream, in ubyte[] end_marker, size_t max_bytes = size_t.max, IAllocator alloc = vibeThreadAllocator()) /*@ufcs*/
	if (isInputStream!InputStream)
{
	// collect the prefix in a buffer from the given allocator; start small
	// (never more than the limit) and let it grow while searching
	auto prefix = AllocAppender!(ubyte[])(alloc);
	prefix.reserve(min(max_bytes, size_t(64)));
	readUntil(stream, prefix, end_marker, max_bytes);
	return prefix.data();
}
127 /// ditto
void readUntil(InputStream, OutputStream)(InputStream stream, OutputStream dst, in ubyte[] end_marker, ulong max_bytes = ulong.max) /*@ufcs*/
	if (isInputStream!InputStream && isOutputStream!OutputStream)
{
	import vibe.stream.wrapper : streamOutputRange;

	// adapt the destination stream to an output range and reuse the range based overload
	auto sink = streamOutputRange(dst);
	readUntil(stream, sink, end_marker, max_bytes);
}
135 /// ditto
void readUntil(R, InputStream)(InputStream stream, ref R dst, in ubyte[] end_marker, ulong max_bytes = ulong.max) /*@ufcs*/
	if (isOutputRange!(R, ubyte) && isInputStream!InputStream)
{
	assert(max_bytes > 0 && end_marker.length > 0);

	// markers of more than two bytes need the jump table based search,
	// shorter ones (e.g. "\n" or "\r\n") take the specialized fast path
	if (end_marker.length > 2) readUntilGeneric(stream, dst, end_marker, max_bytes);
	else readUntilSmall(stream, dst, end_marker, max_bytes);
}
146 
// Checks readUntil on a fixed digit string. The expected value of each case is
// the index of the first occurrence of the marker. Markers of one or two bytes
// and longer markers are used (covering both search implementations), every
// peek window size from 0 to text.length is tried, and each case runs once
// directly and once through NoPeekProxy, whose peek() always returns null.
@safe unittest {
	import vibe.stream.memory;

	auto text = "1231234123111223123334221111112221231333123123123123123213123111111111114".dup;
	auto stream = createMemoryStream(cast(ubyte[])text);
	// runs one search and verifies the result length/contents and that exactly
	// the data after the marker is left in the stream
	void test(string s, size_t expected) @safe {
		stream.seek(0);
		auto result = cast(char[])readUntil(stream, cast(const(ubyte)[])s);
		assert(result.length == expected, "Wrong result index");
		assert(result == text[0 .. result.length], "Wrong result contents: "~result~" vs "~text[0 .. result.length]);
		assert(stream.leastSize() == stream.size() - expected - s.length, "Wrong number of bytes left in stream");

		stream.seek(0);
		auto inp2 = new NoPeekProxy!InputStream(stream);
		result = cast(char[])readUntil(inp2, cast(const(ubyte)[])s);
		assert(result.length == expected, "Wrong result index");
		assert(result == text[0 .. result.length], "Wrong result contents: "~result~" vs "~text[0 .. result.length]);
		assert(stream.leastSize() == stream.size() - expected - s.length, "Wrong number of bytes left in stream");
	}
	foreach( i; 0 .. text.length ){
		// NOTE(review): presumably limits how much data peek() exposes - confirm in vibe.stream.memory
		stream.peekWindow = i;
		test("1", 0);
		test("2", 1);
		test("3", 2);
		test("12", 0);
		test("23", 1);
		test("31", 2);
		test("123", 0);
		test("231", 1);
		test("1231", 0);
		test("3123", 2);
		test("11223", 11);
		test("11222", 28);
		test("114", 70);
		test("111111111114", 61);
	}
	// TODO: test
}
185 
// Checks the one and two byte markers "\r\n", "\r" and "\n" on data with mixed
// line endings, once with peek support and once through NoPeekProxy.
@safe unittest {
	import vibe.stream.memory : createMemoryOutputStream, createMemoryStream, MemoryStream;
	import vibe.stream.wrapper : ProxyStream;

	auto text = cast(ubyte[])"ab\nc\rd\r\ne".dup;
	// idx is the expected position of the first occurrence of marker
	void test(string marker, size_t idx)
	{
		// code path for peek support
		auto inp = createMemoryStream(text);
		auto dst = appender!(ubyte[]);
		readUntil(inp, dst, cast(const(ubyte)[])marker);
		assert(dst.data == text[0 .. idx]);
		assert(inp.peek == text[idx+marker.length .. $]);

		// code path for no peek support
		inp.seek(0);
		dst = appender!(ubyte[]);
		auto inp2 = new NoPeekProxy!MemoryStream(inp);
		readUntil(inp2, dst, cast(const(ubyte)[])marker);
		assert(dst.data == text[0 .. idx]);
		// everything after the marker must still be unread
		assert(inp.readAll() == text[idx+marker.length .. $]);
	}
	test("\r\n", 6);
	test("\r", 4);
	test("\n", 2);
}
212 
/**
	Reads the complete contents of a stream, optionally limited by max_bytes.

	Params:
		stream = The stream to read from until it is empty
		max_bytes = Maximum amount of data accepted. Zero is still accepted as
			"no limit" for backwards compatibility, but logs a deprecation note.
		reserve_bytes = Number of bytes to reserve in the result buffer up front

	Returns:
		A GC allocated buffer holding all data read from the stream.

	Throws:
		An exception is thrown if the stream contains more than max_bytes data.
*/
ubyte[] readAll(InputStream)(InputStream stream, size_t max_bytes = size_t.max, size_t reserve_bytes = 0) /*@ufcs*/
	if (isInputStream!InputStream)
{
	immutable bool unlimited = max_bytes == 0;
	if (unlimited)
		logDebug("Deprecated behavior: readAll() called with max_bytes==0, use max_bytes==size_t.max instead.");

	// the result lives on the GC heap, so it can be handed out to the caller
	auto result = AllocAppender!(ubyte[])(() @trusted { return GCAllocator.instance.allocatorObject; } ());

	// use the amount of data known to be available as a capacity hint
	reserve_bytes = max(reserve_bytes, min(max_bytes, stream.leastSize));
	if (reserve_bytes != 0)
		result.reserve(reserve_bytes);

	size_t total = 0;
	while (!stream.empty) {
		size_t chunk = min(stream.leastSize, size_t.max);
		total += chunk;
		// check the limit before the data is actually consumed
		enforce(unlimited || total <= max_bytes, "Input data too long!");
		result.reserve(chunk);
		result.append((scope buf) {
			stream.read(buf[0 .. chunk]);
			return chunk;
		});
	}
	return result.data;
}
244 
/**
	Reads the complete contents of a stream and interprets them as UTF-8.

	Params:
		stream = Specifies the stream from which to read.
		sanitize = If true, invalid UTF-8 sequences are repaired instead of
			causing an exception.
		max_bytes = Optional size limit of the data that is read.

	Returns:
		The full contents of the stream as a UTF-8 string, with a possible
		leading BOM removed.

	Throws:
		An exception is thrown if the stream contains more than max_bytes data
		(readAll still treats a max_bytes of zero as "no limit").
		If sanitize is false and the stream contains invalid UTF-8 code sequences,
		a UTFException is thrown.
*/
string readAllUTF8(InputStream)(InputStream stream, bool sanitize = false, size_t max_bytes = size_t.max)
	if (isInputStream!InputStream)
{
	import std.utf;
	import vibe.utils.string;

	auto data = readAll(stream, max_bytes);

	if (!sanitize) {
		// reinterpret the freshly read buffer in place, then make sure it is valid
		auto utf8 = () @trusted { return cast(string)data; } ();
		validate(utf8);
		return stripUTF8Bom(utf8);
	}

	return stripUTF8Bom(sanitizeUTF8(data));
}
274 
/**
	Pipes a stream to another while keeping the latency within the specified threshold.

	Params:
		destination = The destination stream to pipe into
		source =      The source stream to read data from
		nbytes =      Number of bytes to pipe through. The default of zero means to pipe
					  the whole input stream.
		max_latency = The maximum time before data is flushed to destination. The default value
					  of 0 s will flush after each chunk of data read from source.

	Throws:
		An exception if nbytes is non-zero and the source ends before nbytes
		bytes have been piped.

	See_also: OutputStream.write
*/
void pipeRealtime(OutputStream, ConnectionStream)(OutputStream destination, ConnectionStream source, ulong nbytes = 0, Duration max_latency = 0.seconds)
	if (isOutputStream!OutputStream && isConnectionStream!ConnectionStream)
{
	import std.datetime.stopwatch : StopWatch;
	import vibe.internal.freelistref;

	static struct Buffer { ubyte[64*1024] bytes = void; }
	auto bufferobj = FreeListRef!(Buffer, false)();
	auto buffer = bufferobj.bytes[];

	// nbytes == 0 selects "pipe the whole input". Because nbytes is counted
	// down below, this decision has to be made once up front - otherwise a
	// limited transfer that reaches zero would be mistaken for an unlimited
	// one and the rest of the source would be piped as well.
	immutable bool limited = nbytes > 0;

	//logTrace("default write %d bytes, empty=%s", nbytes, stream.empty);
	auto least_size = source.leastSize;
	StopWatch sw;
	sw.start();
	while (true) {
		// the source has ended (this also covers an initially empty source)
		if (!least_size) {
			enforce(!limited, "Reading past end of input.");
			break;
		}

		// the chunk is bounded by buffer.length, so the cast cannot truncate
		size_t chunk = cast(size_t)min(limited ? nbytes : ulong.max, least_size, buffer.length);
		assert(chunk > 0, "leastSize returned zero for non-empty stream.");
		//logTrace("read pipe chunk %d", chunk);
		source.read(buffer[0 .. chunk]);
		destination.write(buffer[0 .. chunk]);

		if (limited) {
			nbytes -= chunk;
			// requested amount piped - neither wait for nor consume further
			// data; the final flush below takes care of the rest
			if (nbytes == 0) break;
		}

		// wait for more data only as long as the latency budget permits
		auto remaining_latency = max_latency - cast(Duration)sw.peek();
		if (remaining_latency > 0.seconds)
			source.waitForData(remaining_latency);

		if (cast(Duration)sw.peek >= max_latency) {
			logTrace("pipeRealtime flushing.");
			destination.flush();
			sw.reset();
		} else {
			logTrace("pipeRealtime not flushing.");
		}

		least_size = source.leastSize;
	}
	destination.flush();
}
330 
// Compile-only check: instantiates pipeRealtime with a TCPConnection source and
// nullSink as destination. The nested test() function is never called.
unittest {
	import vibe.core.net : TCPConnection;
	import vibe.core.stream : nullSink;

	void test()
	{
		TCPConnection c;
		pipeRealtime(nullSink, c);
	}
}
341 
342 
/**
	Reads `bytes.length` bytes from the stream and compares them with `bytes`.

	All `bytes.length` bytes are consumed, even after a difference has been
	found.

	Returns: True $(I iff) the consumed bytes equal the passed array.
	Throws: Throws an exception if reading from the stream fails.
*/
bool skipBytes(InputStream)(InputStream stream, const(ubyte)[] bytes)
	if (isInputStream!InputStream)
{
	ubyte[128] scratch = void;
	bool equal = true;
	auto rest = bytes;
	while (rest.length > 0) {
		// compare in pieces of at most scratch.length bytes
		immutable n = min(scratch.length, rest.length);
		auto part = scratch[0 .. n];
		stream.read(part, IOMode.all);
		if (part != rest[0 .. n]) equal = false;
		rest = rest[n .. $];
	}
	return equal;
}
363 
// Scratch buffer used by readUntilGeneric. NOTE(review): the 4 bytes left out
// for a reference count look obsolete, because readUntilGeneric allocates this
// struct with plain `new` - confirm before changing the size.
private struct Buffer { ubyte[64*1024-4] bytes = void; } // 64k - 4 bytes for reference count
365 
/*
	Fast path of readUntil for end markers of one or two bytes.

	Writes everything that precedes the first occurrence of end_marker to dst
	and consumes the marker itself from stream.

	max_bytes limits the total number of bytes consumed from the stream,
	including the marker, on every code path. In particular, the limit is also
	honored for streams without peek support, which are read byte-wise.

	Throws:
		An exception if the stream ends before the marker is found, or if
		max_bytes bytes have been consumed without completing the marker.
*/
private void readUntilSmall(R, InputStream)(InputStream stream, ref R dst, in ubyte[] end_marker, ulong max_bytes = ulong.max)
	if (isInputStream!InputStream)
{
	assert(end_marker.length >= 1 && end_marker.length <= 2);

	size_t nmatched = 0; // number of marker bytes matched so far (0 or 1)
	immutable size_t nmarker = end_marker.length;

	while (true) {
		enforce(!stream.empty, "Reached EOF while searching for end marker.");
		enforce(max_bytes > 0, "Reached maximum number of bytes while searching for end marker.");
		auto pm = stream.peek()[0 .. min($, max_bytes)];
		if (!pm.length || nmatched == 1) { // no peek support - inefficient route
			// Read only as much as is needed to complete the marker, so that
			// nothing after it gets consumed, and never more than allowed.
			ubyte[2] buf = void;
			auto l = cast(size_t)min(nmarker - nmatched, max_bytes);
			stream.read(buf[0 .. l], IOMode.all);
			max_bytes -= l;
			foreach (i; 0 .. l) {
				if (buf[i] == end_marker[nmatched]) {
					nmatched++;
				} else if (buf[i] == end_marker[0]) {
					// the pending partial match was data after all
					foreach (j; 0 .. nmatched) dst.put(end_marker[j]);
					nmatched = 1;
				} else {
					foreach (j; 0 .. nmatched) dst.put(end_marker[j]);
					nmatched = 0;
					dst.put(buf[i]);
				}
				if (nmatched == nmarker) return;
			}
		} else {
			assert(nmatched == 0);

			auto idx = pm.countUntil(end_marker[0]);
			if (idx < 0) {
				// no marker start in the peek window - everything is data
				dst.put(pm);
				max_bytes -= pm.length;
				stream.skip(pm.length);
			} else {
				dst.put(pm[0 .. idx]);
				if (nmarker == 1) {
					stream.skip(idx+1);
					return;
				} else if (idx+1 < pm.length && pm[idx+1] == end_marker[1]) {
					assert(nmarker == 2);
					stream.skip(idx+2);
					return;
				} else {
					// The byte after the first marker byte is either outside
					// of the peek window or does not match. Continue byte-wise
					// with one marker byte matched.
					nmatched++;
					max_bytes -= cast(size_t)idx + 1;
					stream.skip(idx+1);
				}
			}
		}
	}
}

@safe unittest { // max_bytes limits the number of bytes consumed on all code paths
	import vibe.stream.memory : createMemoryStream, MemoryStream;

	auto inp = createMemoryStream(cast(ubyte[])"abc\r\nX".dup);

	// peek based path: the marker has to end within the first max_bytes bytes
	auto dst = appender!(ubyte[]);
	readUntilSmall(inp, dst, cast(const(ubyte)[])"\r\n", 5);
	assert(dst.data == cast(const(ubyte)[])"abc");

	inp.seek(0);
	dst = appender!(ubyte[]);
	assertThrown(readUntilSmall(inp, dst, cast(const(ubyte)[])"\r\n", 4));

	// read based path (no peek support)
	inp.seek(0);
	dst = appender!(ubyte[]);
	readUntilSmall(new NoPeekProxy!MemoryStream(inp), dst, cast(const(ubyte)[])"\r\n", 5);
	assert(dst.data == cast(const(ubyte)[])"abc");

	inp.seek(0);
	dst = appender!(ubyte[]);
	assertThrown(readUntilSmall(new NoPeekProxy!MemoryStream(inp), dst, cast(const(ubyte)[])"\r\n", 4));
}
421 
// Regression test for issue #1741. The stream S only exposes data through
// peek() after leastSize() or read() has been called (buf starts out empty).
// readUntilSmall therefore starts on the read-based path, and afterwards sees
// a peek window that begins with the second byte of the "\r\n" marker.
@safe unittest { // issue #1741
	static class S : InputStream {
		ubyte[] src;
		ubyte[] buf;   // data currently visible through peek()
		size_t nread;  // number of bytes consumed so far

		this(scope ubyte[] bytes...)
		{
			src = bytes.dup;
		}

		@property bool empty() { return nread >= src.length; }
		@property ulong leastSize() { if (!buf.length && !nread) buf = src; return src.length - nread; }
		@property bool dataAvailableForRead() { return buf.length > 0; }
		const(ubyte)[] peek() { return buf; }
		size_t read(scope ubyte[] dst, IOMode) {
			if (!buf.length) buf = src;
			dst[] = buf[0 .. dst.length];
			nread += dst.length;
			buf = buf[dst.length .. $];
			return dst.length;
		}
		alias InputStream.read read;
	}


	auto s = new S('X', '\r', '\n');
	auto dst = appender!(ubyte[]);
	readUntilSmall(s, dst, ['\r', '\n']);
	assert(dst.data == ['X']);
}
453 
454 
/*
	Generic implementation of readUntil, used for end markers of more than two bytes.

	Writes everything that precedes the first occurrence of end_marker to dst
	and consumes the marker from stream. The search tracks how many marker
	bytes are currently matched (nmatched). On a mismatch, it uses a
	precomputed jump table to fall back to the longest shorter partial match
	(the failure function of the Knuth-Morris-Pratt algorithm), so every input
	byte is looked at in a single forward pass.

	Throws:
		An exception if max_bytes bytes have been consumed without finding the
		marker, or if the stream ends before the marker is found.
*/
private void readUntilGeneric(R, InputStream)(InputStream stream, ref R dst, in ubyte[] end_marker, ulong max_bytes = ulong.max) /*@ufcs*/
	if (isOutputRange!(R, ubyte) && isInputStream!InputStream)
{
	// allocate internal jump table to optimize the number of comparisons
	// (kept on the stack for markers of up to 8 bytes)
	size_t[8] nmatchoffsetbuffer = void;
	size_t[] nmatchoffset;
	if (end_marker.length <= nmatchoffsetbuffer.length) nmatchoffset = nmatchoffsetbuffer[0 .. end_marker.length];
	else nmatchoffset = new size_t[end_marker.length];

	// Precompute the jump table. When i marker bytes are matched and the next
	// input byte does not match, nmatched is reduced by nmatchoffset[i].
	// nmatchoffset[i] is the SMALLEST shift j > 0 for which the last i-j
	// matched bytes still form a marker prefix
	// (end_marker[j .. i] == end_marker[0 .. i-j]). This means that i-j is the
	// length of the longest proper border of end_marker[0 .. i]. The shift
	// must be stored, not the border length. Mixing the two yields match
	// states that are not suffixes of the input, causing both false matches
	// (e.g. "abaX" in "abaaX") and missed matches (e.g. "abaX" in "abaabaX").
	nmatchoffset[0] = 0;
	foreach (i; 1 .. end_marker.length) {
		nmatchoffset[i] = i; // no border: start over completely
		foreach (j; 1 .. i)
			if (end_marker[j .. i] == end_marker[0 .. i-j]) {
				nmatchoffset[i] = j;
				break;
			}
		assert(nmatchoffset[i] > 0 && nmatchoffset[i] <= i);
	}

	size_t nmatched = 0;
	Buffer* bufferobj;
	bufferobj = new Buffer;
	scope (exit) () @trusted {
		import core.memory : __delete;
		__delete(bufferobj);
	} ();
	auto buf = bufferobj.bytes[];

	ulong bytes_read = 0;

	// consumes data from the stream and accounts for it in bytes_read
	void skip2(size_t nbytes)
	{
		bytes_read += nbytes;
		stream.skip(nbytes);
	}

	while( !stream.empty ){
		enforce(bytes_read < max_bytes, "Reached byte limit before reaching end marker.");

		// try to get as much data as possible, either by peeking into the stream or
		// by reading as much as is guaranteed to not exceed the end marker length
		// the block size is also always limited by the max_bytes parameter.
		size_t nread = 0;
		auto least_size = stream.leastSize(); // NOTE: blocks until data is available
		auto max_read = max_bytes - bytes_read;
		auto str = stream.peek(); // try to get some data for free
		if( str.length == 0 ){ // if not, read as much as possible without reading past the end
			nread = min(least_size, end_marker.length-nmatched, buf.length, max_read);
			stream.read(buf[0 .. nread]);
			str = buf[0 .. nread];
			bytes_read += nread;
		} else if( str.length > max_read ){
			str.length = cast(size_t)max_read;
		}

		// remember how much of the marker was already matched before processing the current block
		size_t nmatched_start = nmatched;

		// go through the current block trying to match the marker
		size_t i = 0;
		for (i = 0; i < str.length; i++) {
			auto ch = str[i];
			// if we have a mismatch, use the jump table to try other possible prefixes
			// of the marker
			while( nmatched > 0 && ch != end_marker[nmatched] )
				nmatched -= nmatchoffset[nmatched];

			// if we then have a match, increase the match count and test for full match
			if (ch == end_marker[nmatched])
				if (++nmatched == end_marker.length) {
					i++;
					break;
				}
		}


		// The last nmatched bytes of (withheld marker prefix ~ str[0 .. i]) are
		// the current partial match. Everything before them is data.

		// write out any false match part of previous blocks
		if( nmatched_start > 0 ){
			if( nmatched <= i ) () @trusted { dst.put(end_marker[0 .. nmatched_start]); } ();
			else () @trusted { dst.put(end_marker[0 .. nmatched_start-nmatched+i]); } ();
		}

		// write out any unmatched part of the current block
		if( nmatched < i ) () @trusted { dst.put(str[0 .. i-nmatched]); } ();

		// got a full, match => out
		if (nmatched >= end_marker.length) {
			// in case of a full match skip data in the stream until the end of
			// the marker
			skip2(i - nread);
			return;
		}

		// otherwise skip this block in the stream
		skip2(str.length - nread);
	}

	enforce(false, "Reached EOF before reaching end marker.");
}

@safe unittest { // jump table: partial matches that overlap the marker itself
	import vibe.stream.memory : createMemoryStream;

	void test(string text, string marker, size_t idx) @safe {
		auto inp = createMemoryStream(cast(ubyte[])text.dup);
		auto dst = appender!(ubyte[]);
		readUntil(inp, dst, cast(const(ubyte)[])marker);
		assert(dst.data == cast(const(ubyte)[])text[0 .. idx]);
		assert(inp.readAll() == cast(const(ubyte)[])text[idx + marker.length .. $]);
	}
	test("abaabaXyz", "abaX", 3);
	test("abacabacabad!", "abacabad", 4);
	// "abaX" does not occur in "abaaX"
	assertThrown(test("abaaX", "abaX", 0));
}
556 
// Consumes and discards `count` bytes of `str`, reading through a small stack buffer.
private void skip(InputStream)(InputStream str, ulong count)
	if (isInputStream!InputStream)
{
	ubyte[256] sink = void;
	for (ulong remaining = count; remaining != 0; ) {
		auto n = min(sink.length, remaining);
		str.read(sink[0 .. n], IOMode.all);
		remaining -= n;
	}
}
567 
// Wraps an input stream while hiding its peek() support: peek() always returns
// null. The unittests in this module use it to exercise the read-based code
// paths of readUntil.
private class NoPeekProxy(InputStream) : ProxyStream
	if (isInputStream!InputStream)
{
	// forwards `stream` as the input side; no output stream is attached
	this(InputStream stream)
	{
		import vibe.internal.interfaceproxy : InterfaceProxy, interfaceProxy;
		super(interfaceProxy!(.InputStream)(stream), InterfaceProxy!OutputStream.init, true);
	}

	// reports that no data can be peeked
	override const(ubyte)[] peek() { return null; }
}