From 983f2b70741f17f30fe2321451f10cabecc013d2 Mon Sep 17 00:00:00 2001 From: Liam Date: Sun, 16 Oct 2022 01:53:56 -0400 Subject: kernel: invert session request handling flow --- src/core/hle/kernel/service_thread.cpp | 232 ++++++++++++++++++++++++--------- 1 file changed, 167 insertions(+), 65 deletions(-) (limited to 'src/core/hle/kernel/service_thread.cpp') diff --git a/src/core/hle/kernel/service_thread.cpp b/src/core/hle/kernel/service_thread.cpp index d23d76706..1fc2edf52 100644 --- a/src/core/hle/kernel/service_thread.cpp +++ b/src/core/hle/kernel/service_thread.cpp @@ -1,15 +1,17 @@ -// SPDX-FileCopyrightText: Copyright 2020 yuzu Emulator Project +// SPDX-FileCopyrightText: Copyright 2022 yuzu Emulator Project // SPDX-License-Identifier: GPL-2.0-or-later -#include #include #include #include #include -#include #include "common/scope_exit.h" #include "common/thread.h" +#include "core/hle/ipc_helpers.h" +#include "core/hle/kernel/hle_ipc.h" +#include "core/hle/kernel/k_event.h" +#include "core/hle/kernel/k_scoped_resource_reservation.h" #include "core/hle/kernel/k_session.h" #include "core/hle/kernel/k_thread.h" #include "core/hle/kernel/kernel.h" @@ -19,101 +21,201 @@ namespace Kernel { class ServiceThread::Impl final { public: - explicit Impl(KernelCore& kernel, std::size_t num_threads, const std::string& name); + explicit Impl(KernelCore& kernel, const std::string& service_name); ~Impl(); - void QueueSyncRequest(KSession& session, std::shared_ptr&& context); + void WaitAndProcessImpl(); + void SessionClosed(KServerSession* server_session, + std::shared_ptr manager); + void LoopProcess(); + + void RegisterServerSession(KServerSession* session, + std::shared_ptr manager); private: - std::vector threads; - std::queue> requests; - std::mutex queue_mutex; - std::condition_variable_any condition; - const std::string service_name; + KernelCore& kernel; + + std::jthread m_thread; + std::mutex m_session_mutex; + std::vector m_sessions; + std::vector> m_managers; + KEvent* m_wakeup_event; + KProcess* m_process; + std::atomic m_shutdown_requested; + const std::string m_service_name; }; -ServiceThread::Impl::Impl(KernelCore& kernel, std::size_t num_threads, const std::string& name) - : service_name{name} { - for (std::size_t i = 0; i < num_threads; ++i) { - threads.emplace_back([this, &kernel](std::stop_token stop_token) { - Common::SetCurrentThreadName(std::string{service_name}.c_str()); +void ServiceThread::Impl::WaitAndProcessImpl() { + // Create local list of waitable sessions. + std::vector objs; + std::vector> managers; - // Wait for first request before trying to acquire a render context - { - std::unique_lock lock{queue_mutex}; - condition.wait(lock, stop_token, [this] { return !requests.empty(); }); - } + { + // Lock to get the list. + std::scoped_lock lk{m_session_mutex}; - if (stop_token.stop_requested()) { - return; - } + // Resize to the needed quantity. + objs.resize(m_sessions.size() + 1); + managers.resize(m_managers.size()); - // Allocate a dummy guest thread for this host thread. - kernel.RegisterHostThread(); + // Copy to our local list. + std::copy(m_sessions.begin(), m_sessions.end(), objs.begin()); + std::copy(m_managers.begin(), m_managers.end(), managers.begin()); - while (true) { - std::function task; + // Insert the wakeup event at the end. + objs.back() = &m_wakeup_event->GetReadableEvent(); + } - { - std::unique_lock lock{queue_mutex}; - condition.wait(lock, stop_token, [this] { return !requests.empty(); }); + // Wait on the list of sessions. + s32 index{-1}; + Result rc = KSynchronizationObject::Wait(kernel, &index, objs.data(), + static_cast(objs.size()), -1); + ASSERT(!rc.IsFailure()); + + // If this was the wakeup event, clear it and finish. + if (index >= static_cast(objs.size() - 1)) { + m_wakeup_event->Clear(); + return; + } - if (stop_token.stop_requested()) { - return; - } + // This event is from a server session. + auto* server_session = static_cast(objs[index]); + auto& manager = managers[index]; - if (requests.empty()) { - continue; - } + // Fetch the HLE request context. + std::shared_ptr context; + rc = server_session->ReceiveRequest(&context, manager); - task = std::move(requests.front()); - requests.pop(); - } + // If the session was closed, handle that. + if (rc == ResultSessionClosed) { + SessionClosed(server_session, manager); - task(); - } - }); + // Finish. + return; } + + // TODO: handle other cases + ASSERT(rc == ResultSuccess); + + // Perform the request. + Result service_rc = manager->CompleteSyncRequest(server_session, *context); + + // Reply to the client. + rc = server_session->SendReply(true); + + if (rc == ResultSessionClosed || service_rc == IPC::ERR_REMOTE_PROCESS_DEAD) { + SessionClosed(server_session, manager); + return; + } + + // TODO: handle other cases + ASSERT(rc == ResultSuccess); + ASSERT(service_rc == ResultSuccess); } -void ServiceThread::Impl::QueueSyncRequest(KSession& session, - std::shared_ptr&& context) { +void ServiceThread::Impl::SessionClosed(KServerSession* server_session, + std::shared_ptr manager) { { - std::unique_lock lock{queue_mutex}; + // Lock to get the list. + std::scoped_lock lk{m_session_mutex}; + + // Get the index of the session. + const auto index = + std::find(m_sessions.begin(), m_sessions.end(), server_session) - m_sessions.begin(); + ASSERT(index < static_cast(m_sessions.size())); + + // Remove the session and its manager. + m_sessions.erase(m_sessions.begin() + index); + m_managers.erase(m_managers.begin() + index); + } - auto* server_session{&session.GetServerSession()}; + // Close our reference to the server session. + server_session->Close(); +} - // Open a reference to the session to ensure it is not closes while the service request - // completes asynchronously. - server_session->Open(); +void ServiceThread::Impl::LoopProcess() { + Common::SetCurrentThreadName(m_service_name.c_str()); - requests.emplace([server_session, context{std::move(context)}]() { - // Close the reference. - SCOPE_EXIT({ server_session->Close(); }); + kernel.RegisterHostThread(); - // Complete the service request. - server_session->CompleteSyncRequest(*context); - }); + while (!m_shutdown_requested.load()) { + WaitAndProcessImpl(); } - condition.notify_one(); +} + +void ServiceThread::Impl::RegisterServerSession(KServerSession* server_session, + std::shared_ptr manager) { + // Open the server session. + server_session->Open(); + + { + // Lock to get the list. + std::scoped_lock lk{m_session_mutex}; + + // Insert the session and manager. + m_sessions.push_back(server_session); + m_managers.push_back(manager); + } + + // Signal the wakeup event. + m_wakeup_event->Signal(); } ServiceThread::Impl::~Impl() { - condition.notify_all(); - for (auto& thread : threads) { - thread.request_stop(); - thread.join(); + // Shut down the processing thread. + m_shutdown_requested.store(true); + m_wakeup_event->Signal(); + m_thread.join(); + + // Lock mutex. + m_session_mutex.lock(); + + // Close all remaining sessions. + for (size_t i = 0; i < m_sessions.size(); i++) { + m_sessions[i]->Close(); } + + // Close event. + m_wakeup_event->GetReadableEvent().Close(); + m_wakeup_event->Close(); + + // Close process. + m_process->Close(); +} + +ServiceThread::Impl::Impl(KernelCore& kernel_, const std::string& service_name) + : kernel{kernel_}, m_service_name{service_name} { + // Initialize process. + m_process = KProcess::Create(kernel); + KProcess::Initialize(m_process, kernel.System(), service_name, + KProcess::ProcessType::KernelInternal, kernel.GetSystemResourceLimit()); + + // Reserve a new event from the process resource limit + KScopedResourceReservation event_reservation(m_process, LimitableResource::Events); + ASSERT(event_reservation.Succeeded()); + + // Initialize event. + m_wakeup_event = KEvent::Create(kernel); + m_wakeup_event->Initialize(m_process); + + // Commit the event reservation. + event_reservation.Commit(); + + // Register the event. + KEvent::Register(kernel, m_wakeup_event); + + // Start thread. + m_thread = std::jthread([this] { LoopProcess(); }); } -ServiceThread::ServiceThread(KernelCore& kernel, std::size_t num_threads, const std::string& name) - : impl{std::make_unique(kernel, num_threads, name)} {} +ServiceThread::ServiceThread(KernelCore& kernel, const std::string& name) + : impl{std::make_unique(kernel, name)} {} ServiceThread::~ServiceThread() = default; -void ServiceThread::QueueSyncRequest(KSession& session, - std::shared_ptr&& context) { - impl->QueueSyncRequest(session, std::move(context)); +void ServiceThread::RegisterServerSession(KServerSession* session, + std::shared_ptr manager) { + impl->RegisterServerSession(session, manager); } } // namespace Kernel -- cgit v1.2.3