Skip to content

Commit

Permalink
Implement GNU Make 4.4+ jobserver fifo / semaphore client support
Browse files Browse the repository at this point in the history
The principle of such a job server is rather simple: Before starting a
new job (edge in ninja-speak), a token must be acquired from an external
entity. On posix systems, that entity is simply a fifo filled with N
characters. On win32 systems it is a semaphore initialized to N.  Once a
job is finished, the token must be returned to the external entity.

This functionality is desired when ninja is used as part of a bigger
build, such as builds with Yocto/OpenEmbedded, Buildroot and Android.
Here, multiple compile jobs are executed in parallel to maximize cpu
utilization, but if each compile job uses all available cores, the
system is over loaded.
  • Loading branch information
hundeboll committed May 18, 2024
1 parent f14a949 commit a81fb9f
Show file tree
Hide file tree
Showing 9 changed files with 318 additions and 10 deletions.
7 changes: 6 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ add_library(libninja OBJECT
src/eval_env.cc
src/graph.cc
src/graphviz.cc
src/jobserver.cc
src/json.cc
src/line_printer.cc
src/manifest_parser.cc
Expand All @@ -153,6 +154,7 @@ add_library(libninja OBJECT
if(WIN32)
target_sources(libninja PRIVATE
src/subprocess-win32.cc
src/jobserver-win32.cc
src/includes_normalize-win32.cc
src/msvc_helper-win32.cc
src/msvc_helper_main-win32.cc
Expand All @@ -164,7 +166,10 @@ if(WIN32)
# compiler may build ninja.
set_source_files_properties(src/getopt.c PROPERTIES LANGUAGE CXX)
else()
target_sources(libninja PRIVATE src/subprocess-posix.cc)
target_sources(libninja PRIVATE
src/subprocess-posix.cc
src/jobserver-posix.cc
)
if(CMAKE_SYSTEM_NAME STREQUAL "OS400" OR CMAKE_SYSTEM_NAME STREQUAL "AIX")
target_sources(libninja PRIVATE src/getopt.c)
# Build getopt.c, which can be compiled as either C or C++, as C++
Expand Down
3 changes: 3 additions & 0 deletions configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,7 @@ def has_re2c() -> bool:
'eval_env',
'graph',
'graphviz',
'jobserver',
'json',
'line_printer',
'manifest_parser',
Expand All @@ -556,6 +557,7 @@ def has_re2c() -> bool:
objs += cxx(name, variables=cxxvariables)
if platform.is_windows():
for name in ['subprocess-win32',
'jobserver-win32',
'includes_normalize-win32',
'msvc_helper-win32',
'msvc_helper_main-win32']:
Expand All @@ -565,6 +567,7 @@ def has_re2c() -> bool:
objs += cc('getopt')
else:
objs += cxx('subprocess-posix')
objs += cxx('jobserver-posix')
if platform.is_aix():
objs += cc('getopt')
if platform.is_msvc():
Expand Down
28 changes: 22 additions & 6 deletions src/build.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ struct DryRunCommandRunner : public CommandRunner {
virtual ~DryRunCommandRunner() {}

// Overridden from CommandRunner:
virtual size_t CanRunMore() const;
virtual size_t CanRunMore(bool jobserver_enabled) const;
virtual bool StartCommand(Edge* edge);
virtual bool WaitForCommand(Result* result);

private:
queue<Edge*> finished_;
};

size_t DryRunCommandRunner::CanRunMore() const {
size_t DryRunCommandRunner::CanRunMore(bool jobserver_enabled) const {
return SIZE_MAX;
}

Expand Down Expand Up @@ -163,6 +163,11 @@ Edge* Plan::FindWork() {
if (ready_.empty())
return NULL;

// Don't initiate more work if the jobserver cannot acquire more tokens
if (jobserver_.Enabled() && !jobserver_.Acquire()) {
return NULL;
}

Edge* work = ready_.top();
ready_.pop();
return work;
Expand Down Expand Up @@ -200,6 +205,11 @@ bool Plan::EdgeFinished(Edge* edge, EdgeResult result, string* err) {
edge->pool()->EdgeFinished(*edge);
edge->pool()->RetrieveReadyEdges(&ready_);

// Return the token for acquired for this very edge to the jobserver
if (jobserver_.Enabled()) {
jobserver_.Release();
}

// The rest of this function only applies to successful commands.
if (result != kEdgeSucceeded)
return true;
Expand Down Expand Up @@ -578,6 +588,7 @@ void Plan::ScheduleInitialEdges() {
}

void Plan::PrepareQueue() {
jobserver_.Init();
ComputeCriticalPath();
ScheduleInitialEdges();
}
Expand All @@ -595,7 +606,7 @@ void Plan::Dump() const {
struct RealCommandRunner : public CommandRunner {
explicit RealCommandRunner(const BuildConfig& config) : config_(config) {}
virtual ~RealCommandRunner() {}
virtual size_t CanRunMore() const;
virtual size_t CanRunMore(bool jobserver_enabled) const;
virtual bool StartCommand(Edge* edge);
virtual bool WaitForCommand(Result* result);
virtual vector<Edge*> GetActiveEdges();
Expand All @@ -618,12 +629,17 @@ void RealCommandRunner::Abort() {
subprocs_.Clear();
}

size_t RealCommandRunner::CanRunMore() const {
size_t RealCommandRunner::CanRunMore(bool jobserver_enabled) const {
size_t subproc_number =
subprocs_.running_.size() + subprocs_.finished_.size();

int64_t capacity = config_.parallelism - subproc_number;

// Return "infinite" capacity if a jobserver is used to limit the number
// of parallel subprocesses instead.
if (jobserver_enabled)
return SIZE_MAX;

if (config_.max_load_average > 0.0f) {
int load_capacity = config_.max_load_average - GetLoadAverage();
if (load_capacity < capacity)
Expand Down Expand Up @@ -789,7 +805,7 @@ bool Builder::Build(string* err) {
while (plan_.more_to_do()) {
// See if we can start any more commands.
if (failures_allowed) {
size_t capacity = command_runner_->CanRunMore();
size_t capacity = command_runner_->CanRunMore(plan_.JobserverEnabled());
while (capacity > 0) {
Edge* edge = plan_.FindWork();
if (!edge)
Expand Down Expand Up @@ -817,7 +833,7 @@ bool Builder::Build(string* err) {
--capacity;

// Re-evaluate capacity.
size_t current_capacity = command_runner_->CanRunMore();
size_t current_capacity = command_runner_->CanRunMore(plan_.JobserverEnabled());
if (current_capacity < capacity)
capacity = current_capacity;
}
Expand Down
9 changes: 8 additions & 1 deletion src/build.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "depfile_parser.h"
#include "graph.h"
#include "exit_status.h"
#include "jobserver.h"
#include "util.h" // int64_t

struct BuildLog;
Expand Down Expand Up @@ -51,6 +52,9 @@ struct Plan {
/// Returns true if there's more work to be done.
bool more_to_do() const { return wanted_edges_ > 0 && command_edges_ > 0; }

/// Jobserver status used to skip capacity based on load average
bool JobserverEnabled() const { return jobserver_.Enabled(); }

/// Dumps the current state of the plan.
void Dump() const;

Expand Down Expand Up @@ -138,14 +142,17 @@ struct Plan {

/// Total remaining number of wanted edges.
int wanted_edges_;

/// Jobserver client
Jobserver jobserver_;
};

/// CommandRunner is an interface that wraps running the build
/// subcommands. This allows tests to abstract out running commands.
/// RealCommandRunner is an implementation that actually runs commands.
struct CommandRunner {
virtual ~CommandRunner() {}
virtual size_t CanRunMore() const = 0;
virtual size_t CanRunMore(bool jobserver_enabled) const = 0;
virtual bool StartCommand(Edge* edge) = 0;

/// The result of waiting for a command.
Expand Down
7 changes: 5 additions & 2 deletions src/build_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ struct FakeCommandRunner : public CommandRunner {
max_active_edges_(1), fs_(fs) {}

// CommandRunner impl
virtual size_t CanRunMore() const;
virtual size_t CanRunMore(bool jobserver_enabled) const;
virtual bool StartCommand(Edge* edge);
virtual bool WaitForCommand(Result* result);
virtual vector<Edge*> GetActiveEdges();
Expand Down Expand Up @@ -622,10 +622,13 @@ void BuildTest::RebuildTarget(const string& target, const char* manifest,
builder.command_runner_.release();
}

size_t FakeCommandRunner::CanRunMore() const {
size_t FakeCommandRunner::CanRunMore(bool jobserver_enabled) const {
if (active_edges_.size() < max_active_edges_)
return SIZE_MAX;

if (jobserver_enabled)
return SIZE_MAX;

return 0;
}

Expand Down
72 changes: 72 additions & 0 deletions src/jobserver-posix.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2024 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <fcntl.h>
#include <cassert>
#include <cstring>
#include <unistd.h>

#include "jobserver.h"
#include "util.h"

Jobserver::Jobserver() : token_count_(0), fd_(-1)
{}

void Jobserver::Init() {
assert(fd_ < 0);

if (!ParseJobserverAuth("fifo")) {
return;
}

const char *jobserver = jobserver_name_.c_str();

fd_ = open(jobserver, O_NONBLOCK | O_CLOEXEC | O_RDWR);
if (fd_ < 0) {
Fatal("failed to open jobserver: %s: %s", jobserver, strerror(errno));
}

Info("Using jobserver: %s", jobserver);
}

Jobserver::~Jobserver() {
assert(token_count_ == 0);

if (fd_ >= 0) {
close(fd_);
}
}

bool Jobserver::Enabled() const {
return fd_ >= 0;
}

bool Jobserver::AcquireToken() {
char token;
int res = read(fd_, &token, 1);
if (res < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
Fatal("failed to read jobserver token: %s", strerror(errno));
}

return res > 0;
}

void Jobserver::ReleaseToken() {
char token = '+';
int res = write(fd_, &token, 1);
if (res != 1) {
Fatal("failed to write token: %s: %s", jobserver_name_.c_str(),
strerror(errno));
}
}
61 changes: 61 additions & 0 deletions src/jobserver-win32.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2024 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <windows.h>
#include <cassert>

#include "jobserver.h"
#include "util.h"

Jobserver::Jobserver() : token_count_(0), sem_(NULL)
{}

void Jobserver::Init() {
assert(sem_ == NULL);

if (!ParseJobserverAuth("sem")) {
return;
}

const char *name = jobserver_name_.c_str();

sem_ = OpenSemaphore(SEMAPHORE_ALL_ACCESS, false, name);
if (sem_ == NULL) {
Win32Fatal("OpenSemaphore");
}

Info("using jobserver: %s", name);
}

Jobserver::~Jobserver() {
assert(token_count_ == 0);

if (sem_ != NULL) {
CloseHandle(sem_);
}
}

bool Jobserver::Enabled() const {
return sem_ != NULL;
}

bool Jobserver::AcquireToken() {
return WaitForSingleObject(sem_, 0) == WAIT_OBJECT_0;
}

void Jobserver::ReleaseToken() {
if (ReleaseSemaphore(sem_, 1, NULL)) {
Win32Fatal("ReleaseSemaphore");
}
}
Loading

0 comments on commit a81fb9f

Please sign in to comment.