Merge pull request #2168 from nghttp2/nghttpx-gso-failover

nghttpx: Dynamic GSO failover
This commit is contained in:
Tatsuhiro Tsujikawa
2024-04-26 18:11:22 +09:00
committed by GitHub
5 changed files with 117 additions and 92 deletions

View File

@@ -120,6 +120,9 @@ Http3Upstream::Http3Upstream(ClientHandler *handler)
!get_config()->http2_proxy}, !get_config()->http2_proxy},
tx_{ tx_{
.data = std::unique_ptr<uint8_t[]>(new uint8_t[64_k]), .data = std::unique_ptr<uint8_t[]>(new uint8_t[64_k]),
#ifndef UDP_SEGMENT
.no_gso = true,
#endif // UDP_SEGMENT
} { } {
auto conn = handler_->get_connection(); auto conn = handler_->get_connection();
conn->conn_ref.get_conn = shrpx::get_conn; conn->conn_ref.get_conn = shrpx::get_conn;
@@ -875,13 +878,13 @@ int Http3Upstream::write_streams() {
auto data = tx_.data.get(); auto data = tx_.data.get();
auto datalen = bufpos - data; auto datalen = bufpos - data;
rv = send_packet(faddr, prev_ps.path.remote.addr, auto [nsent, rv] = send_packet(
prev_ps.path.remote.addrlen, prev_ps.path.local.addr, faddr, prev_ps.path.remote.addr, prev_ps.path.remote.addrlen,
prev_ps.path.local.addrlen, prev_pi, data, datalen, prev_ps.path.local.addr, prev_ps.path.local.addrlen, prev_pi, data,
gso_size); datalen, gso_size);
if (rv == SHRPX_ERR_SEND_BLOCKED) { if (rv == SHRPX_ERR_SEND_BLOCKED) {
on_send_blocked(faddr, prev_ps.path.remote, prev_ps.path.local, on_send_blocked(faddr, prev_ps.path.remote, prev_ps.path.local,
prev_pi, data, datalen, gso_size); prev_pi, data + nsent, datalen - nsent, gso_size);
signal_write_upstream_addr(faddr); signal_write_upstream_addr(faddr);
} }
@@ -893,12 +896,10 @@ int Http3Upstream::write_streams() {
} }
auto first_pkt = bufpos == tx_.data.get(); auto first_pkt = bufpos == tx_.data.get();
(void)first_pkt;
bufpos += nwrite; bufpos += nwrite;
bufleft -= nwrite; bufleft -= nwrite;
#ifdef UDP_SEGMENT
if (first_pkt) { if (first_pkt) {
ngtcp2_path_copy(&prev_ps.path, &ps.path); ngtcp2_path_copy(&prev_ps.path, &ps.path);
prev_pi = pi; prev_pi = pi;
@@ -912,14 +913,14 @@ int Http3Upstream::write_streams() {
auto data = tx_.data.get(); auto data = tx_.data.get();
auto datalen = bufpos - data - nwrite; auto datalen = bufpos - data - nwrite;
rv = send_packet(faddr, prev_ps.path.remote.addr, auto [nsent, rv] = send_packet(
prev_ps.path.remote.addrlen, prev_ps.path.local.addr, faddr, prev_ps.path.remote.addr, prev_ps.path.remote.addrlen,
prev_ps.path.local.addrlen, prev_pi, data, datalen, prev_ps.path.local.addr, prev_ps.path.local.addrlen, prev_pi, data,
gso_size); datalen, gso_size);
switch (rv) { switch (rv) {
case SHRPX_ERR_SEND_BLOCKED: case SHRPX_ERR_SEND_BLOCKED:
on_send_blocked(faddr, prev_ps.path.remote, prev_ps.path.local, prev_pi, on_send_blocked(faddr, prev_ps.path.remote, prev_ps.path.local, prev_pi,
data, datalen, gso_size); data + nsent, datalen - nsent, gso_size);
on_send_blocked(static_cast<UpstreamAddr *>(ps.path.user_data), on_send_blocked(static_cast<UpstreamAddr *>(ps.path.user_data),
ps.path.remote, ps.path.local, pi, bufpos - nwrite, ps.path.remote, ps.path.local, pi, bufpos - nwrite,
@@ -932,10 +933,12 @@ int Http3Upstream::write_streams() {
auto faddr = static_cast<UpstreamAddr *>(ps.path.user_data); auto faddr = static_cast<UpstreamAddr *>(ps.path.user_data);
auto data = bufpos - nwrite; auto data = bufpos - nwrite;
rv = send_packet(faddr, ps.path.remote.addr, ps.path.remote.addrlen, auto [nsent, rv] = send_packet(
ps.path.local.addr, ps.path.local.addrlen, pi, data, faddr, ps.path.remote.addr, ps.path.remote.addrlen,
nwrite, 0); ps.path.local.addr, ps.path.local.addrlen, pi, data, nwrite, 0);
if (rv == SHRPX_ERR_SEND_BLOCKED) { if (rv == SHRPX_ERR_SEND_BLOCKED) {
assert(nsent == 0);
on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, data, on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, data,
nwrite, 0); nwrite, 0);
@@ -955,12 +958,13 @@ int Http3Upstream::write_streams() {
auto data = tx_.data.get(); auto data = tx_.data.get();
auto datalen = bufpos - data; auto datalen = bufpos - data;
rv = send_packet(faddr, ps.path.remote.addr, ps.path.remote.addrlen, auto [nsent, rv] =
ps.path.local.addr, ps.path.local.addrlen, pi, data, send_packet(faddr, ps.path.remote.addr, ps.path.remote.addrlen,
datalen, gso_size); ps.path.local.addr, ps.path.local.addrlen, pi, data,
datalen, gso_size);
if (rv == SHRPX_ERR_SEND_BLOCKED) { if (rv == SHRPX_ERR_SEND_BLOCKED) {
on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, data, datalen, on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, data + nsent,
gso_size); datalen - nsent, gso_size);
signal_write_upstream_addr(faddr); signal_write_upstream_addr(faddr);
} }
@@ -969,33 +973,6 @@ int Http3Upstream::write_streams() {
return 0; return 0;
} }
#else // !UDP_SEGMENT
auto faddr = static_cast<UpstreamAddr *>(ps.path.user_data);
auto data = tx_.data.get();
auto datalen = bufpos - data;
rv = send_packet(faddr, ps.path.remote.addr, ps.path.remote.addrlen,
ps.path.local.addr, ps.path.local.addrlen, pi, data,
datalen, 0);
if (rv == SHRPX_ERR_SEND_BLOCKED) {
on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, data, datalen,
0);
ngtcp2_conn_update_pkt_tx_time(conn_, ts);
signal_write_upstream_addr(faddr);
return 0;
}
if (bufleft < path_max_udp_payload_size) {
ngtcp2_conn_update_pkt_tx_time(conn_, ts);
return 0;
}
bufpos = tx_.data.get();
#endif // !UDP_SEGMENT
} }
return 0; return 0;
@@ -1880,16 +1857,44 @@ int Http3Upstream::on_read(const UpstreamAddr *faddr,
return 0; return 0;
} }
int Http3Upstream::send_packet(const UpstreamAddr *faddr, std::pair<size_t, int> Http3Upstream::send_packet(
const sockaddr *remote_sa, size_t remote_salen, const UpstreamAddr *faddr, const sockaddr *remote_sa, size_t remote_salen,
const sockaddr *local_sa, size_t local_salen, const sockaddr *local_sa, size_t local_salen, const ngtcp2_pkt_info &pi,
const ngtcp2_pkt_info &pi, const uint8_t *data, const uint8_t *data, size_t datalen, size_t gso_size) {
size_t datalen, size_t gso_size) { if (tx_.no_gso) {
auto rv = quic_send_packet(faddr, remote_sa, remote_salen, local_sa, if (gso_size == 0) {
local_salen, pi, data, datalen, gso_size); gso_size = datalen;
}
for (auto p = data, end = data + datalen; p != end;) {
auto len = std::min(gso_size, static_cast<size_t>(end - p));
auto [nwrite, rv] =
quic_send_packet(faddr, remote_sa, remote_salen, local_sa,
local_salen, pi, p, len, gso_size);
if (rv != 0) {
switch (rv) {
case -EAGAIN:
#if EAGAIN != EWOULDBLOCK
case -EWOULDBLOCK:
#endif // EAGAIN != EWOULDBLOCK
return {p - data, SHRPX_ERR_SEND_BLOCKED};
default:
return {p - data, -1};
}
}
p += nwrite;
}
return {datalen, 0};
}
auto [nwrite, rv] =
quic_send_packet(faddr, remote_sa, remote_salen, local_sa, local_salen,
pi, data, datalen, gso_size);
switch (rv) { switch (rv) {
case 0: case 0:
return 0; return {nwrite, 0};
// With GSO, sendmsg may fail with EINVAL if UDP payload is too // With GSO, sendmsg may fail with EINVAL if UDP payload is too
// large. // large.
case -EINVAL: case -EINVAL:
@@ -1900,12 +1905,21 @@ int Http3Upstream::send_packet(const UpstreamAddr *faddr,
#if EAGAIN != EWOULDBLOCK #if EAGAIN != EWOULDBLOCK
case -EWOULDBLOCK: case -EWOULDBLOCK:
#endif // EAGAIN != EWOULDBLOCK #endif // EAGAIN != EWOULDBLOCK
return SHRPX_ERR_SEND_BLOCKED; return {nwrite, SHRPX_ERR_SEND_BLOCKED};
case -EIO:
if (tx_.no_gso) {
break;
}
tx_.no_gso = true;
return send_packet(faddr, remote_sa, remote_salen, local_sa, local_salen,
pi, data, datalen, gso_size);
default: default:
break; break;
} }
return -1; return {nwrite, -1};
} }
void Http3Upstream::on_send_blocked(const UpstreamAddr *faddr, void Http3Upstream::on_send_blocked(const UpstreamAddr *faddr,
@@ -1934,17 +1948,18 @@ void Http3Upstream::on_send_blocked(const UpstreamAddr *faddr,
} }
int Http3Upstream::send_blocked_packet() { int Http3Upstream::send_blocked_packet() {
int rv;
assert(tx_.send_blocked); assert(tx_.send_blocked);
for (; tx_.num_blocked_sent < tx_.num_blocked; ++tx_.num_blocked_sent) { for (; tx_.num_blocked_sent < tx_.num_blocked; ++tx_.num_blocked_sent) {
auto &p = tx_.blocked[tx_.num_blocked_sent]; auto &p = tx_.blocked[tx_.num_blocked_sent];
rv = send_packet(p.faddr, &p.remote_addr.su.sa, p.remote_addr.len, auto [nsent, rv] = send_packet(
&p.local_addr.su.sa, p.local_addr.len, p.pi, p.data, p.faddr, &p.remote_addr.su.sa, p.remote_addr.len, &p.local_addr.su.sa,
p.datalen, p.gso_size); p.local_addr.len, p.pi, p.data, p.datalen, p.gso_size);
if (rv == SHRPX_ERR_SEND_BLOCKED) { if (rv == SHRPX_ERR_SEND_BLOCKED) {
p.data += nsent;
p.datalen -= nsent;
signal_write_upstream_addr(p.faddr); signal_write_upstream_addr(p.faddr);
return 0; return 0;

View File

@@ -137,10 +137,11 @@ public:
int check_shutdown(); int check_shutdown();
int start_graceful_shutdown(); int start_graceful_shutdown();
int submit_goaway(); int submit_goaway();
int send_packet(const UpstreamAddr *faddr, const sockaddr *remote_sa, std::pair<size_t, int>
size_t remote_salen, const sockaddr *local_sa, send_packet(const UpstreamAddr *faddr, const sockaddr *remote_sa,
size_t local_salen, const ngtcp2_pkt_info &pi, size_t remote_salen, const sockaddr *local_sa, size_t local_salen,
const uint8_t *data, size_t datalen, size_t gso_size); const ngtcp2_pkt_info &pi, const uint8_t *data, size_t datalen,
size_t gso_size);
void qlog_write(const void *data, size_t datalen, bool fin); void qlog_write(const void *data, size_t datalen, bool fin);
int open_qlog_file(const StringRef &dir, const ngtcp2_cid &scid) const; int open_qlog_file(const StringRef &dir, const ngtcp2_cid &scid) const;
@@ -184,6 +185,7 @@ private:
size_t gso_size; size_t gso_size;
} blocked[2]; } blocked[2];
std::unique_ptr<uint8_t[]> data; std::unique_ptr<uint8_t[]> data;
bool no_gso;
} tx_; } tx_;
}; };

View File

@@ -55,10 +55,11 @@ ngtcp2_tstamp quic_timestamp() {
.count(); .count();
} }
int quic_send_packet(const UpstreamAddr *faddr, const sockaddr *remote_sa, std::pair<size_t, int>
size_t remote_salen, const sockaddr *local_sa, quic_send_packet(const UpstreamAddr *faddr, const sockaddr *remote_sa,
size_t local_salen, const ngtcp2_pkt_info &pi, size_t remote_salen, const sockaddr *local_sa,
const uint8_t *data, size_t datalen, size_t gso_size) { size_t local_salen, const ngtcp2_pkt_info &pi,
const uint8_t *data, size_t datalen, size_t gso_size) {
iovec msg_iov = {const_cast<uint8_t *>(data), datalen}; iovec msg_iov = {const_cast<uint8_t *>(data), datalen};
msghdr msg{}; msghdr msg{};
msg.msg_name = const_cast<sockaddr *>(remote_sa); msg.msg_name = const_cast<sockaddr *>(remote_sa);
@@ -159,7 +160,7 @@ int quic_send_packet(const UpstreamAddr *faddr, const sockaddr *remote_sa,
LOG(INFO) << "sendmsg failed: errno=" << error; LOG(INFO) << "sendmsg failed: errno=" << error;
} }
return -errno; return {0, -errno};
} }
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
@@ -170,7 +171,7 @@ int quic_send_packet(const UpstreamAddr *faddr, const sockaddr *remote_sa,
<< " bytes"; << " bytes";
} }
return 0; return {nwrite, 0};
} }
int generate_quic_retry_connection_id(ngtcp2_cid &cid, uint32_t server_id, int generate_quic_retry_connection_id(ngtcp2_cid &cid, uint32_t server_id,

View File

@@ -112,10 +112,11 @@ struct ConnectionID {
ngtcp2_tstamp quic_timestamp(); ngtcp2_tstamp quic_timestamp();
int quic_send_packet(const UpstreamAddr *faddr, const sockaddr *remote_sa, std::pair<size_t, int>
size_t remote_salen, const sockaddr *local_sa, quic_send_packet(const UpstreamAddr *faddr, const sockaddr *remote_sa,
size_t local_salen, const ngtcp2_pkt_info &pi, size_t remote_salen, const sockaddr *local_sa,
const uint8_t *data, size_t datalen, size_t gso_size); size_t local_salen, const ngtcp2_pkt_info &pi,
const uint8_t *data, size_t datalen, size_t gso_size);
int generate_quic_retry_connection_id(ngtcp2_cid &cid, uint32_t server_id, int generate_quic_retry_connection_id(ngtcp2_cid &cid, uint32_t server_id,
uint8_t km_id, EVP_CIPHER_CTX *ctx); uint8_t km_id, EVP_CIPHER_CTX *ctx);

View File

@@ -550,9 +550,11 @@ int QUICConnectionHandler::send_version_negotiation(
return -1; return -1;
} }
return quic_send_packet(faddr, &remote_addr.su.sa, remote_addr.len, auto [_, rv] = quic_send_packet(faddr, &remote_addr.su.sa, remote_addr.len,
&local_addr.su.sa, local_addr.len, ngtcp2_pkt_info{}, &local_addr.su.sa, local_addr.len,
buf.data(), nwrite, 0); ngtcp2_pkt_info{}, buf.data(), nwrite, 0);
return rv;
} }
int QUICConnectionHandler::send_stateless_reset( int QUICConnectionHandler::send_stateless_reset(
@@ -572,7 +574,6 @@ int QUICConnectionHandler::send_stateless_reset(
ev_timer_again(worker_->get_loop(), &stateless_reset_bucket_regen_timer_); ev_timer_again(worker_->get_loop(), &stateless_reset_bucket_regen_timer_);
} }
int rv;
std::array<uint8_t, NGTCP2_STATELESS_RESET_TOKENLEN> token; std::array<uint8_t, NGTCP2_STATELESS_RESET_TOKENLEN> token;
ngtcp2_cid cid; ngtcp2_cid cid;
@@ -582,9 +583,9 @@ int QUICConnectionHandler::send_stateless_reset(
auto &qkms = conn_handler->get_quic_keying_materials(); auto &qkms = conn_handler->get_quic_keying_materials();
auto &qkm = qkms->keying_materials.front(); auto &qkm = qkms->keying_materials.front();
rv = generate_quic_stateless_reset_token(token.data(), cid, qkm.secret.data(), if (auto rv = generate_quic_stateless_reset_token(
qkm.secret.size()); token.data(), cid, qkm.secret.data(), qkm.secret.size());
if (rv != 0) { rv != 0) {
return -1; return -1;
} }
@@ -624,9 +625,11 @@ int QUICConnectionHandler::send_stateless_reset(
<< " dcid=" << util::format_hex(std::span{dcid, dcidlen}); << " dcid=" << util::format_hex(std::span{dcid, dcidlen});
} }
return quic_send_packet(faddr, &remote_addr.su.sa, remote_addr.len, auto [_, rv] = quic_send_packet(faddr, &remote_addr.su.sa, remote_addr.len,
&local_addr.su.sa, local_addr.len, ngtcp2_pkt_info{}, &local_addr.su.sa, local_addr.len,
buf.data(), nwrite, 0); ngtcp2_pkt_info{}, buf.data(), nwrite, 0);
return rv;
} }
int QUICConnectionHandler::send_connection_close( int QUICConnectionHandler::send_connection_close(
@@ -655,9 +658,11 @@ int QUICConnectionHandler::send_connection_close(
<< util::format_hex(std::span{ini_dcid.data, ini_dcid.datalen}); << util::format_hex(std::span{ini_dcid.data, ini_dcid.datalen});
} }
return quic_send_packet(faddr, &remote_addr.su.sa, remote_addr.len, auto [_, rv] = quic_send_packet(faddr, &remote_addr.su.sa, remote_addr.len,
&local_addr.su.sa, local_addr.len, ngtcp2_pkt_info{}, &local_addr.su.sa, local_addr.len,
buf.data(), nwrite, 0); ngtcp2_pkt_info{}, buf.data(), nwrite, 0);
return rv;
} }
void QUICConnectionHandler::add_connection_id(const ngtcp2_cid &cid, void QUICConnectionHandler::add_connection_id(const ngtcp2_cid &cid,
@@ -751,9 +756,10 @@ int CloseWait::handle_packet(const UpstreamAddr *faddr,
return 0; return 0;
} }
if (quic_send_packet(faddr, &remote_addr.su.sa, remote_addr.len, auto [_, rv] = quic_send_packet(faddr, &remote_addr.su.sa, remote_addr.len,
&local_addr.su.sa, local_addr.len, ngtcp2_pkt_info{}, &local_addr.su.sa, local_addr.len,
pkt.data(), pkt.size(), 0) != 0) { ngtcp2_pkt_info{}, pkt.data(), pkt.size(), 0);
if (rv != 0) {
return -1; return -1;
} }