/**
	Generic stream interface used by several stream-like classes.

	This module defines the basic stream primitives. For concrete stream types, take a look at the
	`vibe.stream` package. The `vibe.stream.operations` module contains additional high-level
	operations on streams, such as reading streams by line or as a whole.

	Copyright: © 2012-2015 RejectedSoftware e.K.
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
	Authors: Sönke Ludwig
*/
module vibe.core.stream;

import core.time;
import std.algorithm;
import std.conv;


/**************************************************************************************************/
/* Public functions                                                                               */
/**************************************************************************************************/

/** Pipes an InputStream directly into an OutputStream.

	The number of bytes written is either the whole input stream when `nbytes == 0` (or
	`nbytes == ulong.max`), or exactly `nbytes` otherwise. If the input stream contains fewer
	than `nbytes` bytes of data, an exception is thrown.

	The data is copied in chunks through a temporary 64 KiB buffer that is obtained from
	`theAllocator` and disposed again when the function exits.

	Params:
		source = The stream to read from
		sink = The stream to write to
		nbytes = Number of bytes to transfer; `0` and `ulong.max` both select
			"until `source.empty` is `true`"
*/
void pipe(IS : InputStream, OS : OutputStream)(IS source, OS sink, ulong nbytes = 0)
@safe {
	import vibe.internal.allocator : dispose, makeArray, theAllocator;

	auto buffer = () @trusted { return theAllocator.makeArray!ubyte(64*1024); } ();
	scope (exit) () @trusted { theAllocator.dispose(buffer); } ();

	if (nbytes == 0 || nbytes == ulong.max) {
		// Unbounded mode: drain the source until it reports EOS.
		while (!source.empty) {
			// `min` yields a `ulong` here; the value is bounded by `buffer.length`,
			// so narrowing it to `size_t` cannot lose information.
			const chunk = cast(size_t) min(source.leastSize, buffer.length);
			// A zero-sized chunk would make no progress in this loop.
			assert(chunk > 0, "leastSize returned zero for non-empty stream.");
			source.read(buffer[0 .. chunk], IOMode.all);
			sink.write(buffer[0 .. chunk], IOMode.all);
		}
	} else {
		// Bounded mode: transfer exactly `nbytes` bytes.
		while (nbytes > 0) {
			// Bounded by `buffer.length`, see above.
			const chunk = cast(size_t) min(nbytes, buffer.length);
			source.read(buffer[0 .. chunk], IOMode.all);
			sink.write(buffer[0 .. chunk], IOMode.all);
			nbytes -= chunk;
		}
	}
}

/**
	Returns a `NullOutputStream` instance.

	The instance will only be created on the first request and gets reused for
	all subsequent calls from the same thread.
*/
NullOutputStream nullSink()
@safe {
	static NullOutputStream ret;
	if (!ret) ret = new NullOutputStream;
	return ret;
}


/**************************************************************************************************/
/* Public types                                                                                   */
/**************************************************************************************************/

/** Controls the waiting behavior of read/write operations.

	Note that this is currently ignored for all device streams. Use the "vibe-core" package if you
	need this functionality.
*/
enum IOMode {
	immediate, /// not supported
	once,      /// not supported
	all        /// Writes/reads the whole buffer
}

/**
	Interface for all classes implementing readable streams.
*/
interface InputStream {
	@safe:

	/** Returns true $(I iff) the end of the input stream has been reached.
	*/
	@property bool empty();

	/** (Scheduled for deprecation) Returns the maximum number of bytes that are known to remain
		in this stream until the end is reached.

		After `leastSize()` bytes have been read, the stream will either have
		reached EOS and `empty()` returns `true`, or `leastSize()` returns again a number `> 0`.
	*/
	@property ulong leastSize();

	/** (Scheduled for deprecation) Queries if there is data available for immediate,
		non-blocking read.
	*/
	@property bool dataAvailableForRead();

	/** Returns a temporary reference to the data that is currently buffered.

		The returned slice typically has the size `leastSize()` or `0` if
		`dataAvailableForRead()` returns false. Streams that don't have an
		internal buffer will always return an empty slice.

		Note that any method invocation on the same stream potentially
		invalidates the contents of the returned buffer.
	*/
	const(ubyte)[] peek();

	/** Fills the preallocated array `dst` with data from the stream.

		The overload without an `IOMode` argument forwards to this method
		using `IOMode.all`.

		Throws: An exception if the operation reads past the end of the stream
	*/
	size_t read(scope ubyte[] dst, IOMode);
	/// ditto
	final void read(scope ubyte[] dst) { read(dst, IOMode.all); }
}

/**
	Interface for all classes implementing writeable streams.
*/
interface OutputStream {
	@safe:

	/** Writes an array of bytes to the stream.

		The overloads without an `IOMode` argument forward to this method
		using `IOMode.all`; the `char[]` overload writes the raw bytes of
		the given character array.
	*/
	size_t write(in ubyte[] bytes, IOMode mode);
	/// ditto
	final void write(in ubyte[] bytes) { write(bytes, IOMode.all); }
	/// ditto
	final void write(in char[] bytes) { write(cast(const(ubyte)[])bytes); }

	/** Flushes the stream and makes sure that all data is being written to the output device.
	*/
	void flush();

	/** Flushes and finalizes the stream.

		Finalize has to be called on certain types of streams. No writes are possible after a
		call to finalize().
	*/
	void finalize();

	/** Pipes an InputStream directly into this OutputStream.

		The number of bytes written is either the whole input stream when `nbytes == 0`, or
		exactly `nbytes` for `nbytes > 0`. If the input stream contains fewer than `nbytes`
		bytes of data, an exception is thrown.
	*/
	deprecated("Use pipe(source, sink) instead.")
	final void write(InputStream stream, ulong nbytes = 0)
	{
		stream.pipe(this, nbytes);
	}
}

/**
	Interface for all classes implementing readable and writable streams.
*/
interface Stream : InputStream, OutputStream {
}


/**
	Interface for streams based on a connection.

	Connection streams are based on streaming socket connections, pipes and
	similar end-to-end streams.

	See_also: vibe.core.net.TCPConnection
*/
interface ConnectionStream : Stream {
	@safe:

	/** Determines the current connection status.

		If connected is false, writing to the connection will trigger an
		exception. Reading may still succeed as long as there is data left in
		the input buffer. Use InputStream.empty to determine when to stop
		reading.
	*/
	@property bool connected() const;

	/** Actively closes the connection and frees associated resources.

		Note that close must always be called, even if the remote has already
		closed the connection. Failure to do so will result in resource and
		memory leakage.

		Closing a connection implies a call to finalize, so that it doesn't
		need to be called explicitly (it will be a no-op in that case).
	*/
	void close();

	/** Blocks until data becomes available for read.

		The maximum wait time can be customized with the `timeout` parameter.
		If there is already data available for read, or if the connection is
		closed, the function will return immediately without blocking.

		Params:
			timeout = Optional timeout, the default value of `Duration.max`
				indicates an infinite timeout

		Returns:
			The function will return `true` if data becomes available before the
			timeout is reached. If the connection gets closed, or the timeout
			gets reached, `false` is returned instead.
	*/
	bool waitForData(Duration timeout = Duration.max);
}


/**
	Interface for all streams supporting random access.
*/
interface RandomAccessStream : Stream {
	@safe:

	/// Returns the total size of the file.
	@property ulong size() const nothrow;

	/// Determines if this stream is readable.
	@property bool readable() const nothrow;

	/// Determines if this stream is writable.
	@property bool writable() const nothrow;

	/// Seeks to a specific position in the file if supported by the stream.
	void seek(ulong offset);

	/// Returns the current offset of the file pointer
	ulong tell() nothrow;
}


/**
	Stream implementation acting as a sink with no function.

	Any data written to the stream will be ignored and discarded. This stream type is useful if
	the output of a particular stream is not needed but the stream needs to be drained.
*/
final class NullOutputStream : OutputStream {
	/// Discards `bytes` and reports all of them as written.
	override size_t write(in ubyte[] bytes, IOMode) { return bytes.length; }
	// Keep the convenience overloads declared in `OutputStream` visible on this class.
	alias write = OutputStream.write;
	/// Does nothing.
	override void flush() {}
	/// Does nothing.
	override void finalize() {}
}


// Aliases that expose the stream interfaces under `*Proxy` names.
alias InputStreamProxy = InputStream;
alias OutputStreamProxy = OutputStream;
alias StreamProxy = Stream;
alias ConnectionStreamProxy = ConnectionStream;
alias RandomAccessStreamProxy = RandomAccessStream;

/// Evaluates to `true` if `T` implicitly converts to `InputStream`.
enum isInputStream(T) = is(T : InputStream);
/// Evaluates to `true` if `T` implicitly converts to `OutputStream`.
enum isOutputStream(T) = is(T : OutputStream);
/// Evaluates to `true` if `T` implicitly converts to `Stream`.
enum isStream(T) = is(T : Stream);
/// Evaluates to `true` if `T` implicitly converts to `ConnectionStream`.
enum isConnectionStream(T) = is(T : ConnectionStream);
/// Evaluates to `true` if `T` implicitly converts to `RandomAccessStream`.
enum isRandomAccessStream(T) = is(T : RandomAccessStream);

// Mixing in one of these templates fails compilation unless `T` satisfies
// the corresponding `is*Stream` trait.
mixin template validateInputStream(T) { static assert(isInputStream!T); }
mixin template validateOutputStream(T) { static assert(isOutputStream!T); }
mixin template validateStream(T) { static assert(isStream!T); }
mixin template validateConnectionStream(T) { static assert(isConnectionStream!T); }
mixin template validateRandomAccessStream(T) { static assert(isRandomAccessStream!T); }