diff --git a/Cargo.toml b/Cargo.toml index 8739771..b376bea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ name = "async-io" version = "2.5.0" authors = ["Stjepan Glavina "] edition = "2021" -rust-version = "1.63" +rust-version = "1.70" description = "Async I/O and timers" license = "Apache-2.0 OR MIT" repository = "https://github.com/smol-rs/async-io" @@ -26,7 +26,6 @@ name = "timer" harness = false [dependencies] -async-lock = "3.0.0" cfg-if = "1" concurrent-queue = "2.2.0" futures-io = { version = "0.3.28", default-features = false, features = ["std"] } diff --git a/src/driver.rs b/src/driver.rs index d3c3539..73ec4cb 100644 --- a/src/driver.rs +++ b/src/driver.rs @@ -1,14 +1,12 @@ use std::cell::{Cell, RefCell}; use std::future::Future; +use std::pin::pin; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::Arc; -use std::task::Waker; -use std::task::{Context, Poll}; +use std::sync::{Arc, OnceLock}; +use std::task::{Context, Poll, Waker}; use std::thread; use std::time::{Duration, Instant}; -use async_lock::OnceCell; -use futures_lite::pin; use parking::Parker; use crate::reactor::Reactor; @@ -18,9 +16,9 @@ static BLOCK_ON_COUNT: AtomicUsize = AtomicUsize::new(0); /// Unparker for the "async-io" thread. fn unparker() -> &'static parking::Unparker { - static UNPARKER: OnceCell = OnceCell::new(); + static UNPARKER: OnceLock = OnceLock::new(); - UNPARKER.get_or_init_blocking(|| { + UNPARKER.get_or_init(|| { let (parker, unparker) = parking::pair(); // Spawn a helper thread driving the reactor. @@ -197,7 +195,7 @@ pub fn block_on(future: impl Future) -> T { } }; - pin!(future); + let mut future = pin!(future); let cx = &mut Context::from_waker(waker); diff --git a/src/lib.rs b/src/lib.rs index e9213cb..173b569 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -61,12 +61,12 @@ html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" )] -use std::future::Future; +use std::future::{poll_fn, Future}; use std::io::{self, IoSlice, IoSliceMut, Read, Write}; use std::net::{SocketAddr, TcpListener, TcpStream, UdpSocket}; -use std::pin::Pin; +use std::pin::{pin, Pin}; use std::sync::Arc; -use std::task::{Context, Poll, Waker}; +use std::task::{ready, Context, Poll, Waker}; use std::time::{Duration, Instant}; #[cfg(unix)] @@ -81,7 +81,6 @@ use std::os::windows::io::{AsRawSocket, AsSocket, BorrowedSocket, OwnedSocket, R use futures_io::{AsyncRead, AsyncWrite}; use futures_lite::stream::{self, Stream}; -use futures_lite::{future, pin, ready}; use rustix::io as rio; use rustix::net as rn; @@ -955,14 +954,14 @@ impl Async { /// /// ```no_run /// use async_io::Async; - /// use futures_lite::future; + /// use std::future::poll_fn; /// use std::net::TcpListener; /// /// # futures_lite::future::block_on(async { /// let mut listener = Async::::bind(([127, 0, 0, 1], 0))?; /// /// // Wait until a client can be accepted. - /// future::poll_fn(|cx| listener.poll_readable(cx)).await?; + /// poll_fn(|cx| listener.poll_readable(cx)).await?; /// # std::io::Result::Ok(()) }); /// ``` pub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll> { @@ -986,7 +985,7 @@ impl Async { /// /// ``` /// use async_io::Async; - /// use futures_lite::future; + /// use std::future::poll_fn; /// use std::net::{TcpStream, ToSocketAddrs}; /// /// # futures_lite::future::block_on(async { @@ -994,7 +993,7 @@ impl Async { /// let stream = Async::::connect(addr).await?; /// /// // Wait until the stream is writable. - /// future::poll_fn(|cx| stream.poll_writable(cx)).await?; + /// poll_fn(|cx| stream.poll_writable(cx)).await?; /// # std::io::Result::Ok(()) }); /// ``` pub fn poll_writable(&self, cx: &mut Context<'_>) -> Poll> { @@ -1251,7 +1250,7 @@ unsafe impl IoSafe for std::process::ChildStderr {} #[cfg(unix)] unsafe impl IoSafe for std::os::unix::net::UnixStream {} -// PipeReader & PipeWriter require std >= 1.87, our MSRV is 1.63, hence +// PipeReader & PipeWriter require std >= 1.87, our MSRV is 1.70, hence // conditional on cfg()s, generated from build.rs #[cfg(not(async_io_no_pipe))] unsafe impl IoSafe for std::io::PipeReader {} @@ -1472,13 +1471,14 @@ impl Async { /// /// ```no_run /// use async_io::Async; - /// use futures_lite::{pin, stream::StreamExt}; + /// use futures_lite::{stream::StreamExt}; /// use std::net::TcpListener; + /// use std::pin::pin; /// /// # futures_lite::future::block_on(async { /// let listener = Async::::bind(([127, 0, 0, 1], 8000))?; /// let incoming = listener.incoming(); - /// pin!(incoming); + /// let mut incoming = pin!(incoming); /// /// while let Some(stream) = incoming.next().await { /// let stream = stream?; @@ -1808,13 +1808,14 @@ impl Async { /// /// ```no_run /// use async_io::Async; - /// use futures_lite::{pin, stream::StreamExt}; + /// use futures_lite::stream::StreamExt; /// use std::os::unix::net::UnixListener; + /// use std::pin::pin; /// /// # futures_lite::future::block_on(async { /// let listener = Async::::bind("/tmp/socket")?; /// let incoming = listener.incoming(); - /// pin!(incoming); + /// let mut incoming = pin!(incoming); /// /// while let Some(stream) = incoming.next().await { /// let stream = stream?; @@ -2055,9 +2056,9 @@ impl TryFrom for Async>) -> io::Result<()> { let mut polled = false; - pin!(fut); + let mut fut = pin!(fut); - future::poll_fn(|cx| { + poll_fn(|cx| { if !polled { polled = true; fut.as_mut().poll(cx) diff --git a/src/os/kqueue.rs b/src/os/kqueue.rs index 8318363..c796590 100644 --- a/src/os/kqueue.rs +++ b/src/os/kqueue.rs @@ -183,16 +183,16 @@ impl Filter { /// # Examples /// /// ```no_run - /// use std::process::Command; /// use async_io::os::kqueue::{Exit, Filter}; - /// use futures_lite::future; + /// use std::future::poll_fn; + /// use std::process::Command; /// /// # futures_lite::future::block_on(async { /// let child = Command::new("sleep").arg("5").spawn()?; /// let process = Filter::new(Exit::new(child))?; /// /// // Wait for the process to exit. - /// future::poll_fn(|cx| process.poll_ready(cx)).await?; + /// poll_fn(|cx| process.poll_ready(cx)).await?; /// # std::io::Result::Ok(()) }); /// ``` pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { diff --git a/src/os/windows.rs b/src/os/windows.rs index dc270c3..b0a5c5e 100644 --- a/src/os/windows.rs +++ b/src/os/windows.rs @@ -160,16 +160,16 @@ impl Waitable { /// # Examples /// /// ```no_run - /// use std::process::Command; /// use async_io::os::windows::Waitable; - /// use futures_lite::future; + /// use std::future::poll_fn; + /// use std::process::Command; /// /// # futures_lite::future::block_on(async { /// let child = Command::new("sleep").arg("5").spawn()?; /// let process = Waitable::new(child)?; /// /// // Wait for the process to exit. - /// future::poll_fn(|cx| process.poll_ready(cx)).await?; + /// poll_fn(|cx| process.poll_ready(cx)).await?; /// # std::io::Result::Ok(()) }); /// ``` pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { diff --git a/src/reactor.rs b/src/reactor.rs index 95e1b67..b62fd07 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -8,13 +8,11 @@ use std::mem; use std::panic; use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex, MutexGuard}; -use std::task::{Context, Poll, Waker}; +use std::sync::{Arc, Mutex, MutexGuard, OnceLock}; +use std::task::{ready, Context, Poll, Waker}; use std::time::{Duration, Instant}; -use async_lock::OnceCell; use concurrent_queue::ConcurrentQueue; -use futures_lite::ready; use polling::{Event, Events, Poller}; use slab::Slab; @@ -93,9 +91,9 @@ pub(crate) struct Reactor { impl Reactor { /// Returns a reference to the reactor. pub(crate) fn get() -> &'static Reactor { - static REACTOR: OnceCell = OnceCell::new(); + static REACTOR: OnceLock = OnceLock::new(); - REACTOR.get_or_init_blocking(|| { + REACTOR.get_or_init(|| { crate::driver::init(); Reactor { poller: Poller::new().expect("cannot initialize I/O event notification"), diff --git a/tests/timer.rs b/tests/timer.rs index cdd90db..c598ed7 100644 --- a/tests/timer.rs +++ b/tests/timer.rs @@ -1,4 +1,4 @@ -use std::future::Future; +use std::future::{poll_fn, Future}; use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::thread; @@ -91,7 +91,7 @@ fn set() { } }); - future::poll_fn(|cx| Pin::new(&mut *timer.lock().unwrap()).poll(cx)).await; + poll_fn(|cx| Pin::new(&mut *timer.lock().unwrap()).poll(cx)).await; assert!(start.elapsed() >= Duration::from_secs(2)); assert!(start.elapsed() < Duration::from_secs(10));