/**
	Stream proxy and wrapper facilities.

	Copyright: © 2013-2016 Sönke Ludwig
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
	Authors: Sönke Ludwig
*/
module vibe.stream.wrapper;

public import vibe.core.stream;

import std.algorithm : min;
import std.exception;
import core.time;
import vibe.internal.interfaceproxy;
import vibe.internal.freelistref : FreeListRef;


/**
	Creates a `ProxyStream` whose input side, output side and `underlying`
	property all refer to the given stream.
*/
ProxyStream createProxyStream(Stream)(Stream stream)
	if (isStream!Stream)
{
	return new ProxyStream(
		interfaceProxy!(.Stream)(stream),
		true);
}

/**
	Creates a `ProxyStream` that reads from `input` and writes to `output`.

	The `underlying` property of the result is not assigned by this overload.
*/
ProxyStream createProxyStream(InputStream, OutputStream)(InputStream input, OutputStream output)
	if (isInputStream!InputStream && isOutputStream!OutputStream)
{
	return new ProxyStream(
		interfaceProxy!(.InputStream)(input),
		interfaceProxy!(.OutputStream)(output),
		true);
}

/**
	Creates a `ConnectionProxyStream` that performs stream operations on
	`stream` and connection operations on `connection_stream`.
*/
ConnectionProxyStream createConnectionProxyStream(Stream, ConnectionStream)(Stream stream, ConnectionStream connection_stream)
	if (isStream!Stream && isConnectionStream!ConnectionStream)
{
	mixin validateStream!Stream;
	mixin validateConnectionStream!ConnectionStream;

	return new ConnectionProxyStream(
		interfaceProxy!(.Stream)(stream),
		interfaceProxy!(.ConnectionStream)(connection_stream),
		true);
}

/// private
FreeListRef!ConnectionProxyStream createConnectionProxyStreamFL(Stream, ConnectionStream)(Stream stream, ConnectionStream connection_stream)
	if (isStream!Stream && isConnectionStream!ConnectionStream)
{
	mixin validateStream!Stream;
	mixin validateConnectionStream!ConnectionStream;

	// Same arguments as createConnectionProxyStream, but the instance is
	// wrapped in a FreeListRef instead of being allocated with `new`.
	return FreeListRef!ConnectionProxyStream(
		interfaceProxy!(.Stream)(stream),
		interfaceProxy!(.ConnectionStream)(connection_stream),
		true);
}

/**
	Creates a `ConnectionProxyStream` with separate input and output streams.

	Reads go to `input`, writes go to `output` and connection operations go to
	`connection_stream`. The `underlying` property of the result is not
	assigned by this overload.
*/
ConnectionProxyStream createConnectionProxyStream(InputStream, OutputStream, ConnectionStream)(InputStream input, OutputStream output, ConnectionStream connection_stream)
	if (isInputStream!InputStream && isOutputStream!OutputStream && isConnectionStream!ConnectionStream)
{
	return new ConnectionProxyStream(
		interfaceProxy!(.InputStream)(input),
		interfaceProxy!(.OutputStream)(output),
		interfaceProxy!(.ConnectionStream)(connection_stream),
		true);
}


/**
	Provides a way to access varying streams using a constant stream reference.

	Read operations are forwarded to the input proxy and write operations to
	the output proxy. `empty`, `leastSize` and `dataAvailableForRead` return a
	fixed value while no input stream is set; all other operations forward
	unconditionally.
*/
class ProxyStream : Stream {
@safe:
	private {
		InterfaceProxy!(.InputStream) m_input;
		InterfaceProxy!(.OutputStream) m_output;
		InterfaceProxy!(.Stream) m_underlying;
	}

	/// private
	this(InterfaceProxy!Stream stream, bool dummy)
	{
		m_underlying = stream;
		m_input = stream;
		m_output = stream;
	}

	/// private
	this(InterfaceProxy!InputStream input, InterfaceProxy!OutputStream output, bool dummy)
	{
		m_input = input;
		m_output = output;
	}

	/// The stream that is wrapped by this one
	@property inout(InterfaceProxy!Stream) underlying()
	inout {
		return m_underlying;
	}
	/// ditto
	@property void underlying(InterfaceProxy!Stream value)
	{
		// replacing the wrapped stream redirects both directions
		m_underlying = value;
		m_input = value;
		m_output = value;
	}
	/// ditto
	static if (!is(Stream == InterfaceProxy!Stream))
		@property void underlying(Stream value) { this.underlying = interfaceProxy!Stream(value); }

	/// Reports `true` when no input stream is set, otherwise asks the input stream.
	@property bool empty()
	{
		return !m_input || m_input.empty;
	}

	/// Reports `0` when no input stream is set, otherwise asks the input stream.
	@property ulong leastSize()
	{
		if (!m_input) return 0;
		return m_input.leastSize;
	}

	/// Reports `false` when no input stream is set, otherwise asks the input stream.
	@property bool dataAvailableForRead()
	{
		if (!m_input) return false;
		return m_input.dataAvailableForRead;
	}

	/// Forwards to the input stream.
	const(ubyte)[] peek()
	{
		return m_input.peek();
	}

	/// ditto
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		return m_input.read(dst, mode);
	}

	alias read = Stream.read;

	// The parameter storage class of write() depends on the OutputStream
	// interface version that is being compiled against.
	static if (is(typeof(.OutputStream.outputStreamVersion)) && .OutputStream.outputStreamVersion > 1) {
		/// Forwards to the output stream.
		override size_t write(scope const(ubyte)[] bytes, IOMode mode)
		{
			return m_output.write(bytes, mode);
		}
	} else {
		/// Forwards to the output stream.
		override size_t write(in ubyte[] bytes, IOMode mode)
		{
			return m_output.write(bytes, mode);
		}
	}

	alias write = Stream.write;

	/// Forwards to the output stream.
	void flush()
	{
		m_output.flush();
	}

	/// ditto
	void finalize()
	{
		m_output.finalize();
	}
}


/**
	Special kind of proxy stream for streams nested in a ConnectionStream.

	This stream will forward all stream operations to the selected stream,
	but will forward all connection related operations to the given
	ConnectionStream. This allows wrapping embedded streams, such as
	SSL streams in a ConnectionStream.
*/
class ConnectionProxyStream : ConnectionStream {
@safe:

	private {
		InterfaceProxy!ConnectionStream m_connection;
		InterfaceProxy!Stream m_underlying;
		InterfaceProxy!InputStream m_input;
		InterfaceProxy!OutputStream m_output;
	}

	/// private
	this(InterfaceProxy!Stream stream, InterfaceProxy!ConnectionStream connection_stream, bool dummy)
	{
		assert(!!stream);
		m_underlying = stream;
		m_input = stream;
		m_output = stream;
		m_connection = connection_stream;
	}

	/// private
	this(InterfaceProxy!InputStream input, InterfaceProxy!OutputStream output, InterfaceProxy!ConnectionStream connection_stream, bool dummy)
	{
		m_input = input;
		m_output = output;
		m_connection = connection_stream;
	}

	/// Reports `true` when no connection stream is set, otherwise asks the connection stream.
	@property bool connected()
	const {
		return !m_connection || m_connection.connected;
	}

	/**
		Closes the connection stream.

		Does nothing when no connection stream is set. If the connection
		reports itself as connected, `finalize` is called before closing.
	*/
	void close()
	{
		if (!m_connection) return;

		if (m_connection.connected)
			finalize();

		m_connection.close();
	}

	/**
		Returns `true` right away if data is already available for reading,
		otherwise defers to the connection stream.

		Without a connection stream, a timeout of `0.seconds` yields the
		negated `empty` state and any other timeout yields `false`.
	*/
	bool waitForData(Duration timeout = 0.seconds)
	{
		if (this.dataAvailableForRead) return true;

		if (!m_connection) {
			if (timeout == 0.seconds)
				return !this.empty;
			return false;
		}

		return m_connection.waitForData(timeout);
	}

	/// The stream that is wrapped by this one
	@property inout(InterfaceProxy!Stream) underlying()
	inout {
		return m_underlying;
	}
	/// ditto
	@property void underlying(InterfaceProxy!Stream value)
	{
		// replacing the wrapped stream redirects both directions
		m_underlying = value;
		m_input = value;
		m_output = value;
	}
	/// ditto
	static if (!is(Stream == InterfaceProxy!Stream))
		@property void underlying(Stream value) { this.underlying = InterfaceProxy!Stream(value); }

	/// Reports `true` when no input stream is set, otherwise asks the input stream.
	@property bool empty()
	{
		return !m_input || m_input.empty;
	}

	/// Reports `0` when no input stream is set, otherwise asks the input stream.
	@property ulong leastSize()
	{
		if (!m_input) return 0;
		return m_input.leastSize;
	}

	/// Reports `false` when no input stream is set, otherwise asks the input stream.
	@property bool dataAvailableForRead()
	{
		if (!m_input) return false;
		return m_input.dataAvailableForRead;
	}

	/// Forwards to the input stream.
	const(ubyte)[] peek()
	{
		return m_input.peek();
	}

	/// ditto
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		return m_input.read(dst, mode);
	}

	alias read = ConnectionStream.read;

	// The parameter storage class of write() depends on the OutputStream
	// interface version that is being compiled against.
	static if (is(typeof(.OutputStream.outputStreamVersion)) && .OutputStream.outputStreamVersion > 1) {
		/// Forwards to the output stream.
		size_t write(scope const(ubyte)[] bytes, IOMode mode)
		{
			return m_output.write(bytes, mode);
		}
	} else {
		/// Forwards to the output stream.
		size_t write(in ubyte[] bytes, IOMode mode)
		{
			return m_output.write(bytes, mode);
		}
	}

	alias write = ConnectionStream.write;

	/// Forwards to the output stream.
	void flush()
	{
		m_output.flush();
	}

	/// ditto
	void finalize()
	{
		m_output.finalize();
	}
}


/**
	Implements an input range interface on top of an InputStream using an
	internal buffer.

	The buffer is GC allocated and is filled chunk wise. Thus an InputStream
	that has been wrapped in a StreamInputRange cannot be used reliably on its
	own anymore.

	Reading occurs in a fully lazy fashion. The first call to either front,
	popFront or empty will potentially trigger waiting for the next chunk of
	data to arrive - but especially popFront will not wait if it was called
	after a call to front. This property allows the range to be used in
	request-response scenarios.
*/
struct StreamInputRange(InputStream, size_t buffer_size = 256)
	if (isInputStream!InputStream)
{
@safe:
	private {
		// Unread bytes always occupy the last `fill` elements of `data`.
		struct Buffer {
			ubyte[buffer_size] data = void;
			size_t fill = 0;
		}
		InputStream m_stream;
		Buffer* m_buffer;
	}

	private this(InputStream stream)
	{
		m_stream = stream;
		m_buffer = new Buffer;
	}

	/// `true` once the buffer holds no unread bytes and the stream reports empty.
	@property bool empty()
	{
		return m_buffer.fill == 0 && m_stream.empty;
	}

	/// Returns the next unread byte, reading a new chunk first if the buffer is drained.
	ubyte front()
	{
		if (m_buffer.fill == 0)
			readChunk();

		return m_buffer.data[$ - m_buffer.fill];
	}

	/// Discards the next unread byte, reading a new chunk first if the buffer is drained.
	void popFront()
	{
		assert(!empty);

		if (m_buffer.fill == 0)
			readChunk();

		m_buffer.fill--;
	}

	// Reads as many bytes as the stream guarantees (capped at the buffer
	// size) into the tail of the buffer.
	private void readChunk()
	{
		auto chunk_len = min(m_stream.leastSize, m_buffer.data.length);
		assert(chunk_len > 0);
		m_stream.read(m_buffer.data[$-chunk_len .. $]);
		m_buffer.fill = chunk_len;
	}
}
/// ditto
auto streamInputRange(size_t buffer_size = 256, InputStream)(InputStream stream)
	if (isInputStream!InputStream)
{
	return StreamInputRange!(InputStream, buffer_size)(stream);
}


/**
	Implements a buffered output range interface on top of an OutputStream.

	Bytes are collected in an internal fixed-size buffer that is written to
	the stream whenever it becomes full, when `flush` is called, and when the
	range is destroyed. The range cannot be copied.
*/
struct StreamOutputRange(OutputStream, size_t buffer_size = 256)
	if (isOutputStream!OutputStream)
{
@safe:

	private {
		OutputStream m_stream;
		size_t m_fill = 0;
		ubyte[buffer_size] m_data = void;
		bool m_flushInDestructor = true;
	}

	@disable this(this);

	/// private
	this(OutputStream stream)
	{
		m_stream = stream;
	}

	~this()
	{
		if (m_flushInDestructor) {
			scope (failure) () @trusted { destroy(m_stream); }(); // workaround for #2484
			flush();
		}
	}

	/// Writes all buffered bytes to the stream and empties the buffer.
	void flush()
	{
		if (m_fill > 0) {
			writeToStream(m_data[0 .. m_fill]);
			m_fill = 0;
		}
	}

	/// Discards all buffered bytes without writing them.
	void drop()
	{
		m_fill = 0;
	}

	/// Appends a single byte, flushing once the buffer is full.
	void put(ubyte bt)
	{
		m_data[m_fill++] = bt;

		if (m_fill >= m_data.length)
			flush();
	}

	/// Appends a sequence of bytes, flushing whenever the buffer is full.
	void put(const(ubyte)[] bts)
	{
		// avoid writing more chunks than necessary
		if (bts.length + m_fill >= m_data.length * 2) {
			flush();
			writeToStream(bts);
			return;
		}

		while (bts.length > 0) {
			auto chunk = min(m_data.length - m_fill, bts.length);
			m_data[m_fill .. m_fill + chunk] = bts[0 .. chunk];
			m_fill += chunk;
			bts = bts[chunk .. $];

			if (m_fill >= m_data.length)
				flush();
		}
	}

	/// Appends a single code unit as one raw byte.
	void put(char elem)
	{
		put(cast(ubyte)elem);
	}

	/// Appends the code units of a string as raw bytes.
	void put(const(char)[] elems)
	{
		put(cast(const(ubyte)[])elems);
	}

	/// Appends a code point in UTF-8 encoded form.
	void put(dchar elem)
	{
		import std.utf : encode;

		char[4] chars;
		auto len = encode(chars, elem);
		put(chars[0 .. len]);
	}

	/// Appends each code point of the sequence in UTF-8 encoded form.
	void put(const(dchar)[] elems)
	{
		foreach (ch; elems)
			put(ch);
	}

	private void writeToStream(scope const(ubyte)[] bytes)
	{
		// if the write fails, do not attempt another write in the destructor
		// to avoid throwing an exception twice or nested
		m_flushInDestructor = false;
		m_stream.write(bytes);
		m_flushInDestructor = true;
	}
}
/// ditto
auto streamOutputRange(size_t buffer_size = 256, OutputStream)(OutputStream stream)
	if (isOutputStream!OutputStream)
{
	return StreamOutputRange!(OutputStream, buffer_size)(stream);
}

// All put() overloads must result in the UTF-8 byte length being written.
unittest {
	static long writeLength(ARGS...)(ARGS args) {
		import vibe.stream.memory;
		auto dst = createMemoryOutputStream;
		{
			// the range flushes into dst when it leaves this scope
			auto rng = streamOutputRange(dst);
			foreach (a; args) rng.put(a);
		}
		return dst.data.length;
	}

	assert(writeLength("hello", ' ', "world") == "hello world".length);
	assert(writeLength("h\u00E4llo", ' ', "world") == "h\u00E4llo world".length);
	assert(writeLength("hello", '\u00E4', "world") == "hello\u00E4world".length);
	assert(writeLength("h\u1000llo", '\u1000', "world") == "h\u1000llo\u1000world".length);

	auto test = "häl";
	assert(test.length == 4);
	assert(writeLength(test[0], test[1], test[2], test[3]) == test.length);
}

// A failing write must surface exactly once: either from an explicit flush()
// or from the destructor, but not from both.
unittest {
	static struct ThrowOutputStream {
		@safe:
		size_t write(in ubyte[] bytes, IOMode mode) @blocking { throw new Exception("Write failed."); }
		void write(in ubyte[] bytes) @blocking { auto n = write(bytes, IOMode.all); assert(n == bytes.length); }
		void write(in char[] bytes) @blocking { write(cast(const(ubyte)[])bytes); }
		void flush() @blocking {}
		void finalize() @blocking {}
	}
	mixin validateOutputStream!ThrowOutputStream;

	ThrowOutputStream str;

	// no explicit flush: the destructor's flush throws
	assertThrown!Exception(() {
		auto r = streamOutputRange(str);
		// too few bytes to auto-flush
		assertNotThrown!Exception(r.put("test"));
	} ());

	// explicit flush already failed: the destructor must stay silent
	try {
		auto r = streamOutputRange(str);
		// too few bytes to auto-flush
		assertNotThrown!Exception(r.put("test"));
		assertThrown!Exception(r.flush());
		assertThrown!Exception(r.flush());
	} catch (Exception e) assert(false, "Descructor has thrown redundant exception");
}