/**
	Stream proxy and wrapper facilities.

	Copyright: © 2013-2016 RejectedSoftware e.K.
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
	Authors: Sönke Ludwig
*/
module vibe.stream.wrapper;

public import vibe.core.stream;

import std.algorithm : min;
import std.exception;
import core.time;
import vibe.internal.interfaceproxy;
import vibe.internal.freelistref : FreeListRef;


/**
	Creates a `ProxyStream` that forwards all read and write operations to
	`stream`.

	Params:
		stream = The stream to wrap; used for both the input and the output side
*/
ProxyStream createProxyStream(Stream)(Stream stream)
	if (isStream!Stream)
{
	return new ProxyStream(interfaceProxy!(.Stream)(stream), true);
}

/**
	Creates a `ProxyStream` that reads from `input` and writes to `output`.

	The resulting proxy has no single underlying stream, so its `underlying`
	property is left unset.

	Params:
		input = Stream that serves all read related operations
		output = Stream that serves all write related operations
*/
ProxyStream createProxyStream(InputStream, OutputStream)(InputStream input, OutputStream output)
	if (isInputStream!InputStream && isOutputStream!OutputStream)
{
	return new ProxyStream(interfaceProxy!(.InputStream)(input), interfaceProxy!(.OutputStream)(output), true);
}

/**
	Creates a `ConnectionProxyStream` that forwards stream operations to
	`stream` and connection related operations to `connection_stream`.

	Params:
		stream = Stream that serves all read and write operations
		connection_stream = Connection that serves `connected`, `close` and `waitForData`
*/
ConnectionProxyStream createConnectionProxyStream(Stream, ConnectionStream)(Stream stream, ConnectionStream connection_stream)
	if (isStream!Stream && isConnectionStream!ConnectionStream)
{
	mixin validateStream!Stream;
	mixin validateConnectionStream!ConnectionStream;
	return new ConnectionProxyStream(interfaceProxy!(.Stream)(stream), interfaceProxy!(.ConnectionStream)(connection_stream), true);
}

/// private
FreeListRef!ConnectionProxyStream createConnectionProxyStreamFL(Stream, ConnectionStream)(Stream stream, ConnectionStream connection_stream)
	if (isStream!Stream && isConnectionStream!ConnectionStream)
{
	// Same as createConnectionProxyStream, but the instance is wrapped in a
	// FreeListRef instead of being allocated with `new`.
	mixin validateStream!Stream;
	mixin validateConnectionStream!ConnectionStream;
	return FreeListRef!ConnectionProxyStream(interfaceProxy!(.Stream)(stream), interfaceProxy!(.ConnectionStream)(connection_stream), true);
}

/**
	Creates a `ConnectionProxyStream` with separate input and output streams.

	Params:
		input = Stream that serves all read related operations
		output = Stream that serves all write related operations
		connection_stream = Connection that serves `connected`, `close` and `waitForData`
*/
ConnectionProxyStream createConnectionProxyStream(InputStream, OutputStream, ConnectionStream)(InputStream input, OutputStream output, ConnectionStream connection_stream)
	if (isInputStream!InputStream && isOutputStream!OutputStream && isConnectionStream!ConnectionStream)
{
	return new ConnectionProxyStream(interfaceProxy!(.InputStream)(input), interfaceProxy!(.OutputStream)(output), interfaceProxy!(.ConnectionStream)(connection_stream), true);
}


/**
	Provides a way to access varying streams using a constant stream reference.
*/
class ProxyStream : Stream {
@safe:
	private {
		InterfaceProxy!(.InputStream) m_input;
		InterfaceProxy!(.OutputStream) m_output;
		InterfaceProxy!(.Stream) m_underlying;
	}

	deprecated("Use createProxyStream instead.")
	this(Stream stream = null)
	{
		m_underlying = interfaceProxy!Stream(stream);
		m_input = interfaceProxy!InputStream(stream);
		m_output = interfaceProxy!OutputStream(stream);
	}

	deprecated("Use createProxyStream instead.")
	this(InputStream input, OutputStream output)
	{
		m_input = interfaceProxy!InputStream(input);
		m_output = interfaceProxy!OutputStream(output);
	}

	/// private
	this(InterfaceProxy!Stream stream, bool dummy)
	{
		// `dummy` is not read; presumably it only keeps this overload distinct
		// from the deprecated constructors - TODO confirm.
		m_underlying = stream;
		m_input = stream;
		m_output = stream;
	}

	/// private
	this(InterfaceProxy!InputStream input, InterfaceProxy!OutputStream output, bool dummy)
	{
		// `dummy` is not read (see the constructor above); m_underlying stays unset.
		m_input = input;
		m_output = output;
	}

	/// The stream that is wrapped by this one
	@property inout(InterfaceProxy!Stream) underlying() inout { return m_underlying; }
	/// ditto
	@property void underlying(InterfaceProxy!Stream value) { m_underlying = value; m_input = value; m_output = value; }
	/// ditto
	static if (!is(Stream == InterfaceProxy!Stream))
		@property void underlying(Stream value) { this.underlying = interfaceProxy!Stream(value); }

	/// Forwards to the input stream; reports `true` if no input stream is set.
	@property bool empty() { return m_input ? m_input.empty : true; }

	/// Forwards to the input stream; reports `0` if no input stream is set.
	@property ulong leastSize() { return m_input ? m_input.leastSize : 0; }

	/// Forwards to the input stream; reports `false` if no input stream is set.
	@property bool dataAvailableForRead() { return m_input ? m_input.dataAvailableForRead : false; }

	/**
		Forwards to the input stream.

		Returns an empty slice if no input stream is set, in line with `empty`,
		`leastSize` and `dataAvailableForRead`, which treat that state as an
		empty stream.
	*/
	const(ubyte)[] peek() { return m_input ? m_input.peek() : null; }

	/// Forwards to the input stream, which must be set.
	size_t read(scope ubyte[] dst, IOMode mode) { return m_input.read(dst, mode); }

	alias read = Stream.read;

	/// Forwards to the output stream, which must be set.
	size_t write(in ubyte[] bytes, IOMode mode) { return m_output.write(bytes, mode); }

	alias write = Stream.write;

	/// Forwards to the output stream, which must be set.
	void flush() { m_output.flush(); }

	/// Forwards to the output stream, which must be set.
	void finalize() { m_output.finalize(); }
}


/**
	Special kind of proxy stream for streams nested in a ConnectionStream.

	This stream will forward all stream operations to the selected stream,
	but will forward all connection related operations to the given
	ConnectionStream. This allows wrapping embedded streams, such as
	SSL streams in a ConnectionStream.
*/
class ConnectionProxyStream : ConnectionStream {
@safe:

	private {
		InterfaceProxy!ConnectionStream m_connection;
		InterfaceProxy!Stream m_underlying;
		InterfaceProxy!InputStream m_input;
		InterfaceProxy!OutputStream m_output;
	}

	deprecated("Use createConnectionProxyStream instead.")
	this(Stream stream, ConnectionStream connection_stream)
	{
		this(interfaceProxy!Stream(stream), interfaceProxy!ConnectionStream(connection_stream), true);
	}

	deprecated("Use createConnectionProxyStream instead.")
	this(InputStream input, OutputStream output, ConnectionStream connection_stream)
	{
		this(interfaceProxy!InputStream(input), interfaceProxy!OutputStream(output), interfaceProxy!ConnectionStream(connection_stream), true);
	}

	/// private
	this(InterfaceProxy!Stream stream, InterfaceProxy!ConnectionStream connection_stream, bool dummy)
	{
		// `dummy` is not read; presumably it only keeps this overload distinct
		// from the deprecated constructors - TODO confirm.
		assert(!!stream);
		m_underlying = stream;
		m_input = stream;
		m_output = stream;
		m_connection = connection_stream;
	}

	/// private
	this(InterfaceProxy!InputStream input, InterfaceProxy!OutputStream output, InterfaceProxy!ConnectionStream connection_stream, bool dummy)
	{
		// `dummy` is not read (see the constructor above); m_underlying stays unset.
		m_input = input;
		m_output = output;
		m_connection = connection_stream;
	}

	/// Forwards to the connection; reports `true` if no connection is set.
	@property bool connected()
	const {
		if (!m_connection)
			return true;

		return m_connection.connected;
	}

	/**
		Closes the connection.

		Does nothing if no connection is set. If the connection still reports
		itself as connected, the output stream is finalized before the
		connection is closed.
	*/
	void close()
	{
		if (!m_connection)
			return;

		if (m_connection.connected) finalize();
		m_connection.close();
	}

	/**
		Waits for data to become available for reading.

		Returns `true` right away if the input stream already has data
		available. Without a connection, a zero timeout yields `!empty` and any
		other timeout yields `false`. Otherwise the call is forwarded to the
		connection.
	*/
	bool waitForData(Duration timeout = 0.seconds)
	{
		if (this.dataAvailableForRead) return true;

		if (!m_connection)
			return timeout == 0.seconds ? !this.empty : false;

		return m_connection.waitForData(timeout);
	}

	/// The stream that is wrapped by this one
	@property inout(InterfaceProxy!Stream) underlying() inout { return m_underlying; }
	/// ditto
	@property void underlying(InterfaceProxy!Stream value) { m_underlying = value; m_input = value; m_output = value; }
	/// ditto
	static if (!is(Stream == InterfaceProxy!Stream))
		@property void underlying(Stream value) { this.underlying = interfaceProxy!Stream(value); }

	/// Forwards to the input stream; reports `true` if no input stream is set.
	@property bool empty() { return m_input ? m_input.empty : true; }

	/// Forwards to the input stream; reports `0` if no input stream is set.
	@property ulong leastSize() { return m_input ? m_input.leastSize : 0; }

	/// Forwards to the input stream; reports `false` if no input stream is set.
	@property bool dataAvailableForRead() { return m_input ? m_input.dataAvailableForRead : false; }

	/**
		Forwards to the input stream.

		Returns an empty slice if no input stream is set, in line with `empty`,
		`leastSize` and `dataAvailableForRead`, which treat that state as an
		empty stream.
	*/
	const(ubyte)[] peek() { return m_input ? m_input.peek() : null; }

	/// Forwards to the input stream, which must be set.
	size_t read(scope ubyte[] dst, IOMode mode) { return m_input.read(dst, mode); }

	alias read = ConnectionStream.read;

	/// Forwards to the output stream, which must be set.
	size_t write(in ubyte[] bytes, IOMode mode) { return m_output.write(bytes, mode); }

	alias write = ConnectionStream.write;

	/// Forwards to the output stream, which must be set.
	void flush() { m_output.flush(); }

	/// Forwards to the output stream, which must be set.
	void finalize() { m_output.finalize(); }
}


/**
	Implements an input range interface on top of an InputStream using an
	internal buffer.

	The buffer is GC allocated and is filled chunk wise. Thus an InputStream
	that has been wrapped in a StreamInputRange cannot be used reliably on its
	own anymore.

	Reading occurs in a fully lazy fashion. The first call to either front,
	popFront or empty will potentially trigger waiting for the next chunk of
	data to arrive - but especially popFront will not wait if it was called
	after a call to front. This property allows the range to be used in
	request-response scenarios.
*/
struct StreamInputRange {
@safe:

	private {
		struct Buffer {
			// Not initialized; only the last `fill` bytes hold valid data.
			ubyte[256] data = void;
			// Number of unread bytes, stored at the end of `data`.
			size_t fill = 0;
		}
		InputStream m_stream;
		Buffer* m_buffer;
	}

	/// Wraps `stream` and allocates the internal chunk buffer.
	this (InputStream stream)
	{
		m_stream = stream;
		m_buffer = new Buffer;
	}

	/// `true` once the buffer holds no unread bytes and the stream is empty.
	@property bool empty() { return !m_buffer.fill && m_stream.empty; }

	/// Returns the next unread byte, reading a new chunk first if the buffer is drained.
	ubyte front()
	{
		if (m_buffer.fill < 1) readChunk();
		// Unread bytes sit at the tail, so the next one is `fill` from the end.
		return m_buffer.data[$ - m_buffer.fill];
	}

	/// Discards the next unread byte, reading a new chunk first if the buffer is drained.
	void popFront()
	{
		assert(!empty);
		if (m_buffer.fill < 1) readChunk();
		m_buffer.fill--;
	}

	// Reads up to one buffer's worth of data from the stream into the tail of
	// the buffer and records how many bytes are now pending.
	private void readChunk()
	{
		auto sz = min(m_stream.leastSize, m_buffer.data.length);
		assert(sz > 0);
		m_stream.read(m_buffer.data[$-sz .. $]);
		m_buffer.fill = sz;
	}
}


/**
	Implements a buffered output range interface on top of an OutputStream.
*/
StreamOutputRange!OutputStream StreamOutputRange()(OutputStream stream) { return StreamOutputRange!OutputStream(stream); }
/// ditto
struct StreamOutputRange(OutputStream, size_t buffer_size = 256)
	if (isOutputStream!OutputStream)
{
@safe:

	private {
		OutputStream m_stream;
		// Number of pending bytes at the start of m_data.
		size_t m_fill = 0;
		// Not initialized; only the first m_fill bytes hold valid data.
		ubyte[buffer_size] m_data = void;
	}

	// Copying is disabled; presumably so that pending bytes cannot be flushed
	// by more than one instance - TODO confirm.
	@disable this(this);

	/// Creates a range that writes to `stream`.
	this(OutputStream stream)
	{
		m_stream = stream;
	}

	/// Flushes any pending bytes to the stream.
	~this()
	{
		flush();
	}

	/// Writes all pending bytes to the stream and empties the buffer.
	void flush()
	{
		if (m_fill == 0) return;
		m_stream.write(m_data[0 .. m_fill]);
		m_fill = 0;
	}

	/// Discards all pending bytes without writing them.
	void drop()
	{
		m_fill = 0;
	}

	/// Appends a single byte, flushing once the buffer is full.
	void put(ubyte bt)
	{
		m_data[m_fill++] = bt;
		if (m_fill >= m_data.length) flush();
	}

	/// Appends a sequence of bytes.
	void put(const(ubyte)[] bts)
	{
		// avoid writing more chunks than necessary
		if (bts.length + m_fill >= m_data.length * 2) {
			flush();
			m_stream.write(bts);
			return;
		}

		// Copy piecewise into the buffer, flushing each time it fills up.
		while (bts.length) {
			auto len = min(m_data.length - m_fill, bts.length);
			m_data[m_fill .. m_fill + len] = bts[0 .. len];
			m_fill += len;
			bts = bts[len .. $];
			if (m_fill >= m_data.length) flush();
		}
	}

	/// Appends a single `char` as one byte.
	void put(char elem) { put(cast(ubyte)elem); }
	/// Appends the bytes of a `char` string unchanged.
	void put(const(char)[] elems) { put(cast(const(ubyte)[])elems); }

	/// Appends a `dchar`, encoded via `std.utf.encode` into up to four `char`s.
	void put(dchar elem)
	{
		import std.utf;
		char[4] chars;
		auto len = encode(chars, elem);
		put(chars[0 .. len]);
	}

	/// Appends each `dchar` of `elems` in turn.
	void put(const(dchar)[] elems) { foreach( ch; elems ) put(ch); }
}
/// ditto
auto streamOutputRange(size_t buffer_size = 256, OutputStream)(OutputStream stream)
	if (isOutputStream!OutputStream)
{
	return StreamOutputRange!(OutputStream, buffer_size)(stream);
}

unittest {
	// Writes all arguments through a StreamOutputRange and returns the number
	// of bytes that arrived in the memory stream once the range went out of scope.
	static long writeLength(ARGS...)(ARGS args) {
		import vibe.stream.memory;
		auto dst = createMemoryOutputStream;
		{
			auto rng = StreamOutputRange(dst);
			foreach (a; args) rng.put(a);
		}
		return dst.data.length;
	}
	assert(writeLength("hello", ' ', "world") == "hello world".length);
	assert(writeLength("h\u00E4llo", ' ', "world") == "h\u00E4llo world".length);
	assert(writeLength("hello", '\u00E4', "world") == "hello\u00E4world".length);
	assert(writeLength("h\u1000llo", '\u1000', "world") == "h\u1000llo\u1000world".length);
	auto test = "häl";
	assert(test.length == 4);
	assert(writeLength(test[0], test[1], test[2], test[3]) == test.length);
}