Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(tiering): Lots of metrics #2977

Merged
merged 4 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
34 changes: 11 additions & 23 deletions src/server/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -257,34 +257,22 @@ bool ParseDouble(string_view src, double* value) {

#define ADD(x) (x) += o.x

IoMgrStats& IoMgrStats::operator+=(const IoMgrStats& rhs) {
static_assert(sizeof(IoMgrStats) == 16);

read_total += rhs.read_total;
read_delay_usec += rhs.read_delay_usec;

return *this;
}

TieredStats& TieredStats::operator+=(const TieredStats& o) {
static_assert(sizeof(TieredStats) == 48);

ADD(tiered_writes);
ADD(storage_capacity);
ADD(storage_reserved);
ADD(aborted_write_cnt);
ADD(flush_skip_cnt);
ADD(throttled_write_cnt);

return *this;
}

TieredStatsV2& TieredStatsV2::operator+=(const TieredStatsV2& o) {
static_assert(sizeof(TieredStatsV2) == 24);
static_assert(sizeof(TieredStats) == 80);

ADD(total_stashes);
ADD(total_fetches);
ADD(total_cancels);

ADD(allocated_bytes);
ADD(capacity_bytes);

ADD(pending_read_cnt);
ADD(pending_stash_cnt);

ADD(small_bins_cnt);
ADD(small_bins_entries_cnt);
ADD(small_bins_filling_bytes);

return *this;
}
Expand Down
33 changes: 11 additions & 22 deletions src/server/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,33 +59,22 @@ struct LockTagOptions {
static const LockTagOptions& instance();
};

struct IoMgrStats {
uint64_t read_total = 0;
uint64_t read_delay_usec = 0;

IoMgrStats& operator+=(const IoMgrStats& rhs);
};

struct TieredStats {
uint64_t tiered_writes = 0;

size_t storage_capacity = 0;

// how much was reserved by actively stored items.
size_t storage_reserved = 0;
uint64_t aborted_write_cnt = 0;
uint64_t flush_skip_cnt = 0;
uint64_t throttled_write_cnt = 0;

TieredStats& operator+=(const TieredStats&);
};

struct TieredStatsV2 {
size_t total_stashes = 0;
size_t total_fetches = 0;
size_t total_cancels = 0;

size_t allocated_bytes = 0;
size_t capacity_bytes = 0;

TieredStatsV2& operator+=(const TieredStatsV2&);
size_t pending_read_cnt = 0;
size_t pending_stash_cnt = 0;

size_t small_bins_cnt = 0;
size_t small_bins_entries_cnt = 0;
size_t small_bins_filling_bytes = 0;

TieredStats& operator+=(const TieredStats&);
};

struct SearchStats {
Expand Down
41 changes: 14 additions & 27 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1144,15 +1144,6 @@ void PrintPrometheusMetrics(const Metrics& m, StringResponse* resp) {
MetricType::COUNTER, &resp->body());
AppendMetricWithoutLabels("reply_total", "", m.facade_stats.reply_stats.send_stats.count,
MetricType::COUNTER, &resp->body());

// Tiered metrics.
if (m.disk_stats.read_total > 0) {
AppendMetricWithoutLabels("tiered_reads_total", "", m.disk_stats.read_total,
MetricType::COUNTER, &resp->body());
AppendMetricWithoutLabels("tiered_reads_latency_seconds", "",
double(m.disk_stats.read_delay_usec) * 1e-6, MetricType::COUNTER,
&resp->body());
}
}

AppendMetricWithoutLabels("script_error_total", "", m.facade_stats.reply_stats.script_error_count,
Expand Down Expand Up @@ -1845,7 +1836,7 @@ Metrics ServerFamily::GetMetrics() const {
result.shard_stats += shard->stats();

if (shard->tiered_storage_v2()) {
result.tiered_stats_v2 += shard->tiered_storage_v2()->GetStats();
result.tiered_stats += shard->tiered_storage_v2()->GetStats();
}

if (shard->search_indices()) {
Expand Down Expand Up @@ -2057,23 +2048,19 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
}

if (should_enter("TIERED", true)) {
append("tiered_entries", total.tiered_entries);
append("tiered_bytes", total.tiered_size);
append("tiered_bytes_human", HumanReadableNumBytes(total.tiered_size));
append("tiered_reads", m.disk_stats.read_total);
append("tiered_read_latency_usec", m.disk_stats.read_delay_usec);
append("tiered_writes", m.tiered_stats.tiered_writes);
append("tiered_reserved", m.tiered_stats.storage_reserved);
append("tiered_capacity", m.tiered_stats.storage_capacity);
append("tiered_aborted_writes", m.tiered_stats.aborted_write_cnt);
append("tiered_flush_skipped", m.tiered_stats.flush_skip_cnt);
append("tiered_throttled_writes", m.tiered_stats.throttled_write_cnt);
}

if (should_enter("TIERED_V2", true)) {
append("tiered_v2_total_stashes", m.tiered_stats_v2.total_stashes);
append("tiered_v2_total_fetches", m.tiered_stats_v2.total_fetches);
append("tiered_v2_allocated_bytes", m.tiered_stats_v2.allocated_bytes);
append("tiered_total_stashes", m.tiered_stats.total_stashes);
append("tiered_total_fetches", m.tiered_stats.total_fetches);
append("tiered_total_cancels", m.tiered_stats.total_cancels);

append("tiered_allocated_bytes", m.tiered_stats.allocated_bytes);
append("tiered_capacity_bytes", m.tiered_stats.capacity_bytes);

append("tiered_pending_read_cnt", m.tiered_stats.pending_read_cnt);
append("tiered_pending_stash_cnt", m.tiered_stats.pending_stash_cnt);

append("tiered_small_bins_cnt", m.tiered_stats.small_bins_cnt);
append("tiered_small_bins_entries_cnt", m.tiered_stats.small_bins_entries_cnt);
append("tiered_small_bins_filling_bytes", m.tiered_stats.small_bins_filling_bytes);
}

if (should_enter("PERSISTENCE", true)) {
Expand Down
4 changes: 1 addition & 3 deletions src/server/server_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,8 @@ struct Metrics {
EngineShard::Stats shard_stats; // per-shard stats

facade::FacadeStats facade_stats; // client stats and buffer sizes
TieredStats tiered_stats; // stats for tiered storage
TieredStatsV2 tiered_stats_v2;
TieredStats tiered_stats;

IoMgrStats disk_stats; // disk stats for io_mgr
SearchStats search_stats;
ServerState::Stats coordinator_stats; // stats on transaction running
PeakStats peak_stats;
Expand Down
44 changes: 34 additions & 10 deletions src/server/tiered_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ class TieredStorageV2::ShardOpManager : public tiering::OpManager {

// Clear IO pending flag for entry
void ClearIoPending(string_view key) {
if (auto pv = Find(key); pv)
if (auto pv = Find(key); pv) {
pv->SetIoPending(false);
stats_.total_cancels++;
}
}

// Clear IO pending flag for all contained entries of bin
Expand Down Expand Up @@ -109,13 +111,9 @@ class TieredStorageV2::ShardOpManager : public tiering::OpManager {
}
}

TieredStatsV2 GetStats() const {
auto stats = stats_;
stats.allocated_bytes = OpManager::storage_.GetStats().allocated_bytes;
return stats;
}

private:
friend class TieredStorageV2;

PrimeValue* Find(string_view key) {
// TODO: Get DbContext for transaction for correct dbid and time
auto it = db_slice_->FindMutable(DbContext{}, key);
Expand All @@ -124,7 +122,9 @@ class TieredStorageV2::ShardOpManager : public tiering::OpManager {

bool cache_fetched_ = false;

TieredStatsV2 stats_;
struct {
size_t total_stashes = 0, total_fetches = 0, total_cancels = 0;
} stats_;

TieredStorageV2* ts_;
DbSlice* db_slice_;
Expand Down Expand Up @@ -218,8 +218,32 @@ bool TieredStorageV2::ShouldStash(const PrimeValue& pv) {
return !pv.IsExternal() && pv.ObjType() == OBJ_STRING && pv.Size() >= kMinValueSize;
}

TieredStatsV2 TieredStorageV2::GetStats() const {
return op_manager_->GetStats();
TieredStats TieredStorageV2::GetStats() const {
TieredStats stats{};

{ // ShardOpManager stats
auto shard_stats = op_manager_->stats_;
stats.total_fetches = shard_stats.total_fetches;
stats.total_stashes = shard_stats.total_stashes;
stats.total_cancels = shard_stats.total_cancels;
}

{ // OpManager stats
tiering::OpManager::Stats op_stats = op_manager_->GetStats();
stats.pending_read_cnt = op_stats.pending_read_cnt;
stats.pending_stash_cnt = op_stats.pending_stash_cnt;
stats.allocated_bytes = op_stats.disk_stats.allocated_bytes;
stats.capacity_bytes = op_stats.disk_stats.capacity_bytes;
}

{ // SmallBins stats
tiering::SmallBins::Stats bins_stats = bins_->GetStats();
stats.small_bins_cnt = bins_stats.stashed_bins_cnt;
stats.small_bins_entries_cnt = bins_stats.stashed_entries_cnt;
stats.small_bins_filling_bytes = bins_stats.current_bin_bytes;
}

return stats;
}

} // namespace dfly
6 changes: 3 additions & 3 deletions src/server/tiered_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class TieredStorageV2 {
// Returns if a value should be stashed
bool ShouldStash(const PrimeValue& pv);

TieredStatsV2 GetStats() const;
TieredStats GetStats() const;

private:
std::unique_ptr<ShardOpManager> op_manager_;
Expand Down Expand Up @@ -108,8 +108,8 @@ class TieredStorageV2 {
void Delete(std::string_view key, PrimeValue* value) {
}

TieredStatsV2 GetStats() {
return TieredStatsV2{};
TieredStats GetStats() {
return TieredStats{};
}
};

Expand Down
2 changes: 1 addition & 1 deletion src/server/tiered_storage_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ TEST_F(TieredStorageV2Test, SimpleGetSet) {
// Make sure all entries were stashed, except the one few not filling a small page
size_t stashes = 0;
ExpectConditionWithinTimeout([this, &stashes] {
stashes = GetMetrics().tiered_stats_v2.total_stashes;
stashes = GetMetrics().tiered_stats.total_stashes;
return stashes >= kMax - 64 - 1;
});

Expand Down
3 changes: 1 addition & 2 deletions src/server/tiering/disk_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ std::error_code DiskStorage::Stash(io::Bytes bytes, StashCb cb) {
memcpy(buf.bytes.data(), bytes.data(), bytes.length());

auto io_cb = [this, cb, offset, buf, len = bytes.size()](int io_res) {
VLOG(0) << "IoRes " << io_res << " " << len;
if (io_res < 0) {
MarkAsFree({size_t(offset), len});
cb({}, std::error_code{-io_res, std::system_category()});
Expand All @@ -134,7 +133,7 @@ std::error_code DiskStorage::Stash(io::Bytes bytes, StashCb cb) {
}

DiskStorage::Stats DiskStorage::GetStats() const {
return {alloc_.allocated_bytes()};
return {alloc_.allocated_bytes(), alloc_.capacity()};
}

} // namespace dfly::tiering
1 change: 1 addition & 0 deletions src/server/tiering/disk_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class DiskStorage {
public:
struct Stats {
size_t allocated_bytes = 0;
size_t capacity_bytes = 0;
};

using ReadCb = std::function<void(std::string_view, std::error_code)>;
Expand Down
14 changes: 13 additions & 1 deletion src/server/tiering/disk_storage_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "base/gtest.h"
#include "base/logging.h"
#include "server/tiering/common.h"
#include "server/tiering/test_common.h"
#include "util/fibers/fibers.h"
#include "util/fibers/pool.h"
Expand Down Expand Up @@ -59,12 +60,16 @@ struct DiskStorageTest : public PoolTestBase {
last_reads_.erase(index);
}

void Wait() {
void Wait() const {
while (pending_ops_ > 0) {
::util::ThisFiber::SleepFor(1ms);
}
}

DiskStorage::Stats GetStats() const {
return storage_->GetStats();
}

protected:
int pending_ops_ = 0;

Expand All @@ -82,6 +87,8 @@ TEST_F(DiskStorageTest, Basic) {
Wait();
EXPECT_EQ(segments_.size(), 100);

EXPECT_EQ(GetStats().allocated_bytes, 100 * kPageSize);

// Read all 100 values
for (size_t i = 0; i < 100; i++)
Read(i);
Expand All @@ -91,6 +98,11 @@ TEST_F(DiskStorageTest, Basic) {
for (size_t i = 0; i < 100; i++)
EXPECT_EQ(last_reads_[i], absl::StrCat("value", i));

// Delete all values
for (size_t i = 0; i < 100; i++)
Delete(i);
EXPECT_EQ(GetStats().allocated_bytes, 0);

Close();
});
}
Expand Down
5 changes: 0 additions & 5 deletions src/server/tiering/io_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,11 @@ class IoMgr {
return grow_progress_;
}

const IoMgrStats& GetStats() const {
return stats_;
}

private:
std::unique_ptr<util::fb2::LinuxFile> backing_file_;
size_t sz_ = 0;

bool grow_progress_ = false;
IoMgrStats stats_;
};

} // namespace dfly::tiering
11 changes: 9 additions & 2 deletions src/server/tiering/op_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ void OpManager::Enqueue(EntryId id, DiskSegment segment, ReadCallback cb) {
void OpManager::Delete(EntryId id) {
// If the item isn't offloaded, it has io pending, so cancel it
DCHECK(pending_stash_ver_.count(ToOwned(id)));
++pending_stash_ver_[ToOwned(id)];
pending_stash_ver_.erase(ToOwned(id));
}

void OpManager::Delete(DiskSegment segment) {
Expand Down Expand Up @@ -89,7 +89,8 @@ void OpManager::ProcessStashed(EntryId id, unsigned version, DiskSegment segment
it != pending_stash_ver_.end() && it->second == version) {
pending_stash_ver_.erase(it);
ReportStashed(id, segment, ec);
} else {
} else if (!ec) {
// Throw away the value because it's no longer up-to-date even if no error occured
storage_.MarkAsFree(segment);
}
}
Expand Down Expand Up @@ -125,4 +126,10 @@ OpManager::EntryOps& OpManager::ReadOp::ForId(EntryId id, DiskSegment key_segmen
return key_ops.emplace_back(ToOwned(id), key_segment);
}

OpManager::Stats OpManager::GetStats() const {
return {.disk_stats = storage_.GetStats(),
.pending_read_cnt = pending_reads_.size(),
.pending_stash_cnt = pending_stash_ver_.size()};
}

} // namespace dfly::tiering