/**
	Stream proxy and wrapper facilities.

	Copyright: © 2013-2016 Sönke Ludwig
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
	Authors: Sönke Ludwig
*/
module vibe.stream.wrapper;

public import vibe.core.stream;

import std.algorithm : min;
import std.exception;
import core.time;
import vibe.internal.interfaceproxy;
import vibe.internal.freelistref : FreeListRef;


/**
	Creates a `ProxyStream` that forwards both reading and writing to `stream`.

	Params:
		stream = The bidirectional stream to wrap; must satisfy `isStream`.

	Returns:
		A newly allocated `ProxyStream` whose input, output and underlying
		stream all refer to `stream`.
*/
ProxyStream createProxyStream(Stream)(Stream stream)
	if (isStream!Stream)
{
	return new ProxyStream(interfaceProxy!(.Stream)(stream), true);
}

/**
	Creates a `ProxyStream` from a separate input and output stream.

	Params:
		input = Stream that all read operations are forwarded to.
		output = Stream that all write operations are forwarded to.

	Returns:
		A newly allocated `ProxyStream`. Its `underlying` property is not
		assigned by this overload.
*/
ProxyStream createProxyStream(InputStream, OutputStream)(InputStream input, OutputStream output)
	if (isInputStream!InputStream && isOutputStream!OutputStream)
{
	return new ProxyStream(interfaceProxy!(.InputStream)(input), interfaceProxy!(.OutputStream)(output), true);
}

/**
	Creates a `ConnectionProxyStream` that forwards stream operations to
	`stream` and connection operations to `connection_stream`.

	Params:
		stream = The stream used for reading and writing.
		connection_stream = The connection used for `connected`, `close` and
			`waitForData`.

	Returns:
		A newly allocated `ConnectionProxyStream`.
*/
ConnectionProxyStream createConnectionProxyStream(Stream, ConnectionStream)(Stream stream, ConnectionStream connection_stream)
	if (isStream!Stream && isConnectionStream!ConnectionStream)
{
	mixin validateStream!Stream;
	mixin validateConnectionStream!ConnectionStream;
	return new ConnectionProxyStream(interfaceProxy!(.Stream)(stream), interfaceProxy!(.ConnectionStream)(connection_stream), true);
}

/// private
// Same as the two-argument createConnectionProxyStream, but the instance is
// wrapped in a FreeListRef instead of being allocated with `new`.
FreeListRef!ConnectionProxyStream createConnectionProxyStreamFL(Stream, ConnectionStream)(Stream stream, ConnectionStream connection_stream)
	if (isStream!Stream && isConnectionStream!ConnectionStream)
{
	mixin validateStream!Stream;
	mixin validateConnectionStream!ConnectionStream;
	return FreeListRef!ConnectionProxyStream(interfaceProxy!(.Stream)(stream), interfaceProxy!(.ConnectionStream)(connection_stream), true);
}

/**
	Creates a `ConnectionProxyStream` from a separate input stream, output
	stream and connection.

	Params:
		input = Stream that all read operations are forwarded to.
		output = Stream that all write operations are forwarded to.
		connection_stream = The connection used for `connected`, `close` and
			`waitForData`.

	Returns:
		A newly allocated `ConnectionProxyStream`. Its `underlying` property
		is not assigned by this overload.
*/
ConnectionProxyStream createConnectionProxyStream(InputStream, OutputStream, ConnectionStream)(InputStream input, OutputStream output, ConnectionStream connection_stream)
	if (isInputStream!InputStream && isOutputStream!OutputStream && isConnectionStream!ConnectionStream)
{
	return new ConnectionProxyStream(interfaceProxy!(.InputStream)(input), interfaceProxy!(.OutputStream)(output), interfaceProxy!(.ConnectionStream)(connection_stream), true);
}


/**
	Provides a way to access varying streams using a constant stream reference.

	The query properties (`empty`, `leastSize`, `dataAvailableForRead`) return
	a neutral value when no input stream is set. The data operations (`peek`,
	`read`, `write`, `flush`, `finalize`) forward without checking whether the
	corresponding stream is set.
*/
class ProxyStream : Stream {
@safe:
	private {
		InterfaceProxy!(.InputStream) m_input;
		InterfaceProxy!(.OutputStream) m_output;
		InterfaceProxy!(.Stream) m_underlying;
	}

	deprecated("Use createProxyStream instead.")
	this(Stream stream = null)
	{
		m_underlying = interfaceProxy!Stream(stream);
		m_input = interfaceProxy!InputStream(stream);
		m_output = interfaceProxy!OutputStream(stream);
	}

	deprecated("Use createProxyStream instead.")
	this(InputStream input, OutputStream output)
	{
		m_input = interfaceProxy!InputStream(input);
		m_output = interfaceProxy!OutputStream(output);
	}

	/// private
	// The unused `dummy` parameter only distinguishes this overload from the
	// deprecated public constructors.
	this(InterfaceProxy!Stream stream, bool dummy)
	{
		m_underlying = stream;
		m_input = stream;
		m_output = stream;
	}

	/// private
	this(InterfaceProxy!InputStream input, InterfaceProxy!OutputStream output, bool dummy)
	{
		m_input = input;
		m_output = output;
	}

	/// The stream that is wrapped by this one
	@property inout(InterfaceProxy!Stream) underlying() inout { return m_underlying; }
	/// ditto
	@property void underlying(InterfaceProxy!Stream value) { m_underlying = value; m_input = value; m_output = value; }
	/// ditto
	static if (!is(Stream == InterfaceProxy!Stream))
		@property void underlying(Stream value) { this.underlying = interfaceProxy!Stream(value); }

	/// Forwards to the input stream; `true` if no input stream is set.
	@property bool empty() { return m_input ? m_input.empty : true; }

	/// Forwards to the input stream; `0` if no input stream is set.
	@property ulong leastSize() { return m_input ? m_input.leastSize : 0; }

	/// Forwards to the input stream; `false` if no input stream is set.
	@property bool dataAvailableForRead() { return m_input ? m_input.dataAvailableForRead : false; }

	/// Forwards to the input stream.
	const(ubyte)[] peek() { return m_input.peek(); }

	/// ditto
	size_t read(scope ubyte[] dst, IOMode mode) { return m_input.read(dst, mode); }

	alias read = Stream.read;

	/// Forwards to the output stream.
	size_t write(in ubyte[] bytes, IOMode mode) { return m_output.write(bytes, mode); }

	alias write = Stream.write;

	/// ditto
	void flush() { m_output.flush(); }

	/// ditto
	void finalize() { m_output.finalize(); }
}


/**
	Special kind of proxy stream for streams nested in a ConnectionStream.

	This stream will forward all stream operations to the selected stream,
	but will forward all connection related operations to the given
	ConnectionStream. This allows wrapping embedded streams, such as
	SSL streams in a ConnectionStream.
*/
class ConnectionProxyStream : ConnectionStream {
@safe:

	private {
		InterfaceProxy!ConnectionStream m_connection;
		InterfaceProxy!Stream m_underlying;
		InterfaceProxy!InputStream m_input;
		InterfaceProxy!OutputStream m_output;
	}

	deprecated("Use createConnectionProxyStream instead.")
	this(Stream stream, ConnectionStream connection_stream)
	{
		this(interfaceProxy!Stream(stream), interfaceProxy!ConnectionStream(connection_stream), true);
	}

	deprecated("Use createConnectionProxyStream instead.")
	this(InputStream input, OutputStream output, ConnectionStream connection_stream)
	{
		this(interfaceProxy!InputStream(input), interfaceProxy!OutputStream(output), interfaceProxy!ConnectionStream(connection_stream), true);
	}

	/// private
	// The unused `dummy` parameter only distinguishes this overload from the
	// deprecated public constructors.
	this(InterfaceProxy!Stream stream, InterfaceProxy!ConnectionStream connection_stream, bool dummy)
	{
		assert(!!stream);
		m_underlying = stream;
		m_input = stream;
		m_output = stream;
		m_connection = connection_stream;
	}

	/// private
	this(InterfaceProxy!InputStream input, InterfaceProxy!OutputStream output, InterfaceProxy!ConnectionStream connection_stream, bool dummy)
	{
		m_input = input;
		m_output = output;
		m_connection = connection_stream;
	}

	/// Forwards to the connection; reports `true` if no connection is set.
	@property bool connected()
	const {
		if (!m_connection)
			return true;

		return m_connection.connected;
	}

	/**
		Closes the connection, if one is set.

		If the connection is still connected, the output stream is finalized
		first.
	*/
	void close()
	{
		if (!m_connection)
			return;

		if (m_connection.connected) finalize();
		m_connection.close();
	}

	/**
		Waits for data to become available.

		Returns `true` immediately if `dataAvailableForRead` is already set.
		Without a connection, a zero timeout yields `!empty` and any other
		timeout yields `false`. Otherwise the call is forwarded to the
		connection.
	*/
	bool waitForData(Duration timeout = 0.seconds)
	{
		if (this.dataAvailableForRead) return true;

		if (!m_connection)
			return timeout == 0.seconds ? !this.empty : false;

		return m_connection.waitForData(timeout);
	}

	/// The stream that is wrapped by this one
	@property inout(InterfaceProxy!Stream) underlying() inout { return m_underlying; }
	/// ditto
	@property void underlying(InterfaceProxy!Stream value) { m_underlying = value; m_input = value; m_output = value; }
	/// ditto
	static if (!is(Stream == InterfaceProxy!Stream))
		// Uses the interfaceProxy helper, consistent with ProxyStream and the
		// constructors of this class.
		@property void underlying(Stream value) { this.underlying = interfaceProxy!Stream(value); }

	/// Forwards to the input stream; `true` if no input stream is set.
	@property bool empty() { return m_input ? m_input.empty : true; }

	/// Forwards to the input stream; `0` if no input stream is set.
	@property ulong leastSize() { return m_input ? m_input.leastSize : 0; }

	/// Forwards to the input stream; `false` if no input stream is set.
	@property bool dataAvailableForRead() { return m_input ? m_input.dataAvailableForRead : false; }

	/// Forwards to the input stream.
	const(ubyte)[] peek() { return m_input.peek(); }

	/// ditto
	size_t read(scope ubyte[] dst, IOMode mode) { return m_input.read(dst, mode); }

	alias read = ConnectionStream.read;

	/// Forwards to the output stream.
	size_t write(in ubyte[] bytes, IOMode mode) { return m_output.write(bytes, mode); }

	alias write = ConnectionStream.write;

	/// ditto
	void flush() { m_output.flush(); }

	/// ditto
	void finalize() { m_output.finalize(); }
}


/**
	Implements an input range interface on top of an InputStream using an
	internal buffer.

	The buffer is GC allocated and is filled chunk wise. Thus an InputStream
	that has been wrapped in a StreamInputRange cannot be used reliably on its
	own anymore.

	Reading occurs in a fully lazy fashion. The first call to either front,
	popFront or empty will potentially trigger waiting for the next chunk of
	data to arrive - but especially popFront will not wait if it was called
	after a call to front. This property allows the range to be used in
	request-response scenarios.
*/
deprecated("Use streamInputRange() instead.")
StreamInputRange!InputStream StreamInputRange()(InputStream stream) { return StreamInputRange!InputStream(stream); }
/// ditto
struct StreamInputRange(InputStream, size_t buffer_size = 256)
	if (isInputStream!InputStream)
{
@safe:
	private {
		struct Buffer {
			ubyte[buffer_size] data = void;
			// Number of unread bytes; they occupy the tail of `data`.
			size_t fill = 0;
		}
		InputStream m_stream;
		Buffer* m_buffer;
	}

	private this(InputStream stream)
	{
		m_stream = stream;
		m_buffer = new Buffer;
	}

	/// `true` once the buffer holds no unread bytes and the stream is empty.
	@property bool empty() { return !m_buffer.fill && m_stream.empty; }

	/// Returns the next unread byte, reading a chunk first if the buffer is drained.
	ubyte front()
	{
		if (m_buffer.fill < 1) readChunk();
		return m_buffer.data[$ - m_buffer.fill];
	}

	/// Consumes one byte, reading a chunk first if the buffer is drained.
	void popFront()
	{
		assert(!empty);
		if (m_buffer.fill < 1) readChunk();
		m_buffer.fill--;
	}

	// Reads min(leastSize, buffer capacity) bytes into the tail of the buffer,
	// so that `$ - fill` always indexes the next unread byte.
	private void readChunk()
	{
		auto sz = min(m_stream.leastSize, m_buffer.data.length);
		assert(sz > 0);
		m_stream.read(m_buffer.data[$-sz .. $]);
		m_buffer.fill = sz;
	}
}
/// ditto
auto streamInputRange(size_t buffer_size = 256, InputStream)(InputStream stream)
	if (isInputStream!InputStream)
{
	return StreamInputRange!(InputStream, buffer_size)(stream);
}


/**
	Implements a buffered output range interface on top of an OutputStream.
*/
deprecated("Use streamOutputRange() instead.")
StreamOutputRange!OutputStream StreamOutputRange()(OutputStream stream) { return StreamOutputRange!OutputStream(stream); }
/// ditto
struct StreamOutputRange(OutputStream, size_t buffer_size = 256)
	if (isOutputStream!OutputStream)
{
@safe:

	private {
		OutputStream m_stream;
		// Number of buffered bytes at the start of m_data.
		size_t m_fill = 0;
		ubyte[buffer_size] m_data = void;
		bool m_flushInDestructor = true;
	}

	@disable this(this);

	/// private
	this(OutputStream stream)
	{
		m_stream = stream;
	}

	// Flushes buffered bytes unless a previous stream write failed.
	~this()
	{
		if (m_flushInDestructor) {
			scope (failure) () @trusted { destroy(m_stream); }(); // workaround for #2484
			flush();
		}
	}

	/// Writes all buffered bytes to the stream and empties the buffer.
	void flush()
	{
		if (m_fill == 0) return;
		writeToStream(m_data[0 .. m_fill]);
		m_fill = 0;
	}

	/// Discards all buffered bytes without writing them.
	void drop()
	{
		m_fill = 0;
	}

	/// Appends a single byte, flushing once the buffer is full.
	void put(ubyte bt)
	{
		m_data[m_fill++] = bt;
		if (m_fill >= m_data.length) flush();
	}

	/// Appends a byte slice, flushing whenever the buffer becomes full.
	void put(const(ubyte)[] bts)
	{
		// avoid writing more chunks than necessary
		if (bts.length + m_fill >= m_data.length * 2) {
			flush();
			writeToStream(bts);
			return;
		}

		while (bts.length) {
			auto len = min(m_data.length - m_fill, bts.length);
			m_data[m_fill .. m_fill + len] = bts[0 .. len];
			m_fill += len;
			bts = bts[len .. $];
			if (m_fill >= m_data.length) flush();
		}
	}

	/// Appends the raw byte(s) of a `char` or `char` slice.
	void put(char elem) { put(cast(ubyte)elem); }
	/// ditto
	void put(const(char)[] elems) { put(cast(const(ubyte)[])elems); }

	/// Encodes the code point via `std.utf.encode` and appends the result.
	void put(dchar elem)
	{
		import std.utf;
		char[4] chars;
		auto len = encode(chars, elem);
		put(chars[0 .. len]);
	}

	/// ditto
	void put(const(dchar)[] elems) { foreach( ch; elems ) put(ch); }

	private void writeToStream(in ubyte[] bytes)
	{
		// if the write fails, do not attempt another write in the destructor
		// to avoid throwing an exception twice or nested
		m_flushInDestructor = false;
		m_stream.write(bytes);
		m_flushInDestructor = true;
	}
}
/// ditto
auto streamOutputRange(size_t buffer_size = 256, OutputStream)(OutputStream stream)
	if (isOutputStream!OutputStream)
{
	return StreamOutputRange!(OutputStream, buffer_size)(stream);
}

// All put() overloads must produce the same number of bytes as the
// concatenated input has.
unittest {
	static long writeLength(ARGS...)(ARGS args) {
		import vibe.stream.memory;
		auto dst = createMemoryOutputStream;
		{
			auto rng = streamOutputRange(dst);
			foreach (a; args) rng.put(a);
		}
		return dst.data.length;
	}
	assert(writeLength("hello", ' ', "world") == "hello world".length);
	assert(writeLength("h\u00E4llo", ' ', "world") == "h\u00E4llo world".length);
	assert(writeLength("hello", '\u00E4', "world") == "hello\u00E4world".length);
	assert(writeLength("h\u1000llo", '\u1000', "world") == "h\u1000llo\u1000world".length);
	auto test = "häl";
	assert(test.length == 4);
	assert(writeLength(test[0], test[1], test[2], test[3]) == test.length);
}

// A failing stream write must surface from the destructor when no explicit
// flush happened, and must not be thrown again by the destructor after an
// explicit flush has already failed.
unittest {
	static struct ThrowOutputStream {
		@safe:
		size_t write(in ubyte[] bytes, IOMode mode) @blocking { throw new Exception("Write failed."); }
		void write(in ubyte[] bytes) @blocking { auto n = write(bytes, IOMode.all); assert(n == bytes.length); }
		void write(in char[] bytes) @blocking { write(cast(const(ubyte)[])bytes); }
		void flush() @blocking {}
		void finalize() @blocking {}
	}
	mixin validateOutputStream!ThrowOutputStream;

	ThrowOutputStream str;

	assertThrown!Exception(() {
		auto r = streamOutputRange(str);
		// too few bytes to auto-flush
		assertNotThrown!Exception(r.put("test"));
	} ());

	try {
		auto r = streamOutputRange(str);
		// too few bytes to auto-flush
		assertNotThrown!Exception(r.put("test"));
		assertThrown!Exception(r.flush());
		assertThrown!Exception(r.flush());
	} catch (Exception e) assert(false, "Descructor has thrown redundant exception");
}