/**
	Standard I/O streams

	Copyright: © 2014 Sönke Ludwig
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
	Authors: Sönke Ludwig, Eric Cornelius
*/
module vibe.stream.stdio;

import vibe.core.core;
import vibe.core.log;
import vibe.core.stream;
import vibe.stream.taskpipe;

import std.stdio;
import core.thread;

import std.exception;

/**
	Connection stream wrapper around a `std.stdio.File`.

	Depending on the flags passed to the constructor, the stream owns a read
	pipe, a write pipe, or both. For each pipe that exists, `setup` starts a
	dedicated thread that moves data between the wrapped file and that pipe,
	so the file I/O is performed in those threads rather than in the caller.
*/
class StdFileStream : ConnectionStream {
	private {
		std.stdio.File m_file;
		TaskPipe m_readPipe;    // null when the stream is not readable
		TaskPipe m_writePipe;   // null when the stream is not writable
		Thread m_readThread;
		Thread m_writeThread;
	}

	/**
		Creates the pipes for the requested directions.

		Params:
			read = create a read pipe, making the stream readable
			write = create a write pipe, making the stream writable
	*/
	this(bool read, bool write)
	{
		if (read) m_readPipe = new TaskPipe;
		if (write) m_writePipe = new TaskPipe;
	}

	/**
		Associates the stream with `file` and starts one worker thread per
		existing pipe.

		Params:
			file = the file to read from and/or write to
	*/
	void setup(std.stdio.File file)
	{
		m_file = file;

		if (m_readPipe) {
			m_readThread = new Thread(&readThreadFunc);
			m_readThread.name = "StdFileStream reader";
			m_readThread.start();
		}

		if (m_writePipe) {
			m_writeThread = new Thread(&writeThreadFunc);
			m_writeThread.name = "StdFileStream writer";
			m_writeThread.start();
		}
	}

	/// The wrapped file, as passed to `setup`.
	@property std.stdio.File stdFile() { return m_file; }

	/// Forwards to the read pipe. Throws if the stream is not readable.
	override @property bool empty() { enforceReadable(); return m_readPipe.empty; }

	/// Forwards to the read pipe. Throws if the stream is not readable.
	override @property ulong leastSize()
	{
		enforceReadable();
		return m_readPipe.leastSize;
	}

	/// Forwards to the read pipe. Throws if the stream is not readable.
	override @property bool dataAvailableForRead()
	{
		enforceReadable();
		return m_readPipe.dataAvailableForRead;
	}

	/**
		Connection status of the stream.

		A readable stream reports the status of its read pipe. A write-only
		stream has no read pipe, so the status of the write pipe is reported
		instead. A stream without any pipe is never connected.
	*/
	override @property bool connected()
	const {
		// The read pipe does not exist for write-only streams (stdout/stderr),
		// so it must not be dereferenced unconditionally.
		if (m_readPipe !is null) return m_readPipe.connected;
		if (m_writePipe !is null) return m_writePipe.connected;
		return false;
	}

	/**
		Closes the write pipe.

		Does nothing for a read-only stream, which has no write pipe.
	*/
	override void close()
	{
		// The write pipe does not exist for read-only streams (stdin).
		if (m_writePipe !is null) m_writePipe.close();
	}

	/**
		Waits on the read pipe for incoming data.

		Params:
			timeout = forwarded to the read pipe

		Returns:
			The result of the read pipe's `waitForData`, or `false` for a
			write-only stream, which can never receive data.
	*/
	override bool waitForData(Duration timeout)
	{
		// Guard without throwing so that this stays valid even if the
		// interface method is declared nothrow.
		if (m_readPipe is null) return false;
		return m_readPipe.waitForData(timeout);
	}

	/// Forwards to the read pipe. Throws if the stream is not readable.
	override const(ubyte)[] peek()
	{
		enforceReadable();
		return m_readPipe.peek();
	}

	/// Forwards to the read pipe. Throws if the stream is not readable.
	override size_t read(scope ubyte[] dst, IOMode mode)
	{
		enforceReadable();
		return m_readPipe.read(dst, mode);
	}

	alias read = ConnectionStream.read;

	// The signature of OutputStream.write depends on the stream API version.
	static if (is(typeof(.OutputStream.outputStreamVersion)) && .OutputStream.outputStreamVersion > 1) {
		override size_t write(scope const(ubyte)[] bytes_, IOMode mode) { return doWrite(bytes_, mode); }
	} else {
		override size_t write(in ubyte[] bytes_, IOMode mode) { return doWrite(bytes_, mode); }
	}

	alias write = ConnectionStream.write;

	/// Shared implementation of both `write` variants; forwards to the write pipe.
	private size_t doWrite(scope const(ubyte)[] bytes_, IOMode mode)
	@safe {
		enforceWritable();
		return m_writePipe.write(bytes_, mode);
	}

	/// Forwards to the write pipe. Throws if the stream is not writable.
	override void flush()
	{
		enforceWritable();
		m_writePipe.flush();
	}

	/**
		Flushes and finalizes the write pipe.

		Returns early without doing anything if the write pipe is no longer
		connected. Throws if the stream is not writable.
	*/
	override void finalize()
	{
		enforceWritable();
		if (!m_writePipe.connected) return;
		flush();
		m_writePipe.finalize();
	}

	/// Throws an exception if the stream has no read pipe.
	void enforceReadable() @safe { enforce(m_readPipe, "Stream is not readable!"); }

	/// Throws an exception if the stream has no write pipe.
	void enforceWritable() @safe { enforce(m_writePipe, "Stream is not writable!"); }

	/**
		Entry point of the reader thread.

		Runs a task that copies the file's contents into the read pipe until
		EOF, an empty read, or an exception, then closes the file and
		finalizes the read pipe.
	*/
	private void readThreadFunc()
	{
		// Coordinates event loop startup and shutdown between this function
		// and the task: whichever side gets there first sets the flag. If the
		// task finishes before runEventLoop() would be called, the loop is
		// never started; otherwise the task stops the loop when it is done.
		bool loop_flag = false;
		runTask(() nothrow {
			// Single-byte buffer, so each byte is forwarded as soon as it has
			// been read.
			ubyte[1] buf;
			try while (!m_file.eof) {
				auto data = m_file.rawRead(buf);
				if (!data.length) break;
				m_readPipe.write(data, IOMode.all);
				vibe.core.core.yield();
			}
			catch (Exception e) logException!(LogLevel.diagnostic)(e, "Failed to read from File");

			try {
				if (m_file.isOpen) m_file.close();
			} catch (Exception e) logException(e, "Failed to close File");
			try m_readPipe.finalize();
			catch (Exception e) assert(false, e.msg);
			if (loop_flag) exitEventLoop();
			else loop_flag = true;
		});
		if (!loop_flag) {
			loop_flag = true;
			runEventLoop();
		}
	}

	/**
		Entry point of the writer thread.

		Runs a task that drains the write pipe into the file in chunks of up
		to 1024 bytes, then closes the file.
	*/
	private void writeThreadFunc()
	{
		import std.algorithm : min;

		// Same start/stop handshake as in readThreadFunc.
		bool loop_flag = false;
		runTask(() nothrow {
			ubyte[1024] buf;
			try while (m_file.isOpen && !m_writePipe.empty) {
				auto len = min(buf.length, m_writePipe.leastSize);
				if (!len) break;
				m_writePipe.read(buf[0 .. len], IOMode.all);
				m_file.rawWrite(buf[0 .. len]);
				vibe.core.core.yield();
			}
			catch (Exception e) logException!(LogLevel.diagnostic)(e, "Failed to write to File");

			try {
				if (m_file.isOpen) m_file.close();
			} catch (Exception e) logException(e, "Failed to close File");
			if (loop_flag) exitEventLoop();
			else loop_flag = true;
		});
		if (!loop_flag) {
			loop_flag = true;
			runEventLoop();
		}
	}
}

/**
	OutputStream that writes to stdout
*/
final class StdoutStream : StdFileStream {
	this() {
		super(false, true);
		setup(stdout);
	}
}

/**
	OutputStream that writes to stderr
*/
final class StderrStream : StdFileStream {
	this() {
		super(false, true);
		setup(stderr);
	}
}

/**
	InputStream that reads from stdin
*/
final class StdinStream : StdFileStream {
	this() {
		super(true, false);
		setup(stdin);
	}
}