/**
	High level stream manipulation functions.

	Copyright: © 2012-2016 Sönke Ludwig
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
	Authors: Sönke Ludwig
*/
module vibe.stream.operations;

public import vibe.core.stream;

import vibe.core.log;
import vibe.utils.array : AllocAppender;
import vibe.internal.allocator;
import vibe.internal.freelistref;
import vibe.stream.wrapper : ProxyStream;

import std.algorithm;
import std.array;
import std.exception;
import std.range : isOutputRange;
import core.time : Duration, seconds;


/**************************************************************************************************/
/* Public functions                                                                               */
/**************************************************************************************************/

/**
	Reads and returns a single line from the stream.

	The line separator is consumed from the stream, but it is not part of the
	returned (or written) line contents.

	Params:
		stream = The stream to read the line from
		dst = The output stream or output range that receives the line
			contents (overloads without a return value)
		max_bytes = Limit on the number of bytes read from the stream
		linesep = The byte sequence that terminates a line
		alloc = Allocator used to build the returned line (overload
			returning `ubyte[]` only)

	Throws:
		An exception if either the stream end was hit without hitting a newline first, or
		if more than max_bytes have been read from the stream.
*/
ubyte[] readLine(InputStream)(InputStream stream, size_t max_bytes = size_t.max, string linesep = "\r\n", IAllocator alloc = vibeThreadAllocator()) /*@ufcs*/
	if (isInputStream!InputStream)
{
	auto output = AllocAppender!(ubyte[])(alloc);
	// typical lines are short - avoid over-allocating for small limits
	output.reserve(max_bytes < 64 ? max_bytes : 64);
	readLine(stream, output, max_bytes, linesep);
	return output.data();
}
/// ditto
void readLine(InputStream, OutputStream)(InputStream stream, OutputStream dst, size_t max_bytes = size_t.max, string linesep = "\r\n")
	if (isInputStream!InputStream && isOutputStream!OutputStream)
{
	import vibe.stream.wrapper;
	auto dstrng = streamOutputRange(dst);
	readLine(stream, dstrng, max_bytes, linesep);
}
/// ditto
void readLine(R, InputStream)(InputStream stream, ref R dst, size_t max_bytes = size_t.max, string linesep = "\r\n")
	if (isOutputRange!(R, ubyte) && isInputStream!InputStream)
{
	// a line is simply everything up to the line separator
	readUntil(stream, dst, cast(const(ubyte)[])linesep, max_bytes);
}

@safe unittest {
	import vibe.stream.memory : createMemoryOutputStream, createMemoryStream;

	auto inp = createMemoryStream(cast(ubyte[])"Hello, World!\r\nThis is a test.\r\nNot a full line.".dup);
	assert(inp.readLine() == cast(const(ubyte)[])"Hello, World!");
	assert(inp.readLine() == cast(const(ubyte)[])"This is a test.");
	assertThrown(inp.readLine);

	// start over
	inp.seek(0);

	// read into an output buffer
	auto app = appender!(ubyte[]);
	inp.readLine(app);
	assert(app.data == cast(const(ubyte)[])"Hello, World!");

	// read into an output stream
	auto os = createMemoryOutputStream();
	inp.readLine(os);
	assert(os.data == cast(const(ubyte)[])"This is a test.");
}


/**
	Reads all data of a stream until the specified end marker is detected.

	Params:
		stream = The input stream which is searched for end_marker
		end_marker = The byte sequence which is searched in the stream
		max_bytes = An optional limit of how much data is to be read from the
			input stream; if the limit is reached before hitting the end
			marker, an exception is thrown.
alloc = An optional allocator that is used to build the result string
			in the string variant of this function
		dst = The output stream or output range, to which the prefix to the
			end marker of the input stream is written

	Returns:
		The string variant of this function returns the complete prefix to the
		end marker of the input stream, excluding the end marker itself.

	Throws:
		An exception if either the stream end was hit without hitting a marker
		first, or if more than max_bytes have been read from the stream.
		Passing a max_bytes of zero or an empty end_marker is a programming
		error that is checked by assertions.

	Remarks:
		This function uses an algorithm inspired by the
		$(LINK2 http://en.wikipedia.org/wiki/Boyer%E2%80%93Moore_string_search_algorithm,
		Boyer-Moore string search algorithm). However, contrary to the original
		algorithm, it will scan the whole input string exactly once, without
		jumping over portions of it. This allows the algorithm to work with
		constant memory requirements and without the memory copies that would
		be necessary for streams that do not hold their complete data in
		memory.

		The current implementation has a worst-case run time complexity of
		O(n*m+m²), and O(n+m) in typical cases, with n being the length of the
		scanned input string and m the length of the marker.
*/
ubyte[] readUntil(InputStream)(InputStream stream, in ubyte[] end_marker, size_t max_bytes = size_t.max, IAllocator alloc = vibeThreadAllocator()) /*@ufcs*/
	if (isInputStream!InputStream)
{
	auto output = AllocAppender!(ubyte[])(alloc);
	// start with a small buffer; the appender grows as needed
	output.reserve(max_bytes < 64 ? max_bytes : 64);
	readUntil(stream, output, end_marker, max_bytes);
	return output.data();
}
/// ditto
void readUntil(InputStream, OutputStream)(InputStream stream, OutputStream dst, in ubyte[] end_marker, ulong max_bytes = ulong.max) /*@ufcs*/
	if (isInputStream!InputStream && isOutputStream!OutputStream)
{
	import vibe.stream.wrapper;
	auto dstrng = streamOutputRange(dst);
	readUntil(stream, dstrng, end_marker, max_bytes);
}
/// ditto
void readUntil(R, InputStream)(InputStream stream, ref R dst, in ubyte[] end_marker, ulong max_bytes = ulong.max) /*@ufcs*/
	if (isOutputRange!(R, ubyte) && isInputStream!InputStream)
{
	assert(max_bytes > 0, "max_bytes must be greater than zero");
	assert(end_marker.length > 0, "end_marker must not be empty");

	// markers of one or two bytes (e.g. "\n" or "\r\n") take a specialized path
	if (end_marker.length <= 2)
		readUntilSmall(stream, dst, end_marker, max_bytes);
	else
		readUntilGeneric(stream, dst, end_marker, max_bytes);
}

@safe unittest {
	import vibe.stream.memory;

	auto text = "1231234123111223123334221111112221231333123123123123123213123111111111114".dup;
	auto stream = createMemoryStream(cast(ubyte[])text);
	void test(string s, size_t expected) @safe {
		stream.seek(0);
		auto result = cast(char[])readUntil(stream, cast(const(ubyte)[])s);
		assert(result.length == expected, "Wrong result index");
		assert(result == text[0 .. result.length], "Wrong result contents: "~result~" vs "~text[0 .. result.length]);
		assert(stream.leastSize() == stream.size() - expected - s.length, "Wrong number of bytes left in stream");

		stream.seek(0);
		auto inp2 = new NoPeekProxy!InputStream(stream);
		result = cast(char[])readUntil(inp2, cast(const(ubyte)[])s);
		assert(result.length == expected, "Wrong result index");
		assert(result == text[0 .. result.length], "Wrong result contents: "~result~" vs "~text[0 .. result.length]);
		assert(stream.leastSize() == stream.size() - expected - s.length, "Wrong number of bytes left in stream");
	}
	foreach( i; 0 .. text.length ){
		stream.peekWindow = i;
		test("1", 0);
		test("2", 1);
		test("3", 2);
		test("12", 0);
		test("23", 1);
		test("31", 2);
		test("123", 0);
		test("231", 1);
		test("1231", 0);
		test("3123", 2);
		test("11223", 11);
		test("11222", 28);
		test("114", 70);
		test("111111111114", 61);
	}
	// TODO: test
}

@safe unittest {
	import vibe.stream.memory : createMemoryOutputStream, createMemoryStream, MemoryStream;
	import vibe.stream.wrapper : ProxyStream;

	auto text = cast(ubyte[])"ab\nc\rd\r\ne".dup;
	void test(string marker, size_t idx)
	{
		// code path for peek support
		auto inp = createMemoryStream(text);
		auto dst = appender!(ubyte[]);
		readUntil(inp, dst, cast(const(ubyte)[])marker);
		assert(dst.data == text[0 .. idx]);
		assert(inp.peek == text[idx+marker.length .. $]);

		// code path for no peek support
		inp.seek(0);
		dst = appender!(ubyte[]);
		auto inp2 = new NoPeekProxy!MemoryStream(inp);
		readUntil(inp2, dst, cast(const(ubyte)[])marker);
		assert(dst.data == text[0 .. idx]);
		assert(inp.readAll() == text[idx+marker.length .. $]);
	}
	test("\r\n", 6);
	test("\r", 4);
	test("\n", 2);
}

/**
	Reads the complete contents of a stream, optionally limited by max_bytes.

	Params:
		stream = The stream to read from
		max_bytes = Maximum number of bytes to accept (a value of zero is
			deprecated and treated as "no limit")
		reserve_bytes = Number of bytes to preallocate for the result; at least
			min(max_bytes, stream.leastSize) bytes are always reserved

	Throws:
		An exception is thrown if the stream contains more than max_bytes data.
*/
ubyte[] readAll(InputStream)(InputStream stream, size_t max_bytes = size_t.max, size_t reserve_bytes = 0) /*@ufcs*/
	if (isInputStream!InputStream)
{
	// max_bytes == 0 historically meant "no limit" and is still honored as such below
	if (max_bytes == 0) logDebug("Deprecated behavior: readAll() called with max_bytes==0, use max_bytes==size_t.max instead.");

	// prepare output buffer; reserve at least what the stream reports as
	// available right now (capped by max_bytes)
	auto dst = AllocAppender!(ubyte[])(() @trusted { return GCAllocator.instance.allocatorObject; } ());
	reserve_bytes = cast(size_t)max(reserve_bytes, min(max_bytes, stream.leastSize));
	if (reserve_bytes) dst.reserve(reserve_bytes);

	size_t n = 0; // number of bytes accepted so far (n <= max_bytes when limited)
	while (!stream.empty) {
		size_t chunk = cast(size_t)min(stream.leastSize, size_t.max);
		// compare against the remaining budget instead of computing n + chunk
		// first, which could wrap around for very large streams on 32-bit
		// targets and thus slip past the limit
		enforce(!max_bytes || chunk <= max_bytes - n, "Input data too long!");
		n += chunk;
		dst.reserve(chunk);
		dst.append((scope buf) {
			stream.read(buf[0 .. chunk]);
			return chunk;
		});
	}
	return dst.data;
}

/**
	Reads the complete contents of a stream, assuming UTF-8 encoding.

	Params:
		stream = Specifies the stream from which to read.
		sanitize = If true, the input data will not be validated but will instead be made valid UTF-8.
		max_bytes = Optional size limit of the data that is read.

	Returns:
		The full contents of the stream, excluding a possible BOM, are returned as a UTF-8 string.

	Throws:
		An exception is thrown if max_bytes != 0 and the stream contains more than max_bytes data.
		If the sanitize parameter is false and the stream contains invalid UTF-8 code sequences,
		a UTFException is thrown.
*/
string readAllUTF8(InputStream)(InputStream stream, bool sanitize = false, size_t max_bytes = size_t.max)
	if (isInputStream!InputStream)
{
	import std.utf;
	import vibe.utils.string;
	auto data = readAll(stream, max_bytes);
	if( sanitize ) return stripUTF8Bom(sanitizeUTF8(data));
	else {
		// readAll returns a buffer that it allocated itself, which is why it is
		// reinterpreted as immutable here instead of being copied
		auto ret = () @trusted { return cast(string)data; } ();
		validate(ret);
		return stripUTF8Bom(ret);
	}
}

/**
	Pipes a stream to another while keeping the latency within the specified threshold.

	Params:
		destination = The destination stream to pipe into
		source = The source stream to read data from
		nbytes = Number of bytes to pipe through. The default of zero means to pipe
			the whole input stream. A non-zero value pipes exactly that many bytes
			and leaves any further data in source untouched.
		max_latency = The maximum time before data is flushed to destination. The default value
			of 0 s will flush after each chunk of data read from source.

	Throws:
		An exception if nbytes is non-zero and source ends before nbytes bytes
		have been piped.

	See_also: OutputStream.write
*/
void pipeRealtime(OutputStream, ConnectionStream)(OutputStream destination, ConnectionStream source, ulong nbytes = 0, Duration max_latency = 0.seconds)
	if (isOutputStream!OutputStream && isConnectionStream!ConnectionStream)
{
	import std.datetime.stopwatch : StopWatch;
	import vibe.internal.freelistref;

	static struct Buffer { ubyte[64*1024] bytes = void; }
	auto bufferobj = FreeListRef!(Buffer, false)();
	auto buffer = bufferobj.bytes[];

	// nbytes == 0 selects "pipe until the source ends". This has to be decided
	// up front, because nbytes is counted down below, and reaching zero must
	// end the transfer instead of silently switching to unlimited mode.
	immutable bool limited = nbytes > 0;

	StopWatch sw;
	sw.start();
	while (true) {
		auto least_size = source.leastSize;
		if (!least_size) {
			enforce(!limited, "Reading past end of input.");
			break;
		}

		// least_size > 0, nbytes > 0 (if limited) and buffer.length > 0, so chunk > 0
		size_t chunk = cast(size_t)min(limited ? nbytes : ulong.max, least_size, buffer.length);
		source.read(buffer[0 .. chunk]);
		destination.write(buffer[0 .. chunk]);

		if (limited) {
			nbytes -= chunk;
			// everything requested has been piped - neither wait for nor consume more data
			if (nbytes == 0) break;
		}

		// give the source a chance to deliver more data within the latency budget
		auto remaining_latency = max_latency - cast(Duration)sw.peek();
		if (remaining_latency > 0.seconds)
			source.waitForData(remaining_latency);

		if (cast(Duration)sw.peek >= max_latency) {
			logTrace("pipeRealtime flushing.");
			destination.flush();
			sw.reset();
		} else {
			logTrace("pipeRealtime not flushing.");
		}
	}
	destination.flush();
}

unittest {
	import vibe.core.net : TCPConnection;
	import vibe.core.stream : nullSink;

	void test()
	{
		TCPConnection c;
		pipeRealtime(nullSink, c);
	}
}


/**
	Consumes `bytes.length` bytes of the stream and determines if the contents
	match up.

	Returns: True $(I iff) the consumed bytes equal the passed array.
	Throws: Throws an exception if reading from the stream fails.
*/
bool skipBytes(InputStream)(InputStream stream, const(ubyte)[] bytes)
	if (isInputStream!InputStream)
{
	bool matched = true;
	ubyte[128] buf = void;
	while (bytes.length) {
		auto len = min(buf.length, bytes.length);
		stream.read(buf[0 .. len], IOMode.all);
		// keep consuming after a mismatch so that exactly bytes.length bytes are skipped
		if (buf[0 .. len] != bytes[0 .. len]) matched = false;
		bytes = bytes[len .. $];
	}
	return matched;
}

/// Read buffer used by readUntilGeneric for streams without peek support.
private struct Buffer { ubyte[64*1024-4] bytes = void; } // 64k - 4 bytes for reference count

/**
	Specialized implementation of readUntil for end markers of one or two bytes.

	Every byte consumed from the stream - including the bytes of the end marker
	itself - counts against max_bytes. This ensures that the limit is also
	enforced for streams without peek support and for inputs that consist of
	repeated partial marker matches (e.g. "\r\r\r..." when searching for "\r\n").
*/
private void readUntilSmall(R, InputStream)(InputStream stream, ref R dst, in ubyte[] end_marker, ulong max_bytes = ulong.max)
	if (isInputStream!InputStream)
{
	assert(end_marker.length >= 1 && end_marker.length <= 2);

	size_t nmatched = 0;
	size_t nmarker = end_marker.length;

	while (true) {
		enforce(!stream.empty, "Reached EOF while searching for end marker.");
		enforce(max_bytes > 0, "Reached maximum number of bytes while searching for end marker.");
		auto pm = stream.peek()[0 .. min($, max_bytes)];
		if (!pm.length || nmatched == 1) { // no peek support or pending partial match - inefficient route
			ubyte[2] buf = void;
			// read the rest of the marker, but never more than the remaining byte budget
			auto l = cast(size_t)min(nmarker - nmatched, max_bytes);
			stream.read(buf[0 .. l], IOMode.all);
			max_bytes -= l;
			foreach (i; 0 .. l) {
				if (buf[i] == end_marker[nmatched]) {
					nmatched++;
				} else if (buf[i] == end_marker[0]) {
					// previous partial match was false, but this byte starts a new one
					foreach (j; 0 .. nmatched) dst.put(end_marker[j]);
					nmatched = 1;
				} else {
					foreach (j; 0 .. nmatched) dst.put(end_marker[j]);
					nmatched = 0;
					dst.put(buf[i]);
				}
				if (nmatched == nmarker) return;
			}
		} else {
			assert(nmatched == 0);

			auto idx = pm.countUntil(end_marker[0]);
			if (idx < 0) {
				dst.put(pm);
				max_bytes -= pm.length;
				stream.skip(pm.length);
			} else {
				dst.put(pm[0 .. idx]);
				if (nmarker == 1) {
					stream.skip(idx+1);
					return;
				} else if (idx+1 < pm.length && pm[idx+1] == end_marker[1]) {
					assert(nmarker == 2);
					stream.skip(idx+2);
					return;
				} else {
					// first marker byte found at the end of the peeked data (or
					// followed by a mismatch) - continue on the byte-wise route
					nmatched++;
					max_bytes -= idx + 1;
					stream.skip(idx+1);
				}
			}
		}
	}
}

@safe unittest { // issue #1741
	static class S : InputStream {
		ubyte[] src;
		ubyte[] buf;
		size_t nread;

		this(scope ubyte[] bytes...)
		{
			src = bytes.dup;
		}

		@property bool empty() { return nread >= src.length; }
		@property ulong leastSize() { if (!buf.length && !nread) buf = src; return src.length - nread; }
		@property bool dataAvailableForRead() { return buf.length > 0; }
		const(ubyte)[] peek() { return buf; }
		size_t read(scope ubyte[] dst, IOMode) {
			if (!buf.length) buf = src;
			dst[] = buf[0 .. dst.length];
			nread += dst.length;
			buf = buf[dst.length .. $];
			return dst.length;
		}
		alias InputStream.read read;
	}


	auto s = new S('X', '\r', '\n');
	auto dst = appender!(ubyte[]);
	readUntilSmall(s, dst, ['\r', '\n']);
	assert(dst.data == ['X']);
}

@safe unittest { // max_bytes must be enforced for repeated partial marker matches
	import vibe.stream.memory : createMemoryStream, MemoryStream;

	// ten CRs followed by a single LF
	auto data = cast(ubyte[])("\r\r\r\r\r" ~ "\r\r\r\r\r" ~ "\n").dup;
	auto inp = createMemoryStream(data);
	assertThrown(readUntil(inp, cast(const(ubyte)[])"\r\n", 4));

	// same for streams without peek support
	inp.seek(0);
	assertThrown(readUntil(new NoPeekProxy!MemoryStream(inp), cast(const(ubyte)[])"\r\n", 4));

	// the limit includes the marker itself
	inp.seek(0);
	assert(readUntil(inp, cast(const(ubyte)[])"\r\n", 11) == cast(const(ubyte)[])("\r\r\r\r\r" ~ "\r\r\r\r"));
}


/**
	Generic implementation of readUntil for end markers of arbitrary length.

	The input is scanned byte by byte using a Knuth-Morris-Pratt style failure
	table, so earlier input never has to be re-examined after a mismatch. For
	streams without peek support, at most `end_marker.length - nmatched` bytes
	are read at once, so no data following the end marker is consumed.
*/
private void readUntilGeneric(R, InputStream)(InputStream stream, ref R dst, in ubyte[] end_marker, ulong max_bytes = ulong.max) /*@ufcs*/
	if (isOutputRange!(R, ubyte) && isInputStream!InputStream)
{
	assert(end_marker.length > 0);

	// Precompute the failure table: border[k] is the length of the longest
	// proper prefix of end_marker[0 .. k] that is also a suffix of it. When k
	// marker bytes are matched and the next input byte does not continue the
	// match, matching can resume with border[k] bytes matched.
	size_t[8] borderbuffer = void;
	size_t[] border;
	if (end_marker.length <= borderbuffer.length) border = borderbuffer[0 .. end_marker.length];
	else border = new size_t[end_marker.length];

	border[0] = 0;
	if (end_marker.length > 1) border[1] = 0;
	size_t k = 0;
	foreach (len; 2 .. end_marker.length) {
		immutable ubyte next = end_marker[len - 1];
		while (k > 0 && next != end_marker[k])
			k = border[k];
		if (next == end_marker[k]) k++;
		border[len] = k;
		assert(border[len] < len);
	}

	size_t nmatched = 0;
	Buffer* bufferobj;
	bufferobj = new Buffer;
	scope (exit) () @trusted {
		import core.memory : __delete;
		__delete(bufferobj);
	} ();
	auto buf = bufferobj.bytes[];

	ulong bytes_read = 0;

	void skip2(size_t nbytes)
	{
		bytes_read += nbytes;
		stream.skip(nbytes);
	}

	while( !stream.empty ){
		enforce(bytes_read < max_bytes, "Reached byte limit before reaching end marker.");

		// try to get as much data as possible, either by peeking into the stream or
		// by reading as much as is guaranteed to not exceed the end marker length
		// the block size is also always limited by the max_bytes parameter.
		size_t nread = 0;
		auto least_size = stream.leastSize(); // NOTE: blocks until data is available
		auto max_read = max_bytes - bytes_read;
		auto str = stream.peek(); // try to get some data for free
		if( str.length == 0 ){ // if not, read as much as possible without reading past the end
			nread = min(least_size, end_marker.length-nmatched, buf.length, max_read);
			stream.read(buf[0 .. nread]);
			str = buf[0 .. nread];
			bytes_read += nread;
		} else if( str.length > max_read ){
			str.length = cast(size_t)max_read;
		}

		// remember how much of the marker was already matched before processing the current block
		size_t nmatched_start = nmatched;

		// go through the current block trying to match the marker
		size_t i = 0;
		for (i = 0; i < str.length; i++) {
			auto ch = str[i];
			// on a mismatch, fall back to shorter marker prefixes that are still
			// consistent with the input seen so far
			while( nmatched > 0 && ch != end_marker[nmatched] )
				nmatched = border[nmatched];

			// if we then have a match, increase the match count and test for full match
			if (ch == end_marker[nmatched])
				if (++nmatched == end_marker.length) {
					i++;
					break;
				}
		}


		// write out any false match part of previous blocks
		if( nmatched_start > 0 ){
			if( nmatched <= i ) () @trusted { dst.put(end_marker[0 .. nmatched_start]); } ();
			else () @trusted { dst.put(end_marker[0 .. nmatched_start-nmatched+i]); } ();
		}

		// write out any unmatched part of the current block
		if( nmatched < i ) () @trusted { dst.put(str[0 .. i-nmatched]); } ();

		// got a full, match => out
		if (nmatched >= end_marker.length) {
			// in case of a full match skip data in the stream until the end of
			// the marker
			skip2(i - nread);
			return;
		}

		// otherwise skip this block in the stream
		skip2(str.length - nread);
	}

	enforce(false, "Reached EOF before reaching end marker.");
}

@safe unittest { // markers whose prefixes have non-trivial borders (e.g. "aba" in "abac")
	import vibe.stream.memory : createMemoryStream;

	// "abaabac" contains "abac" at index 3
	auto inp = createMemoryStream(cast(ubyte[])"abaabacXYZ".dup);
	assert(readUntil(inp, cast(const(ubyte)[])"abac") == cast(const(ubyte)[])"aba");
	assert(inp.readAll() == cast(const(ubyte)[])"XYZ");

	// same for streams without peek support
	inp.seek(0);
	auto inp2 = new NoPeekProxy!InputStream(inp);
	assert(readUntil(inp2, cast(const(ubyte)[])"abac") == cast(const(ubyte)[])"aba");
	assert(inp.readAll() == cast(const(ubyte)[])"XYZ");

	// "abaac" does not contain "abac" and must not be reported as a match
	inp = createMemoryStream(cast(ubyte[])"abaac".dup);
	assertThrown(readUntil(inp, cast(const(ubyte)[])"abac"));
}

/// Consumes and discards `count` bytes from the stream, using a small stack buffer.
private void skip(InputStream)(InputStream str, ulong count)
	if (isInputStream!InputStream)
{
	ubyte[256] buf = void;
	while (count > 0) {
		auto n = min(buf.length, count);
		str.read(buf[0 .. n], IOMode.all);
		count -= n;
	}
}

/// Test helper: forwards to the wrapped stream, but always returns `null` from
/// `peek()` so that the code paths for streams without peek support are used.
private class NoPeekProxy(InputStream) : ProxyStream
	if (isInputStream!InputStream)
{
	this(InputStream stream)
	{
		import vibe.internal.interfaceproxy : InterfaceProxy, interfaceProxy;
		super(interfaceProxy!(.InputStream)(stream), InterfaceProxy!OutputStream.init, true);
	}

	override const(ubyte)[] peek() { return null; }
}