1 /** 2 Driver implementation for the libasync library 3 4 Libasync is an asynchronous library completely written in D. 5 6 See_Also: 7 `vibe.core.driver` = interface definition 8 https://github.com/etcimon/libasync = Github repository 9 10 11 Copyright: © 2014-2015 RejectedSoftware e.K., GlobecSys Inc 12 Authors: Sönke Ludwig, Etienne Cimon 13 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 14 */ 15 module vibe.core.drivers.libasync; 16 17 version(VibeLibasyncDriver): 18 19 import vibe.core.core; 20 import vibe.core.driver; 21 import vibe.core.drivers.threadedfile; 22 import vibe.core.drivers.timerqueue; 23 import vibe.core.log; 24 import vibe.utils.array : FixedRingBuffer; 25 26 import libasync : AsyncDirectoryWatcher, AsyncDNS, AsyncFile, AsyncSignal, AsyncTimer, 27 AsyncTCPConnection, AsyncTCPListener, AsyncUDPSocket, DWFileEvent, DWChangeInfo, 28 EventLoop, NetworkAddressLA = NetworkAddress, UDPEvent, TCPEvent, TCPOption, fd_t, 29 getThreadEventLoop; 30 import libasync.internals.memory; 31 import libasync.types : Status; 32 33 import std.algorithm : min, max; 34 import std.array; 35 import std.container : Array; 36 import std.conv; 37 import std.datetime; 38 import std.encoding; 39 import std.exception; 40 import std.string; 41 import std.stdio : File; 42 import std.typecons; 43 44 import core.atomic; 45 import core.memory; 46 import core.thread; 47 import core.sync.mutex; 48 import core.stdc.stdio; 49 import core.sys.posix.netinet.in_; 50 51 version (Posix) import core.sys.posix.sys.socket; 52 version (Windows) import core.sys.windows.winsock2; 53 54 private __gshared EventLoop gs_evLoop; 55 private EventLoop s_evLoop; 56 private DriverCore s_driverCore; 57 58 version(Windows) extern(C) { 59 FILE* _wfopen(const(wchar)* filename, in wchar* mode); 60 int _wchmod(in wchar*, int); 61 } 62 63 EventLoop getMainEventLoop() @trusted nothrow 64 { 65 if (s_evLoop is null) 66 return gs_evLoop; 67 68 return s_evLoop; 69 } 70 71 DriverCore getDriverCore() @safe nothrow 72 { 73 assert(s_driverCore !is null); 74 return s_driverCore; 75 } 76 77 private struct TimerInfo { 78 size_t refCount = 1; 79 void delegate() callback; 80 Task owner; 81 82 this(void delegate() callback) { this.callback = callback; } 83 } 84 85 /// one per thread 86 final class LibasyncDriver : EventDriver { 87 @trusted: 88 private { 89 bool m_break = false; 90 debug Thread m_ownerThread; 91 AsyncTimer m_timerEvent; 92 TimerQueue!TimerInfo m_timers; 93 SysTime m_nextSched = SysTime.max; 94 shared AsyncSignal m_exitSignal; 95 } 96 97 this(DriverCore core) nothrow 98 { 99 assert(!isControlThread, "Libasync driver created in control thread"); 100 try { 101 import core.atomic : atomicOp; 102 if (!gs_mutex) { 103 import core.sync.mutex; 104 gs_mutex = new core.sync.mutex.Mutex; 105 106 gs_availID.reserve(32); 107 108 foreach (i; gs_availID.length .. 
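				// pre-fill the free list with manual-event IDs 1 .. capacity;
				// generateID() (bottom of this module) hands them out 0-based
				// to LibasyncManualEvent instances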
gs_availID.capacity) { 109 gs_availID.insertBack(i + 1); 110 } 111 112 gs_maxID = 32; 113 114 } 115 } 116 catch (Throwable) { 117 assert(false, "Couldn't reserve necessary space for available Manual Events"); 118 } 119 120 debug m_ownerThread = Thread.getThis(); 121 s_driverCore = core; 122 s_evLoop = getThreadEventLoop(); 123 if (!gs_evLoop) 124 gs_evLoop = s_evLoop; 125 126 m_exitSignal = new shared AsyncSignal(getMainEventLoop()); 127 m_exitSignal.run({ 128 m_break = true; 129 }); 130 logTrace("Loaded libasync backend in thread %s", Thread.getThis().name); 131 132 } 133 134 static @property bool isControlThread() nothrow { 135 scope(failure) assert(false); 136 return Thread.getThis().isDaemon && Thread.getThis().name == "CmdProcessor"; 137 } 138 139 override void dispose() 140 { 141 logTrace("Deleting event driver"); 142 m_break = true; 143 getMainEventLoop().exit(); 144 } 145 146 override int runEventLoop() 147 { 148 while(!m_break && getMainEventLoop().loop(int.max.msecs)){ 149 processTimers(); 150 getDriverCore().notifyIdle(); 151 } 152 m_break = false; 153 logDebug("Event loop exit %d", m_break); 154 return 0; 155 } 156 157 override int runEventLoopOnce() 158 { 159 getMainEventLoop().loop(int.max.msecs); 160 processTimers(); 161 getDriverCore().notifyIdle(); 162 logTrace("runEventLoopOnce exit"); 163 return 0; 164 } 165 166 override bool processEvents() 167 { 168 getMainEventLoop().loop(0.seconds); 169 processTimers(); 170 if (m_break) { 171 m_break = false; 172 return false; 173 } 174 return true; 175 } 176 177 override void exitEventLoop() 178 { 179 logDebug("Exiting (%s)", m_break); 180 m_exitSignal.trigger(); 181 182 } 183 184 override LibasyncFileStream openFile(Path path, FileMode mode) 185 { 186 return new LibasyncFileStream(path, mode); 187 } 188 189 override DirectoryWatcher watchDirectory(Path path, bool recursive) 190 { 191 return new LibasyncDirectoryWatcher(path, recursive); 192 } 193 194 /** Resolves the given host name or IP address string. */ 195 override NetworkAddress resolveHost(string host, ushort family = 2, bool use_dns = true) 196 { 197 import libasync.types : isIPv6; 198 isIPv6 is_ipv6; 199 200 if (family == AF_INET6) 201 is_ipv6 = isIPv6.yes; 202 else 203 is_ipv6 = isIPv6.no; 204 205 import std.regex : regex, Captures, Regex, matchFirst, ctRegex; 206 import std.traits : ReturnType; 207 208 auto IPv4Regex = ctRegex!(`^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(\.|$)){4}$`, ``); 209 auto IPv6Regex = ctRegex!(`^([0-9A-Fa-f]{0,4}:){2,7}([0-9A-Fa-f]{1,4}$|((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(\.|$)){4})$`, ``); 210 auto ipv4 = matchFirst(host, IPv4Regex); 211 auto ipv6 = matchFirst(host, IPv6Regex); 212 if (!ipv4.empty) 213 { 214 if (!ipv4.empty) 215 is_ipv6 = isIPv6.no; 216 use_dns = false; 217 } 218 else if (!ipv6.empty) 219 { // fixme: match host instead? 220 is_ipv6 = isIPv6.yes; 221 use_dns = false; 222 } 223 else 224 { 225 use_dns = true; 226 } 227 228 NetworkAddress ret; 229 230 if (use_dns) { 231 bool done; 232 struct DNSCallback { 233 Task waiter; 234 NetworkAddress* address; 235 bool* finished; 236 void handler(NetworkAddressLA addr) { 237 *address = NetworkAddress(addr); 238 *finished = true; 239 if (waiter != Task() && waiter != Task.getThis()) 240 getDriverCore().resumeTask(waiter); 241 } 242 } 243 244 DNSCallback* cb = FreeListObjectAlloc!DNSCallback.alloc(); 245 cb.waiter = Task.getThis(); 246 cb.address = &ret; 247 cb.finished = &done; 248 249 // todo: remove the shared attribute to avoid GC? 
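			// The lookup completes asynchronously: DNSCallback.handler stores the
			// resolved address, flips `done`, and resumes the waiting task if the
			// lookup finished asynchronously, so the loop below just yields until then.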
250 shared AsyncDNS dns = new shared AsyncDNS(getMainEventLoop()); 251 scope(exit) dns.destroy(); 252 bool success = dns.handler(&cb.handler).resolveHost(host, is_ipv6); 253 if (!success || dns.status.code != Status.OK) 254 throw new Exception(dns.status.text); 255 while(!done) 256 getDriverCore.yieldForEvent(); 257 if (dns.status.code != Status.OK) 258 throw new Exception(dns.status.text); 259 assert(ret != NetworkAddress.init); 260 assert(ret.family != 0); 261 logTrace("Async resolved address %s", ret.toString()); 262 FreeListObjectAlloc!DNSCallback.free(cb); 263 264 if (ret.family == 0) 265 ret.family = family; 266 267 return ret; 268 } 269 else { 270 ret = NetworkAddress(getMainEventLoop().resolveIP(host, 0, is_ipv6)); 271 if (ret.family == 0) 272 ret.family = family; 273 return ret; 274 } 275 276 } 277 278 override LibasyncTCPConnection connectTCP(NetworkAddress addr, NetworkAddress bind_addr) 279 { 280 AsyncTCPConnection conn = new AsyncTCPConnection(getMainEventLoop()); 281 282 LibasyncTCPConnection tcp_connection = new LibasyncTCPConnection(conn, (TCPConnection conn) { 283 Task waiter = (cast(LibasyncTCPConnection) conn).m_settings.writer.task; 284 if (waiter != Task()) { 285 getDriverCore().resumeTask(waiter); 286 } 287 }); 288 289 if (Task.getThis() != Task()) 290 tcp_connection.m_settings.writer.acquire(); 291 292 tcp_connection.m_tcpImpl.conn = conn; 293 //conn.local = bind_addr; 294 conn.ip(bind_addr.toAddressString(), bind_addr.port); 295 conn.peer = cast(NetworkAddressLA)addr; 296 297 enforce(conn.run(&tcp_connection.handler), "An error occured while starting a new connection: " ~ conn.error); 298 299 while (!tcp_connection.connected && !tcp_connection.m_error) getDriverCore().yieldForEvent(); 300 enforce(!tcp_connection.m_error, tcp_connection.m_error); 301 tcp_connection.m_tcpImpl.localAddr = NetworkAddress(conn.local); 302 303 if (Task.getThis() != Task()) 304 tcp_connection.m_settings.writer.release(); 305 return tcp_connection; 306 } 307 308 override LibasyncTCPListener listenTCP(ushort port, void delegate(TCPConnection conn) @safe conn_callback, string address, TCPListenOptions options) 309 { 310 NetworkAddress localaddr = getEventDriver().resolveHost(address); 311 localaddr.port = port; 312 313 return new LibasyncTCPListener(localaddr, conn_callback, options); 314 } 315 316 override LibasyncUDPConnection listenUDP(ushort port, string bind_address = "0.0.0.0") 317 { 318 NetworkAddress localaddr = getEventDriver().resolveHost(bind_address); 319 localaddr.port = port; 320 AsyncUDPSocket sock = new AsyncUDPSocket(getMainEventLoop()); 321 sock.local = cast(NetworkAddressLA)localaddr; 322 auto udp_connection = new LibasyncUDPConnection(sock); 323 sock.run(&udp_connection.handler); 324 return udp_connection; 325 } 326 327 override LibasyncManualEvent createManualEvent() 328 { 329 return new LibasyncManualEvent(this); 330 } 331 332 override FileDescriptorEvent createFileDescriptorEvent(int file_descriptor, FileDescriptorEvent.Trigger triggers, FileDescriptorEvent.Mode mode) 333 { 334 assert(false); 335 } 336 337 338 // The following timer implementation was adapted from the equivalent in libevent2.d 339 340 override size_t createTimer(void delegate() @safe callback) { return m_timers.create(TimerInfo(callback)); } 341 342 override void acquireTimer(size_t timer_id) { m_timers.getUserData(timer_id).refCount++; } 343 override void releaseTimer(size_t timer_id) 344 nothrow { 345 debug assert(m_ownerThread is Thread.getThis()); 346 logTrace("Releasing timer %s", timer_id); 347 if 
(!--m_timers.getUserData(timer_id).refCount) 348 m_timers.destroy(timer_id); 349 } 350 351 override bool isTimerPending(size_t timer_id) nothrow { return m_timers.isPending(timer_id); } 352 353 override void rearmTimer(size_t timer_id, Duration dur, bool periodic) 354 { 355 debug assert(m_ownerThread is Thread.getThis()); 356 if (!isTimerPending(timer_id)) acquireTimer(timer_id); 357 m_timers.schedule(timer_id, dur, periodic); 358 rescheduleTimerEvent(Clock.currTime(UTC())); 359 } 360 361 override void stopTimer(size_t timer_id) 362 { 363 logTrace("Stopping timer %s", timer_id); 364 if (m_timers.isPending(timer_id)) { 365 m_timers.unschedule(timer_id); 366 releaseTimer(timer_id); 367 } 368 } 369 370 override void waitTimer(size_t timer_id) 371 { 372 logTrace("Waiting for timer in %s", Task.getThis()); 373 debug assert(m_ownerThread is Thread.getThis()); 374 while (true) { 375 assert(!m_timers.isPeriodic(timer_id), "Cannot wait for a periodic timer."); 376 if (!m_timers.isPending(timer_id)) { 377 // logTrace("Timer is not pending"); 378 return; 379 } 380 auto data = &m_timers.getUserData(timer_id); 381 assert(data.owner == Task.init, "Waiting for the same timer from multiple tasks is not supported."); 382 data.owner = Task.getThis(); 383 scope (exit) m_timers.getUserData(timer_id).owner = Task.init; 384 getDriverCore().yieldForEvent(); 385 } 386 } 387 388 /// If the timer has an owner, it will resume the task. 389 /// if the timer has a callback, it will run a new task. 390 private void processTimers() 391 { 392 if (!m_timers.anyPending) return; 393 logTrace("Processing due timers"); 394 // process all timers that have expired up to now 395 auto now = Clock.currTime(UTC()); 396 // event loop timer will need to be rescheduled because we'll process everything until now 397 m_nextSched = SysTime.max; 398 399 m_timers.consumeTimeouts(now, (timer, periodic, ref data) { 400 Task owner = data.owner; 401 auto callback = data.callback; 402 403 logTrace("Timer %s fired (%s/%s)", timer, owner != Task.init, callback !is null); 404 405 if (!periodic) releaseTimer(timer); 406 407 if (owner && owner.running && owner != Task.getThis()) { 408 if (Task.getThis == Task.init) getDriverCore().resumeTask(owner); 409 else getDriverCore().yieldAndResumeTask(owner); 410 } 411 if (callback) runTask(callback); 412 }); 413 414 rescheduleTimerEvent(now); 415 } 416 417 private void rescheduleTimerEvent(SysTime now) 418 { 419 logTrace("Rescheduling timer event %s", Task.getThis()); 420 421 // don't bother scheduling, the timers will be processed before leaving for the event loop 422 if (m_nextSched <= Clock.currTime(UTC())) 423 return; 424 425 bool first; 426 auto next = m_timers.getFirstTimeout(); 427 Duration dur; 428 if (next == SysTime.max) return; 429 dur = max(1.msecs, next - now); 430 if (m_nextSched != next) 431 m_nextSched = next; 432 else return; 433 if (dur.total!"seconds"() >= int.max) 434 return; // will never trigger, don't bother 435 if (!m_timerEvent) { 436 //logTrace("creating new async timer"); 437 m_timerEvent = new AsyncTimer(getMainEventLoop()); 438 bool success = m_timerEvent.duration(dur).run(&onTimerTimeout); 439 assert(success, "Failed to run timer"); 440 } 441 else { 442 //logTrace("rearming the same timer instance"); 443 bool success = m_timerEvent.rearm(dur); 444 assert(success, "Failed to rearm timer"); 445 } 446 //logTrace("Rescheduled timer event for %s seconds in thread '%s' :: task '%s'", dur.total!"usecs" * 1e-6, Thread.getThis().name, Task.getThis()); 447 } 448 449 private void 
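	// All vibe.d timers are multiplexed onto the single AsyncTimer above:
	// rescheduleTimerEvent() (re)arms it for the earliest pending timeout and
	// this callback drains everything that is due via processTimers().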
onTimerTimeout() 450 { 451 import std.encoding : sanitize; 452 453 logTrace("timer event fired"); 454 try processTimers(); 455 catch (Exception e) { 456 logError("Failed to process timers: %s", e.msg); 457 try logDiagnostic("Full error: %s", e.toString().sanitize); catch (Throwable) {} 458 } 459 } 460 } 461 462 /// Writes or reads asynchronously (in another thread) for sizes > 64kb to benefit from kernel page cache 463 /// in lower size operations. 464 final class LibasyncFileStream : FileStream { 465 @trusted: 466 import vibe.core.path : Path; 467 468 private { 469 Path m_path; 470 ulong m_size; 471 ulong m_offset = 0; 472 FileMode m_mode; 473 Task m_task; 474 Exception m_ex; 475 shared AsyncFile m_impl; 476 477 bool m_started; 478 bool m_truncated; 479 bool m_finished; 480 } 481 482 this(Path path, FileMode mode) 483 { 484 import std.file : getSize,exists; 485 if (mode != FileMode.createTrunc) 486 m_size = getSize(path.toNativeString()); 487 else { 488 auto path_str = path.toNativeString(); 489 if (exists(path_str)) 490 removeFile(path); 491 { // touch 492 import std.string : toStringz; 493 version(Windows) { 494 import std.utf : toUTF16z; 495 auto path_str_utf = path_str.toUTF16z(); 496 FILE* f = _wfopen(path_str_utf, "w"); 497 _wchmod(path_str_utf, S_IREAD|S_IWRITE); 498 } 499 else FILE * f = fopen(path_str.toStringz, "w"); 500 fclose(f); 501 m_truncated = true; 502 } 503 } 504 m_path = path; 505 m_mode = mode; 506 507 m_impl = new shared AsyncFile(getMainEventLoop()); 508 m_impl.onReady(&handler); 509 510 m_started = true; 511 } 512 513 ~this() 514 { 515 try close(); 516 catch (Exception e) { assert(false, e.msg); } 517 } 518 519 override @property Path path() const { return m_path; } 520 override @property bool isOpen() const { return m_started; } 521 override @property ulong size() const { return m_size; } 522 override @property bool readable() const { return m_mode != FileMode.append; } 523 override @property bool writable() const { return m_mode != FileMode.read; } 524 525 override void seek(ulong offset) 526 { 527 m_offset = offset; 528 } 529 530 override ulong tell() { return m_offset; } 531 532 override void close() 533 { 534 if (m_impl) { 535 m_impl.kill(); 536 m_impl = null; 537 } 538 m_started = false; 539 if (m_task != Task() && Task.getThis() != Task()) 540 getDriverCore().yieldAndResumeTask(m_task, new Exception("The file was closed during an operation")); 541 else if (m_task != Task() && Task.getThis() == Task()) 542 getDriverCore().resumeTask(m_task, new Exception("The file was closed during an operation")); 543 544 } 545 546 override @property bool empty() const { assert(this.readable); return m_offset >= m_size; } 547 override @property ulong leastSize() const { assert(this.readable); return m_size - m_offset; } 548 override @property bool dataAvailableForRead() { return true; } 549 550 override const(ubyte)[] peek() 551 { 552 return null; 553 } 554 555 override size_t read(scope ubyte[] dst, IOMode) 556 { 557 scope(failure) 558 close(); 559 assert(this.readable, "To read a file, it must be opened in a read-enabled mode."); 560 shared ubyte[] bytes = cast(shared) dst; 561 bool truncate_if_exists; 562 if (!m_truncated && m_mode == FileMode.createTrunc) { 563 truncate_if_exists = true; 564 m_truncated = true; 565 m_size = 0; 566 } 567 m_finished = false; 568 enforce(dst.length <= leastSize); 569 enforce(m_impl.read(m_path.toNativeString(), bytes, m_offset, true, truncate_if_exists), "Failed to read data from disk: " ~ m_impl.error); 570 571 if (!m_finished) { 572 
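			// the read was handed off asynchronously (larger reads may run in
			// another thread); park this task until handler() resumes it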
acquire(); 573 scope(exit) release(); 574 getDriverCore().yieldForEvent(); 575 } 576 m_finished = false; 577 578 if (m_ex) throw m_ex; 579 580 m_offset += dst.length; 581 assert(m_impl.offset == m_offset, "Incoherent offset returned from file reader: " ~ m_offset.to!string ~ "B assumed but the implementation is at: " ~ m_impl.offset.to!string ~ "B"); 582 583 return dst.length; 584 } 585 586 alias Stream.write write; 587 override size_t write(in ubyte[] bytes_, IOMode) 588 { 589 assert(this.writable, "To write to a file, it must be opened in a write-enabled mode."); 590 591 shared const(ubyte)[] bytes = cast(shared const(ubyte)[]) bytes_; 592 593 bool truncate_if_exists; 594 if (!m_truncated && m_mode == FileMode.createTrunc) { 595 truncate_if_exists = true; 596 m_truncated = true; 597 m_size = 0; 598 } 599 m_finished = false; 600 601 if (m_mode == FileMode.append) 602 enforce(m_impl.append(m_path.toNativeString(), cast(shared ubyte[]) bytes, true, truncate_if_exists), "Failed to write data to disk: " ~ m_impl.error); 603 else 604 enforce(m_impl.write(m_path.toNativeString(), bytes, m_offset, true, truncate_if_exists), "Failed to write data to disk: " ~ m_impl.error); 605 606 if (!m_finished) { 607 acquire(); 608 scope(exit) release(); 609 getDriverCore().yieldForEvent(); 610 } 611 m_finished = false; 612 613 if (m_ex) throw m_ex; 614 615 if (m_mode == FileMode.append) { 616 m_size += bytes.length; 617 } 618 else { 619 m_offset += bytes.length; 620 if (m_offset >= m_size) 621 m_size += m_offset - m_size; 622 assert(m_impl.offset == m_offset, "Incoherent offset returned from file writer."); 623 } 624 //assert(getSize(m_path.toNativeString()) == m_size, "Incoherency between local size and filesize: " ~ m_size.to!string ~ "B assumed for a file of size " ~ getSize(m_path.toNativeString()).to!string ~ "B"); 625 626 return bytes_.length; 627 } 628 629 override void flush() 630 { 631 assert(this.writable, "To write to a file, it must be opened in a write-enabled mode."); 632 633 } 634 635 override void finalize() 636 { 637 if (this.writable) 638 flush(); 639 } 640 641 void release() 642 { 643 assert(Task.getThis() == Task() || m_task == Task.getThis(), "Releasing FileStream that is not owned by the calling task."); 644 m_task = Task(); 645 } 646 647 void acquire() 648 { 649 assert(Task.getThis() == Task() || m_task == Task(), "Acquiring FileStream that is already owned."); 650 m_task = Task.getThis(); 651 } 652 653 private void handler() { 654 // This may be called by the event loop if read/write > 64kb and another thread was delegated 655 Exception ex; 656 657 if (m_impl.status.code != Status.OK) 658 ex = new Exception(m_impl.error); 659 m_finished = true; 660 if (m_task != Task()) 661 getDriverCore().resumeTask(m_task, ex); 662 else m_ex = ex; 663 } 664 } 665 666 667 final class LibasyncDirectoryWatcher : DirectoryWatcher { 668 @trusted: 669 private { 670 Path m_path; 671 bool m_recursive; 672 Task m_task; 673 AsyncDirectoryWatcher m_impl; 674 Array!DirectoryChange m_changes; 675 Exception m_error; 676 } 677 678 this(Path path, bool recursive) 679 { 680 m_impl = new AsyncDirectoryWatcher(getMainEventLoop()); 681 m_impl.run(&handler); 682 m_path = path; 683 m_recursive = recursive; 684 watch(path, recursive); 685 // logTrace("DirectoryWatcher called with: %s", path.toNativeString()); 686 } 687 688 ~this() 689 { 690 m_impl.kill(); 691 } 692 693 override @property Path path() const { return m_path; } 694 override @property bool recursive() const { return m_recursive; } 695 696 void release() 697 { 
698 assert(m_task == Task.getThis(), "Releasing FileStream that is not owned by the calling task."); 699 m_task = Task(); 700 } 701 702 void acquire() 703 { 704 assert(m_task == Task(), "Acquiring FileStream that is already owned."); 705 m_task = Task.getThis(); 706 } 707 708 bool amOwner() 709 { 710 return m_task == Task.getThis(); 711 } 712 713 override bool readChanges(ref DirectoryChange[] dst, Duration timeout) 714 { 715 dst.length = 0; 716 assert(!amOwner()); 717 if (m_error) 718 throw m_error; 719 acquire(); 720 scope(exit) release(); 721 void consumeChanges() { 722 if (m_impl.status.code == Status.ERROR) { 723 throw new Exception(m_impl.error); 724 } 725 726 foreach (ref change; m_changes[]) { 727 //logTrace("Adding change: %s", change.to!string); 728 dst ~= change; 729 } 730 731 //logTrace("Consumed change 1: %s", dst.to!string); 732 import std.array : array; 733 import std.algorithm : uniq; 734 dst = cast(DirectoryChange[]) uniq!((a, b) => a.path == b.path && a.type == b.type)(dst).array; 735 logTrace("Consumed change: %s", dst.to!string); 736 m_changes.clear(); 737 } 738 739 if (!m_changes.empty) { 740 consumeChanges(); 741 return true; 742 } 743 744 auto tm = getEventDriver().createTimer(null); 745 getEventDriver().m_timers.getUserData(tm).owner = Task.getThis(); 746 getEventDriver().rearmTimer(tm, timeout, false); 747 748 while (m_changes.empty) { 749 getDriverCore().yieldForEvent(); 750 if (!getEventDriver().isTimerPending(tm)) break; 751 } 752 753 if (!m_changes.empty) { 754 consumeChanges(); 755 return true; 756 } 757 758 return false; 759 } 760 761 private void watch(Path path, bool recursive) { 762 m_impl.watchDir(path.toNativeString(), DWFileEvent.ALL, recursive); 763 } 764 765 private void handler() { 766 import std.stdio; 767 DWChangeInfo[] changes = allocArray!DWChangeInfo(manualAllocator(), 128); 768 scope(exit) freeArray(manualAllocator(), changes); 769 Exception ex; 770 try { 771 uint cnt; 772 do { 773 cnt = m_impl.readChanges(changes); 774 size_t i; 775 foreach (DWChangeInfo change; changes) { 776 DirectoryChange dc; 777 778 final switch (change.event){ 779 case DWFileEvent.CREATED: dc.type = DirectoryChangeType.added; break; 780 case DWFileEvent.DELETED: dc.type = DirectoryChangeType.removed; break; 781 case DWFileEvent.MODIFIED: dc.type = DirectoryChangeType.modified; break; 782 case DWFileEvent.MOVED_FROM: dc.type = DirectoryChangeType.removed; break; 783 case DWFileEvent.MOVED_TO: dc.type = DirectoryChangeType.added; break; 784 case DWFileEvent.ALL: break; // impossible 785 case DWFileEvent.ERROR: throw new Exception(m_impl.error); 786 } 787 788 dc.path = Path(change.path); 789 //logTrace("Inserted %s absolute %s", dc.to!string, dc.path.absolute.to!string); 790 m_changes.insert(dc); 791 i++; 792 if (cnt == i) break; 793 } 794 } while(cnt == 0 && m_impl.status.code == Status.OK); 795 if (m_impl.status.code == Status.ERROR) { 796 ex = new Exception(m_impl.error); 797 } 798 799 } 800 catch (Exception e) { 801 ex = e; 802 } 803 if (m_task != Task()) getDriverCore().resumeTask(m_task, ex); 804 else m_error = ex; 805 } 806 807 } 808 809 810 811 final class LibasyncManualEvent : ManualEvent { 812 @trusted: 813 private { 814 shared(int) m_emitCount = 0; 815 shared(int) m_threadCount = 0; 816 shared(size_t) m_instance; 817 Array!(void*) ms_signals; 818 819 core.sync.mutex.Mutex m_mutex; 820 821 @property size_t instanceID() nothrow { return atomicLoad(m_instance); } 822 @property void instanceID(size_t instance) nothrow{ atomicStore(m_instance, instance); } 823 } 824 
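	/* Signalling scheme: each thread that waits on this event registers one
	   shared AsyncSignal in ms_signals and records its waiting tasks in the
	   thread-local s_eventWaiters slot for this instance ID.  emit() bumps
	   m_emitCount and triggers every registered signal; onSignal() then
	   resumes the recorded tasks inside their own threads. */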
825 this(LibasyncDriver driver) 826 nothrow { 827 m_mutex = new core.sync.mutex.Mutex; 828 instanceID = generateID(); 829 } 830 831 ~this() 832 { 833 try { 834 recycleID(instanceID); 835 836 foreach (ref signal; ms_signals[]) { 837 if (signal) { 838 (cast(shared AsyncSignal) signal).kill(); 839 signal = null; 840 } 841 } 842 } catch (Exception e) { 843 import std.stdio; 844 writefln("Exception thrown while finalizing LibasyncManualEvent: %s", e.msg); 845 } 846 } 847 848 override void emit() 849 { 850 scope (failure) assert(false); // synchronized is not nothrow on DMD 2.066 and below and Array is not nothrow at all 851 logTrace("Emitting signal"); 852 atomicOp!"+="(m_emitCount, 1); 853 synchronized (m_mutex) { 854 logTrace("Looping signals. found: " ~ ms_signals.length.to!string); 855 foreach (ref signal; ms_signals[]) { 856 auto evloop = getMainEventLoop(); 857 shared AsyncSignal sig = cast(shared AsyncSignal) signal; 858 if (!sig.trigger(evloop)) logError("Failed to trigger ManualEvent: %s", sig.error); 859 } 860 } 861 } 862 863 override void wait() { wait(m_emitCount); } 864 override int wait(int reference_emit_count) { return doWait!true(reference_emit_count); } 865 override int wait(Duration timeout, int reference_emit_count) { return doWait!true(timeout, reference_emit_count); } 866 override int waitUninterruptible(int reference_emit_count) { return doWait!false(reference_emit_count); } 867 override int waitUninterruptible(Duration timeout, int reference_emit_count) { return doWait!false(timeout, reference_emit_count); } 868 869 void acquire() 870 { 871 auto task = Task.getThis(); 872 873 bool signal_exists; 874 875 size_t instance = instanceID; 876 if (s_eventWaiters.length <= instance) 877 expandWaiters(); 878 879 logTrace("Acquire event ID#%d", instance); 880 auto taskList = s_eventWaiters[instance]; 881 if (taskList.length > 0) 882 signal_exists = true; 883 884 if (!signal_exists) { 885 shared AsyncSignal sig = new shared AsyncSignal(getMainEventLoop()); 886 sig.run(&onSignal); 887 synchronized (m_mutex) ms_signals.insertBack(cast(void*)sig); 888 } 889 s_eventWaiters[instance].insertBack(Task.getThis()); 890 } 891 892 void release() 893 { 894 assert(amOwner(), "Releasing non-acquired signal."); 895 896 import std.algorithm : countUntil; 897 898 size_t instance = instanceID; 899 auto taskList = s_eventWaiters[instance]; 900 auto idx = taskList[].countUntil!((a, b) => a == b)(Task.getThis()); 901 logTrace("Release event ID#%d", instance); 902 s_eventWaiters[instance].linearRemove(taskList[idx .. 
idx+1]); 903 904 if (s_eventWaiters[instance].empty) { 905 removeMySignal(); 906 } 907 } 908 909 bool amOwner() 910 { 911 import std.algorithm : countUntil; 912 size_t instance = instanceID; 913 if (s_eventWaiters.length <= instance) return false; 914 auto taskList = s_eventWaiters[instance]; 915 if (taskList.length == 0) return false; 916 917 auto idx = taskList[].countUntil!((a, b) => a == b)(Task.getThis()); 918 919 return idx != -1; 920 } 921 922 override @property int emitCount() const { return atomicLoad(m_emitCount); } 923 924 private int doWait(bool INTERRUPTIBLE)(int reference_emit_count) 925 { 926 try { 927 assert(!amOwner()); 928 acquire(); 929 scope(exit) release(); 930 auto ec = this.emitCount; 931 while( ec == reference_emit_count ){ 932 //synchronized(m_mutex) logTrace("Waiting for event with signal count: " ~ ms_signals.length.to!string); 933 static if (INTERRUPTIBLE) getDriverCore().yieldForEvent(); 934 else getDriverCore().yieldForEventDeferThrow(); 935 ec = this.emitCount; 936 } 937 return ec; 938 } catch (Exception e) { 939 static if (!INTERRUPTIBLE) 940 assert(false, e.msg); // still some function calls not marked nothrow 941 else throw e; 942 } 943 } 944 945 private int doWait(bool INTERRUPTIBLE)(Duration timeout, int reference_emit_count) 946 { 947 static if (!INTERRUPTIBLE) scope (failure) assert(false); // still some function calls not marked nothrow 948 assert(!amOwner()); 949 acquire(); 950 scope(exit) release(); 951 auto tm = getEventDriver().createTimer(null); 952 scope (exit) getEventDriver().releaseTimer(tm); 953 getEventDriver().m_timers.getUserData(tm).owner = Task.getThis(); 954 getEventDriver().rearmTimer(tm, timeout, false); 955 956 auto ec = this.emitCount; 957 while (ec == reference_emit_count) { 958 static if (INTERRUPTIBLE) getDriverCore().yieldForEvent(); 959 else getDriverCore().yieldForEventDeferThrow(); 960 ec = this.emitCount; 961 if (!getEventDriver().isTimerPending(tm)) break; 962 } 963 return ec; 964 } 965 966 private void removeMySignal() { 967 import std.algorithm : countUntil; 968 synchronized(m_mutex) { 969 auto idx = ms_signals[].countUntil!((void* a, LibasyncManualEvent b) { return ((cast(shared AsyncSignal) a).owner == Thread.getThis() && this is b);})(this); 970 if (idx >= 0) 971 ms_signals.linearRemove(ms_signals[idx .. idx+1]); 972 } 973 } 974 975 private void expandWaiters() { 976 size_t maxID; 977 synchronized(gs_mutex) maxID = gs_maxID; 978 s_eventWaiters.reserve(maxID); 979 logTrace("gs_maxID: %d", maxID); 980 size_t s_ev_len = s_eventWaiters.length; 981 size_t s_ev_cap = s_eventWaiters.capacity; 982 assert(maxID > s_eventWaiters.length); 983 foreach (i; s_ev_len .. 
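			// grow the thread-local waiter table until every allocated
			// instance ID has its own (initially empty) task list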
s_ev_cap) { 984 s_eventWaiters.insertBack(Array!Task.init); 985 } 986 } 987 988 private void onSignal() 989 { 990 logTrace("Got signal in onSignal"); 991 try { 992 auto thread = Thread.getThis(); 993 auto core = getDriverCore(); 994 995 size_t instance = instanceID; 996 logTrace("Got context: %d", instance); 997 foreach (Task task; s_eventWaiters[instance][]) { 998 logTrace("Task Found"); 999 core.resumeTask(task); 1000 } 1001 } catch (Exception e) { 1002 logError("Exception while handling signal event: %s", e.msg); 1003 try logDebug("Full error: %s", sanitize(e.msg)); 1004 catch (Exception) {} 1005 } 1006 } 1007 } 1008 1009 final class LibasyncTCPListener : TCPListener { 1010 @trusted: 1011 private { 1012 NetworkAddress m_local; 1013 void delegate(TCPConnection conn) @safe m_connectionCallback; 1014 TCPListenOptions m_options; 1015 AsyncTCPListener[] m_listeners; 1016 fd_t socket; 1017 } 1018 1019 this(NetworkAddress addr, void delegate(TCPConnection conn) @safe connection_callback, TCPListenOptions options) 1020 { 1021 m_connectionCallback = connection_callback; 1022 m_options = options; 1023 m_local = addr; 1024 void function(shared LibasyncTCPListener) init = (shared LibasyncTCPListener ctxt){ 1025 synchronized(ctxt) { 1026 LibasyncTCPListener ctxt2 = cast(LibasyncTCPListener)ctxt; 1027 AsyncTCPListener listener = new AsyncTCPListener(getMainEventLoop(), ctxt2.socket); 1028 listener.local = cast(NetworkAddressLA)ctxt2.m_local; 1029 1030 enforce(listener.run(&ctxt2.initConnection), "Failed to start listening to local socket: " ~ listener.error); 1031 ctxt2.socket = listener.socket; 1032 ctxt2.m_listeners ~= listener; 1033 } 1034 }; 1035 if (options & TCPListenOptions.distribute) runWorkerTaskDist(init, cast(shared) this); 1036 else init(cast(shared) this); 1037 1038 } 1039 1040 override @property NetworkAddress bindAddress() { return m_local; } 1041 1042 @property void delegate(TCPConnection) connectionCallback() { return m_connectionCallback; } 1043 1044 private void delegate(TCPEvent) initConnection(AsyncTCPConnection conn) { 1045 logTrace("Connection initialized in thread: " ~ Thread.getThis().name); 1046 1047 LibasyncTCPConnection native_conn = new LibasyncTCPConnection(conn, m_connectionCallback); 1048 native_conn.m_tcpImpl.conn = conn; 1049 native_conn.m_tcpImpl.localAddr = m_local; 1050 return &native_conn.handler; 1051 } 1052 1053 override void stopListening() 1054 { 1055 synchronized(this) { 1056 foreach (listener; m_listeners) { 1057 listener.kill(); 1058 listener = null; 1059 } 1060 } 1061 } 1062 } 1063 1064 final class LibasyncTCPConnection : TCPConnection/*, Buffered*/ { 1065 @trusted: 1066 private { 1067 FixedRingBuffer!ubyte m_readBuffer; 1068 ubyte[] m_buffer; 1069 ubyte[] m_slice; 1070 TCPConnectionImpl m_tcpImpl; 1071 Settings m_settings; 1072 1073 bool m_closed = true; 1074 bool m_mustRecv = true; 1075 string m_error; 1076 1077 // The socket descriptor is unavailable to motivate low-level/API feature additions 1078 // rather than high-lvl platform-dependent hacking 1079 // fd_t socket; 1080 } 1081 1082 ubyte[] readChunk(ubyte[] buffer = null) 1083 { 1084 logTrace("readBuf TCP: %d", buffer.length); 1085 import std.algorithm : swap; 1086 ubyte[] ret; 1087 1088 if (m_slice.length > 0) { 1089 swap(ret, m_slice); 1090 logTrace("readBuf returned instantly with slice length: %d", ret.length); 1091 return ret; 1092 } 1093 1094 if (m_readBuffer.length > 0) { 1095 size_t amt = min(buffer.length, m_readBuffer.length); 1096 m_readBuffer.read(buffer[0 .. 
amt]); 1097 logTrace("readBuf returned with existing amount: %d", amt); 1098 return buffer[0 .. amt]; 1099 } 1100 1101 if (buffer) { 1102 m_buffer = buffer; 1103 m_readBuffer.dispose(); 1104 } 1105 1106 leastSize(); 1107 1108 swap(ret, m_slice); 1109 logTrace("readBuf returned with buffered length: %d", ret.length); 1110 return ret; 1111 } 1112 1113 this(AsyncTCPConnection conn, void delegate(TCPConnection) @safe cb) 1114 in { assert(conn !is null); } 1115 body { 1116 m_settings.onConnect = cb; 1117 m_readBuffer.capacity = 64*1024; 1118 } 1119 1120 private @property AsyncTCPConnection conn() { 1121 1122 return m_tcpImpl.conn; 1123 } 1124 1125 // Using this setting completely disables the internal buffers as well 1126 override @property void tcpNoDelay(bool enabled) 1127 { 1128 m_settings.tcpNoDelay = enabled; 1129 conn.setOption(TCPOption.NODELAY, enabled); 1130 } 1131 1132 override @property bool tcpNoDelay() const { return m_settings.tcpNoDelay; } 1133 1134 override @property void readTimeout(Duration dur) 1135 { 1136 m_settings.readTimeout = dur; 1137 conn.setOption(TCPOption.TIMEOUT_RECV, dur); 1138 } 1139 1140 override @property Duration readTimeout() const { return m_settings.readTimeout; } 1141 1142 override @property void keepAlive(bool enabled) 1143 { 1144 m_settings.keepAlive = enabled; 1145 conn.setOption(TCPOption.KEEPALIVE_ENABLE, enabled); 1146 } 1147 1148 override @property bool keepAlive() const { return m_settings.keepAlive; } 1149 1150 override @property bool connected() const { return !m_closed && m_tcpImpl.conn && m_tcpImpl.conn.isConnected; } 1151 1152 override @property bool dataAvailableForRead(){ 1153 logTrace("dataAvailableForRead"); 1154 m_settings.reader.acquire(); 1155 scope(exit) m_settings.reader.release(); 1156 return !readEmpty; 1157 } 1158 1159 private @property bool readEmpty() { 1160 return (m_buffer && !m_slice) || (!m_buffer && m_readBuffer.empty); 1161 } 1162 1163 override @property string peerAddress() const { return m_tcpImpl.conn.peer.toString(); } 1164 1165 override @property NetworkAddress localAddress() const { return m_tcpImpl.localAddr; } 1166 override @property NetworkAddress remoteAddress() const { return NetworkAddress(m_tcpImpl.conn.peer); } 1167 1168 override @property bool empty() { return leastSize == 0; } 1169 1170 override @property ulong leastSize() 1171 { 1172 logTrace("leastSize TCP"); 1173 m_settings.reader.acquire(); 1174 scope(exit) m_settings.reader.release(); 1175 1176 while( m_readBuffer.empty ){ 1177 if (!connected) 1178 return 0; 1179 m_settings.reader.noExcept = true; 1180 getDriverCore().yieldForEvent(); 1181 m_settings.reader.noExcept = false; 1182 } 1183 return (m_slice.length > 0) ? m_slice.length : m_readBuffer.length; 1184 } 1185 1186 override void close() 1187 { 1188 logTrace("Close TCP enter"); 1189 1190 // resume any reader, so that the read operation can be ended with a failure 1191 Task reader = m_settings.reader.task; 1192 while (m_settings.reader.isWaiting && reader.running) { 1193 logTrace("resuming reader first"); 1194 getDriverCore().yieldAndResumeTask(reader); 1195 } 1196 1197 // test if the connection is already closed 1198 if (m_closed) { 1199 logTrace("connection already closed."); 1200 return; 1201 } 1202 1203 //logTrace("closing"); 1204 m_settings.writer.acquire(); 1205 scope(exit) m_settings.writer.release(); 1206 1207 // checkConnected(); 1208 m_readBuffer.dispose(); 1209 onClose(null, false); 1210 } 1211 1212 override bool waitForData(Duration timeout = Duration.max) 1213 { 1214 // 0 seconds is max. 
CHanging this would be breaking, might as well use -1 for immediate 1215 if (timeout == 0.seconds) 1216 timeout = Duration.max; 1217 logTrace("WaitForData enter, timeout %s :: Ptr %s", timeout.toString(), (cast(void*)this).to!string); 1218 m_settings.reader.acquire(); 1219 auto _driver = getEventDriver(); 1220 auto tm = _driver.createTimer(null); 1221 scope(exit) { 1222 _driver.stopTimer(tm); 1223 _driver.releaseTimer(tm); 1224 m_settings.reader.release(); 1225 } 1226 _driver.m_timers.getUserData(tm).owner = Task.getThis(); 1227 if (timeout != Duration.max) _driver.rearmTimer(tm, timeout, false); 1228 logTrace("waitForData TCP"); 1229 while (m_readBuffer.empty) { 1230 if (!connected) return false; 1231 1232 if (m_mustRecv) 1233 onRead(); 1234 else { 1235 //logTrace("Yielding for event in waitForData, waiting? %s", m_settings.reader.isWaiting); 1236 m_settings.reader.noExcept = true; 1237 getDriverCore().yieldForEvent(); 1238 m_settings.reader.noExcept = false; 1239 } 1240 if (timeout != Duration.max && !_driver.isTimerPending(tm)) { 1241 logTrace("WaitForData TCP: timer signal"); 1242 return false; 1243 } 1244 } 1245 if (m_readBuffer.empty && !connected) return false; 1246 logTrace("WaitForData exit: fiber resumed with read buffer"); 1247 return !m_readBuffer.empty; 1248 } 1249 1250 override const(ubyte)[] peek() 1251 { 1252 logTrace("Peek TCP enter"); 1253 m_settings.reader.acquire(); 1254 scope(exit) m_settings.reader.release(); 1255 1256 if (!readEmpty) 1257 return (m_slice.length > 0) ? cast(const(ubyte)[]) m_slice : m_readBuffer.peek(); 1258 else 1259 return null; 1260 } 1261 1262 override size_t read(scope ubyte[] dst, IOMode) 1263 { 1264 if (!dst.length) return 0; 1265 assert(dst !is null && !m_slice); 1266 logTrace("Read TCP"); 1267 m_settings.reader.acquire(); 1268 size_t len = 0; 1269 scope(exit) m_settings.reader.release(); 1270 while( dst.length > 0 ){ 1271 while( m_readBuffer.empty ){ 1272 checkConnected(); 1273 if (m_mustRecv) 1274 onRead(); 1275 else { 1276 getDriverCore().yieldForEvent(); 1277 checkConnected(); 1278 } 1279 } 1280 size_t amt = min(dst.length, m_readBuffer.length); 1281 1282 m_readBuffer.read(dst[0 .. amt]); 1283 dst = dst[amt .. $]; 1284 len += amt; 1285 } 1286 1287 return len; 1288 } 1289 1290 override size_t write(in ubyte[] bytes_, IOMode) 1291 { 1292 assert(bytes_ !is null); 1293 logTrace("%s", "write enter"); 1294 m_settings.writer.acquire(); 1295 scope(exit) m_settings.writer.release(); 1296 checkConnected(); 1297 const(ubyte)[] bytes = bytes_; 1298 logTrace("TCP write with %s bytes called", bytes.length); 1299 1300 bool first = true; 1301 size_t offset; 1302 size_t len = bytes.length; 1303 do { 1304 if (!first) { 1305 getDriverCore().yieldForEvent(); 1306 } 1307 checkConnected(); 1308 offset += conn.send(bytes[offset .. 
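			// send() may accept only part of the buffer; the surrounding loop
			// yields for the next WRITE event (see handler) and retries with
			// the remainder until everything has been written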
$]); 1309 1310 if (conn.hasError) { 1311 throw new Exception(conn.error); 1312 } 1313 first = false; 1314 } while (offset != len); 1315 1316 return len; 1317 } 1318 1319 override void flush() 1320 { 1321 logTrace("%s", "Flush"); 1322 m_settings.writer.acquire(); 1323 scope(exit) m_settings.writer.release(); 1324 1325 checkConnected(); 1326 1327 } 1328 1329 override void finalize() 1330 { 1331 logTrace("%s", "finalize"); 1332 flush(); 1333 } 1334 1335 private void checkConnected() 1336 { 1337 enforce(connected, "The remote peer has closed the connection."); 1338 logTrace("Check Connected"); 1339 } 1340 1341 private bool tryReadBuf() { 1342 //logTrace("TryReadBuf with m_buffer: %s", m_buffer.length); 1343 if (m_buffer) { 1344 ubyte[] buf = m_buffer[m_slice.length .. $]; 1345 uint ret = conn.recv(buf); 1346 logTrace("Received: %s", buf[0 .. ret]); 1347 // check for overflow 1348 if (ret == buf.length) { 1349 logTrace("Overflow detected, revert to ring buffer"); 1350 m_slice = null; 1351 m_readBuffer.capacity = 64*1024; 1352 m_readBuffer.put(buf); 1353 m_buffer = null; 1354 return false; // cancel slices and revert to the fixed ring buffer 1355 } 1356 1357 if (m_slice.length > 0) { 1358 //logDebug("post-assign m_slice "); 1359 m_slice = m_slice.ptr[0 .. m_slice.length + ret]; 1360 } 1361 else { 1362 //logDebug("using m_buffer"); 1363 m_slice = m_buffer[0 .. ret]; 1364 } 1365 return true; 1366 } 1367 logTrace("TryReadBuf exit with %d bytes in m_slice, %d bytes in m_readBuffer ", m_slice.length, m_readBuffer.length); 1368 1369 return false; 1370 } 1371 1372 private void onRead() { 1373 m_mustRecv = true; // assume we didn't receive everything 1374 1375 if (tryReadBuf()) { 1376 m_mustRecv = false; 1377 return; 1378 } 1379 1380 assert(!m_slice); 1381 1382 logTrace("OnRead with %s", m_readBuffer.freeSpace); 1383 1384 while( m_readBuffer.freeSpace > 0 ) { 1385 ubyte[] dst = m_readBuffer.peekDst(); 1386 assert(dst.length <= int.max); 1387 logTrace("Try to read up to bytes: %s", dst.length); 1388 bool read_more; 1389 do { 1390 uint ret = conn.recv(dst); 1391 if( ret > 0 ){ 1392 logTrace("received bytes: %s", ret); 1393 m_readBuffer.putN(ret); 1394 } 1395 read_more = ret == dst.length; 1396 // ret == 0! let's look for some errors 1397 if (read_more) { 1398 if (m_readBuffer.freeSpace == 0) 1399 m_readBuffer.capacity = m_readBuffer.capacity*2; 1400 dst = m_readBuffer.peekDst(); 1401 } 1402 } while( read_more ); 1403 if (conn.status.code == Status.ASYNC) { 1404 m_mustRecv = false; // we'll have to wait 1405 break; // the kernel's buffer is empty 1406 } 1407 // ret == 0! let's look for some errors 1408 else if (conn.status.code == Status.ASYNC) { 1409 m_mustRecv = false; // we'll have to wait 1410 break; // the kernel's buffer is empty 1411 } 1412 else if (conn.status.code != Status.OK) { 1413 // We have a read error and the socket may now even be closed... 1414 auto err = conn.error; 1415 1416 logTrace("receive error %s %s", err, conn.status.code); 1417 throw new Exception("Socket error: " ~ conn.status.code.to!string); 1418 } 1419 else { 1420 m_mustRecv = false; 1421 break; 1422 } 1423 } 1424 logTrace("OnRead exit with free bytes: %s", m_readBuffer.freeSpace); 1425 } 1426 1427 /* The AsyncTCPConnection object will be automatically disposed when this returns. 1428 * We're given some time to cleanup. 
1429 */ 1430 private void onClose(in string msg = null, bool wake_ex = true) { 1431 logTrace("onClose"); 1432 1433 if (msg) 1434 m_error = msg; 1435 if (!m_closed) { 1436 1437 m_closed = true; 1438 1439 if (m_tcpImpl.conn && m_tcpImpl.conn.isConnected) { 1440 m_tcpImpl.conn.kill(Task.getThis() != Task.init); // close the connection 1441 m_tcpImpl.conn = null; 1442 } 1443 } 1444 if (Task.getThis() != Task.init) { 1445 return; 1446 } 1447 Exception ex; 1448 if (!msg && wake_ex) 1449 ex = new Exception("Connection closed"); 1450 else if (wake_ex) ex = new Exception(msg); 1451 1452 1453 Task reader = m_settings.reader.task; 1454 Task writer = m_settings.writer.task; 1455 1456 bool hasUniqueReader = m_settings.reader.isWaiting; 1457 bool hasUniqueWriter = m_settings.writer.isWaiting && reader != writer; 1458 1459 if (hasUniqueWriter && Task.getThis() != writer && wake_ex) { 1460 getDriverCore().resumeTask(writer, ex); 1461 } 1462 if (hasUniqueReader && Task.getThis() != reader) { 1463 getDriverCore().resumeTask(reader, m_settings.reader.noExcept?null:ex); 1464 } 1465 } 1466 1467 void onConnect() { 1468 scope(failure) onClose(); 1469 1470 if (m_tcpImpl.conn && m_tcpImpl.conn.isConnected) 1471 { 1472 bool inbound = m_tcpImpl.conn.inbound; 1473 1474 try m_settings.onConnect(this); 1475 catch ( Exception e) { 1476 //logError(e.toString); 1477 throw e; 1478 } 1479 catch ( Throwable e) { 1480 logError("%s", e.toString); 1481 throw e; 1482 } 1483 if (inbound) close(); 1484 } 1485 logTrace("Finished callback"); 1486 } 1487 1488 void handler(TCPEvent ev) { 1489 logTrace("Handler"); 1490 Exception ex; 1491 final switch (ev) { 1492 case TCPEvent.CONNECT: 1493 m_closed = false; 1494 // read & write are guaranteed to be successful on any platform at this point 1495 1496 if (m_tcpImpl.conn.inbound) 1497 runTask(&onConnect); 1498 else onConnect(); 1499 m_settings.onConnect = null; 1500 break; 1501 case TCPEvent.READ: 1502 // fill the read buffer and resume any task if waiting 1503 try onRead(); 1504 catch (Exception e) ex = e; 1505 if (m_settings.reader.isWaiting) 1506 getDriverCore().resumeTask(m_settings.reader.task, ex); 1507 goto case TCPEvent.WRITE; // sometimes the kernel notified write with read events 1508 case TCPEvent.WRITE: 1509 // The kernel is ready to have some more data written, all we need to do is wake up the writer 1510 if (m_settings.writer.isWaiting) 1511 getDriverCore().resumeTask(m_settings.writer.task, ex); 1512 break; 1513 case TCPEvent.CLOSE: 1514 m_closed = false; 1515 onClose(); 1516 if (m_settings.onConnect) 1517 m_settings.onConnect(this); 1518 m_settings.onConnect = null; 1519 break; 1520 case TCPEvent.ERROR: 1521 m_closed = false; 1522 onClose(conn.error); 1523 if (m_settings.onConnect) 1524 m_settings.onConnect(this); 1525 m_settings.onConnect = null; 1526 break; 1527 } 1528 return; 1529 } 1530 1531 struct Waiter { 1532 Task task; // we can only have one task waiting for read/write operations 1533 bool isWaiting; // if a task is actively waiting 1534 bool noExcept; 1535 1536 void acquire() { 1537 assert(!this.isWaiting, "Acquiring waiter that is already in use."); 1538 if (Task.getThis() == Task()) return; 1539 logTrace("%s", "Acquire waiter"); 1540 assert(!amOwner(), "Failed to acquire waiter in task: " ~ Task.getThis().fiber.to!string ~ ", it was busy with: " ~ this.task.to!string); 1541 this.task = Task.getThis(); 1542 this.isWaiting = true; 1543 } 1544 1545 void release() { 1546 if (Task.getThis() == Task()) return; 1547 logTrace("%s", "Release waiter"); 1548 
assert(amOwner()); 1549 this.isWaiting = false; 1550 } 1551 1552 bool amOwner() const { 1553 if (this.isWaiting && this.task == Task.getThis()) 1554 return true; 1555 return false; 1556 } 1557 } 1558 1559 struct Settings { 1560 void delegate(TCPConnection) onConnect; 1561 Duration readTimeout; 1562 bool keepAlive; 1563 bool tcpNoDelay; 1564 Waiter reader; 1565 Waiter writer; 1566 } 1567 1568 struct TCPConnectionImpl { 1569 NetworkAddress localAddr; 1570 AsyncTCPConnection conn; 1571 } 1572 } 1573 1574 int total_conn; 1575 1576 final class LibasyncUDPConnection : UDPConnection { 1577 @trusted: 1578 private { 1579 Task m_task; 1580 AsyncUDPSocket m_udpImpl; 1581 bool m_canBroadcast; 1582 NetworkAddressLA m_peer; 1583 1584 bool m_waiting; 1585 } 1586 1587 private @property AsyncUDPSocket socket() { 1588 return m_udpImpl; 1589 } 1590 1591 this(AsyncUDPSocket conn) 1592 in { assert(conn !is null); } 1593 body { 1594 m_udpImpl = conn; 1595 } 1596 1597 override @property string bindAddress() const { 1598 1599 return m_udpImpl.local.toAddressString(); 1600 } 1601 1602 override @property NetworkAddress localAddress() const { return NetworkAddress(m_udpImpl.local); } 1603 1604 override @property bool canBroadcast() const { return m_canBroadcast; } 1605 override @property void canBroadcast(bool val) 1606 { 1607 socket.broadcast(val); 1608 m_canBroadcast = val; 1609 } 1610 1611 override void close() 1612 { 1613 socket.kill(); 1614 m_udpImpl = null; 1615 } 1616 1617 bool amOwner() { 1618 return m_task != Task() && m_task == Task.getThis(); 1619 } 1620 1621 void acquire() 1622 { 1623 assert(m_task == Task(), "Trying to acquire a UDP socket that is currently owned."); 1624 m_task = Task.getThis(); 1625 } 1626 1627 void release() 1628 { 1629 assert(m_task != Task(), "Trying to release a UDP socket that is not owned."); 1630 assert(m_task == Task.getThis(), "Trying to release a foreign UDP socket."); 1631 m_task = Task(); 1632 } 1633 1634 override void connect(string host, ushort port) 1635 { 1636 // assert(m_peer == NetworkAddress.init, "Cannot connect to another peer"); 1637 NetworkAddress addr = getEventDriver().resolveHost(host, localAddress.family, true); 1638 addr.port = port; 1639 connect(addr); 1640 } 1641 1642 override void connect(NetworkAddress addr) 1643 { 1644 m_peer = cast(NetworkAddressLA)addr; 1645 } 1646 1647 override void send(in ubyte[] data, in NetworkAddress* peer_address = null) 1648 { 1649 assert(data.length <= int.max); 1650 uint ret; 1651 size_t retries = 3; 1652 foreach (i; 0 .. 
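		// retry a few times: sendTo() reports Status.ASYNC while the kernel
		// cannot take the datagram yet, in which case we yield until the
		// socket signals WRITE and try again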
retries) { 1653 if( peer_address ){ 1654 auto pa = cast(NetworkAddressLA)*cast(NetworkAddress*)peer_address; 1655 ret = socket.sendTo(data, pa); 1656 } else { 1657 ret = socket.sendTo(data, m_peer); 1658 } 1659 if (socket.status.code == Status.ASYNC) { 1660 m_waiting = true; 1661 getDriverCore().yieldForEvent(); 1662 } 1663 else break; 1664 } 1665 1666 logTrace("send ret: %s, %s", ret, socket.status.text); 1667 enforce(socket.status.code == Status.OK, "Error sending UDP packet: " ~ socket.status.text); 1668 1669 enforce(ret == data.length, "Unable to send full packet."); 1670 } 1671 1672 override ubyte[] recv(ubyte[] buf = null, NetworkAddress* peer_address = null) 1673 { 1674 return recv(Duration.max, buf, peer_address); 1675 } 1676 1677 override ubyte[] recv(Duration timeout, ubyte[] buf = null, NetworkAddress* peer_address = null) 1678 { 1679 size_t tm = size_t.max; 1680 auto m_driver = getEventDriver(); 1681 if (timeout != Duration.max && timeout > 0.seconds) { 1682 tm = m_driver.createTimer(null); 1683 m_driver.rearmTimer(tm, timeout, false); 1684 m_driver.acquireTimer(tm); 1685 } 1686 1687 acquire(); 1688 scope(exit) { 1689 release(); 1690 if (tm != size_t.max) m_driver.releaseTimer(tm); 1691 } 1692 1693 assert(buf.length <= int.max); 1694 if( buf.length == 0 ) buf.length = 65507; 1695 NetworkAddressLA from; 1696 from.family = localAddress.family; 1697 while(true){ 1698 auto ret = socket.recvFrom(buf, from); 1699 if( ret > 0 ){ 1700 if( peer_address ) *peer_address = NetworkAddress(from); 1701 return buf[0 .. ret]; 1702 } 1703 else if( socket.status.code != Status.OK ){ 1704 auto err = socket.status.text; 1705 logDebug("UDP recv err: %s", err); 1706 enforce(socket.status.code == Status.ASYNC, "Error receiving UDP packet"); 1707 1708 if (timeout != Duration.max) { 1709 enforce(timeout > 0.seconds && m_driver.isTimerPending(tm), "UDP receive timeout."); 1710 } 1711 } 1712 m_waiting = true; 1713 getDriverCore().yieldForEvent(); 1714 } 1715 } 1716 1717 void addMembership(ref NetworkAddress multiaddr) 1718 { 1719 assert(false, "TODO!"); 1720 } 1721 1722 @property void multicastLoopback(bool loop) 1723 { 1724 assert(false, "TODO!"); 1725 } 1726 1727 private void handler(UDPEvent ev) 1728 { 1729 logTrace("UDPConnection %p event", this); 1730 1731 Exception ex; 1732 final switch (ev) { 1733 case UDPEvent.READ: 1734 if (m_waiting) { 1735 m_waiting = false; 1736 getDriverCore().resumeTask(m_task, null); 1737 } 1738 break; 1739 case UDPEvent.WRITE: 1740 if (m_waiting) { 1741 m_waiting = false; 1742 getDriverCore().resumeTask(m_task, null); 1743 } 1744 break; 1745 case UDPEvent.ERROR: 1746 getDriverCore.resumeTask(m_task, new Exception(socket.error)); 1747 break; 1748 } 1749 1750 } 1751 } 1752 1753 1754 1755 /* The following is used for LibasyncManualEvent */ 1756 1757 import std.container : Array; 1758 Array!(Array!Task) s_eventWaiters; // Task list in the current thread per instance ID 1759 __gshared Array!size_t gs_availID; 1760 __gshared size_t gs_maxID; 1761 __gshared core.sync.mutex.Mutex gs_mutex; 1762 1763 private size_t generateID() 1764 nothrow @trusted { 1765 size_t idx; 1766 import std.algorithm : max; 1767 try { 1768 size_t getIdx() { 1769 if (!gs_availID.empty) { 1770 immutable size_t ret = gs_availID.back; 1771 gs_availID.removeBack(); 1772 return ret; 1773 } 1774 return 0; 1775 } 1776 1777 synchronized(gs_mutex) { 1778 idx = getIdx(); 1779 if (idx == 0) { 1780 import std.range : iota; 1781 gs_availID.insert( iota(gs_maxID + 1, max(32, gs_maxID * 2 + 1), 1) ); 1782 gs_maxID = 
gs_availID[$-1];
				idx = getIdx();
			}
		}
	} catch (Exception e) {
		assert(false, "Failed to generate necessary ID for Manual Event waiters: " ~ e.msg);
	}

	return idx - 1;
}

void recycleID(size_t id)
@trusted nothrow {
	try {
		synchronized(gs_mutex) gs_availID.insert(id+1);
	}
	catch (Exception e) {
		assert(false, "Error destroying Manual Event ID: " ~ id.to!string ~ " [" ~ e.msg ~ "]");
	}
}
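
// A minimal sketch (not part of the original code) exercising the manual-event
// ID recycling above: generateID() hands out 0-based slots from a shared free
// list and recycleID() returns them, so a recycled slot is reissued first.
// It assumes nothing else is using the free list concurrently.
unittest {
	if (gs_mutex is null) gs_mutex = new core.sync.mutex.Mutex;
	immutable id = generateID();
	recycleID(id);
	immutable id2 = generateID();
	assert(id2 == id, "expected the recycled slot to be handed out again");
	recycleID(id2);
}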