Skip to content

Commit

Permalink
shit, revert this mess
Browse files Browse the repository at this point in the history
  • Loading branch information
slyphon committed Jun 27, 2018
1 parent 6ec0e55 commit e369bd7
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 96 deletions.
6 changes: 4 additions & 2 deletions src/storage/cdb/cdb_rs/src/cdb/errors.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::error;
use std::ffi::NulError;
use std::fmt;
use std::io;
use std::num::ParseIntError;
use std::str::Utf8Error;
use std::ffi::NulError;

#[derive(Debug)]
pub enum CDBError {
Expand Down Expand Up @@ -32,7 +32,9 @@ impl From<io::Error> for CDBError {
}

impl From<NulError> for CDBError {
fn from(err: NulError) -> CDBError { CDBError::NulError(err) }
fn from(err: NulError) -> CDBError {
CDBError::NulError(err)
}
}

impl error::Error for CDBError {
Expand Down
192 changes: 148 additions & 44 deletions src/storage/cdb/cdb_rs/src/cdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use std::result;

pub mod errors;
pub mod input;
pub mod writer;
pub mod storage;
pub mod writer;

use self::storage::SliceFactory;
use self::storage::Sliceable;

pub use self::errors::CDBError;

Expand Down Expand Up @@ -124,7 +124,6 @@ impl KV {
}
}


struct IndexEntry {
hash: CDBHash, // the hash of the stored key
ptr: usize, // pointer to the absolute position of the data in the db
Expand All @@ -141,7 +140,12 @@ pub struct KVIter {
impl KVIter {
fn new(cdb: Box<CDB>) -> Result<Self> {
let bkt = cdb.bucket_at(0)?;
Ok(KVIter{cdb, bkt_idx: 0, entry_n: 0, bkt})
Ok(KVIter {
cdb,
bkt_idx: 0,
entry_n: 0,
bkt,
})
}
}

Expand All @@ -151,7 +155,7 @@ impl Iterator for KVIter {
fn next(&mut self) -> Option<Self::Item> {
loop {
if self.bkt_idx >= MAIN_TABLE_SIZE {
return None
return None;
}

if self.entry_n >= self.bkt.num_ents {
Expand All @@ -163,24 +167,23 @@ impl Iterator for KVIter {
Err(err) => return Some(Err(err)),
}
}
continue
continue;
}

let idx_ent =
match self.cdb.index_entry_at(self.bkt.entry_n_pos(self.entry_n)) {
Ok(index_entry) => index_entry,
Err(err) => return Some(Err(err)),
};
let idx_ent = match self.cdb.index_entry_at(self.bkt.entry_n_pos(self.entry_n)) {
Ok(index_entry) => index_entry,
Err(err) => return Some(Err(err)),
};

self.entry_n += 1;

if idx_ent.ptr == 0 {
continue
continue;
} else {
return match self.cdb.get_kv(idx_ent) {
Ok(kv) => Some(Ok(kv)),
Err(err) => Some(Err(err)),
}
};
}
}
}
Expand All @@ -189,29 +192,36 @@ impl Iterator for KVIter {
#[derive(Debug)]
#[repr(C)]
pub struct CDB {
data: SliceFactory,
inner: Box<Sliceable + Clone>,
}

impl Clone for CDB {
fn clone(&self) -> Self {
CDB{data: self.data.clone()}
CDB {
inner: Box::new((*self.inner).clone()),
}
}
}

impl CDB {
pub fn new(sf: SliceFactory) -> CDB { CDB{data: sf} }

pub fn stdio(path: &str) -> Result<CDB> {
Ok(CDB::new(SliceFactory::make_filewrap(path)?))
pub fn new<T>(inner: T) -> CDB
where
T: Sliceable + Clone,
{
CDB{inner: Box::new(inner)}
}

pub fn mmap(path: &str) -> Result<CDB> {
Ok(CDB::new(SliceFactory::make_map(path)?))
pub fn stdio(path: &str) -> Result<CDB> {
Ok(CDB::new(Sliceable::make_filewrap(path)?))
}

pub fn load(path: &str) -> Result<CDB> {
Ok(CDB::new(SliceFactory::load(path)?))
}
// pub fn mmap(path: &str) -> Result<CDB> {
// Ok(CDB::new(Sliceable::make_map(path)?))
// }
//
// pub fn load(path: &str) -> Result<CDB> {
// Ok(CDB::new(Sliceable::load(path)?))
// }

pub fn kvs_iter(&self) -> Result<KVIter> {
Ok(KVIter::new(Box::new(self.clone()))?)
Expand All @@ -223,7 +233,7 @@ impl CDB {

let off = 8 * idx;

let b = self.data.slice(off, off + 8)?;
let b = self.data.slice(off..(off + 8))?;
assert_eq!(b.len(), 8);
trace!("bucket_at idx: {}, got buf: {:?}", idx, b);

Expand All @@ -232,7 +242,7 @@ impl CDB {
let ptr = buf.get_u32_le() as usize;
let num_ents = buf.get_u32_le() as usize;

Ok(Bucket{ptr, num_ents})
Ok(Bucket { ptr, num_ents })
}

// returns the index entry at absolute position 'pos' in the db
Expand All @@ -244,27 +254,30 @@ impl CDB {
panic!("position {:?} was in the main table!", pos)
}

let mut b = self.data.slice(pos, pos + 8)?.into_buf();
let mut b = self.data.slice(pos..(pos + 8))?.into_buf();
let hash = CDBHash(b.get_u32_le());
let ptr = b.get_u32_le() as usize;

Ok(IndexEntry{hash, ptr})
Ok(IndexEntry { hash, ptr })
}

#[inline]
fn get_kv(&self, ie: IndexEntry) -> Result<KV> {
let b = self.data.slice(ie.ptr, ie.ptr + DATA_HEADER_SIZE)?;
let b = self.data.slice(ie.ptr..(ie.ptr + DATA_HEADER_SIZE))?;

let ksize = b.slice_to(4).into_buf().get_u32_le() as usize;
let vsize = b.slice_from(4).into_buf().get_u32_le() as usize;

let kstart = ie.ptr + DATA_HEADER_SIZE;
let vstart = kstart + ksize;

let k = self.data.slice(kstart, kstart + ksize)?;
let v = self.data.slice(vstart, vstart + vsize)?;
let k = self.data.slice(kstart..(kstart + ksize))?;
let v = self.data.slice(vstart..(vstart + vsize))?;

Ok(KV{k: Bytes::from(k), v: Bytes::from(v)})
Ok(KV {
k: Bytes::from(k),
v: Bytes::from(v),
})
}

pub fn get(&self, key: &[u8], buf: &mut Vec<u8>) -> Result<Option<usize>> {
Expand Down Expand Up @@ -301,8 +314,101 @@ impl CDB {
}
}

trait KVStore {
type S: Sliceable + Clone;

fn inner<'a>(&'a self) -> &'a Self::S;

#[inline]
fn bucket_at(&self, idx: usize) -> Result<Bucket> {
assert!(idx < MAIN_TABLE_SIZE);

let off = 8 * idx;

let b = self.inner().slice(off..(off + 8))?;
assert_eq!(b.len(), 8);
trace!("bucket_at idx: {}, got buf: {:?}", idx, b);

let mut buf = b.into_buf();

let ptr = buf.get_u32_le() as usize;
let num_ents = buf.get_u32_le() as usize;

Ok(Bucket { ptr, num_ents })
}

// returns the index entry at absolute position 'pos' in the db
#[inline]
fn index_entry_at(&self, pos: IndexEntryPos) -> Result<IndexEntry> {
let pos: usize = pos.into();

if pos < MAIN_TABLE_SIZE_BYTES {
panic!("position {:?} was in the main table!", pos)
}

let mut b = self.inner().slice(pos..(pos + 8))?.into_buf();
let hash = CDBHash(b.get_u32_le());
let ptr = b.get_u32_le() as usize;

Ok(IndexEntry { hash, ptr })
}

#[inline]
fn get_kv(&self, ie: IndexEntry) -> Result<KV> {
let b = self.inner().slice(ie.ptr..(ie.ptr + DATA_HEADER_SIZE))?;

let ksize = b.slice_to(4).into_buf().get_u32_le() as usize;
let vsize = b.slice_from(4).into_buf().get_u32_le() as usize;

let kstart = ie.ptr + DATA_HEADER_SIZE;
let vstart = kstart + ksize;

let k = self.inner().slice(kstart..(kstart + ksize))?;
let v = self.inner().slice(vstart..(vstart + vsize))?;

Ok(KV {
k: Bytes::from(k),
v: Bytes::from(v),
})
}

fn get(&self, key: &[u8], buf: &mut Vec<u8>) -> Result<Option<usize>> {
let key = key.into();
let hash = CDBHash::new(key);
let bucket = self.bucket_at(hash.table())?;

if bucket.num_ents == 0 {
trace!("bucket empty, returning none");
return Ok(None);
}

let slot = hash.slot(bucket.num_ents);

for x in 0..bucket.num_ents {
let index_entry_pos = bucket.entry_n_pos((x + slot) % bucket.num_ents);

let idx_ent = self.index_entry_at(index_entry_pos)?;

if idx_ent.ptr == 0 {
return Ok(None);
} else if idx_ent.hash == hash {
let kv = self.get_kv(idx_ent)?;
if &kv.k[..] == key {
buf.write_all(&kv.k[..]).unwrap();
return Ok(Some(kv.k.len()));
} else {
continue;
}
}
}

Ok(None)
}
}

#[cfg(test)]
mod tests {
use super::*;
use env_logger;
use proptest::collection::vec;
use proptest::prelude::*;
Expand All @@ -313,7 +419,6 @@ mod tests {
use std::io::{BufRead, BufReader};
use std::path::Path;
use std::path::PathBuf;
use super::*;
use tempfile::NamedTempFile;
use tinycdb::Cdb as TCDB;

Expand All @@ -325,7 +430,7 @@ mod tests {
struct QueryResult(String, Option<String>);

#[allow(dead_code)]
fn create_temp_cdb<'a>(kvs: &Vec<(String, String)>) -> Result<SliceFactory> {
fn create_temp_cdb<'a>(kvs: &Vec<(String, String)>) -> Result<Box<Sliceable>> {
let path: PathBuf;

{
Expand All @@ -349,9 +454,9 @@ mod tests {
}
}).unwrap();

let sf = self::storage::SliceFactory::load(path.to_str().unwrap())?;
let sf = self::storage::Sliceable::load(path.to_str().unwrap())?;

Ok(sf)
Ok(Box::new(sf))
}

proptest! {
Expand All @@ -370,21 +475,20 @@ mod tests {

type QueryResultIter<'a> = Box<Iterator<Item = QueryResult> + 'a>;

fn read_keys<'a>(cdb: &'a CDB, xs: &'a Vec<String>) -> QueryResultIter<'a> {
fn read_keys<'a, T>(cdb: &'a CDB, xs: &'a Vec<String>) -> QueryResultIter<'a>
where
T: Sliceable + Clone,
{
Box::new(xs.iter().map(move |x| {
let mut buf = Vec::with_capacity(1024 * 1024);
let res = cdb.get(x.as_ref(), &mut buf).unwrap();
QueryResult(
x.clone(),
res.map(|_| String::from_utf8(buf).unwrap()),
)
QueryResult(x.clone(), res.map(|_| String::from_utf8(buf).unwrap()))
}))
}

#[allow(dead_code)]
fn make_temp_cdb_single_vals(xs: &Vec<String>) -> SliceFactory {
let kvs: Vec<(String, String)> =
xs.iter().map(|k| (k.to_owned(), k.to_owned())).collect();
fn make_temp_cdb_single_vals(xs: &Vec<String>) -> Sliceable {
let kvs: Vec<(String, String)> = xs.iter().map(|k| (k.to_owned(), k.to_owned())).collect();
create_temp_cdb(&kvs).unwrap()
}

Expand Down
Loading

0 comments on commit e369bd7

Please sign in to comment.