From 4d883f016f1f4c5f68436d5daeac0296fc39e024 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Wed, 8 Apr 2020 12:30:48 +0200 Subject: [PATCH 1/8] Dynamic dispatch for Storage, no generics in Feed This removes the generic argument from the Feed struct, making it simpler to work with. Instead, the Storage is internally put into a Box, where DynStorage is an async trait with the public Storage functions. --- Cargo.toml | 1 + examples/async.rs | 15 +--- src/feed.rs | 62 +++++++-------- src/feed_builder.rs | 19 ++--- src/lib.rs | 8 +- src/storage/mod.rs | 170 +++++++++++++++++++++++++++++------------ src/storage/persist.rs | 4 +- tests/feed.rs | 4 +- 8 files changed, 171 insertions(+), 112 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index fe6040c..367236c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ tree-index = "0.6.0" bitfield-rle = "0.2.0" futures = "0.3.4" async-std = "1.5.0" +async-trait = "0.1.30" [dev-dependencies] quickcheck = "0.9.2" diff --git a/examples/async.rs b/examples/async.rs index a052bfe..5c8e20c 100644 --- a/examples/async.rs +++ b/examples/async.rs @@ -1,27 +1,18 @@ use async_std::task; use hypercore::Feed; -use random_access_storage::RandomAccess; -use std::fmt::Debug; -async fn append(feed: &mut Feed, content: &[u8]) -where - T: RandomAccess> + Debug + Send, -{ +async fn append(feed: &mut Feed, content: &[u8]) { feed.append(content).await.unwrap(); } -async fn print(feed: &mut Feed) -where - T: RandomAccess> + Debug + Send, -{ +async fn print(feed: &mut Feed) { println!("{:?}", feed.get(0).await); println!("{:?}", feed.get(1).await); } fn main() { task::block_on(task::spawn(async { - let mut feed = Feed::default(); - + let mut feed = Feed::open_in_memory().await.unwrap(); append(&mut feed, b"hello").await; append(&mut feed, b"world").await; print(&mut feed).await; diff --git a/src/feed.rs b/src/feed.rs index afa1487..6d44a0a 100644 --- a/src/feed.rs +++ b/src/feed.rs @@ -2,7 +2,9 @@ use crate::feed_builder::FeedBuilder; use crate::replicate::{Message, Peer}; -pub use crate::storage::{Node, NodeTrait, Storage, Store}; +pub use crate::storage::{ + storage_disk, storage_memory, BoxStorage, Node, NodeTrait, Storage, Store, +}; use crate::audit::Audit; use crate::bitfield::Bitfield; @@ -13,12 +15,8 @@ use crate::proof::Proof; use anyhow::{bail, ensure, Result}; use flat_tree as flat; use pretty_hash::fmt as pretty_fmt; -use random_access_disk::RandomAccessDisk; -use random_access_memory::RandomAccessMemory; -use random_access_storage::RandomAccess; use tree_index::TreeIndex; -use std::borrow::Borrow; use std::cmp; use std::fmt::{self, Debug, Display}; use std::ops::Range; @@ -55,15 +53,12 @@ use std::sync::Arc; /// [builder]: crate::feed_builder::FeedBuilder /// [with_storage]: crate::feed::Feed::with_storage #[derive(Debug)] -pub struct Feed -where - T: RandomAccess> + Debug, -{ +pub struct Feed { /// Merkle tree instance. pub(crate) merkle: Merkle, pub(crate) public_key: PublicKey, pub(crate) secret_key: Option, - pub(crate) storage: Storage, + pub(crate) storage: BoxStorage, /// Total length of raw data stored in bytes. pub(crate) byte_length: u64, /// Total number of entries stored in the `Feed` @@ -74,12 +69,9 @@ where pub(crate) peers: Vec, } -impl Feed -where - T: RandomAccess> + Debug + Send, -{ +impl Feed { /// Create a new instance with a custom storage backend. - pub async fn with_storage(mut storage: crate::storage::Storage) -> Result { + pub async fn with_storage(mut storage: BoxStorage) -> Result { match storage.read_partial_keypair().await { Some(partial_keypair) => { let builder = FeedBuilder::new(partial_keypair.public, storage); @@ -113,11 +105,27 @@ where } /// Starts a `FeedBuilder` with the provided `PublicKey` and `Storage`. - pub fn builder(public_key: PublicKey, storage: Storage) -> FeedBuilder { + pub fn builder(public_key: PublicKey, storage: BoxStorage) -> FeedBuilder { FeedBuilder::new(public_key, storage) } - /// Get the number of entries in the feed. + /// Create a new instance that persists to disk at the location of `dir`. + // TODO: Ensure that dir is always a directory. + // NOTE: Should we `mkdirp` here? + // NOTE: Should we call these `data.bitfield` / `data.tree`? + pub async fn open_from_disk>(path: P) -> Result { + let dir = path.as_ref().to_owned(); + let storage = storage_disk(&dir).await?; + Self::with_storage(storage).await + } + + /// Create a new in-memory instance. + pub async fn open_in_memory() -> Result { + let storage = storage_memory().await.unwrap(); + Self::with_storage(storage).await + } + + /// Get the amount of entries in the feed. #[inline] pub fn len(&self) -> u64 { self.length @@ -159,7 +167,7 @@ where let index = self.length; let message = hash_with_length_as_bytes(hash, index + 1); let signature = sign(&self.public_key, key, &message); - self.storage.put_signature(index, signature).await?; + self.storage.put_signature(index, &signature).await?; for node in self.merkle.nodes() { self.storage.put_node(node).await?; @@ -397,8 +405,7 @@ where } if let Some(sig) = sig { - let sig = sig.borrow(); - self.storage.put_signature(index, sig).await?; + self.storage.put_signature(index, &sig).await?; } for node in nodes { @@ -603,7 +610,7 @@ where } } -impl Feed { +impl Feed { /// Create a new instance that persists to disk at the location of `dir`. /// If dir was not there, it will be created. // NOTE: Should we call these `data.bitfield` / `data.tree`? @@ -618,7 +625,7 @@ impl Feed { let dir = path.as_ref().to_owned(); - let storage = Storage::new_disk(&dir, false).await?; + let storage = storage_disk(&dir).await?; Self::with_storage(storage).await } } @@ -628,18 +635,13 @@ impl Feed { /// ## Panics /// Can panic if constructing the in-memory store fails, which is highly /// unlikely. -impl Default for Feed { +impl Default for Feed { fn default() -> Self { - async_std::task::block_on(async { - let storage = Storage::new_memory().await.unwrap(); - Self::with_storage(storage).await.unwrap() - }) + async_std::task::block_on(async { Self::open_in_memory().await.unwrap() }) } } -impl> + Debug + Send> Display - for Feed -{ +impl Display for Feed { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { // TODO: yay, we should find a way to convert this .unwrap() to an error // type that's accepted by `fmt::Result<(), fmt::Error>`. diff --git a/src/feed_builder.rs b/src/feed_builder.rs index e4ac541..93bda72 100644 --- a/src/feed_builder.rs +++ b/src/feed_builder.rs @@ -2,8 +2,7 @@ use ed25519_dalek::{PublicKey, SecretKey}; use crate::bitfield::Bitfield; use crate::crypto::Merkle; -use crate::storage::Storage; -use random_access_storage::RandomAccess; +use crate::storage::BoxStorage; use std::fmt::Debug; use tree_index::TreeIndex; @@ -14,22 +13,16 @@ use anyhow::Result; // TODO: make this an actual builder pattern. // https://deterministic.space/elegant-apis-in-rust.html#builder-pattern #[derive(Debug)] -pub struct FeedBuilder -where - T: RandomAccess + Debug, -{ - storage: Storage, +pub struct FeedBuilder { + storage: BoxStorage, public_key: PublicKey, secret_key: Option, } -impl FeedBuilder -where - T: RandomAccess> + Debug + Send, -{ +impl FeedBuilder { /// Create a new instance. #[inline] - pub fn new(public_key: PublicKey, storage: Storage) -> Self { + pub fn new(public_key: PublicKey, storage: BoxStorage) -> Self { Self { storage, public_key, @@ -45,7 +38,7 @@ where /// Finalize the builder. #[inline] - pub async fn build(mut self) -> Result> { + pub async fn build(mut self) -> Result { let (bitfield, tree) = if let Ok(bitfield) = self.storage.read_bitfield().await { Bitfield::from_slice(&bitfield) } else { diff --git a/src/lib.rs b/src/lib.rs index 0c99939..b70156c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,14 +48,12 @@ pub use crate::feed::Feed; pub use crate::feed_builder::FeedBuilder; pub use crate::proof::Proof; pub use crate::replicate::Peer; -pub use crate::storage::{Node, NodeTrait, Storage, Store}; +pub use crate::storage::{storage_disk, storage_memory, Node, NodeTrait, Storage, Store}; pub use ed25519_dalek::{PublicKey, SecretKey}; use std::path::Path; /// Create a new Hypercore `Feed`. -pub async fn open>( - path: P, -) -> anyhow::Result> { - Feed::open(path).await +pub async fn open>(path: P) -> anyhow::Result { + Feed::open_from_disk(path).await } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 96a75b6..bb0c93d 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -8,6 +8,7 @@ pub use self::persist::Persist; pub use merkle_tree_stream::Node as NodeTrait; use anyhow::{anyhow, ensure, Result}; +use async_trait::async_trait; use ed25519_dalek::{PublicKey, SecretKey, Signature, PUBLIC_KEY_LENGTH, SECRET_KEY_LENGTH}; use flat_tree as flat; use futures::future::FutureExt; @@ -28,6 +29,102 @@ pub struct PartialKeypair { pub secret: Option, } +pub type BoxStorage = Box; + +/// Create a new instance backed by a `RandomAccessMemory` instance. +pub async fn storage_memory() -> Result> { + let create = |_| async { Ok(RandomAccessMemory::default()) }.boxed(); + Ok(Storage::new(create, false).await?) +} + +/// Create a new instance backed by a `RandomAccessDisk` instance. +pub async fn storage_disk(dir: &PathBuf) -> Result> { + let storage = |storage: Store| { + let name = match storage { + Store::Tree => "tree", + Store::Data => "data", + Store::Bitfield => "bitfield", + Store::Signatures => "signatures", + Store::Keypair => "key", + }; + RandomAccessDisk::open(dir.as_path().join(name)).boxed() + }; + Ok(Storage::new(storage, false).await?) +} + +#[async_trait] +pub trait DynStorage: Debug + Send { + /// Write data to the feed. + async fn write_data(&mut self, offset: u64, data: &[u8]) -> Result<()>; + + /// Write a byte vector to a data storage (random-access instance) at the + /// position of `index`. + /// + /// NOTE: Meant to be called from the `.put()` feed method. Probably used to + /// insert data as-is after receiving it from the network (need to confirm + /// with mafintosh). + /// TODO: Ensure the signature size is correct. + /// NOTE: Should we create a `Data` entry type? + async fn put_data(&mut self, index: u64, data: &[u8], nodes: &[Node]) -> Result<()>; + + /// Get data from disk that the user has written to it. This is stored + /// unencrypted, so there's no decryption needed. + // FIXME: data_offset always reads out index 0, length 0 + async fn get_data(&mut self, index: u64) -> Result>; + + /// Search the signature stores for a `Signature`, starting at `index`. + fn next_signature<'a>( + &'a mut self, + index: u64, + ) -> futures::future::BoxFuture<'a, Result>; + + /// Get a `Signature` from the store. + async fn get_signature(&mut self, index: u64) -> Result; + + /// Write a `Signature` to `self.Signatures`. + /// TODO: Ensure the signature size is correct. + /// NOTE: Should we create a `Signature` entry type? + async fn put_signature(&mut self, index: u64, signature: &Signature) -> Result<()>; + + /// TODO(yw) docs + /// Get the offset for the data, return `(offset, size)`. + /// + /// ## Panics + /// A panic can occur if no maximum value is found. + async fn data_offset(&mut self, index: u64, cached_nodes: &[Node]) -> Result>; + + /// Get a `Node` from the `tree` storage. + async fn get_node(&mut self, index: u64) -> Result; + + /// Write a `Node` to the `tree` storage. + /// TODO: prevent extra allocs here. Implement a method on node that can reuse + /// a buffer. + async fn put_node(&mut self, node: &Node) -> Result<()>; + + /// Write data to the internal bitfield module. + /// TODO: Ensure the chunk size is correct. + /// NOTE: Should we create a bitfield entry type? + async fn put_bitfield(&mut self, offset: u64, data: &[u8]) -> Result<()>; + + /// Read a public key from storage + async fn read_public_key(&mut self) -> Result; + + /// Read a secret key from storage + async fn read_secret_key(&mut self) -> Result; + + /// Write a public key to the storage + async fn write_public_key(&mut self, public_key: &PublicKey) -> Result<()>; + + /// Write a secret key to the storage + async fn write_secret_key(&mut self, secret_key: &SecretKey) -> Result<()>; + + /// Tries to read a partial keypair (ie: with an optional secret_key) from the storage + async fn read_partial_keypair(&mut self) -> Option; + + /// Read bitfield header. + async fn read_bitfield(&mut self) -> Result>; +} + /// The types of stores that can be created. #[derive(Debug)] pub enum Store { @@ -58,13 +155,13 @@ where impl Storage where - T: RandomAccess> + Debug + Send, + T: RandomAccess> + Debug + Send + 'static, { /// Create a new instance. Takes a keypair and a callback to create new /// storage instances. // Named `.open()` in the JS version. Replaces the `.openKey()` method too by // requiring a key pair to be initialized before creating a new instance. - pub async fn new(create: Cb, overwrite: bool) -> Result + pub async fn new(create: Cb, overwrite: bool) -> Result> where Cb: Fn(Store) -> std::pin::Pin> + Send>>, { @@ -108,12 +205,18 @@ where .map_err(|e| anyhow!(e))?; } - Ok(instance) + Ok(Box::new(instance)) } +} +#[async_trait] +impl DynStorage for Storage +where + T: RandomAccess> + Debug + Send, +{ /// Write data to the feed. #[inline] - pub async fn write_data(&mut self, offset: u64, data: &[u8]) -> Result<()> { + async fn write_data(&mut self, offset: u64, data: &[u8]) -> Result<()> { self.data.write(offset, &data).await.map_err(|e| anyhow!(e)) } @@ -125,7 +228,7 @@ where /// with mafintosh). /// TODO: Ensure the signature size is correct. /// NOTE: Should we create a `Data` entry type? - pub async fn put_data(&mut self, index: u64, data: &[u8], nodes: &[Node]) -> Result<()> { + async fn put_data(&mut self, index: u64, data: &[u8], nodes: &[Node]) -> Result<()> { if data.is_empty() { return Ok(()); } @@ -147,7 +250,7 @@ where /// unencrypted, so there's no decryption needed. // FIXME: data_offset always reads out index 0, length 0 #[inline] - pub async fn get_data(&mut self, index: u64) -> Result> { + async fn get_data(&mut self, index: u64) -> Result> { let cached_nodes = Vec::new(); // TODO: reuse allocation. let range = self.data_offset(index, &cached_nodes).await?; self.data @@ -157,7 +260,7 @@ where } /// Search the signature stores for a `Signature`, starting at `index`. - pub fn next_signature<'a>( + fn next_signature<'a>( &'a mut self, index: u64, ) -> futures::future::BoxFuture<'a, Result> { @@ -180,7 +283,7 @@ where /// Get a `Signature` from the store. #[inline] - pub async fn get_signature(&mut self, index: u64) -> Result { + async fn get_signature(&mut self, index: u64) -> Result { let bytes = self .signatures .read(HEADER_OFFSET + 64 * index, 64) @@ -194,11 +297,7 @@ where /// TODO: Ensure the signature size is correct. /// NOTE: Should we create a `Signature` entry type? #[inline] - pub async fn put_signature( - &mut self, - index: u64, - signature: impl Borrow, - ) -> Result<()> { + async fn put_signature(&mut self, index: u64, signature: &Signature) -> Result<()> { let signature = signature.borrow(); self.signatures .write(HEADER_OFFSET + 64 * index, &signature.to_bytes()) @@ -211,7 +310,7 @@ where /// /// ## Panics /// A panic can occur if no maximum value is found. - pub async fn data_offset(&mut self, index: u64, cached_nodes: &[Node]) -> Result> { + async fn data_offset(&mut self, index: u64, cached_nodes: &[Node]) -> Result> { let mut roots = Vec::new(); // TODO: reuse alloc flat::full_roots(tree_index(index), &mut roots); @@ -258,7 +357,7 @@ where /// Get a `Node` from the `tree` storage. #[inline] - pub async fn get_node(&mut self, index: u64) -> Result { + async fn get_node(&mut self, index: u64) -> Result { let buf = self .tree .read(HEADER_OFFSET + 40 * index, 40) @@ -272,7 +371,7 @@ where /// TODO: prevent extra allocs here. Implement a method on node that can reuse /// a buffer. #[inline] - pub async fn put_node(&mut self, node: &Node) -> Result<()> { + async fn put_node(&mut self, node: &Node) -> Result<()> { let index = node.index(); let buf = node.to_bytes()?; self.tree @@ -285,7 +384,7 @@ where /// TODO: Ensure the chunk size is correct. /// NOTE: Should we create a bitfield entry type? #[inline] - pub async fn put_bitfield(&mut self, offset: u64, data: &[u8]) -> Result<()> { + async fn put_bitfield(&mut self, offset: u64, data: &[u8]) -> Result<()> { self.bitfield .write(HEADER_OFFSET + offset, data) .await @@ -293,7 +392,7 @@ where } /// Read bitfield header. - pub async fn read_bitfield(&mut self) -> Result> { + async fn read_bitfield(&mut self) -> Result> { let buf = self .bitfield .read(0, 32) @@ -321,7 +420,7 @@ where } /// Read a public key from storage - pub async fn read_public_key(&mut self) -> Result { + async fn read_public_key(&mut self) -> Result { let buf = self .keypair .read(0, PUBLIC_KEY_LENGTH as u64) @@ -332,7 +431,7 @@ where } /// Read a secret key from storage - pub async fn read_secret_key(&mut self) -> Result { + async fn read_secret_key(&mut self) -> Result { let buf = self .keypair .read(PUBLIC_KEY_LENGTH as u64, SECRET_KEY_LENGTH as u64) @@ -343,13 +442,13 @@ where } /// Write a public key to the storage - pub async fn write_public_key(&mut self, public_key: &PublicKey) -> Result<()> { + async fn write_public_key(&mut self, public_key: &PublicKey) -> Result<()> { let buf: [u8; PUBLIC_KEY_LENGTH] = public_key.to_bytes(); self.keypair.write(0, &buf).await.map_err(|e| anyhow!(e)) } /// Write a secret key to the storage - pub async fn write_secret_key(&mut self, secret_key: &SecretKey) -> Result<()> { + async fn write_secret_key(&mut self, secret_key: &SecretKey) -> Result<()> { let buf: [u8; SECRET_KEY_LENGTH] = secret_key.to_bytes(); self.keypair .write(PUBLIC_KEY_LENGTH as u64, &buf) @@ -358,7 +457,7 @@ where } /// Tries to read a partial keypair (ie: with an optional secret_key) from the storage - pub async fn read_partial_keypair(&mut self) -> Option { + async fn read_partial_keypair(&mut self) -> Option { match self.read_public_key().await { Ok(public) => match self.read_secret_key().await { Ok(secret) => Some(PartialKeypair { @@ -375,31 +474,6 @@ where } } -impl Storage { - /// Create a new instance backed by a `RandomAccessMemory` instance. - pub async fn new_memory() -> Result { - let create = |_| async { Ok(RandomAccessMemory::default()) }.boxed(); - Ok(Self::new(create, true).await?) - } -} - -impl Storage { - /// Create a new instance backed by a `RandomAccessDisk` instance. - pub async fn new_disk(dir: &PathBuf, overwrite: bool) -> Result { - let storage = |storage: Store| { - let name = match storage { - Store::Tree => "tree", - Store::Data => "data", - Store::Bitfield => "bitfield", - Store::Signatures => "signatures", - Store::Keypair => "key", - }; - RandomAccessDisk::open(dir.as_path().join(name)).boxed() - }; - Ok(Self::new(storage, overwrite).await?) - } -} - /// Get a node from a vector of nodes. #[inline] fn find_node(nodes: &[Node], index: u64) -> Option<&Node> { diff --git a/src/storage/persist.rs b/src/storage/persist.rs index 70a3ec0..b514d15 100644 --- a/src/storage/persist.rs +++ b/src/storage/persist.rs @@ -1,4 +1,4 @@ -use super::Storage; +use super::BoxStorage; use anyhow::Result; use random_access_storage::RandomAccess; use std::fmt::Debug; @@ -15,5 +15,5 @@ where fn to_vec(&self) -> Result>; /// Persist into a storage backend. - fn store(&self, index: u64, store: Storage) -> Result<()>; + fn store(&self, index: u64, store: BoxStorage) -> Result<()>; } diff --git a/tests/feed.rs b/tests/feed.rs index 3bb1828..2709e5f 100644 --- a/tests/feed.rs +++ b/tests/feed.rs @@ -3,7 +3,7 @@ extern crate random_access_memory as ram; mod common; use common::create_feed; -use hypercore::{generate_keypair, Feed, NodeTrait, PublicKey, SecretKey, Storage}; +use hypercore::{generate_keypair, storage_disk, Feed, NodeTrait, PublicKey, SecretKey, Storage}; use random_access_storage::RandomAccess; use std::env::temp_dir; use std::fmt::Debug; @@ -264,7 +264,7 @@ async fn audit() { async fn audit_bad_data() { let mut dir = temp_dir(); dir.push("audit_bad_data"); - let storage = Storage::new_disk(&dir, false).await.unwrap(); + let storage = storage_disk(&dir, false).await.unwrap(); let mut feed = Feed::with_storage(storage).await.unwrap(); feed.append(b"hello").await.unwrap(); feed.append(b"world").await.unwrap(); From ca0c64fcbe2c4f9ea0c4f3c077eef52c79e9aea2 Mon Sep 17 00:00:00 2001 From: Bruno Tavares Date: Sun, 17 May 2020 15:31:47 -0300 Subject: [PATCH 2/8] Expose internal functions to create storages, and make tests pass --- benches/bench.rs | 9 +++------ src/lib.rs | 4 +++- src/storage/mod.rs | 1 + tests/common/mod.rs | 2 +- tests/compat.rs | 4 ++-- tests/feed.rs | 21 +++++++++------------ tests/storage.rs | 10 +++++----- 7 files changed, 24 insertions(+), 27 deletions(-) diff --git a/benches/bench.rs b/benches/bench.rs index 5c113a8..b3ac1b1 100644 --- a/benches/bench.rs +++ b/benches/bench.rs @@ -7,12 +7,9 @@ use test::Bencher; use hypercore::{Feed, Storage}; -async fn create_feed(page_size: usize) -> Result, Error> { - let storage = Storage::new( - |_| Box::pin(async move { Ok(RandomAccessMemory::new(page_size)) }), - true, - ) - .await?; +async fn create_feed(page_size: usize) -> Result { + let storage = + Storage::new(|_| Box::pin(async move { Ok(RandomAccessMemory::new(page_size)) })).await?; Feed::with_storage(storage).await } diff --git a/src/lib.rs b/src/lib.rs index b70156c..73259a3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,7 +48,9 @@ pub use crate::feed::Feed; pub use crate::feed_builder::FeedBuilder; pub use crate::proof::Proof; pub use crate::replicate::Peer; -pub use crate::storage::{storage_disk, storage_memory, Node, NodeTrait, Storage, Store}; +pub use crate::storage::{ + storage_disk, storage_memory, BoxStorage, Node, NodeTrait, Storage, Store, +}; pub use ed25519_dalek::{PublicKey, SecretKey}; use std::path::Path; diff --git a/src/storage/mod.rs b/src/storage/mod.rs index bb0c93d..84ee802 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -29,6 +29,7 @@ pub struct PartialKeypair { pub secret: Option, } +/// Dynamic-dispatch Storage wrapper pub type BoxStorage = Box; /// Create a new instance backed by a `RandomAccessMemory` instance. diff --git a/tests/common/mod.rs b/tests/common/mod.rs index fa43ddb..2e7df3a 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -5,7 +5,7 @@ use futures::future::FutureExt; use hypercore::{Feed, Storage, Store}; use random_access_memory as ram; -pub async fn create_feed(page_size: usize) -> Result, Error> { +pub async fn create_feed(page_size: usize) -> Result { let create = |_store: Store| async move { Ok(ram::RandomAccessMemory::new(page_size)) }.boxed(); let storage = Storage::new(create, false).await?; Feed::with_storage(storage).await diff --git a/tests/compat.rs b/tests/compat.rs index 59698ad..ee6ef14 100644 --- a/tests/compat.rs +++ b/tests/compat.rs @@ -11,7 +11,7 @@ use std::path::{Path, PathBuf}; use data_encoding::HEXLOWER; use ed25519_dalek::{Keypair, Signature}; use hypercore::Feed; -use hypercore::{Storage, Store}; +use hypercore::{Storage, Store, BoxStorage}; use random_access_disk::RandomAccessDisk; use remove_dir_all::remove_dir_all; @@ -145,7 +145,7 @@ fn storage_path>(dir: P, s: Store) -> PathBuf { dir.as_ref().join(filename) } -async fn mk_storage() -> (PathBuf, Storage) { +async fn mk_storage() -> (PathBuf, BoxStorage) { let temp_dir = tempfile::tempdir().unwrap(); let dir = temp_dir.into_path(); let storage = Storage::new( diff --git a/tests/feed.rs b/tests/feed.rs index 2709e5f..c6ea54c 100644 --- a/tests/feed.rs +++ b/tests/feed.rs @@ -3,17 +3,16 @@ extern crate random_access_memory as ram; mod common; use common::create_feed; -use hypercore::{generate_keypair, storage_disk, Feed, NodeTrait, PublicKey, SecretKey, Storage}; -use random_access_storage::RandomAccess; +use hypercore::{generate_keypair, Feed, NodeTrait, PublicKey, SecretKey, Storage}; +use hypercore::{storage_disk, storage_memory}; use std::env::temp_dir; -use std::fmt::Debug; use std::fs; use std::io::Write; #[async_std::test] async fn create_with_key() { let keypair = generate_keypair(); - let storage = Storage::new_memory().await.unwrap(); + let storage = storage_memory().await.unwrap(); let _feed = Feed::builder(keypair.public, storage) .secret_key(keypair.secret) .build() @@ -164,7 +163,7 @@ async fn put_with_data() { // Create a second feed with the first feed's key. let (public, secret) = copy_keys(&a); - let storage = Storage::new_memory().await.unwrap(); + let storage = storage_memory().await.unwrap(); let mut b = Feed::builder(public, storage) .secret_key(secret) .build() @@ -197,7 +196,7 @@ async fn put_with_data() { #[async_std::test] async fn create_with_storage() { - let storage = Storage::new_memory().await.unwrap(); + let storage = storage_memory().await.unwrap(); assert!( Feed::with_storage(storage).await.is_ok(), "Could not create a feed with a storage." @@ -206,7 +205,7 @@ async fn create_with_storage() { #[async_std::test] async fn create_with_stored_public_key() { - let mut storage = Storage::new_memory().await.unwrap(); + let mut storage = storage_memory().await.unwrap(); let keypair = generate_keypair(); storage.write_public_key(&keypair.public).await.unwrap(); assert!( @@ -217,7 +216,7 @@ async fn create_with_stored_public_key() { #[async_std::test] async fn create_with_stored_keys() { - let mut storage = Storage::new_memory().await.unwrap(); + let mut storage = storage_memory().await.unwrap(); let keypair = generate_keypair(); storage.write_public_key(&keypair.public).await.unwrap(); storage.write_secret_key(&keypair.secret).await.unwrap(); @@ -227,9 +226,7 @@ async fn create_with_stored_keys() { ); } -fn copy_keys( - feed: &Feed> + Debug + Send>, -) -> (PublicKey, SecretKey) { +fn copy_keys(feed: &Feed) -> (PublicKey, SecretKey) { match &feed.secret_key() { Some(secret) => { let secret = secret.to_bytes(); @@ -264,7 +261,7 @@ async fn audit() { async fn audit_bad_data() { let mut dir = temp_dir(); dir.push("audit_bad_data"); - let storage = storage_disk(&dir, false).await.unwrap(); + let storage = storage_disk(&dir).await.unwrap(); let mut feed = Feed::with_storage(storage).await.unwrap(); feed.append(b"hello").await.unwrap(); feed.append(b"world").await.unwrap(); diff --git a/tests/storage.rs b/tests/storage.rs index d540b31..c8be8cc 100644 --- a/tests/storage.rs +++ b/tests/storage.rs @@ -1,5 +1,5 @@ use ed25519_dalek::PublicKey; -use hypercore::{generate_keypair, sign, verify, Signature, Storage}; +use hypercore::{generate_keypair, sign, storage_memory, verify, Signature}; #[async_std::test] async fn should_write_and_read_keypair() { @@ -8,7 +8,7 @@ async fn should_write_and_read_keypair() { // prepare a signature let sig: Signature = sign(&keypair.public, &keypair.secret, msg); - let mut storage = Storage::new_memory().await.unwrap(); + let mut storage = storage_memory().await.unwrap(); assert!( storage.write_secret_key(&keypair.secret).await.is_ok(), "Can not store secret key." @@ -27,7 +27,7 @@ async fn should_write_and_read_keypair() { #[async_std::test] async fn should_read_partial_keypair() { let keypair = generate_keypair(); - let mut storage = Storage::new_memory().await.unwrap(); + let mut storage = storage_memory().await.unwrap(); assert!( storage.write_public_key(&keypair.public).await.is_ok(), "Can not store public key." @@ -39,13 +39,13 @@ async fn should_read_partial_keypair() { #[async_std::test] async fn should_read_no_keypair() { - let mut storage = Storage::new_memory().await.unwrap(); + let mut storage = storage_memory().await.unwrap(); let partial = storage.read_partial_keypair().await; assert!(partial.is_none(), "A key is present"); } #[async_std::test] async fn should_read_empty_public_key() { - let mut storage = Storage::new_memory().await.unwrap(); + let mut storage = storage_memory().await.unwrap(); assert!(storage.read_public_key().await.is_err()); } From 272da27d8d294dd9be8daea8942f109cd284375c Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Mon, 20 Jul 2020 22:54:31 +0200 Subject: [PATCH 3/8] Make dyn storage Send --- src/storage/mod.rs | 8 ++++---- src/storage/node.rs | 7 +++++++ tests/feed.rs | 35 ++++++++++++++++++++++++++++++++++- 3 files changed, 45 insertions(+), 5 deletions(-) diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 84ee802..52b3a7c 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -30,16 +30,16 @@ pub struct PartialKeypair { } /// Dynamic-dispatch Storage wrapper -pub type BoxStorage = Box; +pub type BoxStorage = Box; /// Create a new instance backed by a `RandomAccessMemory` instance. -pub async fn storage_memory() -> Result> { +pub async fn storage_memory() -> Result> { let create = |_| async { Ok(RandomAccessMemory::default()) }.boxed(); Ok(Storage::new(create, false).await?) } /// Create a new instance backed by a `RandomAccessDisk` instance. -pub async fn storage_disk(dir: &PathBuf) -> Result> { +pub async fn storage_disk(dir: &PathBuf) -> Result> { let storage = |storage: Store| { let name = match storage { Store::Tree => "tree", @@ -162,7 +162,7 @@ where /// storage instances. // Named `.open()` in the JS version. Replaces the `.openKey()` method too by // requiring a key pair to be initialized before creating a new instance. - pub async fn new(create: Cb, overwrite: bool) -> Result> + pub async fn new(create: Cb, overwrite: bool) -> Result> where Cb: Fn(Store) -> std::pin::Pin> + Send>>, { diff --git a/src/storage/node.rs b/src/storage/node.rs index ba1fc41..8d1c2dd 100644 --- a/src/storage/node.rs +++ b/src/storage/node.rs @@ -70,6 +70,13 @@ impl Node { writer.write_u64::(self.length as u64)?; Ok(writer) } + + // Write into a provided buffer. + // pub fn write(&self, buf: &mut [u8]) -> Result<()> { + // buf[0..32].copy_from_slice(&self.hash); + // (&mut buf[32..]).write_u64::(self.length as u64)?; + // Ok(()) + // } } impl NodeTrait for Node { diff --git a/tests/feed.rs b/tests/feed.rs index c6ea54c..3f1fab0 100644 --- a/tests/feed.rs +++ b/tests/feed.rs @@ -3,7 +3,8 @@ extern crate random_access_memory as ram; mod common; use common::create_feed; -use hypercore::{generate_keypair, Feed, NodeTrait, PublicKey, SecretKey, Storage}; +use hypercore::{generate_keypair, Event, Feed, NodeTrait, PublicKey, SecretKey, Storage}; +use futures::stream::StreamExt; use hypercore::{storage_disk, storage_memory}; use std::env::temp_dir; use std::fs; @@ -27,6 +28,31 @@ async fn display() { assert_eq!(output.len(), 61); } +#[async_std::test] +async fn task_send() { + use async_std::sync::{Arc, Mutex}; + use async_std::task; + let mut feed = create_feed(50).await.unwrap(); + feed.append(b"hello").await.unwrap(); + let feed_arc = Arc::new(Mutex::new(feed)); + let feed = feed_arc.clone(); + task::spawn(async move { + feed.lock().await.append(b"world").await.unwrap(); + }) + .await; + let feed = feed_arc.clone(); + let t1 = task::spawn(async move { + let value = feed.lock().await.get(0).await.unwrap(); + assert_eq!(value, Some(b"hello".to_vec())); + }); + let feed = feed_arc.clone(); + let t2 = task::spawn(async move { + let value = feed.lock().await.get(1).await.unwrap(); + assert_eq!(value, Some(b"world".to_vec())); + }); + futures::future::join_all(vec![t1, t2]).await; +} + #[async_std::test] /// Verify `.append()` and `.get()` work. async fn set_get() { @@ -162,6 +188,7 @@ async fn put_with_data() { let mut a = create_feed(50).await.unwrap(); // Create a second feed with the first feed's key. +<<<<<<< HEAD let (public, secret) = copy_keys(&a); let storage = storage_memory().await.unwrap(); let mut b = Feed::builder(public, storage) @@ -169,6 +196,9 @@ async fn put_with_data() { .build() .await .unwrap(); +======= + let mut b = create_clone(&a).await.unwrap(); +>>>>>>> c79679a... Make dyn storage Send // Append 4 blocks of data to the writable feed. a.append(b"hi").await.unwrap(); @@ -226,6 +256,7 @@ async fn create_with_stored_keys() { ); } +<<<<<<< HEAD fn copy_keys(feed: &Feed) -> (PublicKey, SecretKey) { match &feed.secret_key() { Some(secret) => { @@ -241,6 +272,8 @@ fn copy_keys(feed: &Feed) -> (PublicKey, SecretKey) { } } +======= +>>>>>>> c79679a... Make dyn storage Send #[async_std::test] async fn audit() { let mut feed = create_feed(50).await.unwrap(); From 458b1310171a533bfab7afa8f003f00fb76f2b9b Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Mon, 20 Jul 2020 23:47:36 +0200 Subject: [PATCH 4/8] fix git foo --- tests/feed.rs | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/tests/feed.rs b/tests/feed.rs index 3f1fab0..3a9eb5a 100644 --- a/tests/feed.rs +++ b/tests/feed.rs @@ -3,8 +3,7 @@ extern crate random_access_memory as ram; mod common; use common::create_feed; -use hypercore::{generate_keypair, Event, Feed, NodeTrait, PublicKey, SecretKey, Storage}; -use futures::stream::StreamExt; +use hypercore::{generate_keypair, Feed, NodeTrait, PublicKey, SecretKey, Storage}; use hypercore::{storage_disk, storage_memory}; use std::env::temp_dir; use std::fs; @@ -188,17 +187,7 @@ async fn put_with_data() { let mut a = create_feed(50).await.unwrap(); // Create a second feed with the first feed's key. -<<<<<<< HEAD - let (public, secret) = copy_keys(&a); - let storage = storage_memory().await.unwrap(); - let mut b = Feed::builder(public, storage) - .secret_key(secret) - .build() - .await - .unwrap(); -======= let mut b = create_clone(&a).await.unwrap(); ->>>>>>> c79679a... Make dyn storage Send // Append 4 blocks of data to the writable feed. a.append(b"hi").await.unwrap(); @@ -256,7 +245,6 @@ async fn create_with_stored_keys() { ); } -<<<<<<< HEAD fn copy_keys(feed: &Feed) -> (PublicKey, SecretKey) { match &feed.secret_key() { Some(secret) => { @@ -272,8 +260,6 @@ fn copy_keys(feed: &Feed) -> (PublicKey, SecretKey) { } } -======= ->>>>>>> c79679a... Make dyn storage Send #[async_std::test] async fn audit() { let mut feed = create_feed(50).await.unwrap(); @@ -368,3 +354,13 @@ async fn try_open_file_as_dir() { panic!("Opening path that points to a file must result in error"); } } + +async fn create_clone(feed: &Feed) -> Result { + let (public, secret) = copy_keys(&feed); + let storage = storage_memory().await?; + let clone = Feed::builder(public, storage) + .secret_key(secret) + .build() + .await?; + Ok(clone) +} From 1217147850de7fd538a77883cf7aa7f4a3265533 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Mon, 18 May 2020 18:04:09 +0200 Subject: [PATCH 5/8] Emit Download and Append events --- src/event.rs | 9 +++-- src/feed.rs | 24 ++++++++++++- src/feed_builder.rs | 1 + tests/feed.rs | 82 ++++++++++++++++++++++++++++++++++++++++++++- 4 files changed, 112 insertions(+), 4 deletions(-) diff --git a/src/event.rs b/src/event.rs index 37d072b..a9f0557 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1,3 +1,8 @@ -/// Events emitted. +/// An event emitted by a Feed. #[derive(Debug, Clone, PartialEq)] -pub enum Event {} +pub enum Event { + /// A new block has been appended. + Append, + /// A new block has been downloaded. + Download(u64), +} diff --git a/src/feed.rs b/src/feed.rs index 6d44a0a..6b295fc 100644 --- a/src/feed.rs +++ b/src/feed.rs @@ -1,5 +1,6 @@ //! Hypercore's main abstraction. Exposes an append-only, secure log structure. +use crate::event::Event; use crate::feed_builder::FeedBuilder; use crate::replicate::{Message, Peer}; pub use crate::storage::{ @@ -14,6 +15,10 @@ use crate::crypto::{ use crate::proof::Proof; use anyhow::{bail, ensure, Result}; use flat_tree as flat; +use futures::channel::mpsc::{ + unbounded as channel, UnboundedReceiver as Receiver, UnboundedSender as Sender, +}; +use futures::sink::SinkExt; use pretty_hash::fmt as pretty_fmt; use tree_index::TreeIndex; @@ -67,6 +72,7 @@ pub struct Feed { pub(crate) bitfield: Bitfield, pub(crate) tree: TreeIndex, pub(crate) peers: Vec, + pub(crate) subscribers: Vec>, } impl Feed { @@ -181,9 +187,25 @@ impl Feed { let bytes = self.bitfield.to_bytes(&self.tree)?; self.storage.put_bitfield(0, &bytes).await?; + self.emit(Event::Append).await; + Ok(()) } + /// Subscribe to events emitted by this feed. + pub fn subscribe(&mut self) -> Receiver { + let (send, recv) = channel(); + self.subscribers.push(send); + recv + } + + /// Emit an event on the feed. + async fn emit(&self, event: Event) { + for mut sender in self.subscribers.iter() { + sender.send(event.clone()).await.unwrap(); + } + } + /// Get the block of data at the tip of the feed. This will be the most /// recently appended block. #[inline] @@ -416,7 +438,7 @@ impl Feed { if let Some(_data) = data { if self.bitfield.set(index, true).is_changed() { - // TODO: emit "download" event + self.emit(Event::Download(index)).await; } // TODO: check peers.length, call ._announce if peers exist. } diff --git a/src/feed_builder.rs b/src/feed_builder.rs index 93bda72..ad881f8 100644 --- a/src/feed_builder.rs +++ b/src/feed_builder.rs @@ -77,6 +77,7 @@ impl FeedBuilder { secret_key: self.secret_key, storage: self.storage, peers: vec![], + subscribers: vec![], }) } } diff --git a/tests/feed.rs b/tests/feed.rs index 3a9eb5a..a5f9383 100644 --- a/tests/feed.rs +++ b/tests/feed.rs @@ -5,6 +5,7 @@ mod common; use common::create_feed; use hypercore::{generate_keypair, Feed, NodeTrait, PublicKey, SecretKey, Storage}; use hypercore::{storage_disk, storage_memory}; +use futures::stream::StreamExt; use std::env::temp_dir; use std::fs; use std::io::Write; @@ -185,7 +186,6 @@ async fn put() { async fn put_with_data() { // Create a writable feed. let mut a = create_feed(50).await.unwrap(); - // Create a second feed with the first feed's key. let mut b = create_clone(&a).await.unwrap(); @@ -364,3 +364,83 @@ async fn create_clone(feed: &Feed) -> Result { .await?; Ok(clone) } + +async fn events_append() { + let mut feed = create_feed(50).await.unwrap(); + let event_task = collect_events(&mut feed, 3); + feed.append(br#"one"#).await.unwrap(); + feed.append(br#"two"#).await.unwrap(); + feed.append(br#"three"#).await.unwrap(); + + let event_list = event_task.await; + let mut expected = vec![]; + for _i in 0..3 { + expected.push(Event::Append); + } + assert_eq!(event_list, expected, "Correct events emitted") +} + +#[async_std::test] +async fn events_download() { + let mut a = create_feed(50).await.unwrap(); + // Create a second feed with the first feed's key. + let mut b = create_clone(&a).await.unwrap(); + + let event_task = collect_events(&mut b, 3); + + a.append(b"one").await.unwrap(); + a.append(b"two").await.unwrap(); + a.append(b"three").await.unwrap(); + + for i in 0..3 { + let a_proof = a.proof(i, false).await.unwrap(); + let a_data = a.get(i).await.unwrap(); + b.put(i, a_data.as_deref(), a_proof).await.unwrap(); + } + + let event_list = event_task.await; + + let mut expected = vec![]; + for i in 0..3 { + expected.push(Event::Download(i)); + } + assert_eq!(event_list, expected, "Correct events emitted") +} + +fn copy_keys( + feed: &Feed> + Debug + Send>, +) -> (PublicKey, SecretKey) { + match &feed.secret_key() { + Some(secret) => { + let secret = secret.to_bytes(); + let public = &feed.public_key().to_bytes(); + + let public = PublicKey::from_bytes(public).unwrap(); + let secret = SecretKey::from_bytes(&secret).unwrap(); + + (public, secret) + } + _ => panic!(": Could not access secret key"), + } +} + +fn collect_events( + feed: &mut Feed< + impl RandomAccess> + Debug + Send, + >, + n: usize, +) -> async_std::task::JoinHandle> { + let mut events = feed.subscribe(); + let event_task = async_std::task::spawn(async move { + let mut event_list = vec![]; + while let Some(event) = events.next().await { + event_list.push(event); + if event_list.len() == n { + return event_list; + } + } + event_list + }); + event_task +} +>>>>>>> c7e774c... Emit Download and Append events From 6d5f1dd699a9fdffd6ca516d5cd308934f2561e9 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Tue, 19 May 2020 12:24:23 +0200 Subject: [PATCH 6/8] Mark Event enum non_exhaustive ... so that addding new events is not a breaking change. --- src/event.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/event.rs b/src/event.rs index a9f0557..6e97653 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1,5 +1,6 @@ /// An event emitted by a Feed. #[derive(Debug, Clone, PartialEq)] +#[non_exhaustive] pub enum Event { /// A new block has been appended. Append, From 99a846d4136399660a9745c049662eaa08dd7a68 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Tue, 19 May 2020 16:59:08 +0200 Subject: [PATCH 7/8] Make emit take &mut self Without this, some examples fail. Not sure exactly why. --- src/feed.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/feed.rs b/src/feed.rs index 6b295fc..e22debe 100644 --- a/src/feed.rs +++ b/src/feed.rs @@ -200,7 +200,7 @@ impl Feed { } /// Emit an event on the feed. - async fn emit(&self, event: Event) { + async fn emit(&mut self, event: Event) { for mut sender in self.subscribers.iter() { sender.send(event.clone()).await.unwrap(); } From a844f465ba91686470614b6464d64e799228f8e8 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Mon, 20 Jul 2020 23:42:16 +0200 Subject: [PATCH 8/8] fix git foo --- tests/feed.rs | 52 +++++++++++++++------------------------------------ 1 file changed, 15 insertions(+), 37 deletions(-) diff --git a/tests/feed.rs b/tests/feed.rs index a5f9383..7a9cdfc 100644 --- a/tests/feed.rs +++ b/tests/feed.rs @@ -3,9 +3,9 @@ extern crate random_access_memory as ram; mod common; use common::create_feed; -use hypercore::{generate_keypair, Feed, NodeTrait, PublicKey, SecretKey, Storage}; -use hypercore::{storage_disk, storage_memory}; use futures::stream::StreamExt; +use hypercore::{generate_keypair, Event, Feed, NodeTrait, PublicKey, SecretKey, Storage}; +use hypercore::{storage_disk, storage_memory}; use std::env::temp_dir; use std::fs; use std::io::Write; @@ -245,21 +245,6 @@ async fn create_with_stored_keys() { ); } -fn copy_keys(feed: &Feed) -> (PublicKey, SecretKey) { - match &feed.secret_key() { - Some(secret) => { - let secret = secret.to_bytes(); - let public = &feed.public_key().to_bytes(); - - let public = PublicKey::from_bytes(public).unwrap(); - let secret = SecretKey::from_bytes(&secret).unwrap(); - - (public, secret) - } - _ => panic!(": Could not access secret key"), - } -} - #[async_std::test] async fn audit() { let mut feed = create_feed(50).await.unwrap(); @@ -355,16 +340,7 @@ async fn try_open_file_as_dir() { } } -async fn create_clone(feed: &Feed) -> Result { - let (public, secret) = copy_keys(&feed); - let storage = storage_memory().await?; - let clone = Feed::builder(public, storage) - .secret_key(secret) - .build() - .await?; - Ok(clone) -} - +#[async_std::test] async fn events_append() { let mut feed = create_feed(50).await.unwrap(); let event_task = collect_events(&mut feed, 3); @@ -407,9 +383,17 @@ async fn events_download() { assert_eq!(event_list, expected, "Correct events emitted") } -fn copy_keys( - feed: &Feed> + Debug + Send>, -) -> (PublicKey, SecretKey) { +async fn create_clone(feed: &Feed) -> Result { + let (public, secret) = copy_keys(&feed); + let storage = storage_memory().await?; + let clone = Feed::builder(public, storage) + .secret_key(secret) + .build() + .await?; + Ok(clone) +} + +fn copy_keys(feed: &Feed) -> (PublicKey, SecretKey) { match &feed.secret_key() { Some(secret) => { let secret = secret.to_bytes(); @@ -424,12 +408,7 @@ fn copy_keys( } } -fn collect_events( - feed: &mut Feed< - impl RandomAccess> + Debug + Send, - >, - n: usize, -) -> async_std::task::JoinHandle> { +fn collect_events(feed: &mut Feed, n: usize) -> async_std::task::JoinHandle> { let mut events = feed.subscribe(); let event_task = async_std::task::spawn(async move { let mut event_list = vec![]; @@ -443,4 +422,3 @@ fn collect_events( }); event_task } ->>>>>>> c7e774c... Emit Download and Append events