#include <systemd/sd-bus.h>

#include <sdbusplus/async/context.hpp>

#include <chrono>

namespace sdbusplus::async
{

context::context(bus_t&& b) : bus(std::move(b))
{
    dbus_source = event_loop.add_io(bus.get_fd(), EPOLLIN, dbus_event_handle,
                                    this);
}

namespace details
{

/* The sd_bus_wait/process completion event.
 *
 *  The wait/process handshake is modelled as a Sender with the the worker
 *  task `co_await`ing Senders over and over.  This class is the completion
 *  handling for the Sender (to get it back to the Receiver, ie. the worker).
 */
struct wait_process_completion : bus::details::bus_friend
{
    explicit wait_process_completion(context& ctx) : ctx(ctx) {}
    virtual ~wait_process_completion() = default;

    // Called by the `caller` to indicate the Sender is completed.
    virtual void complete() noexcept = 0;
    // Called by the `caller` to indicate the Sender should be stopped.
    virtual void stop() noexcept = 0;

    // Arm the completion event.
    void arm() noexcept;

    // Data to share with the worker.
    context& ctx;
    event_t::time_resolution timeout{};

    static task<> loop(context& ctx);
    static void wait_once(context& ctx);
};

/* The completion template based on receiver type.
 *
 * The type of the receivers (typically the co_awaiter) is only known by
 * a template, so we need a sub-class of completion to hold the receiver.
 */
template <execution::receiver R>
struct wait_process_operation : public wait_process_completion
{
    wait_process_operation(context& ctx, R r) :
        wait_process_completion(ctx), receiver(std::move(r))
    {}

    wait_process_operation(wait_process_operation&&) = delete;

    void complete() noexcept override final
    {
        execution::set_value(std::move(this->receiver));
    }

    void stop() noexcept override final
    {
        // Stop can be called when the context is shutting down,
        // so treat it as if the receiver completed.
        execution::set_value(std::move(this->receiver));
    }

    friend void tag_invoke(execution::start_t,
                           wait_process_operation& self) noexcept
    {
        self.arm();
    }

    R receiver;
};

/* The sender for the wait/process event. */
struct wait_process_sender
{
    explicit wait_process_sender(context& ctx) : ctx(ctx) {}

    friend auto tag_invoke(execution::get_completion_signatures_t,
                           const wait_process_sender&, auto)
        -> execution::completion_signatures<execution::set_value_t()>;

    template <execution::receiver R>
    friend auto tag_invoke(execution::connect_t, wait_process_sender&& self,
                           R r) -> wait_process_operation<R>
    {
        // Create the completion for the wait.
        return {self.ctx, std::move(r)};
    }

  private:
    context& ctx;
};

task<> wait_process_completion::loop(context& ctx)
{
    while (!ctx.final_stop.stop_requested())
    {
        // Handle the next sdbus event.
        co_await wait_process_sender(ctx);

        // Completion likely happened on the context 'caller' thread, so
        // we need to switch to the worker thread.
        co_await execution::schedule(ctx.loop.get_scheduler());
    }

    {
        std::lock_guard lock{ctx.lock};
        ctx.wait_process_stopped = true;
    }
}

} // namespace details

context::~context() noexcept(false)
{
    if (worker_thread.joinable())
    {
        throw std::logic_error(
            "sdbusplus::async::context destructed without completion.");
    }
}

void context::run()
{
    // Run the primary portion of the run-loop.
    caller_run();

    // Rethrow the pending exception (if it exists).
    rethrow_pending_exception();

    // Otherwise this should be final_stop...

    // We need to wait for the pending wait process and stop it.
    wait_for_wait_process_stopped();

    // Wait for all the internal tasks to complete.
    stdexec::this_thread::sync_wait(
        internal_tasks.empty() | execution::upon_error([&](auto&& e) {
            pending_exceptions.emplace_back(std::move(e));
        }));

    // Finish up the loop and join the thread.
    // (There shouldn't be anything going on by this point anyhow.)
    loop.finish();
    if (worker_thread.joinable())
    {
        worker_thread.join();
    }

    // Check for one last exception.
    rethrow_pending_exception();
}

void context::worker_run()
{
    // Start the sdbus 'wait/process' loop; treat it as an internal task.
    internal_tasks.spawn(details::wait_process_completion::loop(*this) |
                         execution::upon_stopped([]() {}));

    // Run the execution::run_loop to handle all the tasks.
    loop.run();
}

void context::spawn_complete(std::exception_ptr&& e)
{
    {
        std::lock_guard l{lock};
        spawn_watcher_running = false;

        if (e)
        {
            pending_exceptions.emplace_back(std::move(e));
        }
    }

    if (stop_requested())
    {
        final_stop.request_stop();
    }

    caller_wait.notify_one();
    event_loop.break_run();
}

void context::check_stop_requested()
{
    if (stop_requested())
    {
        throw std::logic_error(
            "sdbusplus::async::context spawn called while already stopped.");
    }
}

void context::spawn_watcher()
{
    {
        std::lock_guard l{lock};
        if (spawn_watcher_running)
        {
            return;
        }

        spawn_watcher_running = true;
    }

    // Spawn the watch for completion / exceptions.
    internal_tasks.spawn(pending_tasks.empty() |
                         execution::then([this]() { spawn_complete(); }) |
                         execution::upon_error([this](auto&& e) {
                             spawn_complete(std::move(e));
                         }));
}

void context::caller_run()
{
    // We are able to run the loop until the context is requested to stop or
    // we get an exception.
    auto keep_running = [this]() {
        std::lock_guard l{lock};
        return !final_stop.stop_requested() && pending_exceptions.empty();
    };

    // If we are suppose to keep running, start the run loop.
    if (keep_running())
    {
        // Start up the worker thread.
        if (!worker_thread.joinable())
        {
            worker_thread = std::thread{[this]() { worker_run(); }};
        }
        else
        {
            // We've already been running and there might an exception or
            // completion pending.  Spawn a new watcher that checks for these.
            spawn_watcher();
        }

        while (keep_running())
        {
            // Handle waiting on all the sd-events.
            details::wait_process_completion::wait_once(*this);
        }
    }
    else
    {
        // There might be pending exceptions still, so spawn a watcher for them.
        spawn_watcher();
    }
}

void context::wait_for_wait_process_stopped()
{
    auto worker = std::exchange(pending, nullptr);
    while (worker == nullptr)
    {
        std::lock_guard l{lock};
        if (wait_process_stopped)
        {
            break;
        }

        worker = std::exchange(staged, nullptr);
        if (!worker)
        {
            std::this_thread::yield();
        }
    }
    if (worker)
    {
        worker->stop();
        wait_process_stopped = true;
    }
}

void context::rethrow_pending_exception()
{
    {
        std::lock_guard l{lock};
        if (!pending_exceptions.empty())
        {
            auto e = pending_exceptions.front();
            pending_exceptions.pop_front();
            std::rethrow_exception(std::move(e));
        }
    }
}

void details::wait_process_completion::arm() noexcept
{
    // Call process.  True indicates something was handled and we do not
    // need to `wait`, because there might be yet another pending operation
    // to process, so immediately signal the operation as complete.
    if (ctx.get_bus().process_discard())
    {
        this->complete();
        return;
    }

    // We need to call wait now, get the current timeout and stage ourselves
    // as the next completion.

    // Get the bus' timeout.
    uint64_t to_usec = 0;
    sd_bus_get_timeout(get_busp(ctx.get_bus()), &to_usec);

    if (to_usec == UINT64_MAX)
    {
        // sd_bus_get_timeout returns UINT64_MAX to indicate 'wait forever'.
        // Turn this into -1 for sd-event.
        timeout = std::chrono::microseconds{-1};
    }
    else
    {
        timeout = std::chrono::microseconds(to_usec);
    }

    // Assign ourselves as the pending completion and release the caller.
    std::lock_guard lock{ctx.lock};
    ctx.staged = this;
    ctx.caller_wait.notify_one();
}

void details::wait_process_completion::wait_once(context& ctx)
{
    // Scope for lock.
    {
        std::unique_lock lock{ctx.lock};

        // If there isn't a completion waiting already, wait on the condition
        // variable for one to show up (we can't call `poll` yet because we
        // don't have the required parameters).
        ctx.caller_wait.wait(lock, [&] {
            return (ctx.pending != nullptr) || (ctx.staged != nullptr) ||
                   ctx.final_stop.stop_requested() ||
                   !ctx.pending_exceptions.empty();
        });

        // Save the waiter as pending.
        if (ctx.pending == nullptr)
        {
            ctx.pending = std::exchange(ctx.staged, nullptr);
        }
    }

    // Run the event loop to process one request.
    // If the context has been requested to be stopped, skip the event loop.
    if (!ctx.final_stop.stop_requested() && ctx.pending)
    {
        ctx.event_loop.run_one(ctx.pending->timeout);
    }
}

int context::dbus_event_handle(sd_event_source*, int, uint32_t, void* data)
{
    auto self = static_cast<context*>(data);

    auto pending = std::exchange(self->pending, nullptr);
    if (pending != nullptr)
    {
        pending->complete();
    }

    return 0;
}

} // namespace sdbusplus::async
