1 /**
2 	Standard I/O streams
3 
4 	Copyright: © 2014 RejectedSoftware e.K.
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig, Eric Cornelius
7 */
8 module vibe.stream.stdio;
9 
10 import vibe.core.core;
11 import vibe.core.stream;
12 import vibe.stream.taskpipe;
13 
14 import std.stdio;
15 import core.thread;
16 
17 import std.exception;
18 
/**
	Stream wrapper around a `std.stdio.File`.

	Each enabled direction gets its own `TaskPipe` and its own worker thread
	(started by `setup`): the reader thread copies bytes from the file into
	the read pipe, the writer thread copies bytes from the write pipe to the
	file. The stream methods of this class only ever touch the pipes.

	A direction that was not enabled in the constructor has a `null` pipe;
	read-side methods then throw "Stream is not readable!" and write-side
	methods throw "Stream is not writable!".
*/
class StdFileStream : ConnectionStream {
	private {
		std.stdio.File m_file;
		TaskPipe m_readPipe;   // null when the stream was created non-readable
		TaskPipe m_writePipe;  // null when the stream was created non-writable
		Thread m_readThread;
		Thread m_writeThread;
	}

	/**
		Creates the pipes for the requested directions.

		No threads are started and no file is attached until `setup` is called.

		Params:
			read = create a read pipe (stream becomes readable)
			write = create a write pipe (stream becomes writable)
	*/
	this(bool read, bool write)
	{
		if (read) m_readPipe = new TaskPipe;
		if (write) m_writePipe = new TaskPipe;
	}

	/**
		Attaches the file and starts one worker thread per enabled direction.

		Params:
			file = the file to read from and/or write to
	*/
	void setup(std.stdio.File file)
	{
		m_file = file;

		if (m_readPipe) {
			m_readThread = new Thread(&readThreadFunc);
			m_readThread.name = "StdFileStream reader";
			m_readThread.start();
		}

		if (m_writePipe) {
			m_writeThread = new Thread(&writeThreadFunc);
			m_writeThread.name = "StdFileStream writer";
			m_writeThread.start();
		}
	}

	/// Forwards to the read pipe's `empty`. Throws if the stream is not readable.
	override @property bool empty() { enforceReadable(); return m_readPipe.empty; }

	/// Forwards to the read pipe's `leastSize`. Throws if the stream is not readable.
	override @property ulong leastSize()
	{
		enforceReadable();
		return m_readPipe.leastSize;
	}

	/// Forwards to the read pipe's `dataAvailableForRead`. Throws if the stream is not readable.
	override @property bool dataAvailableForRead()
	{
		enforceReadable();
		return m_readPipe.dataAvailableForRead;
	}

	/**
		Reports the connection state of the underlying pipe.

		The read pipe is consulted when present; a write-only stream reports
		the state of its write pipe instead of dereferencing the missing read
		pipe. A stream with neither pipe is reported as not connected.
	*/
	override @property bool connected()
	const {
		if (m_readPipe !is null) return m_readPipe.connected;
		if (m_writePipe !is null) return m_writePipe.connected;
		return false;
	}

	/**
		Closes the stream.

		The write pipe is closed when present. A read-only stream has no write
		pipe, so its read pipe is closed instead of dereferencing `null`.
	*/
	override void close()
	{
		if (m_writePipe !is null) m_writePipe.close();
		else if (m_readPipe !is null) m_readPipe.close();
	}

	/**
		Forwards to the read pipe's `waitForData`.

		Throws if the stream is not readable, like the other read-side methods.
	*/
	override bool waitForData(Duration timeout)
	{
		enforceReadable();
		return m_readPipe.waitForData(timeout);
	}

	/// Forwards to the read pipe's `peek`. Throws if the stream is not readable.
	override const(ubyte)[] peek()
	{
		enforceReadable();
		return m_readPipe.peek();
	}

	/// Reads from the read pipe into `dst`. Throws if the stream is not readable.
	override size_t read(scope ubyte[] dst, IOMode mode)
	{
		enforceReadable();
		return m_readPipe.read(dst, mode);
	}

	alias read = ConnectionStream.read;

	/// Queues `bytes_` on the write pipe. Throws if the stream is not writable.
	override size_t write(in ubyte[] bytes_, IOMode mode)
	{
		enforceWritable();
		return m_writePipe.write(bytes_, mode);
	}

	alias write = ConnectionStream.write;

	/// Flushes the write pipe. Throws if the stream is not writable.
	override void flush()
	{
		enforceWritable();
		m_writePipe.flush();
	}

	/**
		Flushes and finalizes the write pipe.

		Does nothing if the write pipe is no longer connected.
		Throws if the stream is not writable.
	*/
	override void finalize()
	{
		enforceWritable();
		if (!m_writePipe.connected) return;
		flush();
		m_writePipe.finalize();
	}

	/// Throws an exception unless a read pipe exists.
	void enforceReadable() @safe { enforce(m_readPipe, "Stream is not readable!"); }
	/// Throws an exception unless a write pipe exists.
	void enforceWritable() @safe { enforce(m_writePipe, "Stream is not writable!"); }

	/*
		Body of the reader thread: runs a task that copies the file into the
		read pipe until EOF or an empty read.

		`loop_flag` coordinates the task with the code after `runTask`:
		whichever of the two reaches its check first sets the flag. If the task
		has already exited when `runTask` returns, the event loop is never
		started; otherwise the event loop runs until the task's exit handler
		calls `exitEventLoop`.
	*/
	private void readThreadFunc()
	{
		bool loop_flag = false;
		runTask({
			// NOTE(review): one-byte buffer, presumably so each byte is forwarded
			// as soon as it is read instead of waiting for a larger buffer to fill — confirm.
			ubyte[1] buf;
			scope(exit) {
				if (m_file.isOpen) m_file.close();
				m_readPipe.finalize();
				if (loop_flag) exitEventLoop();
				else loop_flag = true;
			}
			while (!m_file.eof) {
				auto data = m_file.rawRead(buf);
				if (!data.length) break;
				m_readPipe.write(data, IOMode.all);
				vibe.core.core.yield();
			}
		});
		if (!loop_flag) {
			loop_flag = true;
			runEventLoop();
		}
	}

	/*
		Body of the writer thread: runs a task that drains the write pipe into
		the file in chunks of at most 1024 bytes, until the file is closed or
		the pipe reports empty.

		`loop_flag` is used exactly as in `readThreadFunc`. The file is closed
		when the task exits.
	*/
	private void writeThreadFunc()
	{
		import std.algorithm : min;

		bool loop_flag = false;
		runTask({
			ubyte[1024] buf;
			scope(exit) {
				if (m_file.isOpen) m_file.close();
				if (loop_flag) exitEventLoop();
				else loop_flag = true;
			}
			while (m_file.isOpen && !m_writePipe.empty) {
				// Never request more than the pipe currently guarantees.
				auto len = min(buf.length, m_writePipe.leastSize);
				if (!len) break;
				m_writePipe.read(buf[0 .. len], IOMode.all);
				m_file.rawWrite(buf[0 .. len]);
				vibe.core.core.yield();
			}
		});
		if (!loop_flag) {
			loop_flag = true;
			runEventLoop();
		}
	}
}
160 
161 /**
162 	OutputStream that writes to stdout
163 */
final class StdoutStream : StdFileStream {
	/// Creates a write-only stream and attaches it to `stdout`.
	this()
	{
		enum readable = false;
		enum writable = true;

		super(readable, writable);
		// Attaching the file also starts the writer thread.
		setup(stdout);
	}
}
170 
171 /**
172 	OutputStream that writes to stderr
173 */
final class StderrStream : StdFileStream {
	/// Creates a write-only stream and attaches it to `stderr`.
	this()
	{
		enum readable = false;
		enum writable = true;

		super(readable, writable);
		// Attaching the file also starts the writer thread.
		setup(stderr);
	}
}
180 
181 /**
182 	InputStream that reads from stdin
183 */
final class StdinStream : StdFileStream {
	/// Creates a read-only stream and attaches it to `stdin`.
	this()
	{
		enum readable = true;
		enum writable = false;

		super(readable, writable);
		// Attaching the file also starts the reader thread.
		setup(stdin);
	}
}