Skip to content

Commit

Permalink
Merge pull request #22 from kruserr/2024-05-26-0
Browse files Browse the repository at this point in the history
2024 05 26 0
  • Loading branch information
kruserr committed May 26, 2024
2 parents 89c6724 + b444a74 commit 2e98b97
Show file tree
Hide file tree
Showing 8 changed files with 266 additions and 233 deletions.
260 changes: 133 additions & 127 deletions Cargo.lock

Large diffs are not rendered by default.

17 changes: 10 additions & 7 deletions rapiddb/src/db/mmav_db/mmav.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::db::mmav_db::mmav_unit::MMAVUnit;
use crate::errors::MMAVError;
use crate::errors::Error;

/// Memory Mapped Append-only Vector
///
Expand Down Expand Up @@ -148,12 +148,15 @@ impl MMAV {
return unit_map[&index].len();
}

let unit = MMAVUnit::new(&format!("{id}/{index}"), size, data_start_index);
let result = unit.len();
let mut result: usize = Default::default();

unit_map.insert(index, unit);
let _ = MMAVUnit::new(&format!("{id}/{index}"), size, data_start_index)
.map(|unit| {
result = unit.len();
unit_map.insert(index, unit);
});

result
return result;
}

/// Load unit that contains `index`
Expand Down Expand Up @@ -231,11 +234,11 @@ impl MMAV {
pub fn push(&mut self, value: &[u8]) {
self.unit_map.get_mut(&self.index).unwrap().push(value).unwrap_or_else(
|error| match error {
MMAVError::ArrayFull => {
Error::ArrayFull => {
self.expand();
self.push(value);
}
MMAVError::FileFull => {
Error::FileFull => {
self.expand();
self.push(value);
}
Expand Down
8 changes: 6 additions & 2 deletions rapiddb/src/db/mmav_db/mmav_async_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ impl MMAVAsyncDatabase {
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(&file_name)
.unwrap_or_else(|error| {
if error.kind() == std::io::ErrorKind::NotFound {
Expand All @@ -126,6 +127,7 @@ impl MMAVAsyncDatabase {
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(file_name)
.unwrap()
});
Expand Down Expand Up @@ -155,7 +157,7 @@ impl Default for MMAVAsyncDatabase {
#[async_trait::async_trait]
impl IAsyncDatabase for MMAVAsyncDatabase {
async fn contains(&self, id: &str) -> bool {
self.sensors.get(id).is_some()
self.sensors.contains_key(id)
}

async fn get(&mut self, id: &str, rec_id: usize) -> Vec<u8> {
Expand Down Expand Up @@ -215,6 +217,7 @@ impl IAsyncDatabase for MMAVAsyncDatabase {
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(&file_name)
.unwrap_or_else(|error| {
if error.kind() == std::io::ErrorKind::NotFound {
Expand All @@ -234,6 +237,7 @@ impl IAsyncDatabase for MMAVAsyncDatabase {
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(file_name)
.unwrap()
});
Expand All @@ -243,7 +247,7 @@ impl IAsyncDatabase for MMAVAsyncDatabase {
}

async fn get_aggregates(&self, id: &str) -> Vec<u8> {
if self.aggregates.get(id).is_none() {
if !self.aggregates.contains_key(id) {
return Default::default();
}

Expand Down
8 changes: 6 additions & 2 deletions rapiddb/src/db/mmav_db/mmav_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ impl MMAVDatabase {
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(&file_name)
.unwrap_or_else(|error| {
if error.kind() == std::io::ErrorKind::NotFound {
Expand All @@ -126,6 +127,7 @@ impl MMAVDatabase {
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(file_name)
.unwrap()
});
Expand Down Expand Up @@ -154,7 +156,7 @@ impl Default for MMAVDatabase {
}
impl IDatabase for MMAVDatabase {
fn contains(&self, id: &str) -> bool {
self.sensors.get(id).is_some()
self.sensors.contains_key(id)
}

fn get(&mut self, id: &str, rec_id: usize) -> Vec<u8> {
Expand Down Expand Up @@ -214,6 +216,7 @@ impl IDatabase for MMAVDatabase {
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(&file_name)
.unwrap_or_else(|error| {
if error.kind() == std::io::ErrorKind::NotFound {
Expand All @@ -233,6 +236,7 @@ impl IDatabase for MMAVDatabase {
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(file_name)
.unwrap()
});
Expand All @@ -242,7 +246,7 @@ impl IDatabase for MMAVDatabase {
}

fn get_aggregates(&self, id: &str) -> Vec<u8> {
if self.aggregates.get(id).is_none() {
if !self.aggregates.contains_key(id) {
return Default::default();
}

Expand Down
96 changes: 37 additions & 59 deletions rapiddb/src/db/mmav_db/mmav_unit.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::errors::MMAVError;
use crate::errors::Error;

/// Memory Mapped Append-only Vector Unit
///
Expand All @@ -24,7 +24,6 @@ impl MMAVUnit {
/// Memory Mapped Append-only Vector Unit Constructor
///
/// ## Default params:
///
/// `size` = 4000000
///
/// `data_start_index` = 80008
Expand All @@ -37,59 +36,53 @@ impl MMAVUnit {
/// unit.push(data).unwrap_or_default();
/// assert_eq!(unit.last(), data);
/// ```
pub fn new(file_name: &str, size: usize, data_start_index: usize) -> Self {
let file_did_exist = std::path::Path::new(file_name).exists();
pub fn new(
file_name: &str,
size: usize,
data_start_index: usize,
) -> Result<Self, Error> {
let file_path = std::path::Path::new(file_name);
let file_exists = file_path.exists();

if (!file_exists) {
file_path.parent().map(std::fs::create_dir_all);
}

let file = std::fs::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(file_name)
.unwrap_or_else(|error| {
if error.kind() == std::io::ErrorKind::NotFound {
std::fs::create_dir(file_name.split('/').collect::<Vec<_>>()[0])
.unwrap_or_default();
}

std::fs::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(file_name)
.unwrap()
});

file.set_len(size as u64).unwrap_or_default();

let mut mmap = unsafe { memmap2::MmapMut::map_mut(&file).unwrap() };
.truncate(false)
.open(file_name)?;

file.set_len(size as u64)?;

let mut mmap = unsafe { memmap2::MmapMut::map_mut(&file)? };
mmap.advise(memmap2::Advice::Random).unwrap_or_default();

let mut seek = data_start_index;
if file_did_exist {
seek = u32::from_ne_bytes(mmap[0..4].try_into().unwrap()) as usize;
if (file_exists) {
seek = u32::from_ne_bytes(mmap[0..4].try_into()?) as usize;

if seek > mmap.len() {
panic!(
"seek_index must be between {data_start_index} and {}",
mmap.len()
);
return Err(Error::IndexOutOfRange);
}
}

let mut seek_index = 8;
if file_did_exist {
seek_index = u32::from_ne_bytes(mmap[4..8].try_into().unwrap()) as usize;
if (file_exists) {
seek_index = u32::from_ne_bytes(mmap[4..8].try_into()?) as usize;

if seek_index > data_start_index {
panic!("seek_index must be between 8 and {data_start_index}");
return Err(Error::IndexOutOfRange);
}
}

if mmap[seek] == 0 {
if (mmap[seek] == 0) {
mmap[seek] = 0;
}

Self { seek, seek_index, mmap, data_start_index }
return Ok(Self { seek, seek_index, mmap, data_start_index });
}

/// Set seek to `len`
Expand All @@ -116,12 +109,6 @@ impl MMAVUnit {

/// Push `value` to vector
///
/// ## Errors
/// ```ignore
/// MMAVError::ArrayFull
/// MMAVError::FileFull
/// ```
///
/// ## Examples
/// ```ignore
/// let mut unit = MMAVUnit::new("test-0/0", 4000000, 80008);
Expand All @@ -130,13 +117,13 @@ impl MMAVUnit {
/// unit.push(data).unwrap_or_default();
/// assert_eq!(unit.last(), data);
/// ```
pub fn push(&mut self, value: &[u8]) -> Result<(), MMAVError> {
pub fn push(&mut self, value: &[u8]) -> Result<(), Error> {
if self.len() > 9999 {
return Err(MMAVError::ArrayFull);
return Err(Error::ArrayFull);
}

if self.seek + value.len() > self.mmap.len() {
return Err(MMAVError::FileFull);
return Err(Error::FileFull);
}

self.mmap[self.seek..self.seek + value.len()].clone_from_slice(value);
Expand All @@ -147,13 +134,6 @@ impl MMAVUnit {

/// Get `index` from vector
///
/// ## Errors
/// ```ignore
/// MMAVError::ArrayEmpty
/// MMAVError::IndexOutOfRange
/// MMAVError::IndexOutOfBounds
/// ```
///
/// ## Examples
/// ```ignore
/// let mut unit = MMAVUnit::new("test-0/0", 4000000, 80008);
Expand All @@ -162,32 +142,30 @@ impl MMAVUnit {
/// unit.push(data).unwrap_or_default();
/// assert_eq!(unit.get(0), data);
/// ```
pub fn get(&self, index: usize) -> Result<Vec<u8>, MMAVError> {
pub fn get(&self, index: usize) -> Result<Vec<u8>, Error> {
if self.seek_index == 8 {
return Err(MMAVError::ArrayEmpty);
return Err(Error::ArrayEmpty);
}

if index > 9999 {
return Err(MMAVError::IndexOutOfRange);
return Err(Error::IndexOutOfRange);
}

if index > self.len() - 1 {
return Err(MMAVError::IndexOutOfBounds);
return Err(Error::IndexOutOfBounds);
}

let i = 8 * index + 8;

let start =
u32::from_ne_bytes(self.mmap[i..i + 4].try_into().unwrap()) as usize;
let end =
u32::from_ne_bytes(self.mmap[i + 4..i + 8].try_into().unwrap()) as usize;
let start = u32::from_ne_bytes(self.mmap[i..i + 4].try_into()?) as usize;
let end = u32::from_ne_bytes(self.mmap[i + 4..i + 8].try_into()?) as usize;

if start < self.data_start_index || start > self.mmap.len() {
return Err(MMAVError::IndexOutOfRange);
return Err(Error::IndexOutOfRange);
}

if end < self.data_start_index || end > self.mmap.len() {
return Err(MMAVError::IndexOutOfRange);
return Err(Error::IndexOutOfRange);
}

Ok((self.mmap[start..end]).to_vec())
Expand Down
32 changes: 0 additions & 32 deletions rapiddb/src/errors/mmav_error.rs

This file was deleted.

Loading

0 comments on commit 2e98b97

Please sign in to comment.