Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ name = "async-io"
version = "2.5.0"
authors = ["Stjepan Glavina <[email protected]>"]
edition = "2021"
rust-version = "1.63"
rust-version = "1.70"
description = "Async I/O and timers"
license = "Apache-2.0 OR MIT"
repository = "https://github.com/smol-rs/async-io"
Expand All @@ -26,7 +26,6 @@ name = "timer"
harness = false

[dependencies]
async-lock = "3.0.0"
cfg-if = "1"
concurrent-queue = "2.2.0"
futures-io = { version = "0.3.28", default-features = false, features = ["std"] }
Expand Down
14 changes: 6 additions & 8 deletions src/driver.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
use std::cell::{Cell, RefCell};
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::Waker;
use std::task::{Context, Poll};
use std::sync::{Arc, OnceLock};
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::{Duration, Instant};

use async_lock::OnceCell;
use futures_lite::pin;
use parking::Parker;

use crate::reactor::Reactor;
Expand All @@ -18,9 +16,9 @@ static BLOCK_ON_COUNT: AtomicUsize = AtomicUsize::new(0);

/// Unparker for the "async-io" thread.
fn unparker() -> &'static parking::Unparker {
static UNPARKER: OnceCell<parking::Unparker> = OnceCell::new();
static UNPARKER: OnceLock<parking::Unparker> = OnceLock::new();

UNPARKER.get_or_init_blocking(|| {
UNPARKER.get_or_init(|| {
let (parker, unparker) = parking::pair();

// Spawn a helper thread driving the reactor.
Expand Down Expand Up @@ -197,7 +195,7 @@ pub fn block_on<T>(future: impl Future<Output = T>) -> T {
}
};

pin!(future);
let mut future = pin!(future);

let cx = &mut Context::from_waker(waker);

Expand Down
31 changes: 16 additions & 15 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@
html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]

use std::future::Future;
use std::future::{poll_fn, Future};
use std::io::{self, IoSlice, IoSliceMut, Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream, UdpSocket};
use std::pin::Pin;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
use std::task::{ready, Context, Poll, Waker};
use std::time::{Duration, Instant};

#[cfg(unix)]
Expand All @@ -81,7 +81,6 @@ use std::os::windows::io::{AsRawSocket, AsSocket, BorrowedSocket, OwnedSocket, R

use futures_io::{AsyncRead, AsyncWrite};
use futures_lite::stream::{self, Stream};
use futures_lite::{future, pin, ready};

use rustix::io as rio;
use rustix::net as rn;
Expand Down Expand Up @@ -955,14 +954,14 @@ impl<T> Async<T> {
///
/// ```no_run
/// use async_io::Async;
/// use futures_lite::future;
/// use std::future::poll_fn;
/// use std::net::TcpListener;
///
/// # futures_lite::future::block_on(async {
/// let mut listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
///
/// // Wait until a client can be accepted.
/// future::poll_fn(|cx| listener.poll_readable(cx)).await?;
/// poll_fn(|cx| listener.poll_readable(cx)).await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Expand All @@ -986,15 +985,15 @@ impl<T> Async<T> {
///
/// ```
/// use async_io::Async;
/// use futures_lite::future;
/// use std::future::poll_fn;
/// use std::net::{TcpStream, ToSocketAddrs};
///
/// # futures_lite::future::block_on(async {
/// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
/// let stream = Async::<TcpStream>::connect(addr).await?;
///
/// // Wait until the stream is writable.
/// future::poll_fn(|cx| stream.poll_writable(cx)).await?;
/// poll_fn(|cx| stream.poll_writable(cx)).await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Expand Down Expand Up @@ -1251,7 +1250,7 @@ unsafe impl IoSafe for std::process::ChildStderr {}
#[cfg(unix)]
unsafe impl IoSafe for std::os::unix::net::UnixStream {}

// PipeReader & PipeWriter require std >= 1.87, our MSRV is 1.63, hence
// PipeReader & PipeWriter require std >= 1.87, our MSRV is 1.70, hence
// conditional on cfg()s, generated from build.rs
#[cfg(not(async_io_no_pipe))]
unsafe impl IoSafe for std::io::PipeReader {}
Expand Down Expand Up @@ -1472,13 +1471,14 @@ impl Async<TcpListener> {
///
/// ```no_run
/// use async_io::Async;
/// use futures_lite::{pin, stream::StreamExt};
/// use futures_lite::{stream::StreamExt};
/// use std::net::TcpListener;
/// use std::pin::pin;
///
/// # futures_lite::future::block_on(async {
/// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 8000))?;
/// let incoming = listener.incoming();
/// pin!(incoming);
/// let mut incoming = pin!(incoming);
///
/// while let Some(stream) = incoming.next().await {
/// let stream = stream?;
Expand Down Expand Up @@ -1808,13 +1808,14 @@ impl Async<UnixListener> {
///
/// ```no_run
/// use async_io::Async;
/// use futures_lite::{pin, stream::StreamExt};
/// use futures_lite::stream::StreamExt;
/// use std::os::unix::net::UnixListener;
/// use std::pin::pin;
///
/// # futures_lite::future::block_on(async {
/// let listener = Async::<UnixListener>::bind("/tmp/socket")?;
/// let incoming = listener.incoming();
/// pin!(incoming);
/// let mut incoming = pin!(incoming);
///
/// while let Some(stream) = incoming.next().await {
/// let stream = stream?;
Expand Down Expand Up @@ -2055,9 +2056,9 @@ impl TryFrom<std::os::unix::net::UnixDatagram> for Async<std::os::unix::net::Uni
/// Polls a future once, waits for a wakeup, and then optimistically assumes the future is ready.
async fn optimistic(fut: impl Future<Output = io::Result<()>>) -> io::Result<()> {
let mut polled = false;
pin!(fut);
let mut fut = pin!(fut);

future::poll_fn(|cx| {
poll_fn(|cx| {
if !polled {
polled = true;
fut.as_mut().poll(cx)
Expand Down
6 changes: 3 additions & 3 deletions src/os/kqueue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,16 +183,16 @@ impl<T> Filter<T> {
/// # Examples
///
/// ```no_run
/// use std::process::Command;
/// use async_io::os::kqueue::{Exit, Filter};
/// use futures_lite::future;
/// use std::future::poll_fn;
/// use std::process::Command;
///
/// # futures_lite::future::block_on(async {
/// let child = Command::new("sleep").arg("5").spawn()?;
/// let process = Filter::new(Exit::new(child))?;
///
/// // Wait for the process to exit.
/// future::poll_fn(|cx| process.poll_ready(cx)).await?;
/// poll_fn(|cx| process.poll_ready(cx)).await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<()>> {
Expand Down
6 changes: 3 additions & 3 deletions src/os/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,16 +160,16 @@ impl<T> Waitable<T> {
/// # Examples
///
/// ```no_run
/// use std::process::Command;
/// use async_io::os::windows::Waitable;
/// use futures_lite::future;
/// use std::future::poll_fn;
/// use std::process::Command;
///
/// # futures_lite::future::block_on(async {
/// let child = Command::new("sleep").arg("5").spawn()?;
/// let process = Waitable::new(child)?;
///
/// // Wait for the process to exit.
/// future::poll_fn(|cx| process.poll_ready(cx)).await?;
/// poll_fn(|cx| process.poll_ready(cx)).await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<()>> {
Expand Down
10 changes: 4 additions & 6 deletions src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@ use std::mem;
use std::panic;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::task::{Context, Poll, Waker};
use std::sync::{Arc, Mutex, MutexGuard, OnceLock};
use std::task::{ready, Context, Poll, Waker};
use std::time::{Duration, Instant};

use async_lock::OnceCell;
use concurrent_queue::ConcurrentQueue;
use futures_lite::ready;
use polling::{Event, Events, Poller};
use slab::Slab;

Expand Down Expand Up @@ -93,9 +91,9 @@ pub(crate) struct Reactor {
impl Reactor {
/// Returns a reference to the reactor.
pub(crate) fn get() -> &'static Reactor {
static REACTOR: OnceCell<Reactor> = OnceCell::new();
static REACTOR: OnceLock<Reactor> = OnceLock::new();

REACTOR.get_or_init_blocking(|| {
REACTOR.get_or_init(|| {
crate::driver::init();
Reactor {
poller: Poller::new().expect("cannot initialize I/O event notification"),
Expand Down
4 changes: 2 additions & 2 deletions tests/timer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::future::Future;
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::thread;
Expand Down Expand Up @@ -91,7 +91,7 @@ fn set() {
}
});

future::poll_fn(|cx| Pin::new(&mut *timer.lock().unwrap()).poll(cx)).await;
poll_fn(|cx| Pin::new(&mut *timer.lock().unwrap()).poll(cx)).await;

assert!(start.elapsed() >= Duration::from_secs(2));
assert!(start.elapsed() < Duration::from_secs(10));
Expand Down
Loading