/**
	TCP side of the libevent driver

	For the base driver implementation, see `vibe.core.drivers.libevent2`.

	Copyright: © 2012-2015 RejectedSoftware e.K.
	Authors: Sönke Ludwig
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
*/
module vibe.core.drivers.libevent2_tcp;

version(VibeLibeventDriver)
{

public import vibe.core.core;

import vibe.core.log;
import vibe.core.drivers.libevent2;
import vibe.core.drivers.utils;
import vibe.internal.freelistref;

import deimos.event2.buffer;
import deimos.event2.bufferevent;
import deimos.event2.bufferevent_ssl;
import deimos.event2.event;
import deimos.event2.util;

import std.algorithm;
import std.encoding : sanitize;
import std.exception;
import std.conv;
import std.string;

import core.stdc.errno;
import core.thread;
import core.sys.posix.netinet.in_;
import core.sys.posix.netinet.tcp;
import core.sys.posix.sys.socket;


private {
	version (Windows) {
		import core.sys.windows.winsock2;
		// make some necessary parts of the socket interface public
		alias in6_addr = core.sys.windows.winsock2.in6_addr;
		alias INADDR_ANY = core.sys.windows.winsock2.INADDR_ANY;
		alias IN6ADDR_ANY = core.sys.windows.winsock2.IN6ADDR_ANY;

		enum EWOULDBLOCK = WSAEWOULDBLOCK;
	} else {
		alias in6_addr = core.sys.posix.netinet.in_.in6_addr;
		alias IN6ADDR_ANY = core.sys.posix.netinet.in_.in6addr_any;
		alias INADDR_ANY = core.sys.posix.netinet.in_.INADDR_ANY;
		alias TCP_NODELAY = core.sys.posix.netinet.tcp.TCP_NODELAY;
	}
}

/**
	TCP connection backed by a libevent `bufferevent`.

	Incoming data is staged in a fixed 4096 byte ring buffer. Read and write
	access are tracked separately through `TCPContext.readOwner` and
	`TCPContext.writeOwner`.
*/
package final class Libevent2TCPConnection : TCPConnection {
@safe:

	import vibe.utils.array : FixedRingBuffer;
	private {
		bool m_timeout_triggered;                    // set by onTimeout when the waitForData timer fires
		TCPContext* m_ctx;                           // null once cleanup() has run
		FixedRingBuffer!(ubyte, 4096, false) m_readBuffer;
		string m_peerAddress;                        // slice of m_peerAddressBuf
		bool m_tcpNoDelay = false;
		bool m_tcpKeepAlive = false;
		Duration m_readTimeout;
		char[64] m_peerAddressBuf;
		NetworkAddress m_localAddress, m_remoteAddress;
		event* m_waitDataEvent;                      // timer event used by waitForData
	}

	/**
		Wraps an already set up connection context.

		Creates the timer event used by `waitForData`, copies the local and
		remote addresses from `ctx`, formats the peer address string and sets
		the read/write watermarks of the buffer event.

		Throws:
			`Exception` if the remote address family is not supported.
	*/
	this(TCPContext* ctx)
	{
		m_ctx = ctx;
		m_waitDataEvent = () @trusted { return event_new(m_ctx.eventLoop, -1, 0, &onTimeout, cast(void*)this); } ();

		assert(!amOwner());

		m_localAddress = ctx.local_addr;
		m_remoteAddress = ctx.remote_addr;

		// select the raw address field that evutil_inet_ntop should format
		void* ptr;
		switch (ctx.remote_addr.family) {
			default: throw new Exception("Unsupported address family.");
			case AF_INET: ptr = &ctx.remote_addr.sockAddrInet4.sin_addr; break;
			case AF_INET6: ptr = &ctx.remote_addr.sockAddrInet6.sin6_addr; break;
			version (Posix) {
				case AF_UNIX: ptr = &ctx.remote_addr.sockAddrUnix.sun_path; break;
			}
		}

		if (() @trusted { return evutil_inet_ntop(ctx.remote_addr.family, ptr, m_peerAddressBuf.ptr, m_peerAddressBuf.length); } () !is null)
			m_peerAddress = () @trusted { return cast(string)m_peerAddressBuf[0 .. m_peerAddressBuf[].indexOf('\0')]; } ();

		() @trusted {
			bufferevent_setwatermark(m_ctx.event, EV_WRITE, 4096, 65536);
			bufferevent_setwatermark(m_ctx.event, EV_READ, 0, 65536);
		} ();
	}

	/*~this()
	{
		//assert(m_ctx is null, "Leaking TCPContext because it has not been cleaned up and we are not allowed to touch the GC in finalizers..");
	}*/

	/// Enables or disables the `TCP_NODELAY` socket option.
	@property void tcpNoDelay(bool enabled)
	{
		m_tcpNoDelay = enabled;
		auto fd = m_ctx.socketfd;
		int opt = enabled;
		assert(fd <= int.max, "Socket descriptor > int.max");
		() @trusted { setsockopt(cast(int)fd, IPPROTO_TCP, TCP_NODELAY, cast(char*)&opt, opt.sizeof); } ();
	}
	/// Returns the value last set through the `tcpNoDelay` setter.
	@property bool tcpNoDelay() const { return m_tcpNoDelay; }

	/**
		Sets the read timeout of the underlying buffer event.

		A duration of zero seconds removes the timeout.
	*/
	@property void readTimeout(Duration v)
	{
		m_readTimeout = v;
		if( v == dur!"seconds"(0) ){
			() @trusted { bufferevent_set_timeouts(m_ctx.event, null, null); } ();
		} else {
			assert(v.total!"seconds" <= int.max);
			timeval toread = v.toTimeVal();
			() @trusted { bufferevent_set_timeouts(m_ctx.event, &toread, null); } ();
		}
	}
	/// Returns the value last set through the `readTimeout` setter.
	@property Duration readTimeout() const { return m_readTimeout; }

	/// Enables or disables the `SO_KEEPALIVE` socket option.
	@property void keepAlive(bool enable)
	{
		m_tcpKeepAlive = enable;
		auto fd = m_ctx.socketfd;
		// SO_KEEPALIVE takes an int sized flag; a single byte is rejected
		// on some platforms, which silently left keep-alive disabled.
		int opt = enable;
		assert(fd <= int.max, "Socket descriptor > int.max");
		() @trusted { setsockopt(cast(int)fd, SOL_SOCKET, SO_KEEPALIVE, &opt, opt.sizeof); } ();
	}
	/// Returns the value last set through the `keepAlive` setter.
	@property bool keepAlive() const { return m_tcpKeepAlive; }

	/// Local address, as copied from the context at construction time.
	@property NetworkAddress localAddress() const { return m_localAddress; }
	/// Remote address, as copied from the context at construction time.
	@property NetworkAddress remoteAddress() const { return m_remoteAddress; }

	/// Makes the calling task both read and write owner of the connection.
	private void acquire()
	@safe {
		assert(m_ctx, "Trying to acquire a closed TCP connection.");
		assert(m_ctx.readOwner == Task() && m_ctx.writeOwner == Task(), "Trying to acquire a TCP connection that is currently owned.");
		m_ctx.readOwner = m_ctx.writeOwner = Task.getThis();
	}

	/// Gives up read and write ownership; does nothing if already cleaned up.
	private void release()
	@safe {
		if( !m_ctx ) return;
		assert(m_ctx.readOwner != Task() && m_ctx.writeOwner != Task(), "Trying to release a TCP connection that is not owned.");
		assert(m_ctx.readOwner == Task.getThis() && m_ctx.readOwner == m_ctx.writeOwner, "Trying to release a foreign TCP connection.");
		m_ctx.readOwner = m_ctx.writeOwner = Task();
	}

	/// Tests whether the calling task is both read and write owner.
	private bool amOwner()
	@safe {
		return m_ctx !is null && m_ctx.readOwner != Task() && m_ctx.readOwner == Task.getThis() && m_ctx.readOwner == m_ctx.writeOwner;
	}

	/// Closes the connection.
	void close()
	{
		logDebug("TCP close request %s %s", m_ctx !is null, m_ctx ? m_ctx.state : ConnectionState.open);
		if (!m_ctx || m_ctx.state == ConnectionState.activeClose) return;

		if (!getThreadLibeventEventLoop()) {
			import std.stdio;
			() @trusted { stderr.writefln("Warning: Attempt to close dangling TCP connection to %s at shutdown. "
				~ "Please avoid closing connections in GC finalizers.", m_remoteAddress); } ();
			return;
		}

		// set the closing flag
		m_ctx.state = ConnectionState.activeClose;

		// resume any reader, so that the read operation can be ended with a failure
		while (m_ctx.readOwner != Task.init) {
			logTrace("resuming reader first");
			m_ctx.core.yieldAndResumeTask(m_ctx.readOwner);
			logTrace("back (%s)!", m_ctx !is null);
			// test if the resumed task has already closed the connection
			if (!m_ctx) return;
		}

		// acquire read+write access
		acquire();

		scope (exit) cleanup();

		if (m_ctx.event) {
			logDiagnostic("Actively closing TCP connection");
			auto fd = m_ctx.socketfd;

			// runs after the output buffer has been drained below
			scope (exit) () @trusted {
				version(Windows) shutdown(m_ctx.socketfd, SD_SEND);
				else shutdown(m_ctx.socketfd, SHUT_WR);
				if (m_ctx.event) bufferevent_free(m_ctx.event);
				logTrace("...socket %d closed.", fd);
			} ();

			m_ctx.shutdown = true;
			() @trusted {
				bufferevent_setwatermark(m_ctx.event, EV_WRITE, 1, 0);
				bufferevent_flush(m_ctx.event, EV_WRITE, bufferevent_flush_mode.BEV_FINISHED);
			} ();
			logTrace("Closing socket %d...", fd);
			auto buf = () @trusted { return bufferevent_get_output(m_ctx.event); } ();
			// wait until all pending output has been written or the event is gone
			while (m_ctx.event && () @trusted { return evbuffer_get_length(buf); } () > 0)
				m_ctx.core.yieldForEvent();
		}
	}

	/// The 'connected' status of this connection
	@property bool connected() const { return m_ctx !is null && m_ctx.state == ConnectionState.open && m_ctx.event !is null; }

	/// True if `leastSize` reports no readable data.
	@property bool empty() { return leastSize == 0; }

	/**
		Number of bytes that are available in the read buffer.

		If the buffer is empty, this blocks in `fillReadBuffer` and returns
		zero if no data could be obtained.
	*/
	@property ulong leastSize()
	{
		if (!m_ctx || !m_ctx.event) return 0;
		if (m_readBuffer.length) {
			checkReader();
			return m_readBuffer.length;
		}
		acquireReader();
		scope(exit) releaseReader();
		fillReadBuffer(true, false);
		return m_readBuffer.length;
	}

	/// Tests for buffered data without blocking.
	@property bool dataAvailableForRead()
	{
		if (!m_ctx || !m_ctx.event) return false;
		checkReader();
		if (!m_readBuffer.length)
			fillReadBuffer(false);
		return m_readBuffer.length > 0;
	}

	/// Textual peer address determined in the constructor.
	@property string peerAddress() const { return m_peerAddress; }

	/// Returns the currently buffered data without consuming it.
	const(ubyte)[] peek()
	{
		if (!m_ctx || !m_ctx.event) return null;
		checkReader();
		if (!m_readBuffer.length)
			fillReadBuffer(false);
		return m_readBuffer.peek();
	}

	/**
		Discards `count` bytes of incoming data, blocking until all of them
		have been received.
	*/
	void skip(ulong count)
	{
		checkConnected(false);

		// fast path: everything is already buffered
		if (m_readBuffer.length >= count) {
			checkReader();
			m_readBuffer.popFrontN(cast(size_t)count);
			if (m_readBuffer.empty) m_readBuffer.clear(); // start filling at index 0 again
			return;
		}

		acquireReader();
		scope(exit) releaseReader();

		while (true) {
			// bounded by the buffer length, so the narrowing cast is lossless
			auto nbytes = cast(size_t)min(count, m_readBuffer.length);
			m_readBuffer.popFrontN(nbytes);
			if (m_readBuffer.empty) m_readBuffer.clear(); // start filling at index 0 again
			count -= nbytes;

			if (!count) break;

			fillReadBuffer(true);
			checkConnected(false);
		}
	}

	/** Reads as many bytes as 'dst' can hold.
	*/
	size_t read(scope ubyte[] dst, IOMode)
	{
		checkConnected(false);

		// fast path: the request can be served from the buffer alone
		if (m_readBuffer.length >= dst.length) {
			checkReader();
			m_readBuffer.read(dst);
			if (m_readBuffer.empty) m_readBuffer.clear(); // start filling at index 0 again
			return dst.length;
		}

		acquireReader();
		scope(exit) releaseReader();

		size_t len = dst.length;

		while (true) {
			auto nbytes = min(dst.length, m_readBuffer.length);
			m_readBuffer.read(dst[0 .. nbytes]);
			if (m_readBuffer.empty) m_readBuffer.clear(); // start filling at index 0 again
			dst = dst[nbytes .. $];

			if (!dst.length) break;

			fillReadBuffer(true);
			checkConnected(false);
		}
		logTrace("read data");

		return len;
	}

	/**
		Waits until data is available for reading.

		Params:
			timeout = Maximum time to wait. `Duration.max` and `0.seconds`
				both wait without a timer.

		Returns:
			`true` if data is available, `false` if the timeout fired or
			the connection is not open anymore.
	*/
	bool waitForData(Duration timeout)
	{
		if (timeout == 0.seconds)
			logDebug("Warning: use Duration.max as an argument to waitForData() to wait infinitely, not 0.seconds.");

		if (dataAvailableForRead) return true;
		if (!m_ctx || m_ctx.state != ConnectionState.open) return false;

		acquireReader();
		scope(exit) releaseReader();
		m_timeout_triggered = false;

		// Cancel the timer on the way out. A timer left pending after an
		// early return could fire during a later wait and make that wait
		// report a timeout that was never requested.
		bool timer_armed = false;
		scope (exit) {
			if (timer_armed && m_waitDataEvent !is null)
				() @trusted { event_del(m_waitDataEvent); } ();
		}

		if (timeout != 0.seconds && timeout != Duration.max) { // 0.seconds is for compatibility with old code
			assert(timeout.total!"seconds"() <= int.max, "Timeouts must not be larger than int.max seconds!");
			timeval t = timeout.toTimeVal();
			logTrace("add timeout event with %d/%d", t.tv_sec, t.tv_usec);
			() @trusted { event_add(m_waitDataEvent, &t); } ();
			timer_armed = true;
		}

		logTrace("wait for data");
		while (m_ctx && m_ctx.event) {
			if (m_readBuffer.length) return true;
			if (m_ctx.state != ConnectionState.open) return false;
			try {
				// fillReadBuffer returns true only if the timeout was triggered
				if (fillReadBuffer(true, false, true))
					return false;
			} catch (Exception e) {
				logDiagnostic("Connection error during waitForData: %s", e.msg);
			}
		}

		return false;
	}

	alias write = Stream.write;

	/** Writes the given byte array.
	*/
	size_t write(in ubyte[] bytes, IOMode)
	{
		checkConnected();
		acquireWriter();
		scope(exit) releaseWriter();

		if (!bytes.length) return 0;
		//logTrace("evbuffer_add (fd %d): %s", m_ctx.socketfd, bytes);
		//logTrace("evbuffer_add (fd %d): <%s>", m_ctx.socketfd, cast(string)bytes);
		logTrace("evbuffer_add (fd %d): %d B", m_ctx.socketfd, bytes.length);
		auto outbuf = () @trusted { return bufferevent_get_output(m_ctx.event); } ();
		if (() @trusted { return bufferevent_write(m_ctx.event, cast(char*)bytes.ptr, bytes.length); } () != 0 )
			throw new Exception("Failed to write data to buffer");

		// wait for the data to be written up to the low watermark
		while (() @trusted { return evbuffer_get_length(outbuf); } () > 4096) {
			rawYield();
			checkConnected();
		}

		return bytes.length;
	}

	/** Causes any buffered data to be written.
	*/
	void flush()
	{
		checkConnected();
		acquireWriter();
		scope(exit) releaseWriter();
		logTrace("bufferevent_flush");
		() @trusted { bufferevent_flush(m_ctx.event, EV_WRITE, bufferevent_flush_mode.BEV_NORMAL); } ();
	}

	/// Flushes the connection.
	void finalize()
	{
		flush();
	}

	/**
		Moves data from the buffer event into the (empty) read buffer.

		Params:
			block = Yield and retry until at least one byte has been read.
			throw_on_fail = Use `checkConnected` (throwing) to detect a broken
				connection instead of returning silently.
			wait_for_timeout = Stop waiting once `m_timeout_triggered` is set.

		Returns:
			`true` only if waiting was aborted because the timeout was
			triggered; `false` in all other cases, including success.
	*/
	private bool fillReadBuffer(bool block, bool throw_on_fail = true, bool wait_for_timeout = false)
	@safe {
		if (m_readBuffer.length) return false;
		m_readBuffer.clear();
		assert(m_readBuffer.peekDst.length > 0);
		while (m_ctx && m_ctx.event) {
			auto nbytes = () @trusted { return bufferevent_read(m_ctx.event, m_readBuffer.peekDst.ptr, m_readBuffer.peekDst.length); } ();
			m_readBuffer.putN(nbytes);
			if (m_readBuffer.length || !block) break;
			if (throw_on_fail) checkConnected(false);
			else if (!m_ctx || !m_ctx.event) return false;
			else if (m_ctx.state != ConnectionState.open
				&& () @trusted { return evbuffer_get_length(bufferevent_get_input(m_ctx.event)); } () == 0)
				return false;
			if (wait_for_timeout && m_timeout_triggered) return true;
			m_ctx.core.yieldForEvent();
		}
		return false;
	}

	/// Asserts that no task currently owns the read side.
	private void checkReader() @safe { assert(m_ctx.readOwner == Task(), "Acquiring reader of already owned connection."); }
	/// Makes the calling task the read owner.
	private void acquireReader() @safe { checkReader(); m_ctx.readOwner = Task.getThis(); }
	/// Gives up read ownership; does nothing if already cleaned up.
	private void releaseReader() @safe { if (!m_ctx) return; assert(m_ctx.readOwner == Task.getThis(), "Releasing reader of unowned connection."); m_ctx.readOwner = Task(); }

	/// Makes the calling task the write owner.
	private void acquireWriter() @safe { assert(m_ctx.writeOwner == Task(), "Acquiring writer of already owned connection."); m_ctx.writeOwner = Task.getThis(); }
	/// Gives up write ownership; does nothing if already cleaned up.
	private void releaseWriter() @safe { if (!m_ctx) return; assert(m_ctx.writeOwner == Task.getThis(), "Releasing writer of already unowned connection."); m_ctx.writeOwner = Task(); }

	/**
		Throws if the connection cannot be used for the requested operation.

		Params:
			write = `true` to check for writing, `false` for reading. When
				reading, a passively closed connection is still accepted as
				long as unread data is left.

		Throws:
			`Exception` if the connection was closed, the buffer event is
			gone (in which case the context is also cleaned up), or the
			remote has hung up.
	*/
	private void checkConnected(bool write = true)
	@safe {
		enforce(m_ctx !is null, "Operating on closed TCPConnection.");
		if (m_ctx.event is null) {
			cleanup();
			throw new Exception(format("Connection error while %s TCPConnection.", write ? "writing to" : "reading from"));
		}
		if (m_ctx.state == ConnectionState.activeClose) throw new Exception("Connection was actively closed.");
		enforce (!write || m_ctx.state == ConnectionState.open, "Remote hung up while writing to TCPConnection.");
		if (!write && m_ctx.state == ConnectionState.passiveClose) {
			auto buf = () @trusted { return bufferevent_get_input(m_ctx.event); } ();
			auto data_left = m_readBuffer.length > 0 || () @trusted { return evbuffer_get_length(buf); } () > 0;
			enforce(data_left, "Remote hung up while reading from TCPConnection.");
		}
	}

	/**
		Frees the timer event and the connection context.

		Safe to call more than once: both `close` and `checkConnected` may
		end up invoking it for the same connection.
	*/
	private void cleanup()
	@safe {
		() @trusted {
			if (m_waitDataEvent !is null) {
				event_free(m_waitDataEvent);
				m_waitDataEvent = null;
			}
			if (m_ctx !is null) TCPContextAlloc.free(m_ctx);
		} ();
		m_ctx = null;
	}
}

/**
	Listener handle that owns the listening contexts registered through
	`addContext`.
*/
final class Libevent2TCPListener : TCPListener {
@safe:

	private {
		TCPContext*[] m_ctx;
		NetworkAddress m_bindAddress;
	}

	/// Creates a listener handle for the given bind address.
	this(NetworkAddress bind_address)
	{
		m_bindAddress = bind_address;
	}

	/// The address passed to the constructor.
	@property NetworkAddress bindAddress()
	{
		return m_bindAddress;
	}

	/// Registers a listening context to be released by `stopListening`.
	void addContext(TCPContext* ctx)
	{
		synchronized(this) m_ctx ~= ctx;
	}

	/// Frees the listen event, socket and context of every registered context.
	void stopListening()
	{
		synchronized(this)
		{
			foreach (ctx; m_ctx) () @trusted {
				event_free(ctx.listenEvent);
				evutil_closesocket(ctx.socketfd);
				TCPContextAlloc.free(ctx);
			} ();
			m_ctx = null;
		}
	}
}


/**************************************************************************************************/
/* Private types                                                                                  */
/**************************************************************************************************/

/**
	State shared between a connection or listener and the libevent callbacks.
*/
package struct TCPContext
{
	@safe:

	/// Initializes a context including local and peer address.
	this(DriverCore c, event_base* evbase, int sock, bufferevent* evt, NetworkAddress bindaddr, NetworkAddress peeraddr){
		core = c;
		eventLoop = evbase;
		socketfd = sock;
		event = evt;
		local_addr = bindaddr;
		remote_addr = peeraddr;
	}

	/// Initializes a context without address information.
	this(DriverCore c, event_base* evbase, int sock, bufferevent* evt){
		core = c;
		eventLoop = evbase;
		socketfd = sock;
		event = evt;
	}

	~this()
	{
		// invalidate the marker checked by the callbacks' assertions
		magic__ = 0;
	}

	/// Rethrows and clears a stored exception, if any.
	void checkForException() {
		if (auto ex = this.exception) {
			this.exception = null;
			throw ex;
		}
	}

	enum MAGIC = 0x1F3EC272;
	uint magic__ = MAGIC;
	DriverCore core;
	event_base* eventLoop;
	void delegate(TCPConnection conn) connectionCallback;
	bufferevent* event;
	deimos.event2.event_struct.event* listenEvent;
	NetworkAddress local_addr;
	NetworkAddress remote_addr;
	bool shutdown = false;
	int socketfd = -1;
	int status = 0;
	const(char)* statusMessage;
	Task readOwner;
	Task writeOwner;
	Exception exception; // set during onSocketEvent calls that were emitted synchronously
	TCPListenOptions listenOptions;
	ConnectionState state;
}
alias TCPContextAlloc = FreeListObjectAlloc!(TCPContext, false, true);

package enum ConnectionState {
	open,         // connection CTR and CTS
	activeClose,  // TCPConnection.close() was called
	passiveClose, // remote has hung up
}

/**************************************************************************************************/
/* Private functions                                                                              */
/**************************************************************************************************/

package nothrow extern(C)
{
	version (VibeDebugCatchAll) alias UncaughtException = Throwable;
	else alias UncaughtException = Exception;

	// should be a nested static struct in onConnect, but that triggers an ICE in ldc2-0.14.0
	/// Per-connection state for the task that handles an accepted socket.
	private extern(D) struct ClientTask {
		TCPContext* listen_ctx;
		NetworkAddress bind_addr;
		NetworkAddress remote_addr;
		int sockfd;
		TCPListenOptions options;

		/**
			Sets up the buffer event and context for the accepted socket and
			invokes the listener's connection callback.

			The `ClientTask` instance frees itself before returning, on
			every path.
		*/
		void execute()
		{
			assert(sockfd > 0);
			if( evutil_make_socket_nonblocking(sockfd) ){
				logError("Error setting non-blocking I/O on an incoming connection.");
			}

			auto eventloop = getThreadLibeventEventLoop();
			auto drivercore = getThreadLibeventDriverCore();

			// Initialize a buffered I/O event
			auto buf_event = bufferevent_socket_new(eventloop, sockfd, bufferevent_options.BEV_OPT_CLOSE_ON_FREE);
			if( !buf_event ){
				logError("Error initializing buffered I/O event for fd %d.", sockfd);
				// nothing owns the socket yet, so close it here to avoid leaking it
				evutil_closesocket(sockfd);
				FreeListObjectAlloc!ClientTask.free(&this);
				return;
			}

			auto client_ctx = TCPContextAlloc.alloc(drivercore, eventloop, sockfd, buf_event, bind_addr, remote_addr);
			assert(client_ctx.event !is null, "event is null although it was just != null?");
			bufferevent_setcb(buf_event, &onSocketRead, &onSocketWrite, &onSocketEvent, client_ctx);
			if( bufferevent_enable(buf_event, EV_READ|EV_WRITE) ){
				bufferevent_free(buf_event);
				TCPContextAlloc.free(client_ctx);
				logError("Error enabling buffered I/O event for fd %d.", sockfd);
				FreeListObjectAlloc!ClientTask.free(&this);
				return;
			}

			assert(client_ctx.event !is null, "Client task called without event!?");
			if (options & TCPListenOptions.disableAutoClose) {
				// the callback is responsible for closing the connection
				auto conn = new Libevent2TCPConnection(client_ctx);
				assert(conn.connected, "Connection closed directly after accept?!");
				logDebug("start task (fd %d).", client_ctx.socketfd);
				try {
					listen_ctx.connectionCallback(conn);
					logDebug("task out (fd %d).", client_ctx.socketfd);
				} catch (Exception e) {
					logWarn("Handling of connection failed: %s", e.msg);
					logDiagnostic("%s", e.toString().sanitize);
				} finally {
					logDebug("task finished.");
					FreeListObjectAlloc!ClientTask.free(&this);
				}
			} else {
				// the connection is closed as soon as the callback returns
				auto conn = FreeListRef!Libevent2TCPConnection(client_ctx);
				assert(conn.connected, "Connection closed directly after accept?!");
				logDebug("start task (fd %d).", client_ctx.socketfd);
				try {
					listen_ctx.connectionCallback(conn);
					logDebug("task out (fd %d).", client_ctx.socketfd);
				} catch (Exception e) {
					logWarn("Handling of connection failed: %s", e.msg);
					logDiagnostic("%s", e.toString().sanitize);
				} finally {
					logDebug("task finished.");
					FreeListObjectAlloc!ClientTask.free(&this);
					conn.close();
				}
			}
		}
	}

	/**
		libevent callback for a readable listening socket.

		Accepts up to ten pending connections and starts a `ClientTask` for
		each of them. `arg` must point to the listener's `TCPContext`.
	*/
	void onConnect(evutil_socket_t listenfd, short evtype, void *arg)
	{
		logTrace("connect callback");
		auto ctx = cast(TCPContext*)arg;
		assert(ctx.magic__ == TCPContext.MAGIC);

		if( !(evtype & EV_READ) ){
			logError("Unknown event type in connect callback: 0x%hx", evtype);
			return;
		}

		try {
			// Accept and configure incoming connections (up to 10 connections in one go)
			foreach( i; 0 .. 10 ){
				logTrace("accept");
				assert(listenfd < int.max, "Listen socket descriptor >= int.max?!");
				sockaddr_in6 remote_addr;
				socklen_t addrlen = remote_addr.sizeof;
				auto sockfd_raw = accept(cast(int)listenfd, cast(sockaddr*)&remote_addr, &addrlen);
				logDebug("FD: %s", sockfd_raw);
				static if (typeof(sockfd_raw).max > int.max) assert(sockfd_raw <= int.max || sockfd_raw == ~0);
				auto sockfd = cast(int)sockfd_raw;
				logTrace("accepted %d", sockfd);
				if (sockfd == -1) {
					version(Windows) auto err = evutil_socket_geterror(sockfd);
					else auto err = errno;
					// "would block" just means that no more connections are pending
					if( err != EWOULDBLOCK && err != EAGAIN && err != 0 ){
						version(Windows)
							logError("Error accepting an incoming connection: %s", to!string(evutil_socket_error_to_string(err)));
						else
							logError("Error accepting an incoming connection: %d", err);
					}
					break;
				}

				auto task = FreeListObjectAlloc!ClientTask.alloc();
				task.listen_ctx = ctx;
				task.bind_addr = ctx.local_addr;
				*cast(sockaddr_in6*)task.remote_addr.sockAddr = remote_addr;
				task.sockfd = sockfd;
				task.options = ctx.listenOptions;

				runTask(&task.execute);
			}
		} catch (UncaughtException e) {
			logWarn("Got exception while accepting new connections: %s", e.msg);
			try logDebug("Full error: %s", e.toString().sanitize());
			catch (Throwable) {}
		}

		logTrace("handled incoming connections...");
	}

	/// libevent read callback: resumes the task that owns the read side, if any.
	void onSocketRead(bufferevent *buf_event, void *arg)
	{
		auto ctx = cast(TCPContext*)arg;
		assert(ctx.magic__ == TCPContext.MAGIC);
		logTrace("socket %d read event!", ctx.socketfd);

		auto f = ctx.readOwner;
		try {
			if (f && f.running && !ctx.core.isScheduledForResume(f))
				ctx.core.resumeTask(f);
		} catch (UncaughtException e) {
			logWarn("Got exception when resuming task onSocketRead: %s", e.msg);
		}
	}

	/// libevent write callback: flushes and resumes the task that owns the write side, if any.
	void onSocketWrite(bufferevent *buf_event, void *arg)
	{
		try {
			auto ctx = cast(TCPContext*)arg;
			assert(ctx.magic__ == TCPContext.MAGIC);
			assert(ctx.event is buf_event, "Write event on bufferevent that does not match the TCPContext");
			logTrace("socket %d write event (%s)!", ctx.socketfd, ctx.shutdown);
			if (ctx.writeOwner != Task.init && ctx.writeOwner.running && !ctx.core.isScheduledForResume(ctx.writeOwner)) {
				bufferevent_flush(buf_event, EV_WRITE, bufferevent_flush_mode.BEV_FLUSH);
				ctx.core.resumeTask(ctx.writeOwner);
			}
		} catch (UncaughtException e) {
			logWarn("Got exception when resuming task onSocketWrite: %s", e.msg);
		}
	}

	/**
		libevent status callback.

		Records the status in the context, updates the connection state on
		EOF, frees the buffer event on EOF without pending input, on timeout
		and on error, and finally resumes the writer and reader tasks.
	*/
	void onSocketEvent(bufferevent *buf_event, short status, void *arg)
	{
		try {
			auto ctx = cast(TCPContext*)arg;
			assert(ctx.magic__ == TCPContext.MAGIC);
			ctx.status = status;
			logDebug("Socket event on fd %d: %d (%s vs %s)", ctx.socketfd, status, cast(void*)buf_event, cast(void*)ctx.event);
			assert(ctx.event is buf_event, "Status event on bufferevent that does not match the TCPContext");

			Exception ex;
			bool free_event = false;

			if (status & BEV_EVENT_EOF) {
				logDebug("Connection was closed by remote peer (fd %d).", ctx.socketfd);
				if (ctx.state != ConnectionState.activeClose)
					ctx.state = ConnectionState.passiveClose;
				// keep the event alive while unread input is left
				evbuffer* buf = bufferevent_get_input(buf_event);
				if (evbuffer_get_length(buf) == 0) free_event = true;
			} else if (status & BEV_EVENT_TIMEOUT) {
				logDebug("Remote host on fd %d timed out.", ctx.socketfd);
				free_event = true;
			} else if (status & BEV_EVENT_ERROR) {
				//auto msg = format("Error %s socket %s",
				//	(status & BEV_EVENT_READING) ? "reading from" : (status & BEV_EVENT_WRITING) ? "writing to" : "on",
				//	ctx.socketfd);
				//ex = new SystemSocketException(msg);
				ctx.statusMessage = evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR());
				free_event = true;
			}

			if (free_event) {
				bufferevent_free(buf_event);
				ctx.event = null;
			}

			ctx.core.eventException = ex;

			// ctx can be destroyed after resuming the reader, so get everything that is required from it first
			auto reader = ctx.readOwner;
			auto writer = ctx.writeOwner;
			auto core = ctx.core;

			if (ex && (reader && reader.fiber.state == Fiber.State.EXEC || writer && writer.fiber.state == Fiber.State.EXEC))
				ctx.exception = ex;

			if (writer && writer.running && writer.fiber.state != Fiber.State.EXEC) {
				logTrace("resuming corresponding write task%s...", ex is null ? "" : " with exception");
				core.resumeTask(writer, ex);
			}

			if (reader && writer != reader && reader.running && !core.isScheduledForResume(reader) && reader.fiber.state != Fiber.State.EXEC) {
				logTrace("resuming corresponding read task%s...", ex is null ? "" : " with exception");
				core.resumeTask(reader, ex);
			}
		} catch (UncaughtException e) {
			logWarn("Got exception when resuming task onSocketEvent: %s", e.msg);
			try logDiagnostic("Full error: %s", e.toString().sanitize); catch (Throwable) {}
		}
	}

	/**
		Timer callback for `Libevent2TCPConnection.waitForData`.

		Flags the timeout on the connection passed as `userptr` and resumes
		its read owner, if there is one.
	*/
	private extern(C) void onTimeout(evutil_socket_t, short events, void* userptr)
	{
		try {
			logTrace("data wait timeout");
			auto conn = cast(Libevent2TCPConnection)userptr;
			conn.m_timeout_triggered = true;
			if (conn.m_ctx) {
				if (conn.m_ctx.readOwner) conn.m_ctx.core.resumeTask(conn.m_ctx.readOwner);
			} else logDebug("waitForData timeout after connection was closed!");
		} catch (UncaughtException e) {
			logWarn("Exception onTimeout: %s", e.msg);
		}
	}
}

/// private
/// Removes the first element that is identical (`is`) to `item`, if present.
package void removeFromArray(T)(ref T[] array, T item)
{
	foreach( i; 0 .. array.length )
		if( array[i] is item ){
			array = array[0 .. i] ~ array[i+1 .. $];
			return;
		}
}

}