/**
	High level stream manipulation functions.

	Copyright: © 2012-2016 RejectedSoftware e.K.
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
	Authors: Sönke Ludwig
*/
module vibe.stream.operations;

public import vibe.core.stream;

import vibe.core.log;
import vibe.utils.array : AllocAppender;
import vibe.internal.allocator;
import vibe.internal.freelistref;
import vibe.stream.wrapper : ProxyStream;

import std.algorithm;
import std.array;
import std.datetime;
import std.exception;
import std.range : isOutputRange;


/**************************************************************************************************/
/* Public functions                                                                               */
/**************************************************************************************************/

/**
	Reads and returns a single line from the stream.

	All overloads forward to `readUntil`, using `linesep` as the end marker.
	The line separator itself is consumed from the stream, but is not part of
	the result.

	Params:
		stream = The input stream to read the line from
		max_bytes = An optional limit of how much data is to be read from the
			input stream
		linesep = The byte sequence that terminates a line
		alloc = An optional allocator that is used to build the result in the
			array returning variant of this function
		dst = The output stream or output range that receives the contents of
			the line

	Returns:
		The array returning variant returns the contents of the line,
		excluding the line separator.

	Throws:
		An exception if either the stream end was hit without hitting a newline first, or
		if more than max_bytes have been read from the stream.
*/
ubyte[] readLine(InputStream)(InputStream stream, size_t max_bytes = size_t.max, string linesep = "\r\n", IAllocator alloc = vibeThreadAllocator()) /*@ufcs*/
	if (isInputStream!InputStream)
{
	auto output = AllocAppender!(ubyte[])(alloc);
	output.reserve(max_bytes < 64 ? max_bytes : 64);
	readLine(stream, output, max_bytes, linesep);
	return output.data();
}
/// ditto
void readLine(InputStream, OutputStream)(InputStream stream, OutputStream dst, size_t max_bytes = size_t.max, string linesep = "\r\n")
	if (isInputStream!InputStream && isOutputStream!OutputStream)
{
	import vibe.stream.wrapper;
	// use the same factory function as the corresponding readUntil overload
	auto dstrng = streamOutputRange(dst);
	readLine(stream, dstrng, max_bytes, linesep);
}
/// ditto
void readLine(R, InputStream)(InputStream stream, ref R dst, size_t max_bytes = size_t.max, string linesep = "\r\n")
	if (isOutputRange!(R, ubyte))
{
	readUntil(stream, dst, cast(const(ubyte)[])linesep, max_bytes);
}

@safe unittest {
	import vibe.stream.memory : createMemoryOutputStream, createMemoryStream;

	auto inp = createMemoryStream(cast(ubyte[])"Hello, World!\r\nThis is a test.\r\nNot a full line.".dup);
	assert(inp.readLine() == cast(const(ubyte)[])"Hello, World!");
	assert(inp.readLine() == cast(const(ubyte)[])"This is a test.");
	assertThrown(inp.readLine);

	// start over
	inp.seek(0);

	// read into an output buffer
	auto app = appender!(ubyte[]);
	inp.readLine(app);
	assert(app.data == cast(const(ubyte)[])"Hello, World!");

	// read into an output stream
	auto os = createMemoryOutputStream();
	inp.readLine(os);
	assert(os.data == cast(const(ubyte)[])"This is a test.");
}


/**
	Reads all data of a stream until the specified end marker is detected.

	Params:
		stream = The input stream which is searched for end_marker
		end_marker = The byte sequence which is searched in the stream
		max_bytes = An optional limit of how much data is to be read from the
			input stream; if the limit is reached before hitting the end
			marker, an exception is thrown.
		alloc = An optional allocator that is used to build the result string
			in the string variant of this function
		dst = The output stream, to which the prefix to the end marker of the
			input stream is written

	Returns:
		The string variant of this function returns the complete prefix to the
		end marker of the input stream, excluding the end marker itself.

	Throws:
		An exception if either the stream end was hit without hitting a marker
		first, or if more than max_bytes have been read from the stream in
		case of max_bytes != 0.

	Remarks:
		This function uses an algorithm inspired by the
		$(LINK2 http://en.wikipedia.org/wiki/Boyer%E2%80%93Moore_string_search_algorithm,
		Boyer-Moore string search algorithm). However, contrary to the original
		algorithm, it will scan the whole input string exactly once, without
		jumping over portions of it. This allows the algorithm to work with
		constant memory requirements and without the memory copies that would
		be necessary for streams that do not hold their complete data in
		memory.

		The current implementation has a run time complexity of O(n*m+m²) and
		O(n+m) in typical cases, with n being the length of the scanned input
		string and m the length of the marker.
*/
ubyte[] readUntil(InputStream)(InputStream stream, in ubyte[] end_marker, size_t max_bytes = size_t.max, IAllocator alloc = vibeThreadAllocator()) /*@ufcs*/
	if (isInputStream!InputStream)
{
	auto output = AllocAppender!(ubyte[])(alloc);
	output.reserve(max_bytes < 64 ? max_bytes : 64);
	readUntil(stream, output, end_marker, max_bytes);
	return output.data();
}
/// ditto
void readUntil(InputStream, OutputStream)(InputStream stream, OutputStream dst, in ubyte[] end_marker, ulong max_bytes = ulong.max) /*@ufcs*/
	if (isInputStream!InputStream && isOutputStream!OutputStream)
{
	import vibe.stream.wrapper;
	auto dstrng = streamOutputRange(dst);
	readUntil(stream, dstrng, end_marker, max_bytes);
}
/// ditto
void readUntil(R, InputStream)(InputStream stream, ref R dst, in ubyte[] end_marker, ulong max_bytes = ulong.max) /*@ufcs*/
	if (isOutputRange!(R, ubyte) && isInputStream!InputStream)
{
	assert(max_bytes > 0 && end_marker.length > 0);

	// markers of one or two bytes are handled by a specialized routine
	if (end_marker.length <= 2)
		readUntilSmall(stream, dst, end_marker, max_bytes);
	else
		readUntilGeneric(stream, dst, end_marker, max_bytes);
}

@safe unittest {
	import vibe.stream.memory;

	auto text = "1231234123111223123334221111112221231333123123123123123213123111111111114".dup;
	auto stream = createMemoryStream(cast(ubyte[])text);
	void test(string s, size_t expected) @safe {
		stream.seek(0);
		auto result = cast(char[])readUntil(stream, cast(const(ubyte)[])s);
		assert(result.length == expected, "Wrong result index");
		assert(result == text[0 .. result.length], "Wrong result contents: "~result~" vs "~text[0 .. result.length]);
		assert(stream.leastSize() == stream.size() - expected - s.length, "Wrong number of bytes left in stream");

		stream.seek(0);
		auto inp2 = new NoPeekProxy!InputStream(stream);
		result = cast(char[])readUntil(inp2, cast(const(ubyte)[])s);
		assert(result.length == expected, "Wrong result index");
		assert(result == text[0 .. result.length], "Wrong result contents: "~result~" vs "~text[0 .. result.length]);
		assert(stream.leastSize() == stream.size() - expected - s.length, "Wrong number of bytes left in stream");
	}
	foreach( i; 0 .. text.length ){
		stream.peekWindow = i;
		test("1", 0);
		test("2", 1);
		test("3", 2);
		test("12", 0);
		test("23", 1);
		test("31", 2);
		test("123", 0);
		test("231", 1);
		test("1231", 0);
		test("3123", 2);
		test("11223", 11);
		test("11222", 28);
		test("114", 70);
		test("111111111114", 61);
	}
	// TODO: test
}

@safe unittest {
	import vibe.stream.memory : createMemoryOutputStream, createMemoryStream, MemoryStream;
	import vibe.stream.wrapper : ProxyStream;

	auto text = cast(ubyte[])"ab\nc\rd\r\ne".dup;
	void test(string marker, size_t idx)
	{
		// code path for peek support
		auto inp = createMemoryStream(text);
		auto dst = appender!(ubyte[]);
		readUntil(inp, dst, cast(const(ubyte)[])marker);
		assert(dst.data == text[0 .. idx]);
		assert(inp.peek == text[idx+marker.length .. $]);

		// code path for no peek support
		inp.seek(0);
		dst = appender!(ubyte[]);
		auto inp2 = new NoPeekProxy!MemoryStream(inp);
		readUntil(inp2, dst, cast(const(ubyte)[])marker);
		assert(dst.data == text[0 .. idx]);
		assert(inp.readAll() == text[idx+marker.length .. $]);
	}
	test("\r\n", 6);
	test("\r", 4);
	test("\n", 2);
}

/**
	Reads the complete contents of a stream, optionally limited by max_bytes.

	Params:
		stream = The input stream to read from
		max_bytes = Optional size limit of the data that is read. Passing zero
			disables the limit, but is deprecated (a debug log message is
			emitted) - use `size_t.max` instead.
		reserve_bytes = Minimum capacity to reserve in the result buffer
			before reading; the actually reserved amount is at least
			`min(max_bytes, stream.leastSize)`.

	Returns:
		The data read from the stream, stored in GC allocated memory.

	Throws:
		An exception is thrown if the stream contains more than max_bytes data.
*/
ubyte[] readAll(InputStream)(InputStream stream, size_t max_bytes = size_t.max, size_t reserve_bytes = 0) /*@ufcs*/
	if (isInputStream!InputStream)
{
	import vibe.internal.freelistref;

	if (max_bytes == 0) logDebug("Deprecated behavior: readAll() called with max_bytes==0, use max_bytes==size_t.max instead.");

	// prepare output buffer
	auto dst = AllocAppender!(ubyte[])(() @trusted { return GCAllocator.instance.allocatorObject; } ());
	reserve_bytes = max(reserve_bytes, min(max_bytes, stream.leastSize));
	if (reserve_bytes) dst.reserve(reserve_bytes);

	size_t n = 0;
	while (!stream.empty) {
		size_t chunk = min(stream.leastSize, size_t.max);
		n += chunk;
		// the limit is checked before the chunk is actually read
		enforce(!max_bytes || n <= max_bytes, "Input data too long!");
		dst.reserve(chunk);
		// read directly into the appender's buffer
		dst.append((scope buf) {
			stream.read(buf[0 .. chunk]);
			return chunk;
		});
	}
	return dst.data;
}

/**
	Reads the complete contents of a stream, assuming UTF-8 encoding.

	Params:
		stream = Specifies the stream from which to read.
		sanitize = If true, the input data will not be validated but will instead be made valid UTF-8.
		max_bytes = Optional size limit of the data that is read.

	Returns:
		The full contents of the stream, excluding a possible BOM, are returned as a UTF-8 string.

	Throws:
		An exception is thrown if max_bytes != 0 and the stream contains more than max_bytes data.
		If the sanitize parameter is false and the stream contains invalid UTF-8 code sequences,
		a UTFException is thrown.
*/
string readAllUTF8(InputStream)(InputStream stream, bool sanitize = false, size_t max_bytes = size_t.max)
	if (isInputStream!InputStream)
{
	import std.utf;
	import vibe.utils.string;
	auto data = readAll(stream, max_bytes);
	if( sanitize ) return stripUTF8Bom(sanitizeUTF8(data));
	else {
		auto ret = () @trusted { return cast(string)data; } ();
		validate(ret);
		return stripUTF8Bom(ret);
	}
}

/**
	Pipes a stream to another while keeping the latency within the specified threshold.

	Params:
		destination = The destination stream to pipe into
		source = The source stream to read data from
		nbytes = Number of bytes to pipe through. The default of zero means to pipe
			the whole input stream.
		max_latency = The maximum time before data is flushed to destination. The default value
			of 0 s will flush after each chunk of data read from source.

	Throws:
		An exception is thrown if `nbytes` is non-zero and the source ends
		before that many bytes have been piped.

	See_also: OutputStream.write
*/
void pipeRealtime(OutputStream, ConnectionStream)(OutputStream destination, ConnectionStream source, ulong nbytes = 0, Duration max_latency = 0.seconds)
	if (isOutputStream!OutputStream && isConnectionStream!ConnectionStream)
{
	import vibe.internal.freelistref;

	static struct Buffer { ubyte[64*1024] bytes = void; }
	auto bufferobj = FreeListRef!(Buffer, false)();
	auto buffer = bufferobj.bytes[];

	// Remember whether a byte limit was requested. nbytes is counted down
	// while piping and reaching zero must not be mistaken for the
	// "pipe everything" default.
	const limited = nbytes > 0;

	//logTrace("default write %d bytes, empty=%s", nbytes, stream.empty);
	auto least_size = source.leastSize;
	StopWatch sw;
	sw.start();
	while (nbytes > 0 || least_size > 0) {
		size_t chunk = min(nbytes > 0 ? nbytes : ulong.max, least_size, buffer.length);
		assert(chunk > 0, "leastSize returned zero for non-empty stream.");
		//logTrace("read pipe chunk %d", chunk);
		source.read(buffer[0 .. chunk]);
		destination.write(buffer[0 .. chunk]);
		if (limited) {
			nbytes -= chunk;
			// the requested amount has been piped - do not touch any
			// further data of the source stream
			if (nbytes == 0) break;
		}

		// give the source a chance to deliver more data within the latency budget
		auto remaining_latency = max_latency - cast(Duration)sw.peek();
		if (remaining_latency > 0.seconds)
			source.waitForData(remaining_latency);

		if (cast(Duration)sw.peek >= max_latency) {
			logTrace("pipeRealtime flushing.");
			destination.flush();
			sw.reset();
		} else {
			logTrace("pipeRealtime not flushing.");
		}

		least_size = source.leastSize;
		if (!least_size) {
			enforce(nbytes == 0, "Reading past end of input.");
			break;
		}
	}
	destination.flush();
}

/**
	Consumes `bytes.length` bytes of the stream and determines if the contents
	match up.

	Note that the full number of bytes is always consumed, even if a mismatch
	is detected early.

	Returns: True $(I iff) the consumed bytes equal the passed array.
	Throws: Throws an exception if reading from the stream fails.
*/
bool skipBytes(InputStream)(InputStream stream, const(ubyte)[] bytes)
	if (isInputStream!InputStream)
{
	bool matched = true;
	ubyte[128] buf = void;
	while (bytes.length) {
		auto len = min(buf.length, bytes.length);
		stream.read(buf[0 .. len], IOMode.all);
		if (buf[0 .. len] != bytes[0 .. len]) matched = false;
		bytes = bytes[len .. $];
	}
	return matched;
}

/// Scratch buffer used by `readUntilGeneric` for streams without peek support.
private struct Buffer { ubyte[64*1024-4] bytes = void; } // 64k - 4 bytes for reference count

/*
	Implementation of readUntil for end markers of one or two bytes.

	Data before the marker is written to `dst`; the marker itself is consumed
	from the stream, but not written. If the stream offers peek data, the
	marker is searched directly within the peeked block, otherwise the stream
	is read in steps of at most `end_marker.length` bytes.

	Throws:
		An exception if the stream ends or `max_bytes` bytes have been
		consumed before the marker was found.
*/
private void readUntilSmall(R, InputStream)(InputStream stream, ref R dst, in ubyte[] end_marker, ulong max_bytes = ulong.max)
	if (isInputStream!InputStream)
{
	assert(end_marker.length >= 1 && end_marker.length <= 2);

	size_t nmatched = 0; // number of leading marker bytes matched so far (0 or 1 between iterations)
	size_t nmarker = end_marker.length;

	while (true) {
		enforce(!stream.empty, "Reached EOF while searching for end marker.");
		enforce(max_bytes > 0, "Reached maximum number of bytes while searching for end marker.");
		auto pm = stream.peek()[0 .. min($, max_bytes)];
		if (!pm.length || nmatched == 1) { // no peek support - inefficient route
			ubyte[2] buf = void;
			// read only what could still complete the marker, but never
			// more than what is left of the byte budget
			auto l = cast(size_t)min(nmarker - nmatched, max_bytes);
			stream.read(buf[0 .. l], IOMode.all);
			max_bytes -= l;
			foreach (i; 0 .. l) {
				if (buf[i] == end_marker[nmatched]) {
					nmatched++;
				} else if (buf[i] == end_marker[0]) {
					// false match, but the current byte starts a new potential match
					foreach (j; 0 .. nmatched) dst.put(end_marker[j]);
					nmatched = 1;
				} else {
					// false match - emit the withheld marker prefix and the current byte
					foreach (j; 0 .. nmatched) dst.put(end_marker[j]);
					nmatched = 0;
					dst.put(buf[i]);
				}
				if (nmatched == nmarker) return;
			}
		} else {
			assert(nmatched == 0);

			auto idx = pm.countUntil(end_marker[0]);
			if (idx < 0) {
				// no marker start within the block - forward everything
				dst.put(pm);
				max_bytes -= pm.length;
				stream.skip(pm.length);
			} else {
				dst.put(pm[0 .. idx]);
				if (nmarker == 1) {
					stream.skip(idx+1);
					return;
				} else if (idx+1 < pm.length && pm[idx+1] == end_marker[1]) {
					assert(nmarker == 2);
					stream.skip(idx+2);
					return;
				} else {
					// first marker byte found, the second byte is checked
					// in the next iteration
					nmatched++;
					max_bytes -= cast(size_t)idx + 1;
					stream.skip(idx+1);
				}
			}
		}
	}
}

@safe unittest { // issue #1741
	static class S : InputStream {
		ubyte[] src;
		ubyte[] buf;
		size_t nread;

		this(scope ubyte[] bytes...)
		{
			src = bytes.dup;
		}

		@property bool empty() { return nread >= src.length; }
		@property ulong leastSize() { if (!buf.length && !nread) buf = src; return src.length - nread; }
		@property bool dataAvailableForRead() { return buf.length > 0; }
		const(ubyte)[] peek() { return buf; }
		size_t read(scope ubyte[] dst, IOMode) {
			if (!buf.length) buf = src;
			dst[] = buf[0 .. dst.length];
			nread += dst.length;
			buf = buf[dst.length .. $];
			return dst.length;
		}
		alias InputStream.read read;
	}


	auto s = new S('X', '\r', '\n');
	auto dst = appender!(ubyte[]);
	readUntilSmall(s, dst, ['\r', '\n']);
	assert(dst.data == ['X']);
}


/*
	Implementation of readUntil for end markers of arbitrary length.

	Scans the stream block by block - using peeked data where available and a
	temporary buffer otherwise - and writes everything in front of the marker
	to `dst`. The marker itself is consumed from the stream, but not written.

	Throws:
		An exception if `max_bytes` bytes have been processed or the stream
		ends before the marker was found.
*/
private void readUntilGeneric(R, InputStream)(InputStream stream, ref R dst, in ubyte[] end_marker, ulong max_bytes = ulong.max) /*@ufcs*/
	if (isOutputRange!(R, ubyte) && isInputStream!InputStream)
{
	// allocate internal jump table to optimize the number of comparisons
	// (markers of up to 8 bytes use a stack buffer)
	size_t[8] nmatchoffsetbuffer = void;
	size_t[] nmatchoffset;
	if (end_marker.length <= nmatchoffsetbuffer.length) nmatchoffset = nmatchoffsetbuffer[0 .. end_marker.length];
	else nmatchoffset = new size_t[end_marker.length];

	// precompute the jump table
	nmatchoffset[0] = 0;
	foreach( i; 1 .. end_marker.length ){
		nmatchoffset[i] = i;
		foreach_reverse( j; 1 .. i )
			if( end_marker[j .. i] == end_marker[0 .. i-j] ){
				nmatchoffset[i] = i-j;
				break;
			}
		assert(nmatchoffset[i] > 0 && nmatchoffset[i] <= i);
	}

	size_t nmatched = 0;
	Buffer* bufferobj;
	bufferobj = new Buffer;
	// free the scratch buffer eagerly; the delete expression is deprecated
	// as of DMD 2.079, where core.memory.__delete is the replacement
	scope (exit) () @trusted {
		static if (__VERSION__ >= 2079) {
			import core.memory : __delete;
			__delete(bufferobj);
		} else mixin("delete bufferobj;");
	} ();
	auto buf = bufferobj.bytes[];

	ulong bytes_read = 0;

	// skips data in the stream while keeping track of the byte count
	void skip2(size_t nbytes)
	{
		bytes_read += nbytes;
		stream.skip(nbytes);
	}

	while( !stream.empty ){
		enforce(bytes_read < max_bytes, "Reached byte limit before reaching end marker.");

		// try to get as much data as possible, either by peeking into the stream or
		// by reading as much as is guaranteed to not exceed the end marker length
		// the block size is also always limited by the max_bytes parameter.
		size_t nread = 0;
		auto least_size = stream.leastSize(); // NOTE: blocks until data is available
		auto max_read = max_bytes - bytes_read;
		auto str = stream.peek(); // try to get some data for free
		if( str.length == 0 ){ // if not, read as much as possible without reading past the end
			nread = min(least_size, end_marker.length-nmatched, buf.length, max_read);
			stream.read(buf[0 .. nread]);
			str = buf[0 .. nread];
			bytes_read += nread;
		} else if( str.length > max_read ){
			str.length = cast(size_t)max_read;
		}

		// remember how much of the marker was already matched before processing the current block
		size_t nmatched_start = nmatched;

		// go through the current block trying to match the marker
		size_t i = 0;
		for (i = 0; i < str.length; i++) {
			auto ch = str[i];
			// if we have a mismatch, use the jump table to try other possible prefixes
			// of the marker
			while( nmatched > 0 && ch != end_marker[nmatched] )
				nmatched -= nmatchoffset[nmatched];

			// if we then have a match, increase the match count and test for full match
			if (ch == end_marker[nmatched])
				if (++nmatched == end_marker.length) {
					i++;
					break;
				}
		}


		// write out any false match part of previous blocks
		if( nmatched_start > 0 ){
			if( nmatched <= i ) () @trusted { dst.put(end_marker[0 .. nmatched_start]); } ();
			else () @trusted { dst.put(end_marker[0 .. nmatched_start-nmatched+i]); } ();
		}

		// write out any unmatched part of the current block
		if( nmatched < i ) () @trusted { dst.put(str[0 .. i-nmatched]); } ();

		// got a full match => out
		if (nmatched >= end_marker.length) {
			// in case of a full match skip data in the stream until the end of
			// the marker (data that was read instead of peeked is already consumed)
			skip2(i - nread);
			return;
		}

		// otherwise skip this block in the stream
		skip2(str.length - nread);
	}

	enforce(false, "Reached EOF before reaching end marker.");
}

/// Consumes and discards `count` bytes of the given stream.
private void skip(InputStream)(InputStream str, ulong count)
	if (isInputStream!InputStream)
{
	ubyte[256] buf = void;
	while (count > 0) {
		// the result is bounded by buf.length, so it always fits a size_t
		const n = cast(size_t)min(buf.length, count);
		str.read(buf[0 .. n], IOMode.all);
		count -= n;
	}
}

/// Stream proxy whose `peek` never returns any data - used by the unit tests
/// of this module to exercise the code paths for streams without peek support.
private class NoPeekProxy(InputStream) : ProxyStream
	if (isInputStream!InputStream)
{
	this(InputStream stream)
	{
		import vibe.internal.interfaceproxy : InterfaceProxy, interfaceProxy;
		super(interfaceProxy!(.InputStream)(stream), InterfaceProxy!OutputStream.init, true);
	}

	override const(ubyte)[] peek() { return null; }
}