use nix::libc::{c_int, off_t};
use mio::{Evented, Poll, Token, Ready, PollOpt};
use mio::unix::UnixReady;
use nix;
use nix::errno::Errno;
use nix::sys::aio;
use nix::sys::signal::SigevNotify;
use std::borrow::{Borrow, BorrowMut};
use std::cell::{Cell, RefCell};
use std::io;
use std::iter::Iterator;
use std::os::unix::io::AsRawFd;
use std::os::unix::io::RawFd;
pub use nix::sys::aio::AioFsyncMode;
pub use nix::sys::aio::LioOpcode;
/// An owned handle to the buffer associated with an asynchronous operation.
///
/// The handle is either empty, a boxed value that can be viewed as an
/// immutable byte slice, or a boxed value that can be viewed as a mutable
/// byte slice.
pub enum BufRef {
    /// No buffer is held.
    None,
    /// A boxed value borrowable as `&[u8]`.
    BoxedSlice(Box<dyn Borrow<[u8]>>),
    /// A boxed value borrowable as `&mut [u8]`.
    BoxedMutSlice(Box<dyn BorrowMut<[u8]>>)
}

#[cfg_attr(feature = "cargo-clippy", allow(clippy::len_without_is_empty))]
impl BufRef {
    /// Returns the immutable boxed buffer, or `None` for any other variant.
    pub fn boxed_slice(&self) -> Option<&dyn Borrow<[u8]>> {
        if let BufRef::BoxedSlice(ref shared) = *self {
            Some(shared.as_ref())
        } else {
            None
        }
    }

    /// Returns the mutable boxed buffer, or `None` for any other variant.
    pub fn boxed_mut_slice(&mut self) -> Option<&mut dyn BorrowMut<[u8]>> {
        if let BufRef::BoxedMutSlice(ref mut exclusive) = *self {
            Some(exclusive.as_mut())
        } else {
            None
        }
    }

    /// Reports whether this is the `BufRef::None` variant.
    pub fn is_none(&self) -> bool {
        if let BufRef::None = *self {
            true
        } else {
            false
        }
    }

    /// Length in bytes of the held buffer, or `None` when no buffer is held.
    pub fn len(&self) -> Option<usize> {
        // Both boxed variants can be viewed as `&[u8]`; reduce to that view
        // first and measure it once.
        let bytes: &[u8] = match *self {
            BufRef::None => return None,
            BufRef::BoxedSlice(ref shared) => shared.as_ref().borrow(),
            BufRef::BoxedMutSlice(ref exclusive) => exclusive.as_ref().borrow(),
        };
        Some(bytes.len())
    }
}
/// Converts a buffer handed back by nix into this module's `BufRef`.
///
/// The two boxed variants are carried over with their contents moved, not
/// copied; every other `aio::Buffer` variant becomes `BufRef::None`.
fn nix_buffer_to_buf_ref(b: aio::Buffer) -> BufRef {
    match b {
        aio::Buffer::BoxedMutSlice(exclusive) => BufRef::BoxedMutSlice(exclusive),
        aio::Buffer::BoxedSlice(shared) => BufRef::BoxedSlice(shared),
        _ => BufRef::None,
    }
}
/// The outcome of one operation from a completed `LioCb`, as yielded by
/// `LioCb::into_results`.
pub struct LioResult {
// The operation's buffer, converted from what nix's `buffer()` returned.
pub buf_ref: BufRef,
// The value returned by `aio_return` for this operation.
pub result: nix::Result<isize>
}
/// A single asynchronous I/O control block that can be registered with a
/// `mio::Poll`.
///
/// Wraps nix's `aio::AioCb`.
#[derive(Debug)]
pub struct AioCb<'a> {
// `RefCell` because the `Evented` methods take `&self` yet must call
// `set_sigev_notify` on the wrapped control block.
// NOTE(review): the `Box` presumably keeps the control block at a stable
// address while the operation is in flight -- confirm against nix.
inner: RefCell<Box<aio::AioCb<'a>>>,
}
impl<'a> AioCb<'a> {
    /// Places a freshly built nix control block into the boxed,
    /// interior-mutable storage used by this wrapper.
    fn wrap(aiocb: aio::AioCb<'a>) -> AioCb<'a> {
        AioCb { inner: RefCell::new(Box::new(aiocb)) }
    }

    /// Creates a control block that has no data buffer.
    ///
    /// * `fd`   - file descriptor the operation applies to
    /// * `prio` - priority forwarded to nix unchanged
    ///
    /// Notification starts out as `SigevNone`; `Evented::register` replaces it.
    pub fn from_fd(fd: RawFd, prio: c_int) -> AioCb<'a> {
        Self::wrap(aio::AioCb::from_fd(fd, prio, SigevNotify::SigevNone))
    }

    /// Creates a control block that owns an immutable boxed buffer.
    ///
    /// * `fd`     - file descriptor the operation applies to
    /// * `offs`   - file offset; converted to `off_t` with `as`
    ///   (NOTE(review): values that do not fit in `off_t` wrap silently)
    /// * `buf`    - buffer, owned by the control block
    /// * `prio`   - priority forwarded to nix unchanged
    /// * `opcode` - list-I/O opcode forwarded to nix unchanged
    pub fn from_boxed_slice(fd: RawFd, offs: u64, buf: Box<dyn Borrow<[u8]>>,
                            prio: c_int, opcode: LioOpcode) -> AioCb<'a> {
        Self::wrap(aio::AioCb::from_boxed_slice(fd, offs as off_t, buf, prio,
                                                SigevNotify::SigevNone, opcode))
    }

    /// Creates a control block that owns a mutable boxed buffer.
    ///
    /// Parameters are as for `from_boxed_slice`.
    pub fn from_boxed_mut_slice(fd: RawFd, offs: u64,
                                buf: Box<dyn BorrowMut<[u8]>>, prio: c_int,
                                opcode: LioOpcode) -> AioCb<'a> {
        Self::wrap(aio::AioCb::from_boxed_mut_slice(fd, offs as off_t, buf,
                                                    prio, SigevNotify::SigevNone,
                                                    opcode))
    }

    /// Creates a control block that borrows a mutable slice for `'a`.
    ///
    /// Parameters are as for `from_boxed_slice`, except that `buf` is
    /// borrowed rather than owned.
    // The return type names `'a` explicitly. It is the same type lifetime
    // elision produced before, now spelled like the sibling constructors.
    pub fn from_mut_slice(fd: RawFd, offs: u64, buf: &'a mut [u8],
                          prio: c_int, opcode: LioOpcode) -> AioCb<'a> {
        Self::wrap(aio::AioCb::from_mut_slice(fd, offs as off_t, buf, prio,
                                              SigevNotify::SigevNone, opcode))
    }

    /// Creates a control block that borrows an immutable slice for `'a`.
    ///
    /// Parameters are as for `from_boxed_slice`, except that `buf` is
    /// borrowed rather than owned.
    pub fn from_slice(fd: RawFd, offs: u64, buf: &'a [u8],
                      prio: c_int, opcode: LioOpcode) -> AioCb<'a> {
        Self::wrap(aio::AioCb::from_slice(fd, offs as off_t, buf, prio,
                                          SigevNotify::SigevNone, opcode))
    }

    /// Fetches the buffer from the wrapped control block through nix's
    /// `buffer()` and converts it to a `BufRef`.
    ///
    /// Buffers that are not boxed come back as `BufRef::None`.
    /// NOTE(review): nix appears to hand the buffer over by value, so a second
    /// call may not return it again -- confirm against the nix documentation.
    pub fn buf_ref(&mut self) -> BufRef {
        nix_buffer_to_buf_ref(self.inner.borrow_mut().buffer())
    }

    /// Forwards to the wrapped control block's `aio_return`.
    pub fn aio_return(&self) -> nix::Result<isize> {
        self.inner.borrow_mut().aio_return()
    }

    /// Forwards to the wrapped control block's `cancel`.
    pub fn cancel(&self) -> nix::Result<aio::AioCancelStat> {
        self.inner.borrow_mut().cancel()
    }

    /// Forwards to the wrapped control block's `error`.
    pub fn error(&self) -> nix::Result<()> {
        self.inner.borrow_mut().error()
    }

    /// Forwards to the wrapped control block's `fsync` with the given `mode`.
    pub fn fsync(&self, mode: AioFsyncMode) -> nix::Result<()> {
        self.inner.borrow_mut().fsync(mode)
    }

    /// Forwards to the wrapped control block's `read`.
    pub fn read(&self) -> nix::Result<()> {
        self.inner.borrow_mut().read()
    }

    /// Forwards to the wrapped control block's `write`.
    pub fn write(&self) -> nix::Result<()> {
        self.inner.borrow_mut().write()
    }
}
impl<'a> Evented for AioCb<'a> {
    /// Points the control block's completion notification at `poll`.
    ///
    /// The token is carried as the kevent user data. Poll options are ignored.
    ///
    /// # Panics
    ///
    /// Panics if `events` does not include AIO readiness.
    fn register(&self,
                poll: &Poll,
                token: Token,
                events: Ready,
                _: PollOpt) -> io::Result<()> {
        assert!(UnixReady::from(events).is_aio());
        let notify = SigevNotify::SigevKevent {
            kq: poll.as_raw_fd(),
            udata: usize::from(token) as isize,
        };
        self.inner.borrow_mut().set_sigev_notify(notify);
        Ok(())
    }

    /// Same as `register`: the notification target is simply overwritten.
    fn reregister(&self,
                  poll: &Poll,
                  token: Token,
                  events: Ready,
                  opts: PollOpt) -> io::Result<()> {
        self.register(poll, token, events, opts)
    }

    /// Resets the control block's notification to `SigevNone`.
    fn deregister(&self, _: &Poll) -> io::Result<()> {
        self.inner.borrow_mut().set_sigev_notify(SigevNotify::SigevNone);
        Ok(())
    }
}
/// A list of asynchronous I/O operations that are submitted together and can
/// be registered with a `mio::Poll`.
///
/// Wraps nix's `aio::LioCb`.
#[derive(Debug)]
pub struct LioCb {
// The nix list; operations are appended to its `aiocbs` by the `emplace_*`
// methods.
inner: aio::LioCb<'static>,
// Notification passed to nix on `submit`/`resubmit`. Held in a `Cell`
// because the `Evented` methods that set it only receive `&self`.
sev: Cell<SigevNotify>
}
impl<'a> LioCb {
fn fix_submit_error(&mut self, e: nix::Result<()>) -> Result<(), LioError> {
match e {
Err(nix::Error::Sys(nix::errno::Errno::EAGAIN)) |
Err(nix::Error::Sys(nix::errno::Errno::EIO)) |
Err(nix::Error::Sys(nix::errno::Errno::EINTR)) => {
let mut n_error = 0;
let mut n_einprogress = 0;
let mut n_eagain = 0;
let mut n_ok = 0;
let errors = (0..self.inner.aiocbs.len())
.map(|i| {
self.inner.error(i).map_err(|e| e.as_errno().unwrap())
}).collect::<Vec<_>>();
for (i, e) in errors.iter().enumerate() {
match e {
Ok(()) => {
n_ok += 1;
},
Err(Errno::EINPROGRESS) => {
n_einprogress += 1;
},
Err(Errno::EAGAIN) => {
n_eagain += 1;
},
Err(_) => {
let _ = self.inner.aio_return(i);
n_error += 1;
}
}
}
if n_error > 0 {
Err(LioError::EIO(errors))
} else if n_eagain > 0 && n_eagain < self.inner.aiocbs.len() {
Err(LioError::EINCOMPLETE)
} else if n_eagain == self.inner.aiocbs.len() {
Err(LioError::EAGAIN)
} else {
panic!("lio_listio returned EIO for unknown reasons. n_error={}, n_einprogress={}, n_eagain={}, and n_ok={}",
n_error, n_einprogress, n_eagain, n_ok);
}
},
Ok(()) => Ok(()),
_ => panic!("lio_listio returned unhandled error {:?}", e)
}
}
pub fn submit(&mut self) -> Result<(), LioError> {
let e = self.inner.listio(aio::LioMode::LIO_NOWAIT, self.sev.get());
self.fix_submit_error(e)
}
pub fn resubmit(&mut self) -> Result<(), LioError> {
let e = self.inner.listio_resubmit(aio::LioMode::LIO_NOWAIT, self.sev.get());
self.fix_submit_error(e)
}
pub fn emplace_boxed_slice(&mut self, fd: RawFd, offset: u64,
buf: Box<dyn Borrow<[u8]>>, prio: i32, opcode: LioOpcode) {
self.inner.aiocbs.push(aio::AioCb::from_boxed_slice(fd, offset as off_t,
buf, prio as c_int, SigevNotify::SigevNone, opcode))
}
pub fn emplace_boxed_mut_slice(&mut self, fd: RawFd, offset: u64,
buf: Box<dyn BorrowMut<[u8]>>, prio: i32, opcode: LioOpcode) {
self.inner.aiocbs.push(aio::AioCb::from_boxed_mut_slice(fd,
offset as off_t, buf, prio as c_int, SigevNotify::SigevNone,
opcode))
}
pub fn emplace_slice(&mut self, fd: RawFd, offset: u64,
buf: &'static [u8], prio: i32, opcode: LioOpcode) {
let aiocb = aio::AioCb::from_slice(fd, offset as off_t, buf,
prio as c_int, SigevNotify::SigevNone, opcode);
self.inner.aiocbs.push(aiocb);
}
pub fn into_results<F, R>(self, callback: F) -> R
where F: FnOnce(Box<dyn Iterator<Item=LioResult> + 'a>) -> R {
let mut inner = self.inner;
let iter = (0..inner.aiocbs.len()).map(move |i| {
let result = inner.aio_return(i);
let buf_ref = nix_buffer_to_buf_ref(inner.aiocbs[i].buffer());
LioResult{result, buf_ref, }
});
callback(Box::new(iter))
}
pub fn with_capacity(capacity: usize) -> LioCb {
LioCb {
inner: aio::LioCb::with_capacity(capacity),
sev: Cell::new(SigevNotify::SigevNone)
}
}
}
impl Evented for LioCb {
    /// Records `poll` as the notification target for later submissions.
    ///
    /// Only the stored notification changes here; `submit` and `resubmit`
    /// read it when they run. Poll options are ignored.
    ///
    /// # Panics
    ///
    /// Panics if `events` does not include LIO readiness.
    fn register(&self,
                poll: &Poll,
                token: Token,
                events: Ready,
                _: PollOpt) -> io::Result<()> {
        assert!(UnixReady::from(events).is_lio());
        self.sev.set(SigevNotify::SigevKevent {
            kq: poll.as_raw_fd(),
            udata: usize::from(token) as isize,
        });
        Ok(())
    }

    /// Same as `register`: the stored notification is simply overwritten.
    fn reregister(&self,
                  poll: &Poll,
                  token: Token,
                  events: Ready,
                  opts: PollOpt) -> io::Result<()> {
        self.register(poll, token, events, opts)
    }

    /// Resets the stored notification to `SigevNone`.
    fn deregister(&self, _: &Poll) -> io::Result<()> {
        self.sev.set(SigevNotify::SigevNone);
        Ok(())
    }
}
/// Errors reported by `LioCb::submit` and `LioCb::resubmit`.
#[derive(Clone, Debug, PartialEq)]
pub enum LioError {
// Every operation in the list reported `EAGAIN`.
EAGAIN,
// Some, but not all, operations reported `EAGAIN`, and none reported any
// other failure.
EINCOMPLETE,
// At least one operation reported an error other than `EINPROGRESS` or
// `EAGAIN`. Holds the status of every operation, in list order.
EIO(Vec<Result<(), Errno>>)
}
impl LioError {
    /// Extracts the per-operation statuses from an `EIO` error.
    ///
    /// Any other variant is returned unchanged as the `Err` value.
    pub fn into_eio(self) -> Result<Vec<Result<(), Errno>>, Self> {
        match self {
            LioError::EIO(statuses) => Ok(statuses),
            other => Err(other),
        }
    }
}