1 /** 2 Stream interface for passing data between different tasks. 3 4 Copyright: © 2013 Sönke Ludwig 5 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 6 Authors: Sönke Ludwig 7 */ 8 module vibe.stream.taskpipe; 9 10 public import vibe.core.stream; 11 12 import core.sync.mutex; 13 import core.time; 14 import std.algorithm : min; 15 import std.exception; 16 import vibe.core.core; 17 import vibe.core.sync; 18 import vibe.utils.array; 19 20 21 /** 22 Implements a unidirectional data pipe between two tasks. 23 */ 24 final class TaskPipe : ConnectionStream { 25 private { 26 TaskPipeImpl m_pipe; 27 } 28 29 /** Constructs a new pipe ready for use. 30 */ 31 this(bool grow_when_full = false) 32 { 33 m_pipe = new TaskPipeImpl(grow_when_full); 34 } 35 36 /// Size of the (fixed) FIFO buffer used to transfer data between tasks 37 @property size_t bufferSize() const { return m_pipe.bufferSize; } 38 /// ditto 39 @property void bufferSize(size_t nbytes) { m_pipe.bufferSize = nbytes; } 40 41 @property bool empty() { return leastSize() == 0; } 42 @property ulong leastSize() { m_pipe.waitForData(); return m_pipe.fill; } 43 @property bool dataAvailableForRead() { return m_pipe.fill > 0; } 44 @property bool connected() const { return m_pipe.open; } 45 46 void close() { m_pipe.close(); } 47 bool waitForData(Duration timeout) 48 { 49 if (dataAvailableForRead) return true; 50 m_pipe.waitForData(timeout); 51 return dataAvailableForRead; 52 } 53 const(ubyte)[] peek() { return m_pipe.peek; } 54 size_t read(scope ubyte[] dst, IOMode mode) { return m_pipe.read(dst, mode); } 55 alias read = ConnectionStream.read; 56 size_t write(in ubyte[] bytes, IOMode mode) { return m_pipe.write(bytes, mode); } 57 alias write = ConnectionStream.write; 58 void flush() {} 59 void finalize() { m_pipe.close(); } 60 } 61 62 63 /** 64 Underyling pipe implementation for TaskPipe with no Stream interface. 65 */ 66 private final class TaskPipeImpl { 67 @safe: 68 69 private { 70 Mutex m_mutex; 71 InterruptibleTaskCondition m_condition; 72 vibe.utils.array.FixedRingBuffer!ubyte m_buffer; 73 bool m_closed = false; 74 bool m_growWhenFull; 75 } 76 77 /** Constructs a new pipe ready for use. 78 */ 79 this(bool grow_when_full = false) 80 { 81 m_mutex = new Mutex; 82 () @trusted { m_condition = new InterruptibleTaskCondition(m_mutex); } (); 83 m_buffer.capacity = 2048; 84 m_growWhenFull = grow_when_full; 85 } 86 87 /// Size of the (fixed) buffer used to transfer data between tasks 88 @property size_t bufferSize() const { return m_buffer.capacity; } 89 /// ditto 90 @property void bufferSize(size_t nbytes) { m_buffer.capacity = nbytes; } 91 92 /// Number of bytes currently in the transfer buffer 93 @property size_t fill() 94 const { 95 synchronized (m_mutex) { 96 return m_buffer.length; 97 } 98 } 99 100 @property bool open() const { return !m_closed; } 101 102 /** Closes the pipe. 103 */ 104 void close() 105 { 106 synchronized (m_mutex) m_closed = true; 107 () @trusted { m_condition.notifyAll(); } (); 108 } 109 110 /** Blocks until at least one byte of data has been written to the pipe. 111 */ 112 void waitForData(Duration timeout = Duration.max) 113 { 114 import std.datetime : Clock, SysTime, UTC; 115 bool have_timeout = timeout > 0.seconds && timeout != Duration.max; 116 SysTime now = Clock.currTime(UTC()); 117 SysTime timeout_target; 118 if (have_timeout) timeout_target = now + timeout; 119 120 synchronized (m_mutex) { 121 while (m_buffer.empty && !m_closed && (!have_timeout || now < timeout_target)) { 122 if (have_timeout) 123 () @trusted { m_condition.wait(timeout_target - now); } (); 124 else () @trusted { m_condition.wait(); } (); 125 now = Clock.currTime(UTC()); 126 } 127 } 128 } 129 130 /** Writes the given byte array to the pipe. 131 */ 132 size_t write(const(ubyte)[] data, IOMode mode) 133 { 134 size_t ret = 0; 135 136 enforce(!m_closed, "Writing to closed task pipe."); 137 138 while (data.length > 0){ 139 bool need_signal; 140 synchronized (m_mutex) { 141 if (m_growWhenFull && m_buffer.full) { 142 size_t new_sz = m_buffer.capacity; 143 while (new_sz - m_buffer.capacity < data.length) new_sz += 2; 144 m_buffer.capacity = new_sz; 145 } else while (m_buffer.full) { 146 if (mode == IOMode.immediate || mode == IOMode.once && ret > 0) 147 return ret; 148 if (m_closed) throw new Exception("Pipe closed while writing data"); 149 () @trusted { m_condition.wait(); } (); 150 } 151 152 need_signal = m_buffer.empty; 153 auto len = min(m_buffer.freeSpace, data.length); 154 m_buffer.put(data[0 .. len]); 155 data = data[len .. $]; 156 ret += len; 157 } 158 if (need_signal) () @trusted { m_condition.notifyAll(); } (); 159 } 160 if (!m_growWhenFull) vibe.core.core.yield(); 161 162 return ret; 163 } 164 165 /** Returns a temporary view of the beginning of the transfer buffer. 166 167 Note that a call to read invalidates this array slice. Blocks in case 168 of a filled up transfer buffer. 169 */ 170 const(ubyte[]) peek() 171 { 172 synchronized (m_mutex) { 173 return m_buffer.peek(); 174 } 175 } 176 177 /** Reads data into the supplied buffer. 178 179 Blocks until a sufficient amount of data has been written to the pipe. 180 */ 181 size_t read(scope ubyte[] dst, IOMode mode) 182 { 183 size_t ret = 0; 184 185 while (dst.length > 0) { 186 bool need_signal; 187 size_t len; 188 synchronized (m_mutex) { 189 while (m_buffer.empty && !m_closed) { 190 if (mode == IOMode.immediate || mode == IOMode.once && ret > 0) 191 return ret; 192 () @trusted { m_condition.wait(); } (); 193 } 194 195 need_signal = m_buffer.full; 196 enforce(!m_buffer.empty, "Reading past end of closed pipe."); 197 len = min(dst.length, m_buffer.length); 198 m_buffer.read(dst[0 .. len]); 199 ret += len; 200 } 201 if (need_signal) () @trusted { m_condition.notifyAll(); } (); 202 dst = dst[len .. $]; 203 } 204 vibe.core.core.yield(); 205 206 return ret; 207 } 208 } 209 210 unittest { // issue #1501 - deadlock in TaskPipe 211 import std.datetime : Clock, UTC; 212 import core.time : msecs; 213 214 // test read after write and write after read 215 foreach (i; 0 .. 2) { 216 auto p = new TaskPipe; 217 p.bufferSize = 2048; 218 219 Task a, b; 220 a = runTask({ ubyte[2100] buf; if (i == 0) p.read(buf, IOMode.all); else p.write(buf, IOMode.all); }); 221 b = runTask({ ubyte[2100] buf; if (i == 0) p.write(buf, IOMode.all); else p.read(buf, IOMode.all); }); 222 223 auto joiner = runTask({ 224 auto starttime = Clock.currTime(UTC()); 225 while (a.running || b.running) { 226 if (Clock.currTime(UTC()) - starttime > 500.msecs) 227 assert(false, "TaskPipe is dead locked."); 228 yield(); 229 } 230 }); 231 232 joiner.join(); 233 } 234 } 235 236 unittest { // issue # 237 auto t = runTask({ 238 auto tp = new TaskPipeImpl; 239 tp.waitForData(10.msecs); 240 exitEventLoop(); 241 }); 242 runTask({ 243 sleep(500.msecs); 244 assert(!t.running, "TaskPipeImpl.waitForData didn't timeout."); 245 }); 246 runEventLoop(); 247 }