1 /** 2 Zlib input/output streams 3 4 Copyright: © 2012-2013 Sönke Ludwig 5 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 6 Authors: Sönke Ludwig 7 */ 8 module vibe.stream.zlib; 9 10 import vibe.core.stream; 11 import vibe.utils.array; 12 import vibe.internal.freelistref; 13 import vibe.internal.interfaceproxy : InterfaceProxy, interfaceProxy; 14 15 import std.algorithm; 16 import std.exception; 17 import etc.c.zlib; 18 19 import vibe.core.log; 20 21 22 /** Creates a new deflate uncompression stream. 23 */ 24 ZlibInputStream createDeflateInputStream(InputStream)(InputStream source) @safe 25 if (isInputStream!InputStream) 26 { 27 return new ZlibInputStream(interfaceProxy!(.InputStream)(source), ZlibInputStream.HeaderFormat.deflate, true); 28 } 29 30 /// private 31 FreeListRef!ZlibInputStream createDeflateInputStreamFL(InputStream)(InputStream source) @safe 32 if (isInputStream!InputStream) 33 { 34 return FreeListRef!ZlibInputStream(interfaceProxy!(.InputStream)(source), ZlibInputStream.HeaderFormat.deflate, true); 35 } 36 37 /** Creates a new deflate compression stream. 38 */ 39 ZlibOutputStream createDeflateOutputStream(OutputStream)(OutputStream destination) @safe 40 if (isOutputStream!OutputStream) 41 { 42 return new ZlibOutputStream(interfaceProxy!(.OutputStream)(destination), ZlibOutputStream.HeaderFormat.deflate, Z_DEFAULT_COMPRESSION, true); 43 } 44 45 /// private 46 FreeListRef!ZlibOutputStream createDeflateOutputStreamFL(OutputStream)(OutputStream destination) @safe 47 if (isOutputStream!OutputStream) 48 { 49 return FreeListRef!ZlibOutputStream(interfaceProxy!(.OutputStream)(destination), ZlibOutputStream.HeaderFormat.deflate, Z_DEFAULT_COMPRESSION, true); 50 } 51 52 /** Creates a new deflate uncompression stream. 53 */ 54 ZlibInputStream createGzipInputStream(InputStream)(InputStream source) @safe 55 if (isInputStream!InputStream) 56 { 57 return new ZlibInputStream(interfaceProxy!(.InputStream)(source), ZlibInputStream.HeaderFormat.gzip, true); 58 } 59 60 /// private 61 FreeListRef!ZlibInputStream createGzipInputStreamFL(InputStream)(InputStream source) @safe 62 if (isInputStream!InputStream) 63 { 64 return FreeListRef!ZlibInputStream(interfaceProxy!(.InputStream)(source), ZlibInputStream.HeaderFormat.gzip, true); 65 } 66 67 /** Creates a new deflate uncompression stream. 68 */ 69 ZlibOutputStream createGzipOutputStream(OutputStream)(OutputStream destination) @safe 70 if (isOutputStream!OutputStream) 71 { 72 return new ZlibOutputStream(interfaceProxy!(.OutputStream)(destination), ZlibOutputStream.HeaderFormat.gzip, Z_DEFAULT_COMPRESSION, true); 73 } 74 75 /// private 76 FreeListRef!ZlibOutputStream createGzipOutputStreamFL(OutputStream)(OutputStream destination) @safe 77 if (isOutputStream!OutputStream) 78 { 79 return FreeListRef!ZlibOutputStream(interfaceProxy!(.OutputStream)(destination), ZlibOutputStream.HeaderFormat.gzip, Z_DEFAULT_COMPRESSION, true); 80 } 81 82 83 /** 84 Generic zlib output stream. 85 */ 86 class ZlibOutputStream : OutputStream { 87 @safe: 88 89 private { 90 InterfaceProxy!OutputStream m_out; 91 z_stream m_zstream; 92 ubyte[1024] m_outbuffer; 93 //ubyte[4096] m_inbuffer; 94 bool m_finalized = false; 95 } 96 97 enum HeaderFormat { 98 gzip, 99 deflate 100 } 101 102 /// private 103 this(InterfaceProxy!OutputStream dst, HeaderFormat type, int level, bool dummy) 104 { 105 m_out = dst; 106 zlibEnforce(() @trusted { return deflateInit2(&m_zstream, level, Z_DEFLATED, 15 + (type == HeaderFormat.gzip ? 16 : 0), 8, Z_DEFAULT_STRATEGY); } ()); 107 } 108 109 ~this() { 110 if (!m_finalized) 111 () @trusted { deflateEnd(&m_zstream); } (); 112 } 113 114 final size_t write(in ubyte[] data, IOMode mode) 115 { 116 // TODO: support IOMode! 117 if (!data.length) return 0; 118 assert(!m_finalized); 119 assert(m_zstream.avail_in == 0); 120 m_zstream.next_in = () @trusted { return cast(ubyte*)data.ptr; } (); 121 assert(data.length < uint.max); 122 m_zstream.avail_in = cast(uint)data.length; 123 doFlush(Z_NO_FLUSH); 124 assert(m_zstream.avail_in == 0); 125 m_zstream.next_in = null; 126 return data.length; 127 } 128 129 alias write = OutputStream.write; 130 131 final void flush() 132 { 133 assert(!m_finalized); 134 //doFlush(Z_SYNC_FLUSH); 135 m_out.flush(); 136 } 137 138 final void finalize() 139 { 140 if (m_finalized) return; 141 m_finalized = true; 142 doFlush(Z_FINISH); 143 m_out.flush(); 144 zlibEnforce(() @trusted { return deflateEnd(&m_zstream); }()); 145 } 146 147 private final void doFlush(int how) 148 @safe { 149 while (true) { 150 m_zstream.next_out = &m_outbuffer[0]; 151 m_zstream.avail_out = cast(uint)m_outbuffer.length; 152 //logInfo("deflate %s -> %s (%s)", m_zstream.avail_in, m_zstream.avail_out, how); 153 auto ret = () @trusted { return deflate(&m_zstream, how); } (); 154 //logInfo(" ... %s -> %s", m_zstream.avail_in, m_zstream.avail_out); 155 switch (ret) { 156 default: 157 zlibEnforce(ret); 158 assert(false, "Unknown return value for zlib deflate."); 159 case Z_OK: 160 assert(m_zstream.avail_out < m_outbuffer.length || m_zstream.avail_in == 0); 161 m_out.write(m_outbuffer[0 .. m_outbuffer.length - m_zstream.avail_out]); 162 break; 163 case Z_BUF_ERROR: 164 assert(m_zstream.avail_in == 0); 165 return; 166 case Z_STREAM_END: 167 assert(how == Z_FINISH); 168 m_out.write(m_outbuffer[0 .. m_outbuffer.length - m_zstream.avail_out]); 169 return; 170 } 171 } 172 } 173 } 174 175 176 unittest { 177 import vibe.stream.memory; 178 import vibe.stream.operations; 179 180 auto raw = cast(ubyte[])"Hello, World!\n".dup; 181 ubyte[] gzip = [ 182 0x1F, 0x8B, 0x08, 0x08, 0xAF, 0x12, 0x42, 0x56, 0x00, 0x03, 0x74, 0x65, 0x73, 0x74, 0x2E, 0x74, 183 0x78, 0x74, 0x00, 0xF3, 0x48, 0xCD, 0xC9, 0xC9, 0xD7, 0x51, 0x08, 0xCF, 0x2F, 0xCA, 0x49, 0x51, 184 0xE4, 0x02, 0x00, 0x84, 0x9E, 0xE8, 0xB4, 0x0E, 0x00, 0x00, 0x00]; 185 186 auto gzipin = createGzipInputStream(createMemoryStream(gzip)); 187 assert(gzipin.readAll() == raw); 188 } 189 190 unittest { 191 import vibe.stream.memory; 192 import vibe.stream.operations; 193 194 ubyte[] gzip_partial = [ 195 0x1F, 0x8B, 0x08, 0x08, 0xAF, 0x12, 0x42, 0x56, 0x00, 0x03, 0x74, 0x65, 0x73, 0x74, 0x2E, 0x74, 196 0x78, 0x74, 0x00, 0xF3, 0x48, 0xCD, 0xC9, 0xC9, 0xD7, 0x51, 0x08, 0xCF, 0x2F, 0xCA, 0x49, 0x51, 197 ]; 198 199 auto gzipin = createGzipInputStream(createMemoryStream(gzip_partial)); 200 try { 201 gzipin.readAll(); 202 assert(false, "Expected exception."); 203 } catch (Exception e) {} 204 assert(gzipin.empty); 205 } 206 207 /** 208 Generic zlib input stream. 209 */ 210 class ZlibInputStream : InputStream { 211 @safe: 212 213 import std.zlib; 214 private { 215 InterfaceProxy!InputStream m_in; 216 z_stream m_zstream; 217 FixedRingBuffer!(ubyte, 4096) m_outbuffer; 218 ubyte[1024] m_inbuffer; 219 bool m_finished = false; 220 ulong m_ninflated, n_read; 221 } 222 223 enum HeaderFormat { 224 gzip, 225 deflate, 226 automatic 227 } 228 229 /// private 230 this(InterfaceProxy!InputStream src, HeaderFormat type, bool dummy) 231 { 232 m_in = src; 233 if (m_in.empty) { 234 m_finished = true; 235 } else { 236 int wndbits = 15; 237 if(type == HeaderFormat.gzip) wndbits += 16; 238 else if(type == HeaderFormat.automatic) wndbits += 32; 239 zlibEnforce(() @trusted { return inflateInit2(&m_zstream, wndbits); } ()); 240 readChunk(); 241 } 242 } 243 244 ~this() { 245 if (!m_finished) 246 () @trusted { inflateEnd(&m_zstream); } (); 247 } 248 249 @property bool empty() { return this.leastSize == 0; } 250 251 @property ulong leastSize() 252 { 253 assert(!m_finished || m_in.empty, "Input contains more data than expected."); 254 if (m_outbuffer.length > 0) return m_outbuffer.length; 255 if (m_finished) return 0; 256 readChunk(); 257 assert(m_outbuffer.length || m_finished); 258 259 return m_outbuffer.length; 260 } 261 262 @property bool dataAvailableForRead() 263 { 264 return m_outbuffer.length > 0; 265 } 266 267 const(ubyte)[] peek() { return m_outbuffer.peek(); } 268 269 size_t read(scope ubyte[] dst, IOMode mode) 270 { 271 enforce(dst.length == 0 || !empty, "Reading empty stream"); 272 273 size_t nread = 0; 274 275 while (dst.length > 0) { 276 auto len = min(m_outbuffer.length, dst.length); 277 m_outbuffer.read(dst[0 .. len]); 278 dst = dst[len .. $]; 279 280 nread += len; 281 282 if (!m_outbuffer.length && !m_finished) { 283 if (mode == IOMode.immediate || mode == IOMode.once && !nread) 284 break; 285 readChunk(); 286 } 287 enforce(dst.length == 0 || m_outbuffer.length || !m_finished, "Reading past end of zlib stream."); 288 } 289 290 return nread; 291 } 292 293 alias read = InputStream.read; 294 295 private void readChunk() 296 @safe { 297 assert(m_outbuffer.length == 0, "Buffer must be empty to read the next chunk."); 298 assert(m_outbuffer.peekDst().length > 0); 299 enforce (!m_finished, "Reading past end of zlib stream."); 300 301 m_zstream.next_out = &m_outbuffer.peekDst()[0]; 302 m_zstream.avail_out = cast(uint)m_outbuffer.peekDst().length; 303 304 while (!m_outbuffer.length) { 305 if (m_zstream.avail_in == 0) { 306 auto clen = min(m_inbuffer.length, m_in.leastSize); 307 if (clen == 0) { 308 m_finished = true; 309 throw new Exception("Premature end of compressed input."); 310 } 311 m_in.read(m_inbuffer[0 .. clen]); 312 m_zstream.next_in = &m_inbuffer[0]; 313 m_zstream.avail_in = cast(uint)clen; 314 } 315 auto avins = m_zstream.avail_in; 316 //logInfo("inflate %s -> %s (@%s in @%s)", m_zstream.avail_in, m_zstream.avail_out, m_ninflated, n_read); 317 auto ret = zlibEnforce(() @trusted { return inflate(&m_zstream, Z_SYNC_FLUSH); } ()); 318 //logInfo(" ... %s -> %s", m_zstream.avail_in, m_zstream.avail_out); 319 assert(m_zstream.avail_out != m_outbuffer.peekDst.length || m_zstream.avail_in != avins); 320 m_ninflated += m_outbuffer.peekDst().length - m_zstream.avail_out; 321 n_read += avins - m_zstream.avail_in; 322 m_outbuffer.putN(m_outbuffer.peekDst().length - m_zstream.avail_out); 323 assert(m_zstream.avail_out == 0 || m_zstream.avail_out == m_outbuffer.peekDst().length); 324 325 if (ret == Z_STREAM_END) { 326 m_finished = true; 327 zlibEnforce(() @trusted { return inflateEnd(&m_zstream); }()); 328 enforce(m_in.empty, "Extra data after end of compressed input."); 329 return; 330 } 331 } 332 } 333 } 334 335 unittest { 336 import vibe.stream.memory; 337 338 auto data = new ubyte[5000]; 339 340 auto mos = createMemoryOutputStream(); 341 auto gos = createGzipOutputStream(mos); 342 gos.write(data); 343 gos.finalize(); 344 345 auto ms = createMemoryStream(mos.data, false); 346 auto gis = createGzipInputStream(ms); 347 348 auto result = new ubyte[data.length]; 349 gis.read(result); 350 assert(data == result); 351 } 352 353 private int zlibEnforce(int result) 354 @safe { 355 switch (result) { 356 default: 357 if (result < 0) throw new Exception("unknown zlib error"); 358 else return result; 359 case Z_ERRNO: throw new Exception("zlib errno error"); 360 case Z_STREAM_ERROR: throw new Exception("zlib stream error"); 361 case Z_DATA_ERROR: throw new Exception("zlib data error"); 362 case Z_MEM_ERROR: throw new Exception("zlib memory error"); 363 case Z_BUF_ERROR: throw new Exception("zlib buffer error"); 364 case Z_VERSION_ERROR: throw new Exception("zlib version error"); 365 } 366 }