1 /**
2 	Contains interfaces and enums for evented I/O drivers.
3 
4 	Copyright: © 2012-2014 RejectedSoftware e.K.
5 	Authors: Sönke Ludwig
6 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
7 */
8 module vibe.core.task;
9 
10 import vibe.core.sync;
11 import vibe.utils.array;
12 
13 import core.thread;
14 import std.exception;
15 import std.traits;
16 import std.typecons;
17 import std.variant;
18 
19 
20 /** Represents a single task as started using vibe.core.runTask.
21 
22 	Note that the Task type is considered weakly isolated and thus can be
23 	passed between threads using vibe.core.concurrency.send or by passing
24 	it as a parameter to vibe.core.core.runWorkerTask.
25 */
26 struct Task {
27 	private {
28 		shared(TaskFiber) m_fiber;
29 		size_t m_taskCounter;
30 		import std.concurrency : ThreadInfo, Tid;
31 		static ThreadInfo s_tidInfo;
32 	}
33 
34 	private this(TaskFiber fiber, size_t task_counter)
35 	@safe nothrow {
36 		() @trusted { m_fiber = cast(shared)fiber; } ();
37 		m_taskCounter = task_counter;
38 	}
39 
40 	this(in Task other) nothrow { m_fiber = cast(shared(TaskFiber))other.m_fiber; m_taskCounter = other.m_taskCounter; }
41 
42 	/** Returns the Task instance belonging to the calling task.
43 	*/
44 	static Task getThis() nothrow @safe
45 	{
46 		auto fiber = () @trusted { return Fiber.getThis(); } ();
47 		if (!fiber) return Task.init;
48 		auto tfiber = cast(TaskFiber)fiber;
49 		if (!tfiber) return Task.init;
50 		if (!tfiber.m_running) return Task.init;
51 		return () @trusted { return Task(tfiber, tfiber.m_taskCounter); } ();
52 	}
53 
54 	nothrow {
55 		@property inout(TaskFiber) fiber() inout @trusted { return cast(inout(TaskFiber))m_fiber; }
56 		@property size_t taskCounter() const @safe { return m_taskCounter; }
57 		@property inout(Thread) thread() inout @safe { if (m_fiber) return this.fiber.thread; return null; }
58 
59 		/** Determines if the task is still running.
60 		*/
61 		@property bool running()
62 		const @trusted {
63 			assert(m_fiber !is null, "Invalid task handle");
64 			try if (this.fiber.state == Fiber.State.TERM) return false; catch (Throwable) {}
65 			return this.fiber.m_running && this.fiber.m_taskCounter == m_taskCounter;
66 		}
67 
68 		// FIXME: this is not thread safe!
69 		@property ref ThreadInfo tidInfo() { return m_fiber ? fiber.tidInfo : s_tidInfo; }
70 		@property Tid tid() { return tidInfo.ident; }
71 	}
72 
73 	/// Reserved for internal use!
74 	@property inout(MessageQueue) messageQueue() inout { assert(running, "Task is not running"); return fiber.messageQueue; }
75 
76 	T opCast(T)() const nothrow if (is(T == bool)) { return m_fiber !is null; }
77 
78 	void join() @safe { if (running) fiber.join(); }
79 	void interrupt() { if (running) fiber.interrupt(); }
80 	void terminate() { if (running) fiber.terminate(); }
81 
82 	string toString() const @safe { import std.string; return format("%s:%s", () @trusted { return cast(void*)m_fiber; } (), m_taskCounter); }
83 
84 	bool opEquals(in ref Task other) const nothrow @safe { return m_fiber is other.m_fiber && m_taskCounter == other.m_taskCounter; }
85 	bool opEquals(in Task other) const nothrow @safe { return m_fiber is other.m_fiber && m_taskCounter == other.m_taskCounter; }
86 }
87 
88 
89 
90 /** The base class for a task aka Fiber.
91 
92 	This class represents a single task that is executed concurrently
93 	with other tasks. Each task is owned by a single thread.
94 */
95 class TaskFiber : Fiber {
96 	private {
97 		import std.concurrency : ThreadInfo;
98 		Thread m_thread;
99 		ThreadInfo m_tidInfo;
100 		MessageQueue m_messageQueue;
101 	}
102 
103 	protected {
104 		shared size_t m_taskCounter;
105 		shared bool m_running;
106 	}
107 
108 	protected this(void delegate() fun, size_t stack_size)
109 	nothrow {
110 		super(fun, stack_size);
111 		m_thread = Thread.getThis();
112 		scope (failure) assert(false);
113 		m_messageQueue = new MessageQueue;
114 	}
115 
116 	/** Returns the thread that owns this task.
117 	*/
118 	@property inout(Thread) thread() inout @safe nothrow { return m_thread; }
119 
120 	/** Returns the handle of the current Task running on this fiber.
121 	*/
122 	@property Task task() @safe nothrow { return Task(this, m_taskCounter); }
123 
124 	/// Reserved for internal use!
125 	@property inout(MessageQueue) messageQueue() inout { return m_messageQueue; }
126 
127 	@property ref inout(ThreadInfo) tidInfo() inout nothrow { return m_tidInfo; }
128 
129 	/** Blocks until the task has ended.
130 	*/
131 	abstract void join() @safe;
132 
133 	/** Throws an InterruptExeption within the task as soon as it calls a blocking function.
134 	*/
135 	abstract void interrupt();
136 
137 	/** Terminates the task without notice as soon as it calls a blocking function.
138 	*/
139 	abstract void terminate();
140 
141 	void bumpTaskCounter()
142 	@safe nothrow {
143 		import core.atomic : atomicOp;
144 		() @trusted { atomicOp!"+="(this.m_taskCounter, 1); } ();
145 	}
146 }
147 
148 
149 /** Exception that is thrown by Task.interrupt.
150 */
151 class InterruptException : Exception {
152 	this()
153 	{
154 		super("Task interrupted.");
155 	}
156 }
157 
158 
159 class MessageQueue {
160 	private {
161 		InterruptibleTaskMutex m_mutex;
162 		InterruptibleTaskCondition m_condition;
163 		FixedRingBuffer!Variant m_queue;
164 		FixedRingBuffer!Variant m_priorityQueue;
165 		size_t m_maxMailboxSize = 0;
166 		bool function(Task) m_onCrowding;
167 	}
168 
169 	this()
170 	{
171 		m_mutex = new InterruptibleTaskMutex;
172 		m_condition = new InterruptibleTaskCondition(m_mutex);
173 		m_queue.capacity = 32;
174 		m_priorityQueue.capacity = 8;
175 	}
176 
177 	@property bool full() const { return m_maxMailboxSize > 0 && m_queue.length + m_priorityQueue.length >= m_maxMailboxSize; }
178 
179 	void clear()
180 	{
181 		m_mutex.performLocked!({
182 			m_queue.clear();
183 			m_priorityQueue.clear();
184 		});
185 		m_condition.notifyAll();
186 	}
187 
188 	void setMaxSize(size_t count, bool function(Task tid) action)
189 	{
190 		m_maxMailboxSize = count;
191 		m_onCrowding = action;
192 	}
193 
194 	void send(Variant msg)
195 	{
196 		import vibe.core.log;
197 		m_mutex.performLocked!({
198 			if( this.full ){
199 				if( !m_onCrowding ){
200 					while(this.full)
201 						m_condition.wait();
202 				} else if( !m_onCrowding(Task.getThis()) ){
203 					return;
204 				}
205 			}
206 			assert(!this.full);
207 
208 			if( m_queue.full )
209 				m_queue.capacity = (m_queue.capacity * 3) / 2;
210 
211 			m_queue.put(msg);
212 		});
213 		m_condition.notify();
214 	}
215 
216 	void prioritySend(Variant msg)
217 	{
218 		m_mutex.performLocked!({
219 			if (m_priorityQueue.full)
220 				m_priorityQueue.capacity = (m_priorityQueue.capacity * 3) / 2;
221 			m_priorityQueue.put(msg);
222 		});
223 		m_condition.notify();
224 	}
225 
226 	void receive(scope bool delegate(Variant) filter, scope void delegate(Variant) handler)
227 	{
228 		bool notify;
229 		scope (exit) if (notify) m_condition.notify();
230 
231 		Variant args;
232 		m_mutex.performLocked!({
233 			notify = this.full;
234 			while (true) {
235 				import vibe.core.log;
236 				logTrace("looking for messages");
237 				if (receiveQueue(m_priorityQueue, args, filter)) break;
238 				if (receiveQueue(m_queue, args, filter)) break;
239 				logTrace("received no message, waiting..");
240 				m_condition.wait();
241 				notify = this.full;
242 			}
243 		});
244 
245 		handler(args);
246 	}
247 
248 	bool receiveTimeout(OPS...)(Duration timeout, scope bool delegate(Variant) filter, scope void delegate(Variant) handler)
249 	{
250 		import std.datetime;
251 
252 		bool notify;
253 		scope (exit) if (notify) m_condition.notify();
254 		auto limit_time = Clock.currTime(UTC()) + timeout;
255 		Variant args;
256 		if (!m_mutex.performLocked!({
257 			notify = this.full;
258 			while (true) {
259 				if (receiveQueue(m_priorityQueue, args, filter)) break;
260 				if (receiveQueue(m_queue, args, filter)) break;
261 				auto now = Clock.currTime(UTC());
262 				if (now >= limit_time) return false;
263 				m_condition.wait(limit_time - now);
264 				notify = this.full;
265 			}
266 			return true;
267 		})) return false;
268 
269 		handler(args);
270 		return true;
271 	}
272 
273 	private static bool receiveQueue(OPS...)(ref FixedRingBuffer!Variant queue, ref Variant dst, scope bool delegate(Variant) filter)
274 	{
275 		auto r = queue[];
276 		while (!r.empty) {
277 			scope (failure) queue.removeAt(r);
278 			auto msg = r.front;
279 			if (filter(msg)) {
280 				dst = msg;
281 				queue.removeAt(r);
282 				return true;
283 			}
284 			r.popFront();
285 		}
286 		return false;
287 	}
288 }