1 /**
2 	Driver implementation for Win32 using WSAAsyncSelect
3 
4 	See_Also:
5 		`vibe.core.driver` = interface definition
6 
7 	Copyright: © 2012-2015 Sönke Ludwig
8 	Authors: Sönke Ludwig, Leonid Kramer
9 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
10 */
11 module vibe.core.drivers.win32;
12 
13 version(VibeWin32Driver)
14 {
15 
16 import vibe.core.core;
17 import vibe.core.driver;
18 import vibe.core.drivers.timerqueue;
19 import vibe.core.drivers.utils;
20 import vibe.core.log;
21 import vibe.internal.win32;
22 import vibe.internal.meta.traits : synchronizedIsNothrow;
23 import vibe.utils.array;
24 import vibe.utils.hashmap;
25 
26 import core.atomic;
27 import core.sync.mutex;
28 import core.sys.windows.windows;
29 import core.time;
30 import core.thread;
31 import std.algorithm;
32 import std.conv;
33 import std.datetime;
34 import std.exception;
35 import std.string : lastIndexOf;
36 import std.typecons;
37 import std.utf;
38 
39 import core.sys.windows.windows;
40 import core.sys.windows.winsock2;
41 
/// Window message posted to a driver's message window when a `Win32ManualEvent` is emitted.
enum WM_USER_SIGNAL = WM_USER+101;
/// Window message requested through `WSAAsyncSelect` for socket notifications.
enum WM_USER_SOCKET = WM_USER+102;

// Link against the WinSock import libraries.
pragma(lib, "wsock32");
pragma(lib, "ws2_32");
47 
48 /******************************************************************************/
49 /* class Win32EventDriver                                                     */
50 /******************************************************************************/
51 
/**
	Event driver that multiplexes all events over the Win32 message queue.

	A message-only window is created per driver instance. Socket notifications
	arrive there as `WM_USER_SOCKET` messages, manual event emissions arrive as
	`WM_USER_SIGNAL` messages and timers are kept in a `TimerQueue` that is
	polled on every iteration of the event loop.

	Most methods assert that they are called from the thread that created the
	driver.
*/
final class Win32EventDriver : EventDriver {
@trusted:
	import std.container : Array, BinaryHeap, heapify;
	import std.datetime : Clock;

	private {
		HWND m_hwnd; // message-only window receiving the WM_USER_* messages
		DWORD m_tid; // ID of the thread that created the driver
		DriverCore m_core;
		bool m_exit = false; // set to make runEventLoop() return
		SocketEventHandler[SOCKET] m_socketHandlers; // dispatch table for WM_USER_SOCKET
		HANDLE[] m_registeredEvents; // handles waited on in waitForEvents()
		HANDLE m_fileCompletionEvent; // appended to m_registeredEvents in the constructor
		bool[Win32TCPConnection] m_fileWriters; // connections polled with testFileWritten()

		TimerQueue!TimerInfo m_timers;
	}

	/**
		Creates the message window, stores a back reference to the driver in
		the window's user data and initializes WinSock 2.2.

		Throws: An exception if `WSAStartup` fails.
	*/
	this(DriverCore core)
	{
		setupWindowClass();

		m_core = core;
		m_tid = GetCurrentThreadId();
		m_hwnd = CreateWindowA("VibeWin32MessageWindow", "VibeWin32MessageWindow", 0, 0,0,0,0, HWND_MESSAGE,null,null,null);

		// onMessage() retrieves the driver instance from the window's user data
		SetWindowLongPtrA(m_hwnd, GWLP_USERDATA, cast(ULONG_PTR)cast(void*)this);
		assert(cast(Win32EventDriver)cast(void*)GetWindowLongPtrA(m_hwnd, GWLP_USERDATA) is this);

		WSADATA wd;
		enforce(WSAStartup(0x0202, &wd) == 0, "Failed to initialize WinSock");

		m_fileCompletionEvent = CreateEventW(null, false, false, null);
		m_registeredEvents ~= m_fileCompletionEvent;
	}

	/// Currently a no-op; destroying the message window is disabled.
	override void dispose()
	{
//		DestroyWindow(m_hwnd);
	}

	/**
		Runs the event loop until `exitEventLoop` is called or, with
		`VibePartialAutoExit`, until no socket handlers or file writers remain.

		Returns: Always `0`.
	*/
	override int runEventLoop()
	{
		void removePendingQuitMessages() @trusted {
			MSG msg;
			while (PeekMessageW(&msg, null, WM_QUIT, WM_QUIT, PM_REMOVE)) {}
		}

		// clear all possibly outstanding WM_QUIT messages to avoid
		// them having an influence on this runEventLoop()
		removePendingQuitMessages();

		m_exit = false;
		while (!m_exit && haveEvents())
			runEventLoopOnce();

		// remove quit messages here to avoid them having an influence on
		// processEvents or runEventLoopOnce
		removePendingQuitMessages();
		return 0;
	}

	/// Waits for events without an upper time limit, processes them and returns `0`.
	override int runEventLoopOnce()
	{
		doProcessEvents(INFINITE);
		return 0;
	}

	/**
		Processes all events that are currently pending without waiting.

		Returns: `false` if a `WM_QUIT` message was encountered, `true` otherwise.
	*/
	override bool processEvents()
	{
		return doProcessEvents(0);
	}

	/**
		Single iteration of the event loop: waits up to `timeout_msecs`
		(`INFINITE` for no limit), fires expired timers and then drains the
		thread's message queue.

		Returns: `false` if a `WM_QUIT` message was encountered, `true` otherwise.
	*/
	bool doProcessEvents(uint timeout_msecs)
	@trusted {
		assert(m_tid == GetCurrentThreadId());

		waitForEvents(timeout_msecs);

		processTimers();

		MSG msg;
		//uint cnt = 0;
		while (PeekMessageW(&msg, null, 0, 0, PM_REMOVE)) {
			if( msg.message == WM_QUIT ) {
				m_exit = true;
				return false;
			}
			// route signal messages to the driver's window so that they reach onMessage()
			if( msg.message == WM_USER_SIGNAL )
				msg.hwnd = m_hwnd;
			TranslateMessage(&msg);
			DispatchMessageW(&msg);

			// process timers every now and then so that they don't get stuck
			//if (++cnt % 10 == 0) processTimers();
		}

		// the idle notification is skipped for the non-waiting processEvents() path
		if (timeout_msecs != 0) m_core.notifyIdle();

		return true;
	}

	/// Determines whether the event loop still has something to wait for.
	private bool haveEvents()
	@safe {
		version(VibePartialAutoExit)
			return !m_fileWriters.byKey.empty || !m_socketHandlers.byKey.empty;
		else return true;
	}

	/**
		Blocks until a message, a registered event or an APC arrives, or until
		the timeout or the next timer deadline has been reached.
	*/
	private void waitForEvents(uint timeout_msecs)
	{
		// if timers are pending, limit the wait time to the first timer timeout
		auto next_timer = m_timers.getFirstTimeout();
		if (timeout_msecs > 0 && next_timer != SysTime.max) {
			auto now = Clock.currStdTime();
			// stdTime is measured in hecto-nanoseconds, convert to milliseconds
			auto timer_timeout = (next_timer.stdTime - now) / 10_000;
			if (timeout_msecs == INFINITE || timer_timeout < timeout_msecs)
				timeout_msecs = cast(uint)(timer_timeout < 0 ? 0 : timer_timeout > uint.max ? uint.max : timer_timeout);
		}

		auto ret = MsgWaitForMultipleObjectsEx(cast(DWORD)m_registeredEvents.length, m_registeredEvents.ptr, timeout_msecs, QS_ALLEVENTS, MWMO_ALERTABLE|MWMO_INPUTAVAILABLE);
		if( ret == WAIT_OBJECT_0 ){
			// the first registered event was signaled: drop all writers that have finished.
			// Removal is deferred so that the AA is not modified while iterating over it.
			Win32TCPConnection[] to_remove;
			foreach( fw; m_fileWriters.byKey )
				if( fw.testFileWritten() )
					to_remove ~= fw;
			foreach( fw; to_remove )
				m_fileWriters.remove(fw);
		}
	}

	/// Fires all timers that have expired, resuming their owner task and/or running their callback.
	private void processTimers()
	{
		if (!m_timers.anyPending) return;

		// process all timers that have expired up to now
		auto now = Clock.currTime(UTC());
		m_timers.consumeTimeouts(now, (timer, periodic, ref data) {
			// copy everything out of data first, releaseTimer() may destroy the timer
			Task owner = data.owner;
			auto callback = data.callback;
			if (!periodic) releaseTimer(timer);
			if (owner && owner.running) m_core.resumeTask(owner);
			if (callback) runTask(callback);
		});
	}

	/// Requests the event loop to exit by setting the exit flag and posting `WM_QUIT` to the driver thread.
	override void exitEventLoop()
	{
		m_exit = true;
		PostThreadMessageW(m_tid, WM_QUIT, 0, 0);
	}

	/// Opens the file at `path` in the given mode.
	override Win32FileStream openFile(Path path, FileMode mode)
	{
		assert(m_tid == GetCurrentThreadId());
		return new Win32FileStream(m_core, path, mode);
	}

	/// Creates a watcher for changes within the directory `path`.
	override DirectoryWatcher watchDirectory(Path path, bool recursive)
	{
		assert(m_tid == GetCurrentThreadId());
		return new Win32DirectoryWatcher(m_core, path, recursive);
	}

	/**
		Converts a host name or IP address string to a `NetworkAddress`.

		The string is first parsed as a literal IPv4/IPv6 address. If that
		fails and `use_dns` is set, `gethostbyname` is used to look up the name.

		Params:
			host = Host name or IP address string
			family = Restricts literal address parsing to this address family, `AF_UNSPEC` allows both
			use_dns = Allows falling back to a host name lookup

		Throws: An exception if `host` is not an IP address and `use_dns` is
			`false`, or if the lookup fails.
	*/
	override NetworkAddress resolveHost(string host, ushort family = AF_UNSPEC, bool use_dns = true)
	{
		static immutable ushort[] addrfamilies = [AF_INET, AF_INET6];

		NetworkAddress addr;
		foreach( af; addrfamilies ){
			if( family != af && family != AF_UNSPEC ) continue;
			addr.family = af;

			INT addrlen = addr.sockAddrLen;
			auto ret = WSAStringToAddressW(toUTFz!(immutable(wchar)*)(host), af, null, addr.sockAddr, &addrlen);
			if( ret != 0 ) continue;
			assert(addrlen == addr.sockAddrLen);
			return addr;
		}

		enforce(use_dns, "Invalid IP address string: "~host);

		// NOTE: status and overlapped are only consumed by the disabled
		// GetAddrInfoExW code path below
		LookupStatus status;
		status.task = Task.getThis();
		status.driver = this;
		status.finished = false;

		WSAOVERLAPPEDX overlapped;
		overlapped.Internal = 0;
		overlapped.InternalHigh = 0;
		overlapped.hEvent = cast(HANDLE)cast(void*)&status;

		version(none){ // Windows 8+
			void* aif;
			ADDRINFOEXW addr_hint;
			ADDRINFOEXW* addr_ret;
			addr_hint.ai_family = family;
			addr_hint.ai_socktype = SOCK_STREAM;
			addr_hint.ai_protocol = IPPROTO_TCP;

			enforce(GetAddrInfoExW(toUTFz!(immutable(wchar)*)(host), null, NS_DNS, null, &addr_hint, &addr_ret, null, &overlapped, &onDnsResult, null) == 0, "Failed to lookup host");
			while( !status.finished ) m_core.yieldForEvent();
			enforce(!status.error, "Failed to lookup host: "~to!string(status.error));

			aif = addr_ret;
			addr.family = cast(ubyte)addr_ret.ai_family;
			switch(addr.family){
				default: assert(false, "Invalid address family returned from host lookup.");
				case AF_INET: addr.sockAddrInet4 = *cast(sockaddr_in*)addr_ret.ai_addr; break;
				case AF_INET6: addr.sockAddrInet6 = *cast(sockaddr_in6*)addr_ret.ai_addr; break;
			}
			FreeAddrInfoExW(addr_ret);
		} else {
			auto he = gethostbyname(toUTFz!(immutable(char)*)(host));
			socketEnforce(he !is null, "Failed to look up host "~host);
			addr.family = he.h_addrtype;
			// only the first address of the result list is used
			switch(addr.family){
				default: assert(false, "Invalid address family returned from host lookup.");
				case AF_INET: addr.sockAddrInet4.sin_addr = *cast(in_addr*)he.h_addr_list[0]; break;
				case AF_INET6: addr.sockAddrInet6.sin6_addr = *cast(in6_addr*)he.h_addr_list[0]; break;
			}
		}

		return addr;
	}

	/**
		Establishes a TCP connection to `addr` using a socket bound to `bind_addr`.

		The socket is created in the address family of `addr`, so that both
		IPv4 and IPv6 destinations are supported. `bind_addr` has to use the
		same address family.

		Throws: An exception if the socket cannot be created or bound. Errors
			raised by `Win32TCPConnection.connect` are passed through.
	*/
	override Win32TCPConnection connectTCP(NetworkAddress addr, NetworkAddress bind_addr)
	{
		assert(m_tid == GetCurrentThreadId());

		auto sock = WSASocketW(addr.family, SOCK_STREAM, IPPROTO_TCP, null, 0, WSA_FLAG_OVERLAPPED);
		socketEnforce(sock != INVALID_SOCKET, "Failed to create socket");

		{
			// no connection object owns the socket yet, so close it manually on failure
			scope (failure) closesocket(sock);
			socketEnforce(bind(sock, bind_addr.sockAddr, bind_addr.sockAddrLen) == 0, "Failed to bind socket");
		}

		auto conn = new Win32TCPConnection(this, sock, addr);
		conn.connect(addr);
		return conn;
	}

	/**
		Starts listening for TCP connections on `bind_address`:`port`.

		Passing `0` as the port lets the system choose one; the address handed
		to the listener is read back with `getsockname` after binding.

		Throws: An exception if the address cannot be resolved or if creating,
			binding or listening on the socket fails.
	*/
	override Win32TCPListener listenTCP(ushort port, void delegate(TCPConnection conn) @safe conn_callback, string bind_address, TCPListenOptions options)
	{
		assert(m_tid == GetCurrentThreadId());
		auto addr = resolveHost(bind_address);
		addr.port = port;

		auto sock = WSASocketW(addr.family, SOCK_STREAM, IPPROTO_TCP, null, 0, WSA_FLAG_OVERLAPPED);
		socketEnforce(sock != INVALID_SOCKET, "Failed to create socket");

		{
			// no listener object owns the socket yet, so close it manually on failure
			scope (failure) closesocket(sock);

			socketEnforce(bind(sock, addr.sockAddr, addr.sockAddrLen) == 0,
				"Failed to bind listening socket");

			socketEnforce(listen(sock, 128) == 0,
				"Failed to listen");

			socklen_t balen = addr.sockAddrLen;
			socketEnforce(getsockname(sock, addr.sockAddr, &balen) == 0, "getsockname failed");
		}

		// TODO: support TCPListenOptions.distribute

		return new Win32TCPListener(this, sock, addr, conn_callback, options);
	}

	/// Not implemented, always fails with an assertion.
	override Win32UDPConnection listenUDP(ushort port, string bind_address = "0.0.0.0")
	{
		assert(m_tid == GetCurrentThreadId());
		/*auto addr = resolveHost(bind_address);
		addr.port = port;*/

		assert(false);
	}

	/// Creates a new manual event that is bound to this driver.
	override Win32ManualEvent createManualEvent()
	{
		assert(m_tid == GetCurrentThreadId());
		return new Win32ManualEvent(this);
	}

	/// Not implemented, always fails with an assertion.
	override FileDescriptorEvent createFileDescriptorEvent(int file_descriptor, FileDescriptorEvent.Trigger events, FileDescriptorEvent.Mode mode)
	{
		assert(false, "Not implemented.");
	}

	/// Creates a new timer holding a single reference; `callback` may be `null`.
	override size_t createTimer(void delegate() @safe callback) { return m_timers.create(TimerInfo(callback)); }

	/// Adds a reference to the timer.
	override void acquireTimer(size_t timer_id) { m_timers.getUserData(timer_id).refCount++; }

	/// Drops a reference to the timer and destroys it once no references are left.
	override void releaseTimer(size_t timer_id)
	nothrow {
		if (!--m_timers.getUserData(timer_id).refCount)
			m_timers.destroy(timer_id);
	}

	/// Checks whether the timer is currently scheduled.
	override bool isTimerPending(size_t timer_id) { return m_timers.isPending(timer_id); }

	/**
		(Re)schedules the timer to fire after `dur`.

		A pending timer holds a reference to itself, which is released again
		when it fires (non-periodic timers only) or when it gets stopped.
	*/
	override void rearmTimer(size_t timer_id, Duration dur, bool periodic)
	{
		if (!m_timers.isPending(timer_id))
			acquireTimer(timer_id);
		m_timers.schedule(timer_id, dur, periodic);
	}

	/// Cancels a pending timer and drops the reference that was taken by `rearmTimer`.
	override void stopTimer(size_t timer_id)
	{
		if (m_timers.isPending(timer_id))
			releaseTimer(timer_id);
		m_timers.unschedule(timer_id);
	}

	/**
		Suspends the calling task until the timer is not pending anymore.

		Only a single task may wait for a particular timer at a time.
	*/
	override void waitTimer(size_t timer_id)
	{
		while (true) {
			auto data = &m_timers.getUserData(timer_id);
			assert(data.owner == Task.init, "Waiting for the same timer from multiple tasks is not supported.");
			if (!m_timers.isPending(timer_id)) return;
			data.owner = Task.getThis();
			// look up the user data again instead of reusing data on exit
			scope (exit) m_timers.getUserData(timer_id).owner = Task.init;
			m_core.yieldForEvent();
		}
	}


	/// State shared between `resolveHost` and `onDnsResult` for asynchronous lookups.
	static struct LookupStatus {
		Task task;
		DWORD error;
		bool finished;
		Win32EventDriver driver;
	}

	/// Completion routine for asynchronous host lookups; stores the result and resumes the waiting task.
	private static nothrow extern(System)
	void onDnsResult(DWORD dwError, DWORD /*dwBytes*/, WSAOVERLAPPEDX* lpOverlapped)
	{
		// resolveHost() stores a pointer to its LookupStatus in the hEvent field
		auto stat = cast(LookupStatus*)cast(void*)lpOverlapped.hEvent;
		stat.error = dwError;
		stat.finished = true;
		if( stat.task )
			try stat.driver.m_core.resumeTask(stat.task);
			catch (UncaughtException th) logWarn("Resuming task for DNS lookup has thrown: %s", th.msg);
	}

	/**
		Window procedure of the driver's message window.

		`WM_USER_SIGNAL` resumes all tasks that wait on the emitted manual
		event through this driver, `WM_USER_SOCKET` forwards the socket
		notification to the handler registered for the socket. All other
		messages are passed to `DefWindowProcA`.
	*/
	private static nothrow extern(System)
	LRESULT onMessage(HWND wnd, UINT msg, WPARAM wparam, LPARAM lparam)
	{
		auto driver = cast(Win32EventDriver)cast(void*)GetWindowLongPtrA(wnd, GWLP_USERDATA);
		switch(msg){
			default: break;
			case WM_USER_SIGNAL:
				// lparam carries the Win32ManualEvent that was emitted
				auto sig = cast(Win32ManualEvent)cast(void*)lparam;
				Win32EventDriver[Task] lst;
				try {
					// work on a copy, so that tasks are not resumed while holding the mutex
					synchronized(sig.m_mutex) lst = sig.m_listeners.dup;
					foreach( task, tid; lst )
						if( tid is driver && task )
							driver.m_core.resumeTask(task);
				} catch(UncaughtException th){
					logWarn("Failed to resume signal listeners: %s", th.msg);
					return 0;
				}
				return 0;
			case WM_USER_SOCKET:
				// wparam is the socket, lparam packs the event (low word) and the error code (high word)
				SOCKET sock = cast(SOCKET)wparam;
				auto evt = LOWORD(lparam);
				auto err = HIWORD(lparam);
				auto ph = sock in driver.m_socketHandlers;
				if( ph is null ){
					logWarn("Socket %s has no associated handler for event %s/%s", sock, evt, err);
				} else ph.notifySocketEvent(sock, evt, err);
				return 0;
		}
		return DefWindowProcA(wnd, msg, wparam, lparam);
	}
}
423 
/// Receiver of `WM_USER_SOCKET` notifications that the driver dispatches by socket handle.
interface SocketEventHandler {
	/// The socket handle that this handler is responsible for.
	SOCKET socket() nothrow;
	/// Invoked with the event and error words that were decoded from the window message.
	void notifySocketEvent(SOCKET sock, WORD event, WORD error) nothrow;
}
428 
/// Per-timer user data stored in the driver's `TimerQueue`.
private struct TimerInfo {
	// a new timer starts out with a single reference held by its creator
	size_t refCount = 1;
	// run as a task when the timer fires, may be null
	void delegate() callback;
	// task to resume when the timer fires, Task.init if no task is waiting
	Task owner;

	this(void delegate() callback) { this.callback = callback; }
}
436 
437 
438 /******************************************************************************/
439 /* class Win32ManualEvent                                                     */
440 /******************************************************************************/
441 
/**
	Manual event that wakes up waiting tasks through window messages.

	Each waiting task registers itself together with the event driver of its
	thread. `emit` increments the emit count and posts a `WM_USER_SIGNAL`
	message to the window of every registered driver, which in turn resumes
	the tasks that wait on it.
*/
final class Win32ManualEvent : ManualEvent {
@trusted:
	private {
		core.sync.mutex.Mutex m_mutex; // protects m_listeners
		Win32EventDriver m_driver;
		Win32EventDriver[Task] m_listeners; // waiting task -> driver that has to resume it
		shared int m_emitCount = 0;
		Task m_waiter;
		bool m_timedOut;
	}

	/// Creates an event that uses `driver` for yielding and for its timeout timers.
	this(Win32EventDriver driver)
	nothrow {
		scope (failure) assert(false); // Mutex.this() now nothrow < 2.070
		m_mutex = new core.sync.mutex.Mutex;
		m_driver = driver;
	}

	/// Increments the emit count and notifies the drivers of all waiting tasks.
	override void emit()
	{
		scope (failure) assert(false); // AA.opApply is not nothrow
		/*auto newcnt =*/ atomicOp!"+="(m_emitCount, 1);
		// collect the distinct set of drivers, so that each one gets a single message
		bool[Win32EventDriver] threads;
		synchronized(m_mutex)
		{
			foreach( th; m_listeners )
				threads[th] = true;
		}
		foreach( th, _; threads )
			if( !PostMessageW(th.m_hwnd, WM_USER_SIGNAL, 0, cast(LPARAM)cast(void*)this) )
				logWarn("Failed to post thread message.");
	}

	/// Waits until the emit count differs from its current value.
	override void wait() { wait(m_emitCount); }
	/// Waits until the emit count differs from `reference_emit_count` and returns the new count.
	override int wait(int reference_emit_count) { return  doWait!true(reference_emit_count); }
	/// Same as above, but gives up after `timeout`. Returns the emit count at the time of return.
	override int wait(Duration timeout, int reference_emit_count) { return doWait!true(timeout, reference_emit_count); }
	/// Variant of `wait` that yields with `yieldForEventDeferThrow`.
	override int waitUninterruptible(int reference_emit_count) { return  doWait!false(reference_emit_count); }
	/// Variant of the timed `wait` that yields with `yieldForEventDeferThrow`.
	override int waitUninterruptible(Duration timeout, int reference_emit_count) { return doWait!false(timeout, reference_emit_count); }

	/// Registers the calling task as a listener, associated with the current thread's event driver.
	void acquire()
	nothrow {
		static if (!synchronizedIsNothrow)
			scope (failure) assert(0, "Internal error: function should be nothrow");

		synchronized(m_mutex)
		{
			m_listeners[Task.getThis()] = cast(Win32EventDriver)getEventDriver();
		}
	}

	/// Removes the calling task from the listeners, if it is registered.
	void release()
	nothrow {
		static if (!synchronizedIsNothrow)
			scope (failure) assert(0, "Internal error: function should be nothrow");

		auto self = Task.getThis();
		synchronized(m_mutex)
		{
			if( self in m_listeners )
				m_listeners.remove(self);
		}
	}

	/// Determines whether the calling task is currently registered as a listener.
	bool amOwner()
	nothrow {
		static if (!synchronizedIsNothrow)
			scope (failure) assert(0, "Internal error: function should be nothrow");

		synchronized(m_mutex)
		{
			return (Task.getThis() in m_listeners) !is null;
		}
	}

	/// The number of times `emit` has been called so far.
	override @property int emitCount() const { return atomicLoad(m_emitCount); }

	/**
		Yields the calling task until the emit count differs from
		`reference_emit_count`.

		Returns: The emit count that ended the wait.
	*/
	private int doWait(bool INTERRUPTIBLE)(int reference_emit_count)
	{
		//logDebugV("Signal %s wait enter %s", cast(void*)this, reference_emit_count);
		acquire();
		scope(exit) release();
		auto ec = atomicLoad(m_emitCount);
		while( ec == reference_emit_count ){
			static if (INTERRUPTIBLE) m_driver.m_core.yieldForEvent();
			else m_driver.m_core.yieldForEventDeferThrow();
			ec = atomicLoad(m_emitCount);
		}
		//logDebugV("Signal %s wait leave %s", cast(void*)this, ec);
		return ec;
	}

	/**
		Yields the calling task until the emit count differs from
		`reference_emit_count` or until `timeout` has elapsed.

		Returns: The emit count at the time of return; it equals
			`reference_emit_count` if the wait has timed out.
	*/
	private int doWait(bool INTERRUPTIBLE)(Duration timeout, int reference_emit_count = emitCount)
	{
		static if (!INTERRUPTIBLE) scope (failure) assert(false); // timer functions are still not nothrow

		acquire();
		scope(exit) release();
		auto ec = atomicLoad(m_emitCount);
		m_timedOut = false;
		m_waiter = Task.getThis();
		auto timer = m_driver.createTimer(null);
		scope(exit) m_driver.releaseTimer(timer);
		// registering as the owner makes the driver resume this task when the timer fires
		m_driver.m_timers.getUserData(timer).owner = Task.getThis();
		m_driver.rearmTimer(timer, timeout, false);
		// keep waiting for as long as nothing was emitted and the timeout timer
		// has not fired yet (a fired timer is not pending anymore)
		while (ec == reference_emit_count && m_driver.isTimerPending(timer)) {
			static if (INTERRUPTIBLE) m_driver.m_core.yieldForEvent();
			else m_driver.m_core.yieldForEventDeferThrow();
			ec = atomicLoad(m_emitCount);
		}
		// NOTE(review): if the event is emitted first, the timer stays scheduled
		// and still fires later with this task as its owner - confirm whether it
		// should be stopped here.
		return ec;
	}
}
554 
555 
556 /******************************************************************************/
557 /* class Win32FileStream                                                      */
558 /******************************************************************************/
559 
/**
	File stream based on overlapped Win32 file I/O.

	Reads and writes are issued with `ReadFileEx`/`WriteFileEx` at an explicit
	file offset that is tracked by the stream. The calling task yields until
	the completion routine reports the number of transferred bytes.
*/
final class Win32FileStream : FileStream {
@trusted:
	private {
		Path m_path;
		HANDLE m_handle;
		FileMode m_mode;
		DriverCore m_driver;
		Task m_task; // task that currently performs an I/O operation
		ulong m_size; // file size as tracked by the stream
		ulong m_ptr = 0; // offset used for the next read/write
		DWORD m_bytesTransferred; // written by onIOCompleted()
	}

	/**
		Opens the file at `path`.

		`FileMode.createTrunc` creates or truncates the file and
		`FileMode.append` creates it if necessary and positions the stream at
		the end of the file. All other modes require the file to exist.

		Throws: An exception if the file cannot be opened, truncated or if its
			size cannot be determined.
	*/
	this(DriverCore driver, Path path, FileMode mode)
	{
		m_path = path;
		m_mode = mode;
		m_driver = driver;

		auto access = m_mode == FileMode.readWrite ? (GENERIC_WRITE | GENERIC_READ) :
						(m_mode == FileMode.createTrunc || m_mode == FileMode.append)? GENERIC_WRITE : GENERIC_READ;

		// only pure readers allow others to read the file at the same time
		auto shareMode = m_mode == FileMode.read? FILE_SHARE_READ : 0;

		auto creation = m_mode == FileMode.createTrunc? CREATE_ALWAYS : m_mode == FileMode.append? OPEN_ALWAYS : OPEN_EXISTING;

		m_handle = CreateFileW(
					toUTF16z(m_path.toNativeString()),
					access,
					shareMode,
					null,
					creation,
					FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED,
					null);

		// fetched even on success, CREATE_ALWAYS reports ERROR_ALREADY_EXISTS this way
		auto errorcode = GetLastError();
		enforce(m_handle != INVALID_HANDLE_VALUE, "Failed to open "~path.toNativeString()~": "~to!string(errorcode));
		if(mode == FileMode.createTrunc && errorcode == ERROR_ALREADY_EXISTS)
		{
			// truncate file
			// TODO: seek to start pos?
			BOOL ret = vibe.internal.win32.SetEndOfFile(m_handle);
			errorcode = GetLastError();
			enforce(ret, "Failed to call SetFileEndPos for path "~path.toNativeString()~", Error: " ~ to!string(errorcode));
		}

		long size;
		auto succeeded = GetFileSizeEx(m_handle, &size);
		enforce(succeeded);
		m_size = size;

		// all I/O uses explicit offsets, so appending has to start at the end of
		// the file - otherwise the existing contents would get overwritten
		if (m_mode == FileMode.append) m_ptr = m_size;
	}

	~this()
	{
		close();
	}

	/// Gives up the ownership that was taken by `acquire`.
	void release()
	{
		assert(m_task == Task.getThis(), "Releasing FileStream that is not owned by the calling task.");
		m_task = Task();
	}

	/// Makes the calling task the owner of the stream; the stream must not be owned.
	void acquire()
	{
		assert(m_task == Task(), "Acquiring FileStream that is already owned.");
		m_task = Task.getThis();
	}

	/// Determines whether the calling task owns the stream.
	bool amOwner()
	{
		return m_task == Task.getThis();
	}

	/// Closes the file handle; calling this more than once is harmless.
	override void close()
	{
		if(m_handle == INVALID_HANDLE_VALUE)
			return;
		CloseHandle(m_handle);
		m_handle = INVALID_HANDLE_VALUE;
	}

	/// The offset at which the next read or write takes place.
	override ulong tell() { return m_ptr; }

	/// The path that was used to open the file.
	override @property Path path() const { return m_path; }

	/// `true` until `close` has been called.
	override @property bool isOpen() const { return m_handle != INVALID_HANDLE_VALUE; }

	/// The file size as tracked by the stream.
	override @property ulong size() const { return m_size; }

	/// Every mode except for `FileMode.append` is reported as readable.
	override @property bool readable()
	const {
		return m_mode != FileMode.append;
	}

	/// `true` for the append, createTrunc and readWrite modes.
	override @property bool writable()
	const {
		return m_mode == FileMode.append || m_mode == FileMode.createTrunc || m_mode == FileMode.readWrite;
	}

	/// Sets the offset for the next read or write; no system call is involved.
	override void seek(ulong offset)
	{
		m_ptr = offset;
	}


	/// `true` if the current offset is at or beyond the end of the file.
	override @property bool empty() const { assert(this.readable); return m_ptr >= m_size; }

	/// Number of bytes between the current offset and the end of the file.
	override @property ulong leastSize()
	const {
		assert(this.readable);
		// seek() may move the offset beyond the end of the file; report zero
		// instead of letting the unsigned subtraction wrap around
		return m_ptr >= m_size ? 0 : m_size - m_ptr;
	}

	/// `true` if at least one byte can be read at the current offset.
	override @property bool dataAvailableForRead(){
		return leastSize() > 0;
	}

	/// Not supported, always fails with an assertion.
	override const(ubyte)[] peek(){
		assert(false);
	}

	/**
		Fills `dst` completely with data read from the current offset.

		Returns: The number of bytes read, which equals `dst.length`.

		Throws: An exception if fewer than `dst.length` bytes are left in the
			file, if the read request cannot be started or if the I/O
			operation fails.
	*/
	override size_t read(scope ubyte[] dst, IOMode)
	{
		assert(this.readable);
		acquire();
		scope(exit) release();

		size_t nbytes = 0;
		while (dst.length > 0) {
			enforce(dst.length <= leastSize);
			OVERLAPPED overlapped;
			overlapped.Internal = 0;
			overlapped.InternalHigh = 0;
			overlapped.Offset = cast(uint)(m_ptr & 0xFFFFFFFF);
			overlapped.OffsetHigh = cast(uint)(m_ptr >> 32);
			// hEvent is free for application use with completion routines,
			// it lets onIOCompleted() find the stream
			overlapped.hEvent = cast(HANDLE)cast(void*)this;
			m_bytesTransferred = 0;

			auto to_read = min(dst.length, DWORD.max);

			// request to read the data; without this check a failed request would
			// never invoke the completion routine and the task would wait forever
			enforce(ReadFileEx(m_handle, cast(void*)dst, to_read, &overlapped, &onIOCompleted) != 0,
				"Failed to start reading from "~m_path.toNativeString()~": "~to!string(GetLastError()));

			// yield until the data is read
			while( !m_bytesTransferred ) m_driver.yieldForEvent();

			assert(m_bytesTransferred <= to_read, "More bytes read than requested!?");
			dst = dst[m_bytesTransferred .. $];
			m_ptr += m_bytesTransferred;
			nbytes += m_bytesTransferred;
		}

		return nbytes;
	}

	/**
		Writes all of `bytes_` at the current offset and advances it.

		Returns: The number of bytes written, which equals `bytes_.length`.

		Throws: An exception if the write request cannot be started or if the
			I/O operation fails.
	*/
	override size_t write(in ubyte[] bytes_, IOMode)
	{
		assert(this.writable, "File is not writable");
		acquire();
		scope(exit) release();

		const(ubyte)[] bytes = bytes_;

		size_t nbytes = 0;
		while (bytes.length > 0) {
			OVERLAPPED overlapped;
			overlapped.Internal = 0;
			overlapped.InternalHigh = 0;
			overlapped.Offset = cast(uint)(m_ptr & 0xFFFFFFFF);
			overlapped.OffsetHigh = cast(uint)(m_ptr >> 32);
			overlapped.hEvent = cast(HANDLE)cast(void*)this;
			m_bytesTransferred = 0;

			auto to_write = min(bytes.length, DWORD.max);

			// request to write the data; without this check a failed request would
			// never invoke the completion routine and the task would wait forever
			enforce(WriteFileEx(m_handle, cast(void*)bytes, to_write, &overlapped, &onIOCompleted) != 0,
				"Failed to start writing to "~m_path.toNativeString()~": "~to!string(GetLastError()));

			// yield until the data is written
			while( !m_bytesTransferred ) m_driver.yieldForEvent();

			assert(m_bytesTransferred <= to_write, "More bytes written than requested!?");
			bytes = bytes[m_bytesTransferred .. $];
			m_ptr += m_bytesTransferred;
			nbytes += m_bytesTransferred;
		}
		// writing beyond the previous end grows the file
		if(m_ptr > m_size) m_size = m_ptr;

		return nbytes;
	}

	/// No-op.
	override void flush(){}

	/// No-op.
	override void finalize(){}

	/**
		Completion routine for `ReadFileEx`/`WriteFileEx`.

		Stores the transferred byte count in the stream and resumes the owning
		task, passing an exception if the operation reported an error.
	*/
	private static extern(System) nothrow
	void onIOCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED* overlapped)
	{
		try {
			auto fileStream = cast(Win32FileStream)(overlapped.hEvent);
			fileStream.m_bytesTransferred = cbTransferred;
			if( fileStream.m_task ){
				Exception ex;
				if( dwError != 0 ) ex = new Exception("File I/O error: "~to!string(dwError));
				if( fileStream.m_task ) fileStream.m_driver.resumeTask(fileStream.m_task, ex);
			}
		} catch( UncaughtException e ){
			logWarn("Exception while handling file I/O: %s", e.msg);
		}
	}
}
766 
767 
768 /******************************************************************************/
/* class Win32DirectoryWatcher                                                */
770 /******************************************************************************/
771 
/**
	Directory watcher based on overlapped `ReadDirectoryChangesW` calls.

	The task that creates the watcher is the one that gets resumed by the
	completion routine.
*/
final class Win32DirectoryWatcher : DirectoryWatcher {
@trusted:
	private {
		Path m_path;
		bool m_recursive;
		HANDLE m_handle;
		DWORD m_bytesTransferred; // written by onIOCompleted()
		DriverCore m_core;
		ubyte[16384] m_buffer; // receives the raw FILE_NOTIFY_INFORMATION records
		UINT m_notifications = FILE_NOTIFY_CHANGE_FILE_NAME|FILE_NOTIFY_CHANGE_DIR_NAME|
			FILE_NOTIFY_CHANGE_SIZE|FILE_NOTIFY_CHANGE_LAST_WRITE;
		Task m_task;
	}

	/// Opens a handle to the directory `path`; the result of the open call is not checked here.
	this(DriverCore core, Path path, bool recursive)
	{
		m_core = core;
		m_path = path;
		m_recursive = recursive;
		m_task = Task.getThis();

		m_handle = CreateFileW(
			toUTFz!(const(wchar)*)(m_path.toString()),
			FILE_LIST_DIRECTORY,
			FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
			null,
			OPEN_EXISTING,
			FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED,
			null);
	}

	~this()
	{
		CloseHandle(m_handle);
	}

	/// The watched directory.
	override @property Path path()
	const {
		return m_path;
	}

	/// Whether sub directories are watched, too.
	override @property bool recursive()
	const {
		return m_recursive;
	}

	/// Gives up the ownership that was taken by `acquire`.
	void release()
	{
		assert(m_task == Task.getThis(), "Releasing FileStream that is not owned by the calling task.");
		m_task = Task();
	}

	/// Makes the calling task the owner of the watcher; the watcher must not be owned.
	void acquire()
	{
		assert(m_task == Task(), "Acquiring FileStream that is already owned.");
		m_task = Task.getThis();
	}

	/// Determines whether the calling task owns the watcher.
	bool amOwner()
	{
		return m_task == Task.getThis();
	}

	/**
		Waits for changes in the watched directory and appends them to `dst`.

		Timeouts are not implemented; `timeout` is asserted to be negative.

		Returns: `false` if the change request could not be started, `true`
			once at least one change has been appended to `dst`.
	*/
	override bool readChanges(ref DirectoryChange[] dst, Duration timeout)
	{
		OVERLAPPED request;
		request.Internal = 0;
		request.InternalHigh = 0;
		request.Offset = 0;
		request.OffsetHigh = 0;
		// lets the completion routine find this watcher
		request.hEvent = cast(HANDLE)cast(void*)this;

		m_bytesTransferred = 0;
		DWORD unused_byte_count;
		auto started = ReadDirectoryChangesW(m_handle, m_buffer.ptr, m_buffer.length, m_recursive,
			m_notifications, &unused_byte_count, &request, &onIOCompleted);
		if (!started) {
			logError("Failed to read directory changes in '%s'", m_path);
			return false;
		}

		// FIXME: obey timeout!
		assert(timeout.isNegative());
		while (m_bytesTransferred == 0)
			m_core.yieldForEvent();

		// walk the chain of variable sized records in the buffer
		ubyte[] remaining = m_buffer[0 .. m_bytesTransferred];
		while (true) {
			assert(remaining.length >= FILE_NOTIFY_INFORMATION.sizeof);
			auto entry = cast(FILE_NOTIFY_INFORMATION*)remaining.ptr;

			DirectoryChangeType kind;
			switch (entry.Action) {
				case 0x1, 0x5:
					kind = DirectoryChangeType.added;
					break;
				case 0x2, 0x4:
					kind = DirectoryChangeType.removed;
					break;
				default: // includes 0x3
					kind = DirectoryChangeType.modified;
					break;
			}

			// FileNameLength is given in bytes, the name consists of 16-bit code units
			// NOTE(review): the disabled trace statement of the original sliced
			// FileName.ptr - confirm against the FILE_NOTIFY_INFORMATION declaration
			// that slicing FileName directly is what is intended
			string filename = to!string(entry.FileName[0 .. entry.FileNameLength/2]);
			dst ~= DirectoryChange(kind, Path(filename));

			// an offset of zero marks the last record
			if (entry.NextEntryOffset == 0) break;
			remaining = remaining[entry.NextEntryOffset .. $];
			if (remaining.length == 0) break;
		}

		return true;
	}

	/**
		Completion routine for `ReadDirectoryChangesW`.

		Stores the number of bytes written to the buffer and resumes the
		watcher's task, passing an exception if an error was reported.
	*/
	static nothrow extern(System)
	void onIOCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED* overlapped)
	{
		try {
			auto self = cast(Win32DirectoryWatcher)overlapped.hEvent;
			self.m_bytesTransferred = cbTransferred;
			if (!self.m_task) return;

			Exception failure;
			if (dwError != 0)
				failure = new Exception("Diretory watcher error: "~to!string(dwError));
			self.m_core.resumeTask(self.m_task, failure);
		} catch (UncaughtException th) {
			logWarn("Exception in directory watcher callback: %s", th.msg);
		}
	}
}
891 
892 
893 /******************************************************************************/
894 /* class Win32UDPConnection                                                   */
895 /******************************************************************************/
896 
897 final class Win32UDPConnection : UDPConnection, SocketEventHandler {
898 @trusted:
899 	private {
900 		Task m_task;
901 		Win32EventDriver m_driver;
902 		SOCKET m_socket;
903 		NetworkAddress m_bindAddress;
904 		bool m_canBroadcast;
905 	}
906 
907 	this(Win32EventDriver driver, SOCKET sock, NetworkAddress bind_addr)
908 	{
909 		m_driver = driver;
910 		m_socket = sock;
911 		m_bindAddress = bind_addr;
912 
913 		WSAAsyncSelect(sock, m_driver.m_hwnd, WM_USER_SOCKET, FD_READ|FD_WRITE|FD_CLOSE);
914 
915 		//bind...
916 	}
917 
918 	@property SOCKET socket() { return m_socket; }
919 
920 	override @property string bindAddress() const {
921 		// NOTE: using WSAAddressToStringW instead of inet_ntop because that is only available from Vista up
922 		wchar[64] buf;
923 		DWORD buf_len = 64;
924 		WSAAddressToStringW(m_bindAddress.sockAddr, m_bindAddress.sockAddrLen, null, buf.ptr, &buf_len);
925 		auto ret = to!string(buf[0 .. buf_len]);
926 		ret = ret[0 .. ret.lastIndexOf(':')]; // strip the port number
927 		return ret;
928 	}
929 
930 	override @property NetworkAddress localAddress() const { return m_bindAddress; }
931 
932 	override @property bool canBroadcast() const { return m_canBroadcast; }
933 	override @property void canBroadcast(bool val)
934 	{
935 		int tmp_broad = val;
936 		socketEnforce(setsockopt(m_socket, SOL_SOCKET, SO_BROADCAST, &tmp_broad, tmp_broad.sizeof) == 0,
937 				"Failed to change the socket broadcast flag");
938 		m_canBroadcast = val;
939 	}
940 
941 	override void close()
942 	{
943 		if (m_socket == INVALID_SOCKET) return;
944 		closesocket(m_socket);
945 		m_socket = INVALID_SOCKET;
946 	}
947 
948 	bool amOwner() {
949 		return m_task != Task() && m_task == Task.getThis();
950 	}
951 
952 	void acquire()
953 	{
954 		assert(m_task == Task(), "Trying to acquire a TCP connection that is currently owned.");
955 		m_task = Task.getThis();
956 	}
957 
958 	void release()
959 	{
960 		assert(m_task != Task(), "Trying to release a TCP connection that is not owned.");
961 		assert(m_task == Task.getThis(), "Trying to release a foreign TCP connection.");
962 		m_task = Task();
963 	}
964 
965 	override void connect(string host, ushort port)
966 	{
967 		NetworkAddress addr = m_driver.resolveHost(host, m_bindAddress.family);
968 		addr.port = port;
969 		connect(addr);
970 	}
971 	override void connect(NetworkAddress addr)
972 	{
973 		socketEnforce(.connect(m_socket, addr.sockAddr, addr.sockAddrLen) == 0, "Failed to connect UDP socket");
974 	}
975 
976 	override void send(in ubyte[] data, in NetworkAddress* peer_address = null)
977 	{
978 		assert(data.length <= int.max);
979 		sizediff_t ret;
980 		if( peer_address ){
981 			ret = .sendto(m_socket, data.ptr, cast(int)data.length, 0, peer_address.sockAddr, peer_address.sockAddrLen);
982 		} else {
983 			ret = .send(m_socket, data.ptr, cast(int)data.length, 0);
984 		}
985 		logTrace("send ret: %s, %s", ret, WSAGetLastError());
986 		socketEnforce(ret >= 0, "Error sending UDP packet");
987 		enforce(ret == data.length, "Unable to send full packet.");
988 	}
989 
	/**
		Receives a single packet without a timeout.

		Forwards to the timeout overload with `Duration.max`.
	*/
	override ubyte[] recv(ubyte[] buf = null, NetworkAddress* peer_address = null)
	{
		return recv(Duration.max, buf, peer_address);
	}
994 
995 	override ubyte[] recv(Duration timeout, ubyte[] buf = null, NetworkAddress* peer_address = null)
996 	{
997 		size_t tm;
998 		if (timeout != Duration.max && timeout > 0.seconds) {
999 			tm = m_driver.createTimer(null);
1000 			m_driver.rearmTimer(tm, timeout, false);
1001 			m_driver.acquireTimer(tm);
1002 		}
1003 
1004 		acquire();
1005 		scope(exit) {
1006 			release();
1007 			if (tm != size_t.max) m_driver.releaseTimer(tm);
1008 		}
1009 
1010 		assert(buf.length <= int.max);
1011 		if( buf.length == 0 ) buf.length = 65507;
1012 		NetworkAddress from;
1013 		from.family = m_bindAddress.family;
1014 		while(true){
1015 			socklen_t addr_len = from.sockAddrLen;
1016 			auto ret = .recvfrom(m_socket, buf.ptr, cast(int)buf.length, 0, from.sockAddr, &addr_len);
1017 			if( ret > 0 ){
1018 				if( peer_address ) *peer_address = from;
1019 				return buf[0 .. ret];
1020 			}
1021 			if( ret < 0 ){
1022 				auto err = WSAGetLastError();
1023 				logDebug("UDP recv err: %s", err);
1024 				socketEnforce(err == WSAEWOULDBLOCK, "Error receiving UDP packet");
1025 
1026 				if (timeout != Duration.max) {
1027 					enforce(timeout > 0.seconds && m_driver.isTimerPending(tm), "UDP receive timeout.");
1028 				}
1029 			}
1030 			m_driver.m_core.yieldForEvent();
1031 		}
1032 	}
1033 
	/// Socket event notification hook; not implemented for this class - always fails with an assertion.
	void notifySocketEvent(SOCKET sock, WORD event, WORD error)
	{
		assert(false);
	}
1038 
	/// Not implemented yet - always fails with an assertion.
	void addMembership(ref NetworkAddress multiaddr)
	{
		assert(false, "TODO!");
	}
1043 
	/// Not implemented yet - always fails with an assertion.
	@property void multicastLoopback(bool loop)
	{
		assert(false, "TODO!");
	}
1048 
	/// Read callback stub: the whole body is commented out, so calling this currently does nothing.
	private static nothrow extern(C) void onUDPRead(SOCKET sockfd, short evts, void* arg)
	{
		/*auto ctx = cast(TCPContext*)arg;
		logTrace("udp socket %d read event!", ctx.socketfd);

		try {
			auto f = ctx.task;
			if( f && f.state != Fiber.State.TERM )
				ctx.core.resumeTask(f);
		} catch( UncaughtException e ){
			logError("Exception onUDPRead: %s", e.msg);
			debug assert(false);
		}*/
	}
1063 }
1064 
1065 
1066 /******************************************************************************/
1067 /* class Win32TCPConnection                                                   */
1068 /******************************************************************************/
1069 
/// Connection state tracked by `Win32TCPConnection` (`m_status`).
enum ConnectionStatus { Initialized, Connected, Disconnected }
1071 
/**
	TCP connection of the Win32 driver.

	Incoming data is read into an internal ring buffer from within
	`notifySocketEvent`, writes are performed using overlapped `WSASend`
	calls. Reading and writing are guarded by separate owner tasks
	(`m_readOwner`/`m_writeOwner`).
*/
final class Win32TCPConnection : TCPConnection, SocketEventHandler {
@trusted:
	private {
		Win32EventDriver m_driver;
		Task m_readOwner;
		Task m_writeOwner;
		bool m_tcpNoDelay;
		Duration m_readTimeout;
		bool m_keepAlive;
		SOCKET m_socket;
		NetworkAddress m_localAddress;
		NetworkAddress m_peerAddress;
		string m_peerAddressString;
		DWORD m_bytesTransferred;
		ConnectionStatus m_status;
		FixedRingBuffer!(ubyte, 64*1024) m_readBuffer;
		void delegate(TCPConnection) m_connectionCallback;
		Exception m_exception;

		HANDLE m_transferredFile;
		OVERLAPPED m_fileOverlapped;
	}

	/**
		Wraps an existing socket.

		Registers the connection as the event handler for `sock`, queries
		the local address, computes the peer address string and subscribes
		to read/write/connect/close notifications on the driver's window.

		Params:
			driver = The owning driver
			sock = Socket handle to wrap
			peer_address = Address of the remote peer
			status = Initial connection status

		Throws:
			A socket exception if the local address cannot be queried or
			the peer address cannot be converted to a string.
	*/
	this(Win32EventDriver driver, SOCKET sock, NetworkAddress peer_address, ConnectionStatus status = ConnectionStatus.Initialized)
	{
		m_driver = driver;
		m_socket = sock;
		m_driver.m_socketHandlers[sock] = this;
		m_status = status;

		m_localAddress.family = peer_address.family;
		if (peer_address.family == AF_INET) m_localAddress.sockAddrInet4.sin_addr.s_addr = 0;
		else m_localAddress.sockAddrInet6.sin6_addr.s6_addr[] = 0;
		socklen_t balen = m_localAddress.sockAddrLen;
		socketEnforce(getsockname(sock, m_localAddress.sockAddr, &balen) == 0, "getsockname failed");

		m_peerAddress = peer_address;

		// NOTE: using WSAAddressToStringW instead of inet_ntop because that is only available from Vista up
		wchar[64] buf;
		DWORD buflen = buf.length;
		socketEnforce(WSAAddressToStringW(m_peerAddress.sockAddr, m_peerAddress.sockAddrLen, null, buf.ptr, &buflen) == 0, "Failed to get string representation of peer address");
		m_peerAddressString = to!string(buf[0 .. buflen]);
		m_peerAddressString = m_peerAddressString[0 .. m_peerAddressString.lastIndexOf(':')]; // strip the port number

		// setup overlapped structure for copy-less file sending
		m_fileOverlapped.Internal = 0;
		m_fileOverlapped.InternalHigh = 0;
		m_fileOverlapped.Offset = 0;
		m_fileOverlapped.OffsetHigh = 0;
		m_fileOverlapped.hEvent = m_driver.m_fileCompletionEvent;

		WSAAsyncSelect(sock, m_driver.m_hwnd, WM_USER_SOCKET, FD_READ|FD_WRITE|FD_CONNECT|FD_CLOSE);
	}

	/// The destructor intentionally does nothing; the socket is only closed by `close` or on an `FD_CLOSE` event.
	~this()
	{
		/*if( m_socket != -1 ){
			closesocket(m_socket);
		}*/
	}

	/// The wrapped socket handle (-1 after the connection has been closed).
	@property SOCKET socket() { return m_socket; }

	/**
		Connects the socket to `addr` and waits until the connection is
		established.

		Throws:
			An `Exception` if the connection is already established, a
			socket exception if the connect call fails with anything other
			than `WSAEWOULDBLOCK`, or the exception recorded by
			`notifySocketEvent` while waiting.
	*/
	private void connect(NetworkAddress addr)
	{
		enforce(m_status != ConnectionStatus.Connected, "Connection is already established.");
		acquire();
		scope(exit) release();

		auto ret = .connect(m_socket, addr.sockAddr, addr.sockAddrLen);
		//enforce(WSAConnect(m_socket, addr.sockAddr, addr.sockAddrLen, null, null, null, null), "Failed to connect to host");

		if (ret != 0) {
			auto err = WSAGetLastError();
			logDebugV("connect err: %s", err);
			socketEnforce(err == WSAEWOULDBLOCK, "Connect call failed");
			// the status is updated from within notifySocketEvent
			while (m_status != ConnectionStatus.Connected) {
				m_driver.m_core.yieldForEvent();
				if (m_exception) throw m_exception;
			}
		}
		assert(m_status == ConnectionStatus.Connected);
	}

	/// Gives up both read and write ownership; the calling task must own both.
	void release()
	{
		assert(m_readOwner == Task.getThis() && m_readOwner == m_writeOwner, "Releasing TCP connection that is not owned by the calling task.");
		m_readOwner = m_writeOwner = Task();
	}

	/// Makes the calling task both read and write owner; the connection must be unowned.
	void acquire()
	{
		assert(m_readOwner == Task() && m_writeOwner == Task(), "Acquiring TCP connection that is currently owned.");
		m_readOwner = m_writeOwner = Task.getThis();
	}

	/// Returns `true` if the calling task is both read and write owner.
	bool amOwner() { return Task.getThis() == m_readOwner && m_readOwner == m_writeOwner; }

	/// Sets the `TCP_NODELAY` socket option and caches the flag.
	override @property void tcpNoDelay(bool enabled)
	{
		m_tcpNoDelay = enabled;
		BOOL eni = enabled;
		setsockopt(m_socket, IPPROTO_TCP, TCP_NODELAY, &eni, eni.sizeof);
	}
	/// Returns the cached `TCP_NODELAY` flag.
	override @property bool tcpNoDelay() const { return m_tcpNoDelay; }

	/**
		Sets the `SO_RCVTIMEO` socket option and caches the value.

		`Duration.max` is mapped to 0, which `SO_RCVTIMEO` treats as "no
		timeout". Previously this value would trip the range assertion (or be
		truncated to an arbitrary value in release builds).
	*/
	override @property void readTimeout(Duration v)
	{
		m_readTimeout = v;
		DWORD vdw = 0;
		if (v != Duration.max) {
			auto msecs = v.total!"msecs"();
			assert(msecs < DWORD.max);
			vdw = cast(DWORD)msecs;
		}
		setsockopt(m_socket, SOL_SOCKET, SO_RCVTIMEO, &vdw, vdw.sizeof);
	}
	/// Returns the cached read timeout.
	override @property Duration readTimeout() const { return m_readTimeout; }

	/// Sets the `SO_KEEPALIVE` socket option and caches the flag.
	override @property void keepAlive(bool enabled)
	{
		m_keepAlive = enabled;
		BOOL eni = enabled;
		setsockopt(m_socket, SOL_SOCKET, SO_KEEPALIVE, &eni, eni.sizeof);
	}
	/// Returns the cached `SO_KEEPALIVE` flag.
	override @property bool keepAlive() const { return m_keepAlive; }

	/// Returns `true` while the connection status is `Connected`.
	override @property bool connected() const { return m_status == ConnectionStatus.Connected; }

	/// The peer address as a string, without the port number.
	override @property string peerAddress() const { return m_peerAddressString; }

	/// The local address as queried in the constructor.
	override @property NetworkAddress localAddress() const { return m_localAddress; }
	/// The peer address as passed to the constructor.
	override @property NetworkAddress remoteAddress() const { return m_peerAddress; }

	/// Returns `true` if no more data can be read; may block (see `leastSize`).
	override @property bool empty() { return leastSize == 0; }

	/**
		Returns the number of buffered bytes, waiting for data if the read
		buffer is empty.

		Returns 0 if the buffer is empty and the connection is not
		connected anymore.
	*/
	override @property ulong leastSize()
	{
		acquireReader();
		scope(exit) releaseReader();

		while( m_readBuffer.empty ){
			if( !connected ) return 0;
			m_driver.m_core.yieldForEvent();
		}
		return m_readBuffer.length;
	}

	/// Returns `true` if the read buffer contains data; never waits.
	override @property bool dataAvailableForRead()
	{
		acquireReader();
		scope(exit) releaseReader();
		return !m_readBuffer.empty;
	}

	/// Disconnects and closes the socket and marks the connection as disconnected.
	override void close()
	{
		acquire();
		scope(exit) release();
		WSASendDisconnect(m_socket, null);
		closesocket(m_socket);
		m_socket = -1;
		m_status = ConnectionStatus.Disconnected;
	}

	/**
		Waits until data is available in the read buffer.

		Params:
			timeout = Maximum time to wait. Both `Duration.max` and
				`0.seconds` wait without a time limit (the latter logs a
				warning).

		Returns:
			`true` if data is available, `false` if the connection is not
			connected or the timeout has elapsed.
	*/
	override bool waitForData(Duration timeout)
	{
		if (timeout == 0.seconds)
			logDebug("Warning: use Duration.max as an argument to waitForData() to wait infinitely, not 0.seconds.");

		acquireReader();
		scope(exit) releaseReader();
		if (timeout != Duration.max && timeout != 0.seconds) {
			auto tm = m_driver.createTimer(null);
			scope(exit) m_driver.releaseTimer(tm);
			m_driver.m_timers.getUserData(tm).owner = Task.getThis();
			m_driver.rearmTimer(tm, timeout, false);
			while (m_readBuffer.empty) {
				if (!connected) return false;
				m_driver.m_core.yieldForEvent();
				// a timer that is not pending anymore means the timeout was hit
				if (!m_driver.isTimerPending(tm)) return false;
			}
		} else {
			while (m_readBuffer.empty) {
				if (!connected) return false;
				m_driver.m_core.yieldForEvent();
			}
		}
		return true;
	}

	/// Returns the currently buffered data without consuming it.
	override const(ubyte)[] peek()
	{
		acquireReader();
		scope(exit) releaseReader();
		return m_readBuffer.peek();
	}

	/**
		Fills `dst` completely from the read buffer, waiting for more data
		as necessary.

		Returns:
			The number of bytes read, which equals `dst.length`.
	*/
	override size_t read(scope ubyte[] dst, IOMode)
	{
		acquireReader();
		scope(exit) releaseReader();

		size_t nbytes = 0;
		while (dst.length > 0) {
			while( m_readBuffer.empty ){
				checkConnected();
				m_driver.m_core.yieldForEvent();
			}
			size_t amt = min(dst.length, m_readBuffer.length);

			m_readBuffer.read(dst[0 .. amt]);
			dst = dst[amt .. $];
			nbytes += amt;
		}

		return nbytes;
	}

	/**
		Writes all of `bytes_` using overlapped `WSASend` calls.

		The completion routine `onIOWriteCompleted` stores the number of
		transferred bytes in `m_bytesTransferred` and resumes the writing
		task.

		Returns:
			The number of bytes written.

		Throws:
			A socket exception if `WSASend` fails with anything other than
			`WSA_IO_PENDING`.
	*/
	override size_t write(in ubyte[] bytes_, IOMode)
	{
		acquireWriter();
		scope(exit) releaseWriter();

		checkConnected();
		const(ubyte)[] bytes = bytes_;
		logTrace("TCP write with %s bytes called", bytes.length);

		WSAOVERLAPPEDX overlapped;
		overlapped.Internal = 0;
		overlapped.InternalHigh = 0;
		overlapped.Offset = 0;
		overlapped.OffsetHigh = 0;
		// hEvent carries the connection reference for the completion routine
		overlapped.hEvent = cast(HANDLE)cast(void*)this;

		size_t nbytes = 0;
		while (bytes.length > 0) {
			WSABUF buf;
			buf.len = bytes.length;
			buf.buf = cast(ubyte*)bytes.ptr;

			m_bytesTransferred = 0;
			logTrace("Sending %s bytes TCP", buf.len);
			auto ret = WSASend(m_socket, &buf, 1, null, 0, &overlapped, &onIOWriteCompleted);
			if( ret == SOCKET_ERROR ){
				auto err = WSAGetLastError();
				socketEnforce(err == WSA_IO_PENDING, "Failed to send data");
			}
			while( !m_bytesTransferred ) m_driver.m_core.yieldForEvent();

			assert(m_bytesTransferred <= bytes.length, "More data sent than requested!?");
			bytes = bytes[m_bytesTransferred .. $];
			nbytes += m_bytesTransferred;
		}
		return nbytes;

	}

	/// Acquires the writer and checks the connection; there is no buffered data to flush.
	override void flush()
	{
		acquireWriter();
		scope(exit) releaseWriter();

		checkConnected();
	}

	/// Finalizes the stream by flushing it.
	override void finalize()
	{
		flush();
	}

	/**
		Sends the contents of a file using `TransmitFile`.

		Params:
			filename = Path of the file to send

		Throws:
			An `Exception` if the file is too large for a single
			`TransmitFile` call and a socket exception if the transfer
			could not be started.
	*/
	void writeFile(Path filename)
	{
		auto fstream = m_driver.openFile(filename, FileMode.read);
		// NOTE: the previous bound "1<<31" overflowed int and made this check a no-op
		enforce(fstream.size < int.max, "File is too large to be sent using TransmitFile.");
		acquireWriter();
		m_bytesTransferred = 0;
		m_driver.m_fileWriters[this] = true;
		scope(exit) releaseWriter();
		logDebug("Using sendfile! %s %s %s", fstream.m_handle, fstream.tell(), fstream.size);

		if (TransmitFile(m_socket, fstream.m_handle, 0, 0, &m_fileOverlapped, null, 0)) {
			// NOTE(review): value kept from the original code for the
			// immediate success case - confirm the intended meaning
			m_bytesTransferred = 1;
		} else {
			// the last error is only meaningful if the call has failed
			socketEnforce(WSAGetLastError() == WSA_IO_PENDING, "Failed to send file over TCP.");
		}

		while (m_bytesTransferred < fstream.size) m_driver.m_core.yieldForEvent();
	}

	/// Makes the calling task the read owner; the reader must be unowned.
	InputStream acquireReader() { assert(m_readOwner == Task()); m_readOwner = Task.getThis(); return this; }
	/// Gives up read ownership; the calling task must be the read owner.
	void releaseReader() { assert(m_readOwner == Task.getThis()); m_readOwner = Task(); }
	/// Returns `true` if the calling task is the read owner.
	bool amReadOwner() const { return m_readOwner == Task.getThis(); }

	/// Makes the calling task the write owner; the writer must be unowned.
	OutputStream acquireWriter() { assert(m_writeOwner == Task()); m_writeOwner = Task.getThis(); return this; }
	/// Gives up write ownership; the calling task must be the write owner.
	void releaseWriter() { assert(m_writeOwner == Task.getThis()); m_writeOwner = Task(); }
	/// Returns `true` if the calling task is the write owner.
	bool amWriteOwner() const { return m_writeOwner == Task.getThis(); }

	/// Placeholder for a connection check; currently does nothing.
	private void checkConnected()
	{
		// TODO!
	}

	/**
		Polls the state of the overlapped file transfer.

		Returns:
			`true` if the transfer has finished (successfully, or with an
			error that was passed on to the write owner), `false` if it is
			still pending.

		Throws:
			An `Exception` if the transfer failed and there is no write
			owner to pass the error to.
	*/
	private bool testFileWritten()
	{
		if( !GetOverlappedResult(m_transferredFile, &m_fileOverlapped, &m_bytesTransferred, false) ){
			if( GetLastError() != ERROR_IO_PENDING ){
				auto ex = new Exception("File transfer over TCP failed.");
				if (m_writeOwner != Task.init) {
					m_driver.m_core.resumeTask(m_writeOwner, ex);
					return true;
				} else throw ex;
			}
			return false;
		} else {
			if (m_writeOwner != Task.init) m_driver.m_core.resumeTask(m_writeOwner);
			return true;
		}
	}

	/**
		Handles a socket notification for this connection.

		Updates the connection status, fills the read buffer on `FD_READ`
		and resumes the read or write owner. An exception created for the
		event is passed to the resumed task and stored in `m_exception`.

		Params:
			sock = Socket the event belongs to
			event = The `FD_*` event code
			error = Error code reported together with the event
	*/
	void notifySocketEvent(SOCKET sock, WORD event, WORD error)
	nothrow {
		try {
			logDebugV("Socket event for %s: %s, error: %s", sock, event, error);
			if (m_socket == -1) {
				logDebug("Event for already closed socket - ignoring");
				return;
			}
			assert(sock == m_socket);
			Exception ex;
			switch(event){
				default: break;
				case FD_CONNECT: // doesn't seem to occur, but we handle it just in case
					if (error) {
						ex = new SystemSocketException("Failed to connect to host", error);
						m_status = ConnectionStatus.Disconnected;
					} else m_status = ConnectionStatus.Connected;
					if (m_writeOwner) m_driver.m_core.resumeTask(m_writeOwner, ex);
					break;
				case FD_READ:
					logTrace("TCP read event");
					while (m_readBuffer.freeSpace > 0) {
						auto dst = m_readBuffer.peekDst();
						assert(dst.length <= int.max);
						logTrace("Try to read up to %s bytes", dst.length);
						auto ret = .recv(m_socket, dst.ptr, cast(int)dst.length, 0);
						if (ret >= 0) {
							logTrace("received %s bytes", ret);
							if( ret == 0 ) break;
							m_readBuffer.putN(ret);
						} else {
							auto err = WSAGetLastError();
							if( err != WSAEWOULDBLOCK ){
								logTrace("receive error %s", err);
								// report the error of the failed recv call, not the
								// error code of the event; the cast keeps the
								// argument type of the other call sites (Winsock
								// error codes fit into a WORD)
								ex = new SystemSocketException("Error reading data from socket", cast(WORD)err);
							}
							break;
						}
					}

					//m_driver.m_core.resumeTask(m_readOwner, ex);
					/*WSABUF buf;
					buf.len = dst.length;
					buf.buf = dst.ptr;
					DWORD flags = 0;

					WSAOVERLAPPEDX overlapped;
					overlapped.Internal = 0;
					overlapped.InternalHigh = 0;
					overlapped.Offset = 0;
					overlapped.OffsetHigh = 0;
					overlapped.hEvent = cast(HANDLE)cast(void*)this;

					m_bytesTransferred = 0;
					auto ret = WSARecv(m_socket, &buf, 1, null, &flags, &overlapped, &onIOCompleted);
					if( ret == SOCKET_ERROR ){
						auto err = WSAGetLastError();
						socketEnforce(err == WSA_IO_PENDING, "Failed to receive data");
					}
					while( !m_bytesTransferred ) m_driver.m_core.yieldForEvent();

					assert(m_bytesTransferred <= dst.length, "More data received than requested!?");
					m_readBuffer.pushN(m_bytesTransferred);*/
					if (m_readOwner) m_driver.m_core.resumeTask(m_readOwner, ex);
					break;
				case FD_WRITE:
					// the first FD_WRITE on a not yet connected socket signals the connect result
					if (m_status == ConnectionStatus.Initialized) {
						if( error ){
							ex = new SystemSocketException("Failed to connect to host", error);
						} else m_status = ConnectionStatus.Connected;
					}
					if (m_writeOwner) m_driver.m_core.resumeTask(m_writeOwner, ex);
					break;
				case FD_CLOSE:
					if (error) {
						if (m_status == ConnectionStatus.Initialized) {
							ex = new SystemSocketException("Failed to connect to host", error);
						} else {
							ex = new SystemSocketException("The connection was closed with an error", error);
						}
					} else {
						m_status = ConnectionStatus.Disconnected;
						closesocket(m_socket);
						m_socket = -1;
					}
					if (m_writeOwner) m_driver.m_core.resumeTask(m_writeOwner, ex);
					break;
			}

			if (ex) m_exception = ex;
		} catch( UncaughtException th ){
			logWarn("Exception while handling socket event: %s", th.msg);
		}
	}

	/**
		Runs the connection callback for this connection.

		Exceptions thrown by the callback are logged. Afterwards the
		connection is closed if it is still connected, unless
		`TCPListenOptions.disableAutoClose` is set.
	*/
	private void runConnectionCallback(TCPListenOptions options)
	{
		try {
			m_connectionCallback(this);
			logDebug("task out (fd %d).", m_socket);
		} catch( Exception e ){
			logWarn("Handling of connection failed: %s", e.msg);
			logDiagnostic("%s", e.toString());
		} finally {
			if (!(options & TCPListenOptions.disableAutoClose) && this.connected) close();
		}
	}

	/**
		Completion routine for the overlapped `WSASend` calls issued by
		`write`.

		Stores the number of transferred bytes in the connection and
		resumes the write owner, passing an exception if `dwError` is set.
	*/
	private static extern(System) nothrow
	void onIOWriteCompleted(DWORD dwError, DWORD cbTransferred, WSAOVERLAPPEDX* lpOverlapped, DWORD dwFlags)
	{
		logTrace("IO completed for TCP send: %s (error=%s)", cbTransferred, dwError);
		try {
			// the connection reference was stored in hEvent by write()
			auto conn = cast(Win32TCPConnection)(lpOverlapped.hEvent);
			conn.m_bytesTransferred = cbTransferred;
			if (conn.m_writeOwner != Task.init) {
				Exception ex;
				if( dwError != 0 ) ex = new Exception("Socket I/O error: "~to!string(dwError));
				conn.m_driver.m_core.resumeTask(conn.m_writeOwner, ex);
			}
		} catch( UncaughtException th ){
			logWarn("Exception while handling TCP I/O: %s", th.msg);
		}
	}
}
1515 
1516 /******************************************************************************/
1517 /* class Win32TCPListener                                                     */
1518 /******************************************************************************/
1519 
/**
	TCP listener of the Win32 driver.

	Accepts incoming connections from within `notifySocketEvent` and runs
	the connection callback in a new task for each of them.
*/
final class Win32TCPListener : TCPListener, SocketEventHandler {
@trusted:
	private {
		Win32EventDriver m_driver;
		SOCKET m_socket;
		NetworkAddress m_bindAddress;
		void delegate(TCPConnection conn) m_connectionCallback;
		TCPListenOptions m_options;
	}

	/**
		Wraps a listening socket.

		Registers the listener as the event handler for `sock` and
		subscribes to `FD_ACCEPT` notifications on the driver's window.

		Params:
			driver = The owning driver
			sock = The listening socket
			bind_addr = Address the socket is bound to
			conn_callback = Callback invoked for each accepted connection
			options = Listen options passed on to the connection task
	*/
	this(Win32EventDriver driver, SOCKET sock, NetworkAddress bind_addr, void delegate(TCPConnection conn) @safe conn_callback, TCPListenOptions options)
	{
		m_driver = driver;
		m_socket = sock;
		m_bindAddress = bind_addr;
		m_connectionCallback = conn_callback;
		m_driver.m_socketHandlers[sock] = this;
		m_options = options;

		WSAAsyncSelect(sock, m_driver.m_hwnd, WM_USER_SOCKET, FD_ACCEPT);
	}

	/// The address this listener is bound to.
	override @property NetworkAddress bindAddress()
	{
		return m_bindAddress;
	}

	/// Closes the listening socket; does nothing if it is already closed.
	override void stopListening()
	{
		if( m_socket == -1 ) return;
		closesocket(m_socket);
		m_socket = -1;
	}

	/// The listening socket handle (-1 after `stopListening`).
	SOCKET socket() nothrow { return m_socket; }

	/**
		Handles a socket notification for the listening socket.

		On `FD_ACCEPT` the pending connection is accepted, wrapped in a
		`Win32TCPConnection` and handed to the connection callback in a new
		task. Failures are logged and do not propagate. Any other event
		triggers an assertion.
	*/
	void notifySocketEvent(SOCKET sock, WORD event, WORD error)
	nothrow {
		assert(sock == m_socket);
		switch(event){
			default: assert(false);
			case FD_ACCEPT:
				try {
					NetworkAddress addr;
					addr.family = AF_INET6;
					int addrlen = addr.sockAddrLen;
					auto clientsock = WSAAccept(sock, addr.sockAddr, &addrlen, null, 0);
					// don't wrap (and register a handler for) an invalid socket
					enforce(clientsock != INVALID_SOCKET, "Failed to accept incoming connection: "~to!string(WSAGetLastError()));
					assert(addrlen == addr.sockAddrLen);
					// TODO avoid GC allocations for delegate and Win32TCPConnection
					auto conn = new Win32TCPConnection(m_driver, clientsock, addr, ConnectionStatus.Connected);
					conn.m_connectionCallback = m_connectionCallback;
					runTask(&conn.runConnectionCallback, m_options);
				} catch( Exception e ){
					logWarn("Exception while accepting TCP connection: %s", e.msg);
					try logDiagnostic("Exception while accepting TCP connection: %s", e.toString());
					catch( Exception ){}
				}
				break;
		}
	}
}
1581 
1582 
private {
	/// Traits for a map keyed by `UINT_PTR` values, using `UINT_PTR.max` as the clear value.
	struct TimerMapTraits {
		enum clearValue = UINT_PTR.max;
		static bool equals(UINT_PTR a, UINT_PTR b) { return a == b; }
	}
	/// Set by `setupWindowClass` once the window class has been registered.
	__gshared s_setupWindowClass = false;
}
1590 
/**
	Registers the window class used by the driver.

	The registration is performed only once; later calls return
	immediately.
*/
void setupWindowClass() nothrow
@trusted {
	if( !s_setupWindowClass ){
		WNDCLASSA cls;
		cls.lpszClassName = "VibeWin32MessageWindow";
		cls.lpfnWndProc = &Win32EventDriver.onMessage;
		RegisterClassA(&cls);
		// remember the registration so that later calls can skip it
		s_setupWindowClass = true;
	}
}
1600 
// Exception type caught by the event handlers in this module: every
// Throwable if VibeDebugCatchAll is set, otherwise only Exception.
version (VibeDebugCatchAll) private alias UncaughtException = Throwable;
else private alias UncaughtException = Exception;
1603 
1604 } // version(VibeWin32Driver)