1 /** 2 Driver implementation for Win32 using WSAAsyncSelect 3 4 See_Also: 5 `vibe.core.driver` = interface definition 6 7 Copyright: © 2012-2015 Sönke Ludwig 8 Authors: Sönke Ludwig, Leonid Kramer 9 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 10 */ 11 module vibe.core.drivers.win32; 12 13 version(VibeWin32Driver) 14 { 15 16 import vibe.core.core; 17 import vibe.core.driver; 18 import vibe.core.drivers.timerqueue; 19 import vibe.core.drivers.utils; 20 import vibe.core.log; 21 import vibe.internal.win32; 22 import vibe.internal.meta.traits : synchronizedIsNothrow; 23 import vibe.utils.array; 24 import vibe.utils.hashmap; 25 26 import core.atomic; 27 import core.sync.mutex; 28 import core.sys.windows.windows; 29 import core.time; 30 import core.thread; 31 import std.algorithm; 32 import std.conv; 33 import std.datetime; 34 import std.exception; 35 import std.string : lastIndexOf; 36 import std.typecons; 37 import std.utf; 38 39 import core.sys.windows.windows; 40 import core.sys.windows.winsock2; 41 42 enum WM_USER_SIGNAL = WM_USER+101; 43 enum WM_USER_SOCKET = WM_USER+102; 44 45 pragma(lib, "wsock32"); 46 pragma(lib, "ws2_32"); 47 48 /******************************************************************************/ 49 /* class Win32EventDriver */ 50 /******************************************************************************/ 51 52 final class Win32EventDriver : EventDriver { 53 @trusted: 54 import std.container : Array, BinaryHeap, heapify; 55 import std.datetime : Clock; 56 57 private { 58 HWND m_hwnd; 59 DWORD m_tid; 60 DriverCore m_core; 61 bool m_exit = false; 62 SocketEventHandler[SOCKET] m_socketHandlers; 63 HANDLE[] m_registeredEvents; 64 HANDLE m_fileCompletionEvent; 65 bool[Win32TCPConnection] m_fileWriters; 66 67 TimerQueue!TimerInfo m_timers; 68 } 69 70 this(DriverCore core) 71 { 72 setupWindowClass(); 73 74 m_core = core; 75 m_tid = GetCurrentThreadId(); 76 m_hwnd = CreateWindowA("VibeWin32MessageWindow", "VibeWin32MessageWindow", 0, 0,0,0,0, HWND_MESSAGE,null,null,null); 77 78 SetWindowLongPtrA(m_hwnd, GWLP_USERDATA, cast(ULONG_PTR)cast(void*)this); 79 assert(cast(Win32EventDriver)cast(void*)GetWindowLongPtrA(m_hwnd, GWLP_USERDATA) is this); 80 81 WSADATA wd; 82 enforce(WSAStartup(0x0202, &wd) == 0, "Failed to initialize WinSock"); 83 84 m_fileCompletionEvent = CreateEventW(null, false, false, null); 85 m_registeredEvents ~= m_fileCompletionEvent; 86 } 87 88 override void dispose() 89 { 90 // DestroyWindow(m_hwnd); 91 } 92 93 override int runEventLoop() 94 { 95 void removePendingQuitMessages() @trusted { 96 MSG msg; 97 while (PeekMessageW(&msg, null, WM_QUIT, WM_QUIT, PM_REMOVE)) {} 98 } 99 100 // clear all possibly outstanding WM_QUIT messages to avoid 101 // them having an influence this runEventLoop() 102 removePendingQuitMessages(); 103 104 m_exit = false; 105 while (!m_exit && haveEvents()) 106 runEventLoopOnce(); 107 108 // remove quit messages here to avoid them having an influence on 109 // processEvets or runEventLoopOnce 110 removePendingQuitMessages(); 111 return 0; 112 } 113 114 override int runEventLoopOnce() 115 { 116 doProcessEvents(INFINITE); 117 return 0; 118 } 119 120 override bool processEvents() 121 { 122 return doProcessEvents(0); 123 } 124 125 bool doProcessEvents(uint timeout_msecs) 126 @trusted { 127 assert(m_tid == GetCurrentThreadId()); 128 129 waitForEvents(timeout_msecs); 130 131 processTimers(); 132 133 MSG msg; 134 //uint cnt = 0; 135 while (PeekMessageW(&msg, null, 0, 0, PM_REMOVE)) { 136 if( msg.message == WM_QUIT ) { 137 m_exit = true; 138 return false; 139 } 140 if( msg.message == WM_USER_SIGNAL ) 141 msg.hwnd = m_hwnd; 142 TranslateMessage(&msg); 143 DispatchMessageW(&msg); 144 145 // process timers every now and then so that they don't get stuck 146 //if (++cnt % 10 == 0) processTimers(); 147 } 148 149 if (timeout_msecs != 0) m_core.notifyIdle(); 150 151 return true; 152 } 153 154 private bool haveEvents() 155 @safe { 156 version(VibePartialAutoExit) 157 return !m_fileWriters.byKey.empty || !m_socketHandlers.byKey.empty; 158 else return true; 159 } 160 161 private void waitForEvents(uint timeout_msecs) 162 { 163 // if timers are pending, limit the wait time to the first timer timeout 164 auto next_timer = m_timers.getFirstTimeout(); 165 if (timeout_msecs > 0 && next_timer != SysTime.max) { 166 auto now = Clock.currStdTime(); 167 auto timer_timeout = (next_timer.stdTime - now) / 10_000; 168 if (timeout_msecs == INFINITE || timer_timeout < timeout_msecs) 169 timeout_msecs = cast(uint)(timer_timeout < 0 ? 0 : timer_timeout > uint.max ? uint.max : timer_timeout); 170 } 171 172 auto ret = MsgWaitForMultipleObjectsEx(cast(DWORD)m_registeredEvents.length, m_registeredEvents.ptr, timeout_msecs, QS_ALLEVENTS, MWMO_ALERTABLE|MWMO_INPUTAVAILABLE); 173 if( ret == WAIT_OBJECT_0 ){ 174 Win32TCPConnection[] to_remove; 175 foreach( fw; m_fileWriters.byKey ) 176 if( fw.testFileWritten() ) 177 to_remove ~= fw; 178 foreach( fw; to_remove ) 179 m_fileWriters.remove(fw); 180 } 181 } 182 183 private void processTimers() 184 { 185 if (!m_timers.anyPending) return; 186 187 // process all timers that have expired up to now 188 auto now = Clock.currTime(UTC()); 189 m_timers.consumeTimeouts(now, (timer, periodic, ref data) { 190 Task owner = data.owner; 191 auto callback = data.callback; 192 if (!periodic) releaseTimer(timer); 193 if (owner && owner.running) m_core.resumeTask(owner); 194 if (callback) runTask(callback); 195 }); 196 } 197 198 override void exitEventLoop() 199 { 200 m_exit = true; 201 PostThreadMessageW(m_tid, WM_QUIT, 0, 0); 202 } 203 204 override Win32FileStream openFile(Path path, FileMode mode) 205 { 206 assert(m_tid == GetCurrentThreadId()); 207 return new Win32FileStream(m_core, path, mode); 208 } 209 210 override DirectoryWatcher watchDirectory(Path path, bool recursive) 211 { 212 assert(m_tid == GetCurrentThreadId()); 213 return new Win32DirectoryWatcher(m_core, path, recursive); 214 } 215 216 override NetworkAddress resolveHost(string host, ushort family = AF_UNSPEC, bool use_dns = true) 217 { 218 static immutable ushort[] addrfamilies = [AF_INET, AF_INET6]; 219 220 NetworkAddress addr; 221 foreach( af; addrfamilies ){ 222 if( family != af && family != AF_UNSPEC ) continue; 223 addr.family = af; 224 225 INT addrlen = addr.sockAddrLen; 226 auto ret = WSAStringToAddressW(toUTFz!(immutable(wchar)*)(host), af, null, addr.sockAddr, &addrlen); 227 if( ret != 0 ) continue; 228 assert(addrlen == addr.sockAddrLen); 229 return addr; 230 } 231 232 enforce(use_dns, "Invalid IP address string: "~host); 233 234 LookupStatus status; 235 status.task = Task.getThis(); 236 status.driver = this; 237 status.finished = false; 238 239 WSAOVERLAPPEDX overlapped; 240 overlapped.Internal = 0; 241 overlapped.InternalHigh = 0; 242 overlapped.hEvent = cast(HANDLE)cast(void*)&status; 243 244 version(none){ // Windows 8+ 245 void* aif; 246 ADDRINFOEXW addr_hint; 247 ADDRINFOEXW* addr_ret; 248 addr_hint.ai_family = family; 249 addr_hint.ai_socktype = SOCK_STREAM; 250 addr_hint.ai_protocol = IPPROTO_TCP; 251 252 enforce(GetAddrInfoExW(toUTFz!(immutable(wchar)*)(host), null, NS_DNS, null, &addr_hint, &addr_ret, null, &overlapped, &onDnsResult, null) == 0, "Failed to lookup host"); 253 while( !status.finished ) m_core.yieldForEvent(); 254 enforce(!status.error, "Failed to lookup host: "~to!string(status.error)); 255 256 aif = addr_ret; 257 addr.family = cast(ubyte)addr_ret.ai_family; 258 switch(addr.family){ 259 default: assert(false, "Invalid address family returned from host lookup."); 260 case AF_INET: addr.sockAddrInet4 = *cast(sockaddr_in*)addr_ret.ai_addr; break; 261 case AF_INET6: addr.sockAddrInet6 = *cast(sockaddr_in6*)addr_ret.ai_addr; break; 262 } 263 FreeAddrInfoExW(addr_ret); 264 } else { 265 auto he = gethostbyname(toUTFz!(immutable(char)*)(host)); 266 socketEnforce(he !is null, "Failed to look up host "~host); 267 addr.family = he.h_addrtype; 268 switch(addr.family){ 269 default: assert(false, "Invalid address family returned from host lookup."); 270 case AF_INET: addr.sockAddrInet4.sin_addr = *cast(in_addr*)he.h_addr_list[0]; break; 271 case AF_INET6: addr.sockAddrInet6.sin6_addr = *cast(in6_addr*)he.h_addr_list[0]; break; 272 } 273 } 274 275 return addr; 276 } 277 278 override Win32TCPConnection connectTCP(NetworkAddress addr, NetworkAddress bind_addr) 279 { 280 assert(m_tid == GetCurrentThreadId()); 281 282 auto sock = WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, null, 0, WSA_FLAG_OVERLAPPED); 283 socketEnforce(sock != INVALID_SOCKET, "Failed to create socket"); 284 285 socketEnforce(bind(sock, bind_addr.sockAddr, bind_addr.sockAddrLen) == 0, "Failed to bind socket"); 286 287 auto conn = new Win32TCPConnection(this, sock, addr); 288 conn.connect(addr); 289 return conn; 290 } 291 292 override Win32TCPListener listenTCP(ushort port, void delegate(TCPConnection conn) @safe conn_callback, string bind_address, TCPListenOptions options) 293 { 294 assert(m_tid == GetCurrentThreadId()); 295 auto addr = resolveHost(bind_address); 296 addr.port = port; 297 298 auto sock = WSASocketW(addr.family, SOCK_STREAM, IPPROTO_TCP, null, 0, WSA_FLAG_OVERLAPPED); 299 socketEnforce(sock != INVALID_SOCKET, "Failed to create socket"); 300 301 socketEnforce(bind(sock, addr.sockAddr, addr.sockAddrLen) == 0, 302 "Failed to bind listening socket"); 303 304 socketEnforce(listen(sock, 128) == 0, 305 "Failed to listen"); 306 307 socklen_t balen = addr.sockAddrLen; 308 socketEnforce(getsockname(sock, addr.sockAddr, &balen) == 0, "getsockname failed"); 309 310 // TODO: support TCPListenOptions.distribute 311 312 return new Win32TCPListener(this, sock, addr, conn_callback, options); 313 } 314 315 override Win32UDPConnection listenUDP(ushort port, string bind_address = "0.0.0.0") 316 { 317 assert(m_tid == GetCurrentThreadId()); 318 /*auto addr = resolveHost(bind_address); 319 addr.port = port;*/ 320 321 assert(false); 322 } 323 324 override Win32ManualEvent createManualEvent() 325 { 326 assert(m_tid == GetCurrentThreadId()); 327 return new Win32ManualEvent(this); 328 } 329 330 override FileDescriptorEvent createFileDescriptorEvent(int file_descriptor, FileDescriptorEvent.Trigger events, FileDescriptorEvent.Mode mode) 331 { 332 assert(false, "Not implemented."); 333 } 334 335 override size_t createTimer(void delegate() @safe callback) { return m_timers.create(TimerInfo(callback)); } 336 337 override void acquireTimer(size_t timer_id) { m_timers.getUserData(timer_id).refCount++; } 338 override void releaseTimer(size_t timer_id) 339 nothrow { 340 if (!--m_timers.getUserData(timer_id).refCount) 341 m_timers.destroy(timer_id); 342 } 343 344 override bool isTimerPending(size_t timer_id) { return m_timers.isPending(timer_id); } 345 346 override void rearmTimer(size_t timer_id, Duration dur, bool periodic) 347 { 348 if (!m_timers.isPending(timer_id)) 349 acquireTimer(timer_id); 350 m_timers.schedule(timer_id, dur, periodic); 351 } 352 353 override void stopTimer(size_t timer_id) 354 { 355 if (m_timers.isPending(timer_id)) 356 releaseTimer(timer_id); 357 m_timers.unschedule(timer_id); 358 } 359 360 override void waitTimer(size_t timer_id) 361 { 362 while (true) { 363 auto data = &m_timers.getUserData(timer_id); 364 assert(data.owner == Task.init, "Waiting for the same timer from multiple tasks is not supported."); 365 if (!m_timers.isPending(timer_id)) return; 366 data.owner = Task.getThis(); 367 scope (exit) m_timers.getUserData(timer_id).owner = Task.init; 368 m_core.yieldForEvent(); 369 } 370 } 371 372 373 static struct LookupStatus { 374 Task task; 375 DWORD error; 376 bool finished; 377 Win32EventDriver driver; 378 } 379 380 private static nothrow extern(System) 381 void onDnsResult(DWORD dwError, DWORD /*dwBytes*/, WSAOVERLAPPEDX* lpOverlapped) 382 { 383 auto stat = cast(LookupStatus*)cast(void*)lpOverlapped.hEvent; 384 stat.error = dwError; 385 stat.finished = true; 386 if( stat.task ) 387 try stat.driver.m_core.resumeTask(stat.task); 388 catch (UncaughtException th) logWarn("Resuming task for DNS lookup has thrown: %s", th.msg); 389 } 390 391 private static nothrow extern(System) 392 LRESULT onMessage(HWND wnd, UINT msg, WPARAM wparam, LPARAM lparam) 393 { 394 auto driver = cast(Win32EventDriver)cast(void*)GetWindowLongPtrA(wnd, GWLP_USERDATA); 395 switch(msg){ 396 default: break; 397 case WM_USER_SIGNAL: 398 auto sig = cast(Win32ManualEvent)cast(void*)lparam; 399 Win32EventDriver[Task] lst; 400 try { 401 synchronized(sig.m_mutex) lst = sig.m_listeners.dup; 402 foreach( task, tid; lst ) 403 if( tid is driver && task ) 404 driver.m_core.resumeTask(task); 405 } catch(UncaughtException th){ 406 logWarn("Failed to resume signal listeners: %s", th.msg); 407 return 0; 408 } 409 return 0; 410 case WM_USER_SOCKET: 411 SOCKET sock = cast(SOCKET)wparam; 412 auto evt = LOWORD(lparam); 413 auto err = HIWORD(lparam); 414 auto ph = sock in driver.m_socketHandlers; 415 if( ph is null ){ 416 logWarn("Socket %s has no associated handler for event %s/%s", sock, evt, err); 417 } else ph.notifySocketEvent(sock, evt, err); 418 return 0; 419 } 420 return DefWindowProcA(wnd, msg, wparam, lparam); 421 } 422 } 423 424 interface SocketEventHandler { 425 SOCKET socket() nothrow; 426 void notifySocketEvent(SOCKET sock, WORD event, WORD error) nothrow; 427 } 428 429 private struct TimerInfo { 430 size_t refCount = 1; 431 void delegate() callback; 432 Task owner; 433 434 this(void delegate() callback) { this.callback = callback; } 435 } 436 437 438 /******************************************************************************/ 439 /* class Win32ManualEvent */ 440 /******************************************************************************/ 441 442 final class Win32ManualEvent : ManualEvent { 443 @trusted: 444 private { 445 core.sync.mutex.Mutex m_mutex; 446 Win32EventDriver m_driver; 447 Win32EventDriver[Task] m_listeners; 448 shared int m_emitCount = 0; 449 Task m_waiter; 450 bool m_timedOut; 451 } 452 453 this(Win32EventDriver driver) 454 nothrow { 455 scope (failure) assert(false); // Mutex.this() now nothrow < 2.070 456 m_mutex = new core.sync.mutex.Mutex; 457 m_driver = driver; 458 } 459 460 override void emit() 461 { 462 scope (failure) assert(false); // AA.opApply is not nothrow 463 /*auto newcnt =*/ atomicOp!"+="(m_emitCount, 1); 464 bool[Win32EventDriver] threads; 465 synchronized(m_mutex) 466 { 467 foreach( th; m_listeners ) 468 threads[th] = true; 469 } 470 foreach( th, _; threads ) 471 if( !PostMessageW(th.m_hwnd, WM_USER_SIGNAL, 0, cast(LPARAM)cast(void*)this) ) 472 logWarn("Failed to post thread message."); 473 } 474 475 override void wait() { wait(m_emitCount); } 476 override int wait(int reference_emit_count) { return doWait!true(reference_emit_count); } 477 override int wait(Duration timeout, int reference_emit_count) { return doWait!true(timeout, reference_emit_count); } 478 override int waitUninterruptible(int reference_emit_count) { return doWait!false(reference_emit_count); } 479 override int waitUninterruptible(Duration timeout, int reference_emit_count) { return doWait!false(timeout, reference_emit_count); } 480 481 void acquire() 482 nothrow { 483 static if (!synchronizedIsNothrow) 484 scope (failure) assert(0, "Internal error: function should be nothrow"); 485 486 synchronized(m_mutex) 487 { 488 m_listeners[Task.getThis()] = cast(Win32EventDriver)getEventDriver(); 489 } 490 } 491 492 void release() 493 nothrow { 494 static if (!synchronizedIsNothrow) 495 scope (failure) assert(0, "Internal error: function should be nothrow"); 496 497 auto self = Task.getThis(); 498 synchronized(m_mutex) 499 { 500 if( self in m_listeners ) 501 m_listeners.remove(self); 502 } 503 } 504 505 bool amOwner() 506 nothrow { 507 static if (!synchronizedIsNothrow) 508 scope (failure) assert(0, "Internal error: function should be nothrow"); 509 510 synchronized(m_mutex) 511 { 512 return (Task.getThis() in m_listeners) !is null; 513 } 514 } 515 516 override @property int emitCount() const { return atomicLoad(m_emitCount); } 517 518 private int doWait(bool INTERRUPTIBLE)(int reference_emit_count) 519 { 520 //logDebugV("Signal %s wait enter %s", cast(void*)this, reference_emit_count); 521 acquire(); 522 scope(exit) release(); 523 auto ec = atomicLoad(m_emitCount); 524 while( ec == reference_emit_count ){ 525 static if (INTERRUPTIBLE) m_driver.m_core.yieldForEvent(); 526 else m_driver.m_core.yieldForEventDeferThrow(); 527 ec = atomicLoad(m_emitCount); 528 } 529 //logDebugV("Signal %s wait leave %s", cast(void*)this, ec); 530 return ec; 531 } 532 533 private int doWait(bool INTERRUPTIBLE)(Duration timeout, int reference_emit_count = emitCount) 534 { 535 static if (!INTERRUPTIBLE) scope (failure) assert(false); // timer functions are still not nothrow 536 537 acquire(); 538 scope(exit) release(); 539 auto ec = atomicLoad(m_emitCount); 540 m_timedOut = false; 541 m_waiter = Task.getThis(); 542 auto timer = m_driver.createTimer(null); 543 scope(exit) m_driver.releaseTimer(timer); 544 m_driver.m_timers.getUserData(timer).owner = Task.getThis(); 545 m_driver.rearmTimer(timer, timeout, false); 546 while (ec == reference_emit_count && !m_driver.isTimerPending(timer)) { 547 static if (INTERRUPTIBLE) m_driver.m_core.yieldForEvent(); 548 else m_driver.m_core.yieldForEventDeferThrow(); 549 ec = atomicLoad(m_emitCount); 550 } 551 return ec; 552 } 553 } 554 555 556 /******************************************************************************/ 557 /* class Win32FileStream */ 558 /******************************************************************************/ 559 560 final class Win32FileStream : FileStream { 561 @trusted: 562 private { 563 Path m_path; 564 HANDLE m_handle; 565 FileMode m_mode; 566 DriverCore m_driver; 567 Task m_task; 568 ulong m_size; 569 ulong m_ptr = 0; 570 DWORD m_bytesTransferred; 571 } 572 573 this(DriverCore driver, Path path, FileMode mode) 574 { 575 m_path = path; 576 m_mode = mode; 577 m_driver = driver; 578 579 auto access = m_mode == FileMode.readWrite ? (GENERIC_WRITE | GENERIC_READ) : 580 (m_mode == FileMode.createTrunc || m_mode == FileMode.append)? GENERIC_WRITE : GENERIC_READ; 581 582 auto shareMode = m_mode == FileMode.read? FILE_SHARE_READ : 0; 583 584 auto creation = m_mode == FileMode.createTrunc? CREATE_ALWAYS : m_mode == FileMode.append? OPEN_ALWAYS : OPEN_EXISTING; 585 586 m_handle = CreateFileW( 587 toUTF16z(m_path.toNativeString()), 588 access, 589 shareMode, 590 null, 591 creation, 592 FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED, 593 null); 594 595 auto errorcode = GetLastError(); 596 enforce(m_handle != INVALID_HANDLE_VALUE, "Failed to open "~path.toNativeString()~": "~to!string(errorcode)); 597 if(mode == FileMode.createTrunc && errorcode == ERROR_ALREADY_EXISTS) 598 { 599 // truncate file 600 // TODO: seek to start pos? 601 BOOL ret = vibe.internal.win32.SetEndOfFile(m_handle); 602 errorcode = GetLastError(); 603 enforce(ret, "Failed to call SetFileEndPos for path "~path.toNativeString()~", Error: " ~ to!string(errorcode)); 604 } 605 606 long size; 607 auto succeeded = GetFileSizeEx(m_handle, &size); 608 enforce(succeeded); 609 m_size = size; 610 } 611 612 ~this() 613 { 614 close(); 615 } 616 617 void release() 618 { 619 assert(m_task == Task.getThis(), "Releasing FileStream that is not owned by the calling task."); 620 m_task = Task(); 621 } 622 623 void acquire() 624 { 625 assert(m_task == Task(), "Acquiring FileStream that is already owned."); 626 m_task = Task.getThis(); 627 } 628 629 bool amOwner() 630 { 631 return m_task == Task.getThis(); 632 } 633 634 override void close() 635 { 636 if(m_handle == INVALID_HANDLE_VALUE) 637 return; 638 CloseHandle(m_handle); 639 m_handle = INVALID_HANDLE_VALUE; 640 } 641 642 override ulong tell() { return m_ptr; } 643 644 override @property Path path() const { return m_path; } 645 646 override @property bool isOpen() const { return m_handle != INVALID_HANDLE_VALUE; } 647 648 override @property ulong size() const { return m_size; } 649 650 override @property bool readable() 651 const { 652 return m_mode != FileMode.append; 653 } 654 655 override @property bool writable() 656 const { 657 return m_mode == FileMode.append || m_mode == FileMode.createTrunc || m_mode == FileMode.readWrite; 658 } 659 660 override void seek(ulong offset) 661 { 662 m_ptr = offset; 663 } 664 665 666 override @property bool empty() const { assert(this.readable); return m_ptr >= m_size; } 667 override @property ulong leastSize() const { assert(this.readable); return m_size - m_ptr; } 668 override @property bool dataAvailableForRead(){ 669 return leastSize() > 0; 670 } 671 672 override const(ubyte)[] peek(){ 673 assert(false); 674 } 675 676 override size_t read(scope ubyte[] dst, IOMode) 677 { 678 assert(this.readable); 679 acquire(); 680 scope(exit) release(); 681 682 size_t nbytes = 0; 683 while (dst.length > 0) { 684 enforce(dst.length <= leastSize); 685 OVERLAPPED overlapped; 686 overlapped.Internal = 0; 687 overlapped.InternalHigh = 0; 688 overlapped.Offset = cast(uint)(m_ptr & 0xFFFFFFFF); 689 overlapped.OffsetHigh = cast(uint)(m_ptr >> 32); 690 overlapped.hEvent = cast(HANDLE)cast(void*)this; 691 m_bytesTransferred = 0; 692 693 auto to_read = min(dst.length, DWORD.max); 694 695 // request to write the data 696 ReadFileEx(m_handle, cast(void*)dst, to_read, &overlapped, &onIOCompleted); 697 698 // yield until the data is read 699 while( !m_bytesTransferred ) m_driver.yieldForEvent(); 700 701 assert(m_bytesTransferred <= to_read, "More bytes read than requested!?"); 702 dst = dst[m_bytesTransferred .. $]; 703 m_ptr += m_bytesTransferred; 704 nbytes += m_bytesTransferred; 705 } 706 707 return nbytes; 708 } 709 710 override size_t write(in ubyte[] bytes_, IOMode) 711 { 712 assert(this.writable, "File is not writable"); 713 acquire(); 714 scope(exit) release(); 715 716 const(ubyte)[] bytes = bytes_; 717 718 size_t nbytes = 0; 719 while (bytes.length > 0) { 720 OVERLAPPED overlapped; 721 overlapped.Internal = 0; 722 overlapped.InternalHigh = 0; 723 overlapped.Offset = cast(uint)(m_ptr & 0xFFFFFFFF); 724 overlapped.OffsetHigh = cast(uint)(m_ptr >> 32); 725 overlapped.hEvent = cast(HANDLE)cast(void*)this; 726 m_bytesTransferred = 0; 727 728 auto to_write = min(bytes.length, DWORD.max); 729 730 // request to write the data 731 WriteFileEx(m_handle, cast(void*)bytes, to_write, &overlapped, &onIOCompleted); 732 733 // yield until the data is written 734 while( !m_bytesTransferred ) m_driver.yieldForEvent(); 735 736 assert(m_bytesTransferred <= to_write, "More bytes written than requested!?"); 737 bytes = bytes[m_bytesTransferred .. $]; 738 m_ptr += m_bytesTransferred; 739 nbytes += m_bytesTransferred; 740 } 741 if(m_ptr > m_size) m_size = m_ptr; 742 743 return nbytes; 744 } 745 746 override void flush(){} 747 748 override void finalize(){} 749 750 private static extern(System) nothrow 751 void onIOCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED* overlapped) 752 { 753 try { 754 auto fileStream = cast(Win32FileStream)(overlapped.hEvent); 755 fileStream.m_bytesTransferred = cbTransferred; 756 if( fileStream.m_task ){ 757 Exception ex; 758 if( dwError != 0 ) ex = new Exception("File I/O error: "~to!string(dwError)); 759 if( fileStream.m_task ) fileStream.m_driver.resumeTask(fileStream.m_task, ex); 760 } 761 } catch( UncaughtException e ){ 762 logWarn("Exception while handling file I/O: %s", e.msg); 763 } 764 } 765 } 766 767 768 /******************************************************************************/ 769 /* class Win32Directory Watcher */ 770 /******************************************************************************/ 771 772 final class Win32DirectoryWatcher : DirectoryWatcher { 773 @trusted: 774 private { 775 Path m_path; 776 bool m_recursive; 777 HANDLE m_handle; 778 DWORD m_bytesTransferred; 779 DriverCore m_core; 780 ubyte[16384] m_buffer; 781 UINT m_notifications = FILE_NOTIFY_CHANGE_FILE_NAME|FILE_NOTIFY_CHANGE_DIR_NAME| 782 FILE_NOTIFY_CHANGE_SIZE|FILE_NOTIFY_CHANGE_LAST_WRITE; 783 Task m_task; 784 } 785 786 this(DriverCore core, Path path, bool recursive) 787 { 788 m_core = core; 789 m_path = path; 790 m_recursive = recursive; 791 m_task = Task.getThis(); 792 793 auto pstr = m_path.toString(); 794 m_handle = CreateFileW(toUTFz!(const(wchar)*)(pstr), 795 FILE_LIST_DIRECTORY, 796 FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, 797 null, 798 OPEN_EXISTING, 799 FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED, 800 null); 801 } 802 803 ~this() 804 { 805 CloseHandle(m_handle); 806 } 807 808 override @property Path path() const { return m_path; } 809 override @property bool recursive() const { return m_recursive; } 810 811 void release() 812 { 813 assert(m_task == Task.getThis(), "Releasing FileStream that is not owned by the calling task."); 814 m_task = Task(); 815 } 816 817 void acquire() 818 { 819 assert(m_task == Task(), "Acquiring FileStream that is already owned."); 820 m_task = Task.getThis(); 821 } 822 823 bool amOwner() 824 { 825 return m_task == Task.getThis(); 826 } 827 828 override bool readChanges(ref DirectoryChange[] dst, Duration timeout) 829 { 830 OVERLAPPED overlapped; 831 overlapped.Internal = 0; 832 overlapped.InternalHigh = 0; 833 overlapped.Offset = 0; 834 overlapped.OffsetHigh = 0; 835 overlapped.hEvent = cast(HANDLE)cast(void*)this; 836 837 m_bytesTransferred = 0; 838 DWORD bytesReturned; 839 if( !ReadDirectoryChangesW(m_handle, m_buffer.ptr, m_buffer.length, m_recursive, 840 m_notifications, &bytesReturned, &overlapped, &onIOCompleted) ) 841 { 842 logError("Failed to read directory changes in '%s'", m_path); 843 return false; 844 } 845 846 // FIXME: obey timeout! 847 assert(timeout.isNegative()); 848 while( !m_bytesTransferred ) m_core.yieldForEvent(); 849 850 ubyte[] result = m_buffer[0 .. m_bytesTransferred]; 851 do { 852 assert(result.length >= FILE_NOTIFY_INFORMATION.sizeof); 853 auto fni = cast(FILE_NOTIFY_INFORMATION*)result.ptr; 854 DirectoryChangeType kind; 855 switch( fni.Action ){ 856 default: kind = DirectoryChangeType.modified; break; 857 case 0x1: kind = DirectoryChangeType.added; break; 858 case 0x2: kind = DirectoryChangeType.removed; break; 859 case 0x3: kind = DirectoryChangeType.modified; break; 860 case 0x4: kind = DirectoryChangeType.removed; break; 861 case 0x5: kind = DirectoryChangeType.added; break; 862 } 863 string filename = to!string(fni.FileName[0 .. fni.FileNameLength/2]); 864 dst ~= DirectoryChange(kind, Path(filename)); 865 //logTrace("File changed: %s", fni.FileName.ptr[0 .. fni.FileNameLength/2]); 866 if( fni.NextEntryOffset == 0 ) break; 867 result = result[fni.NextEntryOffset .. $]; 868 } while(result.length > 0); 869 870 return true; 871 } 872 873 static nothrow extern(System) 874 { 875 void onIOCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED* overlapped) 876 { 877 try { 878 auto watcher = cast(Win32DirectoryWatcher)overlapped.hEvent; 879 watcher.m_bytesTransferred = cbTransferred; 880 if( watcher.m_task ){ 881 Exception ex; 882 if( dwError != 0 ) ex = new Exception("Diretory watcher error: "~to!string(dwError)); 883 if( watcher.m_task ) watcher.m_core.resumeTask(watcher.m_task, ex); 884 } 885 } catch( UncaughtException th ){ 886 logWarn("Exception in directory watcher callback: %s", th.msg); 887 } 888 } 889 } 890 } 891 892 893 /******************************************************************************/ 894 /* class Win32UDPConnection */ 895 /******************************************************************************/ 896 897 final class Win32UDPConnection : UDPConnection, SocketEventHandler { 898 @trusted: 899 private { 900 Task m_task; 901 Win32EventDriver m_driver; 902 SOCKET m_socket; 903 NetworkAddress m_bindAddress; 904 bool m_canBroadcast; 905 } 906 907 this(Win32EventDriver driver, SOCKET sock, NetworkAddress bind_addr) 908 { 909 m_driver = driver; 910 m_socket = sock; 911 m_bindAddress = bind_addr; 912 913 WSAAsyncSelect(sock, m_driver.m_hwnd, WM_USER_SOCKET, FD_READ|FD_WRITE|FD_CLOSE); 914 915 //bind... 916 } 917 918 @property SOCKET socket() { return m_socket; } 919 920 override @property string bindAddress() const { 921 // NOTE: using WSAAddressToStringW instead of inet_ntop because that is only available from Vista up 922 wchar[64] buf; 923 DWORD buf_len = 64; 924 WSAAddressToStringW(m_bindAddress.sockAddr, m_bindAddress.sockAddrLen, null, buf.ptr, &buf_len); 925 auto ret = to!string(buf[0 .. buf_len]); 926 ret = ret[0 .. ret.lastIndexOf(':')]; // strip the port number 927 return ret; 928 } 929 930 override @property NetworkAddress localAddress() const { return m_bindAddress; } 931 932 override @property bool canBroadcast() const { return m_canBroadcast; } 933 override @property void canBroadcast(bool val) 934 { 935 int tmp_broad = val; 936 socketEnforce(setsockopt(m_socket, SOL_SOCKET, SO_BROADCAST, &tmp_broad, tmp_broad.sizeof) == 0, 937 "Failed to change the socket broadcast flag"); 938 m_canBroadcast = val; 939 } 940 941 override void close() 942 { 943 if (m_socket == INVALID_SOCKET) return; 944 closesocket(m_socket); 945 m_socket = INVALID_SOCKET; 946 } 947 948 bool amOwner() { 949 return m_task != Task() && m_task == Task.getThis(); 950 } 951 952 void acquire() 953 { 954 assert(m_task == Task(), "Trying to acquire a TCP connection that is currently owned."); 955 m_task = Task.getThis(); 956 } 957 958 void release() 959 { 960 assert(m_task != Task(), "Trying to release a TCP connection that is not owned."); 961 assert(m_task == Task.getThis(), "Trying to release a foreign TCP connection."); 962 m_task = Task(); 963 } 964 965 override void connect(string host, ushort port) 966 { 967 NetworkAddress addr = m_driver.resolveHost(host, m_bindAddress.family); 968 addr.port = port; 969 connect(addr); 970 } 971 override void connect(NetworkAddress addr) 972 { 973 socketEnforce(.connect(m_socket, addr.sockAddr, addr.sockAddrLen) == 0, "Failed to connect UDP socket"); 974 } 975 976 override void send(in ubyte[] data, in NetworkAddress* peer_address = null) 977 { 978 assert(data.length <= int.max); 979 sizediff_t ret; 980 if( peer_address ){ 981 ret = .sendto(m_socket, data.ptr, cast(int)data.length, 0, peer_address.sockAddr, peer_address.sockAddrLen); 982 } else { 983 ret = .send(m_socket, data.ptr, cast(int)data.length, 0); 984 } 985 logTrace("send ret: %s, %s", ret, WSAGetLastError()); 986 socketEnforce(ret >= 0, "Error sending UDP packet"); 987 enforce(ret == data.length, "Unable to send full packet."); 988 } 989 990 override ubyte[] recv(ubyte[] buf = null, NetworkAddress* peer_address = null) 991 { 992 return recv(Duration.max, buf, peer_address); 993 } 994 995 override ubyte[] recv(Duration timeout, ubyte[] buf = null, NetworkAddress* peer_address = null) 996 { 997 size_t tm; 998 if (timeout != Duration.max && timeout > 0.seconds) { 999 tm = m_driver.createTimer(null); 1000 m_driver.rearmTimer(tm, timeout, false); 1001 m_driver.acquireTimer(tm); 1002 } 1003 1004 acquire(); 1005 scope(exit) { 1006 release(); 1007 if (tm != size_t.max) m_driver.releaseTimer(tm); 1008 } 1009 1010 assert(buf.length <= int.max); 1011 if( buf.length == 0 ) buf.length = 65507; 1012 NetworkAddress from; 1013 from.family = m_bindAddress.family; 1014 while(true){ 1015 socklen_t addr_len = from.sockAddrLen; 1016 auto ret = .recvfrom(m_socket, buf.ptr, cast(int)buf.length, 0, from.sockAddr, &addr_len); 1017 if( ret > 0 ){ 1018 if( peer_address ) *peer_address = from; 1019 return buf[0 .. ret]; 1020 } 1021 if( ret < 0 ){ 1022 auto err = WSAGetLastError(); 1023 logDebug("UDP recv err: %s", err); 1024 socketEnforce(err == WSAEWOULDBLOCK, "Error receiving UDP packet"); 1025 1026 if (timeout != Duration.max) { 1027 enforce(timeout > 0.seconds && m_driver.isTimerPending(tm), "UDP receive timeout."); 1028 } 1029 } 1030 m_driver.m_core.yieldForEvent(); 1031 } 1032 } 1033 1034 void notifySocketEvent(SOCKET sock, WORD event, WORD error) 1035 { 1036 assert(false); 1037 } 1038 1039 void addMembership(ref NetworkAddress multiaddr) 1040 { 1041 assert(false, "TODO!"); 1042 } 1043 1044 @property void multicastLoopback(bool loop) 1045 { 1046 assert(false, "TODO!"); 1047 } 1048 1049 private static nothrow extern(C) void onUDPRead(SOCKET sockfd, short evts, void* arg) 1050 { 1051 /*auto ctx = cast(TCPContext*)arg; 1052 logTrace("udp socket %d read event!", ctx.socketfd); 1053 1054 try { 1055 auto f = ctx.task; 1056 if( f && f.state != Fiber.State.TERM ) 1057 ctx.core.resumeTask(f); 1058 } catch( UncaughtException e ){ 1059 logError("Exception onUDPRead: %s", e.msg); 1060 debug assert(false); 1061 }*/ 1062 } 1063 } 1064 1065 1066 /******************************************************************************/ 1067 /* class Win32TCPConnection */ 1068 /******************************************************************************/ 1069 1070 enum ConnectionStatus { Initialized, Connected, Disconnected } 1071 1072 final class Win32TCPConnection : TCPConnection, SocketEventHandler { 1073 @trusted: 1074 private { 1075 Win32EventDriver m_driver; 1076 Task m_readOwner; 1077 Task m_writeOwner; 1078 bool m_tcpNoDelay; 1079 Duration m_readTimeout; 1080 bool m_keepAlive; 1081 SOCKET m_socket; 1082 NetworkAddress m_localAddress; 1083 NetworkAddress m_peerAddress; 1084 string m_peerAddressString; 1085 DWORD m_bytesTransferred; 1086 ConnectionStatus m_status; 1087 FixedRingBuffer!(ubyte, 64*1024) m_readBuffer; 1088 void delegate(TCPConnection) m_connectionCallback; 1089 Exception m_exception; 1090 1091 HANDLE m_transferredFile; 1092 OVERLAPPED m_fileOverlapped; 1093 } 1094 1095 this(Win32EventDriver driver, SOCKET sock, NetworkAddress peer_address, ConnectionStatus status = ConnectionStatus.Initialized) 1096 { 1097 m_driver = driver; 1098 m_socket = sock; 1099 m_driver.m_socketHandlers[sock] = this; 1100 m_status = status; 1101 1102 m_localAddress.family = peer_address.family; 1103 if (peer_address.family == AF_INET) m_localAddress.sockAddrInet4.sin_addr.s_addr = 0; 1104 else m_localAddress.sockAddrInet6.sin6_addr.s6_addr[] = 0; 1105 socklen_t balen = m_localAddress.sockAddrLen; 1106 socketEnforce(getsockname(sock, m_localAddress.sockAddr, &balen) == 0, "getsockname failed"); 1107 1108 m_peerAddress = peer_address; 1109 1110 // NOTE: using WSAAddressToStringW instead of inet_ntop because that is only available from Vista up 1111 wchar[64] buf; 1112 DWORD buflen = buf.length; 1113 socketEnforce(WSAAddressToStringW(m_peerAddress.sockAddr, m_peerAddress.sockAddrLen, null, buf.ptr, &buflen) == 0, "Failed to get string representation of peer address"); 1114 m_peerAddressString = to!string(buf[0 .. buflen]); 1115 m_peerAddressString = m_peerAddressString[0 .. m_peerAddressString.lastIndexOf(':')]; // strip the port number 1116 1117 // setup overlapped structure for copy-less file sending 1118 m_fileOverlapped.Internal = 0; 1119 m_fileOverlapped.InternalHigh = 0; 1120 m_fileOverlapped.Offset = 0; 1121 m_fileOverlapped.OffsetHigh = 0; 1122 m_fileOverlapped.hEvent = m_driver.m_fileCompletionEvent; 1123 1124 WSAAsyncSelect(sock, m_driver.m_hwnd, WM_USER_SOCKET, FD_READ|FD_WRITE|FD_CONNECT|FD_CLOSE); 1125 } 1126 1127 ~this() 1128 { 1129 /*if( m_socket != -1 ){ 1130 closesocket(m_socket); 1131 }*/ 1132 } 1133 1134 @property SOCKET socket() { return m_socket; } 1135 1136 private void connect(NetworkAddress addr) 1137 { 1138 enforce(m_status != ConnectionStatus.Connected, "Connection is already established."); 1139 acquire(); 1140 scope(exit) release(); 1141 1142 auto ret = .connect(m_socket, addr.sockAddr, addr.sockAddrLen); 1143 //enforce(WSAConnect(m_socket, addr.sockAddr, addr.sockAddrLen, null, null, null, null), "Failed to connect to host"); 1144 1145 if (ret != 0) { 1146 auto err = WSAGetLastError(); 1147 logDebugV("connect err: %s", err); 1148 import std.string; 1149 socketEnforce(err == WSAEWOULDBLOCK, "Connect call failed"); 1150 while (m_status != ConnectionStatus.Connected) { 1151 m_driver.m_core.yieldForEvent(); 1152 if (m_exception) throw m_exception; 1153 } 1154 } 1155 assert(m_status == ConnectionStatus.Connected); 1156 } 1157 1158 void release() 1159 { 1160 assert(m_readOwner == Task.getThis() && m_readOwner == m_writeOwner, "Releasing TCP connection that is not owned by the calling task."); 1161 m_readOwner = m_writeOwner = Task(); 1162 } 1163 1164 void acquire() 1165 { 1166 assert(m_readOwner == Task() && m_writeOwner == Task(), "Acquiring TCP connection that is currently owned."); 1167 m_readOwner = m_writeOwner = Task.getThis(); 1168 } 1169 1170 bool amOwner() { return Task.getThis() == m_readOwner && m_readOwner == m_writeOwner; } 1171 1172 override @property void tcpNoDelay(bool enabled) 1173 { 1174 m_tcpNoDelay = enabled; 1175 BOOL eni = enabled; 1176 setsockopt(m_socket, IPPROTO_TCP, TCP_NODELAY, &eni, eni.sizeof); 1177 } 1178 override @property bool tcpNoDelay() const { return m_tcpNoDelay; } 1179 1180 override @property void readTimeout(Duration v) 1181 { 1182 m_readTimeout = v; 1183 auto msecs = v.total!"msecs"(); 1184 assert(msecs < DWORD.max); 1185 DWORD vdw = cast(DWORD)msecs; 1186 setsockopt(m_socket, SOL_SOCKET, SO_RCVTIMEO, &vdw, vdw.sizeof); 1187 } 1188 override @property Duration readTimeout() const { return m_readTimeout; } 1189 1190 override @property void keepAlive(bool enabled) 1191 { 1192 m_keepAlive = enabled; 1193 BOOL eni = enabled; 1194 setsockopt(m_socket, SOL_SOCKET, SO_KEEPALIVE, &eni, eni.sizeof); 1195 } 1196 override @property bool keepAlive() const { return m_keepAlive; } 1197 1198 override @property bool connected() const { return m_status == ConnectionStatus.Connected; } 1199 1200 override @property string peerAddress() const { return m_peerAddressString; } 1201 1202 override @property NetworkAddress localAddress() const { return m_localAddress; } 1203 override @property NetworkAddress remoteAddress() const { return m_peerAddress; } 1204 1205 override @property bool empty() { return leastSize == 0; } 1206 1207 override @property ulong leastSize() 1208 { 1209 acquireReader(); 1210 scope(exit) releaseReader(); 1211 1212 while( m_readBuffer.empty ){ 1213 if( !connected ) return 0; 1214 m_driver.m_core.yieldForEvent(); 1215 } 1216 return m_readBuffer.length; 1217 } 1218 1219 override @property bool dataAvailableForRead() 1220 { 1221 acquireReader(); 1222 scope(exit) releaseReader(); 1223 return !m_readBuffer.empty; 1224 } 1225 1226 override void close() 1227 { 1228 acquire(); 1229 scope(exit) release(); 1230 WSASendDisconnect(m_socket, null); 1231 closesocket(m_socket); 1232 m_socket = -1; 1233 m_status = ConnectionStatus.Disconnected; 1234 } 1235 1236 override bool waitForData(Duration timeout) 1237 { 1238 if (timeout == 0.seconds) 1239 logDebug("Warning: use Duration.max as an argument to waitForData() to wait infinitely, not 0.seconds."); 1240 1241 acquireReader(); 1242 scope(exit) releaseReader(); 1243 if (timeout != Duration.max && timeout != 0.seconds) { 1244 auto tm = m_driver.createTimer(null); 1245 scope(exit) m_driver.releaseTimer(tm); 1246 m_driver.m_timers.getUserData(tm).owner = Task.getThis(); 1247 m_driver.rearmTimer(tm, timeout, false); 1248 while (m_readBuffer.empty) { 1249 if (!connected) return false; 1250 m_driver.m_core.yieldForEvent(); 1251 if (!m_driver.isTimerPending(tm)) return false; 1252 } 1253 } else { 1254 while (m_readBuffer.empty) { 1255 if (!connected) return false; 1256 m_driver.m_core.yieldForEvent(); 1257 } 1258 } 1259 return true; 1260 } 1261 1262 override const(ubyte)[] peek() 1263 { 1264 acquireReader(); 1265 scope(exit) releaseReader(); 1266 return m_readBuffer.peek(); 1267 } 1268 1269 override size_t read(scope ubyte[] dst, IOMode) 1270 { 1271 acquireReader(); 1272 scope(exit) releaseReader(); 1273 1274 size_t nbytes = 0; 1275 while (dst.length > 0) { 1276 while( m_readBuffer.empty ){ 1277 checkConnected(); 1278 m_driver.m_core.yieldForEvent(); 1279 } 1280 size_t amt = min(dst.length, m_readBuffer.length); 1281 1282 m_readBuffer.read(dst[0 .. amt]); 1283 dst = dst[amt .. $]; 1284 nbytes += amt; 1285 } 1286 1287 return nbytes; 1288 } 1289 1290 override size_t write(in ubyte[] bytes_, IOMode) 1291 { 1292 acquireWriter(); 1293 scope(exit) releaseWriter(); 1294 1295 checkConnected(); 1296 const(ubyte)[] bytes = bytes_; 1297 logTrace("TCP write with %s bytes called", bytes.length); 1298 1299 WSAOVERLAPPEDX overlapped; 1300 overlapped.Internal = 0; 1301 overlapped.InternalHigh = 0; 1302 overlapped.Offset = 0; 1303 overlapped.OffsetHigh = 0; 1304 overlapped.hEvent = cast(HANDLE)cast(void*)this; 1305 1306 size_t nbytes = 0; 1307 while (bytes.length > 0) { 1308 WSABUF buf; 1309 buf.len = bytes.length; 1310 buf.buf = cast(ubyte*)bytes.ptr; 1311 1312 m_bytesTransferred = 0; 1313 logTrace("Sending %s bytes TCP", buf.len); 1314 auto ret = WSASend(m_socket, &buf, 1, null, 0, &overlapped, &onIOWriteCompleted); 1315 if( ret == SOCKET_ERROR ){ 1316 auto err = WSAGetLastError(); 1317 socketEnforce(err == WSA_IO_PENDING, "Failed to send data"); 1318 } 1319 while( !m_bytesTransferred ) m_driver.m_core.yieldForEvent(); 1320 1321 assert(m_bytesTransferred <= bytes.length, "More data sent than requested!?"); 1322 bytes = bytes[m_bytesTransferred .. $]; 1323 nbytes += m_bytesTransferred; 1324 } 1325 return nbytes; 1326 1327 } 1328 1329 override void flush() 1330 { 1331 acquireWriter(); 1332 scope(exit) releaseWriter(); 1333 1334 checkConnected(); 1335 } 1336 1337 override void finalize() 1338 { 1339 flush(); 1340 } 1341 1342 void writeFile(Path filename) 1343 { 1344 auto fstream = m_driver.openFile(filename, FileMode.read); 1345 enforce(fstream.size <= 1<<31); 1346 acquireWriter(); 1347 m_bytesTransferred = 0; 1348 m_driver.m_fileWriters[this] = true; 1349 scope(exit) releaseWriter(); 1350 logDebug("Using sendfile! %s %s %s", fstream.m_handle, fstream.tell(), fstream.size); 1351 1352 if (TransmitFile(m_socket, fstream.m_handle, 0, 0, &m_fileOverlapped, null, 0)) 1353 m_bytesTransferred = 1; 1354 1355 socketEnforce(WSAGetLastError() == WSA_IO_PENDING, "Failed to send file over TCP."); 1356 1357 while (m_bytesTransferred < fstream.size) m_driver.m_core.yieldForEvent(); 1358 } 1359 1360 InputStream acquireReader() { assert(m_readOwner == Task()); m_readOwner = Task.getThis(); return this; } 1361 void releaseReader() { assert(m_readOwner == Task.getThis()); m_readOwner = Task(); } 1362 bool amReadOwner() const { return m_readOwner == Task.getThis(); } 1363 1364 OutputStream acquireWriter() { assert(m_writeOwner == Task()); m_writeOwner = Task.getThis(); return this; } 1365 void releaseWriter() { assert(m_writeOwner == Task.getThis()); m_writeOwner = Task(); } 1366 bool amWriteOwner() const { return m_writeOwner == Task.getThis(); } 1367 1368 private void checkConnected() 1369 { 1370 // TODO! 1371 } 1372 1373 private bool testFileWritten() 1374 { 1375 if( !GetOverlappedResult(m_transferredFile, &m_fileOverlapped, &m_bytesTransferred, false) ){ 1376 if( GetLastError() != ERROR_IO_PENDING ){ 1377 auto ex = new Exception("File transfer over TCP failed."); 1378 if (m_writeOwner != Task.init) { 1379 m_driver.m_core.resumeTask(m_writeOwner, ex); 1380 return true; 1381 } else throw ex; 1382 } 1383 return false; 1384 } else { 1385 if (m_writeOwner != Task.init) m_driver.m_core.resumeTask(m_writeOwner); 1386 return true; 1387 } 1388 } 1389 1390 void notifySocketEvent(SOCKET sock, WORD event, WORD error) 1391 nothrow { 1392 try { 1393 logDebugV("Socket event for %s: %s, error: %s", sock, event, error); 1394 if (m_socket == -1) { 1395 logDebug("Event for already closed socket - ignoring"); 1396 return; 1397 } 1398 assert(sock == m_socket); 1399 Exception ex; 1400 switch(event){ 1401 default: break; 1402 case FD_CONNECT: // doesn't seem to occur, but we handle it just in case 1403 if (error) { 1404 ex = new SystemSocketException("Failed to connect to host", error); 1405 m_status = ConnectionStatus.Disconnected; 1406 } else m_status = ConnectionStatus.Connected; 1407 if (m_writeOwner) m_driver.m_core.resumeTask(m_writeOwner, ex); 1408 break; 1409 case FD_READ: 1410 logTrace("TCP read event"); 1411 while (m_readBuffer.freeSpace > 0) { 1412 auto dst = m_readBuffer.peekDst(); 1413 assert(dst.length <= int.max); 1414 logTrace("Try to read up to %s bytes", dst.length); 1415 auto ret = .recv(m_socket, dst.ptr, cast(int)dst.length, 0); 1416 if (ret >= 0) { 1417 logTrace("received %s bytes", ret); 1418 if( ret == 0 ) break; 1419 m_readBuffer.putN(ret); 1420 } else { 1421 auto err = WSAGetLastError(); 1422 if( err != WSAEWOULDBLOCK ){ 1423 logTrace("receive error %s", err); 1424 ex = new SystemSocketException("Error reading data from socket", error); 1425 } 1426 break; 1427 } 1428 } 1429 1430 //m_driver.m_core.resumeTask(m_readOwner, ex); 1431 /*WSABUF buf; 1432 buf.len = dst.length; 1433 buf.buf = dst.ptr; 1434 DWORD flags = 0; 1435 1436 WSAOVERLAPPEDX overlapped; 1437 overlapped.Internal = 0; 1438 overlapped.InternalHigh = 0; 1439 overlapped.Offset = 0; 1440 overlapped.OffsetHigh = 0; 1441 overlapped.hEvent = cast(HANDLE)cast(void*)this; 1442 1443 m_bytesTransferred = 0; 1444 auto ret = WSARecv(m_socket, &buf, 1, null, &flags, &overlapped, &onIOCompleted); 1445 if( ret == SOCKET_ERROR ){ 1446 auto err = WSAGetLastError(); 1447 socketEnforce(err == WSA_IO_PENDING, "Failed to receive data"); 1448 } 1449 while( !m_bytesTransferred ) m_driver.m_core.yieldForEvent(); 1450 1451 assert(m_bytesTransferred <= dst.length, "More data received than requested!?"); 1452 m_readBuffer.pushN(m_bytesTransferred);*/ 1453 if (m_readOwner) m_driver.m_core.resumeTask(m_readOwner, ex); 1454 break; 1455 case FD_WRITE: 1456 if (m_status == ConnectionStatus.Initialized) { 1457 if( error ){ 1458 ex = new SystemSocketException("Failed to connect to host", error); 1459 } else m_status = ConnectionStatus.Connected; 1460 } 1461 if (m_writeOwner) m_driver.m_core.resumeTask(m_writeOwner, ex); 1462 break; 1463 case FD_CLOSE: 1464 if (error) { 1465 if (m_status == ConnectionStatus.Initialized) { 1466 ex = new SystemSocketException("Failed to connect to host", error); 1467 } else { 1468 ex = new SystemSocketException("The connection was closed with an error", error); 1469 } 1470 } else { 1471 m_status = ConnectionStatus.Disconnected; 1472 closesocket(m_socket); 1473 m_socket = -1; 1474 } 1475 if (m_writeOwner) m_driver.m_core.resumeTask(m_writeOwner, ex); 1476 break; 1477 } 1478 1479 if (ex) m_exception = ex; 1480 } catch( UncaughtException th ){ 1481 logWarn("Exception while handling socket event: %s", th.msg); 1482 } 1483 } 1484 1485 private void runConnectionCallback(TCPListenOptions options) 1486 { 1487 try { 1488 m_connectionCallback(this); 1489 logDebug("task out (fd %d).", m_socket); 1490 } catch( Exception e ){ 1491 logWarn("Handling of connection failed: %s", e.msg); 1492 logDiagnostic("%s", e.toString()); 1493 } finally { 1494 if (!(options & TCPListenOptions.disableAutoClose) && this.connected) close(); 1495 } 1496 } 1497 1498 private static extern(System) nothrow 1499 void onIOWriteCompleted(DWORD dwError, DWORD cbTransferred, WSAOVERLAPPEDX* lpOverlapped, DWORD dwFlags) 1500 { 1501 logTrace("IO completed for TCP send: %s (error=%s)", cbTransferred, dwError); 1502 try { 1503 auto conn = cast(Win32TCPConnection)(lpOverlapped.hEvent); 1504 conn.m_bytesTransferred = cbTransferred; 1505 if (conn.m_writeOwner != Task.init) { 1506 Exception ex; 1507 if( dwError != 0 ) ex = new Exception("Socket I/O error: "~to!string(dwError)); 1508 conn.m_driver.m_core.resumeTask(conn.m_writeOwner, ex); 1509 } 1510 } catch( UncaughtException th ){ 1511 logWarn("Exception while handling TCP I/O: %s", th.msg); 1512 } 1513 } 1514 } 1515 1516 /******************************************************************************/ 1517 /* class Win32TCPListener */ 1518 /******************************************************************************/ 1519 1520 final class Win32TCPListener : TCPListener, SocketEventHandler { 1521 @trusted: 1522 private { 1523 Win32EventDriver m_driver; 1524 SOCKET m_socket; 1525 NetworkAddress m_bindAddress; 1526 void delegate(TCPConnection conn) m_connectionCallback; 1527 TCPListenOptions m_options; 1528 } 1529 1530 this(Win32EventDriver driver, SOCKET sock, NetworkAddress bind_addr, void delegate(TCPConnection conn) @safe conn_callback, TCPListenOptions options) 1531 { 1532 m_driver = driver; 1533 m_socket = sock; 1534 m_bindAddress = bind_addr; 1535 m_connectionCallback = conn_callback; 1536 m_driver.m_socketHandlers[sock] = this; 1537 m_options = options; 1538 1539 WSAAsyncSelect(sock, m_driver.m_hwnd, WM_USER_SOCKET, FD_ACCEPT); 1540 } 1541 1542 override @property NetworkAddress bindAddress() 1543 { 1544 return m_bindAddress; 1545 } 1546 1547 override void stopListening() 1548 { 1549 if( m_socket == -1 ) return; 1550 closesocket(m_socket); 1551 m_socket = -1; 1552 } 1553 1554 SOCKET socket() nothrow { return m_socket; } 1555 1556 void notifySocketEvent(SOCKET sock, WORD event, WORD error) 1557 nothrow { 1558 assert(sock == m_socket); 1559 switch(event){ 1560 default: assert(false); 1561 case FD_ACCEPT: 1562 try { 1563 NetworkAddress addr; 1564 addr.family = AF_INET6; 1565 int addrlen = addr.sockAddrLen; 1566 auto clientsock = WSAAccept(sock, addr.sockAddr, &addrlen, null, 0); 1567 assert(addrlen == addr.sockAddrLen); 1568 // TODO avoid GC allocations for delegate and Win32TCPConnection 1569 auto conn = new Win32TCPConnection(m_driver, clientsock, addr, ConnectionStatus.Connected); 1570 conn.m_connectionCallback = m_connectionCallback; 1571 runTask(&conn.runConnectionCallback, m_options); 1572 } catch( Exception e ){ 1573 logWarn("Exception white accepting TCP connection: %s", e.msg); 1574 try logDiagnostic("Exception white accepting TCP connection: %s", e.toString()); 1575 catch( Exception ){} 1576 } 1577 break; 1578 } 1579 } 1580 } 1581 1582 1583 private { 1584 struct TimerMapTraits { 1585 enum clearValue = UINT_PTR.max; 1586 static bool equals(UINT_PTR a, UINT_PTR b) { return a == b; } 1587 } 1588 __gshared s_setupWindowClass = false; 1589 } 1590 1591 void setupWindowClass() nothrow 1592 @trusted { 1593 if( s_setupWindowClass ) return; 1594 WNDCLASSA wc; 1595 wc.lpfnWndProc = &Win32EventDriver.onMessage; 1596 wc.lpszClassName = "VibeWin32MessageWindow"; 1597 RegisterClassA(&wc); 1598 s_setupWindowClass = true; 1599 } 1600 1601 version (VibeDebugCatchAll) private alias UncaughtException = Throwable; 1602 else private alias UncaughtException = Exception; 1603 1604 } // version(VibeWin32Driver)