1 /**
2 Standard I/O streams
3
4 Copyright: © 2014 Sönke Ludwig
5 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 Authors: Sönke Ludwig, Eric Cornelius
7 */
8 module vibe.stream.stdio;
9
10 import vibe.core.core;
11 import vibe.core.log;
12 import vibe.core.stream;
13 import vibe.stream.taskpipe;
14
15 import std.stdio;
16 import core.thread;
17
18 import std.exception;
19
/**
	Stream backed by a `std.stdio.File`.

	The blocking file operations run on dedicated threads that are started in
	`setup`: one for reading (only if the stream was constructed as readable)
	and one for writing (only if it was constructed as writable). Data is
	handed between the caller and those threads through `TaskPipe` instances.
*/
class StdFileStream : ConnectionStream {
	private {
		std.stdio.File m_file;
		TaskPipe m_readPipe;   // null when the stream is not readable
		TaskPipe m_writePipe;  // null when the stream is not writable
		Thread m_readThread;
		Thread m_writeThread;
	}

	/**
		Creates the pipes for the requested directions.

		Params:
			read = create a read pipe, making the stream readable
			write = create a write pipe, making the stream writable
	*/
	this(bool read, bool write)
	{
		if (read) m_readPipe = new TaskPipe;
		if (write) m_writePipe = new TaskPipe;
	}

	/**
		Associates the stream with `file` and starts the worker threads for
		the directions that were enabled in the constructor.
	*/
	void setup(std.stdio.File file)
	{
		m_file = file;

		if (m_readPipe) {
			m_readThread = new Thread(&readThreadFunc);
			m_readThread.name = "StdFileStream reader";
			m_readThread.start();
		}

		if (m_writePipe) {
			m_writeThread = new Thread(&writeThreadFunc);
			m_writeThread.name = "StdFileStream writer";
			m_writeThread.start();
		}
	}

	/// The underlying file that was passed to `setup`.
	@property std.stdio.File stdFile() { return m_file; }

	/// Forwards to the read pipe; throws if the stream is not readable.
	override @property bool empty() { enforceReadable(); return m_readPipe.empty; }

	/// Forwards to the read pipe; throws if the stream is not readable.
	override @property ulong leastSize()
	{
		enforceReadable();
		return m_readPipe.leastSize;
	}

	/// Forwards to the read pipe; throws if the stream is not readable.
	override @property bool dataAvailableForRead()
	{
		enforceReadable();
		return m_readPipe.dataAvailableForRead;
	}

	/**
		Connection state of the stream.

		Reports the state of the read pipe when the stream is readable and
		falls back to the write pipe for write-only streams. A stream with
		neither pipe is reported as not connected.
	*/
	override @property bool connected()
	const {
		// The read pipe is null for write-only streams (e.g. stdout/stderr),
		// so it must not be dereferenced unconditionally.
		if (m_readPipe !is null) return m_readPipe.connected;
		if (m_writePipe !is null) return m_writePipe.connected;
		return false;
	}

	/**
		Closes the write pipe.

		Does nothing for read-only streams, which have no write pipe; their
		read pipe is finalized by the reader task once reading has ended.
	*/
	override void close()
	{
		// The write pipe is null for read-only streams (e.g. stdin).
		if (m_writePipe !is null) m_writePipe.close();
	}

	/**
		Waits for data on the read pipe.

		Returns: the result of the read pipe's `waitForData`, or `false` if
			the stream is not readable (no data can ever arrive).
	*/
	override bool waitForData(Duration timeout)
	{
		// Guard against write-only streams, which have no read pipe.
		if (m_readPipe is null) return false;
		return m_readPipe.waitForData(timeout);
	}

	/// Forwards to the read pipe; throws if the stream is not readable.
	override const(ubyte)[] peek()
	{
		enforceReadable();
		return m_readPipe.peek();
	}

	/// Reads from the read pipe into `dst`; throws if the stream is not readable.
	override size_t read(scope ubyte[] dst, IOMode mode)
	{
		enforceReadable();
		return m_readPipe.read(dst, mode);
	}

	alias read = ConnectionStream.read;

	/// Queues `bytes_` on the write pipe; throws if the stream is not writable.
	override size_t write(in ubyte[] bytes_, IOMode mode)
	{
		enforceWritable();
		return m_writePipe.write(bytes_, mode);
	}

	alias write = ConnectionStream.write;

	/// Flushes the write pipe; throws if the stream is not writable.
	override void flush()
	{
		enforceWritable();
		m_writePipe.flush();
	}

	/**
		Flushes and finalizes the write pipe; throws if the stream is not
		writable. Does nothing if the write pipe is no longer connected.
	*/
	override void finalize()
	{
		enforceWritable();
		if (!m_writePipe.connected) return;
		flush();
		m_writePipe.finalize();
	}

	/// Throws an exception if the stream was not constructed as readable.
	void enforceReadable() @safe { enforce(m_readPipe, "Stream is not readable!"); }
	/// Throws an exception if the stream was not constructed as writable.
	void enforceWritable() @safe { enforce(m_writePipe, "Stream is not writable!"); }

	/*
		Entry point of the reader thread: copies data from the file into the
		read pipe until EOF or an error, then closes the file and finalizes
		the read pipe.
	*/
	private void readThreadFunc()
	{
		// Coordinates the task below with the event loop of this thread:
		// whichever side gets here second sees the flag already set. If the
		// task finished before the loop was started, the loop is skipped;
		// otherwise the task stops the running loop when it is done.
		bool loop_flag = false;
		runTask(() nothrow {
			// Data is forwarded one byte at a time.
			// NOTE(review): presumably so that a partially filled larger
			// buffer never delays interactive input - confirm before enlarging.
			ubyte[1] buf;
			try while (!m_file.eof) {
				auto data = m_file.rawRead(buf);
				if (!data.length) break;
				m_readPipe.write(data, IOMode.all);
				vibe.core.core.yield();
			}
			catch (Exception e) logException!(LogLevel.diagnostic)(e, "Failed to read from File");

			try {
				if (m_file.isOpen) m_file.close();
			} catch (Exception e) logException(e, "Failed to close File");
			try m_readPipe.finalize();
			catch (Exception e) assert(false, e.msg);
			if (loop_flag) exitEventLoop();
			else loop_flag = true;
		});
		if (!loop_flag) {
			loop_flag = true;
			runEventLoop();
		}
	}

	/*
		Entry point of the writer thread: drains the write pipe into the file
		in chunks of up to 1024 bytes until the pipe is empty or the file is
		closed, then closes the file.
	*/
	private void writeThreadFunc()
	{
		import std.algorithm : min;

		// Same task/event-loop hand-shake as in readThreadFunc.
		bool loop_flag = false;
		runTask(() nothrow {
			ubyte[1024] buf;
			try while (m_file.isOpen && !m_writePipe.empty) {
				// leastSize is a ulong; the result is bounded by buf.length,
				// so narrowing to size_t is lossless and keeps the slice
				// expressions valid on 32-bit targets.
				auto len = cast(size_t) min(buf.length, m_writePipe.leastSize);
				if (!len) break;
				m_writePipe.read(buf[0 .. len], IOMode.all);
				m_file.rawWrite(buf[0 .. len]);
				vibe.core.core.yield();
			}
			catch (Exception e) logException!(LogLevel.diagnostic)(e, "Failed to write to File");

			try {
				if (m_file.isOpen) m_file.close();
			} catch (Exception e) logException(e, "Failed to close File");
			if (loop_flag) exitEventLoop();
			else loop_flag = true;
		});
		if (!loop_flag) {
			loop_flag = true;
			runEventLoop();
		}
	}
}
168
/**
	Output stream that forwards everything written to it to the process'
	standard output (`stdout`).
*/
final class StdoutStream : StdFileStream {
	/// Creates a write-only stream (no read pipe) and binds it to `stdout`.
	this()
	{
		enum bool readable = false;
		enum bool writable = true;

		super(readable, writable);
		setup(std.stdio.stdout);
	}
}
178
/**
	Output stream that forwards everything written to it to the process'
	standard error output (`stderr`).
*/
final class StderrStream : StdFileStream {
	/// Creates a write-only stream (no read pipe) and binds it to `stderr`.
	this()
	{
		enum bool readable = false;
		enum bool writable = true;

		super(readable, writable);
		setup(std.stdio.stderr);
	}
}
188
/**
	Input stream that provides the data arriving on the process' standard
	input (`stdin`).
*/
final class StdinStream : StdFileStream {
	/// Creates a read-only stream (no write pipe) and binds it to `stdin`.
	this()
	{
		enum bool readable = true;
		enum bool writable = false;

		super(readable, writable);
		setup(std.stdio.stdin);
	}
}