Skip to content

Commit

Permalink
Implement GNU Make 4.4+ jobserver fifo / semaphore client support
Browse files Browse the repository at this point in the history
The principle of such a job server is rather simple: before starting a
new job (an edge in ninja-speak), a token must be acquired from an external
entity. On POSIX systems, that entity is simply a fifo filled with N
characters. On Win32 systems, it is a semaphore initialized to N. Once a
job is finished, the token must be returned to the external entity.

This functionality is desired when ninja is used as part of a bigger
build, such as builds with Yocto/OpenEmbedded, Buildroot and Android.
Here, multiple compile jobs are executed in parallel to maximize CPU
utilization, but if each compile job uses all available cores, the
system is overloaded.
  • Loading branch information
hundeboll committed May 29, 2024
1 parent 0b4b43a commit bbd51b6
Show file tree
Hide file tree
Showing 9 changed files with 384 additions and 10 deletions.
7 changes: 6 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ add_library(libninja OBJECT
src/eval_env.cc
src/graph.cc
src/graphviz.cc
src/jobserver.cc
src/json.cc
src/line_printer.cc
src/manifest_parser.cc
Expand All @@ -153,6 +154,7 @@ add_library(libninja OBJECT
if(WIN32)
target_sources(libninja PRIVATE
src/subprocess-win32.cc
src/jobserver-win32.cc
src/includes_normalize-win32.cc
src/msvc_helper-win32.cc
src/msvc_helper_main-win32.cc
Expand All @@ -169,7 +171,10 @@ if(WIN32)
# errors by telling windows.h to not define those two.
add_compile_definitions(NOMINMAX)
else()
target_sources(libninja PRIVATE src/subprocess-posix.cc)
target_sources(libninja PRIVATE
src/subprocess-posix.cc
src/jobserver-posix.cc
)
if(CMAKE_SYSTEM_NAME STREQUAL "OS400" OR CMAKE_SYSTEM_NAME STREQUAL "AIX")
target_sources(libninja PRIVATE src/getopt.c)
# Build getopt.c, which can be compiled as either C or C++, as C++
Expand Down
3 changes: 3 additions & 0 deletions configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,7 @@ def has_re2c() -> bool:
'eval_env',
'graph',
'graphviz',
'jobserver',
'json',
'line_printer',
'manifest_parser',
Expand All @@ -556,6 +557,7 @@ def has_re2c() -> bool:
objs += cxx(name, variables=cxxvariables)
if platform.is_windows():
for name in ['subprocess-win32',
'jobserver-win32',
'includes_normalize-win32',
'msvc_helper-win32',
'msvc_helper_main-win32']:
Expand All @@ -565,6 +567,7 @@ def has_re2c() -> bool:
objs += cc('getopt')
else:
objs += cxx('subprocess-posix')
objs += cxx('jobserver-posix')
if platform.is_aix():
objs += cc('getopt')
if platform.is_msvc():
Expand Down
33 changes: 27 additions & 6 deletions src/build.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@ struct DryRunCommandRunner : public CommandRunner {
virtual ~DryRunCommandRunner() {}

// Overridden from CommandRunner:
virtual size_t CanRunMore() const;
size_t CanRunMore(bool jobserver_enabled) const override;
virtual bool StartCommand(Edge* edge);
virtual bool WaitForCommand(Result* result);

private:
queue<Edge*> finished_;
};

size_t DryRunCommandRunner::CanRunMore() const {
// Always reports SIZE_MAX capacity; the jobserver_enabled argument is not
// consulted by this runner.
size_t DryRunCommandRunner::CanRunMore(bool jobserver_enabled) const {
return SIZE_MAX;
}

Expand Down Expand Up @@ -164,6 +164,11 @@ Edge* Plan::FindWork() {
if (ready_.empty())
return NULL;

// Don't initiate more work if the jobserver cannot acquire more tokens
if (jobserver_.Enabled() && !jobserver_.Acquire()) {
return nullptr;
}

Edge* work = ready_.top();
ready_.pop();
return work;
Expand Down Expand Up @@ -201,6 +206,11 @@ bool Plan::EdgeFinished(Edge* edge, EdgeResult result, string* err) {
edge->pool()->EdgeFinished(*edge);
edge->pool()->RetrieveReadyEdges(&ready_);

// Return the token acquired for this very edge to the jobserver
if (jobserver_.Enabled()) {
jobserver_.Release();
}

// The rest of this function only applies to successful commands.
if (result != kEdgeSucceeded)
return true;
Expand Down Expand Up @@ -579,10 +589,15 @@ void Plan::ScheduleInitialEdges() {
}

// Prepares the plan for building: initializes the jobserver client, then
// calls ComputeCriticalPath() and ScheduleInitialEdges().
void Plan::PrepareQueue() {
// Init() returns without enabling the jobserver when none was announced, so
// JobserverEnabled() then reports false.
jobserver_.Init();
ComputeCriticalPath();
ScheduleInitialEdges();
}

// Reports whether this plan's jobserver client is active by forwarding to
// Jobserver::Enabled().
bool Plan::JobserverEnabled() const {
  const bool enabled = jobserver_.Enabled();
  return enabled;
}

void Plan::Dump() const {
printf("pending: %d\n", (int)want_.size());
for (map<Edge*, Want>::const_iterator e = want_.begin(); e != want_.end(); ++e) {
Expand All @@ -596,7 +611,7 @@ void Plan::Dump() const {
struct RealCommandRunner : public CommandRunner {
explicit RealCommandRunner(const BuildConfig& config) : config_(config) {}
virtual ~RealCommandRunner() {}
virtual size_t CanRunMore() const;
size_t CanRunMore(bool jobserver_enabled) const override;
virtual bool StartCommand(Edge* edge);
virtual bool WaitForCommand(Result* result);
virtual vector<Edge*> GetActiveEdges();
Expand All @@ -619,12 +634,18 @@ void RealCommandRunner::Abort() {
subprocs_.Clear();
}

size_t RealCommandRunner::CanRunMore() const {
size_t RealCommandRunner::CanRunMore(bool jobserver_enabled) const {
size_t subproc_number =
subprocs_.running_.size() + subprocs_.finished_.size();

int64_t capacity = config_.parallelism - subproc_number;

// Return "infinite" capacity if a jobserver is used to limit the number
// of parallel subprocesses instead.
if (jobserver_enabled) {
return SIZE_MAX;
}

if (config_.max_load_average > 0.0f) {
int load_capacity = config_.max_load_average - GetLoadAverage();
if (load_capacity < capacity)
Expand Down Expand Up @@ -792,7 +813,7 @@ bool Builder::Build(string* err) {
while (plan_.more_to_do()) {
// See if we can start any more commands.
if (failures_allowed) {
size_t capacity = command_runner_->CanRunMore();
size_t capacity = command_runner_->CanRunMore(plan_.JobserverEnabled());
while (capacity > 0) {
Edge* edge = plan_.FindWork();
if (!edge)
Expand Down Expand Up @@ -820,7 +841,7 @@ bool Builder::Build(string* err) {
--capacity;

// Re-evaluate capacity.
size_t current_capacity = command_runner_->CanRunMore();
size_t current_capacity = command_runner_->CanRunMore(plan_.JobserverEnabled());
if (current_capacity < capacity)
capacity = current_capacity;
}
Expand Down
9 changes: 8 additions & 1 deletion src/build.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "depfile_parser.h"
#include "exit_status.h"
#include "graph.h"
#include "jobserver.h"
#include "util.h" // int64_t

struct BuildLog;
Expand Down Expand Up @@ -52,6 +53,9 @@ struct Plan {
/// Returns true if there's more work to be done.
bool more_to_do() const { return wanted_edges_ > 0 && command_edges_ > 0; }

/// Jobserver status used to skip capacity based on load average
bool JobserverEnabled() const;

/// Dumps the current state of the plan.
void Dump() const;

Expand Down Expand Up @@ -139,14 +143,17 @@ struct Plan {

/// Total remaining number of wanted edges.
int wanted_edges_;

/// Jobserver client
Jobserver jobserver_;
};

/// CommandRunner is an interface that wraps running the build
/// subcommands. This allows tests to abstract out running commands.
/// RealCommandRunner is an implementation that actually runs commands.
struct CommandRunner {
virtual ~CommandRunner() {}
virtual size_t CanRunMore() const = 0;
virtual size_t CanRunMore(bool jobserver_enabled) const = 0;
virtual bool StartCommand(Edge* edge) = 0;

/// The result of waiting for a command.
Expand Down
6 changes: 4 additions & 2 deletions src/build_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ struct FakeCommandRunner : public CommandRunner {
max_active_edges_(1), fs_(fs) {}

// CommandRunner impl
virtual size_t CanRunMore() const;
size_t CanRunMore(bool jobserver_enabled) const override;
virtual bool StartCommand(Edge* edge);
virtual bool WaitForCommand(Result* result);
virtual vector<Edge*> GetActiveEdges();
Expand Down Expand Up @@ -622,7 +622,9 @@ void BuildTest::RebuildTarget(const string& target, const char* manifest,
builder.command_runner_.release();
}

size_t FakeCommandRunner::CanRunMore() const {
size_t FakeCommandRunner::CanRunMore(bool jobserver_enabled) const {
assert(!jobserver_enabled);

if (active_edges_.size() < max_active_edges_)
return SIZE_MAX;

Expand Down
71 changes: 71 additions & 0 deletions src/jobserver-posix.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2024 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "jobserver.h"

#include <fcntl.h>
#include <unistd.h>

#include <cassert>
#include <cerrno>
#include <cstring>

#include "util.h"

// Connects to a fifo-based jobserver, if one was announced.
//
// When ParseJobserverAuth("fifo") reports no jobserver, fd_ stays negative and
// Enabled() keeps returning false. Failing to open an announced fifo is fatal.
void Jobserver::Init() {
  assert(fd_ < 0);

  const bool have_fifo = ParseJobserverAuth("fifo");
  if (have_fifo) {
    const char* const fifo_path = jobserver_name_.c_str();

    // Read/write on a single descriptor: tokens are both taken from and
    // returned to the same fifo. O_NONBLOCK keeps token reads from waiting.
    fd_ = open(fifo_path, O_RDWR | O_NONBLOCK | O_CLOEXEC);
    if (fd_ < 0) {
      Fatal("failed to open jobserver: %s: %s", fifo_path, strerror(errno));
    }

    Info("using jobserver: %s", fifo_path);
  }
}

// Closes the fifo descriptor when one is open. Every acquired token is
// expected to have been released by now (checked by the assert).
Jobserver::~Jobserver() {
  assert(token_count_ == 0);

  if (fd_ < 0) {
    return;
  }
  close(fd_);
}

// True once a jobserver fifo has been opened, i.e. fd_ holds a valid
// (non-negative) descriptor.
bool Jobserver::Enabled() const {
  return !(fd_ < 0);
}

// Tries to take one token from the jobserver fifo without blocking.
//
// Returns true if a token byte was read, and false if none is available right
// now (EAGAIN / EWOULDBLOCK, or a zero-length read). Any other read error is
// fatal.
bool Jobserver::AcquireToken() {
  char token;
  ssize_t res;

  // The descriptor is opened with O_NONBLOCK, so this does not wait for a
  // token. A read interrupted by a signal before any data was transferred is
  // simply retried rather than treated as a hard failure.
  do {
    res = read(fd_, &token, 1);
  } while (res < 0 && errno == EINTR);

  if (res < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
    Fatal("failed to read jobserver token: %s", strerror(errno));
  }

  return res > 0;
}

// Hands one token back to the jobserver by writing a single byte to the fifo.
//
// Anything other than a successful one-byte write is fatal.
//
// NOTE(review): the GNU Make jobserver documentation asks clients to write
// back the same character they read, whereas this always writes '+'.
// Remembering the acquired characters needs state outside this function —
// confirm whether that matters for the jobservers this is used with.
void Jobserver::ReleaseToken() {
  const char token = '+';
  ssize_t res;

  // Retry when a signal interrupts the write before the byte is transferred.
  do {
    res = write(fd_, &token, 1);
  } while (res < 0 && errno == EINTR);

  if (res != 1) {
    // errno is only meaningful when write() reported an error; a zero-length
    // write would otherwise print an unrelated, stale error string.
    const char* reason = res < 0 ? strerror(errno) : "short write";
    Fatal("failed to write token: %s: %s", jobserver_name_.c_str(), reason);
  }
}
60 changes: 60 additions & 0 deletions src/jobserver-win32.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2024 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "jobserver.h"

#include <windows.h>

#include <cassert>

#include "util.h"

// Connects to a semaphore-based jobserver, if one was announced.
//
// When ParseJobserverAuth("sem") reports no jobserver, sem_ keeps its
// INVALID_HANDLE_VALUE sentinel and Enabled() keeps returning false. Failing
// to open an announced semaphore is fatal.
void Jobserver::Init() {
  assert(sem_ == INVALID_HANDLE_VALUE);

  if (!ParseJobserverAuth("sem")) {
    return;
  }

  const char* name = jobserver_name_.c_str();

  // The name is a narrow string, so call the ANSI entry point explicitly: the
  // OpenSemaphore macro resolves to the wide-character variant when UNICODE
  // is defined.
  HANDLE sem =
      OpenSemaphoreA(SYNCHRONIZE | SEMAPHORE_MODIFY_STATE, FALSE, name);
  if (sem == nullptr) {
    // Failure is signalled with NULL, not INVALID_HANDLE_VALUE. Leave sem_ at
    // its sentinel so Enabled() and the destructor never see a failed open as
    // a usable handle.
    Win32Fatal("OpenSemaphore", name);
    return;
  }
  sem_ = sem;

  Info("using jobserver: %s", name);
}

// Closes the semaphore handle when one is held. Every acquired token is
// expected to have been released by now (checked by the assert).
Jobserver::~Jobserver() {
  assert(token_count_ == 0);

  if (sem_ == INVALID_HANDLE_VALUE) {
    return;
  }
  CloseHandle(sem_);
}

// True once sem_ no longer holds the INVALID_HANDLE_VALUE sentinel.
bool Jobserver::Enabled() const {
  return !(sem_ == INVALID_HANDLE_VALUE);
}

// Tries to take one token from the jobserver semaphore without blocking.
//
// Returns true if the semaphore was acquired and false if no token is
// available right now. A failed wait is fatal, matching how the fifo-based
// implementation treats unexpected read errors.
bool Jobserver::AcquireToken() {
  // A zero timeout polls the semaphore instead of waiting on it.
  const DWORD res = WaitForSingleObject(sem_, 0);
  if (res == WAIT_FAILED) {
    // Don't let a broken handle masquerade as "no token available", which
    // would leave the build waiting for tokens that can never arrive.
    Win32Fatal("WaitForSingleObject");
  }
  return res == WAIT_OBJECT_0;
}

// Hands one token back to the jobserver by incrementing the semaphore count
// by one. A failed release is fatal.
void Jobserver::ReleaseToken() {
  const bool released = ReleaseSemaphore(sem_, 1, nullptr) != 0;
  if (released) {
    return;
  }
  Win32Fatal("ReleaseSemaphore");
}
Loading

0 comments on commit bbd51b6

Please sign in to comment.