use nix::libc::{c_int, off_t};
use mio::{Evented, Poll, Token, Ready, PollOpt};
use mio::unix::UnixReady;
use nix;
use nix::errno::Errno;
use nix::sys::aio;
use nix::sys::signal::SigevNotify;
use std::cell::{Cell, RefCell};
use std::io;
use std::iter::Iterator;
use std::os::unix::io::AsRawFd;
use std::os::unix::io::RawFd;
use std::pin::Pin;
pub use nix::sys::aio::AioFsyncMode;
pub use nix::sys::aio::LioOpcode;
pub struct LioResult {
pub result: nix::Result<isize>
}
#[derive(Debug)]
pub struct AioCb<'a> {
inner: RefCell<Pin<Box<aio::AioCb<'a>>>>,
}
impl<'a> AioCb<'a> {
pub fn from_fd(fd: RawFd, prio: c_int) -> AioCb<'a> {
let aiocb = aio::AioCb::from_fd(fd, prio, SigevNotify::SigevNone);
AioCb { inner: RefCell::new(aiocb) }
}
pub fn from_mut_slice(fd: RawFd, offs: u64, buf: &'a mut [u8],
prio: c_int, opcode: LioOpcode) -> AioCb {
let aiocb = aio::AioCb::from_mut_slice(fd, offs as off_t, buf, prio,
SigevNotify::SigevNone, opcode);
AioCb { inner: RefCell::new(aiocb) }
}
pub fn from_slice(fd: RawFd, offs: u64, buf: &'a [u8],
prio: c_int, opcode: LioOpcode) -> AioCb {
let aiocb = aio::AioCb::from_slice(fd, offs as off_t, buf, prio,
SigevNotify::SigevNone, opcode);
AioCb { inner: RefCell::new(aiocb) }
}
pub fn aio_return(&self) -> nix::Result<isize> {
self.inner.borrow_mut().aio_return()
}
pub fn cancel(&self) -> nix::Result<aio::AioCancelStat> {
self.inner.borrow_mut().cancel()
}
pub fn error(&self) -> nix::Result<()> {
self.inner.borrow_mut().error()
}
pub fn fsync(&self, mode: AioFsyncMode) -> nix::Result<()> {
self.inner.borrow_mut().fsync(mode)
}
pub fn read(&self) -> nix::Result<()> {
self.inner.borrow_mut().read()
}
pub fn write(&self) -> nix::Result<()> {
self.inner.borrow_mut().write()
}
}
impl<'a> Evented for AioCb<'a> {
fn register(&self,
poll: &Poll,
token: Token,
events: Ready,
_: PollOpt) -> io::Result<()> {
assert!(UnixReady::from(events).is_aio());
let udata = usize::from(token);
let kq = poll.as_raw_fd();
let sigev = SigevNotify::SigevKevent{kq, udata: udata as isize};
self.inner.borrow_mut().set_sigev_notify(sigev);
Ok(())
}
fn reregister(&self,
poll: &Poll,
token: Token,
events: Ready,
opts: PollOpt) -> io::Result<()> {
self.register(poll, token, events, opts)
}
fn deregister(&self, _: &Poll) -> io::Result<()> {
let sigev = SigevNotify::SigevNone;
self.inner.borrow_mut().set_sigev_notify(sigev);
Ok(())
}
}
#[derive(Debug)]
pub struct LioCb<'a> {
inner: aio::LioCb<'a>,
sev: Cell<SigevNotify>
}
impl<'a> LioCb<'a> {
fn fix_submit_error(&mut self, e: nix::Result<()>) -> Result<(), LioError> {
match e {
Err(nix::Error::Sys(nix::errno::Errno::EAGAIN)) |
Err(nix::Error::Sys(nix::errno::Errno::EIO)) |
Err(nix::Error::Sys(nix::errno::Errno::EINTR)) => {
let mut n_error = 0;
let mut n_einprogress = 0;
let mut n_eagain = 0;
let mut n_ok = 0;
let errors = (0..self.inner.aiocbs.len())
.map(|i| {
self.inner.error(i).map_err(|e| e.as_errno().unwrap())
}).collect::<Vec<_>>();
for (i, e) in errors.iter().enumerate() {
match e {
Ok(()) => {
n_ok += 1;
},
Err(Errno::EINPROGRESS) => {
n_einprogress += 1;
},
Err(Errno::EAGAIN) => {
n_eagain += 1;
},
Err(_) => {
let _ = self.inner.aio_return(i);
n_error += 1;
}
}
}
if n_error > 0 {
Err(LioError::EIO(errors))
} else if n_eagain > 0 && n_eagain < self.inner.aiocbs.len() {
Err(LioError::EINCOMPLETE)
} else if n_eagain == self.inner.aiocbs.len() {
Err(LioError::EAGAIN)
} else {
panic!("lio_listio returned EIO for unknown reasons. n_error={}, n_einprogress={}, n_eagain={}, and n_ok={}",
n_error, n_einprogress, n_eagain, n_ok);
}
},
Ok(()) => Ok(()),
_ => panic!("lio_listio returned unhandled error {:?}", e)
}
}
pub fn submit(&mut self) -> Result<(), LioError> {
let e = self.inner.listio(aio::LioMode::LIO_NOWAIT, self.sev.get());
self.fix_submit_error(e)
}
pub fn resubmit(&mut self) -> Result<(), LioError> {
let e = self.inner.listio_resubmit(aio::LioMode::LIO_NOWAIT, self.sev.get());
self.fix_submit_error(e)
}
pub fn emplace_mut_slice(&mut self, fd: RawFd, offset: u64,
buf: &'a mut [u8], prio: i32, opcode: LioOpcode) {
self.inner.emplace_mut_slice(fd, offset as off_t, buf,
prio as c_int, SigevNotify::SigevNone, opcode);
}
pub fn emplace_slice(&mut self, fd: RawFd, offset: u64,
buf: &'a [u8], prio: i32, opcode: LioOpcode) {
self.inner.emplace_slice(fd, offset as off_t, buf,
prio as c_int, SigevNotify::SigevNone, opcode);
}
pub fn into_results<F, R>(self, callback: F) -> R
where F: FnOnce(Box<dyn Iterator<Item=LioResult> + 'a>) -> R {
let mut inner = self.inner;
let iter = (0..inner.aiocbs.len()).map(move |i| {
let result = inner.aio_return(i);
LioResult{result}
});
callback(Box::new(iter))
}
pub fn with_capacity(capacity: usize) -> LioCb<'a> {
LioCb {
inner: aio::LioCb::with_capacity(capacity),
sev: Cell::new(SigevNotify::SigevNone)
}
}
}
impl<'a> Evented for LioCb<'a> {
fn register(&self,
poll: &Poll,
token: Token,
events: Ready,
_: PollOpt) -> io::Result<()> {
assert!(UnixReady::from(events).is_lio());
let udata = usize::from(token);
let kq = poll.as_raw_fd();
let sigev = SigevNotify::SigevKevent{kq, udata: udata as isize};
self.sev.set(sigev);
Ok(())
}
fn reregister(&self,
poll: &Poll,
token: Token,
events: Ready,
opts: PollOpt) -> io::Result<()> {
self.register(poll, token, events, opts)
}
fn deregister(&self, _: &Poll) -> io::Result<()> {
let sigev = SigevNotify::SigevNone;
self.sev.set(sigev);
Ok(())
}
}
#[derive(Clone, Debug, PartialEq)]
pub enum LioError {
EAGAIN,
EINCOMPLETE,
EIO(Vec<Result<(), Errno>>)
}
impl LioError {
pub fn into_eio(self) -> Result<Vec<Result<(), Errno>>, Self> {
if let LioError::EIO(eio) = self {
Ok(eio)
} else {
Err(self)
}
}
}