nghttpx: Split thread into worker_process and thread

Split thread into worker_process and thread.  Use thread to O(1)
lookup for Worker.  This new machinery is not compatible to the
previous version.  The old instance of nghttpx must not be upgraded
with USR2 signal.  It should be restarted instead.
This commit is contained in:
Tatsuhiro Tsujikawa
2024-03-31 00:16:25 +09:00
parent cd7d5166f1
commit edd2070a11
7 changed files with 76 additions and 57 deletions

View File

@@ -205,7 +205,7 @@ struct WorkerProcess {
WorkerProcess(struct ev_loop *loop, pid_t worker_pid, int ipc_fd WorkerProcess(struct ev_loop *loop, pid_t worker_pid, int ipc_fd
#ifdef ENABLE_HTTP3 #ifdef ENABLE_HTTP3
, ,
int quic_ipc_fd, std::vector<WorkerID> worker_ids int quic_ipc_fd, std::vector<WorkerID> worker_ids, uint16_t seq
#endif // ENABLE_HTTP3 #endif // ENABLE_HTTP3
) )
: loop(loop), : loop(loop),
@@ -214,7 +214,8 @@ struct WorkerProcess {
#ifdef ENABLE_HTTP3 #ifdef ENABLE_HTTP3
, ,
quic_ipc_fd(quic_ipc_fd), quic_ipc_fd(quic_ipc_fd),
worker_ids(std::move(worker_ids)) worker_ids(std::move(worker_ids)),
seq(seq)
#endif // ENABLE_HTTP3 #endif // ENABLE_HTTP3
{ {
ev_child_init(&worker_process_childev, worker_process_child_cb, worker_pid, ev_child_init(&worker_process_childev, worker_process_child_cb, worker_pid,
@@ -246,6 +247,7 @@ struct WorkerProcess {
#ifdef ENABLE_HTTP3 #ifdef ENABLE_HTTP3
int quic_ipc_fd; int quic_ipc_fd;
std::vector<WorkerID> worker_ids; std::vector<WorkerID> worker_ids;
uint16_t seq;
#endif // ENABLE_HTTP3 #endif // ENABLE_HTTP3
}; };
@@ -255,6 +257,10 @@ void reload_config();
namespace { namespace {
std::deque<std::unique_ptr<WorkerProcess>> worker_processes; std::deque<std::unique_ptr<WorkerProcess>> worker_processes;
#ifdef ENABLE_HTTP3
uint16_t worker_process_seq;
#endif // ENABLE_HTTP3
} // namespace } // namespace
namespace { namespace {
@@ -1288,11 +1294,19 @@ get_inherited_quic_lingering_worker_process_from_env() {
p = end + 1; p = end + 1;
} }
std::sort(std::begin(worker_ids), std::end(worker_ids));
lwps.emplace_back(std::move(worker_ids), fd); lwps.emplace_back(std::move(worker_ids), fd);
} }
if (!lwps.empty()) {
const auto &lwp = lwps.back();
if (!lwp.worker_ids.empty() &&
worker_process_seq <= lwp.worker_ids[0].worker_process) {
worker_process_seq = lwp.worker_ids[0].worker_process;
++worker_process_seq;
}
}
return lwps; return lwps;
} }
} // namespace } // namespace
@@ -1422,7 +1436,7 @@ int create_quic_ipc_socket(std::array<int, 2> &quic_ipc_fd) {
} // namespace } // namespace
namespace { namespace {
int generate_worker_id(std::vector<WorkerID> &worker_ids, int generate_worker_id(std::vector<WorkerID> &worker_ids, uint16_t wp_seq,
const Config *config) { const Config *config) {
auto &apiconf = config->api; auto &apiconf = config->api;
auto &quicconf = config->quic; auto &quicconf = config->quic;
@@ -1443,13 +1457,13 @@ int generate_worker_id(std::vector<WorkerID> &worker_ids,
worker_ids.resize(num_wid); worker_ids.resize(num_wid);
for (auto &wid : worker_ids) { uint16_t idx = 0;
if (create_worker_id(wid, quicconf.server_id) != 0) {
return -1;
}
}
std::sort(std::begin(worker_ids), std::end(worker_ids)); for (auto &wid : worker_ids) {
wid.server = quicconf.server_id;
wid.worker_process = wp_seq;
wid.thread = idx++;
}
return 0; return 0;
} }
@@ -1840,7 +1854,7 @@ int event_loop() {
std::vector<WorkerID> worker_ids; std::vector<WorkerID> worker_ids;
if (generate_worker_id(worker_ids, config) != 0) { if (generate_worker_id(worker_ids, worker_process_seq, config) != 0) {
return -1; return -1;
} }
#endif // ENABLE_HTTP3 #endif // ENABLE_HTTP3
@@ -1872,11 +1886,11 @@ int event_loop() {
ev_timer_init(&worker_process_grace_period_timer, ev_timer_init(&worker_process_grace_period_timer,
worker_process_grace_period_timercb, 0., 0.); worker_process_grace_period_timercb, 0., 0.);
worker_process_add(std::make_unique<WorkerProcess>(loop, pid, ipc_fd worker_process_add(std::make_unique<WorkerProcess>(
loop, pid, ipc_fd
#ifdef ENABLE_HTTP3 #ifdef ENABLE_HTTP3
, ,
quic_ipc_fd, quic_ipc_fd, std::move(worker_ids), worker_process_seq++
std::move(worker_ids)
#endif // ENABLE_HTTP3 #endif // ENABLE_HTTP3
)); ));
@@ -4010,7 +4024,8 @@ void reload_config() {
std::vector<WorkerID> worker_ids; std::vector<WorkerID> worker_ids;
if (generate_worker_id(worker_ids, new_config.get()) != 0) { if (generate_worker_id(worker_ids, worker_process_seq, new_config.get()) !=
0) {
close_not_inherited_fd(new_config.get(), iaddrs); close_not_inherited_fd(new_config.get(), iaddrs);
return; return;
} }
@@ -4046,11 +4061,11 @@ void reload_config() {
close_unused_inherited_addr(iaddrs); close_unused_inherited_addr(iaddrs);
worker_process_add(std::make_unique<WorkerProcess>(loop, pid, ipc_fd worker_process_add(std::make_unique<WorkerProcess>(
loop, pid, ipc_fd
#ifdef ENABLE_HTTP3 #ifdef ENABLE_HTTP3
, ,
quic_ipc_fd, quic_ipc_fd, std::move(worker_ids), worker_process_seq++
std::move(worker_ids)
#endif // ENABLE_HTTP3 #endif // ENABLE_HTTP3
)); ));

View File

@@ -2975,13 +2975,28 @@ int parse_config(Config *config, int optid, const StringRef &opt,
return 0; return 0;
} }
case SHRPX_OPTID_WORKERS: case SHRPX_OPTID_WORKERS: {
#ifdef NOTHREADS #ifdef NOTHREADS
LOG(WARN) << "Threading disabled at build time, no threads created."; LOG(WARN) << "Threading disabled at build time, no threads created.";
return 0; return 0;
#else // !NOTHREADS #else // !NOTHREADS
return parse_uint(&config->num_worker, opt, optarg); size_t n;
if (parse_uint(&n, opt, optarg) != 0) {
return -1;
}
if (n > 65530) {
LOG(ERROR) << opt << ": the number of workers must not exceed 65530";
return -1;
}
config->num_worker = n;
return 0;
#endif // !NOTHREADS #endif // !NOTHREADS
}
case SHRPX_OPTID_HTTP2_MAX_CONCURRENT_STREAMS: { case SHRPX_OPTID_HTTP2_MAX_CONCURRENT_STREAMS: {
LOG(WARN) << opt << ": deprecated. Use " LOG(WARN) << opt << ": deprecated. Use "
<< SHRPX_OPT_FRONTEND_HTTP2_MAX_CONCURRENT_STREAMS << " and " << SHRPX_OPT_FRONTEND_HTTP2_MAX_CONCURRENT_STREAMS << " and "

View File

@@ -1039,22 +1039,35 @@ void ConnectionHandler::set_worker_ids(std::vector<WorkerID> worker_ids) {
worker_ids_ = std::move(worker_ids); worker_ids_ = std::move(worker_ids);
} }
namespace {
ssize_t find_worker_index(const std::vector<WorkerID> &worker_ids,
const WorkerID &wid) {
assert(!worker_ids.empty());
if (wid.server != worker_ids[0].server ||
wid.worker_process != worker_ids[0].worker_process ||
wid.thread >= worker_ids.size()) {
return -1;
}
return wid.thread;
}
} // namespace
Worker *ConnectionHandler::find_worker(const WorkerID &wid) const { Worker *ConnectionHandler::find_worker(const WorkerID &wid) const {
auto it = auto idx = find_worker_index(worker_ids_, wid);
std::lower_bound(std::begin(worker_ids_), std::end(worker_ids_), wid); if (idx == -1) {
if (it == std::end(worker_ids_) || *it != wid) {
return nullptr; return nullptr;
} }
return workers_[std::distance(std::begin(worker_ids_), it)].get(); return workers_[idx].get();
} }
QUICLingeringWorkerProcess * QUICLingeringWorkerProcess *
ConnectionHandler::match_quic_lingering_worker_process_worker_id( ConnectionHandler::match_quic_lingering_worker_process_worker_id(
const WorkerID &wid) { const WorkerID &wid) {
for (auto &lwps : quic_lingering_worker_processes_) { for (auto &lwps : quic_lingering_worker_processes_) {
if (std::binary_search(std::begin(lwps.worker_ids), if (find_worker_index(lwps.worker_ids, wid) != -1) {
std::end(lwps.worker_ids), wid)) {
return &lwps; return &lwps;
} }
} }

View File

@@ -121,7 +121,6 @@ enum class QUICIPCType {
// WorkerProcesses which are in graceful shutdown period. // WorkerProcesses which are in graceful shutdown period.
struct QUICLingeringWorkerProcess { struct QUICLingeringWorkerProcess {
// |worker_ids| must be sorted in the lexicographical order.
QUICLingeringWorkerProcess(std::vector<WorkerID> worker_ids, int quic_ipc_fd) QUICLingeringWorkerProcess(std::vector<WorkerID> worker_ids, int quic_ipc_fd)
: worker_ids{std::move(worker_ids)}, quic_ipc_fd{quic_ipc_fd} {} : worker_ids{std::move(worker_ids)}, quic_ipc_fd{quic_ipc_fd} {}
@@ -202,7 +201,6 @@ public:
void set_quic_keying_materials(std::shared_ptr<QUICKeyingMaterials> qkms); void set_quic_keying_materials(std::shared_ptr<QUICKeyingMaterials> qkms);
const std::shared_ptr<QUICKeyingMaterials> &get_quic_keying_materials() const; const std::shared_ptr<QUICKeyingMaterials> &get_quic_keying_materials() const;
// |worker_ids| must be sorted in the lexicographical order.
void set_worker_ids(std::vector<WorkerID> worker_ids); void set_worker_ids(std::vector<WorkerID> worker_ids);
Worker *find_worker(const WorkerID &wid) const; Worker *find_worker(const WorkerID &wid) const;

View File

@@ -87,7 +87,8 @@ struct WorkerID {
union { union {
struct { struct {
uint32_t server; uint32_t server;
uint32_t thread; uint16_t worker_process;
uint16_t thread;
}; };
uint64_t worker; uint64_t worker;
}; };
@@ -104,10 +105,6 @@ inline bool operator!=(const WorkerID &lhd, const WorkerID &rhd) {
return lhd.worker != rhd.worker; return lhd.worker != rhd.worker;
} }
inline bool operator<(const WorkerID &lhd, const WorkerID &rhd) {
return lhd.worker < rhd.worker;
}
struct ConnectionID { struct ConnectionID {
WorkerID worker; WorkerID worker;
uint64_t client; uint64_t client;

View File

@@ -1441,17 +1441,4 @@ void downstream_failure(DownstreamAddr *addr, const Address *raddr) {
} }
} }
#ifdef ENABLE_HTTP3
int create_worker_id(WorkerID &dest, uint32_t server_id) {
dest.server = server_id;
if (RAND_bytes(reinterpret_cast<unsigned char *>(&dest.thread),
sizeof(dest.thread)) != 1) {
return -1;
}
return 0;
}
#endif // ENABLE_HTTP3
} // namespace shrpx } // namespace shrpx

View File

@@ -468,12 +468,6 @@ size_t match_downstream_addr_group(
// nullptr. This function may schedule live check. // nullptr. This function may schedule live check.
void downstream_failure(DownstreamAddr *addr, const Address *raddr); void downstream_failure(DownstreamAddr *addr, const Address *raddr);
#ifdef ENABLE_HTTP3
// Creates WorkerID used as a prefix of QUIC Connection ID. This
// function returns -1 on failure.
int create_worker_id(WorkerID &dest, uint32_t server_id);
#endif // ENABLE_HTTP3
} // namespace shrpx } // namespace shrpx
#endif // SHRPX_WORKER_H #endif // SHRPX_WORKER_H