1 /** 2 Stream proxy and wrapper facilities. 3 4 Copyright: © 2013-2016 Sönke Ludwig 5 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 6 Authors: Sönke Ludwig 7 */ 8 module vibe.stream.wrapper; 9 10 public import vibe.core.stream; 11 12 import std.algorithm : min; 13 import std.exception; 14 import core.time; 15 import vibe.internal.interfaceproxy; 16 import vibe.internal.freelistref : FreeListRef; 17 18 19 ProxyStream createProxyStream(Stream)(Stream stream) 20 if (isStream!Stream) 21 { 22 return new ProxyStream(interfaceProxy!(.Stream)(stream), true); 23 } 24 25 ProxyStream createProxyStream(InputStream, OutputStream)(InputStream input, OutputStream output) 26 if (isInputStream!InputStream && isOutputStream!OutputStream) 27 { 28 return new ProxyStream(interfaceProxy!(.InputStream)(input), interfaceProxy!(.OutputStream)(output), true); 29 } 30 31 ConnectionProxyStream createConnectionProxyStream(Stream, ConnectionStream)(Stream stream, ConnectionStream connection_stream) 32 if (isStream!Stream && isConnectionStream!ConnectionStream) 33 { 34 mixin validateStream!Stream; 35 mixin validateConnectionStream!ConnectionStream; 36 return new ConnectionProxyStream(interfaceProxy!(.Stream)(stream), interfaceProxy!(.ConnectionStream)(connection_stream), true); 37 } 38 39 /// private 40 FreeListRef!ConnectionProxyStream createConnectionProxyStreamFL(Stream, ConnectionStream)(Stream stream, ConnectionStream connection_stream) 41 if (isStream!Stream && isConnectionStream!ConnectionStream) 42 { 43 mixin validateStream!Stream; 44 mixin validateConnectionStream!ConnectionStream; 45 return FreeListRef!ConnectionProxyStream(interfaceProxy!(.Stream)(stream), interfaceProxy!(.ConnectionStream)(connection_stream), true); 46 } 47 48 ConnectionProxyStream createConnectionProxyStream(InputStream, OutputStream, ConnectionStream)(InputStream input, OutputStream output, ConnectionStream connection_stream) 49 if (isInputStream!InputStream && isOutputStream!OutputStream && isConnectionStream!ConnectionStream) 50 { 51 return new ConnectionProxyStream(interfaceProxy!(.InputStream)(input), interfaceProxy!(.OutputStream)(output), interfaceProxy!(.ConnectionStream)(connection_stream), true); 52 } 53 54 55 /** 56 Provides a way to access varying streams using a constant stream reference. 57 */ 58 class ProxyStream : Stream { 59 @safe: 60 private { 61 InterfaceProxy!(.InputStream) m_input; 62 InterfaceProxy!(.OutputStream) m_output; 63 InterfaceProxy!(.Stream) m_underlying; 64 } 65 66 /// private 67 this(InterfaceProxy!Stream stream, bool dummy) 68 { 69 m_underlying = stream; 70 m_input = stream; 71 m_output = stream; 72 } 73 74 /// private 75 this(InterfaceProxy!InputStream input, InterfaceProxy!OutputStream output, bool dummy) 76 { 77 m_input = input; 78 m_output = output; 79 } 80 81 /// The stream that is wrapped by this one 82 @property inout(InterfaceProxy!Stream) underlying() inout { return m_underlying; } 83 /// ditto 84 @property void underlying(InterfaceProxy!Stream value) { m_underlying = value; m_input = value; m_output = value; } 85 /// ditto 86 static if (!is(Stream == InterfaceProxy!Stream)) 87 @property void underlying(Stream value) { this.underlying = interfaceProxy!Stream(value); } 88 89 @property bool empty() { return m_input ? m_input.empty : true; } 90 91 @property ulong leastSize() { return m_input ? m_input.leastSize : 0; } 92 93 @property bool dataAvailableForRead() { return m_input ? m_input.dataAvailableForRead : false; } 94 95 const(ubyte)[] peek() { return m_input.peek(); } 96 97 size_t read(scope ubyte[] dst, IOMode mode) { return m_input.read(dst, mode); } 98 99 alias read = Stream.read; 100 101 size_t write(in ubyte[] bytes, IOMode mode) { return m_output.write(bytes, mode); } 102 103 alias write = Stream.write; 104 105 void flush() { m_output.flush(); } 106 107 void finalize() { m_output.finalize(); } 108 } 109 110 111 /** 112 Special kind of proxy stream for streams nested in a ConnectionStream. 113 114 This stream will forward all stream operations to the selected stream, 115 but will forward all connection related operations to the given 116 ConnectionStream. This allows wrapping embedded streams, such as 117 SSL streams in a ConnectionStream. 118 */ 119 class ConnectionProxyStream : ConnectionStream { 120 @safe: 121 122 private { 123 InterfaceProxy!ConnectionStream m_connection; 124 InterfaceProxy!Stream m_underlying; 125 InterfaceProxy!InputStream m_input; 126 InterfaceProxy!OutputStream m_output; 127 } 128 129 /// private 130 this(InterfaceProxy!Stream stream, InterfaceProxy!ConnectionStream connection_stream, bool dummy) 131 { 132 assert(!!stream); 133 m_underlying = stream; 134 m_input = stream; 135 m_output = stream; 136 m_connection = connection_stream; 137 } 138 139 /// private 140 this(InterfaceProxy!InputStream input, InterfaceProxy!OutputStream output, InterfaceProxy!ConnectionStream connection_stream, bool dummy) 141 { 142 m_input = input; 143 m_output = output; 144 m_connection = connection_stream; 145 } 146 147 @property bool connected() 148 const { 149 if (!m_connection) 150 return true; 151 152 return m_connection.connected; 153 } 154 155 void close() 156 { 157 if (!m_connection) 158 return; 159 160 if (m_connection.connected) finalize(); 161 m_connection.close(); 162 } 163 164 bool waitForData(Duration timeout = 0.seconds) 165 { 166 if (this.dataAvailableForRead) return true; 167 168 if (!m_connection) 169 return timeout == 0.seconds ? !this.empty : false; 170 171 return m_connection.waitForData(timeout); 172 } 173 174 /// The stream that is wrapped by this one 175 @property inout(InterfaceProxy!Stream) underlying() inout { return m_underlying; } 176 /// ditto 177 @property void underlying(InterfaceProxy!Stream value) { m_underlying = value; m_input = value; m_output = value; } 178 /// ditto 179 static if (!is(Stream == InterfaceProxy!Stream)) 180 @property void underlying(Stream value) { this.underlying = InterfaceProxy!Stream(value); } 181 182 @property bool empty() { return m_input ? m_input.empty : true; } 183 184 @property ulong leastSize() { return m_input ? m_input.leastSize : 0; } 185 186 @property bool dataAvailableForRead() { return m_input ? m_input.dataAvailableForRead : false; } 187 188 const(ubyte)[] peek() { return m_input.peek(); } 189 190 size_t read(scope ubyte[] dst, IOMode mode) { return m_input.read(dst, mode); } 191 192 alias read = ConnectionStream.read; 193 194 size_t write(in ubyte[] bytes, IOMode mode) { return m_output.write(bytes, mode); } 195 196 alias write = ConnectionStream.write; 197 198 void flush() { m_output.flush(); } 199 200 void finalize() { m_output.finalize(); } 201 } 202 203 204 /** 205 Implements an input range interface on top of an InputStream using an 206 internal buffer. 207 208 The buffer is GC allocated and is filled chunk wise. Thus an InputStream 209 that has been wrapped in a StreamInputRange cannot be used reliably on its 210 own anymore. 211 212 Reading occurs in a fully lazy fashion. The first call to either front, 213 popFront or empty will potentially trigger waiting for the next chunk of 214 data to arrive - but especially popFront will not wait if it was called 215 after a call to front. This property allows the range to be used in 216 request-response scenarios. 217 */ 218 struct StreamInputRange(InputStream, size_t buffer_size = 256) 219 if (isInputStream!InputStream) 220 { 221 @safe: 222 private { 223 struct Buffer { 224 ubyte[buffer_size] data = void; 225 size_t fill = 0; 226 } 227 InputStream m_stream; 228 Buffer* m_buffer; 229 } 230 231 private this(InputStream stream) 232 { 233 m_stream = stream; 234 m_buffer = new Buffer; 235 } 236 237 @property bool empty() { return !m_buffer.fill && m_stream.empty; } 238 239 ubyte front() 240 { 241 if (m_buffer.fill < 1) readChunk(); 242 return m_buffer.data[$ - m_buffer.fill]; 243 } 244 void popFront() 245 { 246 assert(!empty); 247 if (m_buffer.fill < 1) readChunk(); 248 m_buffer.fill--; 249 } 250 251 private void readChunk() 252 { 253 auto sz = min(m_stream.leastSize, m_buffer.data.length); 254 assert(sz > 0); 255 m_stream.read(m_buffer.data[$-sz .. $]); 256 m_buffer.fill = sz; 257 } 258 } 259 /// ditto 260 auto streamInputRange(size_t buffer_size = 256, InputStream)(InputStream stream) 261 if (isInputStream!InputStream) 262 { 263 return StreamInputRange!(InputStream, buffer_size)(stream); 264 } 265 266 267 /** 268 Implements a buffered output range interface on top of an OutputStream. 269 */ 270 struct StreamOutputRange(OutputStream, size_t buffer_size = 256) 271 if (isOutputStream!OutputStream) 272 { 273 @safe: 274 275 private { 276 OutputStream m_stream; 277 size_t m_fill = 0; 278 ubyte[buffer_size] m_data = void; 279 bool m_flushInDestructor = true; 280 } 281 282 @disable this(this); 283 284 /// private 285 this(OutputStream stream) 286 { 287 m_stream = stream; 288 } 289 290 ~this() 291 { 292 if (m_flushInDestructor) { 293 scope (failure) () @trusted { destroy(m_stream); }(); // workaround for #2484 294 flush(); 295 } 296 } 297 298 void flush() 299 { 300 if (m_fill == 0) return; 301 writeToStream(m_data[0 .. m_fill]); 302 m_fill = 0; 303 } 304 305 void drop() 306 { 307 m_fill = 0; 308 } 309 310 void put(ubyte bt) 311 { 312 m_data[m_fill++] = bt; 313 if (m_fill >= m_data.length) flush(); 314 } 315 316 void put(const(ubyte)[] bts) 317 { 318 // avoid writing more chunks than necessary 319 if (bts.length + m_fill >= m_data.length * 2) { 320 flush(); 321 writeToStream(bts); 322 return; 323 } 324 325 while (bts.length) { 326 auto len = min(m_data.length - m_fill, bts.length); 327 m_data[m_fill .. m_fill + len] = bts[0 .. len]; 328 m_fill += len; 329 bts = bts[len .. $]; 330 if (m_fill >= m_data.length) flush(); 331 } 332 } 333 334 void put(char elem) { put(cast(ubyte)elem); } 335 void put(const(char)[] elems) { put(cast(const(ubyte)[])elems); } 336 337 void put(dchar elem) 338 { 339 import std.utf; 340 char[4] chars; 341 auto len = encode(chars, elem); 342 put(chars[0 .. len]); 343 } 344 345 void put(const(dchar)[] elems) { foreach( ch; elems ) put(ch); } 346 347 private void writeToStream(in ubyte[] bytes) 348 { 349 // if the write fails, do not attempt another write in the destructor 350 // to avoid throwing an exception twice or nested 351 m_flushInDestructor = false; 352 m_stream.write(bytes); 353 m_flushInDestructor = true; 354 } 355 } 356 /// ditto 357 auto streamOutputRange(size_t buffer_size = 256, OutputStream)(OutputStream stream) 358 if (isOutputStream!OutputStream) 359 { 360 return StreamOutputRange!(OutputStream, buffer_size)(stream); 361 } 362 363 unittest { 364 static long writeLength(ARGS...)(ARGS args) { 365 import vibe.stream.memory; 366 auto dst = createMemoryOutputStream; 367 { 368 auto rng = streamOutputRange(dst); 369 foreach (a; args) rng.put(a); 370 } 371 return dst.data.length; 372 } 373 assert(writeLength("hello", ' ', "world") == "hello world".length); 374 assert(writeLength("h\u00E4llo", ' ', "world") == "h\u00E4llo world".length); 375 assert(writeLength("hello", '\u00E4', "world") == "hello\u00E4world".length); 376 assert(writeLength("h\u1000llo", '\u1000', "world") == "h\u1000llo\u1000world".length); 377 auto test = "häl"; 378 assert(test.length == 4); 379 assert(writeLength(test[0], test[1], test[2], test[3]) == test.length); 380 } 381 382 unittest { 383 static struct ThrowOutputStream { 384 @safe: 385 size_t write(in ubyte[] bytes, IOMode mode) @blocking { throw new Exception("Write failed."); } 386 void write(in ubyte[] bytes) @blocking { auto n = write(bytes, IOMode.all); assert(n == bytes.length); } 387 void write(in char[] bytes) @blocking { write(cast(const(ubyte)[])bytes); } 388 void flush() @blocking {} 389 void finalize() @blocking {} 390 } 391 mixin validateOutputStream!ThrowOutputStream; 392 393 ThrowOutputStream str; 394 395 assertThrown!Exception(() { 396 auto r = streamOutputRange(str); 397 // too few bytes to auto-flush 398 assertNotThrown!Exception(r.put("test")); 399 } ()); 400 401 try { 402 auto r = streamOutputRange(str); 403 // too few bytes to auto-flush 404 assertNotThrown!Exception(r.put("test")); 405 assertThrown!Exception(r.flush()); 406 assertThrown!Exception(r.flush()); 407 } catch (Exception e) assert(false, "Descructor has thrown redundant exception"); 408 }