1 /**
2 	Driver implementation for the libasync library
3 
4 	Libasync is an asynchronous library completely written in D.
5 
6 	See_Also:
7 		`vibe.core.driver` = interface definition
8 		https://github.com/etcimon/libasync = Github repository
9 
10 
11 	Copyright: © 2014-2015 RejectedSoftware e.K., GlobecSys Inc
12 	Authors: Sönke Ludwig, Etienne Cimon
13 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
14 */
15 module vibe.core.drivers.libasync;
16 
17 version(VibeLibasyncDriver):
18 
19 import vibe.core.core;
20 import vibe.core.driver;
21 import vibe.core.drivers.threadedfile;
22 import vibe.core.drivers.timerqueue;
23 import vibe.core.log;
24 import vibe.utils.array : FixedRingBuffer;
25 
26 import libasync : AsyncDirectoryWatcher, AsyncDNS, AsyncFile, AsyncSignal, AsyncTimer,
27 	AsyncTCPConnection, AsyncTCPListener, AsyncUDPSocket, DWFileEvent, DWChangeInfo,
28 	EventLoop, NetworkAddressLA = NetworkAddress, UDPEvent, TCPEvent, TCPOption, fd_t,
29 	getThreadEventLoop;
30 import libasync.internals.memory;
31 import libasync.types : Status;
32 
33 import std.algorithm : min, max;
34 import std.array;
35 import std.container : Array;
36 import std.conv;
37 import std.datetime;
38 import std.encoding;
39 import std.exception;
40 import std.string;
41 import std.stdio : File;
42 import std.typecons;
43 
44 import core.atomic;
45 import core.memory;
46 import core.thread;
47 import core.sync.mutex;
48 import core.stdc.stdio;
49 import core.sys.posix.netinet.in_;
50 
51 version (Posix) import core.sys.posix.sys.socket;
52 version (Windows) import core.sys.windows.winsock2;
53 
54 private __gshared EventLoop gs_evLoop;
55 private EventLoop s_evLoop;
56 private DriverCore s_driverCore;
57 
58 version(Windows) extern(C) {
59 	FILE* _wfopen(const(wchar)* filename, in wchar* mode);
60 	int _wchmod(in wchar*, int);
61 }
62 
/// Returns the calling thread's event loop, falling back to the process-wide
/// loop (`gs_evLoop`) when this thread has not set up its own.
EventLoop getMainEventLoop() @trusted nothrow
{
	return s_evLoop !is null ? s_evLoop : gs_evLoop;
}
70 
/// Returns the `DriverCore` stored in the thread-local `s_driverCore`.
/// Asserts that it has been set (it is assigned in the `LibasyncDriver` constructor).
DriverCore getDriverCore() @safe nothrow
{
	assert(s_driverCore !is null);
	return s_driverCore;
}
76 
/// Per-timer user data stored in the driver's `TimerQueue`.
private struct TimerInfo {
	/// Reference count; the driver destroys the timer once this drops to zero.
	size_t refCount = 1;
	/// Delegate started as a new task when the timer fires (may be null).
	void delegate() callback;
	/// Task currently waiting on the timer; `Task.init` when nobody waits.
	Task owner;

	/// Creates the timer data with the given callback; `refCount` starts at 1.
	this(void delegate() callback) { this.callback = callback; }
}
84 
85 /// one per thread
86 final class LibasyncDriver : EventDriver {
87 @trusted:
88 	private {
89 		bool m_break = false;
90 		debug Thread m_ownerThread;
91 		AsyncTimer m_timerEvent;
92 		TimerQueue!TimerInfo m_timers;
93 		SysTime m_nextSched = SysTime.max;
94 		shared AsyncSignal m_exitSignal;
95 	}
96 
	/**
		Sets up the driver for the calling thread.

		When `gs_mutex` is not yet set, the global mutex is created, `gs_availID`
		is pre-filled with IDs and `gs_maxID` is set to 32. Afterwards the
		thread-local driver core and event loop are stored, the global event loop
		is initialized if still unset, and an exit signal is registered whose
		handler sets `m_break`.

		Params:
			core = driver core stored in `s_driverCore` for this thread
	*/
	this(DriverCore core) nothrow
	{
		assert(!isControlThread, "Libasync driver created in control thread");
		try {
			import core.atomic : atomicOp;
			if (!gs_mutex) {
				// NOTE(review): this local import presumably makes `core.sync.mutex`
				// resolve to the package instead of the `core` parameter - confirm
				// before removing it.
				import core.sync.mutex;
				gs_mutex = new core.sync.mutex.Mutex;

				gs_availID.reserve(32);

				// fill the free-ID list up to its reserved capacity with IDs i+1
				foreach (i; gs_availID.length .. gs_availID.capacity) {
					gs_availID.insertBack(i + 1);
				}

				gs_maxID = 32;

			}
		}
		catch (Throwable) {
			assert(false, "Couldn't reserve necessary space for available Manual Events");
		}

		debug m_ownerThread = Thread.getThis();
		s_driverCore = core;
		s_evLoop = getThreadEventLoop();
		// the first driver created also provides the global fallback loop
		if (!gs_evLoop)
			gs_evLoop = s_evLoop;

		// triggering this signal (see exitEventLoop) sets the break flag
		m_exitSignal = new shared AsyncSignal(getMainEventLoop());
		m_exitSignal.run({
				m_break = true;
			});
		logTrace("Loaded libasync backend in thread %s", Thread.getThis().name);

	}
133 
134 	static @property bool isControlThread() nothrow {
135 		scope(failure) assert(false);
136 		return Thread.getThis().isDaemon && Thread.getThis().name == "CmdProcessor";
137 	}
138 
	/// Shuts the driver down: sets the break flag and asks the event loop to exit.
	override void dispose()
	{
		logTrace("Deleting event driver");
		m_break = true;
		getMainEventLoop().exit();
	}
145 
146 	override int runEventLoop()
147 	{
148 		while(!m_break && getMainEventLoop().loop(int.max.msecs)){
149 			processTimers();
150 			getDriverCore().notifyIdle();
151 		}
152 		m_break = false;
153 		logDebug("Event loop exit %d", m_break);
154 		return 0;
155 	}
156 
	/// Runs a single event loop call (`loop(int.max.msecs)`), then processes due
	/// timers and notifies the driver core of idleness. Always returns 0.
	override int runEventLoopOnce()
	{
		getMainEventLoop().loop(int.max.msecs);
		processTimers();
		getDriverCore().notifyIdle();
		logTrace("runEventLoopOnce exit");
		return 0;
	}
165 
166 	override bool processEvents()
167 	{
168 		getMainEventLoop().loop(0.seconds);
169 		processTimers();
170 		if (m_break) {
171 			m_break = false;
172 			return false;
173 		}
174 		return true;
175 	}
176 
	/// Requests the event loop to exit by triggering the exit signal, whose
	/// handler (registered in the constructor) sets `m_break`.
	override void exitEventLoop()
	{
		logDebug("Exiting (%s)", m_break);
		m_exitSignal.trigger();

	}
183 
	/// Opens `path` in the given mode by constructing a `LibasyncFileStream`.
	override LibasyncFileStream openFile(Path path, FileMode mode)
	{
		return new LibasyncFileStream(path, mode);
	}
188 
	/// Starts watching `path` (optionally recursively) by constructing a
	/// `LibasyncDirectoryWatcher`.
	override DirectoryWatcher watchDirectory(Path path, bool recursive)
	{
		return new LibasyncDirectoryWatcher(path, recursive);
	}
193 
194 	/** Resolves the given host name or IP address string. */
195 	override NetworkAddress resolveHost(string host, ushort family = 2, bool use_dns = true)
196 	{
197 		import libasync.types : isIPv6;
198 		isIPv6 is_ipv6;
199 
200 		if (family == AF_INET6)
201 			is_ipv6 = isIPv6.yes;
202 		else
203 			is_ipv6 = isIPv6.no;
204 
205 		import std.regex : regex, Captures, Regex, matchFirst, ctRegex;
206 		import std.traits : ReturnType;
207 
208 		auto IPv4Regex = ctRegex!(`^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(\.|$)){4}$`, ``);
209 		auto IPv6Regex = ctRegex!(`^([0-9A-Fa-f]{0,4}:){2,7}([0-9A-Fa-f]{1,4}$|((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(\.|$)){4})$`, ``);
210 		auto ipv4 = matchFirst(host, IPv4Regex);
211 		auto ipv6 = matchFirst(host, IPv6Regex);
212 		if (!ipv4.empty)
213 		{
214 			if (!ipv4.empty)
215 			is_ipv6 = isIPv6.no;
216 			use_dns = false;
217 		}
218 		else if (!ipv6.empty)
219 		{ // fixme: match host instead?
220 			is_ipv6 = isIPv6.yes;
221 			use_dns = false;
222 		}
223 		else
224 		{
225 			use_dns = true;
226 		}
227 
228 		NetworkAddress ret;
229 
230 		if (use_dns) {
231 			bool done;
232 			struct DNSCallback  {
233 				Task waiter;
234 				NetworkAddress* address;
235 				bool* finished;
236 				void handler(NetworkAddressLA addr) {
237 					*address = NetworkAddress(addr);
238 					*finished = true;
239 					if (waiter != Task() && waiter != Task.getThis())
240 						getDriverCore().resumeTask(waiter);
241 				}
242 			}
243 
244 			DNSCallback* cb = FreeListObjectAlloc!DNSCallback.alloc();
245 			cb.waiter = Task.getThis();
246 			cb.address = &ret;
247 			cb.finished = &done;
248 
249 			// todo: remove the shared attribute to avoid GC?
250 			shared AsyncDNS dns = new shared AsyncDNS(getMainEventLoop());
251 			scope(exit) dns.destroy();
252 			bool success = dns.handler(&cb.handler).resolveHost(host, is_ipv6);
253 			if (!success || dns.status.code != Status.OK)
254 				throw new Exception(dns.status.text);
255 			while(!done)
256 				getDriverCore.yieldForEvent();
257 			if (dns.status.code != Status.OK)
258 				throw new Exception(dns.status.text);
259 			assert(ret != NetworkAddress.init);
260 			assert(ret.family != 0);
261 			logTrace("Async resolved address %s", ret.toString());
262 			FreeListObjectAlloc!DNSCallback.free(cb);
263 
264 			if (ret.family == 0)
265 				ret.family = family;
266 
267 			return ret;
268 		}
269 		else {
270 			ret = NetworkAddress(getMainEventLoop().resolveIP(host, 0, is_ipv6));
271 			if (ret.family == 0)
272 				ret.family = family;
273 			return ret;
274 		}
275 
276 	}
277 
	/**
		Opens a TCP connection to `addr` and yields until it is connected or
		has failed.

		Params:
			addr = remote address, assigned to the connection's `peer`
			bind_addr = address whose string form and port are passed to `conn.ip`

		Returns: the connected `LibasyncTCPConnection`.

		Throws: `Exception` (via `enforce`) when the connection cannot be
			started or when the connection object reports an error.
	*/
	override LibasyncTCPConnection connectTCP(NetworkAddress addr, NetworkAddress bind_addr)
	{
		AsyncTCPConnection conn = new AsyncTCPConnection(getMainEventLoop());

		// the connection callback resumes the task registered as writer, if any
		LibasyncTCPConnection tcp_connection = new LibasyncTCPConnection(conn, (TCPConnection conn) {
			Task waiter = (cast(LibasyncTCPConnection) conn).m_settings.writer.task;
			if (waiter != Task()) {
				getDriverCore().resumeTask(waiter);
			}
		});

		// only register as writer when called from within a task
		if (Task.getThis() != Task())
			tcp_connection.m_settings.writer.acquire();

		tcp_connection.m_tcpImpl.conn = conn;
		//conn.local = bind_addr;
		// NOTE(review): the bind address is passed to `conn.ip` and the peer is
		// assigned right after - confirm against libasync which of the two
		// ends up as local/remote address.
		conn.ip(bind_addr.toAddressString(), bind_addr.port);
		conn.peer = cast(NetworkAddressLA)addr;

		enforce(conn.run(&tcp_connection.handler), "An error occured while starting a new connection: " ~ conn.error);

		// wait until the handler reports either success or an error
		while (!tcp_connection.connected && !tcp_connection.m_error) getDriverCore().yieldForEvent();
		enforce(!tcp_connection.m_error, tcp_connection.m_error);
		tcp_connection.m_tcpImpl.localAddr = NetworkAddress(conn.local);

		if (Task.getThis() != Task())
			tcp_connection.m_settings.writer.release();
		return tcp_connection;
	}
307 
	/// Resolves `address`, applies `port` and creates a `LibasyncTCPListener`
	/// that passes new connections to `conn_callback`.
	override LibasyncTCPListener listenTCP(ushort port, void delegate(TCPConnection conn) @safe conn_callback, string address, TCPListenOptions options)
	{
		NetworkAddress localaddr = getEventDriver().resolveHost(address);
		localaddr.port = port;

		return new LibasyncTCPListener(localaddr, conn_callback, options);
	}
315 
316 	override LibasyncUDPConnection listenUDP(ushort port, string bind_address = "0.0.0.0")
317 	{
318 		NetworkAddress localaddr = getEventDriver().resolveHost(bind_address);
319 		localaddr.port = port;
320 		AsyncUDPSocket sock = new AsyncUDPSocket(getMainEventLoop());
321 		sock.local = cast(NetworkAddressLA)localaddr;
322 		auto udp_connection = new LibasyncUDPConnection(sock);
323 		sock.run(&udp_connection.handler);
324 		return udp_connection;
325 	}
326 
	/// Creates a new `LibasyncManualEvent`, passing this driver to its constructor.
	override LibasyncManualEvent createManualEvent()
	{
		return new LibasyncManualEvent(this);
	}
331 
	/// Not implemented by this driver: always fails with `assert(false)`.
	override FileDescriptorEvent createFileDescriptorEvent(int file_descriptor, FileDescriptorEvent.Trigger triggers, FileDescriptorEvent.Mode mode)
	{
		assert(false);
	}
336 
337 
338 	// The following timer implementation was adapted from the equivalent in libevent2.d
339 
	/// Creates a timer in the timer queue whose user data carries `callback`
	/// and returns the ID reported by the queue.
	override size_t createTimer(void delegate() @safe callback) { return m_timers.create(TimerInfo(callback)); }

	/// Increments the reference count of the given timer.
	override void acquireTimer(size_t timer_id) { m_timers.getUserData(timer_id).refCount++; }
343 	override void releaseTimer(size_t timer_id)
344 	nothrow {
345 		debug assert(m_ownerThread is Thread.getThis());
346 		logTrace("Releasing timer %s", timer_id);
347 		if (!--m_timers.getUserData(timer_id).refCount)
348 			m_timers.destroy(timer_id);
349 	}
350 
	/// Reports whether the timer queue considers the given timer pending.
	override bool isTimerPending(size_t timer_id) nothrow { return m_timers.isPending(timer_id); }
352 
353 	override void rearmTimer(size_t timer_id, Duration dur, bool periodic)
354 	{
355 		debug assert(m_ownerThread is Thread.getThis());
356 		if (!isTimerPending(timer_id)) acquireTimer(timer_id);
357 		m_timers.schedule(timer_id, dur, periodic);
358 		rescheduleTimerEvent(Clock.currTime(UTC()));
359 	}
360 
361 	override void stopTimer(size_t timer_id)
362 	{
363 		logTrace("Stopping timer %s", timer_id);
364 		if (m_timers.isPending(timer_id)) {
365 			m_timers.unschedule(timer_id);
366 			releaseTimer(timer_id);
367 		}
368 	}
369 
	/**
		Blocks the calling task until the given timer is no longer pending.

		The calling task is registered as the timer's `owner` while it yields.
		Periodic timers and multiple simultaneous waiters are rejected by
		assertions.
	*/
	override void waitTimer(size_t timer_id)
	{
		logTrace("Waiting for timer in %s", Task.getThis());
		debug assert(m_ownerThread is Thread.getThis());
		while (true) {
			assert(!m_timers.isPeriodic(timer_id), "Cannot wait for a periodic timer.");
			if (!m_timers.isPending(timer_id)) {
				// logTrace("Timer is not pending");
				return;
			}
			auto data = &m_timers.getUserData(timer_id);
			assert(data.owner == Task.init, "Waiting for the same timer from multiple tasks is not supported.");
			data.owner = Task.getThis();
			// the user data is looked up again on exit instead of reusing `data`
			// (presumably because the pointer may be stale after yielding - TODO confirm)
			scope (exit) m_timers.getUserData(timer_id).owner = Task.init;
			getDriverCore().yieldForEvent();
		}
	}
387 
	/// Processes all timers that are due at the current time.
	/// If a timer has an owner, that task is resumed.
	/// If a timer has a callback, it is run as a new task.
	/// Afterwards the underlying timer event is rescheduled.
	private void processTimers()
	{
		if (!m_timers.anyPending) return;
		logTrace("Processing due timers");
		// process all timers that have expired up to now
		auto now = Clock.currTime(UTC());
		// event loop timer will need to be rescheduled because we'll process everything until now
		m_nextSched = SysTime.max;

		m_timers.consumeTimeouts(now, (timer, periodic, ref data) {
			// copy what is needed before the timer is possibly released below
			Task owner = data.owner;
			auto callback = data.callback;

			logTrace("Timer %s fired (%s/%s)", timer, owner != Task.init, callback !is null);

			// one-shot timers give up the reference taken when they were armed
			if (!periodic) releaseTimer(timer);

			if (owner && owner.running && owner != Task.getThis()) {
				// outside of a task resume directly, inside of one yield to the owner
				if (Task.getThis == Task.init) getDriverCore().resumeTask(owner);
				else getDriverCore().yieldAndResumeTask(owner);
			}
			if (callback) runTask(callback);
		});

		rescheduleTimerEvent(now);
	}
416 
417 	private void rescheduleTimerEvent(SysTime now)
418 	{
419 		logTrace("Rescheduling timer event %s", Task.getThis());
420 
421 		// don't bother scheduling, the timers will be processed before leaving for the event loop
422 		if (m_nextSched <= Clock.currTime(UTC()))
423 			return;
424 
425 		bool first;
426 		auto next = m_timers.getFirstTimeout();
427 		Duration dur;
428 		if (next == SysTime.max) return;
429 		dur = max(1.msecs, next - now);
430 		if (m_nextSched != next)
431 			m_nextSched = next;
432 		else return;
433 		if (dur.total!"seconds"() >= int.max)
434 			return; // will never trigger, don't bother
435 		if (!m_timerEvent) {
436 			//logTrace("creating new async timer");
437 			m_timerEvent = new AsyncTimer(getMainEventLoop());
438 			bool success = m_timerEvent.duration(dur).run(&onTimerTimeout);
439 			assert(success, "Failed to run timer");
440 		}
441 		else {
442 			//logTrace("rearming the same timer instance");
443 			bool success = m_timerEvent.rearm(dur);
444 			assert(success, "Failed to rearm timer");
445 		}
446 		//logTrace("Rescheduled timer event for %s seconds in thread '%s' :: task '%s'", dur.total!"usecs" * 1e-6, Thread.getThis().name, Task.getThis());
447 	}
448 
	/// Callback of the underlying `AsyncTimer`: processes due timers and logs
	/// (instead of propagating) any `Exception` raised while doing so.
	private void onTimerTimeout()
	{
		import std.encoding : sanitize;

		logTrace("timer event fired");
		try processTimers();
		catch (Exception e) {
			logError("Failed to process timers: %s", e.msg);
			// logging the full error is best effort only
			try logDiagnostic("Full error: %s", e.toString().sanitize); catch (Throwable) {}
		}
	}
460 }
461 
462 /// Writes or reads asynchronously (in another thread) for sizes > 64kb to benefit from kernel page cache
463 /// in lower size operations.
464 final class LibasyncFileStream : FileStream {
465 @trusted:
466 	import vibe.core.path : Path;
467 
468 	private {
469 		Path m_path;
470 		ulong m_size;
471 		ulong m_offset = 0;
472 		FileMode m_mode;
473 		Task m_task;
474 		Exception m_ex;
475 		shared AsyncFile m_impl;
476 
477 		bool m_started;
478 		bool m_truncated;
479 		bool m_finished;
480 	}
481 
482 	this(Path path, FileMode mode)
483 	{
484 		import std.file : getSize,exists;
485 		if (mode != FileMode.createTrunc)
486 			m_size = getSize(path.toNativeString());
487 		else {
488 			auto path_str = path.toNativeString();
489 			if (exists(path_str))
490 				removeFile(path);
491 			{ // touch
492 				import std.string : toStringz;
493 				version(Windows) {
494 					import std.utf : toUTF16z;
495 					auto path_str_utf = path_str.toUTF16z();
496 					FILE* f = _wfopen(path_str_utf, "w");
497 					_wchmod(path_str_utf, S_IREAD|S_IWRITE);
498 				}
499 				else FILE * f = fopen(path_str.toStringz, "w");
500 				fclose(f);
501 				m_truncated = true;
502 			}
503 		}
504 		m_path = path;
505 		m_mode = mode;
506 
507 		m_impl = new shared AsyncFile(getMainEventLoop());
508 		m_impl.onReady(&handler);
509 
510 		m_started = true;
511 	}
512 
	/// Closes the file on destruction; an `Exception` thrown by `close` is
	/// turned into an assertion failure.
	~this()
	{
		try close();
		catch (Exception e) { assert(false, e.msg); }
	}
518 
	/// Path the stream was opened with.
	override @property Path path() const { return m_path; }
	/// True from the end of construction until `close` is called.
	override @property bool isOpen() const { return m_started; }
	/// File size as tracked by this stream.
	override @property ulong size() const { return m_size; }
	/// True unless the file was opened in `FileMode.append`.
	override @property bool readable() const { return m_mode != FileMode.append; }
	/// True unless the file was opened in `FileMode.read`.
	override @property bool writable() const { return m_mode != FileMode.read; }
524 
	/// Sets the offset used by the next read or (non-append) write.
	override void seek(ulong offset)
	{
		m_offset = offset;
	}

	/// Returns the current offset.
	override ulong tell() { return m_offset; }
531 
532 	override void close()
533 	{
534 		if (m_impl) {
535 			m_impl.kill();
536 			m_impl = null;
537 		}
538 		m_started = false;
539 		if (m_task != Task() && Task.getThis() != Task())
540 			getDriverCore().yieldAndResumeTask(m_task, new Exception("The file was closed during an operation"));
541 		else if (m_task != Task() && Task.getThis() == Task())
542 			getDriverCore().resumeTask(m_task, new Exception("The file was closed during an operation"));
543 
544 	}
545 
546 	override @property bool empty() const { assert(this.readable); return m_offset >= m_size; }
547 	override @property ulong leastSize() const { assert(this.readable); return m_size - m_offset; }
548 	override @property bool dataAvailableForRead() { return true; }
549 
	/// Peeking is not supported by this stream: always returns null.
	override const(ubyte)[] peek()
	{
		return null;
	}
554 
555 	override size_t read(scope ubyte[] dst, IOMode)
556 	{
557 		scope(failure)
558 			close();
559 		assert(this.readable, "To read a file, it must be opened in a read-enabled mode.");
560 		shared ubyte[] bytes = cast(shared) dst;
561 		bool truncate_if_exists;
562 		if (!m_truncated && m_mode == FileMode.createTrunc) {
563 			truncate_if_exists = true;
564 			m_truncated = true;
565 			m_size = 0;
566 		}
567 		m_finished = false;
568 		enforce(dst.length <= leastSize);
569 		enforce(m_impl.read(m_path.toNativeString(), bytes, m_offset, true, truncate_if_exists), "Failed to read data from disk: " ~ m_impl.error);
570 
571 		if (!m_finished) {
572 			acquire();
573 			scope(exit) release();
574 			getDriverCore().yieldForEvent();
575 		}
576 		m_finished = false;
577 
578 		if (m_ex) throw m_ex;
579 
580 		m_offset += dst.length;
581 		assert(m_impl.offset == m_offset, "Incoherent offset returned from file reader: " ~ m_offset.to!string ~ "B assumed but the implementation is at: " ~ m_impl.offset.to!string ~ "B");
582 
583 		return dst.length;
584 	}
585 
586 	alias Stream.write write;
587 	override size_t write(in ubyte[] bytes_, IOMode)
588 	{
589 		assert(this.writable, "To write to a file, it must be opened in a write-enabled mode.");
590 
591 		shared const(ubyte)[] bytes = cast(shared const(ubyte)[]) bytes_;
592 
593 		bool truncate_if_exists;
594 		if (!m_truncated && m_mode == FileMode.createTrunc) {
595 			truncate_if_exists = true;
596 			m_truncated = true;
597 			m_size = 0;
598 		}
599 		m_finished = false;
600 
601 		if (m_mode == FileMode.append)
602 			enforce(m_impl.append(m_path.toNativeString(), cast(shared ubyte[]) bytes, true, truncate_if_exists), "Failed to write data to disk: " ~ m_impl.error);
603 		else
604 			enforce(m_impl.write(m_path.toNativeString(), bytes, m_offset, true, truncate_if_exists), "Failed to write data to disk: " ~ m_impl.error);
605 
606 		if (!m_finished) {
607 			acquire();
608 			scope(exit) release();
609 			getDriverCore().yieldForEvent();
610 		}
611 		m_finished = false;
612 
613 		if (m_ex) throw m_ex;
614 
615 		if (m_mode == FileMode.append) {
616 			m_size += bytes.length;
617 		}
618 		else {
619 			m_offset += bytes.length;
620 			if (m_offset >= m_size)
621 				m_size += m_offset - m_size;
622 			assert(m_impl.offset == m_offset, "Incoherent offset returned from file writer.");
623 		}
624 		//assert(getSize(m_path.toNativeString()) == m_size, "Incoherency between local size and filesize: " ~ m_size.to!string ~ "B assumed for a file of size " ~ getSize(m_path.toNativeString()).to!string ~ "B");
625 
626 		return bytes_.length;
627 	}
628 
	/// No-op apart from asserting that the stream is writable.
	override void flush()
	{
		assert(this.writable, "To write to a file, it must be opened in a write-enabled mode.");

	}
634 
	/// Flushes the stream if it is writable.
	override void finalize()
	{
		if (this.writable)
			flush();
	}
640 
	/// Clears the owning task. When called from within a task, asserts that
	/// the caller is the current owner.
	void release()
	{
		assert(Task.getThis() == Task() || m_task == Task.getThis(), "Releasing FileStream that is not owned by the calling task.");
		m_task = Task();
	}
646 
	/// Makes the calling task the owner that `handler` resumes. When called
	/// from within a task, asserts that the stream has no owner yet.
	void acquire()
	{
		assert(Task.getThis() == Task() || m_task == Task(), "Acquiring FileStream that is already owned.");
		m_task = Task.getThis();
	}
652 
653 	private void handler() {
654 		// This may be called by the event loop if read/write > 64kb and another thread was delegated
655 		Exception ex;
656 
657 		if (m_impl.status.code != Status.OK)
658 			ex = new Exception(m_impl.error);
659 		m_finished = true;
660 		if (m_task != Task())
661 			getDriverCore().resumeTask(m_task, ex);
662 		else m_ex = ex;
663 	}
664 }
665 
666 
667 final class LibasyncDirectoryWatcher : DirectoryWatcher {
668 @trusted:
669 	private {
670 		Path m_path;
671 		bool m_recursive;
672 		Task m_task;
673 		AsyncDirectoryWatcher m_impl;
674 		Array!DirectoryChange m_changes;
675 		Exception m_error;
676 	}
677 
	/// Creates the underlying `AsyncDirectoryWatcher`, registers `handler` as
	/// its callback and starts watching `path` (recursively if requested).
	this(Path path, bool recursive)
	{
		m_impl = new AsyncDirectoryWatcher(getMainEventLoop());
		m_impl.run(&handler);
		m_path = path;
		m_recursive = recursive;
		watch(path, recursive);
		// logTrace("DirectoryWatcher called with: %s", path.toNativeString());
	}
687 
	/// Kills the underlying watcher on destruction.
	~this()
	{
		m_impl.kill();
	}
692 
	/// Path passed to the constructor.
	override @property Path path() const { return m_path; }
	/// Whether the watcher was created as recursive.
	override @property bool recursive() const { return m_recursive; }
695 
696 	void release()
697 	{
698 		assert(m_task == Task.getThis(), "Releasing FileStream that is not owned by the calling task.");
699 		m_task = Task();
700 	}
701 
702 	void acquire()
703 	{
704 		assert(m_task == Task(), "Acquiring FileStream that is already owned.");
705 		m_task = Task.getThis();
706 	}
707 
	/// True when the calling task is the current owner of the watcher.
	bool amOwner()
	{
		return m_task == Task.getThis();
	}
712 
713 	override bool readChanges(ref DirectoryChange[] dst, Duration timeout)
714 	{
715 		dst.length = 0;
716 		assert(!amOwner());
717 		if (m_error)
718 			throw m_error;
719 		acquire();
720 		scope(exit) release();
721 		void consumeChanges() {
722 			if (m_impl.status.code == Status.ERROR) {
723 				throw new Exception(m_impl.error);
724 			}
725 
726 			foreach (ref change; m_changes[]) {
727 				//logTrace("Adding change: %s", change.to!string);
728 				dst ~= change;
729 			}
730 
731 			//logTrace("Consumed change 1: %s", dst.to!string);
732 			import std.array : array;
733 			import std.algorithm : uniq;
734 			dst = cast(DirectoryChange[]) uniq!((a, b) => a.path == b.path && a.type == b.type)(dst).array;
735 			logTrace("Consumed change: %s", dst.to!string);
736 			m_changes.clear();
737 		}
738 
739 		if (!m_changes.empty) {
740 			consumeChanges();
741 			return true;
742 		}
743 
744 		auto tm = getEventDriver().createTimer(null);
745 		getEventDriver().m_timers.getUserData(tm).owner = Task.getThis();
746 		getEventDriver().rearmTimer(tm, timeout, false);
747 
748 		while (m_changes.empty) {
749 			getDriverCore().yieldForEvent();
750 			if (!getEventDriver().isTimerPending(tm)) break;
751 		}
752 
753 		if (!m_changes.empty) {
754 			consumeChanges();
755 			return true;
756 		}
757 
758 		return false;
759 	}
760 
	/// Registers `path` with the underlying watcher for all event types.
	private void watch(Path path, bool recursive) {
		m_impl.watchDir(path.toNativeString(), DWFileEvent.ALL, recursive);
	}
764 
	/// Callback of the underlying watcher: reads the reported changes into
	/// `m_changes`, then either resumes the owning task (passing any error)
	/// or stores the error in `m_error`.
	private void handler() {
		import std.stdio;
		// temporary buffer for up to 128 change records per read
		DWChangeInfo[] changes = allocArray!DWChangeInfo(manualAllocator(), 128);
		scope(exit) freeArray(manualAllocator(), changes);
		Exception ex;
		try {
			uint cnt;
			// NOTE(review): the loop repeats only while `cnt == 0`, and the inner
			// loop stops early only when `i` reaches `cnt`, so for `cnt == 0` it
			// walks the whole buffer - confirm that this is the intended behavior.
			do {
				cnt = m_impl.readChanges(changes);
				size_t i;
				foreach (DWChangeInfo change; changes) {
					DirectoryChange dc;

					// moves are reported as a removal plus an addition
					final switch (change.event){
						case DWFileEvent.CREATED: dc.type = DirectoryChangeType.added; break;
						case DWFileEvent.DELETED: dc.type = DirectoryChangeType.removed; break;
						case DWFileEvent.MODIFIED: dc.type = DirectoryChangeType.modified; break;
						case DWFileEvent.MOVED_FROM: dc.type = DirectoryChangeType.removed; break;
						case DWFileEvent.MOVED_TO: dc.type = DirectoryChangeType.added; break;
						case DWFileEvent.ALL: break; // impossible
						case DWFileEvent.ERROR: throw new Exception(m_impl.error);
					}

					dc.path = Path(change.path);
					//logTrace("Inserted %s absolute %s", dc.to!string, dc.path.absolute.to!string);
					m_changes.insert(dc);
					i++;
					if (cnt == i) break;
				}
			} while(cnt == 0 && m_impl.status.code == Status.OK);
			if (m_impl.status.code == Status.ERROR) {
				ex = new Exception(m_impl.error);
			}

		}
		catch (Exception e) {
			ex = e;
		}
		// without a waiting task the error is kept for the next readChanges call
		if (m_task != Task()) getDriverCore().resumeTask(m_task, ex);
		else m_error = ex;
	}
806 
807 }
808 
809 
810 
811 final class LibasyncManualEvent : ManualEvent {
812 @trusted:
813 	private {
814 		shared(int) m_emitCount = 0;
815 		shared(int) m_threadCount = 0;
816 		shared(size_t) m_instance;
817 		Array!(void*) ms_signals;
818 
819 		core.sync.mutex.Mutex m_mutex;
820 
		/// Atomically reads the instance ID.
		@property size_t instanceID() nothrow { return atomicLoad(m_instance); }
		/// Atomically stores the instance ID.
		@property void instanceID(size_t instance) nothrow{ atomicStore(m_instance, instance); }
823 	}
824 
	/// Creates the event's mutex and assigns it an ID from `generateID`.
	/// The `driver` argument is not used by this constructor.
	this(LibasyncDriver driver)
	nothrow {
		m_mutex = new core.sync.mutex.Mutex;
		instanceID = generateID();
	}
830 
	/// Hands the instance ID to `recycleID` and kills all signals still
	/// registered for this event. Exceptions are printed, not propagated.
	~this()
	{
		try {
			recycleID(instanceID);

			foreach (ref signal; ms_signals[]) {
				if (signal) {
					(cast(shared AsyncSignal) signal).kill();
					signal = null;
				}
			}
		} catch (Exception e) {
			import std.stdio;
			writefln("Exception thrown while finalizing LibasyncManualEvent: %s", e.msg);
		}
	}
847 
848 	override void emit()
849 	{
850 		scope (failure) assert(false); // synchronized is not nothrow on DMD 2.066 and below and Array is not nothrow at all
851 		logTrace("Emitting signal");
852 		atomicOp!"+="(m_emitCount, 1);
853 		synchronized (m_mutex) {
854 			logTrace("Looping signals. found: " ~ ms_signals.length.to!string);
855 			foreach (ref signal; ms_signals[]) {
856 				auto evloop = getMainEventLoop();
857 				shared AsyncSignal sig = cast(shared AsyncSignal) signal;
858 				if (!sig.trigger(evloop)) logError("Failed to trigger ManualEvent: %s", sig.error);
859 			}
860 		}
861 	}
862 
	/// Waits using the current emit count as reference.
	override void wait() { wait(m_emitCount); }
	/// Waits (interruptibly) until the emit count differs from `reference_emit_count`; returns the new count.
	override int wait(int reference_emit_count) { return  doWait!true(reference_emit_count); }
	/// Same as above, but gives up once `timeout` has elapsed.
	override int wait(Duration timeout, int reference_emit_count) { return doWait!true(timeout, reference_emit_count); }
	/// Waits until the emit count differs from `reference_emit_count`, using `yieldForEventDeferThrow`.
	override int waitUninterruptible(int reference_emit_count) { return  doWait!false(reference_emit_count); }
	/// Same as above, but gives up once `timeout` has elapsed.
	override int waitUninterruptible(Duration timeout, int reference_emit_count) { return doWait!false(timeout, reference_emit_count); }
868 
869 	void acquire()
870 	{
871 		auto task = Task.getThis();
872 
873 		bool signal_exists;
874 
875 		size_t instance = instanceID;
876 		if (s_eventWaiters.length <= instance)
877 			expandWaiters();
878 
879 		logTrace("Acquire event ID#%d", instance);
880 		auto taskList = s_eventWaiters[instance];
881 		if (taskList.length > 0)
882 			signal_exists = true;
883 
884 		if (!signal_exists) {
885 			shared AsyncSignal sig = new shared AsyncSignal(getMainEventLoop());
886 			sig.run(&onSignal);
887 			synchronized (m_mutex) ms_signals.insertBack(cast(void*)sig);
888 		}
889 		s_eventWaiters[instance].insertBack(Task.getThis());
890 	}
891 
	/// Removes the calling task from this event's waiter list and removes the
	/// signal once the list is empty. Asserts that the caller is a waiter.
	void release()
	{
		assert(amOwner(), "Releasing non-acquired signal.");

		import std.algorithm : countUntil;

		size_t instance = instanceID;
		auto taskList = s_eventWaiters[instance];
		// position of the calling task within the waiter list
		auto idx = taskList[].countUntil!((a, b) => a == b)(Task.getThis());
		logTrace("Release event ID#%d", instance);
		s_eventWaiters[instance].linearRemove(taskList[idx .. idx+1]);

		if (s_eventWaiters[instance].empty) {
			removeMySignal();
		}
	}
908 
909 	bool amOwner()
910 	{
911 		import std.algorithm : countUntil;
912 		size_t instance = instanceID;
913 		if (s_eventWaiters.length <= instance) return false;
914 		auto taskList = s_eventWaiters[instance];
915 		if (taskList.length == 0) return false;
916 
917 		auto idx = taskList[].countUntil!((a, b) => a == b)(Task.getThis());
918 
919 		return idx != -1;
920 	}
921 
	/// Atomically reads the number of times `emit` has been called.
	override @property int emitCount() const { return atomicLoad(m_emitCount); }
923 
	/**
		Waits until the emit count differs from `reference_emit_count`.

		The calling task is registered as waiter for the duration of the call.
		With `INTERRUPTIBLE` the task yields via `yieldForEvent` and exceptions
		propagate; otherwise `yieldForEventDeferThrow` is used and any
		exception becomes an assertion failure.

		Returns: the emit count observed when the wait ended.
	*/
	private int doWait(bool INTERRUPTIBLE)(int reference_emit_count)
	{
		try {
			assert(!amOwner());
			acquire();
			scope(exit) release();
			auto ec = this.emitCount;
			while( ec == reference_emit_count ){
				//synchronized(m_mutex) logTrace("Waiting for event with signal count: " ~ ms_signals.length.to!string);
				static if (INTERRUPTIBLE) getDriverCore().yieldForEvent();
				else getDriverCore().yieldForEventDeferThrow();
				ec = this.emitCount;
			}
			return ec;
		} catch (Exception e) {
			static if (!INTERRUPTIBLE)
				assert(false, e.msg); // still some function calls not marked nothrow
			else throw e;
		}
	}
944 
	/**
		Waits until the emit count differs from `reference_emit_count` or the
		timeout timer is no longer pending.

		A timer owned by the calling task is created for `timeout` and released
		on exit. The yielding behavior depends on `INTERRUPTIBLE` in the same
		way as in the overload without a timeout.

		Returns: the emit count observed when the wait ended (equal to
			`reference_emit_count` if the wait timed out without an emit).
	*/
	private int doWait(bool INTERRUPTIBLE)(Duration timeout, int reference_emit_count)
	{
		static if (!INTERRUPTIBLE) scope (failure) assert(false); // still some function calls not marked nothrow
		assert(!amOwner());
		acquire();
		scope(exit) release();
		auto tm = getEventDriver().createTimer(null);
		scope (exit) getEventDriver().releaseTimer(tm);
		// the timer resumes this task when it fires
		getEventDriver().m_timers.getUserData(tm).owner = Task.getThis();
		getEventDriver().rearmTimer(tm, timeout, false);

		auto ec = this.emitCount;
		while (ec == reference_emit_count) {
			static if (INTERRUPTIBLE) getDriverCore().yieldForEvent();
			else getDriverCore().yieldForEventDeferThrow();
			ec = this.emitCount;
			// the timer has fired: stop waiting
			if (!getEventDriver().isTimerPending(tm)) break;
		}
		return ec;
	}
965 
	/// Removes from `ms_signals` the first signal whose `owner` is the
	/// calling thread, if there is one. Runs under `m_mutex`.
	private void removeMySignal() {
		import std.algorithm : countUntil;
		synchronized(m_mutex) {
			auto idx = ms_signals[].countUntil!((void* a, LibasyncManualEvent b) { return ((cast(shared AsyncSignal) a).owner == Thread.getThis() && this is b);})(this);
			if (idx >= 0)
				ms_signals.linearRemove(ms_signals[idx .. idx+1]);
		}
	}
974 
975 	private void expandWaiters() {
976 		size_t maxID;
977 		synchronized(gs_mutex) maxID = gs_maxID;
978 		s_eventWaiters.reserve(maxID);
979 		logTrace("gs_maxID: %d", maxID);
980 		size_t s_ev_len = s_eventWaiters.length;
981 		size_t s_ev_cap = s_eventWaiters.capacity;
982 		assert(maxID > s_eventWaiters.length);
983 		foreach (i; s_ev_len .. s_ev_cap) {
984 			s_eventWaiters.insertBack(Array!Task.init);
985 		}
986 	}
987 
988 	private void onSignal()
989 	{
990 		logTrace("Got signal in onSignal");
991 		try {
992 			auto thread = Thread.getThis();
993 			auto core = getDriverCore();
994 
995 			size_t instance = instanceID;
996 			logTrace("Got context: %d", instance);
997 			foreach (Task task; s_eventWaiters[instance][]) {
998 				logTrace("Task Found");
999 				core.resumeTask(task);
1000 			}
1001 		} catch (Exception e) {
1002 			logError("Exception while handling signal event: %s", e.msg);
1003 			try logDebug("Full error: %s", sanitize(e.msg));
1004 			catch (Exception) {}
1005 		}
1006 	}
1007 }
1008 
/**
	TCP listener backed by one or more libasync `AsyncTCPListener` instances.

	With `TCPListenOptions.distribute` the setup routine is handed to
	`runWorkerTaskDist`; otherwise a single listener is created on the calling
	thread. Every created listener is recorded in `m_listeners`.
*/
final class LibasyncTCPListener : TCPListener {
@trusted:
	private {
		NetworkAddress m_local;
		void delegate(TCPConnection conn) @safe m_connectionCallback;
		TCPListenOptions m_options;
		AsyncTCPListener[] m_listeners;
		// Socket handle of the most recently started listener; passed on to listeners created later.
		fd_t socket;
	}

	/**
		Starts listening on `addr`.

		Params:
			addr = local address to bind to
			connection_callback = invoked for every accepted connection
			options = listen options; `distribute` spreads listeners via `runWorkerTaskDist`

		Throws: `Exception` (via `enforce`) if a listener fails to start.
	*/
	this(NetworkAddress addr, void delegate(TCPConnection conn) @safe connection_callback, TCPListenOptions options)
	{
		m_connectionCallback = connection_callback;
		m_options = options;
		m_local = addr;
		void function(shared LibasyncTCPListener) init = (shared LibasyncTCPListener ctxt){
			// Serialize access to `socket` and `m_listeners`.
			synchronized(ctxt) {
				LibasyncTCPListener ctxt2 = cast(LibasyncTCPListener)ctxt;
				AsyncTCPListener listener = new AsyncTCPListener(getMainEventLoop(), ctxt2.socket);
				listener.local = cast(NetworkAddressLA)ctxt2.m_local;

				enforce(listener.run(&ctxt2.initConnection), "Failed to start listening to local socket: " ~ listener.error);
				ctxt2.socket = listener.socket;
				ctxt2.m_listeners ~= listener;
			}
		};
		if (options & TCPListenOptions.distribute) runWorkerTaskDist(init, cast(shared) this);
		else init(cast(shared) this);
	}

	/// The local address passed to the constructor.
	override @property NetworkAddress bindAddress() { return m_local; }

	/// The callback invoked for every accepted connection.
	@property void delegate(TCPConnection) connectionCallback() { return m_connectionCallback; }

	/// Called by libasync for each new connection: wraps it in a
	/// `LibasyncTCPConnection` and returns that object's event handler.
	private void delegate(TCPEvent) initConnection(AsyncTCPConnection conn) {
		logTrace("Connection initialized in thread: " ~ Thread.getThis().name);

		LibasyncTCPConnection native_conn = new LibasyncTCPConnection(conn, m_connectionCallback);
		native_conn.m_tcpImpl.conn = conn;
		native_conn.m_tcpImpl.localAddr = m_local;
		return &native_conn.handler;
	}

	/// Kills all listeners created by this object. Calling it again is a no-op.
	override void stopListening()
	{
		synchronized(this) {
			foreach (listener; m_listeners)
				listener.kill();
			// Forget the killed listeners; assigning null to the loop variable
			// (as done previously) only changed a local copy.
			m_listeners = null;
		}
	}
}
1063 
1064 final class LibasyncTCPConnection : TCPConnection/*, Buffered*/ {
1065 @trusted:
1066 	private {
1067 		FixedRingBuffer!ubyte m_readBuffer;
1068 		ubyte[] m_buffer;
1069 		ubyte[] m_slice;
1070 		TCPConnectionImpl m_tcpImpl;
1071 		Settings m_settings;
1072 
1073 		bool m_closed = true;
1074 		bool m_mustRecv = true;
1075 		string m_error;
1076 
		// The socket descriptor is intentionally not exposed, to encourage adding
		// low-level API features instead of high-level, platform-dependent hacks.
1079 		// fd_t socket;
1080 	}
1081 
1082 	ubyte[] readChunk(ubyte[] buffer = null)
1083 	{
1084 		logTrace("readBuf TCP: %d", buffer.length);
1085 		import std.algorithm : swap;
1086 		ubyte[] ret;
1087 
1088 		if (m_slice.length > 0) {
1089 			swap(ret, m_slice);
1090 			logTrace("readBuf returned instantly with slice length: %d", ret.length);
1091 			return ret;
1092 		}
1093 
1094 		if (m_readBuffer.length > 0) {
1095 			size_t amt = min(buffer.length, m_readBuffer.length);
1096 			m_readBuffer.read(buffer[0 .. amt]);
1097 			logTrace("readBuf returned with existing amount: %d", amt);
1098 			return buffer[0 .. amt];
1099 		}
1100 
1101 		if (buffer) {
1102 			m_buffer = buffer;
1103 			m_readBuffer.dispose();
1104 		}
1105 
1106 		leastSize();
1107 
1108 		swap(ret, m_slice);
1109 		logTrace("readBuf returned with buffered length: %d", ret.length);
1110 		return ret;
1111 	}
1112 
1113 	this(AsyncTCPConnection conn, void delegate(TCPConnection) @safe cb)
1114 	in { assert(conn !is null); }
1115 	body {
1116 		m_settings.onConnect = cb;
1117 		m_readBuffer.capacity = 64*1024;
1118 	}
1119 
	/// Shortcut for the libasync connection stored in `m_tcpImpl` (may be null once closed).
	private @property AsyncTCPConnection conn() {

		return m_tcpImpl.conn;
	}
1124 
	// Using this setting completely disables the internal buffers as well
	/// Records the flag in `m_settings` and applies `TCPOption.NODELAY` to the connection.
	override @property void tcpNoDelay(bool enabled)
	{
		m_settings.tcpNoDelay = enabled;
		conn.setOption(TCPOption.NODELAY, enabled);
	}

	/// Returns the value last stored by the setter.
	override @property bool tcpNoDelay() const { return m_settings.tcpNoDelay; }
1133 
	/// Records the timeout in `m_settings` and applies `TCPOption.TIMEOUT_RECV` to the connection.
	override @property void readTimeout(Duration dur)
	{
		m_settings.readTimeout = dur;
		conn.setOption(TCPOption.TIMEOUT_RECV, dur);
	}

	/// Returns the value last stored by the setter.
	override @property Duration readTimeout() const { return m_settings.readTimeout; }
1141 
	/// Records the flag in `m_settings` and applies `TCPOption.KEEPALIVE_ENABLE` to the connection.
	override @property void keepAlive(bool enabled)
	{
		m_settings.keepAlive = enabled;
		conn.setOption(TCPOption.KEEPALIVE_ENABLE, enabled);
	}

	/// Returns the value last stored by the setter.
	override @property bool keepAlive() const { return m_settings.keepAlive; }
1149 
1150 	override @property bool connected() const { return !m_closed && m_tcpImpl.conn && m_tcpImpl.conn.isConnected; }
1151 
1152 	override @property bool dataAvailableForRead(){
1153 		logTrace("dataAvailableForRead");
1154 		m_settings.reader.acquire();
1155 		scope(exit) m_settings.reader.release();
1156 		return !readEmpty;
1157 	}
1158 
1159 	private @property bool readEmpty() {
1160 		return (m_buffer && !m_slice) || (!m_buffer && m_readBuffer.empty);
1161 	}
1162 
	/// String form of the underlying connection's peer address.
	override @property string peerAddress() const { return m_tcpImpl.conn.peer.toString(); }
1164 
	/// The local address stored in `m_tcpImpl.localAddr`.
	override @property NetworkAddress localAddress() const { return m_tcpImpl.localAddr; }
	/// The underlying connection's peer address, wrapped as a `NetworkAddress`.
	override @property NetworkAddress remoteAddress() const { return NetworkAddress(m_tcpImpl.conn.peer); }
1167 
1168 	override @property bool empty() { return leastSize == 0; }
1169 
1170 	override @property ulong leastSize()
1171 	{
1172 		logTrace("leastSize TCP");
1173 		m_settings.reader.acquire();
1174 		scope(exit) m_settings.reader.release();
1175 
1176 		while( m_readBuffer.empty ){
1177 			if (!connected)
1178 				return 0;
1179 			m_settings.reader.noExcept = true;
1180 			getDriverCore().yieldForEvent();
1181 			m_settings.reader.noExcept = false;
1182 		}
1183 		return (m_slice.length > 0) ? m_slice.length : m_readBuffer.length;
1184 	}
1185 
	/// Closes the connection. A task currently registered as reader is resumed
	/// first; if the connection is already marked closed afterwards, nothing
	/// else happens. Otherwise the ring buffer is disposed and `onClose` is run
	/// without an error message and with `wake_ex` set to false.
	override void close()
	{
		logTrace("Close TCP enter");

		// resume any reader, so that the read operation can be ended with a failure
		Task reader = m_settings.reader.task;
		while (m_settings.reader.isWaiting && reader.running) {
			logTrace("resuming reader first");
			getDriverCore().yieldAndResumeTask(reader);
		}

		// test if the connection is already closed
		if (m_closed) {
			logTrace("connection already closed.");
			return;
		}

		//logTrace("closing");
		// Hold the writer slot while tearing the connection down.
		m_settings.writer.acquire();
		scope(exit) m_settings.writer.release();

		// checkConnected();
		m_readBuffer.dispose();
		onClose(null, false);
	}
1211 
1212 	override bool waitForData(Duration timeout = Duration.max)
1213 	{
1214 		// 0 seconds is max. CHanging this would be breaking, might as well use -1 for immediate
1215 		if (timeout == 0.seconds)
1216 			timeout = Duration.max;
1217 		logTrace("WaitForData enter, timeout %s :: Ptr %s",  timeout.toString(), (cast(void*)this).to!string);
1218 		m_settings.reader.acquire();
1219 		auto _driver = getEventDriver();
1220 		auto tm = _driver.createTimer(null);
1221 		scope(exit) {
1222 			_driver.stopTimer(tm);
1223 			_driver.releaseTimer(tm);
1224 			m_settings.reader.release();
1225 		}
1226 		_driver.m_timers.getUserData(tm).owner = Task.getThis();
1227 		if (timeout != Duration.max) _driver.rearmTimer(tm, timeout, false);
1228 		logTrace("waitForData TCP");
1229 		while (m_readBuffer.empty) {
1230 			if (!connected) return false;
1231 
1232 			if (m_mustRecv)
1233 				onRead();
1234 			else {
1235 				//logTrace("Yielding for event in waitForData, waiting? %s", m_settings.reader.isWaiting);
1236 				m_settings.reader.noExcept = true;
1237 				getDriverCore().yieldForEvent();
1238 				m_settings.reader.noExcept = false;
1239 			}
1240 			if (timeout != Duration.max && !_driver.isTimerPending(tm)) {
1241 				logTrace("WaitForData TCP: timer signal");
1242 				return false;
1243 			}
1244 		}
1245 		if (m_readBuffer.empty && !connected) return false;
1246 		logTrace("WaitForData exit: fiber resumed with read buffer");
1247 		return !m_readBuffer.empty;
1248 	}
1249 
1250 	override const(ubyte)[] peek()
1251 	{
1252 		logTrace("Peek TCP enter");
1253 		m_settings.reader.acquire();
1254 		scope(exit) m_settings.reader.release();
1255 
1256 		if (!readEmpty)
1257 			return (m_slice.length > 0) ? cast(const(ubyte)[]) m_slice : m_readBuffer.peek();
1258 		else
1259 			return null;
1260 	}
1261 
1262 	override size_t read(scope ubyte[] dst, IOMode)
1263 	{
1264 		if (!dst.length) return 0;
1265 		assert(dst !is null && !m_slice);
1266 		logTrace("Read TCP");
1267 		m_settings.reader.acquire();
1268 		size_t len = 0;
1269 		scope(exit) m_settings.reader.release();
1270 		while( dst.length > 0 ){
1271 			while( m_readBuffer.empty ){
1272 				checkConnected();
1273 				if (m_mustRecv)
1274 					onRead();
1275 				else {
1276 					getDriverCore().yieldForEvent();
1277 					checkConnected();
1278 				}
1279 			}
1280 			size_t amt = min(dst.length, m_readBuffer.length);
1281 
1282 			m_readBuffer.read(dst[0 .. amt]);
1283 			dst = dst[amt .. $];
1284 			len += amt;
1285 		}
1286 
1287 		return len;
1288 	}
1289 
1290 	override size_t write(in ubyte[] bytes_, IOMode)
1291 	{
1292 		assert(bytes_ !is null);
1293 		logTrace("%s", "write enter");
1294 		m_settings.writer.acquire();
1295 		scope(exit) m_settings.writer.release();
1296 		checkConnected();
1297 		const(ubyte)[] bytes = bytes_;
1298 		logTrace("TCP write with %s bytes called", bytes.length);
1299 
1300 		bool first = true;
1301 		size_t offset;
1302 		size_t len = bytes.length;
1303 		do {
1304 			if (!first) {
1305 				getDriverCore().yieldForEvent();
1306 			}
1307 			checkConnected();
1308 			offset += conn.send(bytes[offset .. $]);
1309 
1310 			if (conn.hasError) {
1311 				throw new Exception(conn.error);
1312 			}
1313 			first = false;
1314 		} while (offset != len);
1315 
1316 		return len;
1317 	}
1318 
	/// Takes the writer slot and verifies the connection is still connected;
	/// no data is buffered on the write side, so there is nothing else to do here.
	/// Throws: `Exception` if the connection is not connected.
	override void flush()
	{
		logTrace("%s", "Flush");
		m_settings.writer.acquire();
		scope(exit) m_settings.writer.release();

		checkConnected();

	}
1328 
	/// Equivalent to `flush()`.
	override void finalize()
	{
		logTrace("%s", "finalize");
		flush();
	}
1334 
	/// Throws an `Exception` (via `enforce`) unless `connected` is true.
	private void checkConnected()
	{
		enforce(connected, "The remote peer has closed the connection.");
		logTrace("Check Connected");
	}
1340 
1341 	private bool tryReadBuf() {
1342 		//logTrace("TryReadBuf with m_buffer: %s", m_buffer.length);
1343 		if (m_buffer) {
1344 			ubyte[] buf = m_buffer[m_slice.length .. $];
1345 			uint ret = conn.recv(buf);
1346 			logTrace("Received: %s", buf[0 .. ret]);
1347 			// check for overflow
1348 			if (ret == buf.length) {
1349 				logTrace("Overflow detected, revert to ring buffer");
1350 				m_slice = null;
1351 				m_readBuffer.capacity = 64*1024;
1352 				m_readBuffer.put(buf);
1353 				m_buffer = null;
1354 				return false; // cancel slices and revert to the fixed ring buffer
1355 			}
1356 
1357 			if (m_slice.length > 0) {
1358 				//logDebug("post-assign m_slice ");
1359 				m_slice = m_slice.ptr[0 .. m_slice.length + ret];
1360 			}
1361 			else {
1362 				//logDebug("using m_buffer");
1363 				m_slice = m_buffer[0 .. ret];
1364 			}
1365 			return true;
1366 		}
1367 		logTrace("TryReadBuf exit with %d bytes in m_slice, %d bytes in m_readBuffer ", m_slice.length, m_readBuffer.length);
1368 
1369 		return false;
1370 	}
1371 
1372 	private void onRead() {
1373 		m_mustRecv = true; // assume we didn't receive everything
1374 
1375 		if (tryReadBuf()) {
1376 			m_mustRecv = false;
1377 			return;
1378 		}
1379 
1380 		assert(!m_slice);
1381 
1382 		logTrace("OnRead with %s", m_readBuffer.freeSpace);
1383 
1384 		while( m_readBuffer.freeSpace > 0 ) {
1385 			ubyte[] dst = m_readBuffer.peekDst();
1386 			assert(dst.length <= int.max);
1387 			logTrace("Try to read up to bytes: %s", dst.length);
1388 			bool read_more;
1389 			do {
1390 				uint ret = conn.recv(dst);
1391 				if( ret > 0 ){
1392 					logTrace("received bytes: %s", ret);
1393 					m_readBuffer.putN(ret);
1394 				}
1395 				read_more = ret == dst.length;
1396 				// ret == 0! let's look for some errors
1397 				if (read_more) {
1398 					if (m_readBuffer.freeSpace == 0)
1399 						m_readBuffer.capacity = m_readBuffer.capacity*2;
1400 					dst = m_readBuffer.peekDst();
1401 				}
1402 			} while( read_more );
1403 			if (conn.status.code == Status.ASYNC) {
1404 				m_mustRecv = false; // we'll have to wait
1405 				break; // the kernel's buffer is empty
1406 			}
1407 			// ret == 0! let's look for some errors
1408 			else if (conn.status.code == Status.ASYNC) {
1409 				m_mustRecv = false; // we'll have to wait
1410 				break; // the kernel's buffer is empty
1411 			}
1412 			else if (conn.status.code != Status.OK) {
1413 				// We have a read error and the socket may now even be closed...
1414 				auto err = conn.error;
1415 
1416 				logTrace("receive error %s %s", err, conn.status.code);
1417 				throw new Exception("Socket error: " ~ conn.status.code.to!string);
1418 			}
1419 			else {
1420 				m_mustRecv = false;
1421 				break;
1422 			}
1423 		}
1424 		logTrace("OnRead exit with free bytes: %s", m_readBuffer.freeSpace);
1425 	}
1426 
1427 	/* The AsyncTCPConnection object will be automatically disposed when this returns.
1428 	 * We're given some time to cleanup.
1429 	*/
1430 	private void onClose(in string msg = null, bool wake_ex = true) {
1431 		logTrace("onClose");
1432 
1433 		if (msg)
1434 			m_error = msg;
1435 		if (!m_closed) {
1436 
1437 			m_closed = true;
1438 
1439 			if (m_tcpImpl.conn && m_tcpImpl.conn.isConnected) {
1440 				m_tcpImpl.conn.kill(Task.getThis() != Task.init); // close the connection
1441 				m_tcpImpl.conn = null;
1442 			}
1443 		}
1444 		if (Task.getThis() != Task.init) {
1445 			return;
1446 		}
1447 		Exception ex;
1448 		if (!msg && wake_ex)
1449 			ex = new Exception("Connection closed");
1450 		else if (wake_ex)	ex = new Exception(msg);
1451 
1452 
1453 		Task reader = m_settings.reader.task;
1454 		Task writer = m_settings.writer.task;
1455 
1456 		bool hasUniqueReader = m_settings.reader.isWaiting;
1457 		bool hasUniqueWriter = m_settings.writer.isWaiting && reader != writer;
1458 
1459 		if (hasUniqueWriter && Task.getThis() != writer && wake_ex) {
1460 			getDriverCore().resumeTask(writer, ex);
1461 		}
1462 		if (hasUniqueReader && Task.getThis() != reader) {
1463 			getDriverCore().resumeTask(reader, m_settings.reader.noExcept?null:ex);
1464 		}
1465 	}
1466 
1467 	void onConnect() {
1468 		scope(failure) onClose();
1469 
1470 		if (m_tcpImpl.conn && m_tcpImpl.conn.isConnected)
1471 		{
1472 			bool inbound = m_tcpImpl.conn.inbound;
1473 
1474 			try m_settings.onConnect(this);
1475 			catch ( Exception e) {
1476 				//logError(e.toString);
1477 				throw e;
1478 			}
1479 			catch ( Throwable e) {
1480 				logError("%s", e.toString);
1481 				throw e;
1482 			}
1483 			if (inbound) close();
1484 		}
1485 		logTrace("Finished callback");
1486 	}
1487 
	/// Event callback registered with libasync for this connection.
	/// CONNECT runs `onConnect` (in a new task for inbound connections);
	/// READ fills the buffers and resumes a waiting reader, then falls through
	/// to WRITE, which resumes a waiting writer; CLOSE and ERROR run `onClose`
	/// and invoke a still-pending connect callback.
	void handler(TCPEvent ev) {
		logTrace("Handler");
		// Set by a failing onRead(); handed to the tasks resumed below.
		Exception ex;
		final switch (ev) {
			case TCPEvent.CONNECT:
				m_closed = false;
				// read & write are guaranteed to be successful on any platform at this point

				if (m_tcpImpl.conn.inbound)
					runTask(&onConnect);
				else onConnect();
				m_settings.onConnect = null;
				break;
			case TCPEvent.READ:
				// fill the read buffer and resume any task if waiting
				try onRead();
				catch (Exception e) ex = e;
				if (m_settings.reader.isWaiting)
					getDriverCore().resumeTask(m_settings.reader.task, ex);
				goto case TCPEvent.WRITE; // sometimes the kernel notified write with read events
			case TCPEvent.WRITE:
				// The kernel is ready to have some more data written, all we need to do is wake up the writer
				if (m_settings.writer.isWaiting)
					getDriverCore().resumeTask(m_settings.writer.task, ex);
				break;
			case TCPEvent.CLOSE:
				// Clearing the flag makes onClose() execute its close branch.
				m_closed = false;
				onClose();
				if (m_settings.onConnect)
					m_settings.onConnect(this);
				m_settings.onConnect = null;
				break;
			case TCPEvent.ERROR:
				// Clearing the flag makes onClose() execute its close branch.
				m_closed = false;
				onClose(conn.error);
				if (m_settings.onConnect)
					m_settings.onConnect(this);
				m_settings.onConnect = null;
				break;
		}
		return;
	}
1530 
1531 	struct Waiter {
1532 		Task task; // we can only have one task waiting for read/write operations
1533 		bool isWaiting; // if a task is actively waiting
1534 		bool noExcept;
1535 
1536 		void acquire() {
1537 			assert(!this.isWaiting, "Acquiring waiter that is already in use.");
1538 			if (Task.getThis() == Task()) return;
1539 			logTrace("%s", "Acquire waiter");
1540 			assert(!amOwner(), "Failed to acquire waiter in task: " ~ Task.getThis().fiber.to!string ~ ", it was busy with: " ~ this.task.to!string);
1541 			this.task = Task.getThis();
1542 			this.isWaiting = true;
1543 		}
1544 
1545 		void release() {
1546 			if (Task.getThis() == Task()) return;
1547 			logTrace("%s", "Release waiter");
1548 			assert(amOwner());
1549 			this.isWaiting = false;
1550 		}
1551 
1552 		bool amOwner() const {
1553 			if (this.isWaiting && this.task == Task.getThis())
1554 				return true;
1555 			return false;
1556 		}
1557 	}
1558 
	/// Per-connection state: cached option values plus the reader/writer slots.
	struct Settings {
		// Callback run when the connection is established; cleared by handler() after use.
		void delegate(TCPConnection) onConnect;
		// Values last passed to the corresponding property setters.
		Duration readTimeout;
		bool keepAlive;
		bool tcpNoDelay;
		// Task currently engaged in a read operation.
		Waiter reader;
		// Task currently engaged in a write operation.
		Waiter writer;
	}
1567 
	/// The underlying libasync connection together with the local address it belongs to.
	struct TCPConnectionImpl {
		// Returned by the localAddress property.
		NetworkAddress localAddr;
		// Set to null by onClose() after the connection has been killed.
		AsyncTCPConnection conn;
	}
1572 }
1573 
1574 int total_conn;
1575 
1576 final class LibasyncUDPConnection : UDPConnection {
1577 @trusted:
1578 	private {
1579 		Task m_task;
1580 		AsyncUDPSocket m_udpImpl;
1581 		bool m_canBroadcast;
1582 		NetworkAddressLA m_peer;
1583 
1584 		bool m_waiting;
1585 	}
1586 
	/// The underlying libasync UDP socket (null after `close`).
	private @property AsyncUDPSocket socket() {
		return m_udpImpl;
	}
1590 
	/// Wraps an existing libasync UDP socket.
	/// Params:
	///   conn = the socket to wrap; must not be null
	this(AsyncUDPSocket conn)
	in { assert(conn !is null); }
	body {
		m_udpImpl = conn;
	}
1596 
	/// The socket's local address in string form.
	override @property string bindAddress() const {

		return m_udpImpl.local.toAddressString();
	}
1601 
	/// The socket's local address, wrapped as a `NetworkAddress`.
	override @property NetworkAddress localAddress() const { return NetworkAddress(m_udpImpl.local); }
1603 
	/// Returns the value last applied through the setter.
	override @property bool canBroadcast() const { return m_canBroadcast; }
	/// Applies the broadcast flag to the socket, then records it.
	override @property void canBroadcast(bool val)
	{
		socket.broadcast(val);
		m_canBroadcast = val;
	}
1610 
1611 	override void close()
1612 	{
1613 		socket.kill();
1614 		m_udpImpl = null;
1615 	}
1616 
1617 	bool amOwner() {
1618 		return m_task != Task() && m_task == Task.getThis();
1619 	}
1620 
	/// Records the calling task as owner of the socket; it must currently be unowned.
	void acquire()
	{
		assert(m_task == Task(), "Trying to acquire a UDP socket that is currently owned.");
		m_task = Task.getThis();
	}
1626 
	/// Clears the owner; must be called by the task that acquired the socket.
	void release()
	{
		assert(m_task != Task(), "Trying to release a UDP socket that is not owned.");
		assert(m_task == Task.getThis(), "Trying to release a foreign UDP socket.");
		m_task = Task();
	}
1633 
	/// Resolves `host` using the local address family, sets `port` on the
	/// result and stores it as the default peer.
	override void connect(string host, ushort port)
	{
		// assert(m_peer == NetworkAddress.init, "Cannot connect to another peer");
		NetworkAddress addr = getEventDriver().resolveHost(host, localAddress.family, true);
		addr.port = port;
		connect(addr);
	}

	/// Stores `addr` as the default peer used by `send` when no address is given.
	/// Nothing else is done with the socket here.
	override void connect(NetworkAddress addr)
	{
		m_peer = cast(NetworkAddressLA)addr;
	}
1646 
	/**
		Sends one datagram to `peer_address`, or to the peer stored by
		`connect` when no address is given.

		Up to three attempts are made; after an attempt that reports
		`Status.ASYNC` the task yields for an event before the next one.

		Throws: `Exception` if the final status is not `Status.OK` or fewer
		bytes than `data.length` were sent.
	*/
	override void send(in ubyte[] data, in NetworkAddress* peer_address = null)
	{
		assert(data.length <= int.max);
		uint ret;
		size_t retries = 3;
		foreach  (i; 0 .. retries) {
			if( peer_address ){
				auto pa = cast(NetworkAddressLA)*cast(NetworkAddress*)peer_address;
				ret = socket.sendTo(data, pa);
			} else {
				ret = socket.sendTo(data, m_peer);
			}
			if (socket.status.code == Status.ASYNC) {
				// NOTE(review): handler() resumes m_task, which this method never sets — confirm the sender is woken.
				m_waiting = true;
				getDriverCore().yieldForEvent();
			}
			else break;
		}

		logTrace("send ret: %s, %s", ret, socket.status.text);
		enforce(socket.status.code == Status.OK, "Error sending UDP packet: " ~ socket.status.text);

		enforce(ret == data.length, "Unable to send full packet.");
	}
1671 
	/// Receives one datagram without a timeout; forwards to the timeout overload with `Duration.max`.
	override ubyte[] recv(ubyte[] buf = null, NetworkAddress* peer_address = null)
	{
		return recv(Duration.max, buf, peer_address);
	}
1676 
	/**
		Receives one datagram, waiting for events until data arrives.

		Params:
			timeout = `Duration.max` waits indefinitely; a positive duration arms a timer
			buf = destination buffer; when empty a 65507-byte buffer is allocated
			peer_address = if non-null, receives the sender's address

		Returns: the part of `buf` that holds the received datagram.

		Throws: `Exception` on a receive error, or with "UDP receive timeout."
		when a finite timeout is not positive or its timer is no longer pending.
	*/
	override ubyte[] recv(Duration timeout, ubyte[] buf = null, NetworkAddress* peer_address = null)
	{
		// size_t.max marks "no timer in use".
		size_t tm = size_t.max;
		auto m_driver = getEventDriver();
		if (timeout != Duration.max && timeout > 0.seconds) {
			tm = m_driver.createTimer(null);
			m_driver.rearmTimer(tm, timeout, false);
			m_driver.acquireTimer(tm);
		}

		acquire();
		scope(exit) {
			release();
			if (tm != size_t.max) m_driver.releaseTimer(tm);
		}

		assert(buf.length <= int.max);
		// presumably the maximum UDP payload size over IPv4 — TODO confirm
		if( buf.length == 0 ) buf.length = 65507;
		NetworkAddressLA from;
		from.family = localAddress.family;
		while(true){
			auto ret = socket.recvFrom(buf, from);
			if( ret > 0 ){
				if( peer_address ) *peer_address = NetworkAddress(from);
				return buf[0 .. ret];
			}
			else if( socket.status.code != Status.OK ){
				auto err = socket.status.text;
				logDebug("UDP recv err: %s", err);
				// Only "would block" is tolerated; anything else is a hard error.
				enforce(socket.status.code == Status.ASYNC, "Error receiving UDP packet");

				if (timeout != Duration.max) {
					enforce(timeout > 0.seconds && m_driver.isTimerPending(tm), "UDP receive timeout.");
				}
			}
			// Let handler() know a task is waiting, then sleep until the next event.
			m_waiting = true;
			getDriverCore().yieldForEvent();
		}
	}
1716 
	/// Not implemented: always fails with an assertion.
	void addMembership(ref NetworkAddress multiaddr)
	{
		assert(false, "TODO!");
	}
1721 
	/// Not implemented: always fails with an assertion.
	@property void multicastLoopback(bool loop)
	{
		assert(false, "TODO!");
	}
1726 
1727 	private void handler(UDPEvent ev)
1728 	{
1729 		logTrace("UDPConnection %p event", this);
1730 
1731 		Exception ex;
1732 		final switch (ev) {
1733 			case UDPEvent.READ:
1734 				if (m_waiting) {
1735 					m_waiting = false;
1736 					getDriverCore().resumeTask(m_task, null);
1737 				}
1738 				break;
1739 			case UDPEvent.WRITE:
1740 				if (m_waiting) {
1741 					m_waiting = false;
1742 					getDriverCore().resumeTask(m_task, null);
1743 				}
1744 				break;
1745 			case UDPEvent.ERROR:
1746 				getDriverCore.resumeTask(m_task, new Exception(socket.error));
1747 				break;
1748 		}
1749 
1750 	}
1751 }
1752 
1753 
1754 
1755 /* The following is used for LibasyncManualEvent */
1756 
1757 import std.container : Array;
1758 Array!(Array!Task) s_eventWaiters; // Task list in the current thread per instance ID
1759 __gshared Array!size_t gs_availID;
1760 __gshared size_t gs_maxID;
1761 __gshared core.sync.mutex.Mutex gs_mutex;
1762 
/// Hands out an ID from the global pool `gs_availID`, refilling the pool when
/// it is exhausted. The pool stores each ID plus one, so that 0 can serve as
/// the "pool empty" marker.
private size_t generateID()
nothrow @trusted {
	import std.algorithm : max;
	size_t slot;
	try {
		// Pops the last pool entry; yields 0 when the pool is empty.
		size_t takeLast() {
			if (gs_availID.empty)
				return 0;
			immutable size_t last = gs_availID.back;
			gs_availID.removeBack();
			return last;
		}

		synchronized(gs_mutex) {
			slot = takeLast();
			if (slot == 0) {
				// Refill with the values following gs_maxID, then retry.
				import std.range : iota;
				gs_availID.insert( iota(gs_maxID + 1, max(32, gs_maxID * 2 + 1), 1) );
				gs_maxID = gs_availID[$-1];
				slot = takeLast();
			}
		}
	} catch (Exception e) {
		assert(false, "Failed to generate necessary ID for Manual Event waiters: " ~ e.msg);
	}

	return slot - 1;
}
1792 
/// Returns `id` to the global pool `gs_availID` (stored as `id + 1`) under `gs_mutex`.
void recycleID(size_t id)
@trusted nothrow {
	try {
		immutable size_t stored = id+1;
		synchronized(gs_mutex) {
			gs_availID.insert(stored);
		}
	}
	catch (Exception e) {
		assert(false, "Error destroying Manual Event ID: " ~ id.to!string ~ " [" ~ e.msg ~ "]");
	}
}