/**
	Contains the task handle type (Task), the fiber base class that backs
	tasks (TaskFiber), the exception used for task interruption
	(InterruptException) and the per-task message queue (MessageQueue).

	Copyright: © 2012-2014 RejectedSoftware e.K.
	Authors: Sönke Ludwig
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
*/
module vibe.core.task;

import vibe.core.sync;
import vibe.utils.array;

import core.thread;
import std.exception;
import std.traits;
import std.typecons;
import std.variant;


/** Represents a single task as started using vibe.core.runTask.

	Note that the Task type is considered weakly isolated and thus can be
	passed between threads using vibe.core.concurrency.send or by passing
	it as a parameter to vibe.core.core.runWorkerTask.

	A handle is the pair of a fiber reference and the value of the fiber's
	task counter at the time the handle was created; the counter is what
	lets `running` and `opEquals` tell apart successive tasks that share
	the same fiber.
*/
struct Task {
	private {
		shared(TaskFiber) m_fiber;
		size_t m_taskCounter;
		import std.concurrency : ThreadInfo, Tid;
		// Fallback ThreadInfo handed out by tidInfo for handles without a fiber.
		static ThreadInfo s_tidInfo;
	}

	/// Creates a handle for `fiber`, tagged with the given task counter value.
	private this(TaskFiber fiber, size_t task_counter)
	@safe nothrow {
		() @trusted { m_fiber = cast(shared)fiber; } ();
		m_taskCounter = task_counter;
	}

	/// Creates a copy of another handle (same fiber, same task counter).
	this(in Task other) nothrow { m_fiber = cast(shared(TaskFiber))other.m_fiber; m_taskCounter = other.m_taskCounter; }

	/** Returns the Task instance belonging to the calling task.

		Returns: `Task.init` if the caller is not running inside a fiber,
			if that fiber is not a `TaskFiber`, or if the fiber is not
			currently marked as running.
	*/
	static Task getThis() nothrow @safe
	{
		auto fiber = () @trusted { return Fiber.getThis(); } ();
		if (!fiber) return Task.init;
		auto tfiber = cast(TaskFiber)fiber;
		if (!tfiber) return Task.init;
		if (!tfiber.m_running) return Task.init;
		return () @trusted { return Task(tfiber, tfiber.m_taskCounter); } ();
	}

	nothrow {
		/// The fiber referenced by this handle (with the `shared` qualifier cast away).
		@property inout(TaskFiber) fiber() inout @trusted { return cast(inout(TaskFiber))m_fiber; }
		/// The task counter value this handle was created with.
		@property size_t taskCounter() const @safe { return m_taskCounter; }
		/// The thread stored in the fiber, or `null` for a handle without a fiber.
		@property inout(Thread) thread() inout @safe { if (m_fiber) return this.fiber.thread; return null; }

		/** Determines if the task is still running.

			Returns: `false` if the fiber is in the `TERM` state; otherwise
				`true` only if the fiber is marked as running and its
				current task counter still equals the one of this handle.
		*/
		@property bool running()
		const @trusted {
			assert(m_fiber !is null, "Invalid task handle");
			// Anything thrown while querying the fiber state is deliberately
			// swallowed so that this property can stay nothrow.
			try if (this.fiber.state == Fiber.State.TERM) return false; catch (Throwable) {}
			return this.fiber.m_running && this.fiber.m_taskCounter == m_taskCounter;
		}

		// FIXME: this is not thread safe!
		/// The fiber's `ThreadInfo`, or the static fallback for a handle without a fiber.
		@property ref ThreadInfo tidInfo() { return m_fiber ? fiber.tidInfo : s_tidInfo; }
		/// The `Tid` stored in `tidInfo`.
		@property Tid tid() { return tidInfo.ident; }
	}

	/// Reserved for internal use!
	@property inout(MessageQueue) messageQueue() inout { assert(running, "Task is not running"); return fiber.messageQueue; }

	/// Conversion to `bool`: `true` if the handle references a fiber.
	T opCast(T)() const nothrow if (is(T == bool)) { return m_fiber !is null; }

	/// Forwards to `TaskFiber.join` if the task is still running; otherwise does nothing.
	void join() @safe { if (running) fiber.join(); }
	/// Forwards to `TaskFiber.interrupt` if the task is still running; otherwise does nothing.
	void interrupt() { if (running) fiber.interrupt(); }
	/// Forwards to `TaskFiber.terminate` if the task is still running; otherwise does nothing.
	void terminate() { if (running) fiber.terminate(); }

	/// Formats the handle as "<fiber address>:<task counter>".
	string toString() const @safe { import std.string; return format("%s:%s", () @trusted { return cast(void*)m_fiber; } (), m_taskCounter); }

	/// Two handles are equal if they reference the same fiber and carry the same task counter.
	bool opEquals(in ref Task other) const nothrow @safe { return m_fiber is other.m_fiber && m_taskCounter == other.m_taskCounter; }
	/// ditto
	bool opEquals(in Task other) const nothrow @safe { return m_fiber is other.m_fiber && m_taskCounter == other.m_taskCounter; }
}



/** The base class for a task aka Fiber.

	This class represents a single task that is executed concurrently
	with other tasks. Each task is owned by a single thread.
*/
class TaskFiber : Fiber {
	private {
		import std.concurrency : ThreadInfo;
		Thread m_thread;
		ThreadInfo m_tidInfo;
		MessageQueue m_messageQueue;
	}

	protected {
		// Compared against Task.m_taskCounter by Task.running/opEquals;
		// incremented by bumpTaskCounter.
		shared size_t m_taskCounter;
		shared bool m_running;
	}

	/** Creates the fiber, records the constructing thread as its owner
		and allocates the fiber's message queue.

		Params:
			fun = Delegate passed on to the `Fiber` constructor.
			stack_size = Stack size passed on to the `Fiber` constructor.
	*/
	protected this(void delegate() fun, size_t stack_size)
	nothrow {
		super(fun, stack_size);
		m_thread = Thread.getThis();
		// Turns a failing MessageQueue construction into an assertion
		// failure, which keeps the constructor nothrow.
		scope (failure) assert(false);
		m_messageQueue = new MessageQueue;
	}

	/** Returns the thread that owns this task.
	*/
	@property inout(Thread) thread() inout @safe nothrow { return m_thread; }

	/** Returns the handle of the current Task running on this fiber.
	*/
	@property Task task() @safe nothrow { return Task(this, m_taskCounter); }

	/// Reserved for internal use!
	@property inout(MessageQueue) messageQueue() inout { return m_messageQueue; }

	/// The `ThreadInfo` instance stored in this fiber.
	@property ref inout(ThreadInfo) tidInfo() inout nothrow { return m_tidInfo; }

	/** Blocks until the task has ended.
	*/
	abstract void join() @safe;

	/** Throws an InterruptException within the task as soon as it calls a blocking function.
	*/
	abstract void interrupt();

	/** Terminates the task without notice as soon as it calls a blocking function.
	*/
	abstract void terminate();

	/** Atomically increments the task counter by one.

		Handles created with the previous counter value no longer match
		the fiber's counter afterwards.
	*/
	void bumpTaskCounter()
	@safe nothrow {
		import core.atomic : atomicOp;
		() @trusted { atomicOp!"+="(this.m_taskCounter, 1); } ();
	}
}


/** Exception that is thrown by Task.interrupt.
*/
class InterruptException : Exception {
	this()
	{
		super("Task interrupted.");
	}
}


/** Message queue consisting of a normal and a priority ring buffer of
	`Variant` messages, guarded by a mutex/condition pair.

	Receivers always search the priority queue before the normal queue.
	An optional size limit (see `setMaxSize`) applies to the combined
	length of both queues, but is only enforced by `send`.
*/
class MessageQueue {
	private {
		InterruptibleTaskMutex m_mutex;
		InterruptibleTaskCondition m_condition;
		FixedRingBuffer!Variant m_queue;
		FixedRingBuffer!Variant m_priorityQueue;
		size_t m_maxMailboxSize = 0; // 0 means "no limit", see full()
		bool function(Task) m_onCrowding;
	}

	/// Creates an empty queue with initial capacities of 32 (normal) and 8 (priority) messages.
	this()
	{
		m_mutex = new InterruptibleTaskMutex;
		m_condition = new InterruptibleTaskCondition(m_mutex);
		m_queue.capacity = 32;
		m_priorityQueue.capacity = 8;
	}

	/** Determines if the size limit has been reached.

		Returns: `true` if a non-zero limit is set and the combined number
			of queued normal and priority messages has reached it.
	*/
	@property bool full() const { return m_maxMailboxSize > 0 && m_queue.length + m_priorityQueue.length >= m_maxMailboxSize; }

	/// Removes all queued messages and wakes up every task waiting on the queue's condition.
	void clear()
	{
		m_mutex.performLocked!({
			m_queue.clear();
			m_priorityQueue.clear();
		});
		m_condition.notifyAll();
	}

	/** Sets the size limit of the queue.

		Params:
			count = Maximum combined number of queued messages; `0`
				disables the limit.
			action = Callback invoked by `send` when the queue is full.
				If it returns `false`, the message is dropped; if it
				returns `true`, the sender waits for room. Passing
				`null` makes `send` always wait for room.
	*/
	void setMaxSize(size_t count, bool function(Task tid) action)
	{
		m_maxMailboxSize = count;
		m_onCrowding = action;
	}

	/** Appends a message to the normal queue.

		If the queue is full, the crowding callback (if any) decides
		whether the message is dropped (`false`) or whether the sender
		waits until there is room (`true`). Without a callback the sender
		always waits. The backing buffer grows by a factor of 1.5 when its
		capacity is exhausted.

		Params:
			msg = The message to enqueue.
	*/
	void send(Variant msg)
	{
		m_mutex.performLocked!({
			if (this.full) {
				// The callback may veto the message (or throw). A return
				// value of true means "keep the message", in which case
				// the sender has to wait for room just like in the
				// callback-less case; falling through without waiting
				// would violate the size limit asserted below.
				if (m_onCrowding && !m_onCrowding(Task.getThis()))
					return;
				while (this.full)
					m_condition.wait();
			}
			assert(!this.full);

			if (m_queue.full)
				m_queue.capacity = (m_queue.capacity * 3) / 2;

			m_queue.put(msg);
		});
		m_condition.notify();
	}

	/** Appends a message to the priority queue.

		The size limit is not checked here; the backing buffer grows by a
		factor of 1.5 when its capacity is exhausted.

		Params:
			msg = The message to enqueue.
	*/
	void prioritySend(Variant msg)
	{
		m_mutex.performLocked!({
			if (m_priorityQueue.full)
				m_priorityQueue.capacity = (m_priorityQueue.capacity * 3) / 2;
			m_priorityQueue.put(msg);
		});
		m_condition.notify();
	}

	/** Waits for a message accepted by `filter`, removes it from the
		queue and passes it to `handler`.

		The priority queue is searched before the normal queue. The
		handler is invoked after the `performLocked` section has ended.

		Params:
			filter = Returns `true` for the message that should be received.
			handler = Invoked with the received message.
	*/
	void receive(scope bool delegate(Variant) filter, scope void delegate(Variant) handler)
	{
		// If the queue was full right before a message got removed, a
		// sender may be waiting for room and needs to be woken up on exit.
		bool notify;
		scope (exit) if (notify) m_condition.notify();

		Variant args;
		m_mutex.performLocked!({
			notify = this.full;
			while (true) {
				import vibe.core.log;
				logTrace("looking for messages");
				if (receiveQueue(m_priorityQueue, args, filter)) break;
				if (receiveQueue(m_queue, args, filter)) break;
				logTrace("received no message, waiting..");
				m_condition.wait();
				notify = this.full;
			}
		});

		handler(args);
	}

	/** Like `receive`, but gives up once `timeout` has elapsed.

		Params:
			timeout = Maximum amount of time to wait for a matching message.
			filter = Returns `true` for the message that should be received.
			handler = Invoked with the received message.

		Returns: `true` if a message was received and handled, `false` if
			the timeout elapsed first (the handler is not called then).
	*/
	bool receiveTimeout(OPS...)(Duration timeout, scope bool delegate(Variant) filter, scope void delegate(Variant) handler)
	{
		import std.datetime;

		// See receive() for the purpose of this flag.
		bool notify;
		scope (exit) if (notify) m_condition.notify();
		auto limit_time = Clock.currTime(UTC()) + timeout;
		Variant args;
		if (!m_mutex.performLocked!({
			notify = this.full;
			while (true) {
				if (receiveQueue(m_priorityQueue, args, filter)) break;
				if (receiveQueue(m_queue, args, filter)) break;
				auto now = Clock.currTime(UTC());
				if (now >= limit_time) return false;
				// Wait only for the remaining time, so that wake-ups
				// without a matching message do not extend the deadline.
				m_condition.wait(limit_time - now);
				notify = this.full;
			}
			return true;
		})) return false;

		handler(args);
		return true;
	}

	/** Scans `queue` from front to back for the first message accepted
		by `filter`.

		Params:
			queue = The ring buffer to search.
			dst = Receives the matching message.
			filter = Returns `true` for the message that should be taken.

		Returns: `true` if a message was moved into `dst` and removed from
			the queue, `false` if no message matched.
	*/
	private static bool receiveQueue(OPS...)(ref FixedRingBuffer!Variant queue, ref Variant dst, scope bool delegate(Variant) filter)
	{
		auto r = queue[];
		while (!r.empty) {
			// If inspecting a message fails (e.g. the filter throws), that
			// message is removed before the exception propagates.
			scope (failure) queue.removeAt(r);
			auto msg = r.front;
			if (filter(msg)) {
				dst = msg;
				queue.removeAt(r);
				return true;
			}
			r.popFront();
		}
		return false;
	}
}