src: Adopt ngtcp2_conn_write_aggregate_pkt, require ngtcp2 >= v1.15.0

This commit is contained in:
Tatsuhiro Tsujikawa
2025-08-24 12:25:12 +09:00
parent e435050378
commit 7ef3a91d9b
9 changed files with 177 additions and 235 deletions

View File

@@ -71,19 +71,19 @@ if(WITH_WOLFSSL)
else() else()
find_package(OpenSSL 1.1.1) find_package(OpenSSL 1.1.1)
endif() endif()
find_package(Libngtcp2 1.12.0) find_package(Libngtcp2 1.15.0)
if(OPENSSL_FOUND) if(OPENSSL_FOUND)
find_package(Libngtcp2_crypto_quictls 1.12.0) find_package(Libngtcp2_crypto_quictls 1.15.0)
if(LIBNGTCP2_CRYPTO_QUICTLS_FOUND) if(LIBNGTCP2_CRYPTO_QUICTLS_FOUND)
set(HAVE_LIBNGTCP2_CRYPTO_QUICTLS 1) set(HAVE_LIBNGTCP2_CRYPTO_QUICTLS 1)
endif() endif()
find_package(Libngtcp2_crypto_ossl 1.12.0) find_package(Libngtcp2_crypto_ossl 1.15.0)
if(LIBNGTCP2_CRYPTO_OSSL_FOUND) if(LIBNGTCP2_CRYPTO_OSSL_FOUND)
set(HAVE_LIBNGTCP2_CRYPTO_OSSL 1) set(HAVE_LIBNGTCP2_CRYPTO_OSSL 1)
endif() endif()
endif() endif()
if(WOLFSSL_FOUND) if(WOLFSSL_FOUND)
find_package(Libngtcp2_crypto_wolfssl 1.12.0) find_package(Libngtcp2_crypto_wolfssl 1.15.0)
if(LIBNGTCP2_CRYPTO_WOLFSSL_FOUND) if(LIBNGTCP2_CRYPTO_WOLFSSL_FOUND)
set(HAVE_LIBNGTCP2_CRYPTO_WOLFSSL 1) set(HAVE_LIBNGTCP2_CRYPTO_WOLFSSL 1)
endif() endif()

View File

@@ -127,7 +127,7 @@ following libraries are required:
wolfSSL; or LibreSSL (does not support 0RTT); or aws-lc; or wolfSSL; or LibreSSL (does not support 0RTT); or aws-lc; or
`BoringSSL <https://boringssl.googlesource.com/boringssl/>`_ (commit `BoringSSL <https://boringssl.googlesource.com/boringssl/>`_ (commit
729648fb79df7bc46c145e49b0dfd8d2a24232f1); or OpenSSL >= 3.5.0 729648fb79df7bc46c145e49b0dfd8d2a24232f1); or OpenSSL >= 3.5.0
* `ngtcp2 <https://github.com/ngtcp2/ngtcp2>`_ >= 1.12.0 * `ngtcp2 <https://github.com/ngtcp2/ngtcp2>`_ >= 1.15.0
* `nghttp3 <https://github.com/ngtcp2/nghttp3>`_ >= 1.11.0 * `nghttp3 <https://github.com/ngtcp2/nghttp3>`_ >= 1.11.0
Use ``--enable-http3`` configure option to enable HTTP/3 feature for Use ``--enable-http3`` configure option to enable HTTP/3 feature for

View File

@@ -525,7 +525,7 @@ fi
# ngtcp2 (for src) # ngtcp2 (for src)
have_libngtcp2=no have_libngtcp2=no
if test "x${request_libngtcp2}" != "xno"; then if test "x${request_libngtcp2}" != "xno"; then
PKG_CHECK_MODULES([LIBNGTCP2], [libngtcp2 >= 1.12.0], [have_libngtcp2=yes], PKG_CHECK_MODULES([LIBNGTCP2], [libngtcp2 >= 1.15.0], [have_libngtcp2=yes],
[have_libngtcp2=no]) [have_libngtcp2=no])
if test "x${have_libngtcp2}" = "xno"; then if test "x${have_libngtcp2}" = "xno"; then
AC_MSG_NOTICE($LIBNGTCP2_PKG_ERRORS) AC_MSG_NOTICE($LIBNGTCP2_PKG_ERRORS)
@@ -542,7 +542,7 @@ have_libngtcp2_crypto_wolfssl=no
if test "x${have_wolfssl_quic}" = "xyes" && if test "x${have_wolfssl_quic}" = "xyes" &&
test "x${request_libngtcp2}" != "xno"; then test "x${request_libngtcp2}" != "xno"; then
PKG_CHECK_MODULES([LIBNGTCP2_CRYPTO_WOLFSSL], PKG_CHECK_MODULES([LIBNGTCP2_CRYPTO_WOLFSSL],
[libngtcp2_crypto_wolfssl >= 1.12.0], [libngtcp2_crypto_wolfssl >= 1.15.0],
[have_libngtcp2_crypto_wolfssl=yes], [have_libngtcp2_crypto_wolfssl=yes],
[have_libngtcp2_crypto_wolfssl=no]) [have_libngtcp2_crypto_wolfssl=no])
if test "x${have_libngtcp2_crypto_wolfssl}" = "xno"; then if test "x${have_libngtcp2_crypto_wolfssl}" = "xno"; then
@@ -565,7 +565,7 @@ if test "x${have_ssl_provide_quic_data}" = "xyes" &&
test "x${have_boringssl_quic}" != "xyes" && test "x${have_boringssl_quic}" != "xyes" &&
test "x${request_libngtcp2}" != "xno"; then test "x${request_libngtcp2}" != "xno"; then
PKG_CHECK_MODULES([LIBNGTCP2_CRYPTO_QUICTLS], PKG_CHECK_MODULES([LIBNGTCP2_CRYPTO_QUICTLS],
[libngtcp2_crypto_quictls >= 1.12.0], [libngtcp2_crypto_quictls >= 1.15.0],
[have_libngtcp2_crypto_quictls=yes], [have_libngtcp2_crypto_quictls=yes],
[have_libngtcp2_crypto_quictls=no]) [have_libngtcp2_crypto_quictls=no])
if test "x${have_libngtcp2_crypto_quictls}" = "xno"; then if test "x${have_libngtcp2_crypto_quictls}" = "xno"; then
@@ -610,7 +610,7 @@ have_libngtcp2_crypto_ossl=no
if test "x${have_ossl_quic}" = "xyes" && if test "x${have_ossl_quic}" = "xyes" &&
test "x${request_libngtcp2}" != "xno"; then test "x${request_libngtcp2}" != "xno"; then
PKG_CHECK_MODULES([LIBNGTCP2_CRYPTO_OSSL], PKG_CHECK_MODULES([LIBNGTCP2_CRYPTO_OSSL],
[libngtcp2_crypto_ossl >= 1.12.0], [libngtcp2_crypto_ossl >= 1.15.0],
[have_libngtcp2_crypto_ossl=yes], [have_libngtcp2_crypto_ossl=yes],
[have_libngtcp2_crypto_ossl=no]) [have_libngtcp2_crypto_ossl=no])
if test "x${have_libngtcp2_crypto_ossl}" = "xno"; then if test "x${have_libngtcp2_crypto_ossl}" = "xno"; then

View File

@@ -510,7 +510,7 @@ Client::Client(uint32_t id, Worker *worker, size_t req_todo)
# endif // UDP_SEGMENT # endif // UDP_SEGMENT
if (config.is_quic()) { if (config.is_quic()) {
quic.tx.data = std::make_unique<uint8_t[]>(64_k); quic.tx.data = std::make_unique<uint8_t[]>(QUIC_TX_DATALEN);
} }
ngtcp2_ccerr_default(&quic.last_error); ngtcp2_ccerr_default(&quic.last_error);

View File

@@ -364,13 +364,11 @@ struct Client {
struct { struct {
bool send_blocked; bool send_blocked;
size_t num_blocked;
size_t num_blocked_sent;
struct { struct {
Address remote_addr; Address remote_addr;
std::span<const uint8_t> data; std::span<const uint8_t> data;
size_t gso_size; size_t gso_size;
} blocked[2]; } blocked;
std::unique_ptr<uint8_t[]> data; std::unique_ptr<uint8_t[]> data;
bool no_gso; bool no_gso;
} tx; } tx;
@@ -495,11 +493,13 @@ struct Client {
void quic_free(); void quic_free();
int read_quic(); int read_quic();
int write_quic(); int write_quic();
ngtcp2_ssize write_quic_pkt(ngtcp2_path *path, ngtcp2_pkt_info *pi,
uint8_t *dest, size_t destlen, ngtcp2_tstamp ts);
std::span<const uint8_t> write_udp(const sockaddr *addr, socklen_t addrlen, std::span<const uint8_t> write_udp(const sockaddr *addr, socklen_t addrlen,
std::span<const uint8_t> data, std::span<const uint8_t> data,
size_t gso_size); size_t gso_size);
int write_udp_or_blocked(const ngtcp2_path &path, void write_udp_or_blocked(const ngtcp2_path &path,
std::span<const uint8_t> data, size_t gso_size); std::span<const uint8_t> data, size_t gso_size);
void on_send_blocked(const ngtcp2_addr &remote_addr, void on_send_blocked(const ngtcp2_addr &remote_addr,
std::span<const uint8_t> data, size_t gso_size); std::span<const uint8_t> data, size_t gso_size);
int send_blocked_packet(); int send_blocked_packet();

View File

@@ -670,6 +670,79 @@ int Client::read_quic() {
return 0; return 0;
} }
namespace {
ngtcp2_ssize write_pkt(ngtcp2_conn *conn, ngtcp2_path *path,
ngtcp2_pkt_info *pi, uint8_t *dest, size_t destlen,
ngtcp2_tstamp ts, void *user_data) {
auto c = static_cast<Client *>(user_data);
return c->write_quic_pkt(path, pi, dest, destlen, ts);
}
} // namespace
ngtcp2_ssize Client::write_quic_pkt(ngtcp2_path *path, ngtcp2_pkt_info *pi,
uint8_t *dest, size_t destlen,
ngtcp2_tstamp ts) {
std::array<nghttp3_vec, 16> vec;
auto s = static_cast<Http3Session *>(session.get());
for (;;) {
int64_t stream_id = -1;
int fin = 0;
ssize_t sveccnt = 0;
if (session && ngtcp2_conn_get_max_data_left(quic.conn)) {
sveccnt = s->write_stream(stream_id, fin, vec.data(), vec.size());
if (sveccnt == -1) {
return NGTCP2_ERR_CALLBACK_FAILURE;
}
}
ngtcp2_ssize ndatalen;
auto v = vec.data();
auto vcnt = static_cast<size_t>(sveccnt);
uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_MORE;
if (fin) {
flags |= NGTCP2_WRITE_STREAM_FLAG_FIN;
}
auto nwrite = ngtcp2_conn_writev_stream(
quic.conn, path, nullptr, dest, destlen, &ndatalen, flags, stream_id,
reinterpret_cast<const ngtcp2_vec *>(v), vcnt, ts);
if (nwrite < 0) {
switch (nwrite) {
case NGTCP2_ERR_STREAM_DATA_BLOCKED:
assert(ndatalen == -1);
s->block_stream(stream_id);
continue;
case NGTCP2_ERR_STREAM_SHUT_WR:
assert(ndatalen == -1);
s->shutdown_stream_write(stream_id);
continue;
case NGTCP2_ERR_WRITE_MORE:
assert(ndatalen >= 0);
if (s->add_write_offset(stream_id, as_unsigned(ndatalen)) != 0) {
return NGTCP2_ERR_CALLBACK_FAILURE;
}
continue;
}
ngtcp2_ccerr_set_liberr(&quic.last_error, static_cast<int>(nwrite),
nullptr, 0);
return NGTCP2_ERR_CALLBACK_FAILURE;
}
if (ndatalen >= 0 &&
s->add_write_offset(stream_id, as_unsigned(ndatalen)) != 0) {
return NGTCP2_ERR_CALLBACK_FAILURE;
}
return nwrite;
}
}
int Client::write_quic() { int Client::write_quic() {
int rv; int rv;
@@ -690,145 +763,47 @@ int Client::write_quic() {
} }
} }
std::array<nghttp3_vec, 16> vec; auto txbuf = std::span{quic.tx.data.get(), QUIC_TX_DATALEN};
auto max_udp_payload_size =
ngtcp2_conn_get_max_tx_udp_payload_size(quic.conn);
auto path_max_udp_payload_size =
ngtcp2_conn_get_path_max_tx_udp_payload_size(quic.conn);
auto txbuf = std::span{quic.tx.data.get(),
std::max(ngtcp2_conn_get_send_quantum(quic.conn),
path_max_udp_payload_size)};
auto buf = txbuf;
ngtcp2_path_storage ps; ngtcp2_path_storage ps;
size_t gso_size = 0; size_t gso_size = 0;
auto pkt = std::span<const uint8_t>{};
auto extra_pkt = std::span<const uint8_t>{};
ngtcp2_path_storage_zero(&ps); ngtcp2_path_storage_zero(&ps);
auto s = static_cast<Http3Session *>(session.get()); auto nwrite = ngtcp2_conn_write_aggregate_pkt(
auto ts = quic_timestamp(); quic.conn, &ps.path, nullptr, txbuf.data(), txbuf.size(), &gso_size,
h2load::write_pkt, quic_timestamp());
for (;;) { if (nwrite < 0) {
int64_t stream_id = -1; return -1;
int fin = 0;
ssize_t sveccnt = 0;
if (session && ngtcp2_conn_get_max_data_left(quic.conn)) {
sveccnt = s->write_stream(stream_id, fin, vec.data(), vec.size());
if (sveccnt == -1) {
return -1;
}
}
ngtcp2_ssize ndatalen;
auto v = vec.data();
auto vcnt = static_cast<size_t>(sveccnt);
uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_MORE;
if (fin) {
flags |= NGTCP2_WRITE_STREAM_FLAG_FIN;
}
auto buflen = buf.size() >= max_udp_payload_size
? max_udp_payload_size
: path_max_udp_payload_size;
auto nwrite = ngtcp2_conn_writev_stream(
quic.conn, &ps.path, nullptr, buf.data(), buflen, &ndatalen, flags,
stream_id, reinterpret_cast<const ngtcp2_vec *>(v), vcnt, ts);
if (nwrite < 0) {
switch (nwrite) {
case NGTCP2_ERR_STREAM_DATA_BLOCKED:
assert(ndatalen == -1);
s->block_stream(stream_id);
continue;
case NGTCP2_ERR_STREAM_SHUT_WR:
assert(ndatalen == -1);
s->shutdown_stream_write(stream_id);
continue;
case NGTCP2_ERR_WRITE_MORE:
assert(ndatalen >= 0);
if (s->add_write_offset(stream_id, as_unsigned(ndatalen)) != 0) {
return -1;
}
continue;
}
ngtcp2_ccerr_set_liberr(&quic.last_error, static_cast<int>(nwrite),
nullptr, 0);
return -1;
} else if (ndatalen >= 0 &&
s->add_write_offset(stream_id, as_unsigned(ndatalen)) != 0) {
return -1;
}
quic_restart_pkt_timer();
if (nwrite == 0) {
pkt = std::span{std::ranges::begin(txbuf), std::ranges::begin(buf)};
if (pkt.empty()) {
return 0;
}
break;
}
auto last_pkt_pos = std::ranges::begin(buf);
buf = buf.subspan(as_unsigned(nwrite));
if (last_pkt_pos == std::ranges::begin(txbuf)) {
gso_size = as_unsigned(nwrite);
} else if (static_cast<size_t>(nwrite) > gso_size ||
(gso_size > path_max_udp_payload_size &&
static_cast<size_t>(nwrite) != gso_size)) {
pkt = std::span{std::ranges::begin(txbuf), last_pkt_pos};
extra_pkt = std::span{last_pkt_pos, std::ranges::begin(buf)};
break;
}
// Assume that the path does not change.
if (buf.size() < path_max_udp_payload_size ||
static_cast<size_t>(nwrite) < gso_size) {
pkt = std::span{std::ranges::begin(txbuf), std::ranges::begin(buf)};
break;
}
} }
if (write_udp_or_blocked(ps.path, pkt, gso_size) != 0) { quic_restart_pkt_timer();
if (!extra_pkt.empty()) {
on_send_blocked(ps.path.remote, extra_pkt, extra_pkt.size()); if (nwrite == 0) {
} return 0;
} else if (!extra_pkt.empty()) {
write_udp_or_blocked(ps.path, extra_pkt, extra_pkt.size());
} }
ngtcp2_conn_update_pkt_tx_time(quic.conn, ts); write_udp_or_blocked(ps.path, txbuf.first(static_cast<size_t>(nwrite)),
gso_size);
return 0; return 0;
} }
int Client::write_udp_or_blocked(const ngtcp2_path &path, void Client::write_udp_or_blocked(const ngtcp2_path &path,
std::span<const uint8_t> data, std::span<const uint8_t> data,
size_t gso_size) { size_t gso_size) {
auto rest = write_udp(path.remote.addr, path.remote.addrlen, data, gso_size); auto rest = write_udp(path.remote.addr, path.remote.addrlen, data, gso_size);
if (!rest.empty()) { if (!rest.empty()) {
on_send_blocked(path.remote, data, gso_size); on_send_blocked(path.remote, data, gso_size);
return -1;
} }
return 0;
} }
void Client::on_send_blocked(const ngtcp2_addr &remote_addr, void Client::on_send_blocked(const ngtcp2_addr &remote_addr,
std::span<const uint8_t> data, size_t gso_size) { std::span<const uint8_t> data, size_t gso_size) {
assert(quic.tx.num_blocked || !quic.tx.send_blocked); assert(!quic.tx.send_blocked);
assert(quic.tx.num_blocked < 2);
quic.tx.send_blocked = true; quic.tx.send_blocked = true;
auto &p = quic.tx.blocked[quic.tx.num_blocked++]; auto &p = quic.tx.blocked;
memcpy(&p.remote_addr.su, remote_addr.addr, remote_addr.addrlen); memcpy(&p.remote_addr.su, remote_addr.addr, remote_addr.addrlen);
@@ -842,24 +817,19 @@ void Client::on_send_blocked(const ngtcp2_addr &remote_addr,
int Client::send_blocked_packet() { int Client::send_blocked_packet() {
assert(quic.tx.send_blocked); assert(quic.tx.send_blocked);
for (; quic.tx.num_blocked_sent < quic.tx.num_blocked; auto &p = quic.tx.blocked;
++quic.tx.num_blocked_sent) {
auto &p = quic.tx.blocked[quic.tx.num_blocked_sent];
auto rest = auto rest =
write_udp(&p.remote_addr.su.sa, p.remote_addr.len, p.data, p.gso_size); write_udp(&p.remote_addr.su.sa, p.remote_addr.len, p.data, p.gso_size);
if (!rest.empty()) { if (!rest.empty()) {
p.data = rest; p.data = rest;
signal_write(); signal_write();
return 0; return 0;
}
} }
quic.tx.send_blocked = false; quic.tx.send_blocked = false;
quic.tx.num_blocked = 0;
quic.tx.num_blocked_sent = 0;
return 0; return 0;
} }

View File

@@ -32,6 +32,8 @@
#include "h2load.h" #include "h2load.h"
namespace h2load { namespace h2load {
constexpr size_t QUIC_TX_DATALEN = 64_k;
void quic_pkt_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents); void quic_pkt_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents);
} // namespace h2load } // namespace h2load

View File

@@ -109,6 +109,10 @@ ngtcp2_conn *get_conn(ngtcp2_crypto_conn_ref *conn_ref) {
} }
} // namespace } // namespace
namespace {
constexpr size_t QUIC_TX_DATALEN = 64_k;
} // namespace
Http3Upstream::Http3Upstream(ClientHandler *handler) Http3Upstream::Http3Upstream(ClientHandler *handler)
: handler_{handler}, : handler_{handler},
qlog_fd_{-1}, qlog_fd_{-1},
@@ -121,7 +125,7 @@ Http3Upstream::Http3Upstream(ClientHandler *handler)
downstream_queue_{downstream_queue_size(handler->get_worker()), downstream_queue_{downstream_queue_size(handler->get_worker()),
!get_config()->http2_proxy}, !get_config()->http2_proxy},
tx_{ tx_{
.data = std::unique_ptr<uint8_t[]>(new uint8_t[64_k]), .data = std::unique_ptr<uint8_t[]>(new uint8_t[QUIC_TX_DATALEN]),
#ifndef UDP_SEGMENT #ifndef UDP_SEGMENT
.no_gso = true, .no_gso = true,
#endif // UDP_SEGMENT #endif // UDP_SEGMENT
@@ -820,25 +824,21 @@ int Http3Upstream::on_write() {
return 0; return 0;
} }
int Http3Upstream::write_streams() { namespace {
std::array<nghttp3_vec, 16> vec; ngtcp2_ssize write_pkt(ngtcp2_conn *conn, ngtcp2_path *path,
auto max_udp_payload_size = ngtcp2_conn_get_max_tx_udp_payload_size(conn_); ngtcp2_pkt_info *pi, uint8_t *dest, size_t destlen,
auto path_max_udp_payload_size = ngtcp2_tstamp ts, void *user_data) {
ngtcp2_conn_get_path_max_tx_udp_payload_size(conn_); auto upstream = static_cast<Http3Upstream *>(user_data);
ngtcp2_pkt_info pi, prev_pi;
auto txbuf =
std::span{tx_.data.get(), std::max(ngtcp2_conn_get_send_quantum(conn_),
path_max_udp_payload_size)};
auto buf = txbuf;
auto pkt = std::span<const uint8_t>{};
auto extra_pkt = std::span<const uint8_t>{};
ngtcp2_path_storage ps, prev_ps;
int rv;
size_t gso_size = 0;
auto ts = quic_timestamp();
ngtcp2_path_storage_zero(&ps); return upstream->write_pkt(path, pi, dest, destlen, ts);
ngtcp2_path_storage_zero(&prev_ps); }
} // namespace
ngtcp2_ssize Http3Upstream::write_pkt(ngtcp2_path *path, ngtcp2_pkt_info *pi,
uint8_t *dest, size_t destlen,
ngtcp2_tstamp ts) {
std::array<nghttp3_vec, 16> vec;
int rv;
for (;;) { for (;;) {
int64_t stream_id = -1; int64_t stream_id = -1;
@@ -855,7 +855,7 @@ int Http3Upstream::write_streams() {
&last_error_, &last_error_,
nghttp3_err_infer_quic_app_error_code(static_cast<int>(sveccnt)), nghttp3_err_infer_quic_app_error_code(static_cast<int>(sveccnt)),
nullptr, 0); nullptr, 0);
return handle_error(); return NGTCP2_ERR_CALLBACK_FAILURE;
} }
} }
@@ -868,11 +868,8 @@ int Http3Upstream::write_streams() {
flags |= NGTCP2_WRITE_STREAM_FLAG_FIN; flags |= NGTCP2_WRITE_STREAM_FLAG_FIN;
} }
auto buflen = buf.size() >= max_udp_payload_size
? max_udp_payload_size
: path_max_udp_payload_size;
auto nwrite = ngtcp2_conn_writev_stream( auto nwrite = ngtcp2_conn_writev_stream(
conn_, &ps.path, &pi, buf.data(), buflen, &ndatalen, flags, stream_id, conn_, path, pi, dest, destlen, &ndatalen, flags, stream_id,
reinterpret_cast<const ngtcp2_vec *>(v), vcnt, ts); reinterpret_cast<const ngtcp2_vec *>(v), vcnt, ts);
if (nwrite < 0) { if (nwrite < 0) {
switch (nwrite) { switch (nwrite) {
@@ -894,7 +891,7 @@ int Http3Upstream::write_streams() {
ngtcp2_ccerr_set_application_error( ngtcp2_ccerr_set_application_error(
&last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr, &last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr,
0); 0);
return handle_error(); return NGTCP2_ERR_CALLBACK_FAILURE;
} }
continue; continue;
} }
@@ -907,8 +904,10 @@ int Http3Upstream::write_streams() {
ngtcp2_ccerr_set_liberr(&last_error_, static_cast<int>(nwrite), nullptr, ngtcp2_ccerr_set_liberr(&last_error_, static_cast<int>(nwrite), nullptr,
0); 0);
return handle_error(); return NGTCP2_ERR_CALLBACK_FAILURE;
} else if (ndatalen >= 0) { }
if (ndatalen >= 0) {
rv = nghttp3_conn_add_write_offset(httpconn_, stream_id, rv = nghttp3_conn_add_write_offset(httpconn_, stream_id,
as_unsigned(ndatalen)); as_unsigned(ndatalen));
if (rv != 0) { if (rv != 0) {
@@ -916,64 +915,42 @@ int Http3Upstream::write_streams() {
<< nghttp3_strerror(rv); << nghttp3_strerror(rv);
ngtcp2_ccerr_set_application_error( ngtcp2_ccerr_set_application_error(
&last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr, 0); &last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr, 0);
return handle_error(); return NGTCP2_ERR_CALLBACK_FAILURE;
} }
} }
if (nwrite == 0) { return nwrite;
pkt = std::span{std::ranges::begin(txbuf), std::ranges::begin(buf)}; }
if (pkt.empty()) { }
return 0;
}
break; int Http3Upstream::write_streams() {
} ngtcp2_path_storage ps;
ngtcp2_pkt_info pi;
auto txbuf = std::span{tx_.data.get(), QUIC_TX_DATALEN};
size_t gso_size = 0;
auto last_pkt_pos = std::ranges::begin(buf); ngtcp2_path_storage_zero(&ps);
buf = buf.subspan(as_unsigned(nwrite)); auto nwrite = ngtcp2_conn_write_aggregate_pkt(
conn_, &ps.path, &pi, txbuf.data(), txbuf.size(), &gso_size,
if (last_pkt_pos == std::ranges::begin(txbuf)) { shrpx::write_pkt, quic_timestamp());
ngtcp2_path_copy(&prev_ps.path, &ps.path); if (nwrite < 0) {
prev_pi = pi; return handle_error();
gso_size = as_unsigned(nwrite);
} else if (!ngtcp2_path_eq(&prev_ps.path, &ps.path) ||
prev_pi.ecn != pi.ecn ||
static_cast<size_t>(nwrite) > gso_size ||
(gso_size > path_max_udp_payload_size &&
static_cast<size_t>(nwrite) != gso_size)) {
pkt = std::span{std::ranges::begin(txbuf), last_pkt_pos};
extra_pkt = std::span{last_pkt_pos, std::ranges::begin(buf)};
break;
}
if (buf.size() < path_max_udp_payload_size ||
static_cast<size_t>(nwrite) < gso_size) {
pkt = std::span{std::ranges::begin(txbuf), std::ranges::begin(buf)};
break;
}
} }
assert(!pkt.empty()); if (nwrite == 0) {
return 0;
if (send_packet(prev_ps.path, prev_pi, pkt, gso_size) ==
SHRPX_ERR_SEND_BLOCKED) {
if (!extra_pkt.empty()) {
on_send_blocked(ps.path, pi, extra_pkt, extra_pkt.size());
}
} else if (!extra_pkt.empty()) {
send_packet(ps.path, pi, extra_pkt, extra_pkt.size());
} }
ngtcp2_conn_update_pkt_tx_time(conn_, ts); send_packet(ps.path, pi, txbuf.first(static_cast<size_t>(nwrite)), gso_size);
return 0; return 0;
} }
int Http3Upstream::send_packet(const ngtcp2_path &path, void Http3Upstream::send_packet(const ngtcp2_path &path,
const ngtcp2_pkt_info &pi, const ngtcp2_pkt_info &pi,
const std::span<const uint8_t> data, const std::span<const uint8_t> data,
size_t gso_size) { size_t gso_size) {
auto faddr = static_cast<UpstreamAddr *>(path.user_data); auto faddr = static_cast<UpstreamAddr *>(path.user_data);
auto [rest, rv] = auto [rest, rv] =
@@ -984,8 +961,6 @@ int Http3Upstream::send_packet(const ngtcp2_path &path,
signal_write_upstream_addr(faddr); signal_write_upstream_addr(faddr);
} }
return rv;
} }
int Http3Upstream::on_timeout(Downstream *downstream) { int Http3Upstream::on_timeout(Downstream *downstream) {
@@ -1937,13 +1912,12 @@ void Http3Upstream::on_send_blocked(const ngtcp2_path &path,
const ngtcp2_pkt_info &pi, const ngtcp2_pkt_info &pi,
std::span<const uint8_t> data, std::span<const uint8_t> data,
size_t gso_size) { size_t gso_size) {
assert(tx_.num_blocked || !tx_.send_blocked); assert(!tx_.send_blocked);
assert(tx_.num_blocked < 2);
assert(gso_size); assert(gso_size);
tx_.send_blocked = true; tx_.send_blocked = true;
auto &p = tx_.blocked[tx_.num_blocked++]; auto &p = tx_.blocked;
memcpy(&p.local_addr.su, path.local.addr, path.local.addrlen); memcpy(&p.local_addr.su, path.local.addr, path.local.addrlen);
memcpy(&p.remote_addr.su, path.remote.addr, path.remote.addrlen); memcpy(&p.remote_addr.su, path.remote.addr, path.remote.addrlen);
@@ -1959,24 +1933,20 @@ void Http3Upstream::on_send_blocked(const ngtcp2_path &path,
int Http3Upstream::send_blocked_packet() { int Http3Upstream::send_blocked_packet() {
assert(tx_.send_blocked); assert(tx_.send_blocked);
for (; tx_.num_blocked_sent < tx_.num_blocked; ++tx_.num_blocked_sent) { auto &p = tx_.blocked;
auto &p = tx_.blocked[tx_.num_blocked_sent];
auto [rest, rv] = send_packet(p.faddr, &p.remote_addr.su.sa, auto [rest, rv] = send_packet(p.faddr, &p.remote_addr.su.sa,
p.remote_addr.len, &p.local_addr.su.sa, p.remote_addr.len, &p.local_addr.su.sa,
p.local_addr.len, p.pi, p.data, p.gso_size); p.local_addr.len, p.pi, p.data, p.gso_size);
if (rv == SHRPX_ERR_SEND_BLOCKED) { if (rv == SHRPX_ERR_SEND_BLOCKED) {
p.data = rest; p.data = rest;
signal_write_upstream_addr(p.faddr); signal_write_upstream_addr(p.faddr);
return 0; return 0;
}
} }
tx_.send_blocked = false; tx_.send_blocked = false;
tx_.num_blocked = 0;
tx_.num_blocked_sent = 0;
return 0; return 0;
} }

View File

@@ -102,6 +102,8 @@ public:
std::span<const uint8_t> data); std::span<const uint8_t> data);
int write_streams(); int write_streams();
ngtcp2_ssize write_pkt(ngtcp2_path *path, ngtcp2_pkt_info *pi, uint8_t *dest,
size_t destlen, ngtcp2_tstamp ts);
int handle_error(); int handle_error();
@@ -146,8 +148,8 @@ public:
socklen_t remote_salen, const sockaddr *local_sa, socklen_t remote_salen, const sockaddr *local_sa,
socklen_t local_salen, const ngtcp2_pkt_info &pi, socklen_t local_salen, const ngtcp2_pkt_info &pi,
std::span<const uint8_t> data, size_t gso_size); std::span<const uint8_t> data, size_t gso_size);
int send_packet(const ngtcp2_path &path, const ngtcp2_pkt_info &pi, void send_packet(const ngtcp2_path &path, const ngtcp2_pkt_info &pi,
const std::span<const uint8_t> data, size_t gso_size); const std::span<const uint8_t> data, size_t gso_size);
void qlog_write(const void *data, size_t datalen, bool fin); void qlog_write(const void *data, size_t datalen, bool fin);
int open_qlog_file(const std::string_view &dir, const ngtcp2_cid &scid) const; int open_qlog_file(const std::string_view &dir, const ngtcp2_cid &scid) const;
@@ -179,8 +181,6 @@ private:
struct { struct {
bool send_blocked; bool send_blocked;
size_t num_blocked;
size_t num_blocked_sent;
// blocked field is effective only when send_blocked is true. // blocked field is effective only when send_blocked is true.
struct { struct {
const UpstreamAddr *faddr; const UpstreamAddr *faddr;
@@ -189,7 +189,7 @@ private:
ngtcp2_pkt_info pi; ngtcp2_pkt_info pi;
std::span<const uint8_t> data; std::span<const uint8_t> data;
size_t gso_size; size_t gso_size;
} blocked[2]; } blocked;
std::unique_ptr<uint8_t[]> data; std::unique_ptr<uint8_t[]> data;
bool no_gso; bool no_gso;
} tx_; } tx_;